unordered.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483
  1. "use strict";
  2. var common = require('./common')
  3. , utils = require('../utils')
  4. , toError = require('../utils').toError
  5. , f = require('util').format
  6. , shallowClone = utils.shallowClone
  7. , WriteError = common.WriteError
  8. , BulkWriteResult = common.BulkWriteResult
  9. , LegacyOp = common.LegacyOp
  10. , ObjectID = require('mongodb-core').BSON.ObjectID
  11. , Batch = common.Batch
  12. , mergeBatchResults = common.mergeBatchResults;
  13. /**
  14. * Create a FindOperatorsUnordered instance (INTERNAL TYPE, do not instantiate directly)
  15. * @class
  16. * @property {number} length Get the number of operations in the bulk.
  17. * @return {FindOperatorsUnordered} a FindOperatorsUnordered instance.
  18. */
  19. var FindOperatorsUnordered = function(self) {
  20. this.s = self.s;
  21. }
  22. /**
  23. * Add a single update document to the bulk operation
  24. *
  25. * @method
  26. * @param {object} doc update operations
  27. * @throws {MongoError}
  28. * @return {UnorderedBulkOperation}
  29. */
  30. FindOperatorsUnordered.prototype.update = function(updateDocument) {
  31. // Perform upsert
  32. var upsert = typeof this.s.currentOp.upsert == 'boolean' ? this.s.currentOp.upsert : false;
  33. // Establish the update command
  34. var document = {
  35. q: this.s.currentOp.selector
  36. , u: updateDocument
  37. , multi: true
  38. , upsert: upsert
  39. }
  40. // Clear out current Op
  41. this.s.currentOp = null;
  42. // Add the update document to the list
  43. return addToOperationsList(this, common.UPDATE, document);
  44. }
  45. /**
  46. * Add a single update one document to the bulk operation
  47. *
  48. * @method
  49. * @param {object} doc update operations
  50. * @throws {MongoError}
  51. * @return {UnorderedBulkOperation}
  52. */
  53. FindOperatorsUnordered.prototype.updateOne = function(updateDocument) {
  54. // Perform upsert
  55. var upsert = typeof this.s.currentOp.upsert == 'boolean' ? this.s.currentOp.upsert : false;
  56. // Establish the update command
  57. var document = {
  58. q: this.s.currentOp.selector
  59. , u: updateDocument
  60. , multi: false
  61. , upsert: upsert
  62. }
  63. // Clear out current Op
  64. this.s.currentOp = null;
  65. // Add the update document to the list
  66. return addToOperationsList(this, common.UPDATE, document);
  67. }
  68. /**
  69. * Add a replace one operation to the bulk operation
  70. *
  71. * @method
  72. * @param {object} doc the new document to replace the existing one with
  73. * @throws {MongoError}
  74. * @return {UnorderedBulkOperation}
  75. */
  76. FindOperatorsUnordered.prototype.replaceOne = function(updateDocument) {
  77. this.updateOne(updateDocument);
  78. }
  79. /**
  80. * Upsert modifier for update bulk operation
  81. *
  82. * @method
  83. * @throws {MongoError}
  84. * @return {UnorderedBulkOperation}
  85. */
  86. FindOperatorsUnordered.prototype.upsert = function() {
  87. this.s.currentOp.upsert = true;
  88. return this;
  89. }
  90. /**
  91. * Add a remove one operation to the bulk operation
  92. *
  93. * @method
  94. * @throws {MongoError}
  95. * @return {UnorderedBulkOperation}
  96. */
  97. FindOperatorsUnordered.prototype.removeOne = function() {
  98. // Establish the update command
  99. var document = {
  100. q: this.s.currentOp.selector
  101. , limit: 1
  102. }
  103. // Clear out current Op
  104. this.s.currentOp = null;
  105. // Add the remove document to the list
  106. return addToOperationsList(this, common.REMOVE, document);
  107. }
  108. /**
  109. * Add a remove operation to the bulk operation
  110. *
  111. * @method
  112. * @throws {MongoError}
  113. * @return {UnorderedBulkOperation}
  114. */
  115. FindOperatorsUnordered.prototype.remove = function() {
  116. // Establish the update command
  117. var document = {
  118. q: this.s.currentOp.selector
  119. , limit: 0
  120. }
  121. // Clear out current Op
  122. this.s.currentOp = null;
  123. // Add the remove document to the list
  124. return addToOperationsList(this, common.REMOVE, document);
  125. }
  126. //
  127. // Add to the operations list
  128. //
  129. var addToOperationsList = function(_self, docType, document) {
  130. // Get the bsonSize
  131. var bsonSize = _self.s.bson.calculateObjectSize(document, false);
  132. // Throw error if the doc is bigger than the max BSON size
  133. if(bsonSize >= _self.s.maxBatchSizeBytes) throw toError("document is larger than the maximum size " + _self.s.maxBatchSizeBytes);
  134. // Holds the current batch
  135. _self.s.currentBatch = null;
  136. // Get the right type of batch
  137. if(docType == common.INSERT) {
  138. _self.s.currentBatch = _self.s.currentInsertBatch;
  139. } else if(docType == common.UPDATE) {
  140. _self.s.currentBatch = _self.s.currentUpdateBatch;
  141. } else if(docType == common.REMOVE) {
  142. _self.s.currentBatch = _self.s.currentRemoveBatch;
  143. }
  144. // Create a new batch object if we don't have a current one
  145. if(_self.s.currentBatch == null) _self.s.currentBatch = new Batch(docType, _self.s.currentIndex);
  146. // Check if we need to create a new batch
  147. if(((_self.s.currentBatch.size + 1) >= _self.s.maxWriteBatchSize)
  148. || ((_self.s.currentBatch.sizeBytes + bsonSize) >= _self.s.maxBatchSizeBytes)
  149. || (_self.s.currentBatch.batchType != docType)) {
  150. // Save the batch to the execution stack
  151. _self.s.batches.push(_self.s.currentBatch);
  152. // Create a new batch
  153. _self.s.currentBatch = new Batch(docType, _self.s.currentIndex);
  154. }
  155. // We have an array of documents
  156. if(Array.isArray(document)) {
  157. throw toError("operation passed in cannot be an Array");
  158. } else {
  159. _self.s.currentBatch.operations.push(document);
  160. _self.s.currentBatch.originalIndexes.push(_self.s.currentIndex);
  161. _self.s.currentIndex = _self.s.currentIndex + 1;
  162. }
  163. // Save back the current Batch to the right type
  164. if(docType == common.INSERT) {
  165. _self.s.currentInsertBatch = _self.s.currentBatch;
  166. _self.s.bulkResult.insertedIds.push({index: _self.s.currentIndex, _id: document._id});
  167. } else if(docType == common.UPDATE) {
  168. _self.s.currentUpdateBatch = _self.s.currentBatch;
  169. } else if(docType == common.REMOVE) {
  170. _self.s.currentRemoveBatch = _self.s.currentBatch;
  171. }
  172. // Update current batch size
  173. _self.s.currentBatch.size = _self.s.currentBatch.size + 1;
  174. _self.s.currentBatch.sizeBytes = _self.s.currentBatch.sizeBytes + bsonSize;
  175. // Return self
  176. return _self;
  177. }
  178. /**
  179. * Create a new UnorderedBulkOperation instance (INTERNAL TYPE, do not instantiate directly)
  180. * @class
  181. * @return {UnorderedBulkOperation} a UnorderedBulkOperation instance.
  182. */
  183. var UnorderedBulkOperation = function(topology, collection, options) {
  184. options = options == null ? {} : options;
  185. // Contains reference to self
  186. var self = this;
  187. // Get the namesspace for the write operations
  188. var namespace = collection.collectionName;
  189. // Used to mark operation as executed
  190. var executed = false;
  191. // Current item
  192. // var currentBatch = null;
  193. var currentOp = null;
  194. var currentIndex = 0;
  195. var batches = [];
  196. // The current Batches for the different operations
  197. var currentInsertBatch = null;
  198. var currentUpdateBatch = null;
  199. var currentRemoveBatch = null;
  200. // Handle to the bson serializer, used to calculate running sizes
  201. var bson = topology.bson;
  202. // Get the capabilities
  203. var capabilities = topology.capabilities();
  204. // Set max byte size
  205. var maxBatchSizeBytes = topology.isMasterDoc.maxBsonObjectSize;
  206. var maxWriteBatchSize = topology.isMasterDoc.maxWriteBatchSize || 1000;
  207. // Get the write concern
  208. var writeConcern = common.writeConcern(shallowClone(options), collection, options);
  209. // Final results
  210. var bulkResult = {
  211. ok: 1
  212. , writeErrors: []
  213. , writeConcernErrors: []
  214. , insertedIds: []
  215. , nInserted: 0
  216. , nUpserted: 0
  217. , nMatched: 0
  218. , nModified: 0
  219. , nRemoved: 0
  220. , upserted: []
  221. };
  222. // Internal state
  223. this.s = {
  224. // Final result
  225. bulkResult: bulkResult
  226. // Current batch state
  227. , currentInsertBatch: null
  228. , currentUpdateBatch: null
  229. , currentRemoveBatch: null
  230. , currentBatch: null
  231. , currentIndex: 0
  232. , batches: []
  233. // Write concern
  234. , writeConcern: writeConcern
  235. // Capabilities
  236. , capabilities: capabilities
  237. // Max batch size options
  238. , maxBatchSizeBytes: maxBatchSizeBytes
  239. , maxWriteBatchSize: maxWriteBatchSize
  240. // Namespace
  241. , namespace: namespace
  242. // BSON
  243. , bson: bson
  244. // Topology
  245. , topology: topology
  246. // Options
  247. , options: options
  248. // Current operation
  249. , currentOp: currentOp
  250. // Executed
  251. , executed: executed
  252. // Collection
  253. , collection: collection
  254. }
  255. }
  256. /**
  257. * Add a single insert document to the bulk operation
  258. *
  259. * @param {object} doc the document to insert
  260. * @throws {MongoError}
  261. * @return {UnorderedBulkOperation}
  262. */
  263. UnorderedBulkOperation.prototype.insert = function(document) {
  264. if(document._id == null) document._id = new ObjectID();
  265. return addToOperationsList(this, common.INSERT, document);
  266. }
  267. /**
  268. * Initiate a find operation for an update/updateOne/remove/removeOne/replaceOne
  269. *
  270. * @method
  271. * @param {object} selector The selector for the bulk operation.
  272. * @throws {MongoError}
  273. * @return {FindOperatorsUnordered}
  274. */
  275. UnorderedBulkOperation.prototype.find = function(selector) {
  276. if (!selector) {
  277. throw toError("Bulk find operation must specify a selector");
  278. }
  279. // Save a current selector
  280. this.s.currentOp = {
  281. selector: selector
  282. }
  283. return new FindOperatorsUnordered(this);
  284. }
  285. Object.defineProperty(UnorderedBulkOperation.prototype, 'length', {
  286. enumerable: true,
  287. get: function() {
  288. return this.s.currentIndex;
  289. }
  290. });
  291. UnorderedBulkOperation.prototype.raw = function(op) {
  292. var key = Object.keys(op)[0];
  293. // Update operations
  294. if((op.updateOne && op.updateOne.q)
  295. || (op.updateMany && op.updateMany.q)
  296. || (op.replaceOne && op.replaceOne.q)) {
  297. op[key].multi = op.updateOne || op.replaceOne ? false : true;
  298. return addToOperationsList(this, common.UPDATE, op[key]);
  299. }
  300. // Crud spec update format
  301. if(op.updateOne || op.updateMany || op.replaceOne) {
  302. var multi = op.updateOne || op.replaceOne ? false : true;
  303. var operation = {q: op[key].filter, u: op[key].update || op[key].replacement, multi: multi}
  304. if(op[key].upsert) operation.upsert = true;
  305. return addToOperationsList(this, common.UPDATE, operation);
  306. }
  307. // Remove operations
  308. if(op.removeOne || op.removeMany || (op.deleteOne && op.deleteOne.q) || op.deleteMany && op.deleteMany.q) {
  309. op[key].limit = op.removeOne ? 1 : 0;
  310. return addToOperationsList(this, common.REMOVE, op[key]);
  311. }
  312. // Crud spec delete operations, less efficient
  313. if(op.deleteOne || op.deleteMany) {
  314. var limit = op.deleteOne ? 1 : 0;
  315. var operation = {q: op[key].filter, limit: limit}
  316. return addToOperationsList(this, common.REMOVE, operation);
  317. }
  318. // Insert operations
  319. if(op.insertOne && op.insertOne.document == null) {
  320. if(op.insertOne._id == null) op.insertOne._id = new ObjectID();
  321. return addToOperationsList(this, common.INSERT, op.insertOne);
  322. } else if(op.insertOne && op.insertOne.document) {
  323. if(op.insertOne.document._id == null) op.insertOne.document._id = new ObjectID();
  324. return addToOperationsList(this, common.INSERT, op.insertOne.document);
  325. }
  326. if(op.insertMany) {
  327. for(var i = 0; i < op.insertMany.length; i++) {
  328. addToOperationsList(this, common.INSERT, op.insertMany[i]);
  329. }
  330. return;
  331. }
  332. // No valid type of operation
  333. throw toError("bulkWrite only supports insertOne, insertMany, updateOne, updateMany, removeOne, removeMany, deleteOne, deleteMany");
  334. }
  335. //
  336. // Execute the command
  337. var executeBatch = function(self, batch, callback) {
  338. var finalOptions = {ordered: false}
  339. if(self.s.writeConcern != null) {
  340. finalOptions.writeConcern = self.s.writeConcern;
  341. }
  342. var resultHandler = function(err, result) {
  343. // If we have and error
  344. if(err) err.ok = 0;
  345. callback(null, mergeBatchResults(false, batch, self.s.bulkResult, err, result));
  346. }
  347. try {
  348. if(batch.batchType == common.INSERT) {
  349. self.s.topology.insert(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
  350. } else if(batch.batchType == common.UPDATE) {
  351. self.s.topology.update(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
  352. } else if(batch.batchType == common.REMOVE) {
  353. self.s.topology.remove(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
  354. }
  355. } catch(err) {
  356. // Force top level error
  357. err.ok = 0;
  358. // Merge top level error and return
  359. callback(null, mergeBatchResults(false, batch, self.s.bulkResult, err, null));
  360. }
  361. }
  362. //
  363. // Execute all the commands
  364. var executeBatches = function(self, callback) {
  365. var numberOfCommandsToExecute = self.s.batches.length;
  366. // Execute over all the batches
  367. for(var i = 0; i < self.s.batches.length; i++) {
  368. executeBatch(self, self.s.batches[i], function(err, result) {
  369. numberOfCommandsToExecute = numberOfCommandsToExecute - 1;
  370. // Execute
  371. if(numberOfCommandsToExecute == 0) {
  372. var error = self.s.bulkResult.writeErrors.length > 0 ? self.s.bulkResult.writeErrors[0] : null;
  373. callback(error, new BulkWriteResult(self.s.bulkResult));
  374. }
  375. });
  376. }
  377. }
  378. /**
  379. * The callback format for results
  380. * @callback UnorderedBulkOperation~resultCallback
  381. * @param {MongoError} error An error instance representing the error during the execution.
  382. * @param {BulkWriteResult} result The bulk write result.
  383. */
  384. /**
  385. * Execute the ordered bulk operation
  386. *
  387. * @method
  388. * @param {object} [options=null] Optional settings.
  389. * @param {(number|string)} [options.w=null] The write concern.
  390. * @param {number} [options.wtimeout=null] The write concern timeout.
  391. * @param {boolean} [options.j=false] Specify a journal write concern.
  392. * @param {boolean} [options.fsync=false] Specify a file sync write concern.
  393. * @param {UnorderedBulkOperation~resultCallback} callback The result callback
  394. * @throws {MongoError}
  395. * @return {null}
  396. */
  397. UnorderedBulkOperation.prototype.execute = function(_writeConcern, callback) {
  398. if(this.s.executed) throw toError("batch cannot be re-executed");
  399. if(typeof _writeConcern == 'function') {
  400. callback = _writeConcern;
  401. } else {
  402. this.s.writeConcern = _writeConcern;
  403. }
  404. // If we have current batch
  405. if(this.s.currentInsertBatch) this.s.batches.push(this.s.currentInsertBatch);
  406. if(this.s.currentUpdateBatch) this.s.batches.push(this.s.currentUpdateBatch);
  407. if(this.s.currentRemoveBatch) this.s.batches.push(this.s.currentRemoveBatch);
  408. // If we have no operations in the bulk raise an error
  409. if(this.s.batches.length == 0) {
  410. throw toError("Invalid Operation, No operations in bulk");
  411. }
  412. // Execute batches
  413. return executeBatches(this, function(err, result) {
  414. callback(err, result);
  415. });
  416. }
  417. /**
  418. * Returns an unordered batch object
  419. * @ignore
  420. */
  421. var initializeUnorderedBulkOp = function(topology, collection, options) {
  422. return new UnorderedBulkOperation(topology, collection, options);
  423. }
  424. module.exports = initializeUnorderedBulkOp;