/*!
 * Module dependencies
 */

var Promise = require('./promise');
var util = require('util');
var utils = require('./utils');
var Query = require('./query');

// Query's read-preference setter, reused by Aggregate#read.
var read = Query.prototype.read;
/**
 * Aggregate constructor used for building aggregation pipelines.
 *
 * ####Example:
 *
 *     new Aggregate();
 *     new Aggregate({ $project: { a: 1, b: 1 } });
 *     new Aggregate({ $project: { a: 1, b: 1 } }, { $skip: 5 });
 *     new Aggregate([{ $project: { a: 1, b: 1 } }, { $skip: 5 }]);
 *
 * Returned when calling Model.aggregate().
 *
 * ####Example:
 *
 *     Model
 *     .aggregate({ $match: { age: { $gte: 21 }}})
 *     .unwind('tags')
 *     .exec(callback)
 *
 * ####Note:
 *
 * - The documents returned are plain javascript objects, not mongoose documents (since any shape of document can be returned).
 * - Requires MongoDB >= 2.1
 *
 * @see MongoDB http://docs.mongodb.org/manual/applications/aggregation/
 * @see driver http://mongodb.github.com/node-mongodb-native/api-generated/collection.html#aggregate
 * @param {Object|Array} [ops] aggregation operator(s) or operator array
 * @api public
 */

function Aggregate () {
  this._pipeline = [];
  this._model = undefined;
  this.options = undefined;

  // A single array argument is spread so that each of its elements is
  // appended as its own pipeline stage; otherwise every argument is a stage.
  // `Array.isArray` replaces the deprecated `util.isArray`.
  if (1 === arguments.length && Array.isArray(arguments[0])) {
    this.append.apply(this, arguments[0]);
  } else {
    this.append.apply(this, arguments);
  }
}
/**
 * Binds this aggregate to a model.
 *
 * The model is stored on `_model`, which `exec()` reads to find the
 * collection to run against.
 *
 * @param {Model} model the model to which the aggregate is to be bound
 * @return {Aggregate}
 * @api private
 */

Aggregate.prototype.bind = function (model) {
  this._model = model;
  return this;
};
/**
 * Appends new operators to this aggregate pipeline
 *
 * ####Examples:
 *
 *     aggregate.append({ $project: { field: 1 }}, { $limit: 2 });
 *
 *     // or pass an array
 *     var pipeline = [{ $match: { daw: 'Logic Audio X' }} ];
 *     aggregate.append(pipeline);
 *
 * @param {Object|Array} ops operator(s) to append, or a single array of operators
 * @return {Aggregate}
 * @throws {Error} if any argument is not a pipeline operator
 * @api public
 */

Aggregate.prototype.append = function () {
  // A lone array argument is the documented "pass an array" form: its
  // elements are the operators. Without this unwrapping the array itself
  // would be validated as an operator and rejected.
  var args = (1 === arguments.length && Array.isArray(arguments[0]))
    ? arguments[0]
    : utils.args(arguments);

  if (!args.every(isOperator)) {
    throw new Error("Arguments must be aggregate pipeline operators");
  }

  // concat() builds a new array, so a previously captured pipeline
  // reference is not modified.
  this._pipeline = this._pipeline.concat(args);

  return this;
};
/**
 * Appends a new $project operator to this aggregate pipeline.
 *
 * Mongoose query [selection syntax](#query_Query-select) is also supported.
 *
 * ####Examples:
 *
 *     // include a, include b, exclude _id
 *     aggregate.project("a b -_id");
 *
 *     // or you may use object notation, useful when
 *     // you have keys already prefixed with a "-"
 *     aggregate.project({a: 1, b: 1, _id: 0});
 *
 *     // reshaping documents
 *     aggregate.project({
 *         newField: '$b.nested'
 *       , plusTen: { $add: ['$val', 10]}
 *       , sub: {
 *            name: '$a'
 *         }
 *     })
 *
 *     // etc
 *     aggregate.project({ salary_k: { $divide: [ "$salary", 1000 ] } });
 *
 * @param {Object|String} arg field specification
 * @see projection http://docs.mongodb.org/manual/reference/aggregation/project/
 * @return {Aggregate}
 * @throws {Error} if `arg` is neither a string nor a non-array object
 * @api public
 */

Aggregate.prototype.project = function (arg) {
  var fields = {};

  // `typeof null` is 'object', so null is excluded explicitly; otherwise
  // Object.keys(null) would throw an unrelated TypeError.
  if (null !== arg && 'object' === typeof arg && !Array.isArray(arg)) {
    // Shallow-copy the specification so the caller's object is not shared.
    Object.keys(arg).forEach(function (field) {
      fields[field] = arg[field];
    });
  } else if (1 === arguments.length && 'string' === typeof arg) {
    arg.split(/\s+/).forEach(function (field) {
      // Leading/trailing whitespace yields empty tokens; skip them.
      if (!field) return;
      // A "-" prefix excludes the field (0); anything else includes it (1).
      var include = '-' === field[0] ? 0 : 1;
      if (include === 0) field = field.substring(1);
      fields[field] = include;
    });
  } else {
    throw new Error("Invalid project() argument. Must be string or object");
  }

  return this.append({ $project: fields });
};
/**
 * Appends a new custom $group operator to this aggregate pipeline.
 *
 * ####Examples:
 *
 *     aggregate.group({ _id: "$department" });
 *
 * @see $group http://docs.mongodb.org/manual/reference/aggregation/group/
 * @method group
 * @memberOf Aggregate
 * @param {Object} arg $group operator contents
 * @return {Aggregate}
 * @api public
 */
/**
 * Appends a new custom $match operator to this aggregate pipeline.
 *
 * ####Examples:
 *
 *     aggregate.match({ department: { $in: [ "sales", "engineering" ] } });
 *
 * @see $match http://docs.mongodb.org/manual/reference/aggregation/match/
 * @method match
 * @memberOf Aggregate
 * @param {Object} arg $match operator contents
 * @return {Aggregate}
 * @api public
 */
/**
 * Appends a new $skip operator to this aggregate pipeline.
 *
 * ####Examples:
 *
 *     aggregate.skip(10);
 *
 * @see $skip http://docs.mongodb.org/manual/reference/aggregation/skip/
 * @method skip
 * @memberOf Aggregate
 * @param {Number} num number of records to skip before next stage
 * @return {Aggregate}
 * @api public
 */
/**
 * Appends a new $limit operator to this aggregate pipeline.
 *
 * ####Examples:
 *
 *     aggregate.limit(10);
 *
 * @see $limit http://docs.mongodb.org/manual/reference/aggregation/limit/
 * @method limit
 * @memberOf Aggregate
 * @param {Number} num maximum number of records to pass to the next stage
 * @return {Aggregate}
 * @api public
 */
/**
 * Appends a new $geoNear operator to this aggregate pipeline.
 *
 * ####NOTE:
 *
 * **MUST** be used as the first operator in the pipeline.
 *
 * ####Examples:
 *
 *     aggregate.near({
 *       near: [40.724, -73.997],
 *       distanceField: "dist.calculated", // required
 *       maxDistance: 0.008,
 *       query: { type: "public" },
 *       includeLocs: "dist.location",
 *       uniqueDocs: true,
 *       num: 5
 *     });
 *
 * @see $geoNear http://docs.mongodb.org/manual/reference/aggregation/geoNear/
 * @method near
 * @memberOf Aggregate
 * @param {Object} arg $geoNear operator contents, passed through unchanged
 * @return {Aggregate}
 * @api public
 */

Aggregate.prototype.near = function (arg) {
  return this.append({ $geoNear: arg });
};
/*!
 * define methods
 *
 * Generates Aggregate#group, #match, #skip, #limit and #out. Each method
 * wraps its single argument in the operator of the same name prefixed with
 * "$" (e.g. `skip(5)` appends `{ $skip: 5 }`) and returns the aggregate.
 */

['group', 'match', 'skip', 'limit', 'out'].forEach(function (name) {
  var operatorKey = '$' + name;

  Aggregate.prototype[name] = function (arg) {
    var op = {};
    op[operatorKey] = arg;
    return this.append(op);
  };
});
/**
 * Appends new custom $unwind operator(s) to this aggregate pipeline.
 *
 * Field names may be given with or without the leading "$"; it is added
 * when missing.
 *
 * ####Examples:
 *
 *     aggregate.unwind("tags");
 *     aggregate.unwind("a", "b", "c");
 *
 * @see $unwind http://docs.mongodb.org/manual/reference/aggregation/unwind/
 * @param {String} fields the field(s) to unwind
 * @return {Aggregate}
 * @api public
 */

Aggregate.prototype.unwind = function () {
  var args = utils.args(arguments);

  var stages = args.map(function (arg) {
    // Do not double-prefix a path that already starts with "$"
    // ("$tags" must stay "$tags", not become "$$tags").
    var alreadyPrefixed = 'string' === typeof arg && '$' === arg.charAt(0);
    return { $unwind: alreadyPrefixed ? arg : '$' + arg };
  });

  return this.append.apply(this, stages);
};
/**
 * Appends a new $sort operator to this aggregate pipeline.
 *
 * If an object is passed, values allowed are `asc`, `desc`, `ascending`, `descending`, `1`, and `-1`.
 *
 * If a string is passed, it must be a space delimited list of path names. The sort order of each path is ascending unless the path name is prefixed with `-` which will be treated as descending.
 *
 * ####Examples:
 *
 *     // these are equivalent
 *     aggregate.sort({ field: 'asc', test: -1 });
 *     aggregate.sort('field -test');
 *
 * @see $sort http://docs.mongodb.org/manual/reference/aggregation/sort/
 * @param {Object|String} arg
 * @return {Aggregate} this
 * @throws {TypeError} if `arg` is neither a plain object nor a string
 * @api public
 */

Aggregate.prototype.sort = function (arg) {
  // TODO refactor to reuse the query builder logic

  var sort = {};

  // Guard the constructor lookup: null, undefined and constructor-less
  // values must reach the descriptive TypeError below rather than crash on
  // a property access.
  var isPlainObject = null !== arg && undefined !== arg &&
      arg.constructor && 'Object' === arg.constructor.name;

  if (isPlainObject) {
    // Any value not listed here (including 'asc', 'ascending', 1) sorts ascending.
    var desc = ['desc', 'descending', -1];
    Object.keys(arg).forEach(function (field) {
      sort[field] = desc.indexOf(arg[field]) === -1 ? 1 : -1;
    });
  } else if (1 === arguments.length && 'string' === typeof arg) {
    arg.split(/\s+/).forEach(function (field) {
      // Leading/trailing whitespace yields empty tokens; skip them.
      if (!field) return;
      var ascend = '-' === field[0] ? -1 : 1;
      if (ascend === -1) field = field.substring(1);
      sort[field] = ascend;
    });
  } else {
    throw new TypeError('Invalid sort() argument. Must be a string or object.');
  }

  return this.append({ $sort: sort });
};
/**
 * Sets the readPreference option for the aggregation query.
 *
 * All arguments are forwarded to `Query.prototype.read`, invoked with this
 * aggregate as its receiver.
 *
 * ####Example:
 *
 *     Model.aggregate(..).read('primaryPreferred').exec(callback)
 *
 * @param {String} pref one of the listed preference options or their aliases
 * @param {Array} [tags] optional tags for this query
 * @see mongodb http://docs.mongodb.org/manual/applications/replication/#read-preference
 * @see driver http://mongodb.github.com/node-mongodb-native/driver-articles/anintroductionto1_1and2_2.html#read-preferences
 * @return {Aggregate}
 * @api public
 */

Aggregate.prototype.read = function (pref) {
  // Make sure an options object exists before the shared setter runs.
  if (!this.options) this.options = {};
  read.apply(this, arguments);
  return this;
};
/**
 * Sets the allowDiskUse option for the aggregation query (ignored for < 2.6.0)
 *
 * ####Example:
 *
 *     Model.aggregate(..).allowDiskUse(true).exec(callback)
 *
 * @param {Boolean} value Should tell server it can use hard drive to store data during aggregation.
 * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
 * @return {Aggregate}
 * @api public
 */

Aggregate.prototype.allowDiskUse = function (value) {
  // Options are created lazily; they stay undefined until first set.
  if (!this.options) this.options = {};
  this.options.allowDiskUse = value;
  return this;
};
/**
 * Sets the cursor option for the aggregation query (ignored for < 2.6.0).
 * Note the different syntax below: .exec() returns a cursor object, and no callback
 * is necessary.
 *
 * ####Example:
 *
 *     var cursor = Model.aggregate(..).cursor({ batchSize: 1000 }).exec();
 *     cursor.each(function(error, doc) {
 *       // use doc
 *     });
 *
 * @param {Object} options set the cursor batch size
 * @see mongodb http://mongodb.github.io/node-mongodb-native/2.0/api/AggregationCursor.html
 * @return {Aggregate}
 * @api public
 */

Aggregate.prototype.cursor = function (options) {
  // Options are created lazily; they stay undefined until first set.
  if (!this.options) this.options = {};
  this.options.cursor = options;
  return this;
};
/**
 * Executes the aggregate pipeline on the currently bound Model.
 *
 * ####Example:
 *
 *     aggregate.exec(callback);
 *
 *     // Because a promise is returned, the `callback` is optional.
 *     var promise = aggregate.exec();
 *     promise.then(..);
 *
 * ####Note:
 *
 * When the `cursor` option is set, the value returned by the collection's
 * `aggregate()` call is returned directly instead of the promise, and
 * `callback` is not passed to the driver.
 *
 * @see Promise #promise_Promise
 * @param {Function} [callback]
 * @return {Promise} or the driver's return value when the `cursor` option is set
 * @api public
 */

Aggregate.prototype.exec = function (callback) {
  var promise = new Promise();

  if (callback) {
    promise.addBack(callback);
  }

  // Precondition failures are reported through the promise, not thrown.
  if (!this._pipeline.length) {
    promise.error(new Error("Aggregate has empty pipeline"));
    return promise;
  }

  if (!this._model) {
    promise.error(new Error("Aggregate not bound to any Model"));
    return promise;
  }

  prepareDiscriminatorPipeline(this);

  var collection = this._model.collection;

  // `this.options` is known to be set inside this branch, so it is passed as is.
  if (this.options && this.options.cursor) {
    return collection.aggregate(this._pipeline, this.options);
  }

  collection.aggregate(this._pipeline, this.options || {}, promise.resolve.bind(promise));

  return promise;
};
/*!
 * Helpers
 */

/**
 * Checks whether an object is likely a pipeline operator
 *
 * A value qualifies when it is a non-null object with exactly one own
 * enumerable key and that key starts with "$".
 *
 * @param {Object} obj object to check
 * @return {Boolean}
 * @api private
 */

function isOperator (obj) {
  // `typeof null` is 'object'; reject it here so Object.keys() cannot throw.
  if (null === obj || 'object' !== typeof obj) {
    return false;
  }

  var keys = Object.keys(obj);

  return 1 === keys.length && '$' === keys[0][0];
}
/*!
 * Adds the appropriate `$match` pipeline step to the top of an aggregate's
 * pipeline, if its model is a non-root discriminator type. This is
 * analogous to the `prepareDiscriminatorCriteria` function in `lib/query.js`.
 *
 * Calling it again on the same aggregate is a no-op once the leading
 * `$match` already constrains the discriminator key to this model's value.
 *
 * @param {Aggregate} aggregate Aggregate to prepare
 */

function prepareDiscriminatorPipeline (aggregate) {
  var schema = aggregate._model.schema,
      discriminatorMapping = schema && schema.discriminatorMapping;

  if (!discriminatorMapping || discriminatorMapping.isRoot) {
    return;
  }

  var originalPipeline = aggregate._pipeline,
      discriminatorKey = discriminatorMapping.key,
      discriminatorValue = discriminatorMapping.value,
      firstMatch = originalPipeline[0] && originalPipeline[0].$match;

  if (firstMatch && !firstMatch[discriminatorKey]) {
    // If the first pipeline stage is a match and it doesn't specify a `__t`
    // key, add the discriminator key to it. This allows for potential
    // aggregation query optimizations not to be disturbed by this feature.
    // `firstMatch` is a reference into the pipeline, so there is no need to
    // reassign aggregate._pipeline.
    firstMatch[discriminatorKey] = discriminatorValue;
  } else if (firstMatch && firstMatch[discriminatorKey] === discriminatorValue) {
    // Already restricted to this discriminator (e.g. exec() was called
    // before): adding another identical `$match` would only grow the
    // pipeline on every call.
    return;
  } else {
    var match = {};
    match[discriminatorKey] = discriminatorValue;
    aggregate._pipeline = [{ $match: match }].concat(originalPipeline);
  }
}
/*!
 * Exports
 */

module.exports = Aggregate;