123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- var Command = require('./command');
- var util = require('util');
- var CommandCode = require('../constants/commands');
- var Packets = require('../packets');
- function BinlogDump(opts)
- {
- Command.call(this);
- //this.onResult = callback;
- this.opts = opts;
- }
- util.inherits(BinlogDump, Command);
- BinlogDump.prototype.start = function(packet, connection) {
- var packet = new Packets.BinlogDump(this.opts);
- connection.writePacket(packet.toPacket(1));
- return BinlogDump.prototype.binlogData;
- };
- function BinlogEventHeader(packet) {
- this.timestamp = packet.readInt32();
- this.eventType = packet.readInt8();
- this.serverId = packet.readInt32();
- this.eventSize = packet.readInt32();
- this.logPos = packet.readInt32();
- this.flags = packet.readInt16();
- }
- function RotateEvent(packet) {
- this.pposition = packet.readInt32();
- // TODO: read uint64 here
- var positionDword2 = packet.readInt32();
- this.nextBinlog = packet.readString();
- this.name = 'RotateEvent';
- }
- function FormatDescriptionEvent(packet) {
- this.binlogVersion = packet.readInt16();
- this.serverVersion = packet.readString(50).replace(/\u0000.*/, '');
- this.createTimestamp = packet.readInt32();
- this.eventHeaderLength = packet.readInt8(); // should be 19
- this.eventsLength = packet.readBuffer();
- this.name = 'FormatDescriptionEvent';
- }
- function QueryEvent(packet) {
-
- var parseStatusVars = require('../packets/binlog_query_statusvars.js');
- this.slaveProxyId = packet.readInt32();
- this.executionTime = packet.readInt32();
- var schemaLength = packet.readInt8();
- this.errorCode = packet.readInt16();
- var statusVarsLength = packet.readInt16();
- var statusVars = packet.readBuffer(statusVarsLength);
-
- this.schema = packet.readString(schemaLength);
- packet.readInt8(); // should be zero
- this.statusVars = parseStatusVars(statusVars);
-
- this.query = packet.readString();
- this.name = 'QueryEvent';
- }
- function XidEvent(packet) {
- this.binlogVersion = packet.readInt16();
- this.xid = packet.readInt64();
- this.name = 'XidEvent';
- }
- var eventParsers = [];
- eventParsers[2] = QueryEvent;
- eventParsers[4] = RotateEvent;
- eventParsers[15] = FormatDescriptionEvent;
- eventParsers[16] = XidEvent;
- BinlogDump.prototype.binlogData = function(packet) {
- // ok - continue consuming events
- // error - error
- // eof - end of binlog
- if (packet.isEOF()) {
- this.emit('eof');
- return null;
- }
-
- // binlog event header
- var ok = packet.readInt8();
- var header = new BinlogEventHeader(packet);
- var EventParser = eventParsers[header.eventType];
- var event;
- if (EventParser)
- event = new EventParser(packet);
- else {
- event = {
- name: 'UNKNOWN'
- }
- }
- event.header = header;
- this.emit('event', event);
- return BinlogDump.prototype.binlogData;
- };
- module.exports = BinlogDump;
|