connection.js 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. /*!
  2. * Module dependencies.
  3. */
  4. var MongooseConnection = require('../../connection')
  5. , mongo = require('mongodb')
  6. , Db = mongo.Db
  7. , Server = mongo.Server
  8. , Mongos = mongo.Mongos
  9. , STATES = require('../../connectionstate')
  10. , ReplSetServers = mongo.ReplSet
  11. , utils = require('../../utils');
  12. /**
  13. * A [node-mongodb-native](https://github.com/mongodb/node-mongodb-native) connection implementation.
  14. *
  15. * @inherits Connection
  16. * @api private
  17. */
  18. function NativeConnection() {
  19. MongooseConnection.apply(this, arguments);
  20. this._listening = false;
  21. };
  22. /**
  23. * Expose the possible connection states.
  24. * @api public
  25. */
  26. NativeConnection.STATES = STATES;
  27. /*!
  28. * Inherits from Connection.
  29. */
  30. NativeConnection.prototype.__proto__ = MongooseConnection.prototype;
  31. /**
  32. * Opens the connection to MongoDB.
  33. *
  34. * @param {Function} fn
  35. * @return {Connection} this
  36. * @api private
  37. */
  38. NativeConnection.prototype.doOpen = function (fn) {
  39. if (this.db) {
  40. mute(this);
  41. }
  42. var server = new Server(this.host, this.port, this.options.server);
  43. this.db = new Db(this.name, server, this.options.db);
  44. var self = this;
  45. this.db.open(function (err) {
  46. if (err) return fn(err);
  47. listen(self);
  48. fn();
  49. });
  50. return this;
  51. };
  52. /**
  53. * Switches to a different database using the same connection pool.
  54. *
  55. * Returns a new connection object, with the new db.
  56. *
  57. * @param {String} name The database name
  58. * @return {Connection} New Connection Object
  59. * @api public
  60. */
  61. NativeConnection.prototype.useDb = function (name) {
  62. // we have to manually copy all of the attributes...
  63. var newConn = new this.constructor();
  64. newConn.name = name;
  65. newConn.base = this.base;
  66. newConn.collections = {};
  67. newConn.models = {};
  68. newConn.replica = this.replica;
  69. newConn.hosts = this.hosts;
  70. newConn.host = this.host;
  71. newConn.port = this.port;
  72. newConn.user = this.user;
  73. newConn.pass = this.pass;
  74. newConn.options = this.options;
  75. newConn._readyState = this._readyState;
  76. newConn._closeCalled = this._closeCalled;
  77. newConn._hasOpened = this._hasOpened;
  78. newConn._listening = false;
  79. // First, when we create another db object, we are not guaranteed to have a
  80. // db object to work with. So, in the case where we have a db object and it
  81. // is connected, we can just proceed with setting everything up. However, if
  82. // we do not have a db or the state is not connected, then we need to wait on
  83. // the 'open' event of the connection before doing the rest of the setup
  84. // the 'connected' event is the first time we'll have access to the db object
  85. var self = this;
  86. if (this.db && this._readyState === STATES.connected) {
  87. wireup();
  88. } else {
  89. this.once('connected', wireup);
  90. }
  91. function wireup () {
  92. newConn.db = self.db.db(name);
  93. newConn.onOpen();
  94. // setup the events appropriately
  95. listen(newConn);
  96. }
  97. newConn.name = name;
  98. // push onto the otherDbs stack, this is used when state changes
  99. this.otherDbs.push(newConn);
  100. newConn.otherDbs.push(this);
  101. return newConn;
  102. };
  103. /*!
  104. * Register listeners for important events and bubble appropriately.
  105. */
  106. function listen (conn) {
  107. if (conn._listening) return;
  108. conn._listening = true;
  109. conn.db.on('close', function(){
  110. if (conn._closeCalled) return;
  111. // the driver never emits an `open` event. auto_reconnect still
  112. // emits a `close` event but since we never get another
  113. // `open` we can't emit close
  114. if (conn.db.serverConfig.autoReconnect) {
  115. conn.readyState = STATES.disconnected;
  116. conn.emit('close');
  117. return;
  118. }
  119. conn.onClose();
  120. });
  121. conn.db.on('error', function(err){
  122. conn.emit('error', err);
  123. });
  124. conn.db.on('reconnect', function() {
  125. conn.readyState = STATES.connected;
  126. conn.emit('reconnected');
  127. });
  128. conn.db.on('timeout', function(err){
  129. var error = new Error(err && err.err || 'connection timeout');
  130. conn.emit('error', error);
  131. });
  132. conn.db.on('open', function (err, db) {
  133. if (STATES.disconnected === conn.readyState && db && db.databaseName) {
  134. conn.readyState = STATES.connected;
  135. conn.emit('reconnected');
  136. }
  137. });
  138. conn.db.on('parseError', function(err) {
  139. conn.emit('parseError', err);
  140. });
  141. }
  142. /*!
  143. * Remove listeners registered in `listen`
  144. */
  145. function mute (conn) {
  146. if (!conn.db) throw new Error('missing db');
  147. conn.db.removeAllListeners("close");
  148. conn.db.removeAllListeners("error");
  149. conn.db.removeAllListeners("timeout");
  150. conn.db.removeAllListeners("open");
  151. conn.db.removeAllListeners("fullsetup");
  152. conn._listening = false;
  153. }
  154. /**
  155. * Opens a connection to a MongoDB ReplicaSet.
  156. *
  157. * See description of [doOpen](#NativeConnection-doOpen) for server options. In this case `options.replset` is also passed to ReplSetServers.
  158. *
  159. * @param {Function} fn
  160. * @api private
  161. * @return {Connection} this
  162. */
  163. NativeConnection.prototype.doOpenSet = function (fn) {
  164. if (this.db) {
  165. mute(this);
  166. }
  167. var servers = []
  168. , self = this;
  169. this.hosts.forEach(function (server) {
  170. var host = server.host || server.ipc;
  171. var port = server.port || 27017;
  172. servers.push(new Server(host, port, self.options.server));
  173. })
  174. var server = this.options.mongos
  175. ? new Mongos(servers, this.options.mongos)
  176. : new ReplSetServers(servers, this.options.replset);
  177. this.db = new Db(this.name, server, this.options.db);
  178. this.db.on('fullsetup', function () {
  179. self.emit('fullsetup')
  180. });
  181. this.db.open(function (err) {
  182. if (err) return fn(err);
  183. fn();
  184. listen(self);
  185. });
  186. return this;
  187. };
  188. /**
  189. * Closes the connection
  190. *
  191. * @param {Function} fn
  192. * @return {Connection} this
  193. * @api private
  194. */
  195. NativeConnection.prototype.doClose = function (fn) {
  196. this.db.close();
  197. if (fn) fn();
  198. return this;
  199. }
  200. /**
  201. * Prepares default connection options for the node-mongodb-native driver.
  202. *
  203. * _NOTE: `passed` options take precedence over connection string options._
  204. *
  205. * @param {Object} passed options that were passed directly during connection
  206. * @param {Object} [connStrOptions] options that were passed in the connection string
  207. * @api private
  208. */
  209. NativeConnection.prototype.parseOptions = function (passed, connStrOpts) {
  210. var o = passed || {};
  211. o.db || (o.db = {});
  212. o.auth || (o.auth = {});
  213. o.server || (o.server = {});
  214. o.replset || (o.replset = {});
  215. o.server.socketOptions || (o.server.socketOptions = {});
  216. o.replset.socketOptions || (o.replset.socketOptions = {});
  217. var opts = connStrOpts || {};
  218. Object.keys(opts).forEach(function (name) {
  219. switch (name) {
  220. case 'ssl':
  221. case 'poolSize':
  222. if ('undefined' == typeof o.server[name]) {
  223. o.server[name] = o.replset[name] = opts[name];
  224. }
  225. break;
  226. case 'slaveOk':
  227. if ('undefined' == typeof o.server.slave_ok) {
  228. o.server.slave_ok = opts[name];
  229. }
  230. break;
  231. case 'autoReconnect':
  232. if ('undefined' == typeof o.server.auto_reconnect) {
  233. o.server.auto_reconnect = opts[name];
  234. }
  235. break;
  236. case 'socketTimeoutMS':
  237. case 'connectTimeoutMS':
  238. if ('undefined' == typeof o.server.socketOptions[name]) {
  239. o.server.socketOptions[name] = o.replset.socketOptions[name] = opts[name];
  240. }
  241. break;
  242. case 'authdb':
  243. if ('undefined' == typeof o.auth.authdb) {
  244. o.auth.authdb = opts[name];
  245. }
  246. break;
  247. case 'authSource':
  248. if ('undefined' == typeof o.auth.authSource) {
  249. o.auth.authSource = opts[name];
  250. }
  251. break;
  252. case 'retries':
  253. case 'reconnectWait':
  254. case 'rs_name':
  255. if ('undefined' == typeof o.replset[name]) {
  256. o.replset[name] = opts[name];
  257. }
  258. break;
  259. case 'replicaSet':
  260. if ('undefined' == typeof o.replset.rs_name) {
  261. o.replset.rs_name = opts[name];
  262. }
  263. break;
  264. case 'readSecondary':
  265. if ('undefined' == typeof o.replset.read_secondary) {
  266. o.replset.read_secondary = opts[name];
  267. }
  268. break;
  269. case 'nativeParser':
  270. if ('undefined' == typeof o.db.native_parser) {
  271. o.db.native_parser = opts[name];
  272. }
  273. break;
  274. case 'w':
  275. case 'safe':
  276. case 'fsync':
  277. case 'journal':
  278. case 'wtimeoutMS':
  279. if ('undefined' == typeof o.db[name]) {
  280. o.db[name] = opts[name];
  281. }
  282. break;
  283. case 'readPreference':
  284. if ('undefined' == typeof o.db.read_preference) {
  285. o.db.read_preference = opts[name];
  286. }
  287. break;
  288. case 'readPreferenceTags':
  289. if ('undefined' == typeof o.db.read_preference_tags) {
  290. o.db.read_preference_tags = opts[name];
  291. }
  292. break;
  293. }
  294. })
  295. if (!('auto_reconnect' in o.server)) {
  296. o.server.auto_reconnect = true;
  297. }
  298. if (!o.db.read_preference) {
  299. // read from primaries by default
  300. o.db.read_preference = 'primary';
  301. }
  302. // mongoose creates its own ObjectIds
  303. o.db.forceServerObjectId = false;
  304. // default safe using new nomenclature
  305. if (!('journal' in o.db || 'j' in o.db ||
  306. 'fsync' in o.db || 'safe' in o.db || 'w' in o.db)) {
  307. o.db.w = 1;
  308. }
  309. validate(o);
  310. return o;
  311. }
  312. /*!
  313. * Validates the driver db options.
  314. *
  315. * @param {Object} o
  316. */
  317. function validate (o) {
  318. if (-1 === o.db.w || 0 === o.db.w) {
  319. if (o.db.journal || o.db.fsync || o.db.safe) {
  320. throw new Error(
  321. 'Invalid writeConcern: '
  322. + 'w set to -1 or 0 cannot be combined with safe|fsync|journal');
  323. }
  324. }
  325. }
  326. /*!
  327. * Module exports.
  328. */
  329. module.exports = NativeConnection;