mongos.js 31 KB


  1. "use strict";
  2. var inherits = require('util').inherits
  3. , f = require('util').format
  4. , b = require('bson')
  5. , bindToCurrentDomain = require('../connection/utils').bindToCurrentDomain
  6. , EventEmitter = require('events').EventEmitter
  7. , BasicCursor = require('../cursor')
  8. , BSON = require('bson').native().BSON
  9. , BasicCursor = require('../cursor')
  10. , Server = require('./server')
  11. , Logger = require('../connection/logger')
  12. , ReadPreference = require('./read_preference')
  13. , Session = require('./session')
  14. , MongoError = require('../error');
  15. /**
  16. * @fileOverview The **Mongos** class is a class that represents a Mongos Proxy topology and is
  17. * used to construct connections.
  18. *
  19. * @example
  20. * var Mongos = require('mongodb-core').Mongos
  21. * , ReadPreference = require('mongodb-core').ReadPreference
  22. * , assert = require('assert');
  23. *
  24. * var server = new Mongos([{host: 'localhost', port: 30000}]);
  25. * // Wait for the connection event
  26. * server.on('connect', function(server) {
  27. * server.destroy();
  28. * });
  29. *
  30. * // Start connecting
  31. * server.connect();
  32. */
  // Lifecycle states kept in the topology's `s.state` field
  var DISCONNECTED = 'disconnected';
  var CONNECTING = 'connecting';
  var CONNECTED = 'connected';
  var DESTROYED = 'destroyed';
  // All bson types, handed to the BSON parser constructor
  var bsonTypes = [b.Long, b.ObjectID, b.Binary, b.Code, b.DBRef, b.Symbol, b.Double, b.Timestamp, b.MaxKey, b.MinKey];
  // Shared BSON parser instance; starts out empty and is created by the
  // first Mongos that gets constructed
  var bsonInstance = null;
  // Counter used to hand every Mongos instance a unique id
  var mongosId = 0;
  //
  // Build a shallow copy of an options object. Values are copied by
  // reference; a missing options object yields an empty copy.
  var cloneOptions = function(options) {
    var copy = {};
    var key;
    // for-in on purpose: enumerable inherited properties are copied as well
    for(key in options) copy[key] = options[key];
    return copy;
  };
  //
  // Book-keeping for the proxies of one Mongos topology: the list of
  // connected servers, the list of disconnected servers and the custom
  // read preference strategies that may be used to pick between them.
  // Servers are compared with their own equals() method.
  var State = function(readPreferenceStrategies) {
    // Internal state
    this.s = {
        connectedServers: []
      , disconnectedServers: []
      , readPreferenceStrategies: readPreferenceStrategies
    };
  };

  //
  // A Mongos connected
  State.prototype.connected = function(server) {
    // Locate in disconnected servers and remove
    this.s.disconnectedServers = this.s.disconnectedServers.filter(function(s) {
      return !s.equals(server);
    });

    // Add to the connected list if it does not already exist
    var known = this.s.connectedServers.some(function(s) {
      return s.equals(server);
    });
    if(!known) this.s.connectedServers.push(server);
  };

  //
  // A Mongos disconnected
  State.prototype.disconnected = function(server) {
    // Locate in connected servers and remove
    this.s.connectedServers = this.s.connectedServers.filter(function(s) {
      return !s.equals(server);
    });

    // Add to the disconnected list if it does not already exist
    var known = this.s.disconnectedServers.some(function(s) {
      return s.equals(server);
    });
    if(!known) this.s.disconnectedServers.push(server);
  };

  //
  // Return a copy of the list of disconnected servers
  State.prototype.disconnectedServers = function() {
    return this.s.disconnectedServers.slice(0);
  };

  //
  // Return a copy of the list of connected servers
  State.prototype.connectedServers = function() {
    return this.s.connectedServers.slice(0);
  };

  //
  // Get all servers, connected ones first
  State.prototype.getAll = function() {
    return this.s.connectedServers.slice(0).concat(this.s.disconnectedServers);
  };

  //
  // Get the connections of all connected servers as one flat list
  State.prototype.getAllConnections = function() {
    var connections = [];
    this.s.connectedServers.forEach(function(e) {
      connections = connections.concat(e.connections());
    });
    return connections;
  };

  //
  // Destroy the state: every connected server loses its listeners, is
  // destroyed and is moved to the disconnected list
  State.prototype.destroy = function() {
    var detach = function(server) {
      ['error', 'close', 'timeout', 'connect'].forEach(function(e) {
        server.removeAllListeners(e);
      });
    };

    while(this.s.connectedServers.length > 0) {
      var server = this.s.connectedServers.shift();
      // Remove any non used handlers
      detach(server);
      // Server destroy
      server.destroy();
      // Add to list of disconnected servers
      this.s.disconnectedServers.push(server);
    }
  };

  //
  // Are we connected (is at least one server connected)
  State.prototype.isConnected = function() {
    return this.s.connectedServers.length > 0;
  };

  //
  // Pick a server. A custom strategy registered under the read preference
  // is asked first and receives a copy of the connected servers; otherwise
  // the first connected server is returned.
  // Throws a MongoError when no strategy applies and nothing is connected.
  State.prototype.pickServer = function(readPreference) {
    readPreference = readPreference || ReadPreference.primary;
    var strategies = this.s.readPreferenceStrategies;

    // Do we have a custom readPreference strategy, use it
    if(strategies != null && strategies[readPreference] != null) {
      // The list has to come from this state object; a bare
      // `connectedServers` identifier does not exist in this scope
      return strategies[readPreference].pickServer(this.connectedServers(), readPreference);
    }

    // No valid connections
    if(this.s.connectedServers.length == 0) throw new MongoError("no mongos proxy available");
    // Pick first one
    return this.s.connectedServers[0];
  };
  /**
   * Creates a new Mongos instance
   * @class
   * @param {array} seedlist A non-empty list of {host, port} entries, one per mongos proxy
   * @param {number} [options.reconnectTries=30] Reconnect retries for HA if no servers available
   * @param {number} [options.haInterval=5000] The High availability period for proxy inquiry
   * @param {boolean} [options.emitError=false] Server will emit errors events
   * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors
   * @param {object} [options.bson] BSON parser to use instead of the shared module level parser
   * @param {object} [options.disconnectHandler] Store that operations are handed to while no proxy is connected
   * @param {number} [options.size=5] Server connection pool size
   * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
   * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
   * @param {boolean} [options.noDelay=true] TCP Connection no delay
   * @param {number} [options.connectionTimeout=1000] TCP Connection timeout setting
   * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
   * @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed
   * @param {boolean} [options.ssl=false] Use SSL for connection
   * @param {Buffer} [options.ca] SSL Certificate store binary buffer
   * @param {Buffer} [options.cert] SSL Certificate binary buffer
   * @param {Buffer} [options.key] SSL Key file binary buffer
   * @param {string} [options.passphrase] SSL Certificate pass phrase
   * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
   * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
   * @return {Mongos} A Mongos instance
   * @fires Mongos#connect
   * @fires Mongos#joined
   * @fires Mongos#left
   */
  var Mongos = function(seedlist, options) {
    var self = this;
    options = options || {};

    // Initialise the EventEmitter side of the instance
    EventEmitter.call(this);

    // The seedlist has to be a non-empty array whose entries carry a
    // string host and a numeric port
    if(!Array.isArray(seedlist)) {
      throw new MongoError("seedlist must be an array");
    }
    if(seedlist.length == 0) {
      throw new MongoError("seedlist must contain at least one entry");
    }
    seedlist.forEach(function(seed) {
      var valid = typeof seed.host == 'string' && typeof seed.port == 'number';
      if(!valid) throw new MongoError("seedlist entry must contain a host and port");
    });

    // The module keeps a single shared parser, created on first use
    if(bsonInstance == null) bsonInstance = new BSON(bsonTypes);
    // A parser passed in through the options wins; the chosen parser is
    // written back onto the (caller owned) options object
    var bson = options.bson || bsonInstance;
    options.bson = bson;

    // Same value seeds both the configured limit and the remaining count
    var reconnectTries = options.reconnectTries || 30;

    // The Mongos state
    this.s = {
      // Seed list for sharding passed in
      seedlist: seedlist,
      // Passed in options
      options: options,
      // Logger
      logger: Logger('Mongos', options),
      // Reconnect tries
      reconnectTries: reconnectTries,
      // Ha interval
      haInterval: options.haInterval || 5000,
      // Has the fullsetup event been emitted
      fullsetup: false,
      // Cursor factory
      Cursor: options.cursorFactory || BasicCursor,
      // Current credentials used for auth
      credentials: [],
      // BSON Parser
      bsonInstance: bsonInstance,
      bson: bson,
      // Default state
      state: DISCONNECTED,
      // Swallow or emit errors (only a boolean true enables emitting)
      emitError: options.emitError === true,
      // Contains any alternate strategies for picking
      readPreferenceStrategies: {},
      // Auth providers
      authProviders: {},
      // Unique instance id
      id: mongosId++,
      // Authentication in progress
      authInProgress: false,
      // Servers added while auth in progress
      authInProgressServers: [],
      // Current retries left
      retriesLeft: reconnectTries,
      // Do we have a not connected handler
      disconnectHandler: options.disconnectHandler
    };

    // Set up the connection timeout for the options
    options.connectionTimeout = options.connectionTimeout || 1000;

    // Create a new state for the mongos
    this.s.mongosState = new State(this.s.readPreferenceStrategies);

    // Expose read-only, enumerable accessors on the instance
    var expose = function(name, getter) {
      Object.defineProperty(self, name, { enumerable: true, get: getter });
    };

    // BSON property (find a server and pass it along)
    expose('bson', function() {
      var servers = self.s.mongosState.getAll();
      return servers.length > 0 ? servers[0].bson : null;
    });
    expose('id', function() { return self.s.id; });
    expose('type', function() { return 'mongos'; });
    expose('haInterval', function() { return self.s.haInterval; });
    // Note: `state` hands out the State object, not the lifecycle string
    expose('state', function() { return self.s.mongosState; });
  };

  inherits(Mongos, EventEmitter);
  /**
   * Select the BSON parser implementation stored in the topology options
   * @method
   * @param {string} type Type of BSON parser to use (c++ or js)
   * @throws {MongoError} when type is neither 'c++' nor 'js'
   */
  Mongos.prototype.setBSONParserType = function(type) {
    var nBSON = null;

    if(type == 'c++') {
      nBSON = require('bson').native().BSON;
    } else if(type == 'js') {
      nBSON = require('bson').pure().BSON;
    } else {
      // "%s" is required for util.format to interpolate the type; a bare
      // "%" leaves the placeholder in place and appends the type at the end
      throw new MongoError(f("%s parser not supported", type));
    }

    // NOTE(review): only options.bson is replaced here; the `bson` entry of
    // this.s is left untouched - confirm that this is intended
    this.s.options.bson = new nBSON(bsonTypes);
  };
  /**
   * Returns the last known ismaster document of the first connected
   * proxy, or null when no proxy is connected
   * @method
   * @return {object}
   */
  Mongos.prototype.lastIsMaster = function() {
    var proxies = this.s.mongosState.connectedServers();
    return proxies.length > 0 ? proxies[0].lastIsMaster() : null;
  };
  /**
   * Initiate server connect: schedules the HA inquiry, creates one Server
   * per seedlist entry and starts connecting to every disconnected server
   * @method
   * @param {object} [_options] Extra options merged into the topology options before the servers are created
   */
  Mongos.prototype.connect = function(_options) {
    var self = this;
    var state = self.s;

    // Schedule the first run of the HA inquiry process
    setTimeout(mongosInquirer(self, state), state.haInterval);

    // Merge any additional options into the stored ones
    if(_options) {
      for(var key in _options) state.options[key] = _options[key];
    }

    // Every seedlist entry gets its own server, registered as disconnected
    state.seedlist.forEach(function(seed) {
      var serverOptions = cloneOptions(state.options);
      // Add host and port
      serverOptions.host = seed.host;
      serverOptions.port = seed.port;
      serverOptions.reconnect = false;
      serverOptions.readPreferenceStrategies = state.readPreferenceStrategies;
      // Share the auth store
      serverOptions.authProviders = state.authProviders;
      // The servers are created with error emitting switched on
      serverOptions.emitError = true;
      state.mongosState.disconnected(new Server(serverOptions));
    });

    // Attempt to connect to all the disconnected servers (works on a copy of the list)
    state.mongosState.disconnectedServers().forEach(function(server) {
      // Remove any non used handlers
      ['error', 'close', 'timeout', 'connect', 'message', 'parseError'].forEach(function(eventName) {
        server.removeAllListeners(eventName);
      });

      // One-shot handlers used while the connection is being established
      ['error', 'close', 'timeout', 'parseError'].forEach(function(eventName) {
        server.once(eventName, errorHandlerTemp(self, state, server));
      });
      server.once('connect', connectHandler(self, state, 'connect'));

      if(state.logger.isInfo()) state.logger.info(f('connecting to server %s', server.name));
      // Attempt to connect
      server.connect();
    });
  };
  /**
   * Destroy the server connection: marks the topology as destroyed and
   * destroys every connected proxy
   * @method
   * @param {boolean} [emitClose] Emit a close event (only if a close listener is registered) before the proxies are destroyed
   */
  Mongos.prototype.destroy = function(emitClose) {
    // Bind the instance explicitly; no `self` exists in this scope otherwise
    var self = this;
    self.s.state = DESTROYED;
    // Emit close
    if(emitClose && self.listeners('close').length > 0) self.emit('close', self);
    // Destroy the state
    self.s.mongosState.destroy();
  };
  /**
   * Figure out if the topology is connected, as reported by the proxy state
   * @method
   * @return {boolean}
   */
  Mongos.prototype.isConnected = function() {
    var proxies = this.s.mongosState;
    return proxies.isConnected();
  };
  /**
   * Figure out if this instance was destroyed by calling destroy
   * @method
   * @return {boolean}
   */
  Mongos.prototype.isDestroyed = function() {
    var current = this.s.state;
    return current == DESTROYED;
  };
  360. //
  361. // Operations
  362. //
  /**
   * Insert one or more documents
   * @method
   * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
   * @param {array} ops An array of documents to insert
   * @param {boolean} [options.ordered=true] Execute in order or out of order
   * @param {object} [options.writeConcern={}] Write concern for the operation
   * @param {opResultCallback} callback A callback function
   */
  Mongos.prototype.insert = function(ns, ops, options, callback) {
    var self = this;

    // Support the (ns, ops, callback) call form
    if(typeof options == 'function') {
      callback = options;
      options = {};
    }

    // A destroyed topology refuses any further work
    if(self.s.state == DESTROYED) {
      return callback(new MongoError(f('topology was destroyed')));
    }

    // Not connected and a disconnect handler is available: hand the call
    // to the handler instead of executing it now
    var store = self.s.disconnectHandler;
    if(!self.isConnected() && store != null) {
      callback = bindToCurrentDomain(callback);
      return store.add('insert', ns, ops, options, callback);
    }

    executeWriteOperation(self.s, 'insert', ns, ops, options, callback);
  };
  /**
   * Perform one or more update operations
   * @method
   * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
   * @param {array} ops An array of updates
   * @param {boolean} [options.ordered=true] Execute in order or out of order
   * @param {object} [options.writeConcern={}] Write concern for the operation
   * @param {opResultCallback} callback A callback function
   */
  Mongos.prototype.update = function(ns, ops, options, callback) {
    var self = this;

    // Support the (ns, ops, callback) call form
    if(typeof options == 'function') {
      callback = options;
      options = {};
    }

    // A destroyed topology refuses any further work
    if(self.s.state == DESTROYED) {
      return callback(new MongoError(f('topology was destroyed')));
    }

    // Not connected and a disconnect handler is available: hand the call
    // to the handler instead of executing it now
    var store = self.s.disconnectHandler;
    if(!self.isConnected() && store != null) {
      callback = bindToCurrentDomain(callback);
      return store.add('update', ns, ops, options, callback);
    }

    executeWriteOperation(self.s, 'update', ns, ops, options, callback);
  };
  /**
   * Perform one or more remove operations
   * @method
   * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
   * @param {array} ops An array of removes
   * @param {boolean} [options.ordered=true] Execute in order or out of order
   * @param {object} [options.writeConcern={}] Write concern for the operation
   * @param {opResultCallback} callback A callback function
   */
  Mongos.prototype.remove = function(ns, ops, options, callback) {
    var self = this;

    // Support the (ns, ops, callback) call form
    if(typeof options == 'function') {
      callback = options;
      options = {};
    }

    // A destroyed topology refuses any further work
    if(self.s.state == DESTROYED) {
      return callback(new MongoError(f('topology was destroyed')));
    }

    // Not connected and a disconnect handler is available: hand the call
    // to the handler instead of executing it now
    var store = self.s.disconnectHandler;
    if(!self.isConnected() && store != null) {
      callback = bindToCurrentDomain(callback);
      return store.add('remove', ns, ops, options, callback);
    }

    executeWriteOperation(self.s, 'remove', ns, ops, options, callback);
  };
  /**
   * Execute a command
   * @method
   * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
   * @param {object} cmd The command hash
   * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
   * @param {Connection} [options.connection] Specify connection object to execute command against
   * @param {boolean} [options.onAll] Execute the command against every known proxy instead of a single picked one
   * @param {opResultCallback} callback A callback function
   */
  Mongos.prototype.command = function(ns, cmd, options, callback) {
    if(typeof options == 'function') callback = options, options = {};
    if(this.s.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
    var self = this;

    // Topology is not connected, save the call in the provided store to be
    // Executed at some point when the handler deems it's reconnected
    if(!self.isConnected() && self.s.disconnectHandler != null) {
      callback = bindToCurrentDomain(callback);
      return self.s.disconnectHandler.add('command', ns, cmd, options, callback);
    }

    var server = null;
    // Ensure we have options
    options = options || {};

    // We need to execute the command on all servers
    if(options.onAll) {
      var servers = self.s.mongosState.getAll();
      var count = servers.length;
      var cmdErr = null;

      // Without any server no reply would ever arrive, so fail right away
      if(count == 0) return callback(new MongoError("no mongos proxy available"));

      for(var i = 0; i < servers.length; i++) {
        servers[i].command(ns, cmd, options, function(err, r) {
          count = count - 1;
          // Keep the first error so that a later success does not hide it
          if(err && cmdErr == null) cmdErr = err;
          // Finished executing command
          if(count == 0) {
            // Was it a logout command clear any credentials
            if(cmd.logout) clearCredentials(self.s, ns);
            // Return the error (if any) and the reply that arrived last
            callback(cmdErr, r);
          }
        });
      }

      return;
    }

    try {
      // Commands carrying a write concern go to the primary read
      // preference, all others use the requested read preference
      server = self.s.mongosState.pickServer(options.writeConcern ? ReadPreference.primary : options.readPreference);
    } catch(err) {
      return callback(err);
    }

    // No server returned we had an error
    if(server == null) return callback(new MongoError("no mongos found"));

    server.command(ns, cmd, options, function(err, r) {
      // Was it a logout command clear any credentials
      if(cmd.logout) clearCredentials(self.s, ns);
      callback(err, r);
    });
  };
  /**
   * Create a cursor for a command or an existing cursor id
   * @method
   * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
   * @param {{object}|{Long}} cmd Can be either a command returning a cursor or a cursorId
   * @param {object} [cursorOptions] Options handed to the cursor
   * @param {object} [cursorOptions.batchSize=0] Batchsize for the operation
   * @param {array} [cursorOptions.documents=[]] Initial documents list for cursor
   * @param {ReadPreference} [cursorOptions.readPreference] Specify read preference if command supports it
   * @param {function} [cursorOptions.cursorFactory] Cursor class used instead of the one configured on the topology
   * @return {object} A new instance of the selected cursor class
   */
  Mongos.prototype.cursor = function(ns, cmd, cursorOptions) {
    var state = this.s;
    cursorOptions = cursorOptions || {};
    // A factory passed with the call wins over the topology wide one
    var CursorClass = cursorOptions.cursorFactory ? cursorOptions.cursorFactory : state.Cursor;
    return new CursorClass(state.bson, ns, cmd, cursorOptions, this, state.options);
  };
  /**
   * Authenticate using a specified mechanism against every connected proxy
   * @method
   * @param {string} mechanism The Auth mechanism we are invoking
   * @param {string} db The db we are invoking the mechanism against
   * @param {...object} param Parameters for the specific mechanism
   * @param {authResultCallback} callback A callback function (always the last argument)
   * @throws {MongoError} synchronously, when no provider is registered for the mechanism and the mechanism is not 'default'
   */
  Mongos.prototype.auth = function(mechanism, db) {
    var self = this;
    // Everything after (mechanism, db); the last entry is the callback
    var args = Array.prototype.slice.call(arguments, 2);
    var callback = args.pop();

    // If we don't have the mechanism fail
    if(this.s.authProviders[mechanism] == null && mechanism != 'default')
      throw new MongoError(f("auth provider %s does not exist", mechanism));

    // Authenticate against all the connected servers
    var servers = this.s.mongosState.connectedServers().slice(0);
    var count = servers.length;
    var authErr = null;

    // Without a connected proxy no reply would ever arrive: report it
    // instead of leaving the callback pending and the in-progress flag set
    if(count == 0) {
      this.s.authInProgress = false;
      return callback(new MongoError("no mongos proxy available"), false);
    }

    // Set auth in progress
    this.s.authInProgress = true;

    // Arguments without a callback, identical for every server
    var argsWithoutCallback = [mechanism, db].concat(args.slice(0));

    // Collects the replies of all servers
    var onServerAuth = function(err, r) {
      count = count - 1;
      if(err) authErr = err;

      // We are done
      if(count == 0) {
        // Servers joined while we were authenticating, run the whole
        // authentication again so that they get authenticated as well
        if(self.s.authInProgressServers.length > 0) {
          self.s.authInProgressServers = [];
          return self.auth.apply(self, [mechanism, db].concat(args).concat([callback]));
        }

        // Auth is done
        self.s.authInProgress = false;
        // Add successful credentials
        if(authErr == null) addCredentials(self.s, db, argsWithoutCallback);
        // Return the auth error
        if(authErr) return callback(authErr, false);
        // Successfully authenticated session
        callback(null, new Session({}, self));
      }
    };

    // Authenticate against all servers
    while(servers.length > 0) {
      var server = servers.shift();
      // Execute the auth
      server.auth.apply(server, argsWithoutCallback.concat([onServerAuth]));
    }
  };
  548. //
  549. // Plugin methods
  550. //
  /**
   * Register a custom read preference strategy under a name
   * @method
   * @param {string} name Name of the read preference strategy
   * @param {object} strategy Strategy object instance
   */
  Mongos.prototype.addReadPreferenceStrategy = function(name, strategy) {
    var strategies = this.s.readPreferenceStrategies;
    // Create the registry on demand
    if(strategies == null) {
      strategies = this.s.readPreferenceStrategies = {};
    }
    strategies[name] = strategy;
  };
  /**
   * Register a custom authentication mechanism under a name
   * @method
   * @param {string} name Name of the authentication mechanism
   * @param {object} provider Authentication object instance
   */
  Mongos.prototype.addAuthProvider = function(name, provider) {
    var providers = this.s.authProviders;
    providers[name] = provider;
  };
  /**
   * Get a connection from the proxy picked for the read preference
   * @method
   * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
   * @return {Connection} The connection, or null when no server was picked
   */
  Mongos.prototype.getConnection = function(options) {
    // Missing options behave like an empty options object
    var readPreference = (options || {}).readPreference;
    // Pick the right server based on readPreference
    var picked = this.s.mongosState.pickServer(readPreference);
    return picked == null ? null : picked.getConnection();
  };
  /**
   * Get the proxy picked for the read preference
   * @method
   * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
   * @return {Server}
   */
  Mongos.prototype.getServer = function(options) {
    // Missing options behave like an empty options object
    var readPreference = (options || {}).readPreference;
    // Pick the right server based on readPreference
    return this.s.mongosState.pickServer(readPreference);
  };
  /**
   * All raw connections, as reported by the proxy state
   * @method
   * @return {Connection[]}
   */
  Mongos.prototype.connections = function() {
    var proxies = this.s.mongosState;
    return proxies.getAllConnections();
  };
  //
  // Inquires about state changes
  //
  // Returns the function that performs one HA round: it maintains the
  // reconnect retry budget, flushes buffered operations once a proxy is
  // connected, tries to reconnect every disconnected proxy and schedules
  // the next round.
  //   self  - the Mongos instance
  //   state - the internal state object of that instance (its `s` property)
  var mongosInquirer = function(self, state) {
    return function() {
      if(state.state == DESTROYED) return;
      // A connected topology gets its full retry budget back
      if(state.state == CONNECTED) state.retriesLeft = state.reconnectTries;

      // Disconnected with no retries left: give up
      if(state.state == DISCONNECTED && state.retriesLeft == 0) {
        self.destroy();
        return self.emit('error', new MongoError(f('failed to reconnect after %s', state.reconnectTries)));
      } else if(state.state == DISCONNECTED) {
        // Compare the lifecycle string (state.state), not the state object
        // itself, otherwise the retry budget is never consumed
        state.retriesLeft = state.retriesLeft - 1;
      }

      // If we have a connected proxy and a disconnect handler, execute
      // buffered operations
      if(state.mongosState.isConnected() && state.disconnectHandler) {
        state.disconnectHandler.execute();
      }

      // Log the information
      if(state.logger.isDebug()) state.logger.debug(f('mongos ha proceess running'));

      // Let's query any disconnected proxies
      var disconnectedServers = state.mongosState.disconnectedServers();
      if(disconnectedServers.length == 0) return setTimeout(mongosInquirer(self, state), state.haInterval);

      // Count of connections waiting to be connected
      var connectionCount = disconnectedServers.length;
      if(state.logger.isDebug()) state.logger.debug(f('mongos ha proceess found %d disconnected proxies', connectionCount));

      // Let's attempt to reconnect
      while(disconnectedServers.length > 0) {
        var server = disconnectedServers.shift();
        if(state.logger.isDebug()) state.logger.debug(f('attempting to connect to server %s', server.name));

        // Remove any listeners
        ['error', 'close', 'timeout', 'connect', 'message', 'parseError'].forEach(function(e) {
          server.removeAllListeners(e);
        });

        // Set up the event handlers; parseError gets a temporary handler
        // too, matching what Mongos.prototype.connect registers
        server.once('error', errorHandlerTemp(self, state, server));
        server.once('close', errorHandlerTemp(self, state, server));
        server.once('timeout', errorHandlerTemp(self, state, server));
        server.once('parseError', errorHandlerTemp(self, state, server));
        server.once('connect', connectHandler(self, state, 'ha'));
        // Start connect
        server.connect();
      }

      // Let's keep monitoring but wait for possible timeout to happen
      return setTimeout(mongosInquirer(self, state), state.options.connectionTimeout + state.haInterval);
    };
  };
  //
  // Error handler used while a server is still connecting. Returns a
  // handler that logs the failure, strips the temporary listeners from
  // the server and marks it as disconnected.
  //   self   - the Mongos instance (not used by the handler)
  //   state  - the internal state object of that instance
  //   server - the server the handler is created for
  var errorHandlerTemp = function(self, state, server) {
    return function(err, emitter) {
      // Prefer the server passed along with the event; fall back to the one
      // the handler was created for when the event does not carry a server
      var target = emitter || server;

      // Log the information
      if(state.logger.isInfo()) state.logger.info(f('server %s disconnected with error %s', target.name, JSON.stringify(err)));

      // Remove any non used handlers
      ['error', 'close', 'timeout', 'connect'].forEach(function(e) {
        target.removeAllListeners(e);
      });

      // Signal disconnect of server
      state.mongosState.disconnected(target);
    };
  };
  //
  // Handlers
  //
  // Error handler for a connected server: marks the server as
  // disconnected, flips the topology to DISCONNECTED when no connected
  // server remains, emits 'left' and, when error emitting is enabled,
  // re-emits the error on the topology.
  var errorHandler = function(self, state) {
    return function(err, server) {
      var proxies = state.mongosState;

      if(state.logger.isInfo()) {
        state.logger.info(f('server %s errored out with %s', server.name, JSON.stringify(err)));
      }

      proxies.disconnected(server);
      // No more connected servers left, the topology is disconnected
      var noneLeft = proxies.connectedServers().length == 0;
      if(noneLeft) state.state = DISCONNECTED;

      // Signal server left
      self.emit('left', 'mongos', server);
      if(state.emitError) self.emit('error', err, server);
    };
  };
  // Timeout handler for a connected server: marks the server as
  // disconnected, flips the topology to DISCONNECTED when no connected
  // server remains and emits 'left'.
  var timeoutHandler = function(self, state) {
    return function(err, server) {
      var proxies = state.mongosState;

      if(state.logger.isInfo()) {
        state.logger.info(f('server %s timed out', server.name));
      }

      proxies.disconnected(server);
      // No more connected servers left, the topology is disconnected
      var noneLeft = proxies.connectedServers().length == 0;
      if(noneLeft) state.state = DISCONNECTED;

      // Signal server left
      self.emit('left', 'mongos', server);
    };
  };
  // Close handler for a connected server: marks the server as
  // disconnected, flips the topology to DISCONNECTED when no connected
  // server remains and emits 'left'.
  var closeHandler = function(self, state) {
    return function(err, server) {
      var proxies = state.mongosState;

      if(state.logger.isInfo()) {
        state.logger.info(f('server %s closed', server.name));
      }

      proxies.disconnected(server);
      // No more connected servers left, the topology is disconnected
      var noneLeft = proxies.connectedServers().length == 0;
      if(noneLeft) state.state = DISCONNECTED;

      // Signal server left
      self.emit('left', 'mongos', server);
    };
  };
  // Connect handler
  //
  // Returns the handler run when a server finished connecting.
  //   self  - the Mongos instance
  //   state - the internal state object of that instance
  //   e     - origin of the connect attempt; 'ha' for attempts started by
  //           the HA inquiry, which may lead to a 'reconnect' event
  // Stored credentials are applied to the server before it is registered
  // as connected and the topology events are emitted.
  var connectHandler = function(self, state, e) {
    return function(server) {
      if(state.logger.isInfo()) state.logger.info(f('connected to %s', server.name));

      // Remove any non used handlers
      var staleEvents = ['error', 'close', 'timeout', 'connect', 'message', 'parseError'];
      for(var n = 0; n < staleEvents.length; n++) {
        server.removeAllListeners(staleEvents[n]);
      }

      // The lists are asked again at every check because listeners of the
      // emitted events may change them in between
      var nothingDisconnected = function() {
        return state.mongosState.disconnectedServers().length == 0;
      };
      var connectedCount = function() {
        return state.mongosState.connectedServers().length;
      };

      // Finish processing the server
      var processNewServer = function(_server) {
        // Add the server handling code
        if(_server.isConnected()) {
          _server.once('error', errorHandler(self, state));
          _server.once('close', closeHandler(self, state));
          _server.once('timeout', timeoutHandler(self, state));
          _server.once('parseError', timeoutHandler(self, state));
        }

        // Emit joined event
        self.emit('joined', 'mongos', _server);

        // Add to list connected servers
        state.mongosState.connected(_server);

        // First proxy back after an HA attempt means we reconnected
        if('ha' == e && connectedCount() == 1) {
          self.emit('reconnect', _server);
        }

        // Full setup, emitted once
        if(nothingDisconnected() && connectedCount() > 0 && !state.fullsetup) {
          state.fullsetup = true;
          self.emit('fullsetup');
        }

        // All seedlist entries connected, emitted once
        if(nothingDisconnected() && connectedCount() == state.seedlist.length && !state.all) {
          state.all = true;
          self.emit('all');
        }

        // Set connected
        if(state.state == DISCONNECTED) {
          state.state = CONNECTED;
          self.emit('connect', self);
        }
      };

      // Is there an authentication process ongoing, remember the server
      if(state.authInProgress) {
        state.authInProgressServers.push(server);
      }

      // No credentials just process server
      if(state.credentials.length == 0) return processNewServer(server);

      // We have credentials: apply them all and process the server after
      // the last reply arrived (auth errors are ignored here)
      var pending = state.credentials.length;
      var onAuthReply = function(err, r) {
        pending = pending - 1;
        if(pending == 0) processNewServer(server);
      };

      for(var i = 0; i < state.credentials.length; i++) {
        server.auth.apply(server, state.credentials[i].concat([onAuthReply]));
      }
    };
  };
  //
  // Strip the connection listeners from a server and append it to the
  // list unless the list already holds an equal server
  var addToListIfNotExist = function(list, server) {
    // Remove any non used handlers
    var events = ['error', 'close', 'timeout', 'connect'];
    events.forEach(function(eventName) {
      server.removeAllListeners(eventName);
    });

    // Every entry is compared, equals() decides about identity
    var alreadyListed = false;
    var index = 0;
    while(index < list.length) {
      if(list[index].equals(server)) alreadyListed = true;
      index = index + 1;
    }

    if(!alreadyListed) list.push(server);
  };
  // Store the credentials used for a db, replacing any credentials that
  // were stored for the same db before
  var addCredentials = function(state, db, argsWithoutCallback) {
    // clearCredentials expects a namespace, so a collection part is
    // appended to the db name
    var placeholderNs = db + ".dummy";
    clearCredentials(state, placeholderNs);
    // Add new credentials to list
    state.credentials.push(argsWithoutCallback);
  };
  // Drop every stored credential that belongs to the db of a namespace.
  // The db is the part of the namespace before the first dot; a stored
  // credential keeps its db at index 1.
  var clearCredentials = function(state, ns) {
    var db = ns.split('.')[0];
    // Keep only the credentials of other dbs (always a new list)
    state.credentials = state.credentials.filter(function(credential) {
      return credential[1] != db;
    });
  };
  // Hand the command back unchanged when no read preference is set.
  // NOTE(review): with a read preference set nothing is returned
  // (undefined); this looks unfinished - confirm before relying on it
  var processReadPreference = function(cmd, options) {
    var opts = options || {};
    // A read preference was specified: no result
    if(opts.readPreference != null) return;
    return cmd;
  };
  //
  // Execute write operation: pick a proxy and forward the operation to it.
  //   state    - the internal state object of a Mongos instance
  //   op       - name of the server method to call (insert, update, remove)
  //   ns, ops  - namespace and operations handed to that method
  //   options  - optional; may be left out in favour of the callback
  //   callback - receives the pick error, a "no mongos found" error or
  //              whatever the server method reports
  var executeWriteOperation = function(state, op, ns, ops, options, callback) {
    // Support leaving out the options
    if(typeof options == 'function') {
      callback = options;
      options = {};
    }
    // Ensure we have options
    if(!options) options = {};

    var target = null;
    try {
      // Pick a proxy (no read preference is passed for writes)
      target = state.mongosState.pickServer();
    } catch(err) {
      return callback(err);
    }

    // No server returned we had an error
    if(target == null) {
      return callback(new MongoError("no mongos found"));
    }

    // Execute the command
    target[op](ns, ops, options, callback);
  };
  828. /**
  829. * A mongos connect event, used to verify that the connection is up and running
  830. *
  831. * @event Mongos#connect
  832. * @type {Mongos}
  833. */
  834. /**
  835. * A server member left the mongos list
  836. *
  837. * @event Mongos#left
  838. * @type {Mongos}
  839. * @param {string} type The type of member that left (mongos)
  840. * @param {Server} server The server object that left
  841. */
  842. /**
  843. * A server member joined the mongos list
  844. *
  845. * @event Mongos#joined
  846. * @type {Mongos}
  847. * @param {string} type The type of member that joined (mongos)
  848. * @param {Server} server The server object that joined
  849. */
  850. module.exports = Mongos;