Blame view

node_modules/duplex/index.js 3.3 KB
f7563de62   Palak Handa   first commit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
  var Stream = require('stream')
  
  module.exports = function (write, end) {
    var stream = new Stream() 
    var buffer = [], ended = false, destroyed = false, emitEnd
    stream.writable = stream.readable = true
    stream.paused = false
    stream._paused = false
    stream.buffer = buffer
    
    stream
      .on('pause', function () {
        stream._paused = true
      })
      .on('drain', function () {
        stream._paused = false
      })
     
    function destroySoon () {
      process.nextTick(stream.destroy.bind(stream))
    }
  
    if(write)
      stream.on('_data', write)
    if(end)
      stream.on('_end', end)
  
    //destroy the stream once both ends are over
    //but do it in nextTick, so that other listeners
    //on end have time to respond
    stream.once('end', function () { 
      stream.readable = false
      if(!stream.writable) {
        process.nextTick(function () {
          stream.destroy()
        })
      }
    })
  
    stream.once('_end', function () { 
      stream.writable = false
      if(!stream.readable)
        stream.destroy()
    })
  
    // this is the default write method,
    // if you overide it, you are resposible
    // for pause state.
  
    
    stream._data = function (data) {
      if(!stream.paused && !buffer.length)
        stream.emit('data', data)
      else 
        buffer.push(data)
      return !(stream.paused || buffer.length)
    }
  
    stream._end = function (data) { 
      if(data) stream._data(data)
      if(emitEnd) return
      emitEnd = true
      //destroy is handled above.
      stream.drain()
    }
  
    stream.write = function (data) {
      stream.emit('_data', data)
      return !stream._paused
    }
  
    stream.end = function () {
      stream.writable = false
      if(stream.ended) return
      stream.ended = true
      stream.emit('_end')
    }
  
    stream.drain = function () {
      if(!buffer.length && !emitEnd) return
      //if the stream is paused after just before emitEnd()
      //end should be buffered.
      while(!stream.paused) {
        if(buffer.length) {
          stream.emit('data', buffer.shift())
          if(buffer.length == 0) {
            stream.emit('_drain')
          }
        }
        else if(emitEnd && stream.readable) {
          stream.readable = false
          stream.emit('end')
          return
        } else {
          //if the buffer has emptied. emit drain.
          return true
        }
      }
    }
    var started = false
    stream.resume = function () {
      //this is where I need pauseRead, and pauseWrite.
      //here the reading side is unpaused,
      //but the writing side may still be paused.
      //the whole buffer might not empity at once.
      //it might pause again.
      //the stream should never emit data inbetween pause()...resume()
      //and write should return !buffer.length
      started = true
      stream.paused = false
      stream.drain() //will emit drain if buffer empties.
      return stream
    }
  
    stream.destroy = function () {
      if(destroyed) return
      destroyed = ended = true     
      buffer.length = 0
      stream.emit('close')
    }
    var pauseCalled = false
    stream.pause = function () {
      started = true
      stream.paused = true
      stream.emit('_pause')
      return stream
    }
    stream._pause = function () {
      if(!stream._paused) {
        stream._paused = true
        stream.emit('pause')
      }
      return this
    }
    stream.paused = true
    process.nextTick(function () {
      //unless the user manually paused
      if(started) return
      stream.resume()
    })
   
    return stream
  }