Protocol.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447
  1. var Parser = require('./Parser');
  2. var Sequences = require('./sequences');
  3. var Packets = require('./packets');
  4. var Timers = require('timers');
  5. var Stream = require('stream').Stream;
  6. var Util = require('util');
  7. var PacketWriter = require('./PacketWriter');
  8. module.exports = Protocol;
  9. Util.inherits(Protocol, Stream);
  /**
   * Protocol stream constructor.
   *
   * Initializes the stream base, stores the configuration and connection
   * passed in `options`, resets all sequence/handshake bookkeeping and creates
   * the packet parser whose callbacks are bound to this instance.
   *
   * @param {Object} [options]
   * @param {Object} [options.config]     Stored as `_config` (defaults to `{}`)
   *                                      and handed to the parser.
   * @param {Object} [options.connection] Stored as `_connection`.
   */
  function Protocol(options) {
    Stream.call(this);

    var settings = options || {};

    // Stream flags
    this.readable = true;
    this.writable = true;

    // Collaborators supplied by the owner of this protocol instance
    this._config     = settings.config || {};
    this._connection = settings.connection;

    // State tracking; everything starts out "nothing happened yet"
    this._callback          = null;
    this._fatalError        = null;
    this._quitSequence      = null;
    this._handshakeSequence = null;
    this._handshaked        = false;
    this._ended             = false;
    this._destroyed         = false;
    this._queue             = [];

    this._handshakeInitializationPacket = null;

    // The parser reports errors and complete packets back to this instance
    var parserOptions = {
      onError  : this.handleParserError.bind(this),
      onPacket : this._parsePacket.bind(this),
      config   : this._config
    };

    this._parser = new Parser(parserOptions);
  }
  /**
   * Stream `write` implementation: forwards the given chunk to the packet
   * parser.
   *
   * @param {*} buffer Chunk handed to `this._parser.write()` unchanged.
   * @returns {boolean} Always `true`.
   */
  Protocol.prototype.write = function(buffer) {
    var parser = this._parser;

    parser.write(buffer);

    return true;
  };
  /**
   * Enqueues a Handshake sequence and remembers it as `_handshakeSequence`.
   *
   * The sequence receives a shallow copy of `options` extended with this
   * protocol's configuration under `config`. The object passed in by the
   * caller is left untouched (previously `config` was written onto it).
   *
   * @param {Object|Function} [options] Handshake options, or the callback when
   *                                    options are omitted.
   * @param {Function} [callback] Passed through to the Handshake sequence.
   * @returns {Object} Whatever `_enqueue` returns for the new sequence.
   */
  Protocol.prototype.handshake = function handshake(options, callback) {
    if (typeof options === 'function') {
      callback = options;
      options = {};
    }

    // Copy every enumerable option so the caller-owned object is not mutated.
    // for-in over null/undefined simply iterates nothing.
    var sequenceOptions = {};
    for (var key in options) {
      sequenceOptions[key] = options[key];
    }
    sequenceOptions.config = this._config;

    this._handshakeSequence = this._enqueue(new Sequences.Handshake(sequenceOptions, callback));

    return this._handshakeSequence;
  };
  /**
   * Enqueues a Query sequence built from the given arguments.
   *
   * @param {*} options Passed to `Sequences.Query` unchanged.
   * @param {Function} [callback] Passed to `Sequences.Query` unchanged.
   * @returns {*} The result of `_enqueue`.
   */
  Protocol.prototype.query = function query(options, callback) {
    var sequence = new Sequences.Query(options, callback);

    return this._enqueue(sequence);
  };
  /**
   * Enqueues a ChangeUser sequence built from the given arguments.
   *
   * @param {*} options Passed to `Sequences.ChangeUser` unchanged.
   * @param {Function} [callback] Passed to `Sequences.ChangeUser` unchanged.
   * @returns {*} The result of `_enqueue`.
   */
  Protocol.prototype.changeUser = function changeUser(options, callback) {
    var sequence = new Sequences.ChangeUser(options, callback);

    return this._enqueue(sequence);
  };
  /**
   * Enqueues a Ping sequence.
   *
   * @param {Object|Function} [options] Options for the sequence; when a
   *                                    function is given it is used as the
   *                                    callback and the options become `{}`.
   * @param {Function} [callback] Passed to `Sequences.Ping`.
   * @returns {*} The result of `_enqueue`.
   */
  Protocol.prototype.ping = function ping(options, callback) {
    var optionsOmitted = (typeof options === 'function');

    var sequence = optionsOmitted
      ? new Sequences.Ping({}, options)
      : new Sequences.Ping(options, callback);

    return this._enqueue(sequence);
  };
  /**
   * Enqueues a Statistics sequence.
   *
   * @param {Object|Function} [options] Options for the sequence; when a
   *                                    function is given it is used as the
   *                                    callback and the options become `{}`.
   * @param {Function} [callback] Passed to `Sequences.Statistics`.
   * @returns {*} The result of `_enqueue`.
   */
  Protocol.prototype.stats = function stats(options, callback) {
    var sequenceOptions = options;
    var done = callback;

    // Support the stats(callback) call form
    if (typeof sequenceOptions === 'function') {
      done = sequenceOptions;
      sequenceOptions = {};
    }

    var sequence = new Sequences.Statistics(sequenceOptions, done);

    return this._enqueue(sequence);
  };
  /**
   * Enqueues a Quit sequence and remembers it as `_quitSequence`, which
   * `end()` and `_validateEnqueue()` consult later.
   *
   * @param {Object|Function} [options] Options for the sequence; when a
   *                                    function is given it is used as the
   *                                    callback and the options become `{}`.
   * @param {Function} [callback] Passed to `Sequences.Quit`.
   * @returns {*} The result of `_enqueue`.
   */
  Protocol.prototype.quit = function quit(options, callback) {
    var sequenceOptions = options;
    var done = callback;

    // Support the quit(callback) call form
    if (typeof sequenceOptions === 'function') {
      done = sequenceOptions;
      sequenceOptions = {};
    }

    var enqueued = this._enqueue(new Sequences.Quit(sequenceOptions, done));
    this._quitSequence = enqueued;

    return enqueued;
  };
  /**
   * Marks the protocol as ended. Only the first call has any effect.
   *
   * When the Quit sequence is the one at the head of the queue, the ending is
   * expected: that sequence is ended and 'end' is emitted. In every other case
   * a fatal PROTOCOL_CONNECTION_LOST error is handed to `_delegateError`.
   */
  Protocol.prototype.end = function() {
    if (this._ended) {
      return;
    }

    this._ended = true;

    // Expected shutdown: quit() was called and its sequence is the active one
    if (this._quitSequence && this._queue[0] === this._quitSequence) {
      this._quitSequence.end();
      this.emit('end');
      return;
    }

    var lostError = new Error('Connection lost: The server closed the connection.');
    lostError.fatal = true;
    lostError.code = 'PROTOCOL_CONNECTION_LOST';

    this._delegateError(lostError);
  };
  /**
   * Pauses the parser and notifies the sequence at the head of the queue.
   */
  Protocol.prototype.pause = function() {
    this._parser.pause();

    // A query may be driving a file stream, so the active sequence has to
    // learn about pause/resume as well.
    var active = this._queue[0];

    if (!active || !active.emit) {
      return;
    }

    active.emit('pause');
  };
  /**
   * Resumes the parser and notifies the sequence at the head of the queue.
   */
  Protocol.prototype.resume = function() {
    this._parser.resume();

    // A query may be driving a file stream, so the active sequence has to
    // learn about pause/resume as well.
    var active = this._queue[0];

    if (!active || !active.emit) {
      return;
    }

    active.emit('resume');
  };
  /**
   * Adds a sequence to the queue and wires its events to this protocol.
   *
   * A sequence rejected by `_validateEnqueue` is returned untouched. Otherwise
   * the sequence is pushed, 'enqueue' is emitted, listeners for 'error',
   * 'packet', 'end', 'timeout' and 'start-tls' are attached, and the sequence
   * is started right away when it is the only one in the queue.
   *
   * @param {Object} sequence Sequence (event emitter) to enqueue.
   * @returns {Object} The same `sequence`.
   */
  Protocol.prototype._enqueue = function(sequence) {
    var protocol = this;

    if (!protocol._validateEnqueue(sequence)) {
      return sequence;
    }

    if (protocol._config.trace) {
      // Long stack trace support
      sequence._callSite = sequence._callSite || new Error();
    }

    protocol._queue.push(sequence);
    protocol.emit('enqueue', sequence);

    // Errors raised by the sequence are routed through the protocol
    function onError(err) {
      protocol._delegateError(err, sequence);
    }

    // Outgoing packet: refresh the inactivity timer, then serialize and emit
    function onPacket(packet) {
      Timers.active(sequence);
      protocol._emitPacket(packet);
    }

    // Sequence finished: advance the queue
    function onEnd() {
      protocol._dequeue(sequence);
    }

    // Inactivity timer fired: report a fatal timeout error
    function onTimeout() {
      var timeoutError = new Error(sequence.constructor.name + ' inactivity timeout');

      timeoutError.code = 'PROTOCOL_SEQUENCE_TIMEOUT';
      timeoutError.fatal = true;
      timeoutError.timeout = sequence._timeout;

      protocol._delegateError(timeoutError, sequence);
    }

    // Sequence asks for a TLS upgrade of the connection
    function onStartTls() {
      Timers.active(sequence);

      protocol._connection._startTLS(function(err) {
        if (!err) {
          Timers.active(sequence);
          sequence._tlsUpgradeCompleteHandler();
          return;
        }

        // SSL negotiation error are fatal
        err.code = 'HANDSHAKE_SSL_ERROR';
        err.fatal = true;
        sequence.end(err);
      });
    }

    sequence.on('error', onError);
    sequence.on('packet', onPacket);
    sequence.on('end', onEnd);
    sequence.on('timeout', onTimeout);
    sequence.on('start-tls', onStartTls);

    // Nothing else is pending, so this sequence starts immediately
    var isOnlySequence = (protocol._queue.length === 1);

    if (isOnlySequence) {
      protocol._parser.resetPacketNumber();
      protocol._startSequence(sequence);
    }

    return sequence;
  };
  /**
   * Checks whether a sequence may be enqueued in the current protocol state.
   *
   * Rejections, checked in this order: after a fatal error, after quit was
   * invoked, after destroy, a second Handshake, and a ChangeUser before any
   * Handshake. A rejected sequence gets an 'error' listener that delegates to
   * `_delegateError` and is ended with a non-fatal error describing the reason.
   *
   * @param {Object} sequence Sequence about to be enqueued.
   * @returns {boolean} `true` when the sequence may be enqueued, else `false`.
   */
  Protocol.prototype._validateEnqueue = function(sequence) {
    var sequenceName = sequence.constructor.name;
    var after  = 'Cannot enqueue ' + sequenceName + ' after ';
    var before = 'Cannot enqueue ' + sequenceName + ' before ';

    var message;
    var code;

    if (this._fatalError) {
      message = after + 'fatal error.';
      code = 'PROTOCOL_ENQUEUE_AFTER_FATAL_ERROR';
    } else if (this._quitSequence) {
      message = after + 'invoking quit.';
      code = 'PROTOCOL_ENQUEUE_AFTER_QUIT';
    } else if (this._destroyed) {
      message = after + 'being destroyed.';
      code = 'PROTOCOL_ENQUEUE_AFTER_DESTROY';
    } else if (this._handshakeSequence && sequence.constructor === Sequences.Handshake) {
      message = after + 'already enqueuing a Handshake.';
      code = 'PROTOCOL_ENQUEUE_HANDSHAKE_TWICE';
    } else if (!this._handshakeSequence && sequence.constructor === Sequences.ChangeUser) {
      message = before + 'a Handshake.';
      code = 'PROTOCOL_ENQUEUE_BEFORE_HANDSHAKE';
    } else {
      return true;
    }

    var rejection = new Error(message);
    rejection.code = code;
    rejection.fatal = false;

    var protocol = this;

    // The sequence never enters the queue, so its error is routed manually
    sequence.on('error', function(err) {
      protocol._delegateError(err, sequence);
    });
    sequence.end(rejection);

    return false;
  };
  /**
   * Parser callback invoked for every incoming packet.
   *
   * Looks up the packet class for the active sequence, parses the packet and
   * dispatches it to the sequence method named after the packet class. A packet
   * arriving with no active sequence, or one the active sequence has no handler
   * for, is reported as a fatal error through `_delegateError`.
   */
  Protocol.prototype._parsePacket = function() {
    var protocol = this;
    var active = protocol._queue[0];

    // Builds a fatal protocol error and hands it to _delegateError
    function failFatally(message, code) {
      var err = new Error(message);
      err.code = code;
      err.fatal = true;
      protocol._delegateError(err);
    }

    if (!active) {
      failFatally('Received packet with no active sequence.', 'PROTOCOL_STRAY_PACKET');
      return;
    }

    var Packet = protocol._determinePacket(active);
    var packet = new Packet({protocol41: protocol._config.protocol41});
    var handlerName = Packet.name;

    // Special case: Faster dispatch, and parsing done inside sequence
    if (Packet === Packets.RowDataPacket) {
      active.RowDataPacket(packet, protocol._parser, protocol._connection);

      if (protocol._config.debug) {
        protocol._debugPacket(true, packet);
      }

      return;
    }

    if (!protocol._config.debug) {
      packet.parse(protocol._parser);
    } else {
      protocol._parsePacketDebug(packet);
    }

    // Kept for later use by other parts of the protocol (e.g. ChangeUser)
    if (Packet === Packets.HandshakeInitializationPacket) {
      protocol._handshakeInitializationPacket = packet;
    }

    // Incoming traffic counts as activity for the sequence timeout
    Timers.active(active);

    if (!active[handlerName]) {
      failFatally('Received packet in the wrong sequence.', 'PROTOCOL_INCORRECT_PACKET_SEQUENCE');
      return;
    }

    active[handlerName](packet);
  };
  /**
   * Parses a packet and always prints it through `_debugPacket` afterwards,
   * even when parsing throws.
   *
   * @param {Object} packet Packet whose `parse(parser)` method is invoked.
   */
  Protocol.prototype._parsePacketDebug = function _parsePacketDebug(packet) {
    var isIncoming = true;

    try {
      packet.parse(this._parser);
    } finally {
      // Runs on success and on failure; a parse error keeps propagating
      this._debugPacket(isIncoming, packet);
    }
  };
  /**
   * Serializes an outgoing packet and emits the result as a 'data' event.
   * With debugging enabled the packet is also printed through `_debugPacket`.
   *
   * @param {Object} packet Packet whose `write(writer)` method is invoked.
   */
  Protocol.prototype._emitPacket = function(packet) {
    var writer = new PacketWriter();

    packet.write(writer);

    var payload = writer.toBuffer(this._parser);
    this.emit('data', payload);

    if (!this._config.debug) {
      return;
    }

    this._debugPacket(false, packet);
  };
  /**
   * Selects the packet class for the packet currently held by the parser.
   *
   * The sequence is asked first through its optional `determinePacket`
   * method. Without an answer the first byte decides: 0x00 is an OkPacket,
   * 0xfe an EofPacket and 0xff an ErrorPacket. The first 0x00 resolved here
   * also marks the protocol as handshaked and emits 'handshake'.
   *
   * @param {Object} sequence The active sequence.
   * @returns {Function} Packet constructor.
   * @throws {Error} When neither the sequence nor the first byte identifies
   *                 a packet.
   */
  Protocol.prototype._determinePacket = function(sequence) {
    var firstByte = this._parser.peak();

    var chosen = sequence.determinePacket && sequence.determinePacket(firstByte, this._parser);
    if (chosen) {
      return chosen;
    }

    if (firstByte === 0x00) {
      if (!this._handshaked) {
        this._handshaked = true;
        this.emit('handshake', this._handshakeInitializationPacket);
      }

      return Packets.OkPacket;
    }

    if (firstByte === 0xfe) {
      return Packets.EofPacket;
    }

    if (firstByte === 0xff) {
      return Packets.ErrorPacket;
    }

    throw new Error('Could not determine packet, firstByte = ' + firstByte);
  };
  /**
   * Called when a sequence has ended: stops its timer, drops the head of the
   * queue and starts the next sequence, or emits 'drain' when none is left.
   *
   * @param {Object} sequence The sequence that ended.
   */
  Protocol.prototype._dequeue = function(sequence) {
    Timers.unenroll(sequence);

    // No point in advancing the queue, we are dead
    if (this._fatalError) {
      return;
    }

    this._queue.shift();

    var upcoming = this._queue[0];

    if (upcoming) {
      this._parser.resetPacketNumber();
      this._startSequence(upcoming);
      return;
    }

    this.emit('drain');
  };
  /**
   * Starts a sequence, arming its inactivity timer first when the sequence
   * has a positive, finite `_timeout`. ChangeUser sequences are started with
   * the stored handshake initialization packet; all others with no arguments.
   *
   * @param {Object} sequence Sequence to start.
   */
  Protocol.prototype._startSequence = function(sequence) {
    var timeout = sequence._timeout;
    var hasTimeout = timeout > 0 && isFinite(timeout);

    if (hasTimeout) {
      Timers.enroll(sequence, timeout);
      Timers.active(sequence);
    }

    if (sequence.constructor !== Sequences.ChangeUser) {
      sequence.start();
      return;
    }

    sequence.start(this._handshakeInitializationPacket);
  };
  /**
   * Handles a network-level error. The error is always marked fatal, then
   * ends the active sequence or, with an empty queue, goes to
   * `_delegateError`.
   *
   * @param {Error} err Network error; its `fatal` property is set to `true`.
   */
  Protocol.prototype.handleNetworkError = function(err) {
    err.fatal = true;

    var active = this._queue[0];

    if (!active) {
      this._delegateError(err);
      return;
    }

    active.end(err);
  };
  /**
   * Parser error callback. Ends the active sequence with the error or, with
   * an empty queue, passes it to `_delegateError`. The error itself is not
   * modified here.
   *
   * @param {Error} err Error reported by the parser.
   */
  Protocol.prototype.handleParserError = function handleParserError(err) {
    var active = this._queue[0];

    if (!active) {
      this._delegateError(err);
      return;
    }

    active.end(err);
  };
  /**
   * Central error routing.
   *
   * Nothing happens once a fatal error has been recorded. A fatal error is
   * stored as `_fatalError`. The error is then either emitted as
   * 'unhandledError' (when `_shouldErrorBubbleUp` says so) or, if fatal,
   * delivered on the next tick to every sequence in the queue, which is then
   * emptied. A fatal error finally emits 'end' with the error.
   *
   * @param {Error} err Error to route.
   * @param {Object} [sequence] Sequence the error originated from, if any.
   */
  Protocol.prototype._delegateError = function(err, sequence) {
    // Stop delegating errors after the first fatal error
    if (this._fatalError) {
      return;
    }

    if (err.fatal) {
      this._fatalError = err;
    }

    var bubblesUp = this._shouldErrorBubbleUp(err, sequence);

    if (bubblesUp) {
      // Can't use regular 'error' event here as that always destroys the pipe
      // between socket and protocol which is not what we want (unless the
      // exception was fatal).
      this.emit('unhandledError', err);
    } else if (err.fatal) {
      // Send fatal error to all sequences in the queue
      var pending = this._queue;

      process.nextTick(function failPending() {
        pending.forEach(function failOne(queued) {
          queued.end(err);
        });

        pending.length = 0;
      });
    }

    // Make sure the stream we are piping to is getting closed
    if (err.fatal) {
      this.emit('end', err);
    }
  };
  /**
   * Decides whether an error has to be surfaced by the protocol itself.
   *
   * With an originating sequence: never when that sequence has an error
   * handler, always when the error is non-fatal. In the remaining cases the
   * result is `err.fatal && !this._hasPendingErrorHandlers()`.
   *
   * @param {Error} err The error being routed.
   * @param {Object} [sequence] Sequence the error originated from, if any.
   * @returns {*} Truthy when the error should bubble up. The final expression
   *              is returned as-is, so it may be a falsy `err.fatal` value.
   */
  Protocol.prototype._shouldErrorBubbleUp = function(err, sequence) {
    // The originating sequence takes care of the error itself
    if (sequence && sequence.hasErrorHandler()) {
      return false;
    }

    // Non-fatal error nobody on the sequence listens for
    if (sequence && !err.fatal) {
      return true;
    }

    return (err.fatal && !this._hasPendingErrorHandlers());
  };
  /**
   * Tells whether at least one queued sequence reports an error handler.
   *
   * @returns {boolean} `true` as soon as a sequence's `hasErrorHandler()`
   *                    returns a truthy value, otherwise `false`.
   */
  Protocol.prototype._hasPendingErrorHandlers = function() {
    var queue = this._queue;

    for (var i = 0; i < queue.length; i++) {
      if (queue[i].hasErrorHandler()) {
        return true;
      }
    }

    return false;
  };
  /**
   * Marks the protocol as destroyed and pauses the parser. Unless the
   * connection state is already "disconnected" or the protocol has ended,
   * `end()` is invoked as well.
   */
  Protocol.prototype.destroy = function() {
    this._destroyed = true;
    this._parser.pause();

    var stillConnected = (this._connection.state !== "disconnected");

    if (stillConnected && !this._ended) {
      this.end();
    }
  };
  /**
   * Prints a packet to the console: a headline with direction arrow and
   * packet class name, the packet itself, and an empty line.
   *
   * When `_config.debug` is an array, only packets whose class name is listed
   * in it are printed.
   *
   * @param {boolean} incoming Truthy for received packets ('<-- '), falsy for
   *                           sent packets ('--> ').
   * @param {Object} packet    Packet to print.
   */
  Protocol.prototype._debugPacket = function(incoming, packet) {
    var packetName = packet.constructor.name;
    var allowList = this._config.debug;

    // check for debug packet restriction
    if (Array.isArray(allowList) && allowList.indexOf(packetName) === -1) {
      return;
    }

    var arrow = incoming ? '<-- ' : '--> ';

    console.log(arrow + packetName);
    console.log(packet);
    console.log('');
  };