123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673 |
- var net = require('net');
- var util = require('util');
- var EventEmitter = require('events').EventEmitter;
- var Queue = require('double-ended-queue');
- var PacketParser = require('./packet_parser.js');
- var Packet = require('./packets/packet.js');
- var Packets = require('./packets/index.js');
- var Commands = require('./commands/index.js');
- var SqlString = require('./sql_string.js');
- var ConnectionConfig = require('./connection_config.js');
- var _connectionId = 0;
- var noop = function() {};
- function Connection(opts)
- {
- EventEmitter.call(this);
- this.config = opts.config;
- // TODO: fill defaults
- // if no params, connect to /var/lib/mysql/mysql.sock ( /tmp/mysql.sock on OSX )
- // if host is given, connect to host:3306
- // TODO: use `/usr/local/mysql/bin/mysql_config --socket` output? as default socketPath
- // if there is no host/port and no socketPath parameters?
- if (!opts.config.stream) {
- if (opts.config.socketPath)
- this.stream = net.connect(opts.config.socketPath);
- else
- this.stream = net.connect(opts.config.port, opts.config.host);
- } else {
- // if stream is a function, treat it as "stream agent / factory"
- if (typeof opts.config.stream == 'function')
- this.stream = opts.config.stream(opts);
- else
- this.stream = opts.config.stream;
- }
- this._internalId = _connectionId++;
- this._commands = new Queue();
- this._command = null;
- this._paused = false;
- this._paused_packets = new Queue();
- this._statements = {};
- // TODO: make it lru cache
- // https://github.com/mercadolibre/node-simple-lru-cache
- // or https://github.com/rsms/js-lru
- // or https://github.com/monsur/jscache
- // or https://github.com/isaacs/node-lru-cache
- //
- // key is field.name + ':' + field.columnType + ':' field.flags + '/'
- this.textProtocolParsers = {};
- // TODO: not sure if cache should be separate (same key as with textProtocolParsers)
- // or part of prepared statements cache (key is sql query)
- this.binaryProtocolParsers = {};
- this.serverCapabilityFlags = 0;
- this.authorized = false;
- var connection = this;
- this.sequenceId = 0;
- this.threadId = null;
- this._handshakePacket = null;
- this.stream.on('error', function(err) {
- connection.emit('error', err);
- });
- // big TODO: benchmark if it all worth using 'ondata' and onPacket callbacks directly
- // compositing streams would be much more easier.
- // also, look for existing length-prefixed streams to reuse instead of packet_parser
- // https://github.com/squaremo/node-spb - currently only fixed 4 byte prefix
- // ...?
- // see https://gist.github.com/khoomeister/4985691#use-that-instead-of-bind
- this.packetParser = new PacketParser(function(p) { connection.handlePacket(p); });
- // TODO: this code used to be an optimized version of handler
- // DOES NOT WORK IN NODE 11
- // TODO: measure if we actually get something here
- // if yes, re-enable for node 10
- //if (this.stream instanceof net.Stream) {
- // this.stream.ondata = function(data, start, end) {
- // connection.packetParser.execute(data, start, end);
- // };
- //} else {
- this.stream.on('data', function(data) {
- connection.packetParser.execute(data);
- });
- //}
- this._protocolError = null;
- this.stream.on('end', function() {
- // we need to set this flag everywhere where we want connection to close
- if (connection._closing)
- return;
- // TODO: move to protocolError()
- if (!connection._protocolError) // no particular error message before disconnect
- connection._protocolError = 'PROTOCOL_CONNECTION_LOST';
- var err = new Error('Connection lost: The server closed the connection.');
- err.fatal = true;
- err.code = connection._protocolError;
- var command;
- if (connection._command && connection._command.onResult)
- connection._command.onResult(err);
- while (command = connection._commands.shift()) {
- if (command.onResult)
- command.onResult(err);
- }
- connection.emit('error', err);
- });
- var handshakeCommand;
- if (!this.config.isServer) {
- handshakeCommand = new Commands.ClientHandshake(this.config.clientFlags);
- handshakeCommand.on('error', function(e) { connection.emit('error', e); });
- handshakeCommand.on('end', function() {
- connection._handshakePacket = handshakeCommand.handshake;
- connection.threadId = handshakeCommand.handshake.connectionId;
- });
- this.addCommand(handshakeCommand);
- }
- }
- util.inherits(Connection, EventEmitter);
- Connection.prototype.write = function(buffer) {
- this.stream.write(buffer);
- };
- // TODO: replace function in runtime instead of having if() here
- // Needs benchmark.
- Connection.prototype.writePacket = function(packet) {
- packet.writeHeader(this.sequenceId);
- if (this.config.debug) {
- console.log(this._internalId + ' ' + this.connectionId + ' <== ' + this._command._commandName + '#' + this._command.stateName() + '(' + [this.sequenceId, packet._name, packet.length()].join(',') + ')');
- }
- this.sequenceId++;
- if (this.sequenceId == 256)
- this.sequenceId = 0
- if (!this.config.compress || !this.authorized) {
- this.write(packet.buffer);
- } else {
- var packetLen = packet.length();
- var compressHeader = new Buffer(7);
- // TODO: currently all outgoing packets are sent uncompressed (header + deflated length=0 as uncompressed flag)
- // Need to implement deflation of outgoing packet. Also need to decide when not to compress small packets
- // http://dev.mysql.com/doc/internals/en/compression.html#uncompressed-payload suggest not to compress packets less than 50 bytes
- // Write uncompressed packet
- compressHeader.fill(0);
- compressHeader.writeUInt8(packetLen & 0xff, 0);
- compressHeader.writeUInt16LE(packetLen >> 8, 1);
- this.write(compressHeader);
- this.write(packet.buffer);
- }
- };
- Connection.prototype.startTLS = function(onSecure) {
- if (this.config.debug) {
- console.log('Upgrading connection to TLS');
- }
- var connection = this;
- var crypto = require('crypto');
- var tls = require('tls');
- var config = this.config;
- var stream = this.stream;
- var credentials = crypto.createCredentials({
- key: config.ssl.key,
- cert: config.ssl.cert,
- passphrase: config.ssl.passphrase,
- ca: config.ssl.ca
- });
- var securePair = tls.createSecurePair(credentials, false);
- if (stream.ondata)
- stream.ondata = null;
- stream.removeAllListeners('data');
- stream.pipe(securePair.encrypted);
- securePair.encrypted.pipe(stream);
- securePair.cleartext.on('data', function(data) {
- connection.packetParser.execute(data);
- });
- connection.write = function(buffer) {
- securePair.cleartext.write(buffer);
- };
- securePair.on('secure', onSecure);
- };
- // TODO: this does not work if uncompressed packet is split by compressed
- // packet boundary.
- // My assumption about compressedPacket to contain one or more complete
- // compressed packets was wrong. It can wrap any chunk of data.
- // This will be rmoved in favor of connection.startInflate
- // currently Handshake command overwrites connection.handlePacket with handleCompressedPacket
- // before expecting first compressed packet
- var zlib = require('zlib');
- Connection.prototype.handleCompressedPacket = function(packet) {
- var connection = this;
- var inflatedLength = packet.readInt24();
- if (inflatedLength !== 0) {
- var compressedBody = packet.readBuffer(packet.length() - 3);
- zlib.inflate(compressedBody, function(err, packets) {
- if (err)
- return connection.emit('error', err);
- var offset = packets.offset;
- var end = offset + packets.length;
- var buffer = packets.parent;
- var len = 0;
- var id = 0;
- // single compressed packet can contain multiple uncompressed
- while (offset < end) {
- len = buffer.readUInt16LE(offset) + (buffer[offset+2] << 16);
- id = buffer[offset+3];
- connection.handlePacket(new Packet(id, buffer, offset + 4, offset + 4 + len));
- offset += 4 + len;
- }
- });
- } else {
- inflatedLength = packet.readInt24();
- var sequenceId = packet.readInt8();
- connection.handlePacket(new Packet(sequenceId, packet.buffer, packet.offset, packet.offset + inflatedLength));
- }
- };
- // TODO: consider using @creationix simple-streams
- // https://gist.github.com/creationix/5498108
- // https://github.com/creationix/min-stream-uv
- // https://github.com/creationix/min-stream-helpers
- // TODO: try with Stream2 streams
- //
- // changes stream -> packetParser to
- // stream -> compressedPacketParser -> inflateStream -> packetParser
- // note that in the caseof ssl this should become
- // stream -> securePair.encrypted -> securePair.cleartext -> compressedPacketParser -> inflateStream -> packetParser
- Connection.prototype.startInflate = function() {
- var connection = this;
- var zlib = require('zlib');
- var inflateStream = zlib.createInflate();
- var uncompressedPacketParser = connection.packetParser;
- connection.packetParser = new PacketParser(function(packet) {
- var inflatedLength = packet.readInt24();
- if (inflatedLength !== 0) {
- inflateStream.write(packet.readBuffer(packet.length() - 3));
- } else {
- uncompressedPacketParser.execute(packet.buffer, packet.offset, packet.end);
- }
- });
- inflateStream.on('data', function(buff) {
- uncompressedPacketParser.execute(buff.parent, buff.offset, buff.offset + buff.length);
- });
- if (this.stream.ondata)
- this.stream.ondata = null;
- this.stream.removeAllListeners('data');
- this.pipe();
- };
- Connection.prototype.pipe = function() {
- var connection = this;
- if (this.stream instanceof net.Stream) {
- this.stream.ondata = function(data, start, end) {
- connection.packetParser.execute(data, start, end);
- };
- } else {
- this.stream.on('data', function(data) {
- connection.packetParser.execute(data.parent, data.offset, data.offset + data.length);
- });
- }
- };
- Connection.prototype.protocolError = function(message, code) {
- var err = new Error(message);
- err.fatal = true;
- err.code = code || 'PROTOCOL_ERROR';
- this.emit('error', err);
- }
- Connection.prototype.handlePacket = function(packet) {
- if (this._paused)
- {
- this._paused_packets.push(packet);
- return;
- }
- // TODO: check packet sequenceId here
- if (packet)
- this.sequenceId = packet.sequenceId + 1;
- if (this.config.debug) {
- if (packet) {
- console.log(this._internalId + ' ' + this.connectionId + ' ==> ' + this._command._commandName + '#' + this._command.stateName() + '(' + [packet.sequenceId, packet.type(), packet.length()].join(',') + ')');
- console.log(' raw: ' + packet.buffer.slice(packet.offset, packet.offset + packet.length()).toString('hex'));
- }
- }
- if (!this._command) {
- this.protocolError('Unexpected packet while no commands in the queue', 'PROTOCOL_UNEXPECTED_PACKET');
- return this.close();
- }
- var done = this._command.execute(packet, this);
- if (done) {
- this.sequenceId = 0;
- this._command = this._commands.shift();
- if (this._command)
- this.handlePacket();
- }
- };
- Connection.prototype.addCommand = function(cmd) {
- if (this.config.debug) {
- console.log('Add command: ' + arguments.callee.caller.name);
- cmd._commandName = arguments.callee.caller.name;
- }
- if (!this._command) {
- this._command = cmd;
- this.handlePacket();
- } else {
- this._commands.push(cmd);
- }
- return cmd;
- };
- Connection.prototype.format = function(sql, values) {
- if (typeof this.config.queryFormat == "function") {
- return this.config.queryFormat.call(this, sql, values, this.config.timezone);
- }
- var opts = {
- sql: sql,
- values: values
- };
- this._resolveNamedPlaceholders(opts);
- return SqlString.format(opts.sql, opts.values, this.config.stringifyObjects, this.config.timezone);
- };
- Connection.prototype.escape = function(value) {
- return SqlString.escape(value, false, this.config.timezone);
- };
- Connection.prototype.escapeId = function escapeId(value) {
- return SqlString.escapeId(value, false);
- };
- function _domainify(callback) {
- var domain = process.domain;
- if (domain && callback)
- return process.domain.bind(callback);
- else
- return callback;
- }
- var convertNamedPlaceholders = null;
- Connection.prototype._resolveNamedPlaceholders = function(options) {
- var unnamed;
- if (this.config.namedPlaceholders || options.namedPlaceholders) {
- if (convertNamedPlaceholders === null)
- convertNamedPlaceholders = require('named-placeholders')();
- unnamed = convertNamedPlaceholders(options.sql, options.values);
- options.sql = unnamed[0];
- options.values = unnamed[1];
- }
- };
- Connection.prototype.query = function query(sql, values, cb) {
- // copy-paste from node-mysql/lib/Connection.js:createQuery
- var options = {};
- if (typeof sql === 'object') {
- // query(options, cb)
- options = sql;
- if (typeof values === 'function') {
- cb = values;
- } else {
- options.values = values;
- }
- } else if (typeof values === 'function') {
- // query(sql, cb)
- cb = values;
- options.sql = sql;
- options.values = undefined;
- } else {
- // query(sql, values, cb)
- options.sql = sql;
- options.values = values;
- }
- this._resolveNamedPlaceholders(options);
- var rawSql = this.format(options.sql, options.values || []);
- return this.addCommand(new Commands.Query(rawSql, options, _domainify(cb)));
- };
- Connection.prototype.pause = function pause() {
- this._paused = true;
- this.stream.pause();
- };
- Connection.prototype.resume= function resume() {
- var packet;
- this._paused = false;
- while( packet = this._paused_packets.shift() ) {
- this.handlePacket(packet);
- // don't resume if packet hander paused connection
- if (this._paused)
- return;
- }
- this.stream.resume();
- };
- Connection.prototype.keyFromFields = function keyFromFields(fields, options) {
- var res = (typeof options.nestTables) + '/' + options.nestTables + '/' + options.rowsAsHash;
- for (var i=0; i < fields.length; ++i)
- res += '/' + fields[i].name + ':' + fields[i].columnType + ':' + fields[i].flags;
- return res;
- }
- function statementKey(options) {
- return (typeof options.nestTables) +
- '/' + options.nestTables + '/' + options.rowsAsHash + options.sql;
- }
- // TODO: named placeholders support
- Connection.prototype.prepare = function prepare(options, cb) {
- if (typeof options == 'string')
- options = { sql: options };
- return this.addCommand(new Commands.Prepare(options, _domainify(cb)));
- };
- Connection.prototype.unprepare = function execute(sql) {
- var options = {};
- if (typeof sql === 'object') {
- options = sql;
- } else
- options.sql = sql;
- var key = statementKey(options);
- var stmt = this._statements[key];
- if (stmt) {
- this._statements[key] = null;
- stmt.close();
- }
- return stmt;
- };
- Connection.prototype.execute = function execute(sql, values, cb) {
- var options = {};
- if (typeof sql === 'object') {
- // execute(options, cb)
- options = sql;
- if (typeof values === 'function') {
- cb = values;
- } else {
- options.values = values;
- }
- } else if (typeof values === 'function') {
- // execute(sql, cb)
- cb = values;
- options.sql = sql;
- options.values = undefined;
- } else {
- // execute(sql, values, cb)
- options.sql = sql;
- options.values = values;
- }
- cb = _domainify(cb);
- this._resolveNamedPlaceholders(options);
- var connection = this;
- var key = statementKey(options);
- var statement = connection._statements[key];
- options.statement = statement;
- var executeCommand = new Commands.Execute(options, cb);
- if (!statement) {
- connection.prepare(options, function executeStatement(err, stmt) {
- if (err) {
- if (cb)
- cb(err);
- else
- executeCommand.emit('error', err);
- return;
- }
- executeCommand.statement = stmt;
- connection._statements[key] = stmt;
- connection.addCommand(executeCommand);
- });
- } else {
- connection.addCommand(executeCommand);
- }
- return executeCommand;
- };
- Connection.prototype.changeUser = function changeUser(options, callback) {
- if (!callback && typeof options === 'function') {
- callback = options;
- options = {};
- }
- var charsetNumber = (options.charset) ? ConnectionConfig.getCharsetNumber(options.charset) : this.config.charsetNumber;
- return this.addCommand(new Commands.ChangeUser({
- user : options.user || this.config.user,
- password : options.password || this.config.password,
- passwordSha1 : options.passwordSha1 || this.config.passwordSha1,
- database : options.database || this.config.database,
- timeout : options.timeout,
- charsetNumber : charsetNumber,
- currentConfig : this.config
- }, _domainify(function(err) {
- if (err)
- err.fatal = true;
- if (callback) callback(err);
- })));
- };
- // transaction helpers
- Connection.prototype.beginTransaction = function(cb) {
- return this.query('START TRANSACTION', cb);
- };
- Connection.prototype.commit = function(cb) {
- return this.query('COMMIT', cb);
- };
- Connection.prototype.rollback = function(cb) {
- return this.query('ROLLBACK', cb);
- };
- Connection.prototype.ping = function ping(cb) {
- return this.addCommand(new Commands.Ping(_domainify(cb)));
- };
- Connection.prototype._registerSlave = function registerSlave(opts, cb) {
- return this.addCommand(new Commands.RegisterSlave(opts, _domainify(cb)));
- };
- Connection.prototype._binlogDump = function binlogDump(opts, cb) {
- return this.addCommand(new Commands.BinlogDump(opts, _domainify(cb)));
- };
- // currently just alias to close
- Connection.prototype.destroy = function() {
- this.close();
- };
- Connection.prototype.close = function() {
- this._closing = true;
- this.stream.end();
- };
- Connection.prototype.createBinlogStream = function(opts) {
- // TODO: create proper stream class
- // TODO: use through2
- var test = 1;
- var Readable = require('stream').Readable;
- var stream = new Readable({objectMode: true});
- stream._read = function() {
- return {
- data: test++
- };
- };
- var connection = this;
- connection._registerSlave(opts, function(err) {
- var dumpCmd = connection._binlogDump(opts);
- dumpCmd.on('event', function(ev) {
- stream.push(ev);
- });
- dumpCmd.on('eof', function() {
- stream.push(null);
- // if non-blocking, then close stream to prevent errors
- if (opts.flags && (opts.flags & 0x01)) {
- connection.close();
- }
- });
- // TODO: pipe errors as well
- });
- return stream;
- };
- Connection.prototype.connect = function(cb) {
- if (!cb) return;
- var connectCalled = 0;
- // TODO domainify this callback as well. Note that domain has to be captured
- // at the top of function due to nested callback
- function callbackOnce(isErrorHandler) {
- return function(param) {
- if (!connectCalled) {
- if (isErrorHandler)
- cb(param);
- else
- cb(null, param);
- }
- connectCalled = 1;
- };
- }
- this.once('error', callbackOnce(true) );
- this.once('connect', callbackOnce(false));
- };
- // ===================================
- // outgoing server connection methods
- // ===================================
- Connection.prototype.writeColumns = function(columns) {
- var connection = this;
- this.writePacket(Packets.ResultSetHeader.toPacket(columns.length));
- columns.forEach(function(column) {
- connection.writePacket(Packets.ColumnDefinition.toPacket(column));
- });
- this.writeEof();
- };
- // row is array of columns, not hash
- Connection.prototype.writeTextRow = function(column) {
- this.writePacket(Packets.TextRow.toPacket(column));
- };
- Connection.prototype.writeTextResult = function(rows, columns) {
- var connection = this;
- connection.writeColumns(columns);
- rows.forEach(function(row) {
- var arrayRow = new Array(columns.length);
- columns.forEach(function(column) {
- arrayRow.push(row[column.name]);
- });
- connection.writeTextRow(arrayRow);
- });
- connection.writeEof();
- };
- Connection.prototype.writeEof = function(warnings, statusFlags) {
- this.writePacket(Packets.EOF.toPacket(warnings, statusFlags));
- };
- Connection.prototype.writeOk = function(args) {
- if (!args)
- args = { affectedRows: 0 };
- this.writePacket(Packets.OK.toPacket(args));
- };
- Connection.prototype.writeError = function(args) {
- this.writePacket(Packets.Error.toPacket(args));
- };
- Connection.prototype.serverHandshake = function serverHandshake(args) {
- return this.addCommand(new Commands.ServerHandshake(args));
- };
- // ===============================================================
- // TODO: domainify
- Connection.prototype.end = function(callback) {
- var connection = this;
- // trigger error if more commands enqueued after end command
- var quitCmd = this.addCommand(new Commands.Quit(callback));
- connection.addCommand = function() {
- if (connection._closing) {
- this.emit(new Error('addCommand() called on closing connection'));
- }
- };
- return quitCmd;
- };
- module.exports = Connection;
|