12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304 |
- "use strict";
- var inherits = require('util').inherits
- , f = require('util').format
- , b = require('bson')
- , bindToCurrentDomain = require('../connection/utils').bindToCurrentDomain
- , debugOptions = require('../connection/utils').debugOptions
- , EventEmitter = require('events').EventEmitter
- , Server = require('./server')
- , ReadPreference = require('./read_preference')
- , MongoError = require('../error')
- , Ping = require('./strategies/ping')
- , Session = require('./session')
- , BasicCursor = require('../cursor')
- , BSON = require('bson').native().BSON
- , State = require('./replset_state')
- , Logger = require('../connection/logger');
- /**
- * @fileOverview The **ReplSet** class represents a Replicaset topology and is
- * used to construct connections.
- *
- * @example
- * var ReplSet = require('mongodb-core').ReplSet
- * , ReadPreference = require('mongodb-core').ReadPreference
- * , assert = require('assert');
- *
- * var server = new ReplSet([{host: 'localhost', port: 30000}], {setName: 'rs'});
- * // Wait for the connection event
- * server.on('connect', function(server) {
- * server.destroy();
- * });
- *
- * // Start connecting
- * server.connect();
- */
- var DISCONNECTED = 'disconnected';
- var CONNECTING = 'connecting';
- var CONNECTED = 'connected';
- var DESTROYED = 'destroyed';
- //
- // ReplSet instance id
- var replSetId = 1;
//
// Build a shallow copy of an options hash
var cloneOptions = function(options) {
  var copy = {};
  // for..in on purpose: inherited enumerable options are carried over too
  for(var key in options) copy[key] = options[key];
  return copy;
};
- // All bson types
- var bsonTypes = [b.Long, b.ObjectID, b.Binary, b.Code, b.DBRef, b.Symbol, b.Double, b.Timestamp, b.MaxKey, b.MinKey];
- // BSON parser
- var bsonInstance = null;
/**
 * Creates a new ReplSet instance
 *
 * Note: the passed in options object is modified, `options.bson` and
 * `options.connectionTimeout` are filled in by the constructor.
 *
 * @class
 * @param {array} seedlist A list of seeds ({host: string, port: number}) for the replicaset
 * @param {string} options.setName The Replicaset set name
 * @param {boolean} [options.secondaryOnlyConnectionAllowed=false] Allow connection to a secondary only replicaset
 * @param {number} [options.haInterval=10000] The High availability period for replicaset inquiry
 * @param {boolean} [options.emitError=false] Server will emit errors events
 * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors
 * @param {object} [options.bson] BSON parser instance to use instead of the shared native one
 * @param {boolean} [options.debug=false] Run in debug mode (emits pickedServer, exposes readPreferenceStrategies)
 * @param {string} [options.tag] Grouping tag used for debugging purposes
 * @param {object} [options.disconnectHandler] Store used to buffer operations while not connected
 * @param {number} [options.size=5] Server connection pool size
 * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
 * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
 * @param {boolean} [options.noDelay=true] TCP Connection no delay
 * @param {number} [options.connectionTimeout=10000] TCP Connection timeout setting
 * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
 * @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed
 * @param {boolean} [options.ssl=false] Use SSL for connection
 * @param {Buffer} [options.ca] SSL Certificate store binary buffer
 * @param {Buffer} [options.cert] SSL Certificate binary buffer
 * @param {Buffer} [options.key] SSL Key file binary buffer
 * @param {string} [options.passphrase] SSL Certificate pass phrase
 * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
 * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
 * @param {number} [options.pingInterval=5000] Ping interval to check the response time to the different servers
 * @param {number} [options.acceptableLatency=250] Acceptable latency for selecting a server for reading (in milliseconds)
 * @return {ReplSet} A ReplSet instance
 * @throws {MongoError} If the seedlist is not a non-empty array of {host, port} entries
 * @fires ReplSet#connect
 * @fires ReplSet#ha
 * @fires ReplSet#joined
 * @fires ReplSet#left
 */
var ReplSet = function(seedlist, options) {
  var self = this;
  options = options || {};

  // Validate seedlist
  if(!Array.isArray(seedlist)) throw new MongoError("seedlist must be an array");
  // Validate list
  if(seedlist.length == 0) throw new MongoError("seedlist must contain at least one entry");
  // Validate entries, a null/undefined entry is reported as a MongoError as well
  seedlist.forEach(function(e) {
    if(e == null || typeof e.host != 'string' || typeof e.port != 'number')
      throw new MongoError("seedlist entry must contain a host and port");
  });

  // Add event listener
  EventEmitter.call(this);

  // Set the bson instance, shared between all ReplSet instances
  bsonInstance = bsonInstance == null ? new BSON(bsonTypes) : bsonInstance;

  // Internal state hash for the object
  this.s = {
      options: options
    // Logger instance
    , logger: Logger('ReplSet', options)
    // Uniquely identify the replicaset instance
    , id: replSetId++
    // Index
    , index: 0
    // Ha Index
    , haId: 0
    // Current credentials used for auth
    , credentials: []
    // Factory overrides
    , Cursor: options.cursorFactory || BasicCursor
    // BSON Parser, ensure we have a single instance
    , bsonInstance: bsonInstance
    // Pick the right bson parser
    , bson: options.bson ? options.bson : bsonInstance
    // Special replicaset options
    , secondaryOnlyConnectionAllowed: typeof options.secondaryOnlyConnectionAllowed == 'boolean'
      ? options.secondaryOnlyConnectionAllowed : false
    , haInterval: options.haInterval || 10000
    // Are we running in debug mode
    , debug: typeof options.debug == 'boolean' ? options.debug : false
    // The replicaset name
    , setName: options.setName
    // Swallow or emit errors
    , emitError: typeof options.emitError == 'boolean' ? options.emitError : false
    // Grouping tag used for debugging purposes
    , tag: options.tag
    // Do we have a not connected handler
    , disconnectHandler: options.disconnectHandler
    // Currently connecting servers
    , connectingServers: {}
    // Contains any alternate strategies for picking
    , readPreferenceStrategies: {}
    // Auth providers
    , authProviders: {}
    // All the servers
    , disconnectedServers: []
    // Initial connection servers
    , initialConnectionServers: []
    // High availability process running
    , highAvailabilityProcessRunning: false
    // Full setup
    , fullsetup: false
    // All servers accounted for (used for testing)
    , all: false
    // Seedlist
    , seedlist: seedlist
    // Authentication in progress
    , authInProgress: false
    // Servers added while auth in progress
    , authInProgressServers: []
    // Minimum heartbeat frequency used if we detect a server close
    , minHeartbeatFrequencyMS: 500
  }

  // Add bson parser to options
  options.bson = this.s.bson;
  // Set up the connection timeout for the options
  options.connectionTimeout = options.connectionTimeout || 10000;

  // Replicaset state
  var replState = new State(this, {
      id: this.s.id, setName: this.s.setName
    , connectingServers: this.s.connectingServers
    , secondaryOnlyConnectionAllowed: this.s.secondaryOnlyConnectionAllowed
  });

  // Add Replicaset state to our internal state
  this.s.replState = replState;

  // BSON property (find a server and pass it along)
  Object.defineProperty(this, 'bson', {
    enumerable: true, get: function() {
      var servers = self.s.replState.getAll();
      return servers.length > 0 ? servers[0].bson : null;
    }
  });

  Object.defineProperty(this, 'id', {
    enumerable:true, get: function() { return self.s.id; }
  });

  Object.defineProperty(this, 'haInterval', {
    enumerable:true, get: function() { return self.s.haInterval; }
  });

  // Own accessor, exposes the replicaset state object itself
  Object.defineProperty(this, 'state', {
    enumerable:true, get: function() { return self.s.replState; }
  });

  //
  // Debug options
  if(self.s.debug) {
    // Add access to the read Preference Strategies
    Object.defineProperty(this, 'readPreferenceStrategies', {
      enumerable: true, get: function() { return self.s.readPreferenceStrategies; }
    });
  }

  Object.defineProperty(this, 'type', {
    enumerable:true, get: function() { return 'replset'; }
  });

  // Add the ping strategy for nearest
  this.addReadPreferenceStrategy('nearest', new Ping(options));
}

inherits(ReplSet, EventEmitter);
- //
- // Plugin methods
- //
/**
 * Register a custom read preference strategy under the given name
 * @method
 * @param {string} name Name of the read preference strategy
 * @param {object} func Strategy object instance stored under that name
 */
ReplSet.prototype.addReadPreferenceStrategy = function(name, func) {
  var strategies = this.s.readPreferenceStrategies;
  strategies[name] = func;
};
/**
 * Register a custom authentication mechanism
 * @method
 * @param {string} name Name of the authentication mechanism
 * @param {object} provider Authentication object instance
 */
ReplSet.prototype.addAuthProvider = function(name, provider) {
  var state = this.s;
  // Create the provider hash on demand
  if(state.authProviders == null) state.authProviders = {};
  state.authProviders[name] = provider;
};
/**
 * Name of BSON parser currently used
 *
 * The parser counts as 'c++' when the source text of its serialize function
 * contains '[native code]', anything else is reported as 'js'.
 * @method
 * @return {string}
 */
ReplSet.prototype.parserType = function() {
  var serializeSource = this.s.bson.serialize.toString();
  return serializeSource.indexOf('[native code]') != -1 ? 'c++' : 'js';
};
/**
 * Switch the BSON parser stored in the topology options
 *
 * Only `this.s.options.bson` is replaced; `this.s.bson` is left untouched.
 * @method
 * @param {string} type Type of BSON parser to use (c++ or js)
 * @throws {MongoError} If the type is neither 'c++' nor 'js'
 */
ReplSet.prototype.setBSONParserType = function(type) {
  var nBSON = null;

  if(type == 'c++') {
    nBSON = require('bson').native().BSON;
  } else if(type == 'js') {
    nBSON = require('bson').pure().BSON;
  } else {
    // %s (not a bare %) so the rejected type is interpolated into the message
    throw new MongoError(f("%s parser not supported", type));
  }

  this.s.options.bson = new nBSON(bsonTypes);
};
/**
 * Returns the last known ismaster document as reported by the replicaset state
 * @method
 * @return {object}
 */
ReplSet.prototype.lastIsMaster = function() {
  var replState = this.s.replState;
  return replState.lastIsMaster();
};
/**
 * Get a connection from the server matching the read preference
 * @method
 * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
 * @return {Connection} The connection, or null when no server was picked
 */
ReplSet.prototype.getConnection = function(options) {
  // Options are optional
  var readPreference = (options || {}).readPreference;
  // Pick the right server based on readPreference
  var picked = pickServer(this, this.s, readPreference);
  return picked == null ? null : picked.getConnection();
};
/**
 * All raw connections known to the replicaset state
 * @method
 * @return {Connection[]}
 */
ReplSet.prototype.connections = function() {
  var replState = this.s.replState;
  return replState.getAllConnections();
};
/**
 * Get the server matching the read preference
 * @method
 * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
 * @return {Server}
 */
ReplSet.prototype.getServer = function(options) {
  // Options are optional
  var readPreference = (options || {}).readPreference;
  // Pick the right server based on readPreference
  return pickServer(this, this.s, readPreference);
};
/**
 * Create a cursor against this topology
 * @method
 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
 * @param {{object}|{Long}} cmd Can be either a command returning a cursor or a cursorId
 * @param {object} [cursorOptions.batchSize=0] Batchsize for the operation
 * @param {array} [cursorOptions.documents=[]] Initial documents list for cursor
 * @param {ReadPreference} [cursorOptions.readPreference] Specify read preference if command supports it
 * @param {function} [cursorOptions.cursorFactory] Cursor class to use instead of the ReplSet level one
 * @return {Cursor} A new cursor instance
 */
ReplSet.prototype.cursor = function(ns, cmd, cursorOptions) {
  var opts = cursorOptions || {};
  // A per call factory wins over the one configured on the ReplSet
  var CursorClass = opts.cursorFactory || this.s.Cursor;
  return new CursorClass(this.s.bson, ns, cmd, opts, this, this.s.options);
};
//
// Run a write operation (insert/update/remove) against the primary
var executeWriteOperation = function(self, op, ns, ops, options, callback) {
  // Options are optional, the callback may be in their place
  if(typeof options == 'function') {
    callback = options;
    options = {};
  }

  options = options || {};
  var primary = null;

  // Writes always go to the primary
  try {
    primary = pickServer(self, self.s, ReadPreference.primary);
    if(self.s.debug) self.emit('pickedServer', ReadPreference.primary, primary);
  } catch(err) {
    return callback(err);
  }

  // No server returned we had an error
  if(primary == null) return callback(new MongoError("no server found"));

  // Execute the operation
  primary[op](ns, ops, options, function(err, result) {
    // On a not master error, immediately refresh the view of the replicaset
    var needsRefresh = notMasterError(result) || notMasterError(err);
    if(needsRefresh) replicasetInquirer(self, self.s, true)();
    // Hand the outcome to the caller
    callback(err, result);
  });
};
/**
 * Execute a command
 * @method
 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
 * @param {object} cmd The command hash
 * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
 * @param {object} [options.writeConcern] When present the command is always sent to the primary
 * @param {boolean} [options.onAll] Execute the command against every known server
 * @param {Connection} [options.connection] Specify connection object to execute command against
 * @param {opResultCallback} callback A callback function
 */
ReplSet.prototype.command = function(ns, cmd, options, callback) {
  if(typeof options == 'function') callback = options, options = {};
  if(this.s.replState.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));

  var server = null;
  var self = this;
  // Options are optional
  options = options || {};

  // Topology is not connected, save the call in the provided store to be
  // Executed at some point when the handler deems it's reconnected
  if(!this.isConnected(options) && this.s.disconnectHandler != null) {
    callback = bindToCurrentDomain(callback);
    return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
  }

  // We need to execute the command on all servers
  if(options.onAll) {
    var servers = this.s.replState.getAll();
    var count = servers.length;
    var cmdErr = null;
    var sawNotMaster = false;

    // Nothing to execute against, report it instead of never calling back
    if(count == 0) return callback(new MongoError("no server found"));

    var onServerDone = function(err, r) {
      count = count - 1;
      // Keep the first error so a failure is not masked by a later success
      if(err && cmdErr == null) cmdErr = err;
      if(notMasterError(r) || notMasterError(err)) sawNotMaster = true;

      // Finished executing command
      if(count == 0) {
        // Was it a logout command clear any credentials
        if(cmd.logout) clearCredentials(self.s, ns);
        // We have a no master error, immediately refresh the view of the replicaset
        if(sawNotMaster) replicasetInquirer(self, self.s, true)();
        // Return the error (if any) and the last result
        callback(cmdErr, r);
      }
    };

    for(var i = 0; i < servers.length; i++) {
      servers[i].command(ns, cmd, options, onServerDone);
    }

    return;
  }

  // Commands with a write concern must go to the primary
  var readPreference = options.writeConcern ? ReadPreference.primary : options.readPreference;

  // Pick the right server based on readPreference
  try {
    server = pickServer(self, self.s, readPreference);
    if(self.s.debug) self.emit('pickedServer', readPreference, server);
  } catch(err) {
    return callback(err);
  }

  // No server returned we had an error
  if(server == null) return callback(new MongoError("no server found"));

  // Execute the command
  server.command(ns, cmd, options, function(err, r) {
    // Was it a logout command clear any credentials
    if(cmd.logout) clearCredentials(self.s, ns);
    // We have a no master error, immediately refresh the view of the replicaset
    if(notMasterError(r) || notMasterError(err)) {
      replicasetInquirer(self, self.s, true)();
    }
    // Return the error
    callback(err, r);
  });
};
/**
 * Perform one or more remove operations
 * @method
 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
 * @param {array} ops An array of removes
 * @param {boolean} [options.ordered=true] Execute in order or out of order
 * @param {object} [options.writeConcern={}] Write concern for the operation
 * @param {opResultCallback} callback A callback function
 */
ReplSet.prototype.remove = function(ns, ops, options, callback) {
  // Options are optional, the callback may be in their place
  if(typeof options == 'function') {
    callback = options;
    options = {};
  }

  var state = this.s;
  if(state.replState.state == DESTROYED) {
    return callback(new MongoError(f('topology was destroyed')));
  }

  // Not connected: park the operation in the disconnect handler (if any) so
  // it can be executed once the handler deems the topology reconnected
  var mustBuffer = !this.isConnected() && state.disconnectHandler != null;
  if(mustBuffer) {
    callback = bindToCurrentDomain(callback);
    return state.disconnectHandler.add('remove', ns, ops, options, callback);
  }

  executeWriteOperation(this, 'remove', ns, ops, options, callback);
};
/**
 * Insert one or more documents
 * @method
 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
 * @param {array} ops An array of documents to insert
 * @param {boolean} [options.ordered=true] Execute in order or out of order
 * @param {object} [options.writeConcern={}] Write concern for the operation
 * @param {opResultCallback} callback A callback function
 */
ReplSet.prototype.insert = function(ns, ops, options, callback) {
  // Options are optional, the callback may be in their place
  if(typeof options == 'function') {
    callback = options;
    options = {};
  }

  var state = this.s;
  if(state.replState.state == DESTROYED) {
    return callback(new MongoError(f('topology was destroyed')));
  }

  // Not connected: park the operation in the disconnect handler (if any) so
  // it can be executed once the handler deems the topology reconnected
  var mustBuffer = !this.isConnected() && state.disconnectHandler != null;
  if(mustBuffer) {
    callback = bindToCurrentDomain(callback);
    return state.disconnectHandler.add('insert', ns, ops, options, callback);
  }

  executeWriteOperation(this, 'insert', ns, ops, options, callback);
};
/**
 * Perform one or more update operations
 * @method
 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
 * @param {array} ops An array of updates
 * @param {boolean} [options.ordered=true] Execute in order or out of order
 * @param {object} [options.writeConcern={}] Write concern for the operation
 * @param {opResultCallback} callback A callback function
 */
ReplSet.prototype.update = function(ns, ops, options, callback) {
  // Options are optional, the callback may be in their place
  if(typeof options == 'function') {
    callback = options;
    options = {};
  }

  var state = this.s;
  if(state.replState.state == DESTROYED) {
    return callback(new MongoError(f('topology was destroyed')));
  }

  // Not connected: park the operation in the disconnect handler (if any) so
  // it can be executed once the handler deems the topology reconnected
  var mustBuffer = !this.isConnected() && state.disconnectHandler != null;
  if(mustBuffer) {
    callback = bindToCurrentDomain(callback);
    return state.disconnectHandler.add('update', ns, ops, options, callback);
  }

  executeWriteOperation(this, 'update', ns, ops, options, callback);
};
/**
 * Authenticate using a specified mechanism against every known server
 * @method
 * @param {string} mechanism The Auth mechanism we are invoking
 * @param {string} db The db we are invoking the mechanism against
 * @param {...object} param Parameters for the specific mechanism
 * @param {authResultCallback} callback A callback function (always the last argument)
 * @throws {MongoError} If no provider is registered for the mechanism (and it is not 'default')
 */
ReplSet.prototype.auth = function(mechanism, db) {
  var self = this;
  // Everything after (mechanism, db) are mechanism parameters, the last one is the callback
  var args = Array.prototype.slice.call(arguments, 2);
  var callback = args.pop();

  // If we don't have the mechanism fail
  if(this.s.authProviders[mechanism] == null && mechanism != 'default')
    throw new MongoError(f("auth provider %s does not exist", mechanism));

  // Authenticate against a snapshot of all the servers
  var servers = this.s.replState.getAll().slice(0);
  var count = servers.length;
  var authErr = null;

  // No servers to authenticate against: report it rather than never calling
  // back and leaving authInProgress set forever
  if(count == 0) return callback(new MongoError("no server found"), false);

  // Set auth in progress
  this.s.authInProgress = true;

  // Arguments without a callback, also what gets stored as credentials
  var argsWithoutCallback = [mechanism, db].concat(args);

  var onServerAuthenticated = function(err, r) {
    count = count - 1;
    if(err) authErr = err;

    // Still waiting for other servers
    if(count != 0) return;

    // Servers showed up while authenticating, run the authentication again
    if(self.s.authInProgressServers.length > 0) {
      self.s.authInProgressServers = [];
      return self.auth.apply(self, [mechanism, db].concat(args).concat([callback]));
    }

    // Auth is done
    self.s.authInProgress = false;
    // Add successful credentials
    if(authErr == null) addCredentials(self.s, db, argsWithoutCallback);
    // Return the auth error
    if(authErr) return callback(authErr, false);
    // Successfully authenticated session
    callback(null, new Session({}, self));
  };

  // Execute the auth against every server
  for(var i = 0; i < servers.length; i++) {
    servers[i].auth.apply(servers[i], argsWithoutCallback.concat([onServerAuthenticated]));
  }
};
/**
 * Current state value held by the replicaset state object
 * @method
 * @return {string}
 */
ReplSet.prototype.state = function() {
  var replState = this.s.replState;
  return replState.state;
};
-
/**
 * Build an ismaster listener that limits arbiters and hidden servers to a
 * connection pool size of one
 * @method
 */
var handleIsmaster = function(self) {
  return function(ismaster, _server) {
    var singleSocketOnly = ismaster.arbiterOnly || ismaster.hidden;
    if(singleSocketOnly) _server.s.options.size = 1;
  };
};
/**
 * Initiate server connect
 *
 * Schedules the replicaset inquiry, creates one Server per seedlist entry
 * and starts connecting to each of them.
 * @method
 * @param {object} [_options] Additional options merged into the stored ReplSet options
 */
ReplSet.prototype.connect = function(_options) {
  var self = this;
  var state = this.s;

  // Schedule the replicaset inquiry process one haInterval from now
  setTimeout(replicasetInquirer(this, state, false), state.haInterval);

  // Additional options
  if(_options) {
    for(var name in _options) state.options[name] = _options[name];
  }

  // We are connecting and have not reached fullsetup
  state.replState.state = CONNECTING;
  state.fullsetup = false;

  // Build a server instance for every seed before connecting to any of them
  state.seedlist.forEach(function(seed) {
    var serverOptions = cloneOptions(self.s.options);
    serverOptions.host = seed.host;
    serverOptions.port = seed.port;
    serverOptions.reconnect = false;
    serverOptions.readPreferenceStrategies = self.s.readPreferenceStrategies;
    serverOptions.emitError = true;
    if(self.s.tag) serverOptions.tag = self.s.tag;
    // Share the auth store
    serverOptions.authProviders = self.s.authProviders;

    var seedServer = new Server(serverOptions);
    // Handle the ismaster
    seedServer.on('ismaster', handleIsmaster(self));
    // Track as disconnected and as part of the initial connection attempt
    self.s.disconnectedServers.push(seedServer);
    self.s.initialConnectionServers.push(seedServer);
  });

  // Drain the disconnected list, connecting to each server
  while(state.disconnectedServers.length > 0) {
    var server = state.disconnectedServers.shift();

    // One shot handlers for the outcome of the connection attempt
    server.once('error', errorHandlerTemp(this, state, 'error'));
    server.once('close', errorHandlerTemp(this, state, 'close'));
    server.once('timeout', errorHandlerTemp(this, state, 'timeout'));
    server.once('connect', connectHandler(this, state));

    // Attempt to connect
    server.connect();
  }
};
/**
 * Figure out if the topology is connected to a server that can satisfy the
 * (optional) read preference
 * @method
 * @param {ReadPreference} [options.readPreference] Read preference that must be satisfiable
 * @return {boolean}
 */
ReplSet.prototype.isConnected = function(options) {
  options = options || {};
  var replState = this.s.replState;
  var readPreference = options.readPreference;

  if(readPreference) {
    if(readPreference.equals(ReadPreference.secondary)) {
      return replState.isSecondaryConnected();
    }

    // NOTE(review): a connected secondary also counts as connected for the
    // primary read preference here - confirm this is intended
    if(readPreference.equals(ReadPreference.primary)) {
      return replState.isSecondaryConnected() || replState.isPrimaryConnected();
    }
  }

  // No (or another) read preference: a secondary is enough only when
  // secondary only connections are allowed, otherwise a primary is needed
  return this.s.secondaryOnlyConnectionAllowed
    ? replState.isSecondaryConnected()
    : replState.isPrimaryConnected();
};
/**
 * Figure out if the replicaset instance was destroyed by calling destroy
 * @method
 * @return {boolean}
 */
ReplSet.prototype.isDestroyed = function() {
  var currentState = this.s.replState.state;
  return currentState == DESTROYED;
};
/**
 * Destroy the replicaset topology
 * @method
 * @param {boolean} [emitClose] Emit a close event (only when somebody listens for it)
 */
ReplSet.prototype.destroy = function(emitClose) {
  var self = this;
  var state = this.s;

  if(state.logger.isInfo()) state.logger.info(f('[%s] destroyed', state.id));
  state.replState.state = DESTROYED;

  // Emit close before the listeners are removed below
  if(emitClose && self.listeners('close').length > 0) self.emit('close', self);

  // Destroy state
  state.replState.destroy();

  // Clear out any listeners
  ['timeout', 'error', 'close', 'joined', 'left'].forEach(function(eventName) {
    self.removeAllListeners(eventName);
  });
};
- /**
- * A replset connect event, used to verify that the connection is up and running
- *
- * @event ReplSet#connect
- * @type {ReplSet}
- */
- /**
- * The replset high availability event
- *
- * @event ReplSet#ha
- * @type {function}
- * @param {string} type The stage in the high availability event (start|end)
- * @param {boolean} data.norepeat This is a repeating high availability process or a single execution only
- * @param {number} data.id The id for this high availability request
- * @param {object} data.state An object containing the information about the current replicaset
- */
- /**
- * A server member left the replicaset
- *
- * @event ReplSet#left
- * @type {function}
- * @param {string} type The type of member that left (primary|secondary|arbiter)
- * @param {Server} server The server object that left
- */
- /**
- * A server member joined the replicaset
- *
- * @event ReplSet#joined
- * @type {function}
- * @param {string} type The type of member that joined (primary|secondary|arbiter)
- * @param {Server} server The server object that joined
- */
- //
- // Inquires about state changes
- //
// Cache the credentials used for a db, replacing any credentials
// previously cached for that db
var addCredentials = function(s, db, argsWithoutCallback) {
  // clearCredentials works on namespaces, so build one for the db
  var namespace = db + ".dummy";
  clearCredentials(s, namespace);
  // Add new credentials to list
  s.credentials.push(argsWithoutCallback);
};
// Drop all cached credentials belonging to the db of a namespace
var clearCredentials = function(s, ns) {
  // The db is everything before the first dot of the namespace
  var db = ns.split('.')[0];
  // Keep only the credentials of other dbs (index 1 of an entry is its db)
  s.credentials = s.credentials.filter(function(credential) {
    return credential[1] != db;
  });
};
//
// Keep only the servers whose ismaster tags match every tag of the read
// preference. Without tags the server list is returned as is.
var filterByTags = function(readPreference, servers) {
  var wantedTags = readPreference.tags;
  if(wantedTags == null) return servers;

  // A server qualifies when none of the wanted tags differs from its own
  var matchesAllTags = function(server) {
    var serverTags = server.lastIsMaster().tags || {};
    var matches = true;

    for(var tagName in wantedTags) {
      if(serverTags[tagName] != wantedTags[tagName]) matches = false;
    }

    return matches;
  };

  return servers.filter(matchesAllTags);
};
//
// Pick a server based on readPreference
//
// A custom strategy registered under the read preference name wins.
// Otherwise the built in modes are applied to the primary and the non
// hidden secondaries; secondaries are rotated through using s.index.
// Throws a MongoError when the read preference cannot be satisfied, may
// return null/undefined when no primary is known for the fall through cases.
var pickServer = function(self, s, readPreference) {
  // If no read Preference set to primary by default
  readPreference = readPreference || ReadPreference.primary;

  // Do we have a custom readPreference strategy, use it
  var strategy = s.readPreferenceStrategies != null
    ? s.readPreferenceStrategies[readPreference.preference] : null;
  if(strategy != null) {
    var picked = strategy.pickServer(s.replState, readPreference);
    if(s.debug) self.emit('pickedServer', readPreference, picked);
    return picked;
  }

  // Filter out any hidden secondaries
  var secondaries = s.replState.secondaries.filter(function(server) {
    return !server.lastIsMaster().hidden;
  });

  // Check if we can satisfy any of the basic read Preferences
  if(readPreference.equals(ReadPreference.secondary)
    && secondaries.length == 0)
      throw new MongoError("no secondary server available");

  if(readPreference.equals(ReadPreference.secondaryPreferred)
    && secondaries.length == 0
    && s.replState.primary == null)
      throw new MongoError("no secondary or primary server available");

  if(readPreference.equals(ReadPreference.primary)
    && s.replState.primary == null)
      throw new MongoError("no primary server available");

  var servers = null;

  // Secondary
  if(readPreference.equals(ReadPreference.secondary)) {
    // Tags restrict the eligible secondaries in this mode as well
    servers = filterByTags(readPreference, secondaries);
    if(servers.length == 0) throw new MongoError("no secondary server available");
    s.index = (s.index + 1) % servers.length;
    return servers[s.index];
  }

  // Secondary preferred
  if(readPreference.equals(ReadPreference.secondaryPreferred)) {
    if(secondaries.length > 0) {
      // Apply tags if present
      servers = filterByTags(readPreference, secondaries);
      // If have a matching server pick one otherwise fall through to primary
      if(servers.length > 0) {
        s.index = (s.index + 1) % servers.length;
        return servers[s.index];
      }
    }

    return s.replState.primary;
  }

  // Primary preferred
  if(readPreference.equals(ReadPreference.primaryPreferred)) {
    if(s.replState.primary) return s.replState.primary;

    if(secondaries.length > 0) {
      // Apply tags if present
      servers = filterByTags(readPreference, secondaries);
      // If have a matching server pick one
      if(servers.length > 0) {
        s.index = (s.index + 1) % servers.length;
        return servers[s.index];
      }

      // Throw error as we have no valid secondary or primary servers
      throw new MongoError("no secondary or primary server available");
    }
  }

  // Return the primary
  return s.replState.primary;
};
//
// Build the replicaset monitoring (HA) routine
//
// self:     topology instance; receives the 'ha', 'fullsetup' and 'all' events.
// state:    internal state (replState, disconnectedServers, connectingServers,
//           readPreferenceStrategies, logger, haId, haInterval, ...).
// norepeat: when exactly true a single inquiry is run; any other value makes
//           the routine re-arm itself every state.haInterval once a cycle ends.
//
// Returns a function that runs one monitoring cycle: it retries the
// disconnected servers, sends ismaster to every server known to the
// replicaset state, feeds the answers back into that state and lets the
// custom read preference strategies run their own ha step.
var replicasetInquirer = function(self, state, norepeat) {
  return function() {
    if(state.replState.state == DESTROYED) return;
    // Process already running don't rerun
    if(state.highAvailabilityProcessRunning) return;
    // Started processes
    state.highAvailabilityProcessRunning = true;
    if(state.logger.isInfo()) state.logger.info(f('[%s] monitoring process running %s', state.id, JSON.stringify(state.replState)));
    // Unique HA id to identify the current loop running
    var localHaId = state.haId++;
    // Clean out any failed connection attempts
    state.connectingServers = {};
    // Controls if we are doing a single inquiry or repeating
    norepeat = typeof norepeat == 'boolean' ? norepeat : false;

    // Single exit point of a cycle: announce the end, release the running
    // flag and, unless this is a one-off inquiry, schedule the next cycle
    var finishCycle = function() {
      self.emit('ha', 'end', {norepeat: norepeat, id: localHaId, state: state.replState ? state.replState.toJSON() : {}});
      state.highAvailabilityProcessRunning = false;
      if(!norepeat) setTimeout(replicasetInquirer(self, state, false), state.haInterval);
    };

    // If we have a primary and a disconnect handler, execute
    // buffered operations
    if(state.replState.isPrimaryConnected() && state.replState.isSecondaryConnected() && state.disconnectHandler) {
      state.disconnectHandler.execute();
    }

    // Emit replicasetInquirer
    self.emit('ha', 'start', {norepeat: norepeat, id: localHaId, state: state.replState ? state.replState.toJSON() : {}});

    // Let's process all the disconnected servers
    while(state.disconnectedServers.length > 0) {
      // Get the first disconnected server
      var server = state.disconnectedServers.shift();
      if(state.logger.isInfo()) state.logger.info(f('[%s] monitoring attempting to connect to %s', state.id, server.lastIsMaster() ? server.lastIsMaster().me : server.name));
      // Set up the event handlers
      server.once('error', errorHandlerTemp(self, state, 'error'));
      server.once('close', errorHandlerTemp(self, state, 'close'));
      server.once('timeout', errorHandlerTemp(self, state, 'timeout'));
      server.once('connect', connectHandler(self, state));
      // Attempt to connect
      server.connect();
    }

    // Cleanup state (removed disconnected servers)
    state.replState.clean();

    // We need to query all servers
    var servers = state.replState.getAll();
    var serversLeft = servers.length;

    // If no servers and we are connected keep pinging
    // NOTE(review): with no servers in any other state the cycle is never
    // finished and the running flag stays set; confirm that is intended.
    if(servers.length == 0 && state.replState.state == CONNECTED) {
      finishCycle();
      return;
    }

    //
    // Inspect a specific servers ismaster
    var inspectServer = function(server) {
      if(state.replState.state == DESTROYED) return;

      // A server that is missing or no longer connected will never answer.
      // It still has to be counted, otherwise serversLeft never reaches zero,
      // the running flag is never released and monitoring stops for good.
      if(!server || !server.isConnected()) {
        serversLeft = serversLeft - 1;
        if(serversLeft == 0) finishCycle();
        return;
      }

      // Execute ismaster
      server.command('system.$cmd', {ismaster:true}, function(err, r) {
        if(state.replState.state == DESTROYED) return;
        // Count down the number of servers left
        serversLeft = serversLeft - 1;

        // On error wait for the outstanding answers; if this was the last
        // one, close the cycle so a new check gets scheduled
        if(err) {
          if(serversLeft == 0) finishCycle();
          return;
        }

        // Let all the read Preferences do things to the servers
        var rPreferencesCount = Object.keys(state.readPreferenceStrategies).length;
        // Handle the primary
        var ismaster = r.result;
        if(state.logger.isDebug()) state.logger.debug(f('[%s] monitoring process ismaster %s', state.id, JSON.stringify(ismaster)));
        // Update the replicaset state
        state.replState.update(ismaster, server);

        // Add any new servers reported by the primary
        if(ismaster.ismaster && Array.isArray(ismaster.hosts)) {
          // Hosts to process
          var hosts = ismaster.hosts;
          // Add arbiters and passives to list of hosts if we have any
          if(Array.isArray(ismaster.arbiters)) hosts = hosts.concat(ismaster.arbiters);
          if(Array.isArray(ismaster.passives)) hosts = hosts.concat(ismaster.passives);
          // Process all the hosts
          processHosts(self, state, hosts);
        }

        // No read Preferences strategies
        if(rPreferencesCount == 0) {
          // Don't schedule a new inquiry while answers are outstanding
          if(serversLeft > 0) return;
          finishCycle();
          return;
        }

        // No servers left to query, execute read preference strategies
        if(serversLeft == 0) {
          // Go over all the read preferences
          for(var name in state.readPreferenceStrategies) {
            state.readPreferenceStrategies[name].ha(self, state.replState, function() {
              rPreferencesCount = rPreferencesCount - 1;
              if(rPreferencesCount == 0) {
                // Add any new servers in primary ismaster
                if(ismaster.ismaster && Array.isArray(ismaster.hosts)) {
                  processHosts(self, state, ismaster.hosts);
                }
                finishCycle();
              }
            });
          }
        }
      });
    }

    // Call ismaster on all servers
    for(var i = 0; i < servers.length; i++) {
      inspectServer(servers[i]);
    }

    // If no more initial servers and new scheduled servers to connect
    if(state.replState.secondaries.length >= 1 && state.replState.primary != null && !state.fullsetup) {
      state.fullsetup = true;
      self.emit('fullsetup', self);
    }

    // If all servers are accounted for and we have not sent the all event
    if(state.replState.primary != null && self.lastIsMaster()
      && Array.isArray(self.lastIsMaster().hosts) && !state.all) {
      var length = 1 + state.replState.secondaries.length;
      // If we have all secondaries + primary
      // NOTE(review): this only holds when the number of secondaries equals
      // hosts.length; confirm whether hosts is expected to exclude the primary.
      if(length == self.lastIsMaster().hosts.length + 1) {
        state.all = true;
        self.emit('all', self);
      }
    }
  }
}
//
// Error handler for initial connect
//
// self:  topology instance; receives the 'error' event when no seed works.
// state: internal state (initialConnectionServers, disconnectedServers,
//        seedlist, replState, logger, emitError, ...).
// event: name of the server event the handler is registered for; not used
//        by the handler itself.
//
// Returns a function(err, server) that moves the failed server to the list of
// disconnected servers and reports 'no valid seed servers in list' once no
// usable connection can come out of the initial connection attempt.
var errorHandlerTemp = function(self, state, event) {
  return function(err, server) {
    // Log the information
    if(state.logger.isInfo()) state.logger.info(f('[%s] server %s disconnected', state.id, server.lastIsMaster() ? server.lastIsMaster().me : server.name));
    // Filter out any connection servers
    state.initialConnectionServers = state.initialConnectionServers.filter(function(_server) {
      return server.name != _server.name;
    });

    // Connection is destroyed, ignore
    if(state.replState.state == DESTROYED) return;

    // Remove any non used handlers
    ['error', 'close', 'timeout', 'connect'].forEach(function(e) {
      server.removeAllListeners(e);
    })

    // Push to list of disconnected servers
    addToListIfNotExist(state.disconnectedServers, server);

    // End connection operation if we have no legal replicaset state.
    // The number of pending initial connections is checked explicitly
    // instead of relying on the coercion of an array compared to 0.
    if(state.initialConnectionServers.length == 0 && state.replState.state == CONNECTING) {
      if((state.secondaryOnlyConnectionAllowed && !state.replState.isSecondaryConnected() && !state.replState.isPrimaryConnected())
        || (!state.secondaryOnlyConnectionAllowed && !state.replState.isPrimaryConnected())) {
        if(state.logger.isInfo()) state.logger.info(f('[%s] no valid seed servers in list', state.id));

        if(self.listeners('error').length > 0)
          return self.emit('error', new MongoError('no valid seed servers in list'));
      }
    }

    // If the number of disconnected servers is equal to
    // the number of seed servers we cannot connect
    if(state.disconnectedServers.length == state.seedlist.length && state.replState.state == CONNECTING) {
      if(state.emitError && self.listeners('error').length > 0) {
        if(state.logger.isInfo()) state.logger.info(f('[%s] no valid seed servers in list', state.id));
        self.emit('error', new MongoError('no valid seed servers in list'));
      }
    }
  }
}
//
// Connect handler
//
// self:  topology instance; receives 'error' and 'fullsetup' events.
// state: internal state (credentials, connectingServers,
//        initialConnectionServers, replState, logger, ...).
//
// Returns a function(server) for the server 'connect' event. It applies every
// stored credential to the new server, registers the server with the
// replicaset state and schedules connections to the members listed in the
// server's last ismaster.
var connectHandler = function(self, state) {
  return function(server) {
    if(state.logger.isInfo()) state.logger.info(f('[%s] connected to %s', state.id, server.name));
    // Destroyed connection
    if(state.replState.state == DESTROYED) {
      server.destroy(false, false);
      return;
    }

    // Filter out any connection servers
    state.initialConnectionServers = state.initialConnectionServers.filter(function(_server) {
      return server.name != _server.name;
    });

    // Process the new server
    var processNewServer = function() {
      // Discover any additional servers
      var ismaster = server.lastIsMaster();
      var events = ['error', 'close', 'timeout', 'connect', 'message'];
      // Remove any non used handlers
      events.forEach(function(e) {
        server.removeAllListeners(e);
      })

      // Clean up
      delete state.connectingServers[server.name];

      // Update the replicaset state, destroy if not added
      if(!state.replState.update(ismaster, server)) {
        return server.destroy();
      }

      // Add the server handling code
      if(server.isConnected()) {
        server.on('error', errorHandler(self, state));
        server.on('close', closeHandler(self, state));
        server.on('timeout', timeoutHandler(self, state));
      }

      // Hosts to process. An ismaster without a hosts array must not break
      // the concatenation of the arbiters and passives below.
      var reported = ismaster || {};
      var hosts = Array.isArray(reported.hosts) ? reported.hosts : [];
      // Add arbiters and passives to list of hosts if we have any
      if(Array.isArray(reported.arbiters)) hosts = hosts.concat(reported.arbiters);
      if(Array.isArray(reported.passives)) hosts = hosts.concat(reported.passives);
      // Add any new servers
      processHosts(self, state, hosts);

      // Nothing left to connect to and still no primary, give up
      if(state.initialConnectionServers.length == 0 && Object.keys(state.connectingServers).length == 0
        && !state.replState.isPrimaryConnected() && !state.secondaryOnlyConnectionAllowed && state.replState.state == CONNECTING) {
        if(state.logger.isInfo()) state.logger.info(f('[%s] no primary found in replicaset', state.id));
        self.emit('error', new MongoError("no primary found in replicaset"));
        return self.destroy();
      }

      // If no more initial servers and new scheduled servers to connect
      if(state.replState.secondaries.length >= 1 && state.replState.primary != null && !state.fullsetup) {
        state.fullsetup = true;
        self.emit('fullsetup', self);
      }
    }

    // Save up new members to be authenticated against
    if(self.s.authInProgress) {
      self.s.authInProgressServers.push(server);
    }

    // No credentials just process server
    if(state.credentials.length == 0) return processNewServer();

    // Do we have credentials, let's apply them all
    var count = state.credentials.length;
    // Apply the credentials; the server is processed once every attempt has
    // called back, the outcome of the individual attempts is not inspected
    for(var i = 0; i < state.credentials.length; i++) {
      server.auth.apply(server, state.credentials[i].concat([function(err, r) {
        count = count - 1;
        if(count == 0) processNewServer();
      }]));
    }
  }
}
//
// Detect if we need to add new servers
//
// self:  topology instance, handed on to the helpers.
// state: internal state; reads replState and initialConnectionServers and
//        records scheduled addresses in state.connectingServers.
// hosts: "host:port" addresses; anything that is not an array is ignored.
//
// Every address that is not part of the replicaset state, not already being
// connected and not among the initial connections gets a new connection.
var processHosts = function(self, state, hosts) {
  // Nothing to do for a destroyed topology or without a host list
  if(state.replState.state == DESTROYED || !Array.isArray(hosts)) return;

  for(var idx = 0; idx < hosts.length; idx++) {
    var address = hosts[idx];

    // Already a member of the replicaset state
    if(state.replState.contains(address)) continue;
    // Already scheduled by an earlier call or part of the initial connect
    if(state.connectingServers[address] != null) continue;
    if(inInitialConnectingServers(self, state, address)) continue;

    if(state.logger.isInfo()) state.logger.info(f('[%s] scheduled server %s for connection', state.id, address));

    // Make sure we know what is trying to connect
    state.connectingServers[address] = address;

    // Connect the server
    var hostAndPort = address.split(':');
    connectToServer(self, state, hostAndPort[0], parseInt(hostAndPort[1], 10));
  }
}
//
// Check if an address belongs to one of the initial connection servers
//
// self:    not used by the lookup.
// state:   internal state; only state.initialConnectionServers is read.
// address: value compared against each server's name.
//
// Returns true when a server with that name is found, false otherwise.
var inInitialConnectingServers = function(self, state, address) {
  return state.initialConnectionServers.some(function(pending) {
    return pending.name == address;
  });
}
//
// Connect to a new server
//
// self:  topology instance, handed to the event handler factories.
// state: internal state; its options are cloned and completed with the read
//        preference strategies, the tag (when set) and the auth providers.
// host:  host of the server to connect to.
// port:  port of the server to connect to.
//
// Creates the Server, wires the ismaster handler and the one-shot connection
// handlers and starts the connection attempt. Returns nothing.
var connectToServer = function(self, state, host, port) {
  // Work on a copy so the shared options are left alone
  var serverOptions = cloneOptions(state.options);
  serverOptions.host = host;
  serverOptions.port = port;
  serverOptions.reconnect = false;
  serverOptions.readPreferenceStrategies = state.readPreferenceStrategies;
  if(state.tag) {
    serverOptions.tag = state.tag;
  }
  // Share the auth store
  serverOptions.authProviders = state.authProviders;
  serverOptions.emitError = true;

  // Create a new server instance
  var candidate = new Server(serverOptions);

  // Handle the ismaster
  candidate.on('ismaster', handleIsmaster(self));

  // One-shot handlers for the outcome of the connection attempt
  ['error', 'close', 'timeout'].forEach(function(failure) {
    candidate.once(failure, errorHandlerTemp(self, state, failure));
  });
  candidate.once('connect', connectHandler(self, state));

  // Attempt to connect
  candidate.connect();
}
//
// Add server to the list if it does not exist
//
// list:   array of servers, compared through their equals method.
// server: server to add; its 'error', 'close', 'timeout' and 'connect'
//         listeners are always removed, whether or not it gets added.
//
// Returns true when the server was already in the list, false when it has
// just been appended.
var addToListIfNotExist = function(list, server) {
  // Remove any non used handlers
  var handlerEvents = ['error', 'close', 'timeout', 'connect'];
  for(var e = 0; e < handlerEvents.length; e++) {
    server.removeAllListeners(handlerEvents[e]);
  }

  // Check if the server already exists
  var matches = list.filter(function(candidate) {
    return candidate.equals(server);
  });
  var alreadyListed = matches.length > 0;

  if(!alreadyListed) list.push(server);
  return alreadyListed;
}
//
// Error handler for servers that are part of the replicaset state
//
// self:  topology instance; receives the 'left' or 'error' event.
// state: internal state (replState, disconnectedServers, logger, emitError).
//
// Returns a function(err, server) that records the server as disconnected,
// emits 'left' the first time it is recorded (or 'error' when it was recorded
// before and error emission is enabled) and schedules a single inquiry after
// self.s.minHeartbeatFrequencyMS.
var errorHandler = function(self, state) {
  // Fire off a detection of missing server
  var runSingleInquiry = function() {
    replicasetInquirer(self, self.s, true)();
  };

  return function(err, server) {
    if(state.replState.state == DESTROYED) return;

    if(state.logger.isInfo()) {
      var label = server.lastIsMaster() ? server.lastIsMaster().me : server.name;
      state.logger.info(f('[%s] server %s errored out with %s', state.id, label, JSON.stringify(err)));
    }

    var alreadyDisconnected = addToListIfNotExist(state.disconnectedServers, server);
    if(!alreadyDisconnected) {
      self.emit('left', state.replState.remove(server), server);
    } else if(state.emitError && self.listeners('error').length > 0) {
      self.emit('error', err, server);
    }

    // Use minHeartbeatFrequencyMS as the delay
    setTimeout(runSingleInquiry, self.s.minHeartbeatFrequencyMS);
  }
}
//
// Timeout handler for servers that are part of the replicaset state
//
// self:  topology instance; receives the 'left' event.
// state: internal state (replState, disconnectedServers, logger).
//
// Returns a function(err, server) that records the server as disconnected,
// emits 'left' the first time it is recorded and schedules a single inquiry
// after self.s.minHeartbeatFrequencyMS.
var timeoutHandler = function(self, state) {
  // Fire off a detection of missing server
  var runSingleInquiry = function() {
    replicasetInquirer(self, self.s, true)();
  };

  return function(err, server) {
    if(state.replState.state == DESTROYED) return;

    if(state.logger.isInfo()) {
      var label = server.lastIsMaster() ? server.lastIsMaster().me : server.name;
      state.logger.info(f('[%s] server %s timed out', state.id, label));
    }

    var alreadyDisconnected = addToListIfNotExist(state.disconnectedServers, server);
    if(!alreadyDisconnected) {
      self.emit('left', state.replState.remove(server), server);
    }

    // Use minHeartbeatFrequencyMS as the delay
    setTimeout(runSingleInquiry, self.s.minHeartbeatFrequencyMS);
  }
}
//
// Close handler for servers that are part of the replicaset state
//
// self:  topology instance; receives the 'left' event.
// state: internal state (replState, disconnectedServers, logger).
//
// Returns a function(err, server) that records the server as disconnected,
// emits 'left' the first time it is recorded and schedules a single inquiry
// after self.s.minHeartbeatFrequencyMS.
var closeHandler = function(self, state) {
  // Fire off a detection of missing server
  var runSingleInquiry = function() {
    replicasetInquirer(self, self.s, true)();
  };

  return function(err, server) {
    if(state.replState.state == DESTROYED) return;

    if(state.logger.isInfo()) {
      var label = server.lastIsMaster() ? server.lastIsMaster().me : server.name;
      state.logger.info(f('[%s] server %s closed', state.id, label));
    }

    var alreadyDisconnected = addToListIfNotExist(state.disconnectedServers, server);
    if(!alreadyDisconnected) {
      self.emit('left', state.replState.remove(server), server);
    }

    // Use minHeartbeatFrequencyMS as the delay
    setTimeout(runSingleInquiry, self.s.minHeartbeatFrequencyMS);
  }
}
//
// Validate if a non-master or recovering error
//
// r: a command response; its result field is inspected when set, otherwise
//    r itself is inspected.
//
// Returns true when the err, errmsg or $err field carries one of the known
// "not master" / "node is recovering" messages, false otherwise.
var notMasterError = function(r) {
  // Get result of any
  var payload = r && r.result ? r.result : r;
  if(!payload) return false;

  // Exact matches on the error fields
  if(payload.err == 'not master') return true;
  if(payload.errmsg == 'not master') return true;

  // Substring matches on the $err field
  var legacyMessage = payload['$err'];
  if(legacyMessage && legacyMessage.indexOf('not master or secondary') != -1) return true;
  if(legacyMessage && legacyMessage.indexOf("not master and slaveOk=false") != -1) return true;

  return payload.errmsg == 'node is recovering';
}
- module.exports = ReplSet;
|