connection.js 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673
  1. var net = require('net');
  2. var util = require('util');
  3. var EventEmitter = require('events').EventEmitter;
  4. var Queue = require('double-ended-queue');
  5. var PacketParser = require('./packet_parser.js');
  6. var Packet = require('./packets/packet.js');
  7. var Packets = require('./packets/index.js');
  8. var Commands = require('./commands/index.js');
  9. var SqlString = require('./sql_string.js');
  10. var ConnectionConfig = require('./connection_config.js');
  // Process-local counter; the Connection constructor reads and increments it
  // to give every connection its own _internalId.
  var _connectionId = 0;
  // Shared do-nothing function.
  function noop() {}
  // Connection wraps one duplex stream and feeds the bytes it receives through
  // a PacketParser into the currently active command.
  //
  // opts.config is stored as this.config. The stream is chosen as follows:
  //   - config.stream given as a function: called with opts ("stream agent / factory")
  //   - config.stream given as anything else: used as is
  //   - config.socketPath: net.connect(socketPath)
  //   - otherwise: net.connect(port, host)
  // Unless config.isServer is set, a ClientHandshake command is queued at the
  // end of construction.
  function Connection(opts)
  {
    EventEmitter.call(this);
    var connection = this;
    var config = opts.config;
    this.config = config;

    // TODO: fill defaults
    // if no params, connect to /var/lib/mysql/mysql.sock ( /tmp/mysql.sock on OSX )
    // if host is given, connect to host:3306
    // TODO: use `/usr/local/mysql/bin/mysql_config --socket` output? as default socketPath
    // if there is no host/port and no socketPath parameters?
    if (config.stream) {
      this.stream = (typeof config.stream == 'function') ? config.stream(opts) : config.stream;
    } else if (config.socketPath) {
      this.stream = net.connect(config.socketPath);
    } else {
      this.stream = net.connect(config.port, config.host);
    }

    this._internalId = _connectionId++;
    // Commands waiting behind the active one, and the active one itself.
    this._commands = new Queue();
    this._command = null;
    // While paused, incoming packets are parked here (see handlePacket/resume).
    this._paused = false;
    this._paused_packets = new Queue();
    // Prepared statements cache, keyed by statementKey(options).
    this._statements = {};

    // TODO: make it lru cache
    // https://github.com/mercadolibre/node-simple-lru-cache
    // or https://github.com/rsms/js-lru
    // or https://github.com/monsur/jscache
    // or https://github.com/isaacs/node-lru-cache
    //
    // key is field.name + ':' + field.columnType + ':' + field.flags + '/'
    this.textProtocolParsers = {};
    // TODO: not sure if cache should be separate (same key as with textProtocolParsers)
    // or part of prepared statements cache (key is sql query)
    this.binaryProtocolParsers = {};

    this.serverCapabilityFlags = 0;
    this.authorized = false;
    this.sequenceId = 0;
    this.threadId = null;
    this._handshakePacket = null;

    // Stream-level errors are re-emitted on the connection.
    this.stream.on('error', function(err) {
      connection.emit('error', err);
    });

    // big TODO: benchmark if it all worth using 'ondata' and onPacket callbacks directly
    // compositing streams would be much more easier.
    // also, look for existing length-prefixed streams to reuse instead of packet_parser
    // https://github.com/squaremo/node-spb - currently only fixed 4 byte prefix
    // ...?
    // see https://gist.github.com/khoomeister/4985691#use-that-instead-of-bind
    // handlePacket is looked up at call time, so later replacements of
    // connection.handlePacket are honoured.
    this.packetParser = new PacketParser(function(p) { connection.handlePacket(p); });

    // TODO: an optimized handler based on stream.ondata used to live here.
    // It DOES NOT WORK IN NODE 11.
    // TODO: measure if we actually get something from it; if yes, re-enable for node 10
    this.stream.on('data', function(data) {
      connection.packetParser.execute(data);
    });

    this._protocolError = null;
    this.stream.on('end', function() {
      // we need to set this flag everywhere where we want connection to close
      if (connection._closing)
        return;
      // TODO: move to protocolError()
      // no particular error message before disconnect
      if (!connection._protocolError)
        connection._protocolError = 'PROTOCOL_CONNECTION_LOST';
      var err = new Error('Connection lost: The server closed the connection.');
      err.fatal = true;
      err.code = connection._protocolError;
      // Fail the active command first, then drain and fail everything queued.
      var active = connection._command;
      if (active && active.onResult)
        active.onResult(err);
      for (var pending = connection._commands.shift(); pending; pending = connection._commands.shift()) {
        if (pending.onResult)
          pending.onResult(err);
      }
      connection.emit('error', err);
    });

    if (!this.config.isServer) {
      var handshakeCommand = new Commands.ClientHandshake(this.config.clientFlags);
      handshakeCommand.on('error', function(e) { connection.emit('error', e); });
      handshakeCommand.on('end', function() {
        // Remember the server handshake and the thread id it assigned to us.
        var handshake = handshakeCommand.handshake;
        connection._handshakePacket = handshake;
        connection.threadId = handshakeCommand.handshake.connectionId;
      });
      this.addCommand(handshakeCommand);
    }
  }
  util.inherits(Connection, EventEmitter);
  // Writes a raw buffer to the underlying stream. startTLS() replaces this
  // method on the instance so that output goes through the TLS cleartext side.
  Connection.prototype.write = function(buffer) {
    var out = this.stream;
    out.write(buffer);
  };
  115. // TODO: replace function in runtime instead of having if() here
  116. // Needs benchmark.
  // Stamps `packet` with the current sequence id, advances the id (wrapping
  // from 255 back to 0) and writes the packet to the stream.
  //
  // When config.compress is on and the connection is authorized, a 7-byte
  // compression header is written first. Bytes 0-2 carry packet.length() as a
  // little-endian 24-bit value; the remaining bytes stay 0, where a zero
  // "deflated length" marks the payload as uncompressed.
  Connection.prototype.writePacket = function(packet) {
    packet.writeHeader(this.sequenceId);
    if (this.config.debug) {
      // A packet may be written while no command is active; do not let the
      // debug trace crash in that case.
      var active = this._command;
      var label = active ? active._commandName + '#' + active.stateName() : '(no command)';
      // NOTE(review): this.connectionId is not assigned anywhere in the visible
      // code (the constructor stores threadId) - confirm which one is intended.
      console.log(this._internalId + ' ' + this.connectionId + ' <== ' + label + '(' + [this.sequenceId, packet._name, packet.length()].join(',') + ')');
    }
    this.sequenceId++;
    if (this.sequenceId == 256)
      this.sequenceId = 0;
    if (!this.config.compress || !this.authorized) {
      this.write(packet.buffer);
    } else {
      var packetLen = packet.length();
      var compressHeader = new Buffer(7);
      // TODO: currently all outgoing packets are sent uncompressed (header + deflated length=0 as uncompressed flag)
      // Need to implement deflation of outgoing packet. Also need to decide when not to compress small packets
      // http://dev.mysql.com/doc/internals/en/compression.html#uncompressed-payload suggest not to compress packets less than 50 bytes
      // Write uncompressed packet
      compressHeader.fill(0);
      compressHeader.writeUInt8(packetLen & 0xff, 0);
      compressHeader.writeUInt16LE(packetLen >> 8, 1);
      this.write(compressHeader);
      this.write(packet.buffer);
    }
  };
  // Upgrades the connection to TLS in place.
  //
  // A secure pair is built from config.ssl (key, cert, passphrase, ca) and
  // spliced between the raw stream and the packet parser: raw stream <->
  // pair.encrypted, pair.cleartext -> packetParser. connection.write is
  // replaced so that outgoing bytes go to pair.cleartext.
  // `onSecure` is registered as the listener for the pair's 'secure' event.
  Connection.prototype.startTLS = function(onSecure) {
    var connection = this;
    if (connection.config.debug) {
      console.log('Upgrading connection to TLS');
    }
    var crypto = require('crypto');
    var tls = require('tls');
    var ssl = connection.config.ssl;
    var rawStream = connection.stream;
    var credentials = crypto.createCredentials({
      key: ssl.key,
      cert: ssl.cert,
      passphrase: ssl.passphrase,
      ca: ssl.ca
    });
    var pair = tls.createSecurePair(credentials, false);

    // Detach the plaintext readers before the encrypted traffic starts flowing.
    if (rawStream.ondata)
      rawStream.ondata = null;
    rawStream.removeAllListeners('data');

    rawStream.pipe(pair.encrypted);
    pair.encrypted.pipe(rawStream);

    // packetParser is looked up per chunk, so a later parser swap is honoured.
    pair.cleartext.on('data', function(data) {
      connection.packetParser.execute(data);
    });
    connection.write = function(buffer) {
      pair.cleartext.write(buffer);
    };
    pair.on('secure', onSecure);
  };
  // TODO: this does not work if an uncompressed packet is split by a compressed
  // packet boundary.
  // My assumption that a compressed packet contains one or more complete
  // packets was wrong. It can wrap any chunk of data.
  // This will be removed in favor of connection.startInflate.
  // Currently the Handshake command overwrites connection.handlePacket with handleCompressedPacket
  // before expecting the first compressed packet.
  177. var zlib = require('zlib');
  // Handles one packet of the compressed protocol.
  //
  // The first 3 bytes are the inflated length. A non-zero value means the rest
  // of the packet is a zlib-deflated blob holding one or more regular packets
  // (3-byte little-endian length, 1-byte sequence id, payload); each of them is
  // passed to handlePacket once inflation completes (asynchronously). Zero
  // means the payload is a single regular packet stored uncompressed.
  //
  // The inflated buffer is indexed directly from 0 rather than through
  // Buffer#parent / .offset, which are not available on current Node versions.
  //
  // Known limitation (see the TODO above this method in the file): a regular
  // packet that is split across two compressed packets is not reassembled.
  Connection.prototype.handleCompressedPacket = function(packet) {
    var connection = this;
    var inflatedLength = packet.readInt24();
    if (inflatedLength !== 0) {
      var compressedBody = packet.readBuffer(packet.length() - 3);
      zlib.inflate(compressedBody, function(err, packets) {
        if (err)
          return connection.emit('error', err);
        var offset = 0;
        var end = packets.length;
        var len = 0;
        var id = 0;
        // single compressed packet can contain multiple uncompressed
        while (offset < end) {
          len = packets.readUInt16LE(offset) + (packets[offset+2] << 16);
          id = packets[offset+3];
          connection.handlePacket(new Packet(id, packets, offset + 4, offset + 4 + len));
          offset += 4 + len;
        }
      });
    } else {
      // Uncompressed payload: read the inner packet header and hand over the body.
      inflatedLength = packet.readInt24();
      var sequenceId = packet.readInt8();
      connection.handlePacket(new Packet(sequenceId, packet.buffer, packet.offset, packet.offset + inflatedLength));
    }
  };
  // TODO: consider using @creationix simple-streams
  // https://gist.github.com/creationix/5498108
  // https://github.com/creationix/min-stream-uv
  // https://github.com/creationix/min-stream-helpers
  // TODO: try with Stream2 streams
  //
  // changes stream -> packetParser to
  // stream -> compressedPacketParser -> inflateStream -> packetParser
  // note that in the case of ssl this should become
  // stream -> securePair.encrypted -> securePair.cleartext -> compressedPacketParser -> inflateStream -> packetParser
  215. Connection.prototype.startInflate = function() {
  216. var connection = this;
  217. var zlib = require('zlib');
  218. var inflateStream = zlib.createInflate();
  219. var uncompressedPacketParser = connection.packetParser;
  220. connection.packetParser = new PacketParser(function(packet) {
  221. var inflatedLength = packet.readInt24();
  222. if (inflatedLength !== 0) {
  223. inflateStream.write(packet.readBuffer(packet.length() - 3));
  224. } else {
  225. uncompressedPacketParser.execute(packet.buffer, packet.offset, packet.end);
  226. }
  227. });
  228. inflateStream.on('data', function(buff) {
  229. uncompressedPacketParser.execute(buff.parent, buff.offset, buff.offset + buff.length);
  230. });
  231. if (this.stream.ondata)
  232. this.stream.ondata = null;
  233. this.stream.removeAllListeners('data');
  234. this.pipe();
  235. };
  // Attaches the current packet parser to the stream's 'data' event.
  //
  // This used to assign stream.ondata for net streams, a hook the constructor
  // comments note does not work on newer Node versions, and to read
  // Buffer#parent / .offset on other streams. The plain 'data' listener below
  // matches what the constructor does.
  // connection.packetParser is looked up per chunk so that a parser installed
  // later is used.
  Connection.prototype.pipe = function() {
    var connection = this;
    this.stream.on('data', function(data) {
      connection.packetParser.execute(data);
    });
  };
  // Emits a fatal 'error' event on the connection.
  //   message - text of the Error
  //   code    - value for err.code; 'PROTOCOL_ERROR' when omitted/falsy
  Connection.prototype.protocolError = function(message, code) {
    var failure = new Error(message);
    failure.fatal = true;
    failure.code = code ? code : 'PROTOCOL_ERROR';
    this.emit('error', failure);
  };
  // Dispatches an incoming packet to the active command.
  //
  // While the connection is paused the packet is only queued. Otherwise the
  // next sequence id is derived from the packet and the packet is passed to
  // this._command.execute(). When execute() returns a truthy value the command
  // is finished: the sequence id is reset, the next queued command becomes
  // active and is started by a recursive call without a packet.
  //
  // With no active command a PROTOCOL_UNEXPECTED_PACKET error is emitted and
  // the result of this.close() is returned.
  Connection.prototype.handlePacket = function(packet) {
    if (this._paused)
    {
      this._paused_packets.push(packet);
      return;
    }
    // TODO: check packet sequenceId here
    // Sequence ids are a single byte on the wire, so 255 is followed by 0
    // (writePacket wraps the same way).
    if (packet)
      this.sequenceId = (packet.sequenceId + 1) % 256;
    if (this.config.debug && packet) {
      // The trace must not throw when no command is active; that case is
      // reported as a protocol error just below.
      var active = this._command;
      var label = active ? active._commandName + '#' + active.stateName() : '(no command)';
      console.log(this._internalId + ' ' + this.connectionId + ' ==> ' + label + '(' + [packet.sequenceId, packet.type(), packet.length()].join(',') + ')');
      console.log(' raw: ' + packet.buffer.slice(packet.offset, packet.offset + packet.length()).toString('hex'));
    }
    if (!this._command) {
      this.protocolError('Unexpected packet while no commands in the queue', 'PROTOCOL_UNEXPECTED_PACKET');
      return this.close();
    }
    var done = this._command.execute(packet, this);
    if (done) {
      this.sequenceId = 0;
      this._command = this._commands.shift();
      if (this._command)
        this.handlePacket();
    }
  };
  // Schedules `cmd` and returns it.
  // If no command is active, `cmd` becomes the active one and is started right
  // away via handlePacket() (called without a packet); otherwise it is appended
  // to the queue.
  // In debug mode the name of the calling function is logged and stored on the
  // command as _commandName.
  Connection.prototype.addCommand = function(cmd) {
    if (this.config.debug) {
      var callerName = arguments.callee.caller.name;
      console.log('Add command: ' + callerName);
      cmd._commandName = callerName;
    }
    if (this._command) {
      this._commands.push(cmd);
      return cmd;
    }
    this._command = cmd;
    this.handlePacket();
    return cmd;
  };
  // Builds the final SQL text for `sql` and `values`.
  // A user supplied config.queryFormat function takes over completely (it is
  // called with the connection as `this` and gets sql, values and the
  // configured timezone). Otherwise named placeholders are resolved and the
  // result is formatted by SqlString.format.
  Connection.prototype.format = function(sql, values) {
    var config = this.config;
    if (typeof config.queryFormat === 'function')
      return config.queryFormat.call(this, sql, values, config.timezone);

    var query = { sql: sql, values: values };
    this._resolveNamedPlaceholders(query);
    return SqlString.format(query.sql, query.values, this.config.stringifyObjects, this.config.timezone);
  };
  // Escapes a value for use in SQL text via SqlString.escape, passing `false`
  // as its second argument and the connection's configured timezone.
  Connection.prototype.escape = function(value) {
    var timezone = this.config.timezone;
    return SqlString.escape(value, false, timezone);
  };
  // Escapes an identifier via SqlString.escapeId, passing `false` as its
  // second argument.
  Connection.prototype.escapeId = function escapeId(value) {
    var escaped = SqlString.escapeId(value, false);
    return escaped;
  };
  // Binds `callback` to the currently active domain (process.domain), if there
  // is one. Without an active domain, or without a callback, the argument is
  // returned unchanged.
  function _domainify(callback) {
    var activeDomain = process.domain;
    return (activeDomain && callback) ? process.domain.bind(callback) : callback;
  }
  // Converter created lazily from the 'named-placeholders' package on first
  // use and shared by all connections.
  var convertNamedPlaceholders = null;
  // When named placeholders are enabled (on the connection config or on the
  // given options), rewrites options.sql and options.values IN PLACE with the
  // converter's result ([0] -> sql, [1] -> values). Otherwise does nothing.
  Connection.prototype._resolveNamedPlaceholders = function(options) {
    var enabled = this.config.namedPlaceholders || options.namedPlaceholders;
    if (!enabled)
      return;
    if (convertNamedPlaceholders === null)
      convertNamedPlaceholders = require('named-placeholders')();
    var converted = convertNamedPlaceholders(options.sql, options.values);
    options.sql = converted[0];
    options.values = converted[1];
  };
  // Queues a text-protocol query and returns the Query command.
  //
  // Accepted call shapes:
  //   query(options, cb)
  //   query(options, values, cb)
  //   query(sql, cb)
  //   query(sql, values, cb)
  //
  // When an options object is passed it is used (and modified) directly, not
  // copied. A `values` argument replaces options.values only when it is
  // actually supplied, so query({sql: ..., values: [...]}) without further
  // arguments keeps the values given in the object.
  Connection.prototype.query = function query(sql, values, cb) {
    // copy-paste from node-mysql/lib/Connection.js:createQuery
    var options = {};
    if (typeof sql === 'object') {
      // query(options, cb)
      options = sql;
      if (typeof values === 'function') {
        cb = values;
      } else if (values !== undefined) {
        options.values = values;
      }
    } else if (typeof values === 'function') {
      // query(sql, cb)
      cb = values;
      options.sql = sql;
      options.values = undefined;
    } else {
      // query(sql, values, cb)
      options.sql = sql;
      options.values = values;
    }
    this._resolveNamedPlaceholders(options);
    var rawSql = this.format(options.sql, options.values || []);
    return this.addCommand(new Commands.Query(rawSql, options, _domainify(cb)));
  };
  // Pauses packet handling: marks the connection as paused (handlePacket then
  // only queues packets) and pauses the underlying stream.
  Connection.prototype.pause = function pause() {
    var source = this.stream;
    this._paused = true;
    source.pause();
  };
  // Resumes packet handling. Packets queued while paused are replayed in order
  // through handlePacket; if a handler pauses the connection again, replay
  // stops immediately and the stream stays paused. The stream is resumed only
  // after the backlog has been drained.
  Connection.prototype.resume= function resume() {
    var backlog = this._paused_packets;
    this._paused = false;
    for (var queued = backlog.shift(); queued; queued = backlog.shift()) {
      this.handlePacket(queued);
      // don't resume if packet handler paused connection
      if (this._paused)
        return;
    }
    this.stream.resume();
  };
  // Builds a string key from result-shaping options (nestTables with its type,
  // rowsAsHash) followed by name:columnType:flags of every field, all joined
  // with '/'.
  Connection.prototype.keyFromFields = function keyFromFields(fields, options) {
    var segments = [(typeof options.nestTables) + '/' + options.nestTables + '/' + options.rowsAsHash];
    for (var i = 0; i < fields.length; ++i) {
      var field = fields[i];
      segments.push('' + field.name + ':' + field.columnType + ':' + field.flags);
    }
    return segments.join('/');
  };
  // Cache key for a prepared statement: typeof nestTables, nestTables and
  // rowsAsHash separated by '/', immediately followed by the SQL text.
  function statementKey(options) {
    var nestTables = options.nestTables;
    var key = typeof nestTables;
    key += '/' + nestTables;
    key += '/' + options.rowsAsHash;
    key += options.sql;
    return key;
  }
  379. // TODO: named placeholders support
  // Queues a Prepare command and returns it. `options` may be the SQL text
  // itself (wrapped as { sql: ... }) or an options object, passed through as is.
  Connection.prototype.prepare = function prepare(options, cb) {
    var prepareOptions = (typeof options == 'string') ? { sql: options } : options;
    return this.addCommand(new Commands.Prepare(prepareOptions, _domainify(cb)));
  };
  // Drops a cached prepared statement and closes it.
  //   sql - SQL text, or the options object the statement was prepared with
  // Returns the statement that was closed, or the (falsy) cache lookup result
  // when nothing was cached under that key.
  // The cache entry is removed, not set to null, so keys of unprepared
  // statements do not pile up in the cache.
  Connection.prototype.unprepare = function unprepare(sql) {
    var options = {};
    if (typeof sql === 'object') {
      options = sql;
    } else
      options.sql = sql;
    var key = statementKey(options);
    var stmt = this._statements[key];
    if (stmt) {
      delete this._statements[key];
      stmt.close();
    }
    return stmt;
  };
  // Runs a statement through the prepared-statement path and returns the
  // Execute command.
  //
  // Accepted call shapes:
  //   execute(options, cb)
  //   execute(options, values, cb)
  //   execute(sql, cb)
  //   execute(sql, values, cb)
  //
  // When an options object is passed it is used (and modified) directly. A
  // `values` argument replaces options.values only when it is actually
  // supplied, so execute({sql: ..., values: [...]}) keeps the values given in
  // the object.
  //
  // A statement already cached under statementKey(options) is executed
  // straight away. Otherwise the statement is prepared first, cached, and the
  // Execute command is queued from the prepare callback. A prepare error goes
  // to `cb` if given, else it is emitted as 'error' on the returned command.
  Connection.prototype.execute = function execute(sql, values, cb) {
    var options = {};
    if (typeof sql === 'object') {
      // execute(options, cb)
      options = sql;
      if (typeof values === 'function') {
        cb = values;
      } else if (values !== undefined) {
        options.values = values;
      }
    } else if (typeof values === 'function') {
      // execute(sql, cb)
      cb = values;
      options.sql = sql;
      options.values = undefined;
    } else {
      // execute(sql, values, cb)
      options.sql = sql;
      options.values = values;
    }
    cb = _domainify(cb);
    this._resolveNamedPlaceholders(options);
    var connection = this;
    var key = statementKey(options);
    var statement = connection._statements[key];
    options.statement = statement;
    var executeCommand = new Commands.Execute(options, cb);
    if (!statement) {
      connection.prepare(options, function executeStatement(err, stmt) {
        if (err) {
          if (cb)
            cb(err);
          else
            executeCommand.emit('error', err);
          return;
        }
        executeCommand.statement = stmt;
        connection._statements[key] = stmt;
        connection.addCommand(executeCommand);
      });
    } else {
      connection.addCommand(executeCommand);
    }
    return executeCommand;
  };
  // Queues a ChangeUser command and returns it.
  //   options  - optional; user, password, passwordSha1, database, charset and
  //              timeout. Missing credentials fall back to the connection
  //              config; `charset` is translated with
  //              ConnectionConfig.getCharsetNumber, else config.charsetNumber
  //              is used.
  //   callback - optional; called with the error, if any. May also be passed as
  //              the only argument.
  // Any error reported by the command is marked fatal before being passed on.
  // `options` may be left out entirely, with or without a callback.
  Connection.prototype.changeUser = function changeUser(options, callback) {
    if (!callback && typeof options === 'function') {
      callback = options;
      options = {};
    }
    // Tolerate changeUser() and changeUser(undefined, cb).
    options = options || {};
    var charsetNumber = (options.charset) ? ConnectionConfig.getCharsetNumber(options.charset) : this.config.charsetNumber;
    return this.addCommand(new Commands.ChangeUser({
      user : options.user || this.config.user,
      password : options.password || this.config.password,
      passwordSha1 : options.passwordSha1 || this.config.passwordSha1,
      database : options.database || this.config.database,
      timeout : options.timeout,
      charsetNumber : charsetNumber,
      currentConfig : this.config
    }, _domainify(function(err) {
      if (err)
        err.fatal = true;
      if (callback) callback(err);
    })));
  };
  464. // transaction helpers
  // Transaction helper: sends START TRANSACTION through query() and returns
  // what query() returns.
  Connection.prototype.beginTransaction = function(cb) {
    var statement = 'START TRANSACTION';
    return this.query(statement, cb);
  };
  // Transaction helper: sends COMMIT through query() and returns what query()
  // returns.
  Connection.prototype.commit = function(cb) {
    var statement = 'COMMIT';
    return this.query(statement, cb);
  };
  // Transaction helper: sends ROLLBACK through query() and returns what query()
  // returns.
  Connection.prototype.rollback = function(cb) {
    var statement = 'ROLLBACK';
    return this.query(statement, cb);
  };
  // Queues a Ping command (callback bound to the active domain, if any) and
  // returns it.
  Connection.prototype.ping = function ping(cb) {
    var boundCallback = _domainify(cb);
    return this.addCommand(new Commands.Ping(boundCallback));
  };
  // Queues a RegisterSlave command built from `opts` (callback bound to the
  // active domain, if any) and returns it.
  Connection.prototype._registerSlave = function registerSlave(opts, cb) {
    var boundCallback = _domainify(cb);
    return this.addCommand(new Commands.RegisterSlave(opts, boundCallback));
  };
  // Queues a BinlogDump command built from `opts` (callback bound to the
  // active domain, if any) and returns it.
  Connection.prototype._binlogDump = function binlogDump(opts, cb) {
    var boundCallback = _domainify(cb);
    return this.addCommand(new Commands.BinlogDump(opts, boundCallback));
  };
  483. // currently just alias to close
  // Currently nothing more than a call to close(); returns nothing.
  Connection.prototype.destroy = function() {
    var connection = this;
    connection.close();
  };
  // Marks the connection as closing (so the stream 'end' handler does not
  // report a lost connection) and ends the underlying stream.
  Connection.prototype.close = function() {
    var transport = this.stream;
    this._closing = true;
    transport.end();
  };
  491. Connection.prototype.createBinlogStream = function(opts) {
  492. // TODO: create proper stream class
  493. // TODO: use through2
  494. var test = 1;
  495. var Readable = require('stream').Readable;
  496. var stream = new Readable({objectMode: true});
  497. stream._read = function() {
  498. return {
  499. data: test++
  500. };
  501. };
  502. var connection = this;
  503. connection._registerSlave(opts, function(err) {
  504. var dumpCmd = connection._binlogDump(opts);
  505. dumpCmd.on('event', function(ev) {
  506. stream.push(ev);
  507. });
  508. dumpCmd.on('eof', function() {
  509. stream.push(null);
  510. // if non-blocking, then close stream to prevent errors
  511. if (opts.flags && (opts.flags & 0x01)) {
  512. connection.close();
  513. }
  514. });
  515. // TODO: pipe errors as well
  516. });
  517. return stream;
  518. };
  // Calls `cb` exactly once with the outcome of connecting: cb(err) on the
  // first 'error' event, or cb(null, param) on the first 'connect' event,
  // whichever comes first. Does nothing when no callback is given.
  //
  // Both listeners are removed as soon as one of them fires. Previously the
  // other one stayed registered, so after a successful connect the next
  // 'error' event was consumed by the stale listener and silently dropped.
  Connection.prototype.connect = function(cb) {
    if (!cb) return;
    var connection = this;
    var connectCalled = 0;
    // TODO domainify this callback as well. Note that domain has to be captured
    // at the top of function due to nested callback
    function settle(isErrorHandler, param) {
      if (connectCalled)
        return;
      connectCalled = 1;
      connection.removeListener('error', onError);
      connection.removeListener('connect', onConnect);
      if (isErrorHandler)
        cb(param);
      else
        cb(null, param);
    }
    function onError(err) { settle(true, err); }
    function onConnect(param) { settle(false, param); }
    this.once('error', onError);
    this.once('connect', onConnect);
  };
  538. // ===================================
  539. // outgoing server connection methods
  540. // ===================================
  // Server side: writes the head of a result set - a ResultSetHeader packet
  // carrying the column count, one ColumnDefinition packet per column, then an
  // EOF packet.
  Connection.prototype.writeColumns = function(columns) {
    var header = Packets.ResultSetHeader.toPacket(columns.length);
    this.writePacket(header);
    columns.forEach(function(column) {
      this.writePacket(Packets.ColumnDefinition.toPacket(column));
    }, this);
    this.writeEof();
  };
  549. // row is array of columns, not hash
  // Server side: writes one text-protocol row. `column` is the row given as an
  // array of column values (not a hash).
  Connection.prototype.writeTextRow = function(column) {
    var rowPacket = Packets.TextRow.toPacket(column);
    this.writePacket(rowPacket);
  };
  // Server side: writes a complete text-protocol result set.
  //   rows    - array of row objects keyed by column name
  //   columns - array of column definitions; column.name selects the value
  // Writes the column definitions, then one text row per entry of `rows`
  // with the values in column order, then a closing EOF packet.
  Connection.prototype.writeTextResult = function(rows, columns) {
    var connection = this;
    connection.writeColumns(columns);
    rows.forEach(function(row) {
      // One value per column, in column order. (Pushing onto a pre-sized
      // array would leave columns.length empty slots in front of the values.)
      var arrayRow = columns.map(function(column) {
        return row[column.name];
      });
      connection.writeTextRow(arrayRow);
    });
    connection.writeEof();
  };
  // Server side: writes an EOF packet built from `warnings` and `statusFlags`.
  Connection.prototype.writeEof = function(warnings, statusFlags) {
    var eofPacket = Packets.EOF.toPacket(warnings, statusFlags);
    this.writePacket(eofPacket);
  };
  // Server side: writes an OK packet. When `args` is missing (or falsy),
  // { affectedRows: 0 } is used.
  Connection.prototype.writeOk = function(args) {
    var okArgs = args ? args : { affectedRows: 0 };
    this.writePacket(Packets.OK.toPacket(okArgs));
  };
  // Server side: writes an Error packet built from `args`.
  Connection.prototype.writeError = function(args) {
    var errorPacket = Packets.Error.toPacket(args);
    this.writePacket(errorPacket);
  };
  // Server side: queues a ServerHandshake command built from `args` and
  // returns it.
  Connection.prototype.serverHandshake = function serverHandshake(args) {
    var handshake = new Commands.ServerHandshake(args);
    return this.addCommand(handshake);
  };
  579. // ===============================================================
  580. // TODO: domainify
  // Queues a Quit command (constructed with `callback`) and returns it.
  //
  // After this call, addCommand on this instance is replaced: commands are no
  // longer queued, and once the connection is marked as closing each attempt
  // emits an 'error' event on the connection.
  // NOTE(review): attempts made before _closing is set are dropped without
  // any signal - confirm that is intended.
  Connection.prototype.end = function(callback) {
    var connection = this;
    // trigger error if more commands enqueued after end command
    var quitCmd = this.addCommand(new Commands.Quit(callback));
    connection.addCommand = function() {
      if (connection._closing) {
        // The event name has to be given explicitly; passing only the Error
        // would use the Error object as the event name.
        connection.emit('error', new Error('addCommand() called on closing connection'));
      }
    };
    return quitCmd;
  };
  592. module.exports = Connection;