Blame view
node_modules/nodemailer-direct-transport/lib/message-queue.js
3.13 KB
f7563de62
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
'use strict'; // expose to the world module.exports = function () { return new MessageQueue(); }; /** * Creates a queue object * * @constructor */ function MessageQueue() { this._instantQueue = []; this._sortedQueue = []; this._shiftTimer = null; this._callbackQueue = []; } /** * Sets a callback to be run when something comes available from the queue * * @param {Function} callback Callback function to run with queue element as an argument */ MessageQueue.prototype.get = function (callback) { if (this._instantQueue.length) { return callback(this._instantQueue.pop()); } else { this._callbackQueue.unshift(callback); } }; /** * Adds an element to the queue. If delay (ms) is set, the data will not be available before * specified delay has passed. Otherwise the data will be available for processing immediatelly. * * @param {Mixed} data Value to be queued * @param {Number} [delay] If set, delay the availability of the data by {delay} milliseconds */ MessageQueue.prototype.insert = function (data, delay) { var container, added = -1; if (typeof delay !== 'number') { this._instantQueue.unshift(data); this._processInsert(); return true; } else { container = { data: data, available: Date.now() + delay }; for (var i = 0, len = this._sortedQueue.length; i < len; i++) { if (this._sortedQueue[i].available >= container.available) { this._sortedQueue.splice(i, 0, container); added = i; break; } } if (added < 0) { this._sortedQueue.push(container); added = 0; } if (added === 0) { this._updateShiftTimer(); } } }; /** * Clears previous timer and creates a new one (if needed) to process the element * in the queue that needs to be processed first. */ MessageQueue.prototype._updateShiftTimer = function () { var nextShift, now = Date.now(); clearTimeout(this._shiftTimer); if (!this._sortedQueue.length) { return; } nextShift = this._sortedQueue[0].available; if (nextShift <= now) { this._shiftSorted(); } else { setTimeout(this._shiftSorted.bind(this), // add +15ms to ensure that data is already available when the timer is fired this._sortedQueue[0].available - Date.now() + 15); } }; /** * Moves an element from the delayed queue to the immediate queue if an elmenet * becomes avilable */ MessageQueue.prototype._shiftSorted = function () { var container; if (!this._sortedQueue.length) { return; } if (this._sortedQueue[0].available <= Date.now()) { container = this._sortedQueue.shift(); this.insert(container.data); } this._updateShiftTimer(); }; /** * If data from a queue is available and a callback is set, run the callback * with available data */ MessageQueue.prototype._processInsert = function () { if (this._instantQueue.length && this._callbackQueue.length) { this._callbackQueue.pop()(this._instantQueue.pop()); } }; |