-
Notifications
You must be signed in to change notification settings - Fork 13
/
pull-weird.js
100 lines (89 loc) · 1.96 KB
/
pull-weird.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
'use strict'
const pull = require('pull-stream')
// wrap pull streams around packet-stream's weird streams.
// Guard `fn` so that only the first invocation of the returned wrapper is
// forwarded to it; later invocations are ignored. The wrapper passes along
// `(err, val)` and never returns a value of its own.
function once (fn) {
  let pending = true
  return (err, val) => {
    if (pending) {
      // Flip the flag before calling so a re-entrant call is ignored too.
      pending = false
      fn(err, val)
    }
  }
}
/**
 * Adapt one of packet-stream's "weird" streams to a pull-stream duplex.
 *
 * Incoming packets arrive through `weird.read(data, end)`, which this
 * function installs on the object; outgoing packets are sent by calling
 * `weird.write(data, end)`.
 *
 * @param {object} weird - stream object exposing `write(data, end)`. Its
 *   `read` property is overwritten, and `writeEnd` is consulted before the
 *   terminating write issued when the sink's upstream finishes.
 * @param {Function} [_done] - optional `(err, value)` callback, invoked at
 *   most once when the stream ends or is aborted.
 * @returns {{source: Function, sink: Function}} pull-stream source and sink.
 */
function duplex (weird, _done) {
  // Packets received before the consumer asked for them.
  const buffer = []
  // Falsy while open; otherwise the first truthy `end` given to `weird.read`.
  let ended = false
  // Set once the consumer has aborted the source side.
  let aborted = false
  // Falsy until `done` has run; then the error, or `true` for a clean close.
  // After that point `weird` is null and must not be written to.
  let closed = false
  // Callback of a source read that is waiting for the next packet.
  let waiting
  // The `read` function of the stream piped into `sink`, kept for aborting.
  let abort

  const done = once((err, v) => {
    closed = err || true
    if (_done) _done(err, v)
    // deallocate
    weird = null
    _done = null
    waiting = null
    // Tear down whatever is feeding the sink.
    if (abort) abort(err || true, () => {})
  })

  weird.read = function (data, end) {
    ended = ended || end
    if (waiting) {
      // A reader is parked: hand the packet (or the end signal) straight over.
      const cb = waiting
      waiting = null
      cb(ended, data)
    } else if (!ended && !aborted) {
      buffer.push(data)
    }
    if (ended) {
      // `true` is a normal end, so it is reported as "no error".
      done(ended !== true ? ended : null)
    }
  }

  return {
    source (abort, cb) {
      if (abort) {
        // Tell the other side we are going away, unless already torn down.
        if (weird) weird.write(null, abort)
        cb(abort)
        buffer.length = 0
        aborted = true
        // NOTE(review): a read callback parked in `waiting` is dropped by
        // `done` without being called - confirm this is intended.
        done(abort !== true ? abort : null)
      } else if (buffer.length) {
        cb(null, buffer.shift())
      } else if (ended) {
        cb(ended)
      } else {
        waiting = cb
      }
    },
    sink (read) {
      // Already finished (remote end, or torn down via `done`): there is
      // nothing left to write to, so abort the upstream immediately instead
      // of draining it into a null `weird`.
      if (ended || closed) {
        abort = null
        return read(ended || closed, () => {})
      }
      abort = read
      pull.drain((data) => {
        // TODO: this should only happen on a UNIPLEX stream.
        // Returning false makes drain abort the upstream. The `closed` check
        // covers data delivered after the source side was aborted, when
        // `weird` has already been released.
        if (ended || closed) return false
        weird.write(data)
      }, (err) => {
        if (weird && !weird.writeEnd) {
          weird.write(null, err || true)
        }
        done(err)
      })(read)
    }
  }
}
// Wrap `s` with `duplex`, supplying a completion handler that first ends the
// write side of `s` (skipped when `s.writeEnd` is already set) and then
// forwards the error to the optional `done` callback.
function uniplex (s, done) {
  const onFinish = (err) => {
    if (!s.writeEnd) {
      s.write(null, err || true)
    }
    if (done) {
      done(err)
    }
  }
  return duplex(s, onFinish)
}
// Return only the `source` half of the uniplex wrapper built around `s`.
function source (s) {
  const { source: readable } = uniplex(s)
  return readable
}
// Return only the `sink` half of the uniplex wrapper built around `s`;
// `done` is handed through to `uniplex` unchanged.
function sink (s, done) {
  const { sink: writable } = uniplex(s, done)
  return writable
}
// Public API: the module itself is `duplex`, which additionally carries the
// `source`, `sink` and `duplex` helpers as properties.
module.exports = Object.assign(duplex, { source, sink, duplex })