ordered.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
  1. "use strict";
  2. var common = require('./common')
  3. , utils = require('../utils')
  4. , toError = require('../utils').toError
  5. , f = require('util').format
  6. , shallowClone = utils.shallowClone
  7. , WriteError = common.WriteError
  8. , BulkWriteResult = common.BulkWriteResult
  9. , LegacyOp = common.LegacyOp
  10. , ObjectID = require('mongodb-core').BSON.ObjectID
  11. , Batch = common.Batch
  12. , mergeBatchResults = common.mergeBatchResults;
  /**
   * Chainable helper returned by OrderedBulkOperation#find (INTERNAL TYPE, do not
   * instantiate directly).
   *
   * The helper keeps no state of its own: it points at the very same internal
   * state object (`s`) as the object it is created from.
   *
   * @class
   * @param {object} self the owning bulk operation; only its `s` property is read
   * @return {FindOperatorsOrdered} a FindOperatorsOrdered instance.
   */
  var FindOperatorsOrdered = function(self) {
    var sharedState = self.s;
    this.s = sharedState;
  };
  /**
   * Queue a multi-document update built from the selector of the pending find()
   *
   * @method
   * @param {object} updateDocument update operations
   * @throws {MongoError}
   * @return {FindOperatorsOrdered} the value handed back by addToOperationsList, which returns the receiver it was given
   */
  FindOperatorsOrdered.prototype.update = function(updateDocument) {
    var state = this.s;
    var pending = state.currentOp;
    // Only an explicit boolean true switches the upsert on; anything else counts as false
    var shouldUpsert = pending.upsert === true;
    // Update command: multi is true, so it is not limited to a single document
    var command = {
      q: pending.selector,
      u: updateDocument,
      multi: true,
      upsert: shouldUpsert
    };
    // The pending find() has been consumed
    state.currentOp = null;
    // Append the command to the list of operations
    return addToOperationsList(this, common.UPDATE, command);
  };
  /**
   * Queue a single-document update built from the selector of the pending find()
   *
   * @method
   * @param {object} updateDocument update operations
   * @throws {MongoError}
   * @return {FindOperatorsOrdered} the value handed back by addToOperationsList, which returns the receiver it was given
   */
  FindOperatorsOrdered.prototype.updateOne = function(updateDocument) {
    var pendingOp = this.s.currentOp;
    // A non-boolean upsert value is treated as "no upsert"
    var wantsUpsert = false;
    if(typeof pendingOp.upsert === 'boolean') {
      wantsUpsert = pendingOp.upsert;
    }
    // Same shape as update(), except that multi is switched off
    var command = {q: pendingOp.selector, u: updateDocument, multi: false, upsert: wantsUpsert};
    // Drop the consumed selector before queueing the command
    this.s.currentOp = null;
    return addToOperationsList(this, common.UPDATE, command);
  };
  /**
   * Add a replace one operation to the bulk operation
   *
   * The replacement is queued through updateOne, i.e. as a single-document
   * update whose update document is the replacement itself.
   *
   * @method
   * @param {object} updateDocument the new document to replace the existing one with
   * @throws {MongoError}
   * @return {FindOperatorsOrdered} the value returned by updateOne
   */
  FindOperatorsOrdered.prototype.replaceOne = function(updateDocument) {
    // Hand the result back so replaceOne can be chained like update/updateOne;
    // previously the result was dropped and the method returned undefined.
    return this.updateOne(updateDocument);
  };
  /**
   * Flag the pending find() so that the update queued next is an upsert
   *
   * @method
   * @throws {MongoError}
   * @return {FindOperatorsOrdered} this helper, for chaining
   */
  FindOperatorsOrdered.prototype.upsert = function() {
    var pendingOp = this.s.currentOp;
    // update()/updateOne() read this flag back when they build their command
    pendingOp.upsert = true;
    return this;
  };
  /**
   * Queue a remove-one operation built from the selector of the pending find()
   *
   * @method
   * @throws {MongoError}
   * @return {FindOperatorsOrdered} the value handed back by addToOperationsList, which returns the receiver it was given
   */
  FindOperatorsOrdered.prototype.deleteOne = function() {
    var state = this.s;
    // Remove command; limit 1 is what distinguishes it from delete()
    var command = {q: state.currentOp.selector, limit: 1};
    // The pending find() has been consumed
    state.currentOp = null;
    // Append the command to the list of operations
    return addToOperationsList(this, common.REMOVE, command);
  };
  // Backward compatibility: removeOne is the same function as deleteOne
  FindOperatorsOrdered.prototype.removeOne = FindOperatorsOrdered.prototype.deleteOne;
  /**
   * Queue a remove operation built from the selector of the pending find()
   *
   * @method
   * @throws {MongoError}
   * @return {FindOperatorsOrdered} the value handed back by addToOperationsList, which returns the receiver it was given
   */
  FindOperatorsOrdered.prototype.delete = function() {
    var pendingOp = this.s.currentOp;
    // Remove command; limit 0 is what distinguishes it from deleteOne()
    var command = {
      q: pendingOp.selector,
      limit: 0
    };
    // Drop the consumed selector before queueing the command
    this.s.currentOp = null;
    return addToOperationsList(this, common.REMOVE, command);
  };
  // Backward compatibility: remove is the same function as delete
  FindOperatorsOrdered.prototype.remove = FindOperatorsOrdered.prototype.delete;
  // Add to internal list of documents
  //
  // Appends one operation to the current batch of `_self.s`, starting a new
  // batch first when the current one would reach the operation-count limit
  // (maxWriteBatchSize), would reach the byte limit (maxBatchSizeBytes), or holds
  // operations of a different type.
  //
  // _self    - object whose `s` holds the bulk state (bson, currentBatch,
  //            currentIndex, currentBatchSize, currentBatchSizeBytes, batches,
  //            bulkResult, maxBatchSizeBytes, maxWriteBatchSize)
  // docType  - one of the common.INSERT / common.UPDATE / common.REMOVE markers
  // document - the operation to queue; must not be an Array
  //
  // Throws (via toError) when document is an Array or when its BSON size is not
  // below maxBatchSizeBytes. Returns _self.
  var addToOperationsList = function(_self, docType, document) {
    var state = _self.s;
    // Reject arrays before anything is measured or recorded, so a rejected call
    // leaves the bulk state (including insertedIds) untouched
    if(Array.isArray(document)) {
      throw toError("operation passed in cannot be an Array");
    }
    // Get the bsonSize
    var bsonSize = state.bson.calculateObjectSize(document, false);
    // Throw error if the doc is bigger than the max BSON size
    if(bsonSize >= state.maxBatchSizeBytes) throw toError("document is larger than the maximum size " + state.maxBatchSizeBytes);
    // Create a new batch object if we don't have a current one
    if(state.currentBatch == null) state.currentBatch = new Batch(docType, state.currentIndex);
    // Check if we need to create a new batch. The byte check has to look at the
    // size of the incoming operation, not at the running total twice.
    if(((state.currentBatchSize + 1) >= state.maxWriteBatchSize)
      || ((state.currentBatchSizeBytes + bsonSize) >= state.maxBatchSizeBytes)
      || (state.currentBatch.batchType != docType)) {
      // Save the batch to the execution stack
      state.batches.push(state.currentBatch);
      // Create a new batch
      state.currentBatch = new Batch(docType, state.currentIndex);
      // Reset the current size trackers
      state.currentBatchSize = 0;
      state.currentBatchSizeBytes = 0;
    }
    // Account for the operation in whichever batch it lands in; the first
    // operation of a freshly started batch counts as well
    state.currentBatchSize = state.currentBatchSize + 1;
    state.currentBatchSizeBytes = state.currentBatchSizeBytes + bsonSize;
    // Remember the _id of every inserted document together with its position
    if(docType == common.INSERT) {
      state.bulkResult.insertedIds.push({index: state.currentIndex, _id: document._id});
    }
    state.currentBatch.originalIndexes.push(state.currentIndex);
    state.currentBatch.operations.push(document);
    state.currentIndex = state.currentIndex + 1;
    // Return self
    return _self;
  };
  /**
   * Create a new OrderedBulkOperation instance (INTERNAL TYPE, do not instantiate directly)
   *
   * All state lives on `this.s`. The batch limits come from `topology.isMasterDoc`
   * when that document is present and carries them; otherwise they fall back to
   * 16MB per batch and 1000 operations per batch, so the size checks stay
   * active even without a server description.
   *
   * @class
   * @param {object} topology object providing `bson`, `capabilities()` and, optionally, `isMasterDoc`
   * @param {object} collection collection the bulk operates on; `collectionName` is read here
   * @param {object} [options] settings handed to common.writeConcern; defaults to {}
   * @property {number} length Get the number of operations in the bulk.
   * @return {OrderedBulkOperation} a OrderedBulkOperation instance.
   */
  function OrderedBulkOperation(topology, collection, options) {
    options = options == null ? {} : options;
    // TODO Bring from driver information in isMaster
    // Handle to the bson serializer, used to calculate running sizes
    var bson = topology.bson;
    // Namespace for the operation
    var namespace = collection.collectionName;
    // Limits advertised by the server; tolerate a topology without isMasterDoc
    var isMasterDoc = topology.isMasterDoc || {};
    // Set max byte size (16MB when the server description does not provide one)
    var maxBatchSizeBytes = isMasterDoc.maxBsonObjectSize || (1024 * 1024 * 16);
    var maxWriteBatchSize = isMasterDoc.maxWriteBatchSize || 1000;
    // Get the capabilities
    var capabilities = topology.capabilities();
    // Get the write concern
    var writeConcern = common.writeConcern(shallowClone(options), collection, options);
    // Final results
    var bulkResult = {
        ok: 1
      , writeErrors: []
      , writeConcernErrors: []
      , insertedIds: []
      , nInserted: 0
      , nUpserted: 0
      , nMatched: 0
      , nModified: 0
      , nRemoved: 0
      , upserted: []
    };
    // Internal state
    this.s = {
      // Final result
        bulkResult: bulkResult
      // Current batch state
      , currentBatch: null
      , currentIndex: 0
      , currentBatchSize: 0
      , currentBatchSizeBytes: 0
      , batches: []
      // Write concern
      , writeConcern: writeConcern
      // Capabilities
      , capabilities: capabilities
      // Max batch size options
      , maxBatchSizeBytes: maxBatchSizeBytes
      , maxWriteBatchSize: maxWriteBatchSize
      // Namespace
      , namespace: namespace
      // BSON
      , bson: bson
      // Topology
      , topology: topology
      // Options
      , options: options
      // Current operation (set by find(), consumed by the FindOperatorsOrdered methods)
      , currentOp: null
      // Executed
      , executed: false
      // Collection
      , collection: collection
    };
  }
  /**
   * Add one operation given in bulkWrite form, i.e. an object whose first key
   * names the operation.
   *
   * Accepted shapes, as handled below:
   *  - updateOne / updateMany / replaceOne, either already in command form
   *    (value has `q`) or in CRUD form (`filter`, `update` or `replacement`, `upsert`)
   *  - removeOne / removeMany, and deleteOne / deleteMany either in command form
   *    (value has `q`) or in CRUD form (`filter`)
   *  - insertOne, either the document itself or `{document: ...}`
   *  - insertMany, an array of documents
   *
   * Command-form values and inserted documents are modified in place (`multi`,
   * `limit` and a generated `_id` are written onto them).
   *
   * @method
   * @param {object} op the operation description
   * @throws {MongoError} when op names none of the supported operations
   * @return {OrderedBulkOperation} this bulk operation
   */
  OrderedBulkOperation.prototype.raw = function(op) {
    var key = Object.keys(op)[0];
    var operation;

    // Update operations
    if((op.updateOne && op.updateOne.q)
      || (op.updateMany && op.updateMany.q)
      || (op.replaceOne && op.replaceOne.q)) {
      op[key].multi = op.updateOne || op.replaceOne ? false : true;
      return addToOperationsList(this, common.UPDATE, op[key]);
    }

    // Crud spec update format
    if(op.updateOne || op.updateMany || op.replaceOne) {
      var multi = op.updateOne || op.replaceOne ? false : true;
      operation = {q: op[key].filter, u: op[key].update || op[key].replacement, multi: multi};
      if(op[key].upsert) operation.upsert = true;
      return addToOperationsList(this, common.UPDATE, operation);
    }

    // Remove operations
    if(op.removeOne || op.removeMany || (op.deleteOne && op.deleteOne.q) || (op.deleteMany && op.deleteMany.q)) {
      // Both single-document spellings must be limited to one document;
      // checking removeOne alone turned a command-form deleteOne into a delete-all
      op[key].limit = (op.removeOne || op.deleteOne) ? 1 : 0;
      return addToOperationsList(this, common.REMOVE, op[key]);
    }

    // Crud spec delete operations, less efficient
    if(op.deleteOne || op.deleteMany) {
      var limit = op.deleteOne ? 1 : 0;
      operation = {q: op[key].filter, limit: limit};
      return addToOperationsList(this, common.REMOVE, operation);
    }

    // Insert operations
    if(op.insertOne && op.insertOne.document == null) {
      if(op.insertOne._id == null) op.insertOne._id = new ObjectID();
      return addToOperationsList(this, common.INSERT, op.insertOne);
    } else if(op.insertOne && op.insertOne.document) {
      if(op.insertOne.document._id == null) op.insertOne.document._id = new ObjectID();
      return addToOperationsList(this, common.INSERT, op.insertOne.document);
    }

    if(op.insertMany) {
      for(var i = 0; i < op.insertMany.length; i++) {
        if(op.insertMany[i]._id == null) op.insertMany[i]._id = new ObjectID();
        addToOperationsList(this, common.INSERT, op.insertMany[i]);
      }
      // Return the bulk like every other branch does, instead of undefined
      return this;
    }

    // No valid type of operation
    throw toError("bulkWrite only supports insertOne, insertMany, updateOne, updateMany, removeOne, removeMany, deleteOne, deleteMany");
  };
  /**
   * Queue a single document for insertion
   *
   * A document without an `_id` (null or undefined) gets a freshly created
   * ObjectID written onto it before it is queued.
   *
   * @param {object} document the document to insert
   * @throws {MongoError}
   * @return {OrderedBulkOperation}
   */
  OrderedBulkOperation.prototype.insert = function(document) {
    var lacksId = document._id == null;
    if(lacksId) {
      document._id = new ObjectID();
    }
    return addToOperationsList(this, common.INSERT, document);
  };
  /**
   * Start an update/updateOne/remove/removeOne/replaceOne by recording its selector
   *
   * The selector is stored as the pending operation of this bulk and a
   * FindOperatorsOrdered bound to this bulk is returned to finish it.
   *
   * @method
   * @param {object} selector The selector for the bulk operation.
   * @throws {MongoError} when selector is falsy
   * @return {FindOperatorsOrdered}
   */
  OrderedBulkOperation.prototype.find = function(selector) {
    if(selector) {
      // Remember the selector until one of the FindOperatorsOrdered methods consumes it
      this.s.currentOp = {selector: selector};
      return new FindOperatorsOrdered(this);
    }
    throw toError("Bulk find operation must specify a selector");
  };
  // `length`: enumerable accessor without a setter that reports the running
  // operation index kept in the internal state
  Object.defineProperty(OrderedBulkOperation.prototype, 'length', {
    get: function() {
      var state = this.s;
      return state.currentIndex;
    },
    enumerable: true
  });
  //
  // Execute next write command in a chain
  //
  // Takes the first queued batch off self.s.batches, sends it through the
  // topology method matching its type and, from the result handler, recurses
  // until no batch is left. Every completion path hands the callback a
  // BulkWriteResult built from self.s.bulkResult:
  //  - no batches left:                      callback(null, result)
  //  - mergeBatchResults returns non-null:   callback(null, result)
  //  - write errors were recorded:           callback(firstWriteError, result)
  //  - the topology call throws:             callback(null, result)
  var executeCommands = function(self, callback) {
    if(self.s.batches.length == 0) {
      return callback(null, new BulkWriteResult(self.s.bulkResult));
    }

    // Ordered execution of the command
    var batch = self.s.batches.shift();

    var resultHandler = function(err, result) {
      // If we have an error, flag it as a failed reply before merging
      if(err) err.ok = 0;
      // Merge the results together
      var mergeResult = mergeBatchResults(true, batch, self.s.bulkResult, err, result);
      if(mergeResult != null) {
        return callback(null, new BulkWriteResult(self.s.bulkResult));
      }

      // If we are ordered and have errors and they are
      // not all replication errors terminate the operation
      if(self.s.bulkResult.writeErrors.length > 0) {
        return callback(self.s.bulkResult.writeErrors[0], new BulkWriteResult(self.s.bulkResult));
      }

      // Execute the next command in line
      executeCommands(self, callback);
    };

    var finalOptions = {ordered: true};
    if(self.s.writeConcern != null) {
      finalOptions.writeConcern = self.s.writeConcern;
    }

    try {
      if(batch.batchType == common.INSERT) {
        self.s.topology.insert(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
      } else if(batch.batchType == common.UPDATE) {
        self.s.topology.update(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
      } else if(batch.batchType == common.REMOVE) {
        self.s.topology.remove(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
      }
    } catch(err) {
      // Force top level error
      err.ok = 0;
      // Merge top level error into the running result
      // NOTE(review): `false` is passed for the first argument here while the result
      // handler passes `true`; kept as is because mergeBatchResults is defined elsewhere - confirm
      mergeBatchResults(false, batch, self.s.bulkResult, err, null);
      // Report through a BulkWriteResult like every other path, rather than handing
      // the callback whatever mergeBatchResults happens to return
      callback(null, new BulkWriteResult(self.s.bulkResult));
    }
  };
  /**
   * The callback format for results
   * @callback OrderedBulkOperation~resultCallback
   * @param {MongoError} error An error instance representing the error during the execution.
   * @param {BulkWriteResult} result The bulk write result.
   */
  /**
   * Execute the ordered bulk operation
   *
   * The write concern given here replaces the one configured at construction
   * only when it is an object; calling execute(callback) or passing null keeps
   * the configured write concern. A bulk can be executed once: after execution
   * has started, further calls throw.
   *
   * @method
   * @param {object} [options=null] Optional settings.
   * @param {(number|string)} [options.w=null] The write concern.
   * @param {number} [options.wtimeout=null] The write concern timeout.
   * @param {boolean} [options.j=false] Specify a journal write concern.
   * @param {boolean} [options.fsync=false] Specify a file sync write concern.
   * @param {OrderedBulkOperation~resultCallback} callback The result callback
   * @throws {MongoError} when the bulk was already executed or holds no operations
   * @return {null}
   */
  OrderedBulkOperation.prototype.execute = function(_writeConcern, callback) {
    if(this.s.executed) throw toError("batch cannot be re-executed");
    if(typeof _writeConcern == 'function') {
      callback = _writeConcern;
    } else if(_writeConcern && typeof _writeConcern == 'object') {
      // Only a real write concern object overrides the configured one; a missing
      // or null argument must not wipe it
      this.s.writeConcern = _writeConcern;
    }

    // If we have current batch
    if(this.s.currentBatch) this.s.batches.push(this.s.currentBatch);

    // If we have no operations in the bulk raise an error
    if(this.s.batches.length == 0) {
      throw toError("Invalid Operation, No operations in bulk");
    }

    // Arm the re-execution guard checked at the top; it is set only once the bulk
    // is known to be runnable, so an empty bulk can still be filled and executed
    this.s.executed = true;

    // Execute the commands
    return executeCommands(this, callback);
  };
  /**
   * Factory that builds an ordered bulk operation; the arguments are passed to
   * the OrderedBulkOperation constructor unchanged
   * @ignore
   */
  var initializeOrderedBulkOp = function(topology, collection, options) {
    var bulkOperation = new OrderedBulkOperation(topology, collection, options);
    return bulkOperation;
  };
  408. module.exports = initializeOrderedBulkOp;