aggregate.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489
  1. /*!
  2. * Module dependencies
  3. */
  4. var Promise = require('./promise')
  5. , util = require('util')
  6. , utils = require('./utils')
  7. , Query = require('./query')
  8. , read = Query.prototype.read
  /**
   * Aggregate constructor used for building aggregation pipelines.
   *
   * ####Example:
   *
   *     new Aggregate();
   *     new Aggregate({ $project: { a: 1, b: 1 } });
   *     new Aggregate({ $project: { a: 1, b: 1 } }, { $skip: 5 });
   *     new Aggregate([{ $project: { a: 1, b: 1 } }, { $skip: 5 }]);
   *
   * Returned when calling Model.aggregate().
   *
   * ####Example:
   *
   *     Model
   *     .aggregate({ $match: { age: { $gte: 21 }}})
   *     .unwind('tags')
   *     .exec(callback)
   *
   * ####Note:
   *
   * - The documents returned are plain javascript objects, not mongoose documents (since any shape of document can be returned).
   * - Requires MongoDB >= 2.1
   *
   * @see MongoDB http://docs.mongodb.org/manual/applications/aggregation/
   * @see driver http://mongodb.github.com/node-mongodb-native/api-generated/collection.html#aggregate
   * @param {Object|Array} [ops] aggregation operator(s) or operator array
   * @api public
   */
  function Aggregate () {
    this._pipeline = [];
    this._model = undefined;
    this.options = undefined;

    // A lone array argument is unpacked so that its elements become the
    // operators; any other argument list is forwarded to append() unchanged.
    // Array.isArray replaces the deprecated util.isArray.
    var ops = (1 === arguments.length && Array.isArray(arguments[0]))
      ? arguments[0]
      : arguments;

    this.append.apply(this, ops);
  }
  /**
   * Associates this aggregate with the model whose collection it will run on.
   *
   * @param {Model} model the model to which the aggregate is to be bound
   * @return {Aggregate} this aggregate, for chaining
   * @api private
   */
  Aggregate.prototype.bind = function (model) {
    var self = this;
    self._model = model;
    return self;
  };
  /**
   * Appends new operators to this aggregate pipeline
   *
   * ####Examples:
   *
   *     aggregate.append({ $project: { field: 1 }}, { $limit: 2 });
   *
   *     // or pass an array
   *     var pipeline = [{ $match: { daw: 'Logic Audio X' }} ];
   *     aggregate.append(pipeline);
   *
   * @param {Object|Array} ops operator(s) to append, or a single array of operators
   * @return {Aggregate}
   * @throws {Error} if any argument is not a pipeline operator
   * @api public
   */
  Aggregate.prototype.append = function () {
    // A single array argument is taken as the list of operators; without this
    // the documented `append(pipeline)` form is rejected by the validation
    // below, because an array is not itself an operator object.
    var args = (1 === arguments.length && Array.isArray(arguments[0]))
      ? arguments[0]
      : utils.args(arguments);

    if (!args.every(isOperator)) {
      throw new Error("Arguments must be aggregate pipeline operators");
    }

    // concat() builds a new array, so a caller-supplied array is never aliased.
    this._pipeline = this._pipeline.concat(args);
    return this;
  };
  /**
   * Appends a new $project operator to this aggregate pipeline.
   *
   * Mongoose query [selection syntax](#query_Query-select) is also supported.
   *
   * ####Examples:
   *
   *     // include a, include b, exclude _id
   *     aggregate.project("a b -_id");
   *
   *     // or you may use object notation, useful when
   *     // you have keys already prefixed with a "-"
   *     aggregate.project({a: 1, b: 1, _id: 0});
   *
   *     // reshaping documents
   *     aggregate.project({
   *         newField: '$b.nested'
   *       , plusTen: { $add: ['$val', 10]}
   *       , sub: {
   *            name: '$a'
   *         }
   *     })
   *
   *     // etc
   *     aggregate.project({ salary_k: { $divide: [ "$salary", 1000 ] } });
   *
   * @param {Object|String} arg field specification
   * @see projection http://docs.mongodb.org/manual/reference/aggregation/project/
   * @return {Aggregate}
   * @throws {Error} if `arg` is neither a non-array object nor a single string argument
   * @api public
   */
  Aggregate.prototype.project = function (arg) {
    var fields = {};

    // `typeof null` is 'object', so null is excluded explicitly; otherwise
    // Object.keys(null) would throw instead of the error raised below.
    if (null !== arg && 'object' === typeof arg && !Array.isArray(arg)) {
      // Object notation: shallow-copy the caller's specification.
      Object.keys(arg).forEach(function (field) {
        fields[field] = arg[field];
      });
    } else if (1 === arguments.length && 'string' === typeof arg) {
      // Selection syntax: whitespace-separated paths, "-" prefix means exclude.
      arg.split(/\s+/).forEach(function (field) {
        if (!field) return;
        var include = '-' === field[0] ? 0 : 1;
        if (include === 0) field = field.substring(1);
        fields[field] = include;
      });
    } else {
      throw new Error("Invalid project() argument. Must be string or object");
    }

    return this.append({ $project: fields });
  };
  132. /**
  133. * Appends a new custom $group operator to this aggregate pipeline.
  134. *
  135. * ####Examples:
  136. *
  137. * aggregate.group({ _id: "$department" });
  138. *
  139. * @see $group http://docs.mongodb.org/manual/reference/aggregation/group/
  140. * @method group
  141. * @memberOf Aggregate
  142. * @param {Object} arg $group operator contents
  143. * @return {Aggregate}
  144. * @api public
  145. */
  146. /**
  147. * Appends a new custom $match operator to this aggregate pipeline.
  148. *
  149. * ####Examples:
  150. *
  151. * aggregate.match({ department: { $in: [ "sales", "engineering" ] } });
  152. *
  153. * @see $match http://docs.mongodb.org/manual/reference/aggregation/match/
  154. * @method match
  155. * @memberOf Aggregate
  156. * @param {Object} arg $match operator contents
  157. * @return {Aggregate}
  158. * @api public
  159. */
  160. /**
  161. * Appends a new $skip operator to this aggregate pipeline.
  162. *
  163. * ####Examples:
  164. *
  165. * aggregate.skip(10);
  166. *
  167. * @see $skip http://docs.mongodb.org/manual/reference/aggregation/skip/
  168. * @method skip
  169. * @memberOf Aggregate
  170. * @param {Number} num number of records to skip before next stage
  171. * @return {Aggregate}
  172. * @api public
  173. */
  174. /**
  175. * Appends a new $limit operator to this aggregate pipeline.
  176. *
  177. * ####Examples:
  178. *
  179. * aggregate.limit(10);
  180. *
  181. * @see $limit http://docs.mongodb.org/manual/reference/aggregation/limit/
  182. * @method limit
  183. * @memberOf Aggregate
  184. * @param {Number} num maximum number of records to pass to the next stage
  185. * @return {Aggregate}
  186. * @api public
  187. */
  /**
   * Appends a new $geoNear operator to this aggregate pipeline.
   *
   * ####NOTE:
   *
   * **MUST** be used as the first operator in the pipeline.
   *
   * ####Examples:
   *
   *     aggregate.near({
   *       near: [40.724, -73.997],
   *       distanceField: "dist.calculated", // required
   *       maxDistance: 0.008,
   *       query: { type: "public" },
   *       includeLocs: "dist.location",
   *       uniqueDocs: true,
   *       num: 5
   *     });
   *
   * @see $geoNear http://docs.mongodb.org/manual/reference/aggregation/geoNear/
   * @method near
   * @memberOf Aggregate
   * @param {Object} parameters passed through unchanged as the $geoNear value
   * @return {Aggregate}
   * @api public
   */
  Aggregate.prototype.near = function (arg) {
    return this.append({ $geoNear: arg });
  };
  /*!
   * Define the single-argument operator methods (group, match, skip, limit,
   * out). Each one appends `{ $<name>: arg }` to the pipeline.
   */
  ['group', 'match', 'skip', 'limit', 'out'].forEach(function (name) {
    var stageKey = '$' + name;

    Aggregate.prototype[name] = function (arg) {
      var stage = {};
      stage[stageKey] = arg;
      return this.append(stage);
    };
  });
  /**
   * Appends new custom $unwind operator(s) to this aggregate pipeline.
   *
   * Each field may be given with or without its leading `$`.
   *
   * ####Examples:
   *
   *     aggregate.unwind("tags");
   *     aggregate.unwind("a", "b", "c");
   *     aggregate.unwind("$tags"); // same as "tags"
   *
   * @see $unwind http://docs.mongodb.org/manual/reference/aggregation/unwind/
   * @param {String} fields the field(s) to unwind
   * @return {Aggregate}
   * @api public
   */
  Aggregate.prototype.unwind = function () {
    var stages = utils.args(arguments).map(function (field) {
      // Only add the `$` prefix when the caller did not supply it, so that
      // "$tags" is not turned into "$$tags".
      var alreadyPrefixed = 'string' === typeof field && '$' === field.charAt(0);
      return { $unwind: alreadyPrefixed ? field : '$' + field };
    });

    return this.append.apply(this, stages);
  };
  /**
   * Appends a new $sort operator to this aggregate pipeline.
   *
   * If an object is passed, values allowed are `asc`, `desc`, `ascending`, `descending`, `1`, and `-1`.
   *
   * If a string is passed, it must be a space delimited list of path names. The sort order of each path is ascending unless the path name is prefixed with `-` which will be treated as descending.
   *
   * ####Examples:
   *
   *     // these are equivalent
   *     aggregate.sort({ field: 'asc', test: -1 });
   *     aggregate.sort('field -test');
   *
   * @see $sort http://docs.mongodb.org/manual/reference/aggregation/sort/
   * @param {Object|String} arg
   * @return {Aggregate} this
   * @throws {TypeError} if `arg` is neither a plain object nor a single string argument
   * @api public
   */
  Aggregate.prototype.sort = function (arg) {
    // TODO refactor to reuse the query builder logic
    var sort = {};

    // Guard before touching `arg.constructor`: null, undefined and
    // prototype-less objects would otherwise crash with an unrelated
    // "cannot read property" TypeError instead of the message below.
    var isPlainObject = null !== arg && undefined !== arg &&
      !!arg.constructor && 'Object' === arg.constructor.name;

    if (isPlainObject) {
      var desc = ['desc', 'descending', -1];
      // Any value not recognised as descending is treated as ascending.
      Object.keys(arg).forEach(function (field) {
        sort[field] = desc.indexOf(arg[field]) === -1 ? 1 : -1;
      });
    } else if (1 === arguments.length && 'string' === typeof arg) {
      arg.split(/\s+/).forEach(function (field) {
        if (!field) return;
        var ascend = '-' === field[0] ? -1 : 1;
        if (ascend === -1) field = field.substring(1);
        sort[field] = ascend;
      });
    } else {
      throw new TypeError('Invalid sort() argument. Must be a string or object.');
    }

    return this.append({ $sort: sort });
  };
  /**
   * Sets the readPreference option for the aggregation query.
   *
   * The work is delegated to `Query.prototype.read`, invoked with this
   * aggregate as its receiver and with all arguments passed through.
   *
   * ####Example:
   *
   *     Model.aggregate(..).read('primaryPreferred').exec(callback)
   *
   * @param {String} pref one of the listed preference options or their aliases
   * @param {Array} [tags] optional tags for this query
   * @return {Aggregate} this
   * @see mongodb http://docs.mongodb.org/manual/applications/replication/#read-preference
   * @see driver http://mongodb.github.com/node-mongodb-native/driver-articles/anintroductionto1_1and2_2.html#read-preferences
   */
  Aggregate.prototype.read = function (pref) {
    // Make sure an options object exists before the delegated call runs.
    if (!this.options) {
      this.options = {};
    }

    read.apply(this, arguments);

    return this;
  };
  /**
   * Sets the allowDiskUse option for the aggregation query (ignored for < 2.6.0)
   *
   * ####Example:
   *
   *     Model.aggregate(..).allowDiskUse(true).exec(callback)
   *
   * @param {Boolean} value Should tell server it can use hard drive to store data during aggregation.
   * @return {Aggregate} this
   * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
   */
  Aggregate.prototype.allowDiskUse = function (value) {
    var opts = this.options;

    // Create the options object on first use.
    if (!opts) {
      opts = this.options = {};
    }

    opts.allowDiskUse = value;
    return this;
  };
  /**
   * Sets the cursor option for the aggregation query (ignored for < 2.6.0).
   * Note the different syntax below: .exec() returns a cursor object, and no callback
   * is necessary.
   *
   * ####Example:
   *
   *     var cursor = Model.aggregate(..).cursor({ batchSize: 1000 }).exec();
   *     cursor.each(function(error, doc) {
   *       // use doc
   *     });
   *
   * @param {Object} options set the cursor batch size
   * @return {Aggregate} this
   * @see mongodb http://mongodb.github.io/node-mongodb-native/2.0/api/AggregationCursor.html
   */
  Aggregate.prototype.cursor = function (options) {
    var opts = this.options;

    // Create the options object on first use.
    if (!opts) {
      opts = this.options = {};
    }

    opts.cursor = options;
    return this;
  };
  /**
   * Executes the aggregate pipeline on the currently bound Model.
   *
   * ####Example:
   *
   *     aggregate.exec(callback);
   *
   *     // Because a promise is returned, the `callback` is optional.
   *     var promise = aggregate.exec();
   *     promise.then(..);
   *
   * When the `cursor` option is set, the value returned by the collection's
   * `aggregate()` call is returned directly instead of the promise.
   *
   * @see Promise #promise_Promise
   * @param {Function} [callback]
   * @return {Promise}
   * @api public
   */
  Aggregate.prototype.exec = function (callback) {
    var promise = new Promise();
    if (callback) {
      promise.addBack(callback);
    }

    // Precondition failures are reported through the promise, not thrown.
    var failure = null;
    if (!this._pipeline.length) {
      failure = "Aggregate has empty pipeline";
    } else if (!this._model) {
      failure = "Aggregate not bound to any Model";
    }
    if (null !== failure) {
      promise.error(new Error(failure));
      return promise;
    }

    prepareDiscriminatorPipeline(this);

    var opts = this.options || {};

    // Cursor mode: hand back whatever the driver returns; the promise is unused.
    if (opts.cursor) {
      return this._model.collection.aggregate(this._pipeline, opts);
    }

    this._model.collection.aggregate(this._pipeline, opts, promise.resolve.bind(promise));
    return promise;
  };
  377. /*!
  378. * Helpers
  379. */
  /**
   * Checks whether an object is likely a pipeline operator: an object with
   * exactly one own enumerable key, and that key starts with `$`.
   *
   * @param {Object} obj object to check
   * @return {Boolean}
   * @api private
   */
  function isOperator (obj) {
    // `typeof null` is 'object', so null must be rejected explicitly;
    // Object.keys(null) would throw rather than report "not an operator".
    if (null === obj || 'object' !== typeof obj) {
      return false;
    }

    var keys = Object.keys(obj);
    return 1 === keys.length && '$' === keys[0].charAt(0);
  }
  /*!
   * Adds the appropriate discriminator constraint to the top of an aggregate's
   * pipeline when its model is a non-root discriminator type. This is
   * analogous to the `prepareDiscriminatorCriteria` function in `lib/query.js`.
   *
   * - First stage is a `$match` without the discriminator key: the key is
   *   added to that stage, so potential aggregation query optimizations are
   *   not disturbed.
   * - First stage is a `$match` already pinned to this discriminator value:
   *   nothing changes, so calling this repeatedly does not grow the pipeline.
   * - First stage is a `$geoNear`: the constraint goes into its `query`,
   *   because `$geoNear` must stay the first stage.
   * - Otherwise a new `$match` stage is prepended.
   *
   * The first stage object may be mutated in place.
   *
   * @param {Aggregate} aggregate Aggregate to prepare
   */
  function prepareDiscriminatorPipeline (aggregate) {
    var schema = aggregate._model.schema;
    var discriminatorMapping = schema && schema.discriminatorMapping;

    if (!discriminatorMapping || discriminatorMapping.isRoot) {
      return;
    }

    var originalPipeline = aggregate._pipeline;
    var discriminatorKey = discriminatorMapping.key;
    var discriminatorValue = discriminatorMapping.value;
    var first = originalPipeline[0];

    if (first && first.$match) {
      if (!first.$match[discriminatorKey]) {
        // `originalPipeline` is a ref, so there's no need to reassign
        // aggregate._pipeline.
        first.$match[discriminatorKey] = discriminatorValue;
        return;
      }
      if (first.$match[discriminatorKey] === discriminatorValue) {
        // Already constrained (e.g. by an earlier call); stay idempotent.
        return;
      }
      // The caller filtered on a different discriminator value: fall through
      // and prepend our own $match, as before.
    } else if (first && first.$geoNear && 'object' === typeof first.$geoNear) {
      // Prepending a $match here would push $geoNear out of first position.
      var geoNear = first.$geoNear;
      geoNear.query = geoNear.query || {};
      geoNear.query[discriminatorKey] = discriminatorValue;
      return;
    }

    var match = {};
    match[discriminatorKey] = discriminatorValue;
    aggregate._pipeline = [{ $match: match }].concat(originalPipeline);
  }
  426. /*!
  427. * Exports
  428. */
  429. module.exports = Aggregate;