Вопрос

I have a NodeJS stream transform which needs to output some internal state when its input ends.

As an analogy, consider a transform which splits incoming data into lines. When the incoming data ends, whatever (un-linefeed-terminated) data is left must be output.

How can I do this?

(I've tried _flush, but it doesn't get called when I write(null) into the transform.)

Update:

var Transform = require('stream').Transform;
var util = require('util');

exports.createLinesTransform = createLinesTransform;

/**
 * Factory wrapper so callers do not need to use `new` themselves.
 *
 * @param {Object} [options] Passed through unchanged to the LinesTransform
 *     constructor.
 * @returns {LinesTransform} A freshly constructed transform.
 */
function createLinesTransform(options) {
  var transform = new LinesTransform(options);
  return transform;
}

/**
 * Object-mode Transform that regroups incoming `{src, payload}` chunks into
 * one `{src, payload}` object per '\n'-terminated line. Text after the last
 * '\n' is held in `_buf` and emitted by `_flush` when the input ends.
 *
 * @param {Object} [options] Stream options forwarded to Transform;
 *     `objectMode` is always forced to true.
 */
function LinesTransform(options) {
  // Work on a copy: the caller's options object must not gain an
  // `objectMode` property as a side effect of constructing this stream.
  var opts = {};
  if (options) {
    Object.keys(options).forEach(function(key) {
      opts[key] = options[key];
    });
  }
  opts.objectMode = true;
  Transform.call(this, opts);
  this._buf = '';
  this._last_src = undefined;
}
util.inherits(LinesTransform, Transform);

/**
 * Appends `chunk.payload` to the pending buffer and pushes one object for
 * every complete line now available.
 *
 * @param {{src: *, payload: string}} chunk Incoming object.
 * @param {string} encoding Unused (object mode).
 * @param {function} done Called once the chunk has been processed.
 */
LinesTransform.prototype._transform = function(chunk, encoding, done) {
  console.log('chunk', chunk, '_buf', this._buf);
  this._buf += chunk.payload;
  // Remember the source of the most recent chunk so that a trailing,
  // unterminated line can still be attributed to a source in _flush even if
  // no newline has been seen yet.
  this._last_src = chunk.src;

  // Walk from newline to newline using a moving start offset. The buffer is
  // only trimmed once, after the scan, so no character is ever skipped
  // (slicing inside an index loop would shift unread characters behind the
  // loop counter and miss consecutive newlines).
  var start = 0;
  var nl = this._buf.indexOf('\n', start);
  while (nl !== -1) {
    this.push({src: chunk.src, payload: this._buf.slice(start, nl)});
    start = nl + 1;
    nl = this._buf.indexOf('\n', start);
  }
  this._buf = this._buf.slice(start);
  done();
};

/**
 * Emits whatever unterminated text is still buffered. The stream machinery
 * calls this after end() once all written chunks have been transformed;
 * the resulting 'data' event is delivered asynchronously, so consumers
 * should wait for 'end' before inspecting the complete output.
 *
 * @param {function} done Called once the remaining data has been pushed.
 */
LinesTransform.prototype._flush = function(done) {
  console.log('_flush');
  // Nothing buffered means the input ended on a newline (or was empty);
  // pushing an empty payload here would fabricate an extra line.
  if (this._buf.length > 0) {
    this.push({src: this._last_src, payload: this._buf});
    this._buf = '';
  }
  done();
};

and a test:

  // Feeds a line split across several writes, plus an unterminated tail, and
  // checks the complete output once the stream has ended.
  it('should make a working LinesTransform', function(done) {
    var lines = createLinesTransform();
    var rxd = [];
    lines.on('data', function(data) {
      console.log('data', data);
      rxd.push(data);
    });
    lines.on('error', done);
    // Output produced while the stream is ending is delivered asynchronously,
    // so the final assertion must wait for 'end' instead of running right
    // after end() returns.
    lines.on('end', function() {
      try {
        assert.deepEqual([{"src":{},"payload":"hello world!"},
                          {"src":{},"payload":"a secondline"},
                          {"src":{},"payload":"and some trailing data"}],
                         rxd);
        done();
      } catch (err) {
        done(err);
      }
    });

    lines.write({src:{},payload:'hel'});
    lines.write({src:{},payload:'lo'});
    lines.write({src:{},payload:' world!\na second'});
    lines.write({src:{},payload:'line\n'});
    lines.write({src:{},payload:'and some trailing data'});
    // end() is the end-of-input signal; writing null is not, and is rejected
    // by current Node versions.
    lines.end();
  });
Это было полезно?

Решение

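_flush is the correct thing to use. It is called once the input has been ended with end(), but the data it pushes is delivered asynchronously, so my test was asserting too early; I needed to wait (a short delay, or better, the 'end' event) before checking the output.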

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top