Mastering Node.js(Second Edition)

Writable streams

A Writable stream is responsible for accepting some value (a stream of bytes, a string) and writing that data to a destination. Streaming data into a file container is a common use case.

To create a Writable stream:

const stream = require('stream');
let writable = new stream.Writable({
  highWaterMark: 16000,
  decodeStrings: true
});

The Writable constructor accepts several options, including these two:

  • highWaterMark: The maximum number of bytes the stream's buffer will accept prior to returning false on writes. Default is 16 KB.
  • decodeStrings: Whether to convert strings into buffers before writing. Default is true.
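To make the effect of decodeStrings concrete, here is a minimal sketch that records the type of chunk reaching the write handler under each setting. The helper name makeRecorder is hypothetical and exists only for this illustration; the handler is supplied through the constructor's write option rather than by assigning _write.

```javascript
const { Writable } = require('stream');

// Hypothetical helper: builds a Writable with the given decodeStrings
// setting and records what kind of chunk its write handler receives.
function makeRecorder(decodeStrings) {
  const seen = [];
  const writable = new Writable({
    decodeStrings,
    write(chunk, encoding, callback) {
      seen.push(Buffer.isBuffer(chunk) ? 'buffer' : typeof chunk);
      callback();
    }
  });
  writable.write('hello', 'utf8');
  return seen;
}

const decoded = makeRecorder(true);        // string is converted to a Buffer
const passedThrough = makeRecorder(false); // string is handed over unchanged

console.log(decoded);       // [ 'buffer' ]
console.log(passedThrough); // [ 'string' ]
```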

As with Readable streams, custom Writable stream implementations must implement a _write handler, which will be passed the arguments sent to the write method of instances.
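As a rough sketch of such a custom implementation, the following class extends Writable and stores every chunk it receives. The class name CollectingWritable and its chunks property are assumptions made for this example, not part of Node's API.

```javascript
const { Writable } = require('stream');

// Hypothetical example class: keeps every chunk it receives in memory.
class CollectingWritable extends Writable {
  constructor(options) {
    super(options);
    this.chunks = [];
  }

  // Receives the arguments that were passed to write().
  _write(chunk, encoding, callback) {
    this.chunks.push(chunk);
    callback(); // signal that this chunk has been handled
  }
}

const collector = new CollectingWritable();
collector.write('first ');
collector.write('second');
collector.end();

// With the default decodeStrings: true, the chunks arrive as Buffers.
const collected = Buffer.concat(collector.chunks).toString();
console.log(collected); // first second
```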

One should think of a Writable stream as a data target, such as for a file you are uploading. Conceptually, this is not unlike the implementation of push in a Readable stream, where one pushes data until the data source is exhausted, passing null to terminate reading. For example, here, we write 32 "A" characters to a stream, which will log them:

const stream = require('stream');

let writable = new stream.Writable({
  decodeStrings: false
});

writable._write = (chunk, encoding, callback) => {
  console.log(chunk.toString());
  callback();
};

let written = writable.write(Buffer.alloc(32, 'A'));
writable.end();

console.log(written);

// AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
// true

There are two key things to note here.

First, our _write implementation invokes the callback function immediately after writing. This callback is always present, regardless of whether the instance write method was passed a callback directly. Calling it is important for indicating the status of the write attempt, whether a failure (by passing an error) or a success.
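The failure case can be sketched as follows. The rule that rejects the chunk 'bad' is an assumption invented for this illustration; the point is only that passing an Error to the callback reports the failure both to the callback given to write and through the stream's error event.

```javascript
const { Writable } = require('stream');

const callbackResults = [];
const errorEvents = [];

const picky = new Writable({
  write(chunk, encoding, callback) {
    // Assumed failure condition, purely for illustration.
    if (chunk.toString() === 'bad') {
      callback(new Error('bad chunk rejected')); // report failure
    } else {
      callback(); // report success
    }
  }
});

// A failed write also surfaces as an 'error' event on the stream.
picky.on('error', (err) => errorEvents.push(err.message));

// The optional callback given to write() receives the same status.
const report = (err) => callbackResults.push(err ? err.message : 'success');
picky.write('good', report);
picky.write('bad', report);

// Callbacks and events are delivered asynchronously, so inspect them later.
setImmediate(() => console.log(callbackResults, errorEvents));
```

Without an error listener, the failed write would raise an unhandled error event, so registering one is part of handling write failures properly.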

Second, the call to write returned true. This indicates that the internal buffer of the Writable implementation is still below its highWaterMark after accepting the requested write, so it is safe to keep writing. What if we sent a very large amount of data, enough to reach the default size of the internal buffer?

Modifying the previous example, the following would return false:

let written = writable.write(Buffer.alloc(16384, 'A'));
console.log(written); // Will be 'false'

The reason this write returns false is that the 16,384 bytes written reach the highWaterMark, which has a default value of 16 KB (16 * 1,024). If we wrote 16,383 bytes instead, write would again return true (alternatively, one could simply increase the highWaterMark value).
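The boundary can be checked directly. This sketch sets highWaterMark explicitly to 16384 so that the result does not depend on the default of the Node release in use (defaults may differ between releases); the helper name probe is hypothetical.

```javascript
const { Writable } = require('stream');

// Hypothetical helper: writes `bytes` bytes to a fresh no-op Writable
// and returns whatever write() returned.
function probe(bytes) {
  const sink = new Writable({
    highWaterMark: 16384, // set explicitly instead of relying on the default
    write(chunk, encoding, callback) {
      callback();
    }
  });
  return sink.write(Buffer.alloc(bytes, 'A'));
}

const belowMark = probe(16383); // one byte short of the highWaterMark
const atMark = probe(16384);    // exactly the highWaterMark

console.log(belowMark); // true
console.log(atMark);    // false
```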

What should you do when write returns false? You should certainly not continue to send data! Returning to our metaphor of water in a hose: when the stream is full, one should wait for it to drain prior to sending more data. Node's Stream implementation will emit a drain event whenever it is safe to write again. When write returns false, listen for the drain event before sending more data.

Putting together what we have learned, let's create a Writable stream with a highWaterMark value of 10 bytes. We'll then set up a simulation where we repeatedly write a string longer than the highWaterMark to stdout. Whenever the buffer fills up, we wait for the drain event to fire prior to sending more data:

const stream = require('stream');

let writable = new stream.Writable({
  highWaterMark: 10
});

writable._write = (chunk, encoding, callback) => {
  process.stdout.write(chunk);
  callback();
};

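function writeData(iterations, writer, data, encoding, cb) {
  (function write() {
    if (!iterations--) {
      return cb();
    }

    if (!writer.write(data, encoding)) {
      // Buffer is full: resume only after the stream has drained
      console.log(` <wait> highWaterMark of ${writer.writableHighWaterMark} reached`);
      return writer.once('drain', write);
    }

    // Buffer still has room: continue with the next iteration
    write();
  })();
}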

writeData(4, writable, 'String longer than highWaterMark', 'utf8', () => console.log('finished'));

Each time we write, we check whether the stream write action returned false, and if so, we wait for the next drain event before running our write method again.

You should be careful to implement proper stream management, respecting the "warning" signaled when write returns false, and properly waiting for the drain event to occur prior to sending more data.
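The same discipline can also be expressed with async/await. This is only a sketch, assuming a Node version that provides events.once; the names slowSink, writeAll, and waits are made up for the example, and setImmediate stands in for a destination that completes asynchronously.

```javascript
const { Writable } = require('stream');
const { once } = require('events');

const output = [];
const slowSink = new Writable({
  highWaterMark: 10,
  write(chunk, encoding, callback) {
    output.push(chunk.toString());
    setImmediate(callback); // simulate an asynchronous destination
  }
});

let waits = 0;

// Writes each item in order, pausing whenever write() returns false.
async function writeAll(writer, items) {
  for (const item of items) {
    if (!writer.write(item)) {
      waits++;
      await once(writer, 'drain'); // resume only when it is safe again
    }
  }
  writer.end();
  await once(writer, 'finish');
}

const finished = writeAll(slowSink, [
  'longer than ten bytes',
  'short',
  'another long string'
]);

finished.then(() => console.log(`finished after ${waits} waits`, output));
```

Because events.once rejects if the stream emits an error while waiting, a failure in the destination surfaces as a rejected promise rather than a silent stall.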

The fluid data in a Readable stream can be easily redirected to a Writable stream. For example, the following code takes any data sent by a terminal (stdin is a Readable stream) and echoes it to the destination Writable stream (stdout): process.stdin.pipe(process.stdout). Whenever a Writable stream is passed to a Readable stream's pipe method, a pipe event fires on the Writable stream. Similarly, when a Writable stream is removed as a destination for a Readable stream, the unpipe event fires. To remove a pipe, call unpipe on the Readable stream, passing the destination: readable.unpipe(destination)
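A small sketch of the two events follows. The source and destination streams here are stand-ins created for the example (a Readable with a no-op read and a Writable that discards its input), so that the events can be observed without involving the terminal.

```javascript
const { Readable, Writable } = require('stream');

const events = [];

// Stand-in streams for illustration only.
const source = new Readable({ read() {} });
const destination = new Writable({
  write(chunk, encoding, callback) {
    callback();
  }
});

// Both events fire on the Writable and receive the Readable as an argument.
destination.on('pipe', (src) => {
  if (src === source) events.push('pipe');
});
destination.on('unpipe', (src) => {
  if (src === source) events.push('unpipe');
});

source.pipe(destination);
source.unpipe(destination);

console.log(events); // [ 'pipe', 'unpipe' ]
```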