Blame view

node_modules/through/index.js 2.56 KB
f7563de62   Palak Handa   first commit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
  var Stream = require('stream')
  
  // through
  //
  // a stream that does nothing but re-emit the input.
  // useful for aggregating a series of changing but not ending streams into one stream)
  
  exports = module.exports = through
  through.through = through
  
  //create a readable writable stream.
  
  function through (write, end, opts) {
    write = write || function (data) { this.queue(data) }
    end = end || function () { this.queue(null) }
  
    var ended = false, destroyed = false, buffer = [], _ended = false
    var stream = new Stream()
    stream.readable = stream.writable = true
    stream.paused = false
  
  //  stream.autoPause   = !(opts && opts.autoPause   === false)
    stream.autoDestroy = !(opts && opts.autoDestroy === false)
  
    stream.write = function (data) {
      write.call(this, data)
      return !stream.paused
    }
  
    function drain() {
      while(buffer.length && !stream.paused) {
        var data = buffer.shift()
        if(null === data)
          return stream.emit('end')
        else
          stream.emit('data', data)
      }
    }
  
    stream.queue = stream.push = function (data) {
  //    console.error(ended)
      if(_ended) return stream
      if(data === null) _ended = true
      buffer.push(data)
      drain()
      return stream
    }
  
    //this will be registered as the first 'end' listener
    //must call destroy next tick, to make sure we're after any
    //stream piped from here.
    //this is only a problem if end is not emitted synchronously.
    //a nicer way to do this is to make sure this is the last listener for 'end'
  
    stream.on('end', function () {
      stream.readable = false
      if(!stream.writable && stream.autoDestroy)
        process.nextTick(function () {
          stream.destroy()
        })
    })
  
    function _end () {
      stream.writable = false
      end.call(stream)
      if(!stream.readable && stream.autoDestroy)
        stream.destroy()
    }
  
    stream.end = function (data) {
      if(ended) return
      ended = true
      if(arguments.length) stream.write(data)
      _end() // will emit or queue
      return stream
    }
  
    stream.destroy = function () {
      if(destroyed) return
      destroyed = true
      ended = true
      buffer.length = 0
      stream.writable = stream.readable = false
      stream.emit('close')
      return stream
    }
  
    stream.pause = function () {
      if(stream.paused) return
      stream.paused = true
      return stream
    }
  
    stream.resume = function () {
      if(stream.paused) {
        stream.paused = false
        stream.emit('resume')
      }
      drain()
      //may have become paused again,
      //as drain emits 'data'.
      if(!stream.paused)
        stream.emit('drain')
      return stream
    }
    return stream
  }