cursor.js 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879
  1. "use strict";
  2. var inherits = require('util').inherits
  3. , f = require('util').format
  4. , toError = require('./utils').toError
  5. , getSingleProperty = require('./utils').getSingleProperty
  6. , formattedOrderClause = require('./utils').formattedOrderClause
  7. , handleCallback = require('./utils').handleCallback
  8. , Logger = require('mongodb-core').Logger
  9. , EventEmitter = require('events').EventEmitter
  10. , ReadPreference = require('./read_preference')
  11. , MongoError = require('mongodb-core').MongoError
  12. , Readable = require('stream').Readable || require('readable-stream').Readable
  13. , CoreCursor = require('mongodb-core').Cursor
  14. , Query = require('mongodb-core').Query
  15. , CoreReadPreference = require('mongodb-core').ReadPreference;
  16. /**
  17. * @fileOverview The **Cursor** class is an internal class that embodies a cursor on MongoDB
  18. * allowing for iteration over the results returned from the underlying query. It supports
  19. * one by one document iteration, conversion to an array or can be iterated as a Node 0.10.X
  20. * or higher stream
  21. *
  22. * **CURSORS Cannot directly be instantiated**
  23. * @example
  24. * var MongoClient = require('mongodb').MongoClient,
  25. * test = require('assert');
  26. * // Connection url
  27. * var url = 'mongodb://localhost:27017/test';
  28. * // Connect using MongoClient
  29. * MongoClient.connect(url, function(err, db) {
  30. * // Create a collection we want to drop later
  31. * var col = db.collection('createIndexExample1');
  32. * // Insert a bunch of documents
  33. * col.insert([{a:1, b:1}
  34. * , {a:2, b:2}, {a:3, b:3}
  35. * , {a:4, b:4}], {w:1}, function(err, result) {
  36. * test.equal(null, err);
  37. *
  38. * // Show that duplicate records got dropped
  39. * col.find({}).toArray(function(err, items) {
  40. * test.equal(null, err);
  41. * test.equal(4, items.length);
  42. * db.close();
  43. * });
  44. * });
  45. * });
  46. */
  47. /**
  48. * Namespace provided by the mongodb-core and node.js
  49. * @external CoreCursor
  50. * @external Readable
  51. */
  52. // Flags allowed for cursor
  53. var flags = ['tailable', 'oplogReplay', 'noCursorTimeout', 'awaitData', 'exhaust', 'partial'];
  54. var fields = ['numberOfRetries', 'tailableRetryInterval'];
  55. /**
  56. * Creates a new Cursor instance (INTERNAL TYPE, do not instantiate directly)
  57. * @class Cursor
  58. * @extends external:CoreCursor
  59. * @extends external:Readable
  60. * @property {string} sortValue Cursor query sort setting.
  61. * @property {boolean} timeout Is Cursor able to time out.
  62. * @property {ReadPreference} readPreference Get cursor ReadPreference.
  63. * @fires Cursor#data
  64. * @fires Cursor#end
  65. * @fires Cursor#close
  66. * @fires Cursor#readable
  67. * @return {Cursor} a Cursor instance.
  68. * @example
  69. * Some example
  70. */
  71. var Cursor = function(bson, ns, cmd, options, topology, topologyOptions) {
  72. CoreCursor.apply(this, Array.prototype.slice.call(arguments, 0));
  73. var self = this;
  74. var state = Cursor.INIT;
  75. var streamOptions = {};
  76. // Tailable cursor options
  77. var numberOfRetries = options.numberOfRetries || 5;
  78. var tailableRetryInterval = options.tailableRetryInterval || 500;
  79. var currentNumberOfRetries = numberOfRetries;
  80. // Set up
  81. Readable.call(this, {objectMode: true});
  82. // Internal cursor state
  83. this.s = {
  84. // Tailable cursor options
  85. numberOfRetries: numberOfRetries
  86. , tailableRetryInterval: tailableRetryInterval
  87. , currentNumberOfRetries: currentNumberOfRetries
  88. // State
  89. , state: state
  90. // Stream options
  91. , streamOptions: streamOptions
  92. // BSON
  93. , bson: bson
  94. // Namespace
  95. , ns: ns
  96. // Command
  97. , cmd: cmd
  98. // Options
  99. , options: options
  100. // Topology
  101. , topology: topology
  102. // Topology options
  103. , topologyOptions: topologyOptions
  104. }
  105. // Legacy fields
  106. this.timeout = self.s.options.noCursorTimeout == true;
  107. this.sortValue = self.s.cmd.sort;
  108. this.readPreference = self.s.options.readPreference;
  109. }
  110. /**
  111. * Cursor stream data event, fired for each document in the cursor.
  112. *
  113. * @event Cursor#data
  114. * @type {object}
  115. */
  116. /**
  117. * Cursor stream end event
  118. *
  119. * @event Cursor#end
  120. * @type {null}
  121. */
  122. /**
  123. * Cursor stream close event
  124. *
  125. * @event Cursor#close
  126. * @type {null}
  127. */
  128. /**
  129. * Cursor stream readable event
  130. *
  131. * @event Cursor#readable
  132. * @type {null}
  133. */
  134. // Inherit from Readable
  135. inherits(Cursor, Readable);
  136. // Map core cursor _next method so we can apply mapping
  137. CoreCursor.prototype._next = CoreCursor.prototype.next;
  138. for(var name in CoreCursor.prototype) {
  139. Cursor.prototype[name] = CoreCursor.prototype[name];
  140. }
  141. /**
  142. * Set the cursor query
  143. * @method
  144. * @param {object} filter The filter object used for the cursor.
  145. * @return {Cursor}
  146. */
  147. Cursor.prototype.filter = function(filter) {
  148. if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw new MongoError("Cursor is closed");
  149. this.s.cmd.query = filter;
  150. return this;
  151. }
  152. /**
  153. * Set a node.js specific cursor option
  154. * @method
  155. * @param {string} field The cursor option to set ['numberOfRetries', 'tailableRetryInterval'].
  156. * @param {object} value The field value.
  157. * @throws {MongoError}
  158. * @return {Cursor}
  159. */
  160. Cursor.prototype.setCursorOption = function(field, value) {
  161. if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw new MongoError("Cursor is closed");
  162. if(fields.indexOf(field) == -1) throw new MongoError(f("option %s not a supported option %s", field, fields));
  163. this.s[field] = value;
  164. if(field == 'numberOfRetries')
  165. this.s.currentNumberOfRetries = value;
  166. return this;
  167. }
  168. /**
  169. * Add a cursor flag to the cursor
  170. * @method
  171. * @param {string} flag The flag to set, must be one of following ['tailable', 'oplogReplay', 'noCursorTimeout', 'awaitData', 'exhaust', 'partial'].
  172. * @param {boolean} value The flag boolean value.
  173. * @throws {MongoError}
  174. * @return {Cursor}
  175. */
  176. Cursor.prototype.addCursorFlag = function(flag, value) {
  177. if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw new MongoError("Cursor is closed");
  178. if(flags.indexOf(flag) == -1) throw new MongoError(f("flag %s not a supported flag %s", flag, flags));
  179. if(typeof value != 'boolean') throw new MongoError(f("flag %s must be a boolean value", flag));
  180. this.s.cmd[flag] = value;
  181. return this;
  182. }
  183. /**
  184. * Add a query modifier to the cursor query
  185. * @method
  186. * @param {string} name The query modifier (must start with $, such as $orderby etc)
  187. * @param {boolean} value The flag boolean value.
  188. * @throws {MongoError}
  189. * @return {Cursor}
  190. */
  191. Cursor.prototype.addQueryModifier = function(name, value) {
  192. if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw new MongoError("Cursor is closed");
  193. if(name[0] != '$') throw new MongoError(f("%s is not a valid query modifier"));
  194. // Strip of the $
  195. var field = name.substr(1);
  196. // Set on the command
  197. this.s.cmd[field] = value;
  198. // Deal with the special case for sort
  199. if(field == 'orderby') this.s.cmd.sort = this.s.cmd[field];
  200. return this;
  201. }
  202. /**
  203. * Add a comment to the cursor query allowing for tracking the comment in the log.
  204. * @method
  205. * @param {string} value The comment attached to this query.
  206. * @throws {MongoError}
  207. * @return {Cursor}
  208. */
  209. Cursor.prototype.comment = function(value) {
  210. if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw new MongoError("Cursor is closed");
  211. this.s.cmd.comment = value;
  212. return this;
  213. }
  214. /**
  215. * Set a maxTimeMS on the cursor query, allowing for hard timeout limits on queries (Only supported on MongoDB 2.6 or higher)
  216. * @method
  217. * @param {number} value Number of milliseconds to wait before aborting the query.
  218. * @throws {MongoError}
  219. * @return {Cursor}
  220. */
  221. Cursor.prototype.maxTimeMS = function(value) {
  222. if(typeof value != 'number') throw new MongoError("maxTimeMS must be a number");
  223. if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw new MongoError("Cursor is closed");
  224. this.s.cmd.maxTimeMS = value;
  225. return this;
  226. }
  227. Cursor.prototype.maxTimeMs = Cursor.prototype.maxTimeMS;
  228. /**
  229. * Sets a field projection for the query.
  230. * @method
  231. * @param {object} value The field projection object.
  232. * @throws {MongoError}
  233. * @return {Cursor}
  234. */
  235. Cursor.prototype.project = function(value) {
  236. if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw new MongoError("Cursor is closed");
  237. this.s.cmd.fields = value;
  238. return this;
  239. }
  240. /**
  241. * Sets the sort order of the cursor query.
  242. * @method
  243. * @param {(string|array|object)} keyOrList The key or keys set for the sort.
  244. * @param {number} [direction] The direction of the sorting (1 or -1).
  245. * @throws {MongoError}
  246. * @return {Cursor}
  247. */
  248. Cursor.prototype.sort = function(keyOrList, direction) {
  249. if(this.s.options.tailable) throw new MongoError("Tailable cursor doesn't support sorting");
  250. if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw new MongoError("Cursor is closed");
  251. var order = keyOrList;
  252. if(direction != null) {
  253. order = [[keyOrList, direction]];
  254. }
  255. this.s.cmd.sort = order;
  256. this.sortValue = order;
  257. return this;
  258. }
  259. /**
  260. * Set the batch size for the cursor.
  261. * @method
  262. * @param {number} value The batchSize for the cursor.
  263. * @throws {MongoError}
  264. * @return {Cursor}
  265. */
  266. Cursor.prototype.batchSize = function(value) {
  267. if(this.s.options.tailable) throw new MongoError("Tailable cursor doesn't support limit");
  268. if(this.s.state == Cursor.CLOSED || this.isDead()) throw new MongoError("Cursor is closed");
  269. if(typeof value != 'number') throw new MongoError("batchSize requires an integer");
  270. this.s.cmd.batchSize = value;
  271. this.setCursorBatchSize(value);
  272. return this;
  273. }
  274. /**
  275. * Set the limit for the cursor.
  276. * @method
  277. * @param {number} value The limit for the cursor query.
  278. * @throws {MongoError}
  279. * @return {Cursor}
  280. */
  281. Cursor.prototype.limit = function(value) {
  282. if(this.s.options.tailable) throw new MongoError("Tailable cursor doesn't support limit");
  283. if(this.s.state == Cursor.OPEN || this.s.state == Cursor.CLOSED || this.isDead()) throw new MongoError("Cursor is closed");
  284. if(typeof value != 'number') throw new MongoError("limit requires an integer");
  285. this.s.cmd.limit = value;
  286. // this.cursorLimit = value;
  287. this.setCursorLimit(value);
  288. return this;
  289. }
  290. /**
  291. * Set the skip for the cursor.
  292. * @method
  293. * @param {number} value The skip for the cursor query.
  294. * @throws {MongoError}
  295. * @return {Cursor}
  296. */
  297. Cursor.prototype.skip = function(value) {
  298. if(this.s.options.tailable) throw new MongoError("Tailable cursor doesn't support skip");
  299. if(this.s.state == Cursor.OPEN || this.s.state == Cursor.CLOSED || this.isDead()) throw new MongoError("Cursor is closed");
  300. if(typeof value != 'number') throw new MongoError("skip requires an integer");
  301. this.s.cmd.skip = value;
  302. this.setCursorSkip(value);
  303. return this;
  304. }
  305. /**
  306. * The callback format for results
  307. * @callback Cursor~resultCallback
  308. * @param {MongoError} error An error instance representing the error during the execution.
  309. * @param {(object|null)} result The result object if the command was executed successfully.
  310. */
  311. /**
  312. * Set the new batchSize of the cursor
  313. * @function Cursor.prototype.setBatchSize
  314. * @param {number} value The new batchSize for the cursor
  315. * @return {null}
  316. */
  317. /**
  318. * Get the batchSize of the cursor
  319. * @function Cursor.prototype.batchSize
  320. * @param {number} value The current batchSize for the cursor
  321. * @return {null}
  322. */
  323. /**
  324. * Set the new skip value of the cursor
  325. * @function Cursor.prototype.setCursorSkip
  326. * @param {number} value The new skip for the cursor
  327. * @return {null}
  328. */
  329. /**
  330. * Get the skip value of the cursor
  331. * @function Cursor.prototype.cursorSkip
  332. * @param {number} value The current skip value for the cursor
  333. * @return {null}
  334. */
  335. /**
  336. * Set the new limit value of the cursor
  337. * @function Cursor.prototype.setCursorLimit
  338. * @param {number} value The new limit for the cursor
  339. * @return {null}
  340. */
  341. /**
  342. * Get the limit value of the cursor
  343. * @function Cursor.prototype.cursorLimit
  344. * @param {number} value The current limit value for the cursor
  345. * @return {null}
  346. */
  347. /**
  348. * Clone the cursor
  349. * @function external:CoreCursor#clone
  350. * @return {Cursor}
  351. */
  352. /**
  353. * Resets the cursor
  354. * @function external:CoreCursor#rewind
  355. * @return {null}
  356. */
  357. /**
  358. * Get the next available document from the cursor, returns null if no more documents are available.
  359. * @method
  360. * @param {Cursor~resultCallback} callback The result callback.
  361. * @throws {MongoError}
  362. * @deprecated
  363. * @return {null}
  364. */
  365. Cursor.prototype.nextObject = function(callback) {
  366. var self = this;
  367. if(this.s.state == Cursor.CLOSED || self.isDead()) return handleCallback(callback, new MongoError("Cursor is closed"));
  368. if(this.s.state == Cursor.INIT && this.s.cmd.sort) {
  369. try {
  370. this.s.cmd.sort = formattedOrderClause(this.s.cmd.sort);
  371. } catch(err) {
  372. return handleCallback(callback, err);
  373. }
  374. }
  375. // Get the next object
  376. self._next(function(err, doc) {
  377. if(err && err.tailable && self.s.currentNumberOfRetries == 0) return callback(err);
  378. if(err && err.tailable && self.s.currentNumberOfRetries > 0) {
  379. self.s.currentNumberOfRetries = self.s.currentNumberOfRetries - 1;
  380. return setTimeout(function() {
  381. self.nextObject(callback);
  382. }, self.s.tailableRetryInterval);
  383. }
  384. self.s.state = Cursor.OPEN;
  385. if(err) return handleCallback(callback, err);
  386. handleCallback(callback, null, doc);
  387. });
  388. }
  389. // Trampoline emptying the number of retrieved items
  390. // without incurring a nextTick operation
  391. var loop = function(self, callback) {
  392. // No more items we are done
  393. if(self.bufferedCount() == 0) return;
  394. // Get the next document
  395. self._next(callback);
  396. // Loop
  397. return loop;
  398. }
  399. /**
  400. * Get the next available document from the cursor, returns null if no more documents are available.
  401. * @method
  402. * @param {Cursor~resultCallback} callback The result callback.
  403. * @throws {MongoError}
  404. * @deprecated
  405. * @return {Promise} returns Promise if no callback passed
  406. */
  407. Cursor.prototype.next = Cursor.prototype.nextObject;
  408. /**
  409. * Iterates over all the documents for this cursor. As with **{cursor.toArray}**,
  410. * not all of the elements will be iterated if this cursor had been previouly accessed.
  411. * In that case, **{cursor.rewind}** can be used to reset the cursor. However, unlike
  412. * **{cursor.toArray}**, the cursor will only hold a maximum of batch size elements
  413. * at any given time if batch size is specified. Otherwise, the caller is responsible
  414. * for making sure that the entire result can fit the memory.
  415. * @method
  416. * @deprecated
  417. * @param {Cursor~resultCallback} callback The result callback.
  418. * @throws {MongoError}
  419. * @return {null}
  420. */
  421. Cursor.prototype.each = function(callback) {
  422. // Rewind cursor state
  423. this.rewind();
  424. // Set current cursor to INIT
  425. this.s.state = Cursor.INIT;
  426. // Run the query
  427. _each(this, callback);
  428. };
  429. // Run the each loop
  430. var _each = function(self, callback) {
  431. if(!callback) throw new MongoError('callback is mandatory');
  432. if(self.isNotified()) return;
  433. if(self.s.state == Cursor.CLOSED || self.isDead()) {
  434. return handleCallback(callback, new MongoError("Cursor is closed"), null);
  435. }
  436. if(self.s.state == Cursor.INIT) self.s.state = Cursor.OPEN;
  437. // Define function to avoid global scope escape
  438. var fn = null;
  439. // Trampoline all the entries
  440. if(self.bufferedCount() > 0) {
  441. while(fn = loop(self, callback)) fn(self, callback);
  442. _each(self, callback);
  443. } else {
  444. self._next(function(err, item) {
  445. if(err) return handleCallback(callback, err);
  446. if(item == null) {
  447. self.s.state = Cursor.CLOSED;
  448. return handleCallback(callback, null, null);
  449. }
  450. if(handleCallback(callback, null, item) == false) return;
  451. _each(self, callback);
  452. })
  453. }
  454. }
  455. /**
  456. * The callback format for the forEach iterator method
  457. * @callback Cursor~iteratorCallback
  458. * @param {Object} doc An emitted document for the iterator
  459. */
  460. /**
  461. * The callback error format for the forEach iterator method
  462. * @callback Cursor~endCallback
  463. * @param {MongoError} error An error instance representing the error during the execution.
  464. */
  465. /**
  466. * Iterates over all the documents for this cursor using the iterator, callback pattern.
  467. * @method
  468. * @param {Cursor~iteratorCallback} iterator The iteration callback.
  469. * @param {Cursor~endCallback} callback The end callback.
  470. * @throws {MongoError}
  471. * @return {null}
  472. */
  473. Cursor.prototype.forEach = function(iterator, callback) {
  474. this.each(function(err, doc){
  475. if(err) { callback(err); return false; }
  476. if(doc != null) { iterator(doc); return true; }
  477. if(doc == null && callback) {
  478. var internalCallback = callback;
  479. callback = null;
  480. internalCallback(null);
  481. return false;
  482. }
  483. });
  484. }
  485. /**
  486. * Set the ReadPreference for the cursor.
  487. * @method
  488. * @param {(string|ReadPreference)} readPreference The new read preference for the cursor.
  489. * @throws {MongoError}
  490. * @return {Cursor}
  491. */
  492. Cursor.prototype.setReadPreference = function(r) {
  493. if(this.s.state != Cursor.INIT) throw new MongoError('cannot change cursor readPreference after cursor has been accessed');
  494. if(r instanceof ReadPreference) {
  495. this.s.options.readPreference = new CoreReadPreference(r.mode, r.tags);
  496. } else {
  497. this.s.options.readPreference = new CoreReadPreference(r);
  498. }
  499. return this;
  500. }
  501. /**
  502. * The callback format for results
  503. * @callback Cursor~toArrayResultCallback
  504. * @param {MongoError} error An error instance representing the error during the execution.
  505. * @param {object[]} documents All the documents the satisfy the cursor.
  506. */
  507. /**
  508. * Returns an array of documents. The caller is responsible for making sure that there
  509. * is enough memory to store the results. Note that the array only contain partial
  510. * results when this cursor had been previouly accessed. In that case,
  511. * cursor.rewind() can be used to reset the cursor.
  512. * @method
  513. * @param {Cursor~toArrayResultCallback} callback The result callback.
  514. * @throws {MongoError}
  515. * @return {null}
  516. */
  517. Cursor.prototype.toArray = function(callback) {
  518. var self = this;
  519. if(!callback) throw new MongoError('callback is mandatory');
  520. if(self.s.options.tailable) return handleCallback(callback, new MongoError("Tailable cursor cannot be converted to array"), null);
  521. var items = [];
  522. // Reset cursor
  523. this.rewind();
  524. self.s.state = Cursor.INIT;
  525. // Fetch all the documents
  526. var fetchDocs = function() {
  527. self._next(function(err, doc) {
  528. if(err) return handleCallback(callback, err);
  529. if(doc == null) {
  530. self.s.state = Cursor.CLOSED;
  531. return handleCallback(callback, null, items);
  532. }
  533. // Add doc to items
  534. items.push(doc)
  535. // Get all buffered objects
  536. if(self.bufferedCount() > 0) {
  537. var docs = self.readBufferedDocuments(self.bufferedCount())
  538. // Transform the doc if transform method added
  539. if(self.s.transforms && typeof self.s.transforms.doc == 'function') {
  540. docs = docs.map(self.s.transforms.doc);
  541. }
  542. items = items.concat(docs);
  543. }
  544. // Attempt a fetch
  545. fetchDocs();
  546. })
  547. }
  548. fetchDocs();
  549. }
  550. /**
  551. * The callback format for results
  552. * @callback Cursor~countResultCallback
  553. * @param {MongoError} error An error instance representing the error during the execution.
  554. * @param {number} count The count of documents.
  555. */
  556. /**
  557. * Get the count of documents for this cursor
  558. * @method
  559. * @param {boolean} applySkipLimit Should the count command apply limit and skip settings on the cursor or in the passed in options.
  560. * @param {object} [options=null] Optional settings.
  561. * @param {number} [options.skip=null] The number of documents to skip.
  562. * @param {number} [options.limit=null] The maximum amounts to count before aborting.
  563. * @param {number} [options.maxTimeMS=null] Number of miliseconds to wait before aborting the query.
  564. * @param {string} [options.hint=null] An index name hint for the query.
  565. * @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
  566. * @param {Cursor~countResultCallback} callback The result callback.
  567. * @return {null}
  568. */
  569. Cursor.prototype.count = function(applySkipLimit, opts, callback) {
  570. var self = this;
  571. if(typeof opts == 'function') callback = opts, opts = {};
  572. opts = opts || {};
  573. if(self.s.cmd.query == null) callback(new MongoError("count can only be used with find command"));
  574. if(typeof applySkipLimit == 'function') {
  575. callback = applySkipLimit;
  576. applySkipLimit = true;
  577. }
  578. if(applySkipLimit) {
  579. if(typeof this.cursorSkip() == 'number') opts.skip = this.cursorSkip();
  580. if(typeof this.cursorLimit() == 'number') opts.limit = this.cursorLimit();
  581. }
  582. // Command
  583. var delimiter = self.s.ns.indexOf('.');
  584. var command = {
  585. 'count': self.s.ns.substr(delimiter+1), 'query': self.s.cmd.query
  586. }
  587. if(typeof opts.maxTimeMS == 'number') {
  588. command.maxTimeMS = opts.maxTimeMS;
  589. } else if(typeof self.s.maxTimeMS == 'number') {
  590. command.maxTimeMS = self.s.maxTimeMS;
  591. }
  592. // Get a server
  593. var server = self.s.topology.getServer(opts);
  594. // Get a connection
  595. var connection = self.s.topology.getConnection(opts);
  596. // Get the callbacks
  597. var callbacks = server.getCallbacks();
  598. // Merge in any options
  599. if(opts.skip) command.skip = opts.skip;
  600. if(opts.limit) command.limit = opts.limit;
  601. if(self.s.options.hint) command.hint = self.s.options.hint;
  602. // Build Query object
  603. var query = new Query(self.s.bson, f("%s.$cmd", self.s.ns.substr(0, delimiter)), command, {
  604. numberToSkip: 0, numberToReturn: -1
  605. , checkKeys: false
  606. });
  607. // Set up callback
  608. callbacks.register(query.requestId, function(err, result) {
  609. if(err) return handleCallback(callback, err);
  610. if(result.documents.length == 1
  611. && (result.documents[0].errmsg
  612. || result.documents[0].err
  613. || result.documents[0]['$err'])) return callback(MongoError.create(result.documents[0]));
  614. handleCallback(callback, null, result.documents[0].n);
  615. });
  616. // Write the initial command out
  617. connection.write(query.toBin());
  618. };
  619. /**
  620. * Close the cursor, sending a KillCursor command and emitting close.
  621. * @method
  622. * @param {Cursor~resultCallback} [callback] The result callback.
  623. * @return {null}
  624. */
  625. Cursor.prototype.close = function(callback) {
  626. this.s.state = Cursor.CLOSED;
  627. // Kill the cursor
  628. this.kill();
  629. // Emit the close event for the cursor
  630. this.emit('close');
  631. // Callback if provided
  632. if(callback) return handleCallback(callback, null, this);
  633. }
  634. /**
  635. * Map all documents using the provided function
  636. * @method
  637. * @param {function} [transform] The mapping transformation method.
  638. * @return {null}
  639. */
  640. Cursor.prototype.map = function(transform) {
  641. this.cursorState.transforms = { doc: transform };
  642. return this;
  643. }
  644. /**
  645. * Is the cursor closed
  646. * @method
  647. * @return {boolean}
  648. */
  649. Cursor.prototype.isClosed = function() {
  650. return this.isDead();
  651. }
  652. Cursor.prototype.destroy = function(err) {
  653. this.pause();
  654. this.close();
  655. if(err) this.emit('error', err);
  656. }
  657. /**
  658. * Return a modified Readable stream including a possible transform method.
  659. * @method
  660. * @param {object} [options=null] Optional settings.
  661. * @param {function} [options.transform=null] A transformation method applied to each document emitted by the stream.
  662. * @return {Cursor}
  663. */
  664. Cursor.prototype.stream = function(options) {
  665. this.s.streamOptions = options || {};
  666. return this;
  667. }
  668. /**
  669. * Execute the explain for the cursor
  670. * @method
  671. * @param {Cursor~resultCallback} [callback] The result callback.
  672. * @return {null}
  673. */
  674. Cursor.prototype.explain = function(callback) {
  675. this.s.cmd.explain = true;
  676. this._next(callback);
  677. }
  678. Cursor.prototype._read = function(n) {
  679. var self = this;
  680. if(self.s.state == Cursor.CLOSED || self.isDead()) {
  681. return self.push(null);
  682. }
  683. // Get the next item
  684. self.nextObject(function(err, result) {
  685. if(err) {
  686. if(!self.isDead()) self.close();
  687. if(self.listeners('error') && self.listeners('error').length > 0) {
  688. self.emit('error', err);
  689. }
  690. // Emit end event
  691. self.emit('end');
  692. return self.emit('finish');
  693. }
  694. // If we provided a transformation method
  695. if(typeof self.s.streamOptions.transform == 'function' && result != null) {
  696. return self.push(self.s.streamOptions.transform(result));
  697. }
  698. // If we provided a map function
  699. if(self.cursorState.transforms && typeof self.cursorState.transforms.doc == 'function' && result != null) {
  700. return self.push(self.cursorState.transforms.doc(result));
  701. }
  702. // Return the result
  703. self.push(result);
  704. });
  705. }
  706. Object.defineProperty(Cursor.prototype, 'namespace', {
  707. enumerable: true,
  708. get: function() {
  709. if (!this || !this.s) {
  710. return null;
  711. }
  712. // TODO: refactor this logic into core
  713. var ns = this.s.ns || '';
  714. var firstDot = ns.indexOf('.');
  715. if (firstDot < 0) {
  716. return {
  717. database: this.s.ns,
  718. collection: ''
  719. };
  720. }
  721. return {
  722. database: ns.substr(0, firstDot),
  723. collection: ns.substr(firstDot + 1)
  724. };
  725. }
  726. });
  727. /**
  728. * The read() method pulls some data out of the internal buffer and returns it. If there is no data available, then it will return null.
  729. * @function external:Readable#read
  730. * @param {number} size Optional argument to specify how much data to read.
  731. * @return {(String | Buffer | null)}
  732. */
  733. /**
  734. * Call this function to cause the stream to return strings of the specified encoding instead of Buffer objects.
  735. * @function external:Readable#setEncoding
  736. * @param {string} encoding The encoding to use.
  737. * @return {null}
  738. */
  739. /**
  740. * This method will cause the readable stream to resume emitting data events.
  741. * @function external:Readable#resume
  742. * @return {null}
  743. */
  744. /**
  745. * This method will cause a stream in flowing-mode to stop emitting data events. Any data that becomes available will remain in the internal buffer.
  746. * @function external:Readable#pause
  747. * @return {null}
  748. */
  749. /**
  750. * This method pulls all the data out of a readable stream, and writes it to the supplied destination, automatically managing the flow so that the destination is not overwhelmed by a fast readable stream.
  751. * @function external:Readable#pipe
  752. * @param {Writable} destination The destination for writing data
  753. * @param {object} [options] Pipe options
  754. * @return {null}
  755. */
  756. /**
  757. * This method will remove the hooks set up for a previous pipe() call.
  758. * @function external:Readable#unpipe
  759. * @param {Writable} [destination] The destination for writing data
  760. * @return {null}
  761. */
  762. /**
  763. * This is useful in certain cases where a stream is being consumed by a parser, which needs to "un-consume" some data that it has optimistically pulled out of the source, so that the stream can be passed on to some other party.
  764. * @function external:Readable#unshift
  765. * @param {(Buffer|string)} chunk Chunk of data to unshift onto the read queue.
  766. * @return {null}
  767. */
  768. /**
  769. * Versions of Node prior to v0.10 had streams that did not implement the entire Streams API as it is today. (See "Compatibility" below for more information.)
  770. * @function external:Readable#wrap
  771. * @param {Stream} stream An "old style" readable stream.
  772. * @return {null}
  773. */
  774. Cursor.INIT = 0;
  775. Cursor.OPEN = 1;
  776. Cursor.CLOSED = 2;
  777. Cursor.GET_MORE = 3;
  778. module.exports = Cursor;