pool.js 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. var mysql = require('../index.js');
  2. var EventEmitter = require('events').EventEmitter;
  3. var Util = require('util');
  4. var PoolConnection = require('./pool_connection.js');
  5. module.exports = Pool;
  6. Util.inherits(Pool, EventEmitter);
  7. function Pool(options) {
  8. EventEmitter.call(this);
  9. this.config = options.config;
  10. this.config.connectionConfig.pool = this;
  11. this._allConnections = [];
  12. this._freeConnections = [];
  13. this._connectionQueue = [];
  14. this._closed = false;
  15. }
  16. Pool.prototype.getConnection = function (cb) {
  17. if (this._closed) {
  18. return process.nextTick(function(){
  19. return cb(new Error('Pool is closed.'));
  20. });
  21. }
  22. var connection;
  23. if (this._freeConnections.length > 0) {
  24. connection = this._freeConnections.shift();
  25. return process.nextTick(function(){
  26. return cb(null, connection);
  27. });
  28. }
  29. if (this.config.connectionLimit === 0 || this._allConnections.length < this.config.connectionLimit) {
  30. connection = new PoolConnection(this, { config: this.config.connectionConfig });
  31. this._allConnections.push(connection);
  32. return connection.connect(function(err) {
  33. if (this._closed) {
  34. return cb(new Error('Pool is closed.'));
  35. }
  36. if (err) {
  37. return cb(err);
  38. }
  39. this.emit('connection', connection);
  40. return cb(null, connection);
  41. }.bind(this));
  42. }
  43. if (!this.config.waitForConnections) {
  44. return process.nextTick(function(){
  45. return cb(new Error('No connections available.'));
  46. });
  47. }
  48. if (this.config.queueLimit && this._connectionQueue.length >= this.config.queueLimit) {
  49. return cb(new Error('Queue limit reached.'));
  50. }
  51. this._connectionQueue.push(cb);
  52. };
  53. Pool.prototype.releaseConnection = function (connection) {
  54. var cb;
  55. if (!connection._pool) {
  56. // The connection has been removed from the pool and is no longer good.
  57. if (this._connectionQueue.length) {
  58. cb = this._connectionQueue.shift();
  59. process.nextTick(this.getConnection.bind(this, cb));
  60. }
  61. } else if (this._connectionQueue.length) {
  62. cb = this._connectionQueue.shift();
  63. process.nextTick(cb.bind(null, null, connection));
  64. } else {
  65. this._freeConnections.push(connection);
  66. }
  67. };
  68. Pool.prototype.end = function (cb) {
  69. this._closed = true;
  70. if (typeof cb != "function") {
  71. cb = function (err) {
  72. if (err) throw err;
  73. };
  74. }
  75. var calledBack = false;
  76. var closedConnections = 0;
  77. var connection;
  78. var endCB = function(err) {
  79. if (calledBack) {
  80. return;
  81. }
  82. if (err || ++closedConnections >= this._allConnections.length) {
  83. calledBack = true;
  84. return cb(err);
  85. }
  86. }.bind(this);
  87. if (this._allConnections.length === 0) {
  88. return endCB();
  89. }
  90. for (var i = 0; i < this._allConnections.length; i++) {
  91. connection = this._allConnections[i];
  92. connection._realEnd(endCB);
  93. }
  94. };
  95. Pool.prototype.query = function (sql, values, cb) {
  96. if (typeof values === 'function') {
  97. cb = values;
  98. values = null;
  99. }
  100. this.getConnection(function (err, conn) {
  101. if (err) return cb(err);
  102. conn.query(sql, values, function () {
  103. conn.release();
  104. cb.apply(this, arguments);
  105. });
  106. });
  107. };
  108. Pool.prototype.execute = function (sql, values, cb) {
  109. if (typeof values === 'function') {
  110. cb = values;
  111. values = null;
  112. }
  113. this.getConnection(function (err, conn) {
  114. if (err) return cb(err);
  115. conn.execute(sql, values, function () {
  116. conn.release();
  117. cb.apply(this, arguments);
  118. });
  119. });
  120. };
  121. Pool.prototype._removeConnection = function(connection) {
  122. var i;
  123. for (i = 0; i < this._allConnections.length; i++) {
  124. if (this._allConnections[i] === connection) {
  125. this._allConnections.splice(i, 1);
  126. break;
  127. }
  128. }
  129. for (i = 0; i < this._freeConnections.length; i++) {
  130. if (this._freeConnections[i] === connection) {
  131. this._freeConnections.splice(i, 1);
  132. break;
  133. }
  134. }
  135. this.releaseConnection(connection);
  136. };
  137. Pool.prototype.escape = function(value) {
  138. return mysql.escape(value, this.config.connectionConfig.stringifyObjects, this.config.connectionConfig.timezone);
  139. };
  140. Pool.prototype.escapeId = function escapeId(value) {
  141. return mysql.escapeId(value, false);
  142. };