message-queue.js
3.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
'use strict';
/**
 * Module entry point: a factory that builds a fresh, empty queue on every
 * call, so callers do not have to use `new` themselves.
 *
 * @returns {MessageQueue} A newly constructed queue instance
 */
module.exports = function () {
return new MessageQueue();
};
/**
 * Creates a queue object. Elements can be queued for immediate delivery or
 * held back for a delay; consumers register callbacks that each receive one
 * element.
 *
 * @constructor
 */
function MessageQueue() {
// elements that are ready to be handed out (producers unshift, consumers pop)
this._instantQueue = [];
// delayed elements stored as {data, available} containers, ordered by ascending `available`
this._sortedQueue = [];
// timer handle slot that _updateShiftTimer passes to clearTimeout
this._shiftTimer = null;
// consumer callbacks waiting for an element to become available
this._callbackQueue = [];
}
/**
 * Requests the next element from the queue. When an element is already
 * available the callback runs synchronously with it; otherwise the callback is
 * stored until an element becomes available.
 *
 * @param {Function} callback Function to run with the queue element as its only argument
 * @returns {*} The callback's return value when it ran immediately, otherwise undefined
 */
MessageQueue.prototype.get = function (callback) {
    var ready = this._instantQueue;

    // nothing to hand out yet, so remember the consumer for later
    if (ready.length === 0) {
        this._callbackQueue.unshift(callback);
        return;
    }

    return callback(ready.pop());
};
/**
 * Adds an element to the queue. If delay (ms) is set, the data will not be available before
 * the specified delay has passed. Otherwise the data is available for processing immediately.
 *
 * Delayed elements that become available at the same moment keep the order
 * in which they were inserted.
 *
 * @param {Mixed} data Value to be queued
 * @param {Number} [delay] If set, delay the availability of the data by {delay} milliseconds.
 *                         Anything that is not a number (including NaN) queues the data immediately
 * @returns {Boolean|undefined} true when the data was queued for immediate processing,
 *                              undefined when it was placed on the delayed queue
 */
MessageQueue.prototype.insert = function (data, delay) {
    var container, position, queue = this._sortedQueue;

    // NaN passes the typeof check but would produce an `available` timestamp
    // that never compares as due, leaving the element stuck at the head forever
    if (typeof delay !== 'number' || isNaN(delay)) {
        this._instantQueue.unshift(data);
        this._processInsert();
        return true;
    }

    container = {
        data: data,
        available: Date.now() + delay
    };

    // Walk back from the tail past every element that becomes available
    // strictly later. Elements with an equal timestamp stay in front of the
    // new one (FIFO), and the common "append at the end" case needs no scan.
    position = queue.length;
    while (position > 0 && queue[position - 1].available > container.available) {
        position--;
    }
    queue.splice(position, 0, container);

    // the timer only tracks the head of the delayed queue, so it has to be
    // rescheduled only when the new element became the head
    if (position === 0) {
        this._updateShiftTimer();
    }
};
/**
 * Clears the previous timer and creates a new one (if needed) to process the element
 * in the delayed queue that becomes available first. If that element is already
 * available, it is processed right away instead of scheduling a timer.
 */
MessageQueue.prototype._updateShiftTimer = function () {
    // largest delay the timer API accepts (2^31 - 1 ms); Node treats anything
    // larger as 1 ms, which would turn a far-future element into a busy loop
    var MAX_TIMEOUT = 0x7fffffff;
    var wait;

    clearTimeout(this._shiftTimer);
    this._shiftTimer = null;

    if (!this._sortedQueue.length) {
        return;
    }

    wait = this._sortedQueue[0].available - Date.now();
    if (wait <= 0) {
        this._shiftSorted();
        return;
    }

    // Keep the handle so the next call can cancel this timer.
    // Add +15ms to ensure that the data is already available when the timer fires.
    this._shiftTimer = setTimeout(this._shiftSorted.bind(this), Math.min(wait + 15, MAX_TIMEOUT));
};
/**
 * Moves the first element of the delayed queue to the immediate queue once
 * that element has become available, then refreshes the shift timer for
 * whatever remains in the delayed queue. Does nothing when the delayed queue
 * is empty.
 */
MessageQueue.prototype._shiftSorted = function () {
    var delayed = this._sortedQueue;
    var headIsDue;

    if (delayed.length === 0) {
        return;
    }

    headIsDue = delayed[0].available <= Date.now();
    if (headIsDue) {
        // re-insert without a delay so the element lands in the immediate queue
        this.insert(delayed.shift().data);
    }

    this._updateShiftTimer();
};
/**
 * Hands one available element to one waiting callback. Nothing happens unless
 * the immediate queue and the callback queue both hold at least one entry.
 */
MessageQueue.prototype._processInsert = function () {
    var hasData = this._instantQueue.length > 0;
    var hasConsumer = this._callbackQueue.length > 0;
    var consumer;

    if (!hasData || !hasConsumer) {
        return;
    }

    // both queues are filled at the front and drained from the back
    consumer = this._callbackQueue.pop();
    consumer(this._instantQueue.pop());
};