123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- var EventEmitter = require('events').EventEmitter;
- var util = require('util');
- var DEFAULT_TIMEOUT = 3000;
- var INIT_ID = 0;
- var EVENT_CLOSED = 'closed';
- var EVENT_DRAINED = 'drained';
- /**
- * Instance a new queue
- *
- * @param {Number} timeout a global timeout for new queue
- * @class
- * @constructor
- */
- var SeqQueue = function(timeout) {
- EventEmitter.call(this);
-
- if(timeout && timeout > 0) {
- this.timeout = timeout;
- } else {
- this.timeout = DEFAULT_TIMEOUT;
- }
-
- this.status = SeqQueueManager.STATUS_IDLE;
- this.curId = INIT_ID;
- this.queue = [];
- };
- util.inherits(SeqQueue, EventEmitter);
- /**
- * Add a task into queue.
- *
- * @param fn new request
- * @param ontimeout callback when task timeout
- * @param timeout timeout for current request. take the global timeout if this is invalid
- * @returns true or false
- */
- SeqQueue.prototype.push = function(fn, ontimeout, timeout) {
- if(this.status !== SeqQueueManager.STATUS_IDLE && this.status !== SeqQueueManager.STATUS_BUSY) {
- //ignore invalid status
- return false;
- }
-
- if(typeof fn !== 'function') {
- throw new Error('fn should be a function.');
- }
- this.queue.push({fn: fn, ontimeout: ontimeout, timeout: timeout});
- if(this.status === SeqQueueManager.STATUS_IDLE) {
- this.status = SeqQueueManager.STATUS_BUSY;
- var self = this;
- process.nextTick(function() {
- self._next(self.curId);
- });
- }
- return true;
- };
- /**
- * Close queue
- *
- * @param {Boolean} force if true will close the queue immediately else will execute the rest task in queue
- */
- SeqQueue.prototype.close = function(force) {
- if(this.status !== SeqQueueManager.STATUS_IDLE && this.status !== SeqQueueManager.STATUS_BUSY) {
- //ignore invalid status
- return;
- }
-
- if(force) {
- this.status = SeqQueueManager.STATUS_DRAINED;
- if(this.timerId) {
- clearTimeout(this.timerId);
- this.timerId = undefined;
- }
- this.emit(EVENT_DRAINED);
- } else {
- this.status = SeqQueueManager.STATUS_CLOSED;
- this.emit(EVENT_CLOSED);
- }
- };
- /**
- * Invoke next task
- *
- * @param {String|Number} tid last executed task id
- * @api private
- */
- SeqQueue.prototype._next = function(tid) {
- if(tid !== this.curId || this.status !== SeqQueueManager.STATUS_BUSY && this.status !== SeqQueueManager.STATUS_CLOSED) {
- //ignore invalid next call
- return;
- }
-
- if(this.timerId) {
- clearTimeout(this.timerId);
- this.timerId = undefined;
- }
-
- var task = this.queue.shift();
- if(!task) {
- if(this.status === SeqQueueManager.STATUS_BUSY) {
- this.status = SeqQueueManager.STATUS_IDLE;
- this.curId++; //modify curId to invalidate timeout task
- } else {
- this.status = SeqQueueManager.STATUS_DRAINED;
- this.emit(EVENT_DRAINED);
- }
- return;
- }
-
- var self = this;
- task.id = ++this.curId;
- var timeout = task.timeout > 0 ? task.timeout : this.timeout;
- timeout = timeout > 0 ? timeout : DEFAULT_TIMEOUT;
- this.timerId = setTimeout(function() {
- process.nextTick(function() {
- self._next(task.id);
- });
- self.emit('timeout', task);
- if(task.ontimeout) {
- task.ontimeout();
- }
- }, timeout);
- try {
- task.fn({
- done: function() {
- var res = task.id === self.curId;
- process.nextTick(function() {
- self._next(task.id);
- });
- return res;
- }
- });
- } catch(err) {
- self.emit('error', err, task);
- process.nextTick(function() {
- self._next(task.id);
- });
- }
- };
- /**
- * Queue manager.
- *
- * @module
- */
- var SeqQueueManager = module.exports;
- /**
- * Queue status: idle, welcome new tasks
- *
- * @const
- * @type {Number}
- * @memberOf SeqQueueManager
- */
- SeqQueueManager.STATUS_IDLE = 0;
- /**
- * Queue status: busy, queue is working for some tasks now
- *
- * @const
- * @type {Number}
- * @memberOf SeqQueueManager
- */
- SeqQueueManager.STATUS_BUSY = 1;
- /**
- * Queue status: closed, queue has closed and would not receive task any more
- * and is processing the remaining tasks now.
- *
- * @const
- * @type {Number}
- * @memberOf SeqQueueManager
- */
- SeqQueueManager.STATUS_CLOSED = 2;
- /**
- * Queue status: drained, queue is ready to be destroy
- *
- * @const
- * @type {Number}
- * @memberOf SeqQueueManager
- */
- SeqQueueManager.STATUS_DRAINED = 3;
- /**
- * Create Sequence queue
- *
- * @param {Number} timeout a global timeout for the new queue instance
- * @return {Object} new queue instance
- * @memberOf SeqQueueManager
- */
- SeqQueueManager.createQueue = function(timeout) {
- return new SeqQueue(timeout);
- };
|