// binlog_dump.js
  1. var Command = require('./command');
  2. var util = require('util');
  3. var CommandCode = require('../constants/commands');
  4. var Packets = require('../packets');
  5. function BinlogDump(opts)
  6. {
  7. Command.call(this);
  8. //this.onResult = callback;
  9. this.opts = opts;
  10. }
  11. util.inherits(BinlogDump, Command);
  12. BinlogDump.prototype.start = function(packet, connection) {
  13. var packet = new Packets.BinlogDump(this.opts);
  14. connection.writePacket(packet.toPacket(1));
  15. return BinlogDump.prototype.binlogData;
  16. };
  17. function BinlogEventHeader(packet) {
  18. this.timestamp = packet.readInt32();
  19. this.eventType = packet.readInt8();
  20. this.serverId = packet.readInt32();
  21. this.eventSize = packet.readInt32();
  22. this.logPos = packet.readInt32();
  23. this.flags = packet.readInt16();
  24. }
  25. function RotateEvent(packet) {
  26. this.pposition = packet.readInt32();
  27. // TODO: read uint64 here
  28. var positionDword2 = packet.readInt32();
  29. this.nextBinlog = packet.readString();
  30. this.name = 'RotateEvent';
  31. }
  32. function FormatDescriptionEvent(packet) {
  33. this.binlogVersion = packet.readInt16();
  34. this.serverVersion = packet.readString(50).replace(/\u0000.*/, '');
  35. this.createTimestamp = packet.readInt32();
  36. this.eventHeaderLength = packet.readInt8(); // should be 19
  37. this.eventsLength = packet.readBuffer();
  38. this.name = 'FormatDescriptionEvent';
  39. }
  40. function QueryEvent(packet) {
  41. var parseStatusVars = require('../packets/binlog_query_statusvars.js');
  42. this.slaveProxyId = packet.readInt32();
  43. this.executionTime = packet.readInt32();
  44. var schemaLength = packet.readInt8();
  45. this.errorCode = packet.readInt16();
  46. var statusVarsLength = packet.readInt16();
  47. var statusVars = packet.readBuffer(statusVarsLength);
  48. this.schema = packet.readString(schemaLength);
  49. packet.readInt8(); // should be zero
  50. this.statusVars = parseStatusVars(statusVars);
  51. this.query = packet.readString();
  52. this.name = 'QueryEvent';
  53. }
  54. function XidEvent(packet) {
  55. this.binlogVersion = packet.readInt16();
  56. this.xid = packet.readInt64();
  57. this.name = 'XidEvent';
  58. }
  59. var eventParsers = [];
  60. eventParsers[2] = QueryEvent;
  61. eventParsers[4] = RotateEvent;
  62. eventParsers[15] = FormatDescriptionEvent;
  63. eventParsers[16] = XidEvent;
  64. BinlogDump.prototype.binlogData = function(packet) {
  65. // ok - continue consuming events
  66. // error - error
  67. // eof - end of binlog
  68. if (packet.isEOF()) {
  69. this.emit('eof');
  70. return null;
  71. }
  72. // binlog event header
  73. var ok = packet.readInt8();
  74. var header = new BinlogEventHeader(packet);
  75. var EventParser = eventParsers[header.eventType];
  76. var event;
  77. if (EventParser)
  78. event = new EventParser(packet);
  79. else {
  80. event = {
  81. name: 'UNKNOWN'
  82. }
  83. }
  84. event.header = header;
  85. this.emit('event', event);
  86. return BinlogDump.prototype.binlogData;
  87. };
  88. module.exports = BinlogDump;