- Published on
Node.js Streams
- Authors
- Name
- Dan Orlando
- @danorlando1
Streams are one of the critical peices of Node that make it so powerful. When working with large amounts of data, this should be your go-to solution. Streams are collections of data that might not be available all at once and dont have to fit in memory. With streams, data is read and written in chunks. This is why all of the data doesn't all have to be in memory at once.
If you think about streaming services like Netflix or Spotify for example, you never have to download the entire video or playlist before watching or listening. Instead, the browser receives this data in a continuous flow of chunks, allowing the file to download or "buffer" as the file plays.
The response
object is actually a writable stream. If we have a big file as a readable stream, we can pipe one into the other and avoid filling up the memory. The fs module can give us a readable stream for any file using the createReadStream method, then we simply pipe this readable stream into the response writable stream. This avoids buffering in memory.
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
const file = fs.createReadStream('./big-file.txt');
file.pipe(res);
});
server.listen(8000);
There are 4 Fundamental Stream Types:
Readable Streams
- This is an abstraction for a source from which data can be consumed (eg.
fs.createReadStream
)
- This is an abstraction for a source from which data can be consumed (eg.
Writable Streams
- This is an abstraction for a destination to which data can be written (eg.
fs.createWriteStream
)
- This is an abstraction for a destination to which data can be written (eg.
Duplex Streams
- These streams are both readable and writable
Transform Streams
- These are duplex streams that can modify or transform the data as it is read and written (eg.
zlib.createGzip
) - As a writable stream, it receives pieces of data, transforms (changes or discards) them and then outputs them as a readable stream.
- Sometimes referred to as a "through" stream
All streams are event emitters. This means that we can listen for events like
data
,end
,error
,close
,pipe
,unpipe
,finish
,drain
,pause
, andresume
.For readable streams, the important events are
data
andend
. The data event is emitted when there is data to be read, and the end event is emitted when there is no more data to be read. For writable streams, the important events aredrain
andfinish
. The drain event is emitted when the writable stream is ready to accept more data, and the finish event is emitted when the writable stream has finished writing all the data it has received and all data has been flushed.- These are duplex streams that can modify or transform the data as it is read and written (eg.
Good things to know about Readable Streams:
- 2 modes: Paused and Flowing (also referred to as pull vs. push)
- All readable streams start in paused mode
- Can be switched into flowing and back to paused where needed
- In paused mode, use stream.read() to read from the stream
- In flowing mode, must use events to consume the data
- In flowing mode, data can be lost if no consumers are available to handle it, must use data event handler
- Adding a data event handler switches a paused stream into flowing mode
- Removing the data handler switches it back to paused mode
- Usually, to switch between these two modes, we use the resume and pause methods
Simple Writable Stream:
const { Writable } = require('stream');
const ws = new Writable({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
}
});
We can then consume this stream by piping it into a readable stream by adding process.stdin.pipe(ws);
to the end of the script.
Simple Readable Stream:
const { Readable } = require('stream');
const rs = new Readable({
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
}
}
});
rs.currentCharCode = 65;
rs.pipe(process.stdout);
In the past, you had to be careful not to mix async functions with EventEmitter because there was no way to catch a regection when it was emitted within an event handler. The solution was always to wrap the async function in a try/catch block. Instead, you can set EventEmitter.captureRejections = true
and a catch()
handler will be added every time a Promise is returned from an event handler. If a rejection occurs, the 'error'
event will be emitted, avoiding the 'unhandledRejection'
.
Piping Streams
When data must be processed in multiple steps, streams can be connected to each other, sending the data through a "pipeline" of transformations. With piping, input is receved from a readable stream and each step is completed and sent on to the next step via a transform stream. For the last step, we can write the data from the most recent readable stream in a writable stream, or process the data from the most recent readable stream by some other means.
When piping a readable stream into a writable stream, the readable stream will emit the
data
event when it has data to be read. The writable stream will then emit thedrain
event when it is ready to accept more data.
Perhaps a more elegant method of working with streams in a transform pipeline is through asynchronous iteration. This will retrieve the contents of a data container asynchronously, which means the current task may be paused before retrieving an item. The iteration should be done on the readable
event.
const fs = require('fs');
async function logChunks(readable) {
for await (const chunk of readable) {
console.log(chunk);
}
}
const readable = fs.createReadStream('./big-file.txt', {encoding: 'utf8'});
logChunks(readable);
The idea behind asynchronous iteration is that we can pause the iteration while we're working on the next item. It is a more elegant alternative to transform streams for processing streamed data in multiple steps. There are a few characterstics of asynchronous iteration:
- The input is a readable stream
- The first transformation is done by an async generator function which iterates over the input stream and yields chunks of data.
- The second transformation is done by a async function which consumes the yielded chunks. We can continue to transform futher by using more async generators.
- At the end of the transformation pipeline, we have options for handling the async iterable that is returned by the final generator: we can convert it to a readable stream with
Readable.from()
which can then be piped into a writable stream later, or we can use an async function to process it.
import {Readable} from 'stream';
async function* chunksToLines(chunkIterable) {
let previous = '';
for await (const chunk of chunkIterable) {
let startSearch = previous.length;
previous += chunk;
while (true) {
const eolIndex = previous.indexOf('\n', startSearch);
if (eolIndex < 0) break;
// line includes the EOL
const line = previous.slice(0, eolIndex+1);
yield line;
previous = previous.slice(eolIndex+1);
startSearch = 0;
}
}
if (previous.length > 0) {
yield previous;
}
}
async function* numberLines(lineIterable) {
let lineNumber = 1;
for await (const line of lineIterable) {
yield lineNumber + ' ' + line;
lineNumber++;
}
}
async function logLines(lineIterable) {
for await (const line of lineIterable) {
console.log(line);
}
}
const chunks = Readable.from(
'Text with\nmultiple\nlines.\n',
{encoding: 'utf8'});
logLines(numberLines(chunksToLines(chunks)));
In this example, chunkIterable
is an async or sync iterable over chunks of data. The chunksToLines
function returns an async iterable over "lines". lineIterable
is an async iterable over lines of data. logLines
is an async function that logs the lines.
Writing to Writable Streams
There are three primary approaches to writing to a writable stream:
- Write directly to the stream via the
write()
method. - Use the
pipe()
method to pipe a readable stream into the writable stream. - Use
pipeline()
from thestream
module to pipe a readable stream into a writable stream.
We can use the scenario of writing a stream to file to demonstrate the three approaches.
First, we'll use an async function to write directly to a writable stream.
import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';
const finished = util.promisify(stream.finished); // convert to a promise
async function writeIterableToFile(iterable, filePath) {
const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
for await (const chunk of iterable) {
if (!writable.write(chunk)) { // if we can't write, pause the stream
await once(writable, 'drain'); // wait for drain event to resume writing
}
}
writable.end();
await finished(writable); // wait for the stream to finish
}
await writeIterableToFile(
['One', ' line of text.\n'], 'tmp/log.txt');
Notice that we promisified stream.finished
. Normally stream.finished()
uses a callback, but we can use this util function to convert it to a Promise. Next, take a look at the if condition. By calling await once(writable, 'drain')
, we are waiting for the drain event to resume writing. We then close the writable stream and wait until writing is done.
Using pipeline(readable, writable)
:
import * as stream from 'stream';
import * as fs from 'fs';
const pipeline = util.promisify(stream.pipeline);
async function writeIterableToFile(iterable, filePath) {
const readable = stream.Readable.from(
iterable, {encoding: 'utf8'});
const writable = fs.createWriteStream(filePath);
await pipeline(readable, writable);
}
await writeIterableToFile(
['One', ' line of text.\n'], 'tmp/log.txt');
// ···
This method uses Readable.from() to create a readable stream from the iterable. Then, we use pipeline() to pipe the readable stream into the writable stream. We can also use the Readable.pipe()
method to pipe a readable stream into a writable stream, but the only issue with this method is that if the readable stream emits an error, the writable stream will not close automatically.