server.js 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253
  1. "use strict";
  2. var inherits = require('util').inherits
  3. , f = require('util').format
  4. , bindToCurrentDomain = require('../connection/utils').bindToCurrentDomain
  5. , EventEmitter = require('events').EventEmitter
  6. , Pool = require('../connection/pool')
  7. , b = require('bson')
  8. , Query = require('../connection/commands').Query
  9. , MongoError = require('../error')
  10. , ReadPreference = require('./read_preference')
  11. , BasicCursor = require('../cursor')
  12. , CommandResult = require('./command_result')
  13. , getSingleProperty = require('../connection/utils').getSingleProperty
  14. , getProperty = require('../connection/utils').getProperty
  15. , debugOptions = require('../connection/utils').debugOptions
  16. , BSON = require('bson').native().BSON
  17. , PreTwoSixWireProtocolSupport = require('../wireprotocol/2_4_support')
  18. , TwoSixWireProtocolSupport = require('../wireprotocol/2_6_support')
  19. , Session = require('./session')
  20. , Logger = require('../connection/logger')
  21. , MongoCR = require('../auth/mongocr')
  22. , X509 = require('../auth/x509')
  23. , Plain = require('../auth/plain')
  24. , GSSAPI = require('../auth/gssapi')
  25. , SSPI = require('../auth/sspi')
  26. , ScramSHA1 = require('../auth/scram');
  27. /**
  28. * @fileOverview The **Server** class is a class that represents a single server topology and is
  29. * used to construct connections.
  30. *
  31. * @example
  32. * var Server = require('mongodb-core').Server
  33. * , ReadPreference = require('mongodb-core').ReadPreference
  34. * , assert = require('assert');
  35. *
  36. * var server = new Server({host: 'localhost', port: 27017});
  37. * // Wait for the connection event
  38. * server.on('connect', function(server) {
  39. * server.destroy();
  40. * });
  41. *
  42. * // Start connecting
  43. * server.connect();
  44. */
  45. // All bson types
  46. var bsonTypes = [b.Long, b.ObjectID, b.Binary, b.Code, b.DBRef, b.Symbol, b.Double, b.Timestamp, b.MaxKey, b.MinKey];
  47. // BSON parser
  48. var bsonInstance = null;
  49. // Server instance id
  50. var serverId = 0;
  51. // Callbacks instance id
  52. var callbackId = 0;
  53. // Single store for all callbacks
  54. var Callbacks = function() {
  55. // EventEmitter.call(this);
  56. var self = this;
  57. // Callbacks
  58. this.callbacks = {};
  59. // Set the callbacks id
  60. this.id = callbackId++;
  61. // Set the type to server
  62. this.type = 'server';
  63. }
  64. //
  65. // Clone the options
  66. var cloneOptions = function(options) {
  67. var opts = {};
  68. for(var name in options) {
  69. opts[name] = options[name];
  70. }
  71. return opts;
  72. }
  73. //
  74. // Flush all callbacks
  75. Callbacks.prototype.flush = function(err) {
  76. for(var id in this.callbacks) {
  77. if(!isNaN(parseInt(id, 10))) {
  78. var callback = this.callbacks[id];
  79. delete this.callbacks[id];
  80. callback(err, null);
  81. }
  82. }
  83. }
  84. Callbacks.prototype.emit = function(id, err, value) {
  85. var callback = this.callbacks[id];
  86. delete this.callbacks[id];
  87. callback(err, value);
  88. }
  89. Callbacks.prototype.raw = function(id) {
  90. if(this.callbacks[id] == null) return false;
  91. return this.callbacks[id].raw == true ? true : false
  92. }
  93. Callbacks.prototype.unregister = function(id) {
  94. delete this.callbacks[id];
  95. }
  96. Callbacks.prototype.register = function(id, callback) {
  97. this.callbacks[id] = bindToCurrentDomain(callback);
  98. }
  99. /**
  100. * @ignore
  101. */
  102. var bindToCurrentDomain = function(callback) {
  103. var domain = process.domain;
  104. if(domain == null || callback == null) return callback;
  105. return domain.bind(callback);
  106. }
  107. var DISCONNECTED = 'disconnected';
  108. var CONNECTING = 'connecting';
  109. var CONNECTED = 'connected';
  110. var DESTROYED = 'destroyed';
  111. // Supports server
  112. var supportsServer = function(_s) {
  113. return _s.ismaster && typeof _s.ismaster.minWireVersion == 'number';
  114. }
  115. //
  116. // createWireProtocolHandler
  117. var createWireProtocolHandler = function(result) {
  118. // 2.6 wire protocol handler
  119. if(result && result.maxWireVersion >= 2) {
  120. return new TwoSixWireProtocolSupport();
  121. }
  122. // 2.4 or earlier wire protocol handler
  123. return new PreTwoSixWireProtocolSupport();
  124. }
  125. //
  126. // Reconnect server
  127. var reconnectServer = function(self, state) {
  128. // If the current reconnect retries is 0 stop attempting to reconnect
  129. if(state.currentReconnectRetry == 0) {
  130. return self.destroy(true, true);
  131. }
  132. // Adjust the number of retries
  133. state.currentReconnectRetry = state.currentReconnectRetry - 1;
  134. // Connect and retrieve the ismaster
  135. isMasterDiscovery(self, self.s.options, function(err, r) {
  136. // Set status to connecting
  137. state.state = CONNECTING;
  138. // Create a new Pool
  139. state.pool = new Pool(state.options);
  140. // error handler
  141. var reconnectErrorHandler = function(err) {
  142. state.state = DISCONNECTED;
  143. // Destroy the pool
  144. state.pool.destroy();
  145. // Adjust the number of retries
  146. state.currentReconnectRetry = state.currentReconnectRetry - 1;
  147. // No more retries
  148. if(state.currentReconnectRetry <= 0) {
  149. self.state = DESTROYED;
  150. self.emit('error', f('failed to connect to %s:%s after %s retries', state.options.host, state.options.port, state.reconnectTries));
  151. } else {
  152. setTimeout(function() {
  153. reconnectServer(self, state);
  154. }, state.reconnectInterval);
  155. }
  156. }
  157. //
  158. // Attempt to connect
  159. state.pool.once('connect', function() {
  160. // Reset retries
  161. state.currentReconnectRetry = state.reconnectTries;
  162. // Remove any non used handlers
  163. var events = ['error', 'close', 'timeout', 'parseError'];
  164. events.forEach(function(e) {
  165. state.pool.removeAllListeners(e);
  166. });
  167. // Set connected state
  168. state.state = CONNECTED;
  169. // Add proper handlers
  170. state.pool.once('error', reconnectErrorHandler);
  171. state.pool.once('close', closeHandler(self, state));
  172. state.pool.once('timeout', timeoutHandler(self, state));
  173. state.pool.once('parseError', fatalErrorHandler(self, state));
  174. // We need to ensure we have re-authenticated
  175. var keys = Object.keys(state.authProviders);
  176. if(keys.length == 0) return self.emit('reconnect', self);
  177. // Execute all providers
  178. var count = keys.length;
  179. // Iterate over keys
  180. for(var i = 0; i < keys.length; i++) {
  181. state.authProviders[keys[i]].reauthenticate(self, state.pool, function(err, r) {
  182. count = count - 1;
  183. // We are done, emit reconnect event
  184. if(count == 0) {
  185. return self.emit('reconnect', self);
  186. }
  187. });
  188. }
  189. });
  190. //
  191. // Handle connection failure
  192. state.pool.once('error', errorHandler(self, state));
  193. state.pool.once('close', errorHandler(self, state));
  194. state.pool.once('timeout', errorHandler(self, state));
  195. state.pool.once('parseError', errorHandler(self, state));
  196. // Connect pool
  197. state.pool.connect();
  198. });
  199. }
  200. //
  201. // Handlers
  202. var messageHandler = function(self, state) {
  203. return function(response, connection) {
  204. try {
  205. // Parse the message
  206. response.parse({raw: state.callbacks.raw(response.responseTo)});
  207. if(state.logger.isDebug()) state.logger.debug(f('message [%s] received from %s', response.raw.toString('hex'), self.name));
  208. state.callbacks.emit(response.responseTo, null, response);
  209. } catch (err) {
  210. state.callbacks.flush(new MongoError(err));
  211. self.destroy();
  212. }
  213. }
  214. }
  215. var errorHandler = function(self, state) {
  216. return function(err, connection) {
  217. if(state.state == DISCONNECTED || state.state == DESTROYED) return;
  218. // Set disconnected state
  219. state.state = DISCONNECTED;
  220. if(state.readPreferenceStrategies != null) notifyStrategies(self, self.s, 'error', [self]);
  221. if(state.logger.isInfo()) state.logger.info(f('server %s errored out with %s', self.name, JSON.stringify(err)));
  222. // Flush out all the callbacks
  223. if(state.callbacks) state.callbacks.flush(new MongoError(f("server %s received an error %s", self.name, JSON.stringify(err))));
  224. // Destroy all connections
  225. self.destroy();
  226. // Emit error event
  227. if(state.emitError && self.listeners('error').length > 0) self.emit('error', err, self);
  228. // If we specified the driver to reconnect perform it
  229. if(state.reconnect) setTimeout(function() {
  230. // state.currentReconnectRetry = state.reconnectTries,
  231. reconnectServer(self, state)
  232. }, state.reconnectInterval);
  233. }
  234. }
  235. var fatalErrorHandler = function(self, state) {
  236. return function(err, connection) {
  237. if(state.state == DISCONNECTED || state.state == DESTROYED) return;
  238. // Set disconnected state
  239. state.state = DISCONNECTED;
  240. if(state.readPreferenceStrategies != null) notifyStrategies(self, self.s, 'error', [self]);
  241. if(state.logger.isInfo()) state.logger.info(f('server %s errored out with %s', self.name, JSON.stringify(err)));
  242. // Flush out all the callbacks
  243. if(state.callbacks) state.callbacks.flush(new MongoError(f("server %s received an error %s", self.name, JSON.stringify(err))));
  244. // Emit error event
  245. self.emit('error', err, self);
  246. // If we specified the driver to reconnect perform it
  247. if(state.reconnect) setTimeout(function() {
  248. // state.currentReconnectRetry = state.reconnectTries,
  249. reconnectServer(self, state)
  250. }, state.reconnectInterval);
  251. // Destroy all connections
  252. self.destroy();
  253. }
  254. }
  255. var timeoutHandler = function(self, state) {
  256. return function(err, connection) {
  257. if(state.state == DISCONNECTED || state.state == DESTROYED) return;
  258. // Set disconnected state
  259. state.state = DISCONNECTED;
  260. if(state.readPreferenceStrategies != null) notifyStrategies(self, self.s, 'timeout', [self]);
  261. if(state.logger.isInfo()) state.logger.info(f('server %s timed out', self.name));
  262. // Flush out all the callbacks
  263. if(state.callbacks) state.callbacks.flush(new MongoError(f("server %s timed out", self.name)));
  264. // Emit error event
  265. self.emit('timeout', err, self);
  266. // If we specified the driver to reconnect perform it
  267. if(state.reconnect) setTimeout(function() {
  268. // state.currentReconnectRetry = state.reconnectTries,
  269. reconnectServer(self, state)
  270. }, state.reconnectInterval);
  271. // Destroy all connections
  272. self.destroy();
  273. }
  274. }
  275. var closeHandler = function(self, state) {
  276. return function(err, connection) {
  277. if(state.state == DISCONNECTED || state.state == DESTROYED) return;
  278. // Set disconnected state
  279. state.state = DISCONNECTED;
  280. if(state.readPreferenceStrategies != null) notifyStrategies(self, self.s, 'close', [self]);
  281. if(state.logger.isInfo()) state.logger.info(f('server %s closed', self.name));
  282. // Flush out all the callbacks
  283. if(state.callbacks) state.callbacks.flush(new MongoError(f("server %s sockets closed", self.name)));
  284. // Emit error event
  285. self.emit('close', err, self);
  286. // If we specified the driver to reconnect perform it
  287. if(state.reconnect) setTimeout(function() {
  288. // state.currentReconnectRetry = state.reconnectTries,
  289. reconnectServer(self, state)
  290. }, state.reconnectInterval);
  291. // Destroy all connections
  292. self.destroy();
  293. }
  294. }
  295. var connectHandler = function(self, state) {
  296. // Apply all stored authentications
  297. var applyAuthentications = function(callback) {
  298. // We need to ensure we have re-authenticated
  299. var keys = Object.keys(state.authProviders);
  300. if(keys.length == 0) return callback(null, null);
  301. // Execute all providers
  302. var count = keys.length;
  303. // Iterate over keys
  304. for(var i = 0; i < keys.length; i++) {
  305. state.authProviders[keys[i]].reauthenticate(self, state.pool, function(err, r) {
  306. count = count - 1;
  307. // We are done, emit reconnect event
  308. if(count == 0) {
  309. return callback(null, null);
  310. }
  311. });
  312. }
  313. }
  314. return function(connection) {
  315. // Apply any applyAuthentications
  316. applyAuthentications(function() {
  317. // Execute an ismaster
  318. self.command('system.$cmd', {ismaster:true}, function(err, r) {
  319. if(err) {
  320. state.state = DISCONNECTED;
  321. return self.emit('close', err, self);
  322. }
  323. // Set the current ismaster
  324. if(!err) {
  325. state.ismaster = r.result;
  326. }
  327. // Determine the wire protocol handler
  328. state.wireProtocolHandler = createWireProtocolHandler(state.ismaster);
  329. // Set the wireProtocolHandler
  330. state.options.wireProtocolHandler = state.wireProtocolHandler;
  331. // Log the ismaster if available
  332. if(state.logger.isInfo()) state.logger.info(f('server %s connected with ismaster [%s]', self.name, JSON.stringify(r.result)));
  333. // Validate if we it's a server we can connect to
  334. if(!supportsServer(state) && state.wireProtocolHandler == null) {
  335. state.state = DISCONNECTED
  336. return self.emit('error', new MongoError("non supported server version"), self);
  337. }
  338. // Set the details
  339. if(state.ismaster && state.ismaster.me) state.serverDetails.name = state.ismaster.me;
  340. // No read preference strategies just emit connect
  341. if(state.readPreferenceStrategies == null) {
  342. state.state = CONNECTED;
  343. return self.emit('connect', self);
  344. }
  345. // Signal connect to all readPreferences
  346. notifyStrategies(self, self.s, 'connect', [self], function(err, result) {
  347. state.state = CONNECTED;
  348. return self.emit('connect', self);
  349. });
  350. });
  351. });
  352. }
  353. }
  354. var slaveOk = function(r) {
  355. if(r) return r.slaveOk()
  356. return false;
  357. }
  358. //
  359. // Execute readPreference Strategies
  360. var notifyStrategies = function(self, state, op, params, callback) {
  361. if(typeof callback != 'function') {
  362. // Notify query start to any read Preference strategies
  363. for(var name in state.readPreferenceStrategies) {
  364. if(state.readPreferenceStrategies[name][op]) {
  365. var strat = state.readPreferenceStrategies[name];
  366. strat[op].apply(strat, params);
  367. }
  368. }
  369. // Finish up
  370. return;
  371. }
  372. // Execute the async callbacks
  373. var nPreferences = Object.keys(state.readPreferenceStrategies).length;
  374. if(nPreferences == 0) return callback(null, null);
  375. for(var name in state.readPreferenceStrategies) {
  376. if(state.readPreferenceStrategies[name][op]) {
  377. var strat = state.readPreferenceStrategies[name];
  378. // Add a callback to params
  379. var cParams = params.slice(0);
  380. cParams.push(function(err, r) {
  381. nPreferences = nPreferences - 1;
  382. if(nPreferences == 0) {
  383. callback(null, null);
  384. }
  385. })
  386. // Execute the readPreference
  387. strat[op].apply(strat, cParams);
  388. }
  389. }
  390. }
  391. var debugFields = ['reconnect', 'reconnectTries', 'reconnectInterval', 'emitError', 'cursorFactory', 'host'
  392. , 'port', 'size', 'keepAlive', 'keepAliveInitialDelay', 'noDelay', 'connectionTimeout'
  393. , 'socketTimeout', 'singleBufferSerializtion', 'ssl', 'ca', 'cert', 'key', 'rejectUnauthorized', 'promoteLongs'];
  394. /**
  395. * Creates a new Server instance
  396. * @class
  397. * @param {boolean} [options.reconnect=true] Server will attempt to reconnect on loss of connection
  398. * @param {number} [options.reconnectTries=30] Server attempt to reconnect #times
  399. * @param {number} [options.reconnectInterval=1000] Server will wait # milliseconds between retries
  400. * @param {boolean} [options.emitError=false] Server will emit errors events
  401. * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors
  402. * @param {string} options.host The server host
  403. * @param {number} options.port The server port
  404. * @param {number} [options.size=5] Server connection pool size
  405. * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
  406. * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
  407. * @param {boolean} [options.noDelay=true] TCP Connection no delay
  408. * @param {number} [options.connectionTimeout=0] TCP Connection timeout setting
  409. * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
  410. * @param {boolean} [options.ssl=false] Use SSL for connection
  411. * @param {Buffer} [options.ca] SSL Certificate store binary buffer
  412. * @param {Buffer} [options.cert] SSL Certificate binary buffer
  413. * @param {Buffer} [options.key] SSL Key file binary buffer
  414. * @param {string} [options.passphrase] SSL Certificate pass phrase
  415. * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
  416. * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
  417. * @return {Server} A cursor instance
  418. * @fires Server#connect
  419. * @fires Server#close
  420. * @fires Server#error
  421. * @fires Server#timeout
  422. * @fires Server#parseError
  423. * @fires Server#reconnect
  424. */
  425. var Server = function(options) {
  426. var self = this;
  427. // Add event listener
  428. EventEmitter.call(this);
  429. // BSON Parser, ensure we have a single instance
  430. if(bsonInstance == null) {
  431. bsonInstance = new BSON(bsonTypes);
  432. }
  433. // Reconnect retries
  434. var reconnectTries = options.reconnectTries || 30;
  435. // Keeps all the internal state of the server
  436. this.s = {
  437. // Options
  438. options: options
  439. // Contains all the callbacks
  440. , callbacks: new Callbacks()
  441. // Logger
  442. , logger: Logger('Server', options)
  443. // Server state
  444. , state: DISCONNECTED
  445. // Reconnect option
  446. , reconnect: typeof options.reconnect == 'boolean' ? options.reconnect : true
  447. , reconnectTries: reconnectTries
  448. , reconnectInterval: options.reconnectInterval || 1000
  449. // Swallow or emit errors
  450. , emitError: typeof options.emitError == 'boolean' ? options.emitError : false
  451. // Current state
  452. , currentReconnectRetry: reconnectTries
  453. // Contains the ismaster
  454. , ismaster: null
  455. // Contains any alternate strategies for picking
  456. , readPreferenceStrategies: options.readPreferenceStrategies
  457. // Auth providers
  458. , authProviders: options.authProviders || {}
  459. // Server instance id
  460. , id: serverId++
  461. // Grouping tag used for debugging purposes
  462. , tag: options.tag
  463. // Do we have a not connected handler
  464. , disconnectHandler: options.disconnectHandler
  465. // wireProtocolHandler methods
  466. , wireProtocolHandler: options.wireProtocolHandler || new PreTwoSixWireProtocolSupport()
  467. // Factory overrides
  468. , Cursor: options.cursorFactory || BasicCursor
  469. // BSON Parser, ensure we have a single instance
  470. , bsonInstance: bsonInstance
  471. // Pick the right bson parser
  472. , bson: options.bson ? options.bson : bsonInstance
  473. // Internal connection pool
  474. , pool: null
  475. // Server details
  476. , serverDetails: {
  477. host: options.host
  478. , port: options.port
  479. , name: options.port ? f("%s:%s", options.host, options.port) : options.host
  480. }
  481. }
  482. // Reference state
  483. var s = this.s;
  484. // Add bson parser to options
  485. options.bson = s.bson;
  486. // Set error properties
  487. getProperty(this, 'name', 'name', s.serverDetails, {});
  488. getProperty(this, 'bson', 'bson', s.options, {});
  489. getProperty(this, 'wireProtocolHandler', 'wireProtocolHandler', s.options, {});
  490. getSingleProperty(this, 'id', s.id);
  491. // Add auth providers
  492. this.addAuthProvider('mongocr', new MongoCR());
  493. this.addAuthProvider('x509', new X509());
  494. this.addAuthProvider('plain', new Plain());
  495. this.addAuthProvider('gssapi', new GSSAPI());
  496. this.addAuthProvider('sspi', new SSPI());
  497. this.addAuthProvider('scram-sha-1', new ScramSHA1());
  498. }
  499. inherits(Server, EventEmitter);
  500. /**
  501. * Execute a command
  502. * @method
  503. * @param {string} type Type of BSON parser to use (c++ or js)
  504. */
  505. Server.prototype.setBSONParserType = function(type) {
  506. var nBSON = null;
  507. if(type == 'c++') {
  508. nBSON = require('bson').native().BSON;
  509. } else if(type == 'js') {
  510. nBSON = require('bson').pure().BSON;
  511. } else {
  512. throw new MongoError(f("% parser not supported", type));
  513. }
  514. this.s.options.bson = new nBSON(bsonTypes);
  515. }
  516. /**
  517. * Returns the last known ismaster document for this server
  518. * @method
  519. * @return {object}
  520. */
  521. Server.prototype.lastIsMaster = function() {
  522. return this.s.ismaster;
  523. }
  524. var isMasterDiscovery = function(self, options, callback) {
  525. if(options.noismaster) return callback();
  526. // Clone the options
  527. var options = cloneOptions(options);
  528. // Set the pool size to a single socket
  529. options.size = 1;
  530. options.noismaster = true;
  531. // Create a new server instance
  532. var server = new Server(options)
  533. // Add handlers
  534. server.on('connect', function(_server) {
  535. // Remove all listeners
  536. _server.removeAllListeners('close');
  537. _server.removeAllListeners('error');
  538. _server.removeAllListeners('timeout');
  539. _server.removeAllListeners('parseError');
  540. // Destroy socket
  541. _server.destroy();
  542. // Return lastIsMaster for this server
  543. callback(null, _server.lastIsMaster());
  544. });
  545. // Handle all errors
  546. var errorHandler = function() {
  547. if(callback) {
  548. var _internalCallback = callback;
  549. callback = null;
  550. _internalCallback();
  551. }
  552. }
  553. // Intercept all the errors
  554. server.on('close', errorHandler);
  555. server.on('error', errorHandler);
  556. server.on('timeout', errorHandler);
  557. server.on('parseError', errorHandler);
  558. // Connect
  559. server.connect();
  560. }
  561. /**
  562. * Initiate server connect
  563. * @method
  564. */
  565. Server.prototype.connect = function(_options) {
  566. var self = this;
  567. // Set server specific settings
  568. _options = _options || {}
  569. // Set the promotion
  570. if(typeof _options.promoteLongs == 'boolean') {
  571. self.s.options.promoteLongs = _options.promoteLongs;
  572. }
  573. // Connect and retrieve the ismaster
  574. isMasterDiscovery(self, self.s.options, function(err, r) {
  575. // If we have an ismaster
  576. if(r) {
  577. self.emit('ismaster', r, self);
  578. }
  579. // Destroy existing pool
  580. if(self.s.pool) {
  581. self.s.pool.destroy();
  582. self.s.pool = null;
  583. }
  584. // Set the state to connection
  585. self.s.state = CONNECTING;
  586. // Create a new connection pool
  587. if(!self.s.pool) {
  588. self.s.options.messageHandler = messageHandler(self, self.s);
  589. self.s.pool = new Pool(self.s.options);
  590. }
  591. // Add all the event handlers
  592. self.s.pool.once('timeout', timeoutHandler(self, self.s));
  593. self.s.pool.once('close', closeHandler(self, self.s));
  594. self.s.pool.once('error', errorHandler(self, self.s));
  595. self.s.pool.once('connect', connectHandler(self, self.s));
  596. self.s.pool.once('parseError', fatalErrorHandler(self, self.s));
  597. // Connect the pool
  598. self.s.pool.connect();
  599. });
  600. }
  601. /**
  602. * Destroy the server connection
  603. * @method
  604. */
  605. Server.prototype.destroy = function(emitClose, emitDestroy) {
  606. var self = this;
  607. if(self.s.logger.isDebug()) self.s.logger.debug(f('destroy called on server %s', self.name));
  608. // Emit close
  609. if(emitClose && self.listeners('close').length > 0) self.emit('close', self);
  610. // Emit destroy event
  611. if(emitDestroy) self.emit('destroy', self);
  612. // Set state as destroyed
  613. self.s.state = DESTROYED;
  614. // Close the pool
  615. self.s.pool.destroy();
  616. // Flush out all the callbacks
  617. if(self.s.callbacks) self.s.callbacks.flush(new MongoError(f("server %s sockets closed", self.name)));
  618. }
  619. /**
  620. * Figure out if the server is connected
  621. * @method
  622. * @return {boolean}
  623. */
  624. Server.prototype.isConnected = function() {
  625. var self = this;
  626. if(self.s.pool) return self.s.pool.isConnected();
  627. return false;
  628. }
  629. /**
  630. * Figure out if the server instance was destroyed by calling destroy
  631. * @method
  632. * @return {boolean}
  633. */
  634. Server.prototype.isDestroyed = function() {
  635. return this.s.state == DESTROYED;
  636. }
  637. var executeSingleOperation = function(self, ns, cmd, queryOptions, options, onAll, callback) {
  638. // Create a query instance
  639. var query = new Query(self.s.bson, ns, cmd, queryOptions);
  640. // Set slave OK
  641. query.slaveOk = slaveOk(options.readPreference);
  642. // Notify query start to any read Preference strategies
  643. if(self.s.readPreferenceStrategies != null)
  644. notifyStrategies(self, self.s, 'startOperation', [self, query, new Date()]);
  645. // Get a connection (either passed or from the pool)
  646. var connection = options.connection || self.s.pool.get();
  647. // Double check if we have a valid connection
  648. if(!connection.isConnected()) {
  649. return callback(new MongoError(f("no connection available to server %s", self.name)));
  650. }
  651. // Print cmd and execution connection if in debug mode for logging
  652. if(self.s.logger.isDebug()) {
  653. var json = connection.toJSON();
  654. self.s.logger.debug(f('cmd [%s] about to be executed on connection with id %s at %s:%s', JSON.stringify(cmd), json.id, json.host, json.port));
  655. }
  656. // Execute multiple queries
  657. if(onAll) {
  658. var connections = self.s.pool.getAll();
  659. var total = connections.length;
  660. // We have an error
  661. var error = null;
  662. // Execute on all connections
  663. for(var i = 0; i < connections.length; i++) {
  664. try {
  665. query.incRequestId();
  666. connections[i].write(query.toBin());
  667. } catch(err) {
  668. total = total - 1;
  669. if(total == 0) return callback(MongoError.create(err));
  670. }
  671. // Register the callback
  672. self.s.callbacks.register(query.requestId, function(err, result) {
  673. if(err) error = err;
  674. total = total - 1;
  675. // Done
  676. if(total == 0) {
  677. // Notify end of command
  678. notifyStrategies(self, self.s, 'endOperation', [self, error, result, new Date()]);
  679. if(error) return callback(MongoError.create(error));
  680. // Execute callback, catch and rethrow if needed
  681. try { callback(null, new CommandResult(result.documents[0], connections)); }
  682. catch(err) { process.nextTick(function() { throw err}); }
  683. }
  684. });
  685. }
  686. return;
  687. }
  688. // Execute a single command query
  689. try {
  690. connection.write(query.toBin());
  691. } catch(err) {
  692. return callback(MongoError.create(err));
  693. }
  694. // Register the callback
  695. self.s.callbacks.register(query.requestId, function(err, result) {
  696. // Notify end of command
  697. notifyStrategies(self, self.s, 'endOperation', [self, err, result, new Date()]);
  698. if(err) return callback(err);
  699. if(result.documents[0]['$err']
  700. || result.documents[0]['errmsg']
  701. || result.documents[0]['err']
  702. || result.documents[0]['code']) return callback(MongoError.create(result.documents[0]));
  703. // Execute callback, catch and rethrow if needed
  704. try { callback(null, new CommandResult(result.documents[0], connection)); }
  705. catch(err) { process.nextTick(function() { throw err}); }
  706. });
  707. }
  708. /**
  709. * Execute a command
  710. * @method
  711. * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
  712. * @param {object} cmd The command hash
  713. * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
  714. * @param {Connection} [options.connection] Specify connection object to execute command against
  715. * @param {opResultCallback} callback A callback function
  716. */
  717. Server.prototype.command = function(ns, cmd, options, callback) {
  718. if(typeof options == 'function') callback = options, options = {};
  719. var self = this;
  720. if(this.s.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
  721. // Ensure we have no options
  722. options = options || {};
  723. // Do we have a read Preference it need to be of type ReadPreference
  724. if(options.readPreference && !(options.readPreference instanceof ReadPreference)) {
  725. throw new Error("readPreference must be an instance of ReadPreference");
  726. }
  727. // Debug log
  728. if(self.s.logger.isDebug()) self.s.logger.debug(f('executing command [%s] against %s', JSON.stringify({
  729. ns: ns, cmd: cmd, options: debugOptions(debugFields, options)
  730. }), self.name));
  731. // Topology is not connected, save the call in the provided store to be
  732. // Executed at some point when the handler deems it's reconnected
  733. if(!self.isConnected() && self.s.disconnectHandler != null) {
  734. callback = bindToCurrentDomain(callback);
  735. return self.s.disconnectHandler.add('command', ns, cmd, options, callback);
  736. }
  737. // If we have no connection error
  738. if(!self.s.pool.isConnected()) return callback(new MongoError(f("no connection available to server %s", self.name)));
  739. // Execute on all connections
  740. var onAll = typeof options.onAll == 'boolean' ? options.onAll : false;
  741. // Check keys
  742. var checkKeys = typeof options.checkKeys == 'boolean' ? options.checkKeys: false;
  743. // Serialize function
  744. var serializeFunctions = typeof options.serializeFunctions == 'boolean' ? options.serializeFunctions : false;
  745. // Query options
  746. var queryOptions = {
  747. numberToSkip: 0, numberToReturn: -1, checkKeys: checkKeys
  748. };
  749. if(serializeFunctions) queryOptions.serializeFunctions = serializeFunctions;
  750. // Single operation execution
  751. if(!Array.isArray(cmd)) {
  752. return executeSingleOperation(self, ns, cmd, queryOptions, options, onAll, callback);
  753. }
  754. // Build commands for each of the instances
  755. var queries = new Array(cmd.length);
  756. for(var i = 0; i < cmd.length; i++) {
  757. queries[i] = new Query(self.s.bson, ns, cmd[i], queryOptions);
  758. queries[i].slaveOk = slaveOk(options.readPreference);
  759. }
  760. // Notify query start to any read Preference strategies
  761. if(self.s.readPreferenceStrategies != null)
  762. notifyStrategies(self, self.s, 'startOperation', [self, queries, new Date()]);
  763. // Get a connection (either passed or from the pool)
  764. var connection = options.connection || self.s.pool.get();
  765. // Double check if we have a valid connection
  766. if(!connection.isConnected()) {
  767. return callback(new MongoError(f("no connection available to server %s", self.name)));
  768. }
  769. // Print cmd and execution connection if in debug mode for logging
  770. if(self.s.logger.isDebug()) {
  771. var json = connection.toJSON();
  772. self.s.logger.debug(f('cmd [%s] about to be executed on connection with id %s at %s:%s', JSON.stringify(queries), json.id, json.host, json.port));
  773. }
  774. // Canceled operations
  775. var canceled = false;
  776. // Number of operations left
  777. var operationsLeft = queries.length;
  778. // Results
  779. var results = [];
  780. // We need to nest the callbacks
  781. for(var i = 0; i < queries.length; i++) {
  782. // Get the query object
  783. var query = queries[i];
  784. // Execute a single command query
  785. try {
  786. connection.write(query.toBin());
  787. } catch(err) {
  788. return callback(MongoError.create(err));
  789. }
  790. // Register the callback
  791. self.s.callbacks.register(query.requestId, function(err, result) {
  792. // If it's canceled ignore the operation
  793. if(canceled) return;
  794. // Update the current index
  795. operationsLeft = operationsLeft - 1;
  796. // If we have an error cancel the operation
  797. if(err) {
  798. canceled = true;
  799. return callback(err);
  800. }
  801. // Return the result
  802. if(result.documents[0]['$err']
  803. || result.documents[0]['errmsg']
  804. || result.documents[0]['err']
  805. || result.documents[0]['code']) {
  806. // Set to canceled
  807. canceled = true;
  808. // Return the error
  809. return callback(MongoError.create(result.documents[0]));
  810. }
  811. // Push results
  812. results.push(result.documents[0]);
  813. // We are done, return the result
  814. if(operationsLeft == 0) {
  815. // Notify end of command
  816. notifyStrategies(self, self.s, 'endOperation', [self, err, result, new Date()]);
  817. // Turn into command results
  818. var commandResults = new Array(results.length);
  819. for(var i = 0; i < results.length; i++) {
  820. commandResults[i] = new CommandResult(results[i], connection);
  821. }
  822. // Execute callback, catch and rethrow if needed
  823. try { callback(null, commandResults); }
  824. catch(err) { process.nextTick(function() { throw err}); }
  825. }
  826. });
  827. }
  828. }
  829. /**
  830. * Insert one or more documents
  831. * @method
  832. * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
  833. * @param {array} ops An array of documents to insert
  834. * @param {boolean} [options.ordered=true] Execute in order or out of order
  835. * @param {object} [options.writeConcern={}] Write concern for the operation
  836. * @param {opResultCallback} callback A callback function
  837. */
  838. Server.prototype.insert = function(ns, ops, options, callback) {
  839. if(typeof options == 'function') callback = options, options = {};
  840. var self = this;
  841. if(this.s.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
  842. // Topology is not connected, save the call in the provided store to be
  843. // Executed at some point when the handler deems it's reconnected
  844. if(!self.isConnected() && self.s.disconnectHandler != null) {
  845. callback = bindToCurrentDomain(callback);
  846. return self.s.disconnectHandler.add('insert', ns, ops, options, callback);
  847. }
  848. // Setup the docs as an array
  849. ops = Array.isArray(ops) ? ops : [ops];
  850. // Execute write
  851. return self.s.wireProtocolHandler.insert(self, self.s.ismaster, ns, self.s.bson, self.s.pool, self.s.callbacks, ops, options, callback);
  852. }
  853. /**
  854. * Perform one or more update operations
  855. * @method
  856. * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
  857. * @param {array} ops An array of updates
  858. * @param {boolean} [options.ordered=true] Execute in order or out of order
  859. * @param {object} [options.writeConcern={}] Write concern for the operation
  860. * @param {opResultCallback} callback A callback function
  861. */
  862. Server.prototype.update = function(ns, ops, options, callback) {
  863. if(typeof options == 'function') callback = options, options = {};
  864. var self = this;
  865. if(this.s.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
  866. // Topology is not connected, save the call in the provided store to be
  867. // Executed at some point when the handler deems it's reconnected
  868. if(!self.isConnected() && self.s.disconnectHandler != null) {
  869. callback = bindToCurrentDomain(callback);
  870. return self.s.disconnectHandler.add('update', ns, ops, options, callback);
  871. }
  872. // Setup the docs as an array
  873. ops = Array.isArray(ops) ? ops : [ops];
  874. // Execute write
  875. return self.s.wireProtocolHandler.update(self, self.s.ismaster, ns, self.s.bson, self.s.pool, self.s.callbacks, ops, options, callback);
  876. }
  877. /**
  878. * Perform one or more remove operations
  879. * @method
  880. * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
  881. * @param {array} ops An array of removes
  882. * @param {boolean} [options.ordered=true] Execute in order or out of order
  883. * @param {object} [options.writeConcern={}] Write concern for the operation
  884. * @param {opResultCallback} callback A callback function
  885. */
  886. Server.prototype.remove = function(ns, ops, options, callback) {
  887. if(typeof options == 'function') callback = options, options = {};
  888. var self = this;
  889. if(this.s.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
  890. // Topology is not connected, save the call in the provided store to be
  891. // Executed at some point when the handler deems it's reconnected
  892. if(!self.isConnected() && self.s.disconnectHandler != null) {
  893. callback = bindToCurrentDomain(callback);
  894. return self.s.disconnectHandler.add('remove', ns, ops, options, callback);
  895. }
  896. // Setup the docs as an array
  897. ops = Array.isArray(ops) ? ops : [ops];
  898. // Execute write
  899. return self.s.wireProtocolHandler.remove(self, self.s.ismaster, ns, self.s.bson, self.s.pool, self.s.callbacks, ops, options, callback);
  900. }
  901. /**
  902. * Authenticate using a specified mechanism
  903. * @method
  904. * @param {string} mechanism The Auth mechanism we are invoking
  905. * @param {string} db The db we are invoking the mechanism against
  906. * @param {...object} param Parameters for the specific mechanism
  907. * @param {authResultCallback} callback A callback function
  908. */
  909. Server.prototype.auth = function(mechanism, db) {
  910. var self = this;
  911. var args = Array.prototype.slice.call(arguments, 2);
  912. var callback = args.pop();
  913. // If we don't have the mechanism fail
  914. if(self.s.authProviders[mechanism] == null && mechanism != 'default')
  915. throw new MongoError(f("auth provider %s does not exist", mechanism));
  916. // If we have the default mechanism we pick mechanism based on the wire
  917. // protocol max version. If it's >= 3 then scram-sha1 otherwise mongodb-cr
  918. if(mechanism == 'default' && self.s.ismaster && self.s.ismaster.maxWireVersion >= 3) {
  919. mechanism = 'scram-sha-1';
  920. } else if(mechanism == 'default') {
  921. mechanism = 'mongocr';
  922. }
  923. // Actual arguments
  924. var finalArguments = [self, self.s.pool, db].concat(args.slice(0)).concat([function(err, r) {
  925. if(err) return callback(err);
  926. if(!r) return callback(new MongoError('could not authenticate'));
  927. callback(null, new Session({}, self));
  928. }]);
  929. // Let's invoke the auth mechanism
  930. self.s.authProviders[mechanism].auth.apply(self.s.authProviders[mechanism], finalArguments);
  931. }
  932. //
  933. // Plugin methods
  934. //
  935. /**
  936. * Add custom read preference strategy
  937. * @method
  938. * @param {string} name Name of the read preference strategy
  939. * @param {object} strategy Strategy object instance
  940. */
  941. Server.prototype.addReadPreferenceStrategy = function(name, strategy) {
  942. var self = this;
  943. if(self.s.readPreferenceStrategies == null) self.s.readPreferenceStrategies = {};
  944. self.s.readPreferenceStrategies[name] = strategy;
  945. }
  946. /**
  947. * Add custom authentication mechanism
  948. * @method
  949. * @param {string} name Name of the authentication mechanism
  950. * @param {object} provider Authentication object instance
  951. */
  952. Server.prototype.addAuthProvider = function(name, provider) {
  953. var self = this;
  954. self.s.authProviders[name] = provider;
  955. }
  956. /**
  957. * Compare two server instances
  958. * @method
  959. * @param {Server} server Server to compare equality against
  960. * @return {boolean}
  961. */
  962. Server.prototype.equals = function(server) {
  963. if(typeof server == 'string') return server == this.name;
  964. return server.name == this.name;
  965. }
  966. /**
  967. * All raw connections
  968. * @method
  969. * @return {Connection[]}
  970. */
  971. Server.prototype.connections = function() {
  972. return this.s.pool.getAll();
  973. }
  974. /**
  975. * Get server
  976. * @method
  977. * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
  978. * @return {Server}
  979. */
  980. Server.prototype.getServer = function(options) {
  981. return this;
  982. }
  983. /**
  984. * Get connection
  985. * @method
  986. * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
  987. * @return {Connection}
  988. */
  989. Server.prototype.getConnection = function(options) {
  990. return this.s.pool.get();
  991. }
  992. /**
  993. * Get callbacks object
  994. * @method
  995. * @return {Callbacks}
  996. */
  997. Server.prototype.getCallbacks = function() {
  998. return this.s.callbacks;
  999. }
  1000. /**
  1001. * Name of BSON parser currently used
  1002. * @method
  1003. * @return {string}
  1004. */
  1005. Server.prototype.parserType = function() {
  1006. var s = this.s;
  1007. if(s.options.bson.serialize.toString().indexOf('[native code]') != -1)
  1008. return 'c++';
  1009. return 'js';
  1010. }
  1011. // // Command
  1012. // {
  1013. // find: ns
  1014. // , query: <object>
  1015. // , limit: <n>
  1016. // , fields: <object>
  1017. // , skip: <n>
  1018. // , hint: <string>
  1019. // , explain: <boolean>
  1020. // , snapshot: <boolean>
  1021. // , batchSize: <n>
  1022. // , returnKey: <boolean>
  1023. // , maxScan: <n>
  1024. // , min: <n>
  1025. // , max: <n>
  1026. // , showDiskLoc: <boolean>
  1027. // , comment: <string>
  1028. // , maxTimeMS: <n>
  1029. // , raw: <boolean>
  1030. // , readPreference: <ReadPreference>
  1031. // , tailable: <boolean>
  1032. // , oplogReplay: <boolean>
  1033. // , noCursorTimeout: <boolean>
  1034. // , awaitdata: <boolean>
  1035. // , exhaust: <boolean>
  1036. // , partial: <boolean>
  1037. // }
  1038. /**
  1039. * Perform one or more remove operations
  1040. * @method
  1041. * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
  1042. * @param {{object}|{Long}} cmd Can be either a command returning a cursor or a cursorId
  1043. * @param {object} [options.batchSize=0] Batchsize for the operation
  1044. * @param {array} [options.documents=[]] Initial documents list for cursor
  1045. * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
  1046. * @param {opResultCallback} callback A callback function
  1047. */
  1048. Server.prototype.cursor = function(ns, cmd, cursorOptions) {
  1049. var s = this.s;
  1050. cursorOptions = cursorOptions || {};
  1051. var FinalCursor = cursorOptions.cursorFactory || s.Cursor;
  1052. return new FinalCursor(s.bson, ns, cmd, cursorOptions, this, s.options);
  1053. }
  1054. /**
  1055. * A server connect event, used to verify that the connection is up and running
  1056. *
  1057. * @event Server#connect
  1058. * @type {Server}
  1059. */
  1060. /**
  1061. * The server connection closed, all pool connections closed
  1062. *
  1063. * @event Server#close
  1064. * @type {Server}
  1065. */
  1066. /**
  1067. * The server connection caused an error, all pool connections closed
  1068. *
  1069. * @event Server#error
  1070. * @type {Server}
  1071. */
  1072. /**
  1073. * The server connection timed out, all pool connections closed
  1074. *
  1075. * @event Server#timeout
  1076. * @type {Server}
  1077. */
  1078. /**
  1079. * The driver experienced an invalid message, all pool connections closed
  1080. *
  1081. * @event Server#parseError
  1082. * @type {Server}
  1083. */
  1084. /**
  1085. * The server reestablished the connection
  1086. *
  1087. * @event Server#reconnect
  1088. * @type {Server}
  1089. */
  1090. /**
  1091. * This is an insert result callback
  1092. *
  1093. * @callback opResultCallback
  1094. * @param {error} error An error object. Set to null if no error present
  1095. * @param {CommandResult} command result
  1096. */
  1097. /**
  1098. * This is an authentication result callback
  1099. *
  1100. * @callback authResultCallback
  1101. * @param {error} error An error object. Set to null if no error present
  1102. * @param {Session} an authenticated session
  1103. */
  1104. module.exports = Server;