ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Node.js] Readable Stream을 다루는 방법
    Node.js 2021. 11. 29. 20:38
    반응형

    Node.js는 데이터를 클라이언트에 전달하거나, 클라이언트로부터 데이터를 받을 때, 또는 파일을 읽고 쓸 때 Readable stream과 Writable stream을 사용하게 된다. 이번에 다룰 Readable stream을 이해하기 위해서는 먼저 Readable stream의 2가지 모드에 대해서 알아야 한다.

    Readable stream의 2가지 모드

    Readable stream은 flowingpaused 2가지 모드 중 하나로 존재한다.

    다음 3가지 상황에서 Readable stream은 flowing 모드로 전환된다.

    • 'data' 이벤트 리스너를 등록한 경우
    • Readable.resume() 메소드를 실행한 경우
    • Readable.pipe() 메소드를 통해 다른 스트림과 연결된 경우

    그리고 다음 상황에서 paused 모드로 전환된다.

    • 'readable' 이벤트 리스너를 등록한 경우
    • Readable.pause() 메소드를 실행한 경우
    • Readable.unpipe() 메소드를 실행한 경우

    Readable stream으로 다음과 같은 hello.txt 파일의 데이터를 읽어서 Writable stream을 이용해 hello-copy.txt 파일을 만드는 예제를 통해 Readable stream을 이용하는 방법을 알아보려고 한다.

    hello.txt

    Readable stream으로부터 Writable stream 으로 데이터를 전달하는 방법은 3가지가 있다. readable.on('data'), readable.on('readable') 이벤트를 사용하거나 readable.pipe() 메소드를 사용하는 것이다.

    on('readable')

    readable 이벤트는 스트림에서 데이터를 읽을 준비가 되어있으니, 데이터를 언제든지 읽어서 원하는대로 사용하세요 를 알려주는 이벤트이다. 다음처럼 사용할 수 있다.

    import * as fs from 'fs';
    
    const rs = fs.createReadStream('hello.txt');
    const ws = fs.createWriteStream('hello-copy.txt');
    
    rs.on('readable', () => {
      let chunk;
      while (chunk = rs.read()) {
        ws.write(chunk);
      }
    });

    hello-copy.txt

    Readable stream의 read 메소드는 Buffer 형식의 데이터를 반환하고, 더이상 읽어올 데이터가 없을 때는 null을 반환하고 stream이 종료된다.

    파라미터로는 숫자를 넘겨줄 수 있는데, 한 번에 읽어올 데이터의 크기를 지정할 수 있다. 그리고 Writable stream에 작성할 때 데이터에 변형을 가해서 작성할 수 있다. 예를들어 다음처럼 실행하면 3byte 씩 데이터를 가져오고, 개행이 추가된 문자열을 작성하게된다.

    rs.on('readable', () => {
      let chunk;
      while (chunk = rs.read(3)) {
        ws.write(`${chunk}\n`);
      }
    });

    readable 이벤트 리스너로 stream을 다룰 때 중요한 것은 read 메소드를 실행할 때, Readable stream은 paused mode 이어야 한다는 것이다. Readable stream의 readableFlowing 이라는 프로퍼티를 통해 현재 모드를 확인할 수 있다.

    rs.on('readable', () => {
      let chunk;
      while (chunk = rs.read(3)) {
        console.log(rs.readableFlowing);
        ws.write(chunk);
      }
    });

    false
    false
    false
    false

    stream의 종료

    모든 stream을 다룰 때 가장 중요한 것은 stream을 다 사용하고 난 뒤에는 반드시 종료시켜 주어야 한다는 것이다. stream이 종료되는 것을 관리하지 않는 코드는 버그와 메모리 누수의 위험을 가지는 코드이다.

    위 코드에서 Readable stream과 Writable stream에 close 이벤트 리스너를 등록해서 stream이 종료되고 있는지 확인해보자.

    rs.on('close', () => {
      console.log('rs closed!');
    });
    
    ws.on('close', () => {
      console.log('ws closed!');
    });
    
    rs.on('readable', () => {
      let chunk;
      while (chunk = rs.read(3)) {
        ws.write(`${chunk}\n`);
      }
    });

    rs closed!

    Readable stream은 모든 데이터를 읽어온 다음에 종료되는 것을 확인할 수 있고, Writable stream은 종료되지 않은 상태로 프로그램이 종료된 것을 확인할 수 있다. Writable stream은 프로그램이 종료될 때까지 열려있는 상태로 또다른 데이터가 쓰여지기를 기다리는 것이다.

    이러한 상황을 방지하기 위해 다음처럼 Readable stream이 모든 데이터를 다 불러왔을 때, Writable stream에 더이상 보낼 데이터가 없음을 알려주는 코드를 추가해야한다.

    rs.on('end', () => {
      ws.end();
    })

    rs closed!
    ws closed!

    on('data')

    data 이벤트는 데이터를 읽어왔으니, 데이터를 원하는대로 사용하세요_ 를 알려주는 이벤트이다. 다음처럼 사용할 수 있다.

    import * as fs from 'fs';
    
    const rs = fs.createReadStream('hello.txt');
    const ws = fs.createWriteStream('hello-copy.txt');
    
    rs.on('end', () => {
      ws.end();
    });
    
    rs.on('data', (chunk) => {
      ws.write(chunk);
    });

    hello-copy.txt

    앞에서 설명했듯이 data 이벤트 리스너를 등록하는 순간 Readable stream은 flowing 모드로 전환된다. flowing 모드에서 Readable stream은 알아서 data를 읽어오게 된다. 그리고 여전히 Writable stream의 종료를 관리해주어야 한다.

    사용자는 다음과 같이 Readable stream의 흐름을 조절할 수도 있다.

    rs.on('data', chunk => { // 1
      ws.write(chunk); // 4
    });
    
    rs.pause();  // 2
    
    setTimeout(() => {
      rs.resume(); // 3
    }, 1000);

    위 코드에서 Readable stream은 1번 코드가 실행되면서 flowing 모드로 전환되지만 2번 코드에서 pause 메소드가 실행되면서 paused 모드로 전환된다. 그리고 1초 뒤에 실행되는 타이머가 등록되고, 이 시점까지 파일 읽기와 쓰기는 일어나지 않는다. 1초 뒤에 3번 코드가 실행되면서 다시 flowing 모드로 전환되고, 이때부터 데이터 불러오기가 시작되면서 4번 data 이벤트 콜백이 호출된다.

    한번에 읽어오는 데이터의 크기는 highWaterMark라는 값으로 고정되어있는데, 기본 값은 16,384byte(16KB)이다. 이것은 Readable stream을 생성할 때 옵션으로 넘겨줄 수 있다.

    readable 이벤트 리스너를 등록할 때보다 단순하게 데이터를 다룰 수 있지만 데이터를 언제 읽어올지, 어느만큼 읽어올지는 제어할 수 없다.

    pipe()

    pipe 메소드는 목적지를 알려줄테니 알아서 데이터를 보내줘 를 지시하는 메소드이다. pipe 메소드의 파라미터로 Writable stream 또는 Writable stream을 확장한 Duplex stream이나 Transform stream을 데이터의 목적지로 넘겨줄 수 있다.

    import * as fs from 'fs';
    
    const rs = fs.createReadStream('hello.txt');
    const ws = fs.createWriteStream('hello-copy.txt');
    
    rs.pipe(ws);

    hello-copy.txt

    pipe 메소드를 실행하는 순간 Readable stream은 flowing 모드로 전환되고, 데이터는 목적지로 이동하게 된다.

    앞서 설명한 방법들 중 제일 사용방법이 간단하다. 게다가 pipe 메소드는 시작점이 되는 stream이 데이터를 모두 불러와서 종료되면, 목적지 stream의 end메소드를 호출하면서 종료시켜준다. 개발자가 직접 종료를 관리하지 않아도 목적지 stream이 종료되는 것을 다음 코드를 통해서 확인할 수 있다.

    rs.on('close', () => {
      console.log('rs closed!');
    });
    
    ws.on('close', () => {
      console.log('ws closed!');
    });
    
    rs.pipe(ws);

    rs closed!
    ws closed!

    그리고 pipe 메소드는 목적지 stream을 반환하기 때문에, Node.js의 대표적인 Transfrom stream인 Gzip을 이용하여 다음처럼 pipe 체이닝을 사용할 수 있다.

    import * as fs from 'fs';
    import * as zlib from 'zlib';
    
    const rs = fs.createReadStream('hello.txt');
    const zs = zlib.createGzip();
    const ws = fs.createWriteStream('hello-copy.txt.gz');
    
    rs.pipe(zs).pipe(ws);

    에러 핸들링

    Readable stream을 이용하는 3가지 방법을 알아보았다. 어플리케이션을 만들면서 요구사항에 맞게 원하는 방식을 선택해서 사용할 수 있는데, 모든 방법에서 중요한 것은 에러를 어떻게 다룰 것인지이다.

    stream에서는 예상치 못한 에러가 언제든지 발생할 수 있다. stream이 Socket이라면 네트워크 에러일 수도 있고, FileStream이라면 시스템 에러일 수 있다. 이 때 error 이벤트 리스너를 등록해서 에러를 핸들링해주어야 한다.

    import * as fs from 'fs';
    
    const rs = fs.createReadStream('hello.txt');
    const ws = fs.createWriteStream('hello-copy.txt');
    
    rs.on('close', () => {
      console.log('rs closed!');
    });
    
    ws.on('close', () => {
      console.log('ws closed!');
    });
    
    rs.on('end', () => {
      ws.end();
    });
    
    rs.on('error', (error) => {
      console.error(`Error from rs: ${error.message}`);
    });
    
    rs.on('data', (chunk) => {
      rs.destroy(new Error('Something happend!!')); // error 발생!
    });

    Error from rs: Something happend!!
    rs closed

    stream 에러를 재현하기 위해 destroy 메소드를 이용해 에러를 전달해주었다. 여기서 문제는 Readable stream은 종료가 되었지만 모든 데이터를 다 읽었다는 end 이벤트는 발생하지 않기 때문에 Writable stream은 종료되지 않는다는 것이다.

    error 이벤트 리스너에서 따로 Writable stream을 종료시켜주어야 한다.

    rs.on('error', (error) => {
      console.error(`Error from rs: ${error.message}`);
      ws.close();
    });

    이것은 pipe메소드를 사용할 때에도 해당되는데, pipe 메소드는 출발지 stream에서 오류가 나서 종료된 경우에는 목적지 stream을 종료시켜주지 않는다. 이게 목적지 스트림이 여러개이거나 pipe 체이닝이 걸려있는 경우, 모든 stream에 에러 핸들러를 달아주고 목적지 stream을 종료하는게 여간 귀찮은 일이 아닐뿐더러 코드가 굉장히 장황해진다. 그럴때는 pipeline 메소드를 이용하면 편리하게 stream의 에러와 종료를 관리할 수 있다.

    pipline()

    stream 모듈의 pipeline 메소드는 출발지 stream과 목적지 stream, 그리고 그 사이에 여러 Duplex 및 Transform stream을 전달받을 수 있다.

    마지막 매개변수로는 모든 pipe 작업이 끝났을 때 호출할 콜백을 넘겨주는데, 연결된 스트림 중 하나에서 오류가 난 경우 해당 오류를 매개변수로 전달받게 된다.

    pipeline이 편리한 또 한가지는 오류가 발생했을 때, stream의 종료까지 책임져준다는 것이다.

    import { pipeline } from 'stream';
    import * as fs from 'fs';
    import * as zlib from 'zlib';
    
    const rs = fs.createReadStream('hello.txt');
    const zs = zlib.createGzip();
    const ws = fs.createWriteStream('hello-copy.txt.gz');
    
    rs.on('close', () => {
      console.log('rs closed!');
    })
    
    zs.on('close', () => {
      console.log('zs closed!');
    })
    
    ws.on('close', () => {
      console.log('ws closed!');
    })
    
    rs.on('data', (chunk) => {
      rs.destroy(new Error('asdjfasdf'));
    })
    
    pipeline(rs, zs, ws, (error) => {
      console.error(error);
    });

    Error: Something happend!!
    rs closed!
    zs closed!
    ws closed!

    Conclusion

    Readable stream의 2가지 모드와 데이터를 다루는 3가지 방법을 알아보았다. 실제로 서비스를 운영하다가 stream에서 메모리 누수를 발견한 경험이 있는데, stream의 종료를 제대로 관리해주지 않아서 발생한 일이었다. stream을 사용할 때는 반드시 종료를 잘 관리해주어야 한다.

    그리고 Node.js 공식문서에서 Readable stream 을 다룰 때 readable.on('data'), on('readable'), pipe() 세 API를 혼용해서 사용하면 예상치 못한 결과가 나올 수 있다고 경고한다. 간단히 데이터를 전달만 하는 경우에는 pipe 메소드를 이용하고, 좀 더 섬세한 작업이 필요할 때는 readable.on('readable')/readable.read() 또는 readable.pause()/readable.resume() 을 사용하라고 권장한다.

    References

    반응형

    'Node.js' 카테고리의 다른 글

    [Node.js] 공식문서로 이해하는 이벤트 루프  (0) 2022.02.03

    댓글

Designed by Tistory.