Blame view

node_modules/asynckit/lib/streamify.js 2.89 KB
f7563de62   Palak Handa   first commit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
  var async = require('./async.js');
  
  // API
  module.exports = {
    iterator: wrapIterator,
    callback: wrapCallback
  };
  
  /**
   * Wraps iterators with long signature
   *
   * @this    ReadableAsyncKit#
   * @param   {function} iterator - function to wrap
   * @returns {function} - wrapped function
   */
  function wrapIterator(iterator)
  {
    var stream = this;
  
    return function(item, key, cb)
    {
      var aborter
        , wrappedCb = async(wrapIteratorCallback.call(stream, cb, key))
        ;
  
      stream.jobs[key] = wrappedCb;
  
      // it's either shortcut (item, cb)
      if (iterator.length == 2)
      {
        aborter = iterator(item, wrappedCb);
      }
      // or long format (item, key, cb)
      else
      {
        aborter = iterator(item, key, wrappedCb);
      }
  
      return aborter;
    };
  }
  
  /**
   * Wraps provided callback function
   * allowing to execute snitch function before
   * real callback
   *
   * @this    ReadableAsyncKit#
   * @param   {function} callback - function to wrap
   * @returns {function} - wrapped function
   */
  function wrapCallback(callback)
  {
    var stream = this;
  
    var wrapped = function(error, result)
    {
      return finisher.call(stream, error, result, callback);
    };
  
    return wrapped;
  }
  
  /**
   * Wraps provided iterator callback function
   * makes sure snitch only called once,
   * but passes secondary calls to the original callback
   *
   * @this    ReadableAsyncKit#
   * @param   {function} callback - callback to wrap
   * @param   {number|string} key - iteration key
   * @returns {function} wrapped callback
   */
  function wrapIteratorCallback(callback, key)
  {
    var stream = this;
  
    return function(error, output)
    {
      // don't repeat yourself
      if (!(key in stream.jobs))
      {
        callback(error, output);
        return;
      }
  
      // clean up jobs
      delete stream.jobs[key];
  
      return streamer.call(stream, error, {key: key, value: output}, callback);
    };
  }
  
  /**
   * Stream wrapper for iterator callback
   *
   * @this  ReadableAsyncKit#
   * @param {mixed} error - error response
   * @param {mixed} output - iterator output
   * @param {function} callback - callback that expects iterator results
   */
  function streamer(error, output, callback)
  {
    if (error && !this.error)
    {
      this.error = error;
      this.pause();
      this.emit('error', error);
      // send back value only, as expected
      callback(error, output && output.value);
      return;
    }
  
    // stream stuff
    this.push(output);
  
    // back to original track
    // send back value only, as expected
    callback(error, output && output.value);
  }
  
  /**
   * Stream wrapper for finishing callback
   *
   * @this  ReadableAsyncKit#
   * @param {mixed} error - error response
   * @param {mixed} output - iterator output
   * @param {function} callback - callback that expects final results
   */
  function finisher(error, output, callback)
  {
    // signal end of the stream
    // only for successfully finished streams
    if (!error)
    {
      this.push(null);
    }
  
    // back to original track
    callback(error, output);
  }