mongos.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. "use strict";
  2. var EventEmitter = require('events').EventEmitter
  3. , inherits = require('util').inherits
  4. , f = require('util').format
  5. , ServerCapabilities = require('./topology_base').ServerCapabilities
  6. , MongoCR = require('mongodb-core').MongoCR
  7. , CMongos = require('mongodb-core').Mongos
  8. , Cursor = require('./cursor')
  9. , Server = require('./server')
  10. , Store = require('./topology_base').Store
  11. , shallowClone = require('./utils').shallowClone;
  12. /**
  13. * @fileOverview The **Mongos** class is a class that represents a Mongos Proxy topology and is
  14. * used to construct connections.
  15. *
  16. * **Mongos Should not be used, use MongoClient.connect**
  17. * @example
  18. * var Db = require('mongodb').Db,
  19. * Mongos = require('mongodb').Mongos,
  20. * Server = require('mongodb').Server,
  21. * test = require('assert');
  22. * // Connect using Mongos
  23. * var server = new Server('localhost', 27017);
  24. * var db = new Db('test', new Mongos([server]));
  25. * db.open(function(err, db) {
  26. * // Get an additional db
  27. * db.close();
  28. * });
  29. */
  30. /**
  31. * Creates a new Mongos instance
  32. * @class
  33. * @deprecated
  34. * @param {Server[]} servers A seedlist of servers participating in the replicaset.
  35. * @param {object} [options=null] Optional settings.
  36. * @param {booelan} [options.ha=true] Turn on high availability monitoring.
  37. * @param {number} [options.haInterval=5000] Time between each replicaset status check.
  38. * @param {number} [options.poolSize=5] Number of connections in the connection pool for each server instance, set to 5 as default for legacy reasons.
  39. * @param {boolean} [options.ssl=false] Use ssl connection (needs to have a mongod server with ssl support)
  40. * @param {object} [options.sslValidate=true] Validate mongod server certificate against ca (needs to have a mongod server with ssl support, 2.4 or higher)
  41. * @param {array} [options.sslCA=null] Array of valid certificates either as Buffers or Strings (needs to have a mongod server with ssl support, 2.4 or higher)
  42. * @param {(Buffer|string)} [options.sslCert=null] String or buffer containing the certificate we wish to present (needs to have a mongod server with ssl support, 2.4 or higher)
  43. * @param {(Buffer|string)} [options.sslKey=null] String or buffer containing the certificate private key we wish to present (needs to have a mongod server with ssl support, 2.4 or higher)
  44. * @param {(Buffer|string)} [options.sslPass=null] String or buffer containing the certificate password (needs to have a mongod server with ssl support, 2.4 or higher)
  45. * @param {object} [options.socketOptions=null] Socket options
  46. * @param {boolean} [options.socketOptions.noDelay=true] TCP Socket NoDelay option.
  47. * @param {number} [options.socketOptions.keepAlive=0] TCP KeepAlive on the socket with a X ms delay before start.
  48. * @param {number} [options.socketOptions.connectTimeoutMS=0] TCP Connection timeout setting
  49. * @param {number} [options.socketOptions.socketTimeoutMS=0] TCP Socket timeout setting
  50. * @fires Mongos#connect
  51. * @fires Mongos#ha
  52. * @fires Mongos#joined
  53. * @fires Mongos#left
  54. * @fires Mongos#fullsetup
  55. * @fires Mongos#open
  56. * @fires Mongos#close
  57. * @fires Mongos#error
  58. * @fires Mongos#timeout
  59. * @fires Mongos#parseError
  60. * @return {Mongos} a Mongos instance.
  61. */
  62. var Mongos = function(servers, options) {
  63. if(!(this instanceof Mongos)) return new Mongos(servers, options);
  64. options = options || {};
  65. var self = this;
  66. // Ensure all the instances are Server
  67. for(var i = 0; i < servers.length; i++) {
  68. if(!(servers[i] instanceof Server)) {
  69. throw new MongoError("all seed list instances must be of the Server type");
  70. }
  71. }
  72. // Store option defaults
  73. var storeOptions = {
  74. force: false
  75. , bufferMaxEntries: -1
  76. }
  77. // Shared global store
  78. var store = options.store || new Store(self, storeOptions);
  79. // Set up event emitter
  80. EventEmitter.call(this);
  81. // Debug tag
  82. var tag = options.tag;
  83. // Build seed list
  84. var seedlist = servers.map(function(x) {
  85. return {host: x.host, port: x.port}
  86. });
  87. // Final options
  88. var finalOptions = shallowClone(options);
  89. // Default values
  90. finalOptions.size = typeof options.poolSize == 'number' ? options.poolSize : 5;
  91. finalOptions.reconnect = typeof options.auto_reconnect == 'boolean' ? options.auto_reconnect : true;
  92. finalOptions.emitError = typeof options.emitError == 'boolean' ? options.emitError : true;
  93. finalOptions.cursorFactory = Cursor;
  94. // Add the store
  95. finalOptions.disconnectHandler = store;
  96. // Socket options passed down
  97. if(options.socketOptions) {
  98. if(options.socketOptions.connectTimeoutMS) {
  99. this.connectTimeoutMS = options.socketOptions.connectTimeoutMS;
  100. finalOptions.connectionTimeout = options.socketOptions.connectTimeoutMS;
  101. }
  102. if(options.socketOptions.socketTimeoutMS)
  103. finalOptions.socketTimeout = options.socketOptions.socketTimeoutMS;
  104. }
  105. // Are we running in debug mode
  106. var debug = typeof options.debug == 'boolean' ? options.debug : false;
  107. if(debug) {
  108. finalOptions.debug = debug;
  109. }
  110. // Map keep alive setting
  111. if(options.socketOptions && typeof options.socketOptions.keepAlive == 'number') {
  112. finalOptions.keepAlive = true;
  113. if(typeof options.socketOptions.keepAlive == 'number') {
  114. finalOptions.keepAliveInitialDelay = options.socketOptions.keepAlive;
  115. }
  116. }
  117. // Connection timeout
  118. if(options.socketOptions && typeof options.socketOptions.connectionTimeout == 'number') {
  119. finalOptions.connectionTimeout = options.socketOptions.connectionTimeout;
  120. }
  121. // Socket timeout
  122. if(options.socketOptions && typeof options.socketOptions.socketTimeout == 'number') {
  123. finalOptions.socketTimeout = options.socketOptions.socketTimeout;
  124. }
  125. // noDelay
  126. if(options.socketOptions && typeof options.socketOptions.noDelay == 'boolean') {
  127. finalOptions.noDelay = options.socketOptions.noDelay;
  128. }
  129. if(typeof options.secondaryAcceptableLatencyMS == 'number') {
  130. finalOptions.acceptableLatency = options.secondaryAcceptableLatencyMS;
  131. }
  132. // Add the non connection store
  133. finalOptions.disconnectHandler = store;
  134. // Create the Mongos
  135. var mongos = new CMongos(seedlist, finalOptions)
  136. // Server capabilities
  137. var sCapabilities = null;
  138. // Add auth prbufferMaxEntriesoviders
  139. mongos.addAuthProvider('mongocr', new MongoCR());
  140. // Internal state
  141. this.s = {
  142. // Create the Mongos
  143. mongos: mongos
  144. // Server capabilities
  145. , sCapabilities: sCapabilities
  146. // Debug turned on
  147. , debug: debug
  148. // Store option defaults
  149. , storeOptions: storeOptions
  150. // Cloned options
  151. , clonedOptions: finalOptions
  152. // Actual store of callbacks
  153. , store: store
  154. // Options
  155. , options: options
  156. }
  157. // Last ismaster
  158. Object.defineProperty(this, 'isMasterDoc', {
  159. enumerable:true, get: function() { return self.s.mongos.lastIsMaster(); }
  160. });
  161. // Last ismaster
  162. Object.defineProperty(this, 'numberOfConnectedServers', {
  163. enumerable:true, get: function() { return self.s.mongos.connectedServers().length; }
  164. });
  165. // BSON property
  166. Object.defineProperty(this, 'bson', {
  167. enumerable: true, get: function() {
  168. return self.s.mongos.bson;
  169. }
  170. });
  171. Object.defineProperty(this, 'haInterval', {
  172. enumerable:true, get: function() { return self.s.mongos.haInterval; }
  173. });
  174. }
  175. /**
  176. * @ignore
  177. */
  178. inherits(Mongos, EventEmitter);
  179. // Connect
  180. Mongos.prototype.connect = function(db, _options, callback) {
  181. var self = this;
  182. if('function' === typeof _options) callback = _options, _options = {};
  183. if(_options == null) _options = {};
  184. if(!('function' === typeof callback)) callback = null;
  185. self.s.options = _options;
  186. // Update bufferMaxEntries
  187. self.s.storeOptions.bufferMaxEntries = db.bufferMaxEntries;
  188. // Error handler
  189. var connectErrorHandler = function(event) {
  190. return function(err) {
  191. // Remove all event handlers
  192. var events = ['timeout', 'error', 'close'];
  193. events.forEach(function(e) {
  194. self.removeListener(e, connectErrorHandler);
  195. });
  196. self.s.mongos.removeListener('connect', connectErrorHandler);
  197. // Try to callback
  198. try {
  199. callback(err);
  200. } catch(err) {
  201. process.nextTick(function() { throw err; })
  202. }
  203. }
  204. }
  205. // Actual handler
  206. var errorHandler = function(event) {
  207. return function(err) {
  208. if(event != 'error') {
  209. self.emit(event, err);
  210. }
  211. }
  212. }
  213. // Error handler
  214. var reconnectHandler = function(err) {
  215. self.emit('reconnect');
  216. self.s.store.execute();
  217. }
  218. // Connect handler
  219. var connectHandler = function() {
  220. // Clear out all the current handlers left over
  221. ["timeout", "error", "close"].forEach(function(e) {
  222. self.s.mongos.removeAllListeners(e);
  223. });
  224. // Set up listeners
  225. self.s.mongos.once('timeout', errorHandler('timeout'));
  226. self.s.mongos.once('error', errorHandler('error'));
  227. self.s.mongos.once('close', errorHandler('close'));
  228. // relay the event
  229. var relay = function(event) {
  230. return function(t, server) {
  231. self.emit(event, t, server);
  232. }
  233. }
  234. // Set up serverConfig listeners
  235. self.s.mongos.on('joined', relay('joined'));
  236. self.s.mongos.on('left', relay('left'));
  237. self.s.mongos.on('fullsetup', relay('fullsetup'));
  238. // Emit open event
  239. self.emit('open', null, self);
  240. // Return correctly
  241. try {
  242. callback(null, self);
  243. } catch(err) {
  244. process.nextTick(function() { throw err; })
  245. }
  246. }
  247. // Set up listeners
  248. self.s.mongos.once('timeout', connectErrorHandler('timeout'));
  249. self.s.mongos.once('error', connectErrorHandler('error'));
  250. self.s.mongos.once('close', connectErrorHandler('close'));
  251. self.s.mongos.once('connect', connectHandler);
  252. // Reconnect server
  253. self.s.mongos.on('reconnect', reconnectHandler);
  254. // Start connection
  255. self.s.mongos.connect(_options);
  256. }
  257. Mongos.prototype.parserType = function() {
  258. return this.s.mongos.parserType();
  259. }
  260. // Server capabilities
  261. Mongos.prototype.capabilities = function() {
  262. if(this.s.sCapabilities) return this.s.sCapabilities;
  263. if(this.s.mongos.lastIsMaster() == null) throw new MongoError('cannot establish topology capabilities as driver is still in process of connecting');
  264. this.s.sCapabilities = new ServerCapabilities(this.s.mongos.lastIsMaster());
  265. return this.s.sCapabilities;
  266. }
  267. // Command
  268. Mongos.prototype.command = function(ns, cmd, options, callback) {
  269. this.s.mongos.command(ns, cmd, options, callback);
  270. }
  271. // Insert
  272. Mongos.prototype.insert = function(ns, ops, options, callback) {
  273. this.s.mongos.insert(ns, ops, options, function(e, m) {
  274. callback(e, m)
  275. });
  276. }
  277. // Update
  278. Mongos.prototype.update = function(ns, ops, options, callback) {
  279. this.s.mongos.update(ns, ops, options, callback);
  280. }
  281. // Remove
  282. Mongos.prototype.remove = function(ns, ops, options, callback) {
  283. this.s.mongos.remove(ns, ops, options, callback);
  284. }
  285. // IsConnected
  286. Mongos.prototype.isConnected = function() {
  287. return this.s.mongos.isConnected();
  288. }
  289. // Insert
  290. Mongos.prototype.cursor = function(ns, cmd, options) {
  291. options.disconnectHandler = this.s.store;
  292. return this.s.mongos.cursor(ns, cmd, options);
  293. }
  294. Mongos.prototype.setBSONParserType = function(type) {
  295. return this.s.mongos.setBSONParserType(type);
  296. }
  297. Mongos.prototype.lastIsMaster = function() {
  298. return this.s.mongos.lastIsMaster();
  299. }
  300. Mongos.prototype.close = function(forceClosed) {
  301. this.s.mongos.destroy();
  302. // We need to wash out all stored processes
  303. if(forceClosed == true) {
  304. this.s.storeOptions.force = forceClosed;
  305. this.s.store.flush();
  306. }
  307. }
  308. Mongos.prototype.auth = function() {
  309. var args = Array.prototype.slice.call(arguments, 0);
  310. this.s.mongos.auth.apply(this.s.mongos, args);
  311. }
  312. /**
  313. * All raw connections
  314. * @method
  315. * @return {array}
  316. */
  317. Mongos.prototype.connections = function() {
  318. return this.s.mongos.connections();
  319. }
  320. /**
  321. * A mongos connect event, used to verify that the connection is up and running
  322. *
  323. * @event Mongos#connect
  324. * @type {Mongos}
  325. */
  326. /**
  327. * The mongos high availability event
  328. *
  329. * @event Mongos#ha
  330. * @type {function}
  331. * @param {string} type The stage in the high availability event (start|end)
  332. * @param {boolean} data.norepeat This is a repeating high availability process or a single execution only
  333. * @param {number} data.id The id for this high availability request
  334. * @param {object} data.state An object containing the information about the current replicaset
  335. */
  336. /**
  337. * A server member left the mongos set
  338. *
  339. * @event Mongos#left
  340. * @type {function}
  341. * @param {string} type The type of member that left (primary|secondary|arbiter)
  342. * @param {Server} server The server object that left
  343. */
  344. /**
  345. * A server member joined the mongos set
  346. *
  347. * @event Mongos#joined
  348. * @type {function}
  349. * @param {string} type The type of member that joined (primary|secondary|arbiter)
  350. * @param {Server} server The server object that joined
  351. */
  352. /**
  353. * Mongos fullsetup event, emitted when all proxies in the topology have been connected to.
  354. *
  355. * @event Mongos#fullsetup
  356. * @type {Mongos}
  357. */
  358. /**
  359. * Mongos open event, emitted when mongos can start processing commands.
  360. *
  361. * @event Mongos#open
  362. * @type {Mongos}
  363. */
  364. /**
  365. * Mongos close event
  366. *
  367. * @event Mongos#close
  368. * @type {object}
  369. */
  370. /**
  371. * Mongos error event, emitted if there is an error listener.
  372. *
  373. * @event Mongos#error
  374. * @type {MongoError}
  375. */
  376. /**
  377. * Mongos timeout event
  378. *
  379. * @event Mongos#timeout
  380. * @type {object}
  381. */
  382. /**
  383. * Mongos parseError event
  384. *
  385. * @event Mongos#parseError
  386. * @type {object}
  387. */
  388. module.exports = Mongos;