Implementing readable streams
Streams producing data that another process may have an interest in are normally implemented using a Readable stream. A Readable stream saves the implementer all the work of managing the read queue, handling the emitting of data events, and so on.
To create a Readable stream, use this:
const stream = require('stream');
let readable = new stream.Readable({
  encoding: "utf8",
  highWaterMark: 16000,
  objectMode: true
});
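As previously mentioned, Readable is exposed as a base class, which can be initialized through three commonly used options:
- encoding: Decode buffers into strings using the specified encoding. The default is null, in which case no decoding is performed and chunks are delivered as Buffer objects.
- highWaterMark: Number of bytes to keep in the internal buffer before ceasing to read from the data source. The default has long been 16 KB (16,384 bytes), though more recent Node.js releases may use a larger value. In objectMode the limit is counted in objects rather than bytes, with a default of 16.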
- objectMode: Tell the stream to behave as a stream of objects instead of a stream of bytes, such as a stream of JSON objects instead of the bytes in a file. Default false.
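The effect of these options can be checked with a minimal sketch. The makeReadable helper below is hypothetical (it is not part of Node), and the stream is filled by calling push directly rather than from a real data source, purely to keep the example short:

```javascript
const { Readable } = require('stream');

// Hypothetical helper (not part of Node): builds a stream holding one chunk.
function makeReadable(options) {
  const readable = new Readable(options);
  readable._read = () => {}; // nothing to fetch; data is pushed below
  readable.push('Big Win!');
  readable.push(null); // signal end of data
  return readable;
}

// No encoding: read() hands back a Buffer.
const raw = makeReadable({}).read();
console.log(Buffer.isBuffer(raw)); // true

// encoding set: read() hands back a decoded string.
const decoded = makeReadable({ encoding: 'utf8' }).read();
console.log(typeof decoded); // string

// In objectMode, highWaterMark counts objects rather than bytes.
const objectLimit = new Readable({ objectMode: true }).readableHighWaterMark;
console.log(objectLimit); // 16
```

The readableHighWaterMark property is only available in reasonably recent Node.js versions, so treat that last check as an assumption about the runtime in use.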
In the following example, we create a mock Feed object whose instances will inherit the Readable stream interface. Our implementation need only implement the abstract _read method of Readable, which will push data to a consumer until there is nothing more to push, at which point it triggers the Readable stream to emit an end event by pushing a null value:
const stream = require('stream');
let Feed = function(channel) {
  let readable = new stream.Readable({});
  let news = [
    "Big Win!",
    "Stocks Down!",
    "Actor Sad!"
  ];
  readable._read = () => {
    if(news.length) {
      return readable.push(news.shift() + "\n");
    }
    readable.push(null);
  };
  return readable;
};
Now that we have an implementation, a consumer might want to instantiate the stream and listen for stream events. Two key events are readable and end.
The readable event is emitted whenever there is data available to be read from the stream. It alerts the consumer to check for new data via the read method of Readable.
Note again how the Readable implementation must provide a private _read method that services the public read method exposed to the consumer API.
The end event is emitted once a null value has been passed to the push method of our Readable implementation and any data still in the buffer has been consumed.
Here, we see a consumer using these methods to display new stream data, providing a notification when the stream has stopped sending data:
let feed = new Feed();
feed.on("readable", () => {
  let data = feed.read();
  data && process.stdout.write(data);
});
feed.on("end", () => console.log("No more news"));
// Big Win!
// Stocks Down!
// Actor Sad!
// No more news
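The same feed can also be written as a class that extends Readable, which some readers may find easier to follow than assigning _read on an instance. This is only an alternative sketch, assuming a Node.js version with ES2015 class support; the constructor signature and the news property are choices made for this illustration, not something the original implementation prescribes:

```javascript
const { Readable } = require('stream');

// A class-based variant of the Feed shown above (an alternative sketch,
// not the form used in the text).
class Feed extends Readable {
  constructor(options) {
    super(options);
    this.news = ['Big Win!', 'Stocks Down!', 'Actor Sad!'];
  }

  // Called by the stream machinery whenever the consumer wants more data.
  _read() {
    if (this.news.length) {
      this.push(this.news.shift() + '\n');
    } else {
      this.push(null); // no more headlines: end the stream
    }
  }
}

const feed = new Feed({ encoding: 'utf8' });
feed.on('readable', () => {
  let data;
  // Drain everything currently available; read() returns null when empty.
  while ((data = feed.read()) !== null) {
    process.stdout.write(data);
  }
});
feed.on('end', () => console.log('No more news'));
```

The consumer is unchanged apart from the loop, which keeps calling read until it returns null so that no buffered chunk is left behind between readable events.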
Similarly, we can implement a stream of objects through the use of the objectMode option:
const stream = require('stream');
let Feed = function(channel) {
  let readable = new stream.Readable({
    objectMode : true
  });
  let prices = [{price : 1},{price : 2}];
  readable._read = () => {
    if(prices.length) {
      return readable.push(prices.shift());
    }
    readable.push(null);
  };
  return readable;
};
Once the stream has been placed in objectMode, each chunk pushed is treated as a single object. The reader for this stream can then work on the assumption that each read() call will produce a single object:
let feed = new Feed();
feed.on("readable", () => {
  let data = feed.read();
  data && console.log(data);
});
feed.on("end", () => console.log("No more news"));
// { price: 1 }
// { price: 2 }
// No more news
Here, we see that each read() call returns an object, rather than a buffer or string.
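To make the one-object-per-read behavior concrete, the following sketch buffers two objects up front and then reads them back one at a time. The variable names are hypothetical, and the stream is filled by calling push directly instead of from _read, which is a simplification for illustration only:

```javascript
const { Readable } = require('stream');

// An object-mode stream with a no-op read(); we fill it by hand below.
const prices = new Readable({ objectMode: true, read() {} });
prices.push({ price: 1 });
prices.push({ price: 2 });
prices.push(null); // end of data

// In objectMode the buffer length is measured in objects, not bytes.
const buffered = prices.readableLength;
console.log(buffered); // 2

// Each read() call returns exactly one object, even though two are buffered.
const first = prices.read();
const second = prices.read();
const third = prices.read(); // nothing left
console.log(first, second, third); // { price: 1 } { price: 2 } null
```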
Finally, the read method of a Readable stream can be passed a single argument, indicating the number of bytes to be read from the stream's internal buffer. For example, to read a stream one byte at a time, we might start with an implementation like this:
const stream = require('stream');
let Feed = function(channel) {
  let readable = new stream.Readable({});
  let news = 'A long headline might go here';
  readable._read = () => {
    readable.push(news);
    readable.push(null);
  };
  return readable;
};
Note that we're pushing the entirety of news into the stream, and terminating with null. The stream is primed with the entire string of bytes. Now the consumer:
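let feed = new Feed();
feed.on('readable', () => {
  let character;
  // read(1) returns null once the internal buffer is empty
  while((character = feed.read(1)) !== null) {
    console.log(character.toString());
  }
});
feed.on('end', () => console.log('No more bytes to read'));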
// A
//
// l
// o
// n
// ...
// No more bytes to read
Here, it should be clear that the Readable stream's buffer was filled with a number of bytes all at once, but was read from discretely.
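The size argument is not limited to 1. As a minimal sketch, the following reads a ten-byte stream in chunks of four; the source variable and its contents are made up for this illustration. Once the stream has ended, a read that asks for more bytes than remain returns whatever is left rather than null:

```javascript
const { Readable } = require('stream');

// A byte stream primed with ten bytes and then ended.
const source = new Readable({ read() {} });
source.push('abcdefghij');
source.push(null);

const pieces = [];
let piece;
// Ask for four bytes at a time until the buffer is exhausted.
while ((piece = source.read(4)) !== null) {
  pieces.push(piece.toString());
}
console.log(pieces); // [ 'abcd', 'efgh', 'ij' ]
```

Had the stream not yet ended, the final read(4) would return null while only two bytes were buffered, since the stream would still be waiting for enough data to satisfy the request.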