query.js 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. var fs = require('fs');
  2. var util = require('util');
  3. var Readable = require('readable-stream');
  4. var Command = require('./command.js');
  5. var Packets = require('../packets/index.js');
  6. var compileParser = require('../compile_text_parser.js');
  7. var ServerStatus = require('../constants/server_status.js');
  8. var EmptyPacket = new Packets.Packet(0, new Buffer(4), 0, 4);
  9. function Query(sql, options, callback)
  10. {
  11. Command.call(this);
  12. this.query = sql;
  13. // node-mysql compatibility: query.sql as alias to query.query #121
  14. this.sql = this.query;
  15. this.options = options;
  16. this.onResult = callback;
  17. this._fieldCount = 0;
  18. this._rowParser = null;
  19. this._fields = [];
  20. this._rows = [];
  21. this._receivedFieldsCount = 0;
  22. this._resultIndex = 0;
  23. this._localStream = null;
  24. this._streamFactory = options.infileStreamFactory;
  25. this._connection = null;
  26. }
  27. util.inherits(Query, Command);
  28. Query.prototype.start = function(packet, connection) {
  29. if (connection.config.debug) {
  30. console.log(' Sending query command: %s', this.query);
  31. }
  32. this._connection = connection;
  33. var cmdPacket = new Packets.Query(this.query);
  34. connection.writePacket(cmdPacket.toPacket(1));
  35. return Query.prototype.resultsetHeader;
  36. };
  37. Query.prototype.done = function() {
  38. var self = this;
  39. if (this.onResult) {
  40. var rows, fields;
  41. if (this._resultIndex === 0) {
  42. rows = this._rows[0];
  43. fields = this._fields[0];
  44. } else {
  45. rows = this._rows;
  46. fields = this._fields;
  47. }
  48. if (fields) {
  49. process.nextTick(function() {
  50. self.onResult(null, rows, fields, self._resultIndex + 1);
  51. });
  52. } else {
  53. process.nextTick(function() {
  54. self.onResult(null, rows, void(0), self._resultIndex + 1);
  55. });
  56. }
  57. }
  58. return null;
  59. };
  60. Query.prototype.doneInsert = function(rs) {
  61. if (this._localStreamError) {
  62. if (this.onResult) {
  63. this.onResult(this._localStreamError, rs);
  64. } else {
  65. this.emit('error', this._localStreamError);
  66. }
  67. return null;
  68. }
  69. this._rows.push(rs);
  70. this._fields.push(void(0));
  71. this.emit('result', rs, this._resultIndex);
  72. this.emit('fields', void(0), this._resultIndex);
  73. if (rs.serverStatus & ServerStatus.SERVER_MORE_RESULTS_EXISTS) {
  74. this._resultIndex++;
  75. return this.resultsetHeader;
  76. }
  77. return this.done();
  78. };
  79. Query.prototype.resultsetHeader = function(packet, connection) {
  80. var rs = new Packets.ResultSetHeader(packet, connection.config.bigNumberStrings);
  81. this._fieldCount = rs.fieldCount;
  82. if (connection.config.debug) {
  83. console.log(' Resultset header received, expecting ' + rs.fieldCount + ' column definition packets');
  84. }
  85. if (this._fieldCount === 0) {
  86. return this.doneInsert(rs);
  87. }
  88. if (this._fieldCount === null) {
  89. this._localStream = this._findOrCreateReadStream(rs.infileName);
  90. // start streaming, after last packet expect OK
  91. this._streamLocalInfile(connection);
  92. return this.infileOk;
  93. }
  94. this._receivedFieldsCount = 0;
  95. this._rows.push([]);
  96. this._fields.push([]);
  97. return this.readField;
  98. };
  99. // some code taken from https://github.com/felixge/node-mysql/pull/668
  100. Query.prototype._findOrCreateReadStream = function(path) {
  101. if (this._streamFactory)
  102. return this._streamFactory(path);
  103. return fs.createReadStream(path, {
  104. 'flag': 'r',
  105. 'encoding': null,
  106. 'autoClose': true
  107. });
  108. };
  109. Query.prototype._streamLocalInfile = function(connection) {
  110. var command = this;
  111. connection.stream.on('pause', function() {
  112. command._localStream.pause();
  113. });
  114. connection.stream.on('drain', function() {
  115. command._localStream.resume();
  116. });
  117. this._localStream.on('data', function (data) {
  118. var dataWithHeader = new Buffer(data.length + 4);
  119. data.copy(dataWithHeader, 4);
  120. connection.writePacket(new Packets.Packet(0, dataWithHeader, 0, dataWithHeader.length));
  121. });
  122. this._localStream.on('end', function (data) {
  123. connection.writePacket(EmptyPacket);
  124. });
  125. this._localStream.on('error', function(err) {
  126. command._localStreamError = err;
  127. command._localStream.emit('end');
  128. });
  129. }
  130. Query.prototype.readField = function(packet, connection) {
  131. this._receivedFieldsCount++;
  132. // Often there is much more data in the column definition than in the row itself
  133. // If you set manually _fields[0] to array of ColumnDefinition's (from previous call)
  134. // you can 'cache' result of parsing. Field packets still received, but ignored in that case
  135. // this is the reason _receivedFieldsCount exist (otherwise we could just use current length of fields array)
  136. if (this._fields[this._resultIndex].length != this._fieldCount) {
  137. var field = new Packets.ColumnDefinition(packet);
  138. this._fields[this._resultIndex].push(field);
  139. if (connection.config.debug) {
  140. console.log(' Column definition:');
  141. console.log(' name: ' + field.name);
  142. console.log(' type: ' + field.columnType);
  143. console.log(' flags: ' + field.flags);
  144. }
  145. }
  146. // last field received
  147. if (this._receivedFieldsCount == this._fieldCount) {
  148. var fields = this._fields[this._resultIndex];
  149. this.emit('fields', fields, this._resultIndex);
  150. var parserKey = connection.keyFromFields(fields, this.options);
  151. this._rowParser = connection.textProtocolParsers[parserKey];
  152. if (!this._rowParser) {
  153. this._rowParser = compileParser(fields, this.options, connection.config);
  154. connection.textProtocolParsers[parserKey] = this.rowParser;
  155. }
  156. return Query.prototype.fieldsEOF;
  157. }
  158. return Query.prototype.readField;
  159. };
  160. Query.prototype.fieldsEOF = function(packet, connection) {
  161. // check EOF
  162. if (!packet.isEOF())
  163. return connection.protocolError("Expected EOF packet");
  164. return this.row;
  165. };
  166. Query.prototype.row = function(packet)
  167. {
  168. if (packet.isEOF()) {
  169. var status = packet.eofStatusFlags();
  170. var moreResults = status & ServerStatus.SERVER_MORE_RESULTS_EXISTS;
  171. if (moreResults) {
  172. this._resultIndex++;
  173. return Query.prototype.resultsetHeader;
  174. }
  175. return this.done();
  176. }
  177. var row = new this._rowParser(packet);
  178. if (this.onResult)
  179. this._rows[this._resultIndex].push(row);
  180. else
  181. this.emit('result', row, this._resultIndex);
  182. return Query.prototype.row;
  183. };
  184. Query.prototype.infileOk = function(packet, connection) {
  185. var rs = new Packets.ResultSetHeader(packet, connection.config.bigNumberStrings);
  186. return this.doneInsert(rs);
  187. };
  188. Query.prototype.stream = function(options) {
  189. var self = this,
  190. stream;
  191. options = options || {};
  192. options.objectMode = true;
  193. stream = new Readable(options);
  194. stream._read = function() {
  195. self._connection && self._connection.resume();
  196. };
  197. this.on('result',function(row,i) {
  198. if (!stream.push(row)) self._connection.pause();
  199. stream.emit('result',row,i); // replicate old emitter
  200. });
  201. this.on('error',function(err) {
  202. stream.emit('error',err); // Pass on any errors
  203. });
  204. this.on('end', function() {
  205. stream.emit('close'); // notify readers that query has completed
  206. stream.push(null); // pushing null, indicating EOF
  207. });
  208. this.on('fields',function(fields,i) {
  209. stream.emit('fields',fields,i); // replicate old emitter
  210. });
  211. return stream;
  212. };
  213. module.exports = Query;