Blame view

node_modules/nodemailer-direct-transport/lib/message-queue.js 3.13 KB
f7563de62   Palak Handa   first commit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
  'use strict';
  
  // expose to the world
  module.exports = function () {
      return new MessageQueue();
  };
  
  /**
   * Creates a queue object
   *
   * @constructor
   */
  function MessageQueue() {
      this._instantQueue = [];
      this._sortedQueue = [];
      this._shiftTimer = null;
      this._callbackQueue = [];
  }
  
  /**
   * Sets a callback to be run when something comes available from the queue
   *
   * @param {Function} callback Callback function to run with queue element as an argument
   */
  MessageQueue.prototype.get = function (callback) {
      if (this._instantQueue.length) {
          return callback(this._instantQueue.pop());
      } else {
          this._callbackQueue.unshift(callback);
      }
  };
  
  /**
   * Adds an element to the queue. If delay (ms) is set, the data will not be available before
   * specified delay has passed. Otherwise the data will be available for processing immediatelly.
   *
   * @param {Mixed} data Value to be queued
   * @param {Number} [delay] If set, delay the availability of the data by {delay} milliseconds
   */
  MessageQueue.prototype.insert = function (data, delay) {
      var container, added = -1;
      if (typeof delay !== 'number') {
          this._instantQueue.unshift(data);
          this._processInsert();
          return true;
      } else {
          container = {
              data: data,
              available: Date.now() + delay
          };
          for (var i = 0, len = this._sortedQueue.length; i < len; i++) {
              if (this._sortedQueue[i].available >= container.available) {
                  this._sortedQueue.splice(i, 0, container);
                  added = i;
                  break;
              }
          }
          if (added < 0) {
              this._sortedQueue.push(container);
              added = 0;
          }
  
          if (added === 0) {
              this._updateShiftTimer();
          }
      }
  };
  
  /**
   * Clears previous timer and creates a new one (if needed) to process the element
   * in the queue that needs to be processed first.
   */
  MessageQueue.prototype._updateShiftTimer = function () {
      var nextShift, now = Date.now();
      clearTimeout(this._shiftTimer);
  
      if (!this._sortedQueue.length) {
          return;
      }
  
      nextShift = this._sortedQueue[0].available;
  
      if (nextShift <= now) {
          this._shiftSorted();
      } else {
          setTimeout(this._shiftSorted.bind(this),
              // add +15ms to ensure that data is already available when the timer is fired
              this._sortedQueue[0].available - Date.now() + 15);
      }
  };
  
  /**
   * Moves an element from the delayed queue to the immediate queue if an elmenet
   * becomes avilable
   */
  MessageQueue.prototype._shiftSorted = function () {
      var container;
      if (!this._sortedQueue.length) {
          return;
      }
  
      if (this._sortedQueue[0].available <= Date.now()) {
          container = this._sortedQueue.shift();
          this.insert(container.data);
      }
  
      this._updateShiftTimer();
  };
  
  /**
   * If data from a queue is available and a callback is set, run the callback
   * with available data
   */
  MessageQueue.prototype._processInsert = function () {
      if (this._instantQueue.length && this._callbackQueue.length) {
          this._callbackQueue.pop()(this._instantQueue.pop());
      }
  };