-
Notifications
You must be signed in to change notification settings - Fork 11
/
stream-replay.js
90 lines (83 loc) · 2.71 KB
/
stream-replay.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
"use strict";
var stream = require('readable-stream');
var util = require('util');
// Monotonic counter used to give every StreamReplay instance a distinct `id`.
var id = 0;
/**
 * Writable stream that keeps a list of buffered chunks (`_buffer`) and a list
 * of attached destinations (`_children`) so written data can be replayed to
 * destinations registered through `pipe()`.
 *
 * May be called with or without `new`.
 *
 * @param {Object} [options] Options forwarded to `stream.Writable`.
 *   `decodeStrings` is always forced to `false`. The object passed in is
 *   never modified.
 * @returns {StreamReplay|undefined} A new instance when called without `new`.
 */
var StreamReplay = module.exports = function (options) {
    if (this == null) return new StreamReplay(options);
    // Work on a copy: writing `decodeStrings` onto the caller's object would
    // leak into whatever else the caller does with it, and throws outright
    // when that object is frozen.
    var settings = {};
    for (var key in options) settings[key] = options[key];
    settings.decodeStrings = false;
    stream.Writable.call(this, settings);
    this._buffer = [];
    this._children = [];
    this.finished = false;
    this.id = ++id;
    var self = this;
    this.once('finish', function () {
        // Announce 'end', record that no more data will arrive, then give
        // every attached destination a final flush.
        self.emit('end');
        self.finished = true;
        self._flushChildren();
    });
};
util.inherits(StreamReplay, stream.Writable);
/**
 * Writable `_write` hook: records the chunk together with its encoding,
 * offers the grown buffer to every attached destination, then acknowledges
 * the write.
 *
 * @param {*} data Chunk handed over by the Writable machinery.
 * @param {string} enc Encoding reported for the chunk.
 * @param {Function} cb Invoked once the chunk has been recorded.
 */
StreamReplay.prototype._write = function (data, enc, cb) {
    var record = [data, enc];
    this._buffer[this._buffer.length] = record;
    this._flushChildren();
    cb();
};
/**
 * Starts a flush for every attached destination that is not already in the
 * middle of one. A destination is marked `flushing` before its buffer is
 * sent, so this method skips it on later calls until `flushing` is false again.
 *
 * @param {*} [child] Ignored; the method always walks all children.
 */
StreamReplay.prototype._flushChildren = function (child) {
    this._children.forEach(function (entry) {
        if (!entry.flushing) {
            entry.flushing = true;
            this._sendBufferToChild(entry);
        }
    }, this);
};
/**
 * Sends buffered chunks to one destination, starting at the child's
 * `bufferLoc` cursor, until either the buffer is exhausted or the
 * destination's `write()` returns false.
 *
 * - On a false `write()` the method registers a one-shot 'drain' listener
 *   that resumes sending, stores that listener function in `child.drain`,
 *   and returns with `child.flushing` still set.
 * - Once the buffer is exhausted, `child.flushing` is cleared. If this stream
 *   has finished and the child was piped with `end` enabled, the destination
 *   is ended, at most once per child.
 *
 * @param {Object} child Child record (`dest`, `bufferLoc`, `options`, ...).
 * @returns {boolean|undefined} `false` once the buffer is exhausted,
 *   `undefined` while waiting for 'drain'.
 */
StreamReplay.prototype._sendBufferToChild = function (child) {
    var self = this;
    // Loop instead of recursing once per chunk: a long buffer combined with a
    // destination that never signals backpressure would otherwise exhaust the
    // call stack.
    while (child.bufferLoc < this._buffer.length) {
        var chunk = this._buffer[child.bufferLoc++];
        // Chunks recorded with the 'buffer' pseudo-encoding are written
        // without an encoding argument.
        var encoding = chunk[1] === 'buffer' ? null : chunk[1];
        if (!child.dest.write(chunk[0], encoding)) {
            // Keep the listener function itself (`once()` returns the
            // emitter, not the listener) so it can later be handed to
            // `removeListener`.
            child.drain = function () {
                child.drain = null;
                self._sendBufferToChild(child);
            };
            child.dest.once('drain', child.drain);
            return;
        }
    }
    if (this.finished && child.options.end && !child.ended) {
        // Later flushes reach this point again; end the destination only once.
        child.ended = true;
        child.dest.end();
    }
    return child.flushing = false;
};
/**
 * Creates a fresh PassThrough stream, pipes this replay into it with the
 * default pipe options, and unpipes it again when the PassThrough emits
 * 'end'.
 *
 * @returns {stream.PassThrough} The newly attached stream.
 */
StreamReplay.prototype.spawn = function () {
    var replay = this;
    var copy = new stream.PassThrough();
    function detach() {
        replay.unpipe(copy);
    }
    replay.pipe(copy);
    copy.once('end', detach);
    return copy;
};
// Monotonic counter used to give every child record a distinct `id`.
var childId = 0;
/**
 * Attaches a destination and immediately starts flushing to it.
 *
 * @param {Object} pipeto Destination; it is written to with `write()` and,
 *   when `end` is enabled, closed with `end()`.
 * @param {Object} [options]
 * @param {boolean} [options.end=true] End the destination once this stream
 *   has finished and the destination has caught up.
 * @param {boolean} [options.sendBuffer=true] Replay chunks buffered before
 *   this call. When false the destination starts at the current end of the
 *   buffer.
 * @returns {Object} `pipeto`, to allow chaining.
 */
StreamReplay.prototype.pipe = function (pipeto, options) {
    // Fill in defaults on a copy so the caller's object is left untouched
    // (and so a frozen options object does not throw).
    var settings = {};
    for (var key in options) settings[key] = options[key];
    if (settings.end == null) settings.end = true;
    if (settings.sendBuffer == null) settings.sendBuffer = true;
    var child = {
        id: ++childId,
        bufferLoc: settings.sendBuffer ? 0 : this._buffer.length,
        dest: pipeto,
        options: settings
    };
    this._children.push(child);
    this._flushChildren();
    return pipeto;
};
/**
 * Detaches destinations previously attached with `pipe()`.
 *
 * @param {Object} [pipeto] Destination to detach. When omitted (or null),
 *   every destination is detached. A destination that was never attached is
 *   ignored.
 */
StreamReplay.prototype.unpipe = function (pipeto) {
    var detached = [];
    var kept = [];
    this._children.forEach(function (child) {
        if (pipeto == null || child.dest === pipeto) {
            detached.push(child);
        } else {
            kept.push(child);
        }
    });
    this._children = kept;

    detached.forEach(function (child) {
        // The 'drain' listener lives on the destination stream, not on the
        // child record. Only a function can be handed to removeListener.
        if (typeof child.drain === 'function') {
            child.dest.removeListener('drain', child.drain);
        }
        child.drain = null;
        // A flush for this child may still be in progress or waiting on a
        // callback that could not be removed. Park the cursor past any
        // possible buffer length and disable ending (on a fresh object, so
        // the options stored at pipe time are not modified) so such a flush
        // finds nothing left to write and never ends the destination.
        child.bufferLoc = Infinity;
        child.options = { end: false };
        child.flushing = false;
    });
};
/**
 * Reports whether at least one destination is currently attached.
 *
 * @returns {boolean} `true` when the children list is non-empty.
 */
StreamReplay.prototype.piped = function () {
    return this._children.length > 0;
};