-
Notifications
You must be signed in to change notification settings - Fork 9
/
index.js
86 lines (76 loc) · 1.88 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
module.exports = pullPushable
function pullPushable (separated, onClose) {
if (typeof separated === 'function') {
onClose = separated
separated = false
}
// create a buffer for data
// that have been pushed
// but not yet pulled.
var buffer = []
// a pushable is a source stream
// (abort, cb) => cb(end, data)
//
// when pushable is pulled,
// keep references to abort and cb
// so we can call back after
// .end(end) or .push(data)
var abort, cb
function read (_abort, _cb) {
if (_abort) {
abort = _abort
// if there is already a cb waiting, abort it.
if (cb) callback(abort)
}
cb = _cb
drain()
}
var ended
function end (end) {
ended = ended || end || true
// attempt to drain
drain()
}
function push (data) {
if (ended) return
// if sink already waiting,
// we can call back directly.
if (cb) {
callback(abort, data)
return
}
// otherwise buffer data
buffer.push(data)
}
// Return functions separated from source { push, end, source }
if (separated) {
return { push: push, end: end, source: read, buffer: buffer }
}
// Return normal
read.push = push
read.end = end
read.buffer = buffer
return read
// `drain` calls back to (if any) waiting
// sink with abort, end, or next data.
function drain () {
if (!cb) return
if (abort) callback(abort)
else if (!buffer.length && ended) callback(ended)
else if (buffer.length) callback(null, buffer.shift())
}
// `callback` calls back to waiting sink,
// and removes references to sink cb.
function callback (err, val) {
var _cb = cb
// if error and pushable passed onClose, call it
// the first time this stream ends or errors.
if (err && onClose) {
var c = onClose
onClose = null
c(err === true ? null : err)
}
cb = null
_cb(err, val)
}
}