index.js
3.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
var Stream = require('stream')
module.exports = function (write, end) {
var stream = new Stream()
var buffer = [], ended = false, destroyed = false, emitEnd
stream.writable = stream.readable = true
stream.paused = false
stream._paused = false
stream.buffer = buffer
stream
.on('pause', function () {
stream._paused = true
})
.on('drain', function () {
stream._paused = false
})
function destroySoon () {
process.nextTick(stream.destroy.bind(stream))
}
if(write)
stream.on('_data', write)
if(end)
stream.on('_end', end)
//destroy the stream once both ends are over
//but do it in nextTick, so that other listeners
//on end have time to respond
stream.once('end', function () {
stream.readable = false
if(!stream.writable) {
process.nextTick(function () {
stream.destroy()
})
}
})
stream.once('_end', function () {
stream.writable = false
if(!stream.readable)
stream.destroy()
})
// this is the default write method,
// if you overide it, you are resposible
// for pause state.
stream._data = function (data) {
if(!stream.paused && !buffer.length)
stream.emit('data', data)
else
buffer.push(data)
return !(stream.paused || buffer.length)
}
stream._end = function (data) {
if(data) stream._data(data)
if(emitEnd) return
emitEnd = true
//destroy is handled above.
stream.drain()
}
stream.write = function (data) {
stream.emit('_data', data)
return !stream._paused
}
stream.end = function () {
stream.writable = false
if(stream.ended) return
stream.ended = true
stream.emit('_end')
}
stream.drain = function () {
if(!buffer.length && !emitEnd) return
//if the stream is paused after just before emitEnd()
//end should be buffered.
while(!stream.paused) {
if(buffer.length) {
stream.emit('data', buffer.shift())
if(buffer.length == 0) {
stream.emit('_drain')
}
}
else if(emitEnd && stream.readable) {
stream.readable = false
stream.emit('end')
return
} else {
//if the buffer has emptied. emit drain.
return true
}
}
}
var started = false
stream.resume = function () {
//this is where I need pauseRead, and pauseWrite.
//here the reading side is unpaused,
//but the writing side may still be paused.
//the whole buffer might not empity at once.
//it might pause again.
//the stream should never emit data inbetween pause()...resume()
//and write should return !buffer.length
started = true
stream.paused = false
stream.drain() //will emit drain if buffer empties.
return stream
}
stream.destroy = function () {
if(destroyed) return
destroyed = ended = true
buffer.length = 0
stream.emit('close')
}
var pauseCalled = false
stream.pause = function () {
started = true
stream.paused = true
stream.emit('_pause')
return stream
}
stream._pause = function () {
if(!stream._paused) {
stream._paused = true
stream.emit('pause')
}
return this
}
stream.paused = true
process.nextTick(function () {
//unless the user manually paused
if(started) return
stream.resume()
})
return stream
}