seq-queue.js 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. var EventEmitter = require('events').EventEmitter;
  2. var util = require('util');
  3. var DEFAULT_TIMEOUT = 3000;
  4. var INIT_ID = 0;
  5. var EVENT_CLOSED = 'closed';
  6. var EVENT_DRAINED = 'drained';
  7. /**
  8. * Instance a new queue
  9. *
  10. * @param {Number} timeout a global timeout for new queue
  11. * @class
  12. * @constructor
  13. */
  14. var SeqQueue = function(timeout) {
  15. EventEmitter.call(this);
  16. if(timeout && timeout > 0) {
  17. this.timeout = timeout;
  18. } else {
  19. this.timeout = DEFAULT_TIMEOUT;
  20. }
  21. this.status = SeqQueueManager.STATUS_IDLE;
  22. this.curId = INIT_ID;
  23. this.queue = [];
  24. };
  25. util.inherits(SeqQueue, EventEmitter);
  26. /**
  27. * Add a task into queue.
  28. *
  29. * @param fn new request
  30. * @param ontimeout callback when task timeout
  31. * @param timeout timeout for current request. take the global timeout if this is invalid
  32. * @returns true or false
  33. */
  34. SeqQueue.prototype.push = function(fn, ontimeout, timeout) {
  35. if(this.status !== SeqQueueManager.STATUS_IDLE && this.status !== SeqQueueManager.STATUS_BUSY) {
  36. //ignore invalid status
  37. return false;
  38. }
  39. if(typeof fn !== 'function') {
  40. throw new Error('fn should be a function.');
  41. }
  42. this.queue.push({fn: fn, ontimeout: ontimeout, timeout: timeout});
  43. if(this.status === SeqQueueManager.STATUS_IDLE) {
  44. this.status = SeqQueueManager.STATUS_BUSY;
  45. var self = this;
  46. process.nextTick(function() {
  47. self._next(self.curId);
  48. });
  49. }
  50. return true;
  51. };
  52. /**
  53. * Close queue
  54. *
  55. * @param {Boolean} force if true will close the queue immediately else will execute the rest task in queue
  56. */
  57. SeqQueue.prototype.close = function(force) {
  58. if(this.status !== SeqQueueManager.STATUS_IDLE && this.status !== SeqQueueManager.STATUS_BUSY) {
  59. //ignore invalid status
  60. return;
  61. }
  62. if(force) {
  63. this.status = SeqQueueManager.STATUS_DRAINED;
  64. if(this.timerId) {
  65. clearTimeout(this.timerId);
  66. this.timerId = undefined;
  67. }
  68. this.emit(EVENT_DRAINED);
  69. } else {
  70. this.status = SeqQueueManager.STATUS_CLOSED;
  71. this.emit(EVENT_CLOSED);
  72. }
  73. };
  74. /**
  75. * Invoke next task
  76. *
  77. * @param {String|Number} tid last executed task id
  78. * @api private
  79. */
  80. SeqQueue.prototype._next = function(tid) {
  81. if(tid !== this.curId || this.status !== SeqQueueManager.STATUS_BUSY && this.status !== SeqQueueManager.STATUS_CLOSED) {
  82. //ignore invalid next call
  83. return;
  84. }
  85. if(this.timerId) {
  86. clearTimeout(this.timerId);
  87. this.timerId = undefined;
  88. }
  89. var task = this.queue.shift();
  90. if(!task) {
  91. if(this.status === SeqQueueManager.STATUS_BUSY) {
  92. this.status = SeqQueueManager.STATUS_IDLE;
  93. this.curId++; //modify curId to invalidate timeout task
  94. } else {
  95. this.status = SeqQueueManager.STATUS_DRAINED;
  96. this.emit(EVENT_DRAINED);
  97. }
  98. return;
  99. }
  100. var self = this;
  101. task.id = ++this.curId;
  102. var timeout = task.timeout > 0 ? task.timeout : this.timeout;
  103. timeout = timeout > 0 ? timeout : DEFAULT_TIMEOUT;
  104. this.timerId = setTimeout(function() {
  105. process.nextTick(function() {
  106. self._next(task.id);
  107. });
  108. self.emit('timeout', task);
  109. if(task.ontimeout) {
  110. task.ontimeout();
  111. }
  112. }, timeout);
  113. try {
  114. task.fn({
  115. done: function() {
  116. var res = task.id === self.curId;
  117. process.nextTick(function() {
  118. self._next(task.id);
  119. });
  120. return res;
  121. }
  122. });
  123. } catch(err) {
  124. self.emit('error', err, task);
  125. process.nextTick(function() {
  126. self._next(task.id);
  127. });
  128. }
  129. };
  130. /**
  131. * Queue manager.
  132. *
  133. * @module
  134. */
  135. var SeqQueueManager = module.exports;
  136. /**
  137. * Queue status: idle, welcome new tasks
  138. *
  139. * @const
  140. * @type {Number}
  141. * @memberOf SeqQueueManager
  142. */
  143. SeqQueueManager.STATUS_IDLE = 0;
  144. /**
  145. * Queue status: busy, queue is working for some tasks now
  146. *
  147. * @const
  148. * @type {Number}
  149. * @memberOf SeqQueueManager
  150. */
  151. SeqQueueManager.STATUS_BUSY = 1;
  152. /**
  153. * Queue status: closed, queue has closed and would not receive task any more
  154. * and is processing the remaining tasks now.
  155. *
  156. * @const
  157. * @type {Number}
  158. * @memberOf SeqQueueManager
  159. */
  160. SeqQueueManager.STATUS_CLOSED = 2;
  161. /**
  162. * Queue status: drained, queue is ready to be destroy
  163. *
  164. * @const
  165. * @type {Number}
  166. * @memberOf SeqQueueManager
  167. */
  168. SeqQueueManager.STATUS_DRAINED = 3;
  169. /**
  170. * Create Sequence queue
  171. *
  172. * @param {Number} timeout a global timeout for the new queue instance
  173. * @return {Object} new queue instance
  174. * @memberOf SeqQueueManager
  175. */
  176. SeqQueueManager.createQueue = function(timeout) {
  177. return new SeqQueue(timeout);
  178. };