querystream.js 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. /*!
  2. * Module dependencies.
  3. */
  4. var Stream = require('stream').Stream
  5. var utils = require('./utils')
  6. var helpers = require('./queryhelpers')
  7. var K = function(k){ return k }
  8. /**
  9. * Provides a Node.js 0.8 style [ReadStream](http://nodejs.org/docs/v0.8.21/api/stream.html#stream_readable_stream) interface for Queries.
  10. *
  11. * var stream = Model.find().stream();
  12. *
  13. * stream.on('data', function (doc) {
  14. * // do something with the mongoose document
  15. * }).on('error', function (err) {
  16. * // handle the error
  17. * }).on('close', function () {
  18. * // the stream is closed
  19. * });
  20. *
  21. *
  22. * The stream interface allows us to simply "plug-in" to other _Node.js 0.8_ style write streams.
  23. *
  24. * Model.where('created').gte(twoWeeksAgo).stream().pipe(writeStream);
  25. *
  26. * ####Valid options
  27. *
  28. * - `transform`: optional function which accepts a mongoose document. The return value of the function will be emitted on `data`.
  29. *
  30. * ####Example
  31. *
  32. * // JSON.stringify all documents before emitting
  33. * var stream = Thing.find().stream({ transform: JSON.stringify });
  34. * stream.pipe(writeStream);
  35. *
  36. * _NOTE: plugging into an HTTP response will *not* work out of the box. Those streams expect only strings or buffers to be emitted, so first formatting our documents as strings/buffers is necessary._
  37. *
  38. * _NOTE: these streams are Node.js 0.8 style read streams which differ from Node.js 0.10 style. Node.js 0.10 streams are not well tested yet and are not guaranteed to work._
  39. *
  40. * @param {Query} query
  41. * @param {Object} [options]
  42. * @inherits NodeJS Stream http://nodejs.org/docs/v0.8.21/api/stream.html#stream_readable_stream
  43. * @event `data`: emits a single Mongoose document
  44. * @event `error`: emits when an error occurs during streaming. This will emit _before_ the `close` event.
  45. * @event `close`: emits when the stream reaches the end of the cursor or an error occurs, or the stream is manually `destroy`ed. After this event, no more events are emitted.
  46. * @api public
  47. */
  48. function QueryStream (query, options) {
  49. Stream.call(this);
  50. this.query = query;
  51. this.readable = true;
  52. this.paused = false;
  53. this._cursor = null;
  54. this._destroyed = null;
  55. this._fields = null;
  56. this._buffer = null;
  57. this._inline = T_INIT;
  58. this._running = false;
  59. this._transform = options && 'function' == typeof options.transform
  60. ? options.transform
  61. : K;
  62. // give time to hook up events
  63. var self = this;
  64. process.nextTick(function () {
  65. self._init();
  66. });
  67. }
  68. /*!
  69. * Inherit from Stream
  70. */
  71. QueryStream.prototype.__proto__ = Stream.prototype;
  72. /**
  73. * Flag stating whether or not this stream is readable.
  74. *
  75. * @property readable
  76. * @api public
  77. */
  78. QueryStream.prototype.readable;
  79. /**
  80. * Flag stating whether or not this stream is paused.
  81. *
  82. * @property paused
  83. * @api public
  84. */
  85. QueryStream.prototype.paused;
  86. // trampoline flags
  87. var T_INIT = 0;
  88. var T_IDLE = 1;
  89. var T_CONT = 2;
  90. /**
  91. * Initializes the query.
  92. *
  93. * @api private
  94. */
  95. QueryStream.prototype._init = function () {
  96. if (this._destroyed) return;
  97. var query = this.query
  98. , model = query.model
  99. , options = query._optionsForExec(model)
  100. , self = this
  101. try {
  102. query.cast(model);
  103. } catch (err) {
  104. return self.destroy(err);
  105. }
  106. self._fields = utils.clone(query._fields);
  107. options.fields = query._castFields(self._fields);
  108. model.collection.find(query._conditions, options, function (err, cursor) {
  109. if (err) return self.destroy(err);
  110. self._cursor = cursor;
  111. self._next();
  112. });
  113. }
  114. /**
  115. * Trampoline for pulling the next doc from cursor.
  116. *
  117. * @see QueryStream#__next #querystream_QueryStream-__next
  118. * @api private
  119. */
  120. QueryStream.prototype._next = function _next () {
  121. if (this.paused || this._destroyed) {
  122. return this._running = false;
  123. }
  124. this._running = true;
  125. if (this._buffer && this._buffer.length) {
  126. var arg;
  127. while (!this.paused && !this._destroyed && (arg = this._buffer.shift())) {
  128. this._onNextObject.apply(this, arg);
  129. }
  130. }
  131. // avoid stack overflows with large result sets.
  132. // trampoline instead of recursion.
  133. while (this.__next()) {}
  134. }
  135. /**
  136. * Pulls the next doc from the cursor.
  137. *
  138. * @see QueryStream#_next #querystream_QueryStream-_next
  139. * @api private
  140. */
  141. QueryStream.prototype.__next = function () {
  142. if (this.paused || this._destroyed)
  143. return this._running = false;
  144. var self = this;
  145. self._inline = T_INIT;
  146. self._cursor.nextObject(function cursorcb (err, doc) {
  147. self._onNextObject(err, doc);
  148. });
  149. // if onNextObject() was already called in this tick
  150. // return ourselves to the trampoline.
  151. if (T_CONT === this._inline) {
  152. return true;
  153. } else {
  154. // onNextObject() hasn't fired yet. tell onNextObject
  155. // that its ok to call _next b/c we are not within
  156. // the trampoline anymore.
  157. this._inline = T_IDLE;
  158. }
  159. }
  160. /**
  161. * Transforms raw `doc`s returned from the cursor into a model instance.
  162. *
  163. * @param {Error|null} err
  164. * @param {Object} doc
  165. * @api private
  166. */
  167. QueryStream.prototype._onNextObject = function _onNextObject (err, doc) {
  168. if (this._destroyed) return;
  169. if (this.paused) {
  170. this._buffer || (this._buffer = []);
  171. this._buffer.push([err, doc]);
  172. return this._running = false;
  173. }
  174. if (err) return this.destroy(err);
  175. // when doc is null we hit the end of the cursor
  176. if (!doc) {
  177. this.emit('end');
  178. return this.destroy();
  179. }
  180. var opts = this.query._mongooseOptions;
  181. if (!opts.populate) {
  182. return true === opts.lean ?
  183. emit(this, doc) :
  184. createAndEmit(this, null, doc);
  185. }
  186. var self = this;
  187. var pop = helpers.preparePopulationOptionsMQ(self.query, self.query._mongooseOptions);
  188. self.query.model.populate(doc, pop, function (err, doc) {
  189. if (err) return self.destroy(err);
  190. return true === opts.lean ?
  191. emit(self, doc) :
  192. createAndEmit(self, pop, doc);
  193. });
  194. }
  195. function createAndEmit (self, populatedIds, doc) {
  196. var instance = helpers.createModel(self.query.model, doc, self._fields);
  197. var opts = populatedIds ?
  198. { populated: populatedIds } :
  199. undefined;
  200. instance.init(doc, opts, function (err) {
  201. if (err) return self.destroy(err);
  202. emit(self, instance);
  203. });
  204. }
  205. /*!
  206. * Emit a data event and manage the trampoline state
  207. */
  208. function emit (self, doc) {
  209. self.emit('data', self._transform(doc));
  210. // trampoline management
  211. if (T_IDLE === self._inline) {
  212. // no longer in trampoline. restart it.
  213. self._next();
  214. } else {
  215. // in a trampoline. tell __next that its
  216. // ok to continue jumping.
  217. self._inline = T_CONT;
  218. }
  219. }
  220. /**
  221. * Pauses this stream.
  222. *
  223. * @api public
  224. */
  225. QueryStream.prototype.pause = function () {
  226. this.paused = true;
  227. }
  228. /**
  229. * Resumes this stream.
  230. *
  231. * @api public
  232. */
  233. QueryStream.prototype.resume = function () {
  234. this.paused = false;
  235. if (!this._cursor) {
  236. // cannot start if not initialized
  237. return;
  238. }
  239. // are we within the trampoline?
  240. if (T_INIT === this._inline) {
  241. return;
  242. }
  243. if (!this._running) {
  244. // outside QueryStream control, need manual restart
  245. return this._next();
  246. }
  247. }
  248. /**
  249. * Destroys the stream, closing the underlying cursor. No more events will be emitted.
  250. *
  251. * @param {Error} [err]
  252. * @api public
  253. */
  254. QueryStream.prototype.destroy = function (err) {
  255. if (this._destroyed) return;
  256. this._destroyed = true;
  257. this._running = false;
  258. this.readable = false;
  259. if (this._cursor) {
  260. this._cursor.close();
  261. }
  262. if (err) {
  263. this.emit('error', err);
  264. }
  265. this.emit('close');
  266. }
  267. /**
  268. * Pipes this query stream into another stream. This method is inherited from NodeJS Streams.
  269. *
  270. * ####Example:
  271. *
  272. * query.stream().pipe(writeStream [, options])
  273. *
  274. * @method pipe
  275. * @memberOf QueryStream
  276. * @see NodeJS http://nodejs.org/api/stream.html
  277. * @api public
  278. */
  279. /*!
  280. * Module exports
  281. */
  282. module.exports = exports = QueryStream;