Broken by Design

Dipping toes in node.js streams : Readable

Node.js' primary model for handling I/O is streams. A stream either produces data (Readable), consumes data (Writable), does both (Duplex), or transforms it (Transform). In this article we are going to take a closer look at Readable streams and how to implement them.

stream.Readable

The readable stream basically represents a producer. Whether it is a network socket, a file, or a hardware device, it's all the same: there is some data that is being produced, or has already been produced and is being read. The stream.Readable interface provides access to it in chunks, emitting a 'data' event every time a chunk of data is available.

Producing data with stream.Readable

So let's take a look at a very basic data producer, one that produces lines with random numbers:

var stream = require('stream');
var util = require('util');
 
util.inherits(RandStream, stream.Readable);
 
function RandStream(options){
  // Call the parent class constructor
  stream.Readable.call(this, options);
}
 
// Implement the _read method that produces some data
RandStream.prototype._read = function () {
  // Get a random number
  var i = Math.floor(Math.random() * 100);
  // Push it onto the stream as a String
  // By default streams work only with String or Buffer data
  this.push(i.toString() + "\n");
};
 
var randStream = new RandStream();
 
// Each time a chunk is available,
// write it to stdout. Attaching a
// handler to the 'data' event puts the stream
// in 'flowing' mode, which means it will start
// producing data immediately
randStream.on('data', function(data){
  process.stdout.write(data);
});

Running this just prints an endless stream of lines with numbers to the console.

$ node randstream.js
84
65
84
35
...

Notice that we didn't do anything to make the stream produce data. Attaching an event handler to the 'data' event acts as a trigger for the stream to start producing it.

Endless streams of data are rarely that useful, so maybe we should try to make our little stream behave more like everything else in the limited reality we live in, and limit the gibberish it produces.

var stream = require('stream');
var util = require('util');
 
util.inherits(RandMessageStream, stream.Readable);
 
// count represents how many lines of output we want.
function RandMessageStream(count, options){
  stream.Readable.call(this, options);
  this.count = count;
}
 
RandMessageStream.prototype._read = function () {
  // Have we reached the end of the stream?
  if(this.count-- <= 0){
    // Yes, push a null. This signals the end of stream
    // to the stream mechanics
    this.push(null);
    return;
  }
  // No, just produce one ASCII character
  var i = Math.round(Math.random()* (126-32) + 32);
  this.push(String.fromCharCode(i));
};
 
var rmStream = new RandMessageStream(8);
 
// pipe connects a Readable stream to a Writable,
// piping all data that the Readable produces to
// the Writable stream, in this case the process standard output
rmStream.pipe(process.stdout);
 
// 'end' event is emitted when the end of stream is reached
rmStream.on('end', function(){
  process.stdout.write('\n');
});

Running this gives us:

$ node randmessagestream.js
?csace_/

This time we are not using a 'data' event handler, but the .pipe() method. The pipe method connects a Readable stream to a Writable one, much like pipes in the shell do, and since the process stdout is a Writable stream, we can pipe to it directly. For our simple case it is functionally equivalent to the 'data' handler from the previous example. Also note the handler for the 'end' event: since the stream actually ends now, we can use it to append a newline to the message.

OK, so far so good. We have streams producing random data, but that's not of much practical use. Let's see how we can stream some actual data. This is not very different from the previous example, but we will use a buffer instead of a counter, and stream it line by line:

var stream = require('stream');
var util = require('util');
 
util.inherits(LineStream, stream.Readable);
 
// buffer is the buffer which we will stream line by line
function LineStream(buffer, options){
  stream.Readable.call(this, options);
  this.buffer = buffer;
  this.start = 0;
}
 
LineStream.prototype._read = function (size) {
  // Find the next newline character
  var idx = this.buffer.indexOf('\n', this.start);
  if( idx >= 0 ){
    // If we have a newline character, push()
    // the slice from the current position up to it
    this.push( this.buffer.slice(this.start, idx + 1) );
    this.start = idx + 1;
  } else {
    // push() everything till the end of the buffer
    this.push( this.buffer.slice(this.start) );
    // signal end of stream
    this.push( null );
  }
};
 
new LineStream( 'Where am I?\n' +
                'What is going on here?\n' +
                'How long do I need to keep pushing?\n'
                ).pipe(process.stdout);

As you can guess, it outputs 3 lines, just as many as we have in our initial Buffer. And although some awareness seems to be rising in our little Readable stream, it still lacks a quality that is quite important for a node.js stream: asynchronicity. It produces data on request (when _read() is called), but always produces it synchronously. If it were a real asynchronous data source (like a network socket, or a human :)), it wouldn't be possible to provide a chunk of data every time one is requested. For example, if it were a human typing, they might not be able to keep up with the speed at which we request more chunks. And blocking to wait is not going to work well in node.js, as we all know very well. So let's see what a Readable stream with a human for a data source would look like:

var stream = require('stream');
var util = require('util');
 
util.inherits(TypistStream, stream.Readable);
 
// buffer will hold the text which our typist will be typing
function TypistStream(buffer, options) {
  stream.Readable.call(this, options);
  this.buffer = buffer;
  this.start = 0;
}
 
TypistStream.prototype._read = function(size) {
  var slice = null;
  // If we haven't reached the end of the buffer, pick next character to type
  if( this.start < this.buffer.length ){
    slice = this.buffer.slice(this.start, this.start + 1);
    this.start += 1;
  }
 
  var self = this;
  // Add a random delay, to simulate a person typing
  var delay = Math.floor( Math.random()*300 + 50);
  // push() the next char after some delay
  setTimeout(function() {
      self.push(slice);
  }, delay);
};
 
var typist = new TypistStream( "Hello,\n"+
                  "I am the typist that types inside this stream\n"+
                  "Please let me out already!\n"+
                  "You've got your point\n"
                );
typist.pipe(process.stdout);

Running this produces, just as you would expect, the text on the console with visible delays between the letters. And because the underlying stream implementation will not request a new chunk before we push() one for the current request, everything comes out in the correct order.

This all looks nice, but of course so far we have completely ignored the fact that errors sometimes happen. For example, what happens if the typist working in the stream from the previous example has no energy left to type? How do we handle that? That's where the 'error' event comes in, to notify the consumer of our stream that something went wrong. Let's modify the TypistStream and add some energy, representing the physical energy our typist has to press keys:

var stream = require('stream');
var util = require('util');
 
util.inherits(HungryTypistStream, stream.Readable);
 
// energy is the amount of characters the typist can type
// before starving
function HungryTypistStream(energy, buffer, options) {
  stream.Readable.call(this, options);
  this.buffer = buffer;
  this.start = 0;
  this.energy = energy;
}
 
HungryTypistStream.prototype._read = function(size) {
  var slice = null;
  if( this.start < this.buffer.length ){
    slice = this.buffer.slice(this.start, this.start + 1);
    this.start += 1;
    // decrease energy
    this.energy -= 1;
  }
 
  // Do we have enough energy to type ?
  if( this.energy <= 0 ){
    // no, emit error, to tell our consumer that we are starving
    this.emit('error', new Error('No.. more.. energy... (X _ X)'));
    this.push(null);
  } else {
    // yes, push() character
    var self = this;
    var delay = Math.floor( Math.random()*300 + 50);
    setTimeout(function() {
        self.push(slice);
    }, delay);
  }
};
 
var typist = new HungryTypistStream(12, "Hello,\n"+
                  "I am the typist that types inside this stream\n"+
                  "Please let me out already!\n"+
                  "You've got your point\n"
                );
typist.pipe(process.stdout);
 
// Register a handler for errors in the stream
typist.on('error', function (error) {
  console.error('\nIt seems there is a problem: ', error);
});

This time we don't get the full complaint; the exhaustion took over:

$ node hungrytypiststream.js
Hello,
I am
It seems there is a problem:  [Error: No.. more.. energy... (X _ X)]

So, when our stream encounters an error, it emits the 'error' event with the actual error to notify the consumers. Notice that emitting an error doesn't actually end the stream, so if the error is fatal and the stream cannot read any more, we should push(null) to close the stream (like in this case: the guy is apparently dead, so he cannot type any more).

Now you might be thinking: "OK, I get it, but can we get to a more practical and harmless example before anybody else gets hurt? Maybe a file or a socket?". Sure, let's make a stream that reads a file in tiny chunks:

var fs = require('fs');
var stream = require('stream');
var util = require('util');
 
util.inherits(FileReadStream, stream.Readable);
 
// source is the file descriptor of the file we will be reading
function FileReadStream(source, options){
  stream.Readable.call(this, options);
  this.source = source;
}
 
FileReadStream.prototype._read = function (size) {
  var self = this;
  // Read in 10 byte chunks
  var buf = new Buffer(10);
  fs.read(this.source, buf, 0, 10, null, function (err, bytesRead, chunk) {
    if( err ){
      // on error, emit 'error' and end the stream
      self.emit('error', err);
      self.push(null);
      return;
    }
    if( bytesRead === 0 ){
      // nothing was read - we've reached the end of the file,
      // so signal end of stream (pushing an empty string
      // would not end it)
      self.push(null);
      return;
    }
    // Push the chunk we've just read onto the stream
    self.push( chunk.slice(0, bytesRead).toString() );
  });
};
 
var ucStream = new FileReadStream(fs.openSync(__filename,'r'));
ucStream.pipe(process.stdout);
ucStream.on('error', function(error){
  console.error(error);
});

This stream will print its own source file. While a readable stream for a file is quite easy to implement, it is also pointless, because node.js already provides fs.ReadStream, which does exactly that, and does it more efficiently. The same applies to sockets, and a few other useful things like zlib compression, crypto, and so on...

Consuming data from stream.Readable

All the examples above consume data from the Readable stream in flowing mode, which basically means the stream will produce data for as long as there is a consumer for it, until the end of the stream is reached. This is done by the underlying stream code, which reads chunks and provides them to you as fast as possible. A stream is switched to flowing mode automatically when a 'data' handler is attached to it (either directly, or through the .pipe() method), or when .resume() is called on it.

The other mode a Readable stream can be in is paused mode, where data must be requested explicitly each time we need it. But since everything in node is asynchronous, we can't just read data at any time; we must wait for it to become available, which is signalled by the 'readable' event. So if we transform the TypistStream example to use a paused stream, it looks like this:

var stream = require('stream');
var util = require('util');
 
util.inherits(TypistStream, stream.Readable);
 
// buffer will hold the text which our typist will be typing
function TypistStream(buffer, options) {
  stream.Readable.call(this, options);
  this.buffer = buffer;
  this.start = 0;
}
 
TypistStream.prototype._read = function(size) {
  var slice = null;
  // If we haven't reached the end of the buffer, pick next character to type
  if( this.start < this.buffer.length ){
    slice = this.buffer.slice(this.start, this.start + 1);
    this.start += 1;
  }
 
  var self = this;
  // Add a random delay, to simulate a person typing
  var delay = Math.floor( Math.random()*300 + 50);
  // push() the next char after some delay
  setTimeout(function() {
      self.push(slice);
  }, delay);
};
 
var typist = new TypistStream( "Hello,\n"+
                  "I am the typist that types inside this stream\n"+
                  "Please let me out already!\n"+
                  "You've got your point\n"
                );
 
typist.on('readable', function(){
  var c = typist.read();
  if(c)
    process.stdout.write( c );
});

The result is the same, but as you can see, we read data with stream.read() when it becomes available, inside the 'readable' event handler.

That's all for Readable streams for now; we'll take a look at Writable streams in the next part...

November 10, 2015