replset.js 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304
  1. "use strict";
  2. var inherits = require('util').inherits
  3. , f = require('util').format
  4. , b = require('bson')
  5. , bindToCurrentDomain = require('../connection/utils').bindToCurrentDomain
  6. , debugOptions = require('../connection/utils').debugOptions
  7. , EventEmitter = require('events').EventEmitter
  8. , Server = require('./server')
  9. , ReadPreference = require('./read_preference')
  10. , MongoError = require('../error')
  11. , Ping = require('./strategies/ping')
  12. , Session = require('./session')
  13. , BasicCursor = require('../cursor')
  14. , BSON = require('bson').native().BSON
  15. , State = require('./replset_state')
  16. , Logger = require('../connection/logger');
  17. /**
  18. * @fileOverview The **ReplSet** class is a class that represents a Replicaset topology and is
  19. * used to construct connections.
  20. *
  21. * @example
  22. * var ReplSet = require('mongodb-core').ReplSet
  23. * , ReadPreference = require('mongodb-core').ReadPreference
  24. * , assert = require('assert');
  25. *
  26. * var server = new ReplSet([{host: 'localhost', port: 30000}], {setName: 'rs'});
  27. * // Wait for the connection event
  28. * server.on('connect', function(server) {
  29. * server.destroy();
  30. * });
  31. *
  32. * // Start connecting
  33. * server.connect();
  34. */
  35. var DISCONNECTED = 'disconnected';
  36. var CONNECTING = 'connecting'; // written to replState.state by ReplSet.prototype.connect
  37. var CONNECTED = 'connected';
  38. var DESTROYED = 'destroyed'; // written by destroy(); operations compare against it to reject calls on a destroyed topology
  39. //
  40. // Counter read (then incremented) by the ReplSet constructor to give each instance its `id`
  41. var replSetId = 1;
  /**
   * Produce a shallow copy of an options object.
   * Every property visited by `for...in` (own or inherited, enumerable) is
   * copied by reference onto a fresh plain object.
   * @param {object} options The object to copy (null/undefined yields `{}`)
   * @return {object} A new object holding the same property values
   */
  var cloneOptions = function(options) {
    var copy = {};
    var key;
    for(key in options) copy[key] = options[key];
    return copy;
  };
  51. // BSON value classes handed to the BSON parser constructor (`new BSON(bsonTypes)`)
  52. var bsonTypes = [b.Long, b.ObjectID, b.Binary, b.Code, b.DBRef, b.Symbol, b.Double, b.Timestamp, b.MaxKey, b.MinKey];
  53. // Shared BSON parser; created by the first ReplSet constructed and reused by later instances
  54. var bsonInstance = null;
  /**
   * Creates a new ReplSet instance
   * @class
   * @param {array} seedlist A non-empty list of `{host, port}` seeds for the replicaset
   * @param {string} options.setName The Replicaset set name
   * @param {boolean} [options.secondaryOnlyConnectionAllowed=false] Allow connection to a secondary only replicaset
   * @param {number} [options.haInterval=10000] The High availability period for replicaset inquiry
   * @param {boolean} [options.emitError=false] Server will emit errors events
   * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors
   * @param {object} [options.bson] BSON parser to use instead of the shared module-level instance
   * @param {boolean} [options.debug=false] Debug mode; exposes the `readPreferenceStrategies` property
   * @param {string} [options.tag] Grouping tag used for debugging purposes
   * @param {object} [options.disconnectHandler] Store that receives operations issued while not connected
   * @param {number} [options.size=5] Server connection pool size
   * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
   * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
   * @param {boolean} [options.noDelay=true] TCP Connection no delay
   * @param {number} [options.connectionTimeout=10000] TCP Connection timeout setting
   * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
   * @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed
   * @param {boolean} [options.ssl=false] Use SSL for connection
   * @param {Buffer} [options.ca] SSL Certificate store binary buffer
   * @param {Buffer} [options.cert] SSL Certificate binary buffer
   * @param {Buffer} [options.key] SSL Key file binary buffer
   * @param {string} [options.passphrase] SSL Certificate pass phrase
   * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
   * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
   * @param {number} [options.pingInterval=5000] Ping interval to check the response time to the different servers
   * @param {number} [options.acceptableLatency=250] Acceptable latency for selecting a server for reading (in milliseconds)
   * @return {ReplSet} A ReplSet instance
   * @throws {MongoError} If seedlist is not a non-empty array of `{host, port}` entries
   * @fires ReplSet#connect
   * @fires ReplSet#ha
   * @fires ReplSet#joined
   * @fires ReplSet#left
   */
  var ReplSet = function(seedlist, options) {
    var self = this;
    options = options || {};

    // Validate seedlist
    if(!Array.isArray(seedlist)) throw new MongoError("seedlist must be an array");
    // Validate list
    if(seedlist.length == 0) throw new MongoError("seedlist must contain at least one entry");
    // Validate entries; a null/undefined entry is rejected with the same MongoError
    // rather than surfacing as a TypeError from the property access
    seedlist.forEach(function(e) {
      if(e == null || typeof e.host != 'string' || typeof e.port != 'number')
        throw new MongoError("seedlist entry must contain a host and port");
    });

    // Add event listener
    EventEmitter.call(this);

    // Set the bson instance (created once, then shared by every ReplSet)
    bsonInstance = bsonInstance == null ? new BSON(bsonTypes) : bsonInstance;

    // Internal state hash for the object
    this.s = {
        options: options
      // Logger instance
      , logger: Logger('ReplSet', options)
      // Uniquely identify the replicaset instance
      , id: replSetId++
      // Index
      , index: 0
      // Ha Index
      , haId: 0
      // Current credentials used for auth
      , credentials: []
      // Factory overrides
      , Cursor: options.cursorFactory || BasicCursor
      // BSON Parser, ensure we have a single instance
      , bsonInstance: bsonInstance
      // Pick the right bson parser
      , bson: options.bson ? options.bson : bsonInstance
      // Special replicaset options
      , secondaryOnlyConnectionAllowed: typeof options.secondaryOnlyConnectionAllowed == 'boolean'
        ? options.secondaryOnlyConnectionAllowed : false
      , haInterval: options.haInterval || 10000
      // Are we running in debug mode
      , debug: typeof options.debug == 'boolean' ? options.debug : false
      // The replicaset name
      , setName: options.setName
      // Swallow or emit errors
      , emitError: typeof options.emitError == 'boolean' ? options.emitError : false
      // Grouping tag used for debugging purposes
      , tag: options.tag
      // Do we have a not connected handler
      , disconnectHandler: options.disconnectHandler
      // Currently connecting servers
      , connectingServers: {}
      // Contains any alternate strategies for picking
      , readPreferenceStrategies: {}
      // Auth providers
      , authProviders: {}
      // All the servers
      , disconnectedServers: []
      // Initial connection servers
      , initialConnectionServers: []
      // High availability process running
      , highAvailabilityProcessRunning: false
      // Full setup
      , fullsetup: false
      // All servers accounted for (used for testing)
      , all: false
      // Seedlist
      , seedlist: seedlist
      // Authentication in progress
      , authInProgress: false
      // Servers added while auth in progress
      , authInProgressServers: []
      // Minimum heartbeat frequency used if we detect a server close
      , minHeartbeatFrequencyMS: 500
    }

    // Add bson parser to options (note: this writes to the caller's options object)
    options.bson = this.s.bson;
    // Set up the connection timeout for the options
    options.connectionTimeout = options.connectionTimeout || 10000;

    // Replicaset state
    var replState = new State(this, {
        id: this.s.id, setName: this.s.setName
      , connectingServers: this.s.connectingServers
      , secondaryOnlyConnectionAllowed: this.s.secondaryOnlyConnectionAllowed
    });

    // Add Replicaset state to our internal state
    this.s.replState = replState;

    // BSON property (find a server and pass it along)
    Object.defineProperty(this, 'bson', {
      enumerable: true, get: function() {
        var servers = self.s.replState.getAll();
        return servers.length > 0 ? servers[0].bson : null;
      }
    });

    Object.defineProperty(this, 'id', {
      enumerable:true, get: function() { return self.s.id; }
    });

    Object.defineProperty(this, 'haInterval', {
      enumerable:true, get: function() { return self.s.haInterval; }
    });

    Object.defineProperty(this, 'state', {
      enumerable:true, get: function() { return self.s.replState; }
    });

    //
    // Debug options
    if(self.s.debug) {
      // Add access to the read Preference Strategies
      Object.defineProperty(this, 'readPreferenceStrategies', {
        enumerable: true, get: function() { return self.s.readPreferenceStrategies; }
      });
    }

    Object.defineProperty(this, 'type', {
      enumerable:true, get: function() { return 'replset'; }
    });

    // Add the ping strategy for nearest
    this.addReadPreferenceStrategy('nearest', new Ping(options));
  };

  inherits(ReplSet, EventEmitter);
  203. //
  204. // Plugin methods
  205. //
  /**
   * Register a read preference strategy under the given name, replacing any
   * strategy previously stored under that name
   * @method
   * @param {string} name Name of the read preference strategy
   * @param {object} func Strategy object instance
   */
  ReplSet.prototype.addReadPreferenceStrategy = function(name, func) {
    var strategies = this.s.readPreferenceStrategies;
    strategies[name] = func;
  };
  /**
   * Register an authentication mechanism under the given name.
   * The provider table is created on demand if it is missing.
   * @method
   * @param {string} name Name of the authentication mechanism
   * @param {object} provider Authentication object instance
   */
  ReplSet.prototype.addAuthProvider = function(name, provider) {
    var state = this.s;
    if(state.authProviders == null) {
      state.authProviders = {};
    }
    state.authProviders[name] = provider;
  };
  /**
   * Name of BSON parser currently used.
   * Decided by inspecting the source text of the parser's `serialize`
   * function for the `[native code]` marker.
   * @method
   * @return {string} 'c++' when the marker is present, otherwise 'js'
   */
  ReplSet.prototype.parserType = function() {
    var serializeSource = this.s.bson.serialize.toString();
    return serializeSource.indexOf('[native code]') != -1 ? 'c++' : 'js';
  };
  /**
   * Select the BSON parser implementation stored on this topology's options
   * @method
   * @param {string} type Type of BSON parser to use (c++ or js)
   * @throws {MongoError} If `type` is neither 'c++' nor 'js'
   */
  ReplSet.prototype.setBSONParserType = function(type) {
    var nBSON = null;

    if(type == 'c++') {
      nBSON = require('bson').native().BSON;
    } else if(type == 'js') {
      nBSON = require('bson').pure().BSON;
    } else {
      // "%s" (not a bare "%") so the rejected type is interpolated into the
      // message instead of being appended after it
      throw new MongoError(f("%s parser not supported", type));
    }

    // Only the options entry is replaced here; this.s.bson is left as it was
    this.s.options.bson = new nBSON(bsonTypes);
  };
  /**
   * Returns the last known ismaster document, as reported by the
   * replicaset state object
   * @method
   * @return {object}
   */
  ReplSet.prototype.lastIsMaster = function() {
    var replState = this.s.replState;
    return replState.lastIsMaster();
  };
  /**
   * Get a connection from the server selected for the read preference
   * @method
   * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
   * @return {Connection} The connection, or null when no server was selected
   */
  ReplSet.prototype.getConnection = function(options) {
    // A missing options argument is treated as an empty object
    var readPreference = (options || {}).readPreference;
    var selected = pickServer(this, this.s, readPreference);
    return selected == null ? null : selected.getConnection();
  };
  /**
   * All raw connections, as reported by the replicaset state object
   * @method
   * @return {Connection[]}
   */
  ReplSet.prototype.connections = function() {
    var replState = this.s.replState;
    return replState.getAllConnections();
  };
  /**
   * Get the server selected for the read preference
   * @method
   * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
   * @return {Server} Whatever the server selection returns for that preference
   */
  ReplSet.prototype.getServer = function(options) {
    // A missing options argument is treated as an empty object
    var readPreference = (options || {}).readPreference;
    return pickServer(this, this.s, readPreference);
  };
  /**
   * Create a cursor bound to this topology
   * @method
   * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
   * @param {{object}|{Long}} cmd Can be either a command returning a cursor or a cursorId
   * @param {object} [cursorOptions] Options handed unchanged to the cursor constructor
   * @param {function} [cursorOptions.cursorFactory] Cursor class to use instead of the topology default
   * @return {Cursor} A new instance of the selected cursor class
   */
  ReplSet.prototype.cursor = function(ns, cmd, cursorOptions) {
    var opts = cursorOptions || {};
    // A per-call factory takes precedence over the one chosen at construction
    var CursorClass = opts.cursorFactory ? opts.cursorFactory : this.s.Cursor;
    return new CursorClass(this.s.bson, ns, cmd, opts, this, this.s.options);
  };
  /**
   * Run a write operation against the server selected for the primary
   * read preference.
   * @param {ReplSet} self The topology issuing the write
   * @param {string} op Name of the server method to invoke (e.g. 'insert')
   * @param {string} ns The MongoDB fully qualified namespace
   * @param {array} ops The operations to send
   * @param {object|function} [options] Operation options, or the callback when omitted
   * @param {function} callback Receives (err, result)
   */
  var executeWriteOperation = function(self, op, ns, ops, options, callback) {
    // Support the call shape where the callback takes the place of options
    if(typeof options == 'function') {
      callback = options;
      options = {};
    }
    options = options || {};

    var primary = null;
    try {
      primary = pickServer(self, self.s, ReadPreference.primary);
      if(self.s.debug) self.emit('pickedServer', ReadPreference.primary, primary);
    } catch(err) {
      // Selection failures are reported through the callback
      return callback(err);
    }

    if(primary == null) return callback(new MongoError("no server found"));

    primary[op](ns, ops, options, function(err, r) {
      // A not-master reply or error triggers an immediate replicaset inquiry
      var notMaster = notMasterError(r) || notMasterError(err);
      if(notMaster) replicasetInquirer(self, self.s, true)();
      callback(err, r);
    });
  };
  /**
   * Execute a command
   * @method
   * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
   * @param {object} cmd The command hash
   * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
   * @param {object} [options.writeConcern] When present the command is routed with the primary read preference
   * @param {boolean} [options.onAll] Execute the command on every server known to the replicaset state
   * @param {Connection} [options.connection] Specify connection object to execute command against
   * @param {opResultCallback} callback A callback function
   */
  ReplSet.prototype.command = function(ns, cmd, options, callback) {
    if(typeof options == 'function') callback = options, options = {};
    if(this.s.replState.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));

    var server = null;
    var self = this;
    // Ensure we have options
    options = options || {};

    // Topology is not connected, save the call in the provided store to be
    // Executed at some point when the handler deems it's reconnected
    if(!this.isConnected(options) && this.s.disconnectHandler != null) {
      callback = bindToCurrentDomain(callback);
      return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
    }

    // We need to execute the command on all servers
    if(options.onAll) {
      var servers = this.s.replState.getAll();
      var count = servers.length;

      // With no known servers none of the per-server callbacks below would
      // ever fire, so fail explicitly instead of leaving the caller waiting
      if(count == 0) return callback(new MongoError("no server found"));

      for(var i = 0; i < servers.length; i++) {
        servers[i].command(ns, cmd, options, function(err, r) {
          count = count - 1;

          // Finished executing command; only the (err, r) pair of the reply
          // that completes last is inspected and handed to the caller
          if(count == 0) {
            // Was it a logout command clear any credentials
            if(cmd.logout) clearCredentials(self.s, ns);
            // We have a no master error, immediately refresh the view of the replicaset
            if(notMasterError(r) || notMasterError(err)) replicasetInquirer(self, self.s, true)();
            // Return the error
            callback(err, r);
          }
        });
      }

      return;
    }

    // Pick the right server based on readPreference
    try {
      server = pickServer(self, self.s, options.writeConcern ? ReadPreference.primary : options.readPreference);
      if(self.s.debug) self.emit('pickedServer', options.writeConcern ? ReadPreference.primary : options.readPreference, server);
    } catch(err) {
      return callback(err);
    }

    // No server returned we had an error
    if(server == null) return callback(new MongoError("no server found"));

    // Execute the command
    server.command(ns, cmd, options, function(err, r) {
      // Was it a logout command clear any credentials
      if(cmd.logout) clearCredentials(self.s, ns);
      // We have a no master error, immediately refresh the view of the replicaset
      if(notMasterError(r) || notMasterError(err)) {
        replicasetInquirer(self, self.s, true)();
      }
      // Return the error
      callback(err, r);
    });
  };
  /**
   * Perform one or more remove operations
   * @method
   * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
   * @param {array} ops An array of removes
   * @param {boolean} [options.ordered=true] Execute in order or out of order
   * @param {object} [options.writeConcern={}] Write concern for the operation
   * @param {opResultCallback} callback A callback function
   */
  ReplSet.prototype.remove = function(ns, ops, options, callback) {
    // Options may be omitted, in which case the callback takes their place
    if(typeof options == 'function') {
      callback = options;
      options = {};
    }

    if(this.s.replState.state == DESTROYED) {
      return callback(new MongoError(f('topology was destroyed')));
    }

    // Connected, or nothing available to buffer the call: run it now
    if(this.isConnected() || this.s.disconnectHandler == null) {
      executeWriteOperation(this, 'remove', ns, ops, options, callback);
      return;
    }

    // Not connected: park the call in the disconnect handler's store
    var boundCallback = bindToCurrentDomain(callback);
    return this.s.disconnectHandler.add('remove', ns, ops, options, boundCallback);
  };
  /**
   * Insert one or more documents
   * @method
   * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
   * @param {array} ops An array of documents to insert
   * @param {boolean} [options.ordered=true] Execute in order or out of order
   * @param {object} [options.writeConcern={}] Write concern for the operation
   * @param {opResultCallback} callback A callback function
   */
  ReplSet.prototype.insert = function(ns, ops, options, callback) {
    // Options may be omitted, in which case the callback takes their place
    if(typeof options == 'function') {
      callback = options;
      options = {};
    }

    if(this.s.replState.state == DESTROYED) {
      return callback(new MongoError(f('topology was destroyed')));
    }

    // Connected, or nothing available to buffer the call: run it now
    if(this.isConnected() || this.s.disconnectHandler == null) {
      executeWriteOperation(this, 'insert', ns, ops, options, callback);
      return;
    }

    // Not connected: park the call in the disconnect handler's store
    var boundCallback = bindToCurrentDomain(callback);
    return this.s.disconnectHandler.add('insert', ns, ops, options, boundCallback);
  };
  /**
   * Perform one or more update operations
   * @method
   * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
   * @param {array} ops An array of updates
   * @param {boolean} [options.ordered=true] Execute in order or out of order
   * @param {object} [options.writeConcern={}] Write concern for the operation
   * @param {opResultCallback} callback A callback function
   */
  ReplSet.prototype.update = function(ns, ops, options, callback) {
    // Options may be omitted, in which case the callback takes their place
    if(typeof options == 'function') {
      callback = options;
      options = {};
    }

    if(this.s.replState.state == DESTROYED) {
      return callback(new MongoError(f('topology was destroyed')));
    }

    // Connected, or nothing available to buffer the call: run it now
    if(this.isConnected() || this.s.disconnectHandler == null) {
      executeWriteOperation(this, 'update', ns, ops, options, callback);
      return;
    }

    // Not connected: park the call in the disconnect handler's store
    var boundCallback = bindToCurrentDomain(callback);
    return this.s.disconnectHandler.add('update', ns, ops, options, boundCallback);
  };
  /**
   * Authenticate using a specified mechanism against every server currently
   * known to the replicaset state
   * @method
   * @param {string} mechanism The Auth mechanism we are invoking
   * @param {string} db The db we are invoking the mechanism against
   * @param {...object} param Parameters for the specific mechanism
   * @param {authResultCallback} callback A callback function (always the last argument)
   * @throws {MongoError} If no provider is registered for `mechanism` (other than 'default')
   */
  ReplSet.prototype.auth = function(mechanism, db) {
    var self = this;
    var args = Array.prototype.slice.call(arguments, 2);
    var callback = args.pop();

    // If we don't have the mechanism fail
    if(this.s.authProviders[mechanism] == null && mechanism != 'default')
      throw new MongoError(f("auth provider %s does not exist", mechanism));

    // Authenticate against a snapshot of all the servers
    var servers = this.s.replState.getAll().slice(0);
    var count = servers.length;
    var authErr = null;
    // Arguments without a callback; identical for every server
    var argsWithoutCallback = [mechanism, db].concat(args);

    // Without servers no per-server reply would ever arrive: clear the
    // in-progress flag and fail instead of never calling back
    if(count == 0) {
      this.s.authInProgress = false;
      return callback(new MongoError("no server found"), false);
    }

    // Set auth in progress
    this.s.authInProgress = true;

    // Invoked once per server; finishes the operation on the last reply
    var onServerReply = function(err, r) {
      count = count - 1;
      if(err) authErr = err;
      // NOTE(review): a falsy result without an error is not treated as a failure
      if(count != 0) return;

      // Servers were queued while we were authenticating: run another pass
      if(self.s.authInProgressServers.length > 0) {
        self.s.authInProgressServers = [];
        return self.auth.apply(self, [mechanism, db].concat(args).concat([callback]));
      }

      // Auth is done
      self.s.authInProgress = false;
      // Return the auth error
      if(authErr) return callback(authErr, false);
      // Remember the credentials that worked
      addCredentials(self.s, db, argsWithoutCallback);
      // Successfully authenticated session
      callback(null, new Session({}, self));
    };

    // Authenticate against all servers
    while(servers.length > 0) {
      var server = servers.shift();
      server.auth.apply(server, argsWithoutCallback.concat([onServerReply]));
    }
  };
  /**
   * Current value of the `state` field on the replicaset state object
   * NOTE(review): the ReplSet constructor also defines an own `state` accessor
   * on each instance (returning the state object itself); an own property
   * takes precedence over this prototype method on lookup — confirm which is intended
   * @method
   * @return {string}
   */
  ReplSet.prototype.state = function() {
    var replState = this.s.replState;
    return replState.state;
  };
  /**
   * Build an 'ismaster' listener that limits the pool size option of a server
   * to one when its ismaster document marks it as arbiter-only or hidden
   * @method
   * @param {ReplSet} self The topology (not used by the returned listener)
   * @return {function} Listener taking (ismaster, server)
   */
  var handleIsmaster = function(self) {
    return function(ismaster, _server) {
      var needsSingleSocket = ismaster.arbiterOnly || ismaster.hidden;
      if(needsSingleSocket) _server.s.options.size = 1;
    };
  };
  /**
   * Initiate server connect: schedules the replicaset inquiry, builds one
   * Server per seedlist entry and starts connecting each of them
   * @method
   * @param {object} [_options] Extra options merged into the topology options before the servers are created
   */
  ReplSet.prototype.connect = function(_options) {
    var self = this;
    var state = this.s;

    // Schedule the replicaset inquiry one HA interval from now
    setTimeout(replicasetInquirer(this, state, false), state.haInterval);

    // Merge additional options
    if(_options) {
      for(var name in _options) state.options[name] = _options[name];
    }

    // Connecting, and no full setup reached yet
    state.replState.state = CONNECTING;
    state.fullsetup = false;

    // Build a server instance for every seed
    state.seedlist.forEach(function(seed) {
      var serverOptions = cloneOptions(self.s.options);
      serverOptions.host = seed.host;
      serverOptions.port = seed.port;
      serverOptions.reconnect = false;
      serverOptions.readPreferenceStrategies = self.s.readPreferenceStrategies;
      serverOptions.emitError = true;
      if(self.s.tag) serverOptions.tag = self.s.tag;
      // Share the auth store
      serverOptions.authProviders = self.s.authProviders;

      var seedServer = new Server(serverOptions);
      seedServer.on('ismaster', handleIsmaster(self));
      // Track the server as disconnected and as an in-flight initial connection
      self.s.disconnectedServers.push(seedServer);
      self.s.initialConnectionServers.push(seedServer);
    });

    // Drain the disconnected list, wiring handlers and connecting each server
    var temporaryEvents = ['error', 'close', 'timeout'];
    while(state.disconnectedServers.length > 0) {
      var server = state.disconnectedServers.shift();
      for(var i = 0; i < temporaryEvents.length; i++) {
        server.once(temporaryEvents[i], errorHandlerTemp(this, state, temporaryEvents[i]));
      }
      server.once('connect', connectHandler(this, state));
      server.connect();
    }
  };
  /**
   * Figure out if the topology is connected to something able to satisfy the
   * given read preference
   * @method
   * @param {ReadPreference} [options.readPreference] Read preference to check against
   * @return {boolean}
   */
  ReplSet.prototype.isConnected = function(options) {
    var readPreference = (options || {}).readPreference;
    var replState = this.s.replState;

    if(readPreference) {
      if(readPreference.equals(ReadPreference.secondary)) {
        return replState.isSecondaryConnected();
      }
      // NOTE(review): a connected secondary alone satisfies the primary read
      // preference here — confirm this is intended
      if(readPreference.equals(ReadPreference.primary)) {
        return replState.isSecondaryConnected() || replState.isPrimaryConnected();
      }
    }

    // No (or another) read preference: fall back to the topology-wide rule
    return this.s.secondaryOnlyConnectionAllowed
      ? replState.isSecondaryConnected()
      : replState.isPrimaryConnected();
  };
  /**
   * Figure out if the replicaset instance was destroyed by calling destroy
   * @method
   * @return {boolean} true when the replicaset state is the destroyed state
   */
  ReplSet.prototype.isDestroyed = function() {
    var currentState = this.s.replState.state;
    return currentState == DESTROYED;
  };
  /**
   * Destroy the server connection: marks the state as destroyed, optionally
   * emits 'close', destroys the replicaset state and drops topology listeners
   * @method
   * @param {boolean} [emitClose] Emit a 'close' event if anyone is listening for it
   */
  ReplSet.prototype.destroy = function(emitClose) {
    var self = this;
    var logger = this.s.logger;
    if(logger.isInfo()) logger.info(f('[%s] destroyed', this.s.id));

    this.s.replState.state = DESTROYED;

    // Only emit close when asked to and when there is a listener for it
    if(emitClose && self.listeners('close').length > 0) {
      self.emit('close', self);
    }

    // Destroy state
    this.s.replState.destroy();

    // Clear out any listeners
    var eventNames = ['timeout', 'error', 'close', 'joined', 'left'];
    for(var i = 0; i < eventNames.length; i++) {
      self.removeAllListeners(eventNames[i]);
    }
  };
  624. /**
  625. * A replset connect event, used to verify that the connection is up and running
  626. *
  627. * @event ReplSet#connect
  628. * @type {ReplSet}
  629. */
  630. /**
  631. * The replset high availability event
  632. *
  633. * @event ReplSet#ha
  634. * @type {function}
  635. * @param {string} type The stage in the high availability event (start|end)
  636. * @param {boolean} data.norepeat This is a repeating high availability process or a single execution only
  637. * @param {number} data.id The id for this high availability request
  638. * @param {object} data.state An object containing the information about the current replicaset
  639. */
  640. /**
  641. * A server member left the replicaset
  642. *
  643. * @event ReplSet#left
  644. * @type {function}
  645. * @param {string} type The type of member that left (primary|secondary|arbiter)
  646. * @param {Server} server The server object that left
  647. */
  648. /**
  649. * A server member joined the replicaset
  650. *
  651. * @event ReplSet#joined
  652. * @type {function}
  653. * @param {string} type The type of member that joined (primary|secondary|arbiter)
  654. * @param {Server} server The server object that joined
  655. */
  656. //
  657. // Inquires about state changes
  658. //
  /**
   * Cache the credentials for a database, replacing any entry already cached
   * for that database.
   * @param {object} s Internal state holding the `credentials` array
   * @param {string} db Name of the database the credentials belong to
   * @param {Array} argsWithoutCallback Authentication arguments (no trailing callback) to cache
   */
  var addCredentials = function(s, db, argsWithoutCallback) {
    // clearCredentials works on a namespace, so append a placeholder collection name
    var placeholderNamespace = db + ".dummy";
    clearCredentials(s, placeholderNamespace);
    // Append the new entry to whatever is left
    s.credentials.push(argsWithoutCallback);
  };
  /**
   * Drop every cached credential that belongs to the database of a namespace.
   * @param {object} s Internal state holding the `credentials` array (replaced by a new array)
   * @param {string} ns Namespace; only the part before the first dot (the database) is used
   */
  var clearCredentials = function(s, ns) {
    var dbName = ns.split('.')[0];
    // Index 1 of a cached entry is compared against the database name
    s.credentials = s.credentials.filter(function(entry) {
      return entry[1] != dbName;
    });
  };
  /**
   * Narrow a list of servers down to those whose ismaster tags satisfy the
   * tags of a read preference.
   * @param {object} readPreference Read preference; `tags` maps a tag name to its required value
   * @param {Array} servers Candidate servers, each exposing `lastIsMaster()`
   * @return {Array} `servers` itself when no tags are set, otherwise a new array with the matches
   */
  var filterByTags = function(readPreference, servers) {
    var requiredTags = readPreference.tags;
    if(requiredTags == null) return servers;

    return servers.filter(function(candidate) {
      // A server without tags is treated as having an empty tag set
      var candidateTags = candidate.lastIsMaster().tags || {};
      // Every requested tag must match (loose comparison, as before)
      for(var tagName in requiredTags) {
        if(candidateTags[tagName] != requiredTags[tagName]) return false;
      }
      return true;
    });
  };
  /**
   * Pick the server an operation should be sent to for a read preference.
   *
   * A custom strategy registered in `s.readPreferenceStrategies` under the
   * preference name takes precedence. Otherwise hidden secondaries are ignored
   * and the built-in modes are applied; eligible secondaries are rotated through
   * with the shared `s.index` counter.
   *
   * Tag sets are honoured for `secondary`, `secondaryPreferred` and
   * `primaryPreferred`. (Previously `secondary` ignored them and could return a
   * server that did not match the requested tags.)
   *
   * @param {ReplSet} self Replica set instance; `pickedServer` is emitted on it when a
   *   custom strategy is used and `s.debug` is set
   * @param {object} s Internal replica set state (`replState`, `index`, `readPreferenceStrategies`, `debug`)
   * @param {ReadPreference} [readPreference] Defaults to `ReadPreference.primary`
   * @return {Server} The selected server. May be null/undefined when the fallback is an
   *   absent primary (`secondaryPreferred` with non-matching tags, or an unhandled mode)
   * @throws {MongoError} When the read preference cannot be satisfied
   */
  var pickServer = function(self, s, readPreference) {
    // If no read Preference set to primary by default
    readPreference = readPreference || ReadPreference.primary;

    // Do we have a custom readPreference strategy, use it
    var strategies = s.readPreferenceStrategies;
    if(strategies != null && strategies[readPreference.preference] != null) {
      var picked = strategies[readPreference.preference].pickServer(s.replState, readPreference);
      if(s.debug) self.emit('pickedServer', readPreference, picked);
      return picked;
    }

    // Filter out any hidden secondaries
    var secondaries = s.replState.secondaries.filter(function(server) {
      return !server.lastIsMaster().hidden;
    });
    var primary = s.replState.primary;

    // Rotate through the secondaries that satisfy the tags; null when none does
    var nextTaggedSecondary = function() {
      var candidates = filterByTags(readPreference, secondaries);
      if(candidates.length == 0) return null;
      s.index = (s.index + 1) % candidates.length;
      return candidates[s.index];
    };

    // Secondary: must be a (tag matching) secondary, never the primary
    if(readPreference.equals(ReadPreference.secondary)) {
      var secondary = nextTaggedSecondary();
      if(secondary == null) throw new MongoError("no secondary server available");
      return secondary;
    }

    // Secondary preferred: matching secondary first, otherwise the primary
    if(readPreference.equals(ReadPreference.secondaryPreferred)) {
      if(secondaries.length == 0 && primary == null) {
        throw new MongoError("no secondary or primary server available");
      }
      var preferred = nextTaggedSecondary();
      return preferred != null ? preferred : primary;
    }

    // Primary preferred: the primary first, otherwise a matching secondary
    if(readPreference.equals(ReadPreference.primaryPreferred)) {
      if(primary) return primary;
      var fallback = nextTaggedSecondary();
      // Neither a primary nor a usable secondary: fail like the other modes do
      // instead of handing a null server back to the caller
      if(fallback == null) throw new MongoError("no secondary or primary server available");
      return fallback;
    }

    // Primary
    if(readPreference.equals(ReadPreference.primary) && primary == null) {
      throw new MongoError("no primary server available");
    }

    // Return the primary
    return primary;
  };
  /**
   * Create the high availability (monitoring) function for a replica set.
   *
   * Each call of the returned function performs one monitoring run, unless the
   * replica set is destroyed or a run is already in progress. A run:
   *  - reconnects every server queued in `state.disconnectedServers`,
   *  - sends `ismaster` to every server returned by `state.replState.getAll()`,
   *  - feeds each answer into `state.replState` and schedules connections to
   *    hosts reported by a primary,
   *  - lets the registered read preference strategies run their `ha` step once
   *    the last server has answered.
   * A run is bracketed by `ha` events (`start` / `end`). Unless `norepeat` is
   * true, the next run is scheduled `state.haInterval` ms after the run ends.
   *
   * A server that is missing or not connected is counted as inspected.
   * Previously such a server was skipped without being counted, so the run never
   * ended and `highAvailabilityProcessRunning` stayed set, blocking all later runs.
   *
   * @param {ReplSet} self The replica set instance events are emitted on
   * @param {object} state Internal replica set state
   * @param {boolean} [norepeat=false] True to perform a single run without rescheduling
   * @return {function} The monitoring function (takes no arguments)
   */
  var replicasetInquirer = function(self, state, norepeat) {
    return function() {
      if(state.replState.state == DESTROYED) return;
      // Process already running don't rerun
      if(state.highAvailabilityProcessRunning) return;
      // Started processes
      state.highAvailabilityProcessRunning = true;
      if(state.logger.isInfo()) state.logger.info(f('[%s] monitoring process running %s', state.id, JSON.stringify(state.replState)));
      // Unique HA id to identify the current run
      var localHaId = state.haId++;
      // Clean out any failed connection attempts
      state.connectingServers = {};
      // Controls if we are doing a single inquiry or repeating
      norepeat = typeof norepeat == 'boolean' ? norepeat : false;

      // Payload of the 'ha' events, built at emit time so it reflects the current state
      var haEventData = function() {
        return {norepeat: norepeat, id: localHaId, state: state.replState ? state.replState.toJSON() : {}};
      };

      // Close the current run: emit 'ha' end, release the running flag and,
      // unless this is a one-off run, schedule the next one
      var endInquiry = function() {
        self.emit('ha', 'end', haEventData());
        state.highAvailabilityProcessRunning = false;
        if(!norepeat) setTimeout(replicasetInquirer(self, state, false), state.haInterval);
      };

      // With both a primary and a secondary connected and a disconnect handler
      // present, execute the buffered operations
      if(state.replState.isPrimaryConnected() && state.replState.isSecondaryConnected() && state.disconnectHandler) {
        state.disconnectHandler.execute();
      }

      // Emit replicasetInquirer
      self.emit('ha', 'start', haEventData());

      // Let's process all the disconnected servers
      while(state.disconnectedServers.length > 0) {
        // Get the first disconnected server
        var server = state.disconnectedServers.shift();
        if(state.logger.isInfo()) state.logger.info(f('[%s] monitoring attempting to connect to %s', state.id, server.lastIsMaster() ? server.lastIsMaster().me : server.name));
        // Set up the event handlers
        server.once('error', errorHandlerTemp(self, state, 'error'));
        server.once('close', errorHandlerTemp(self, state, 'close'));
        server.once('timeout', errorHandlerTemp(self, state, 'timeout'));
        server.once('connect', connectHandler(self, state));
        // Attempt to connect
        server.connect();
      }

      // Cleanup state (removed disconnected servers)
      state.replState.clean();

      // We need to query all servers
      var servers = state.replState.getAll();
      var serversLeft = servers.length;

      // Nothing to query while connected: end this run and keep pinging
      if(servers.length == 0 && state.replState.state == CONNECTED) {
        endInquiry();
        return;
      }

      //
      // Inspect a specific servers ismaster
      var inspectServer = function(server) {
        if(state.replState.state == DESTROYED) return;

        // A missing or disconnected server cannot answer ismaster. Count it as
        // inspected so the run can still finish.
        if(!server || !server.isConnected()) {
          serversLeft = serversLeft - 1;
          if(serversLeft == 0) endInquiry();
          return;
        }

        // Execute ismaster
        server.command('system.$cmd', {ismaster:true}, function(err, r) {
          if(state.replState.state == DESTROYED) return;
          // Count down the number of servers left
          serversLeft = serversLeft - 1;

          if(err) {
            // Other answers are still outstanding: the last one closes the run.
            // Otherwise this was the last server, so schedule a new check.
            if(serversLeft == 0) endInquiry();
            return;
          }

          // Let all the read Preferences do things to the servers
          var rPreferencesCount = Object.keys(state.readPreferenceStrategies).length;

          // Handle the primary
          var ismaster = r.result;
          if(state.logger.isDebug()) state.logger.debug(f('[%s] monitoring process ismaster %s', state.id, JSON.stringify(ismaster)));

          // Update the replicaset state
          state.replState.update(ismaster, server);

          // Add any new servers
          if(err == null && ismaster.ismaster && Array.isArray(ismaster.hosts)) {
            // Hosts to process
            var hosts = ismaster.hosts;
            // Add arbiters and passives to the list of hosts if we have any
            if(Array.isArray(ismaster.arbiters)) hosts = hosts.concat(ismaster.arbiters);
            if(Array.isArray(ismaster.passives)) hosts = hosts.concat(ismaster.passives);
            // Process all the hosts
            processHosts(self, state, hosts);
          }

          // No read Preferences strategies
          if(rPreferencesCount == 0) {
            // Don't schedule a new inquiry while answers are outstanding
            if(serversLeft > 0) return;
            endInquiry();
            return;
          }

          // No servers left to query, execute read preference strategies
          if(serversLeft == 0) {
            // Go over all the read preferences
            for(var name in state.readPreferenceStrategies) {
              state.readPreferenceStrategies[name].ha(self, state.replState, function() {
                rPreferencesCount = rPreferencesCount - 1;

                // The run ends once the last strategy has reported back
                if(rPreferencesCount == 0) {
                  // Add any new servers in primary ismaster
                  if(err == null
                    && ismaster.ismaster
                    && Array.isArray(ismaster.hosts)) {
                    processHosts(self, state, ismaster.hosts);
                  }

                  endInquiry();
                  return;
                }
              });
            }
          }
        });
      };

      // Call ismaster on all servers
      for(var i = 0; i < servers.length; i++) {
        inspectServer(servers[i]);
      }

      // Emit fullsetup once, as soon as a primary and at least one secondary are known
      if(state.replState.secondaries.length >= 1 && state.replState.primary != null && !state.fullsetup) {
        state.fullsetup = true;
        self.emit('fullsetup', self);
      }

      // If all servers are accounted for and we have not sent the all event
      if(state.replState.primary != null && self.lastIsMaster()
        && Array.isArray(self.lastIsMaster().hosts) && !state.all) {
        var length = 1 + state.replState.secondaries.length;
        // NOTE(review): this holds when secondaries.length equals hosts.length;
        // confirm that this is the intended "all members known" condition
        if(length == self.lastIsMaster().hosts.length + 1) {
          state.all = true;
          self.emit('all', self);
        }
      }
    };
  };
  /**
   * Build the temporary failure handler that is attached to a server while a
   * connection attempt is in flight (`error`, `close` and `timeout`).
   *
   * The handler removes the server from `state.initialConnectionServers`,
   * strips its temporary listeners, queues it in `state.disconnectedServers` and,
   * while the replica set is still CONNECTING, emits
   * `no valid seed servers in list` when no usable topology can be reached.
   *
   * @param {ReplSet} self The replica set instance errors are emitted on
   * @param {object} state Internal replica set state
   * @param {string} event Name of the event the handler is registered for (not used by the handler)
   * @return {function} Handler with the signature `(err, server)`
   */
  var errorHandlerTemp = function(self, state, event) {
    return function(err, server) {
      // Log the information
      if(state.logger.isInfo()) state.logger.info(f('[%s] server %s disconnected', state.id, server.lastIsMaster() ? server.lastIsMaster().me : server.name));

      // The server is no longer one of the pending initial connections
      state.initialConnectionServers = state.initialConnectionServers.filter(function(_server) {
        return server.name != _server.name;
      });

      // Connection is destroyed, ignore
      if(state.replState.state == DESTROYED) return;

      // Remove any non used handlers
      ['error', 'close', 'timeout', 'connect'].forEach(function(e) {
        server.removeAllListeners(e);
      });

      // Push to list of disconnected servers
      addToListIfNotExist(state.disconnectedServers, server);

      // All initial connection attempts are done: end the connection operation
      // if we have no legal replicaset state. (The array itself used to be
      // compared with 0, which only worked through string coercion.)
      if(state.initialConnectionServers.length == 0 && state.replState.state == CONNECTING) {
        if((state.secondaryOnlyConnectionAllowed && !state.replState.isSecondaryConnected() && !state.replState.isPrimaryConnected())
          || (!state.secondaryOnlyConnectionAllowed && !state.replState.isPrimaryConnected())) {
          if(state.logger.isInfo()) state.logger.info(f('[%s] no valid seed servers in list', state.id));

          if(self.listeners('error').length > 0)
            return self.emit('error', new MongoError('no valid seed servers in list'));
        }
      }

      // If the number of disconnected servers is equal to
      // the number of seed servers we cannot connect
      if(state.disconnectedServers.length == state.seedlist.length && state.replState.state == CONNECTING) {
        if(state.emitError && self.listeners('error').length > 0) {
          if(state.logger.isInfo()) state.logger.info(f('[%s] no valid seed servers in list', state.id));
          self.emit('error', new MongoError('no valid seed servers in list'));
        }
      }
    };
  };
  /**
   * Build the `connect` handler for a server that is joining the replica set.
   *
   * The handler drops the server from `state.initialConnectionServers`, applies
   * every cached credential to it (if any) and then registers it: the temporary
   * listeners are removed, `state.replState` is updated with the server's
   * ismaster, the permanent `error` / `close` / `timeout` handlers are attached
   * and the members listed in the ismaster are scheduled for connection.
   *
   * @param {ReplSet} self The replica set instance events are emitted on
   * @param {object} state Internal replica set state
   * @return {function} Handler with the signature `(server)`
   */
  var connectHandler = function(self, state) {
    return function(server) {
      if(state.logger.isInfo()) state.logger.info(f('[%s] connected to %s', state.id, server.name));

      // Destroyed connection
      if(state.replState.state == DESTROYED) {
        server.destroy(false, false);
        return;
      }

      // The server is no longer one of the pending initial connections
      state.initialConnectionServers = state.initialConnectionServers.filter(function(_server) {
        return server.name != _server.name;
      });

      // Process the new server
      var processNewServer = function() {
        // Discover any additional servers
        var ismaster = server.lastIsMaster();

        var events = ['error', 'close', 'timeout', 'connect', 'message'];
        // Remove any non used handlers
        events.forEach(function(e) {
          server.removeAllListeners(e);
        });

        // Clean up
        delete state.connectingServers[server.name];

        // Update the replicaset state, destroy if not added
        if(!state.replState.update(ismaster, server)) {
          return server.destroy();
        }

        // Add the server handling code
        if(server.isConnected()) {
          server.on('error', errorHandler(self, state));
          server.on('close', closeHandler(self, state));
          server.on('timeout', timeoutHandler(self, state));
        }

        // Hosts to process. `hosts` may be absent from the ismaster; start from
        // an empty list so arbiters/passives can still be appended instead of
        // calling concat on a non-array.
        var hosts = Array.isArray(ismaster.hosts) ? ismaster.hosts : [];
        // Add arbiters and passives to the list of hosts if we have any
        if(Array.isArray(ismaster.arbiters)) hosts = hosts.concat(ismaster.arbiters);
        if(Array.isArray(ismaster.passives)) hosts = hosts.concat(ismaster.passives);

        // Add any new servers
        processHosts(self, state, hosts);

        // Every connection attempt has finished without finding a primary while
        // a primary is required: report it and tear the replica set down
        if(state.initialConnectionServers.length == 0 && Object.keys(state.connectingServers).length == 0
          && !state.replState.isPrimaryConnected() && !state.secondaryOnlyConnectionAllowed && state.replState.state == CONNECTING) {
          if(state.logger.isInfo()) state.logger.info(f('[%s] no primary found in replicaset', state.id));
          self.emit('error', new MongoError("no primary found in replicaset"));
          return self.destroy();
        }

        // Emit fullsetup once, as soon as a primary and at least one secondary are known
        if(state.replState.secondaries.length >= 1 && state.replState.primary != null && !state.fullsetup) {
          state.fullsetup = true;
          self.emit('fullsetup', self);
        }
      };

      // Save up new members to be authenticated against
      if(self.s.authInProgress) {
        self.s.authInProgressServers.push(server);
      }

      // No credentials just process server
      if(state.credentials.length == 0) return processNewServer();

      // Do we have credentials, let's apply them all
      var count = state.credentials.length;

      // Apply the credentials; the server is processed after the last auth
      // call reports back (auth errors are not inspected here)
      for(var i = 0; i < state.credentials.length; i++) {
        server.auth.apply(server, state.credentials[i].concat([function(err, r) {
          count = count - 1;
          if(count == 0) processNewServer();
        }]));
      }
    };
  };
  /**
   * Schedule a connection for every host that is not yet part of the replica
   * set state and not already being connected to.
   * @param {ReplSet} self The replica set instance
   * @param {object} state Internal replica set state
   * @param {Array} hosts Host addresses in `host:port` form; anything that is not an array is ignored
   */
  var processHosts = function(self, state, hosts) {
    if(state.replState.state == DESTROYED) return;
    if(!Array.isArray(hosts)) return;

    // Check any hosts exposed by ismaster
    for(var idx = 0; idx < hosts.length; idx++) {
      var address = hosts[idx];

      // Already known to the replica set state
      if(state.replState.contains(address)) continue;
      // Already scheduled, or one of the pending initial connections
      if(state.connectingServers[address] != null) continue;
      if(inInitialConnectingServers(self, state, address)) continue;

      if(state.logger.isInfo()) state.logger.info(f('[%s] scheduled server %s for connection', state.id, address));

      // Make sure we know what is trying to connect
      state.connectingServers[address] = address;

      // Connect the server
      var parts = address.split(':');
      connectToServer(self, state, parts[0], parseInt(parts[1], 10));
    }
  };
  /**
   * Tell whether an address belongs to one of the pending initial connections.
   * @param {ReplSet} self The replica set instance (not used)
   * @param {object} state Internal replica set state holding `initialConnectionServers`
   * @param {string} address Address compared against each pending server's `name`
   * @return {boolean}
   */
  var inInitialConnectingServers = function(self, state, address) {
    return state.initialConnectionServers.some(function(pending) {
      return pending.name == address;
    });
  };
  /**
   * Create a server instance for a newly discovered member and start connecting
   * to it. The server gets a clone of the replica set options (with reconnect
   * switched off), shares the auth providers and is wired to the temporary
   * connection handlers.
   * @param {ReplSet} self The replica set instance
   * @param {object} state Internal replica set state
   * @param {string} host Host of the member
   * @param {number} port Port of the member
   */
  var connectToServer = function(self, state, host, port) {
    // Work on a clone so the replica set options stay untouched
    var serverOptions = cloneOptions(state.options);
    serverOptions.host = host;
    serverOptions.port = port;
    serverOptions.reconnect = false;
    serverOptions.readPreferenceStrategies = state.readPreferenceStrategies;
    if(state.tag) serverOptions.tag = state.tag;
    // Share the auth store
    serverOptions.authProviders = state.authProviders;
    serverOptions.emitError = true;

    // Create a new server instance
    var member = new Server(serverOptions);

    // Handle the ismaster
    member.on('ismaster', handleIsmaster(self));

    // Temporary handlers used until the connection attempt is resolved
    ['error', 'close', 'timeout'].forEach(function(eventName) {
      member.once(eventName, errorHandlerTemp(self, state, eventName));
    });
    member.once('connect', connectHandler(self, state));

    // Attempt to connect
    member.connect();
  };
  /**
   * Append a server to a list unless an equal server is already in it. The
   * server's `error`, `close`, `timeout` and `connect` listeners are removed in
   * either case.
   * @param {Array} list Servers, each exposing `equals(server)`
   * @param {Server} server The server to add
   * @return {boolean} True when an equal server was already in the list (nothing added)
   */
  var addToListIfNotExist = function(list, server) {
    // Remove any non used handlers
    ['error', 'close', 'timeout', 'connect'].forEach(function(eventName) {
      server.removeAllListeners(eventName);
    });

    // Every entry is compared; there is no early exit
    var alreadyListed = list.reduce(function(seen, entry) {
      return entry.equals(server) ? true : seen;
    }, false);

    if(!alreadyListed) list.push(server);
    return alreadyListed;
  };
  /**
   * Build the permanent `error` handler for a connected member. The failing
   * server is queued in `state.disconnectedServers`; `left` is emitted when it
   * was not queued yet, otherwise the error is forwarded (if `state.emitError`
   * is set and an `error` listener exists). A one-off monitoring run is then
   * scheduled after `minHeartbeatFrequencyMS`.
   * @param {ReplSet} self The replica set instance events are emitted on
   * @param {object} state Internal replica set state
   * @return {function} Handler with the signature `(err, server)`
   */
  var errorHandler = function(self, state) {
    return function(err, server) {
      if(state.replState.state == DESTROYED) return;

      if(state.logger.isInfo()) {
        var label = server.lastIsMaster() ? server.lastIsMaster().me : server.name;
        state.logger.info(f('[%s] server %s errored out with %s', state.id, label, JSON.stringify(err)));
      }

      var alreadyDisconnected = addToListIfNotExist(state.disconnectedServers, server);
      if(!alreadyDisconnected) {
        self.emit('left', state.replState.remove(server), server);
      } else if(state.emitError && self.listeners('error').length > 0) {
        self.emit('error', err, server);
      }

      // Fire off a detection of missing server using minHeartbeatFrequencyMS
      var runSingleInquiry = function() {
        replicasetInquirer(self, self.s, true)();
      };
      setTimeout(runSingleInquiry, self.s.minHeartbeatFrequencyMS);
    };
  };
  /**
   * Build the permanent `timeout` handler for a connected member. The server is
   * queued in `state.disconnectedServers`, `left` is emitted when it was not
   * queued yet, and a one-off monitoring run is scheduled after
   * `minHeartbeatFrequencyMS`.
   * @param {ReplSet} self The replica set instance events are emitted on
   * @param {object} state Internal replica set state
   * @return {function} Handler with the signature `(err, server)`
   */
  var timeoutHandler = function(self, state) {
    return function(err, server) {
      if(state.replState.state == DESTROYED) return;

      if(state.logger.isInfo()) {
        var label = server.lastIsMaster() ? server.lastIsMaster().me : server.name;
        state.logger.info(f('[%s] server %s timed out', state.id, label));
      }

      var alreadyDisconnected = addToListIfNotExist(state.disconnectedServers, server);
      if(!alreadyDisconnected) {
        self.emit('left', state.replState.remove(server), server);
      }

      // Fire off a detection of missing server using minHeartbeatFrequencyMS
      var runSingleInquiry = function() {
        replicasetInquirer(self, self.s, true)();
      };
      setTimeout(runSingleInquiry, self.s.minHeartbeatFrequencyMS);
    };
  };
  /**
   * Build the permanent `close` handler for a connected member. The server is
   * queued in `state.disconnectedServers`, `left` is emitted when it was not
   * queued yet, and a one-off monitoring run is scheduled after
   * `minHeartbeatFrequencyMS`.
   * @param {ReplSet} self The replica set instance events are emitted on
   * @param {object} state Internal replica set state
   * @return {function} Handler with the signature `(err, server)`
   */
  var closeHandler = function(self, state) {
    return function(err, server) {
      if(state.replState.state == DESTROYED) return;

      if(state.logger.isInfo()) {
        var label = server.lastIsMaster() ? server.lastIsMaster().me : server.name;
        state.logger.info(f('[%s] server %s closed', state.id, label));
      }

      var alreadyDisconnected = addToListIfNotExist(state.disconnectedServers, server);
      if(!alreadyDisconnected) {
        self.emit('left', state.replState.remove(server), server);
      }

      // Fire off a detection of missing server using minHeartbeatFrequencyMS
      var runSingleInquiry = function() {
        replicasetInquirer(self, self.s, true)();
      };
      setTimeout(runSingleInquiry, self.s.minHeartbeatFrequencyMS);
    };
  };
  /**
   * Tell whether a server reply signals a "not master" or "node is recovering"
   * condition.
   * @param {object} r A reply; its `result` property is inspected when present, otherwise `r` itself
   * @return {boolean}
   */
  var notMasterError = function(r) {
    // Get result of any
    var result = r && r.result ? r.result : r;
    if(!result) return false;

    if(result.err == 'not master') return true;
    if(result.errmsg == 'not master') return true;

    // Query failures carry their message in $err
    var queryError = result['$err'];
    if(queryError && queryError.indexOf('not master or secondary') != -1) return true;
    if(queryError && queryError.indexOf("not master and slaveOk=false") != -1) return true;

    return result.errmsg == 'node is recovering';
  };
  // Expose the ReplSet constructor as the export of this module
  module.exports = ReplSet;