提交 bce77b4f 编写于 作者: A avalon566

parse necessary binlog event

上级 1998cc16
......@@ -20,6 +20,7 @@ package info.avalon566.shardingscaling.sync.mysql.binlog;
import info.avalon566.shardingscaling.sync.mysql.binlog.codec.MySQLBinlogEventPacketDecoder;
import info.avalon566.shardingscaling.sync.mysql.binlog.codec.MySQLCommandPacketDecoder;
import info.avalon566.shardingscaling.sync.mysql.binlog.codec.MySQLLengthFieldBasedFrameEncoder;
import info.avalon566.shardingscaling.sync.mysql.binlog.event.AbstractBinlogEvent;
import info.avalon566.shardingscaling.sync.mysql.binlog.packet.command.BinlogDumpCommandPacket;
import info.avalon566.shardingscaling.sync.mysql.binlog.packet.command.QueryCommandPacket;
import info.avalon566.shardingscaling.sync.mysql.binlog.packet.command.RegisterSlaveCommandPacket;
......@@ -44,6 +45,7 @@ import lombok.var;
import java.net.InetSocketAddress;
import java.nio.ByteOrder;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
/**
......@@ -71,6 +73,8 @@ public final class MySQLConnector {
private Promise<Object> responseCallback;
private ArrayBlockingQueue<AbstractBinlogEvent> blockingEventQueue = new ArrayBlockingQueue(1000);
class MySQLCommandResponHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
......@@ -88,6 +92,20 @@ public final class MySQLConnector {
}
}
class MySQLBinlogEventHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof AbstractBinlogEvent) {
blockingEventQueue.put((AbstractBinlogEvent) msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("protocol resolution error", cause);
}
}
public MySQLConnector(final int serverId, final String host, final int port, final String username, final String password) {
this.serverId = serverId;
this.host = host;
......@@ -162,12 +180,12 @@ public final class MySQLConnector {
}
/**
* Dump binlog.
* Start dump binlog.
*
* @param binlogFileName binlog file name
* @param binlogPosition binlog position
*/
public synchronized void dump(final String binlogFileName, final long binlogPosition) {
public synchronized void subscribe(final String binlogFileName, final long binlogPosition) {
initDumpConnectSession();
registerSlave();
responseCallback = null;
......@@ -177,16 +195,18 @@ public final class MySQLConnector {
binlogDumpCmd.setSlaveServerId(serverId);
channel.pipeline().remove(MySQLCommandPacketDecoder.class);
channel.pipeline().remove(MySQLCommandResponHandler.class);
channel.pipeline().addAfter(
MySQLLengthFieldBasedFrameEncoder.class.getSimpleName(),
MySQLBinlogEventPacketDecoder.class.getSimpleName(),
new MySQLBinlogEventPacketDecoder());
channel.pipeline().addLast(new MySQLBinlogEventPacketDecoder());
channel.pipeline().addLast(new MySQLBinlogEventHandler());
channel.writeAndFlush(binlogDumpCmd);
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* Poll binlog event.
*
* @return binlog event
*/
public synchronized AbstractBinlogEvent poll() {
return blockingEventQueue.poll();
}
private void initDumpConnectSession() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package info.avalon566.shardingscaling.sync.mysql.binlog.codec;
import info.avalon566.shardingscaling.sync.mysql.binlog.packet.binlog.ColumnDef;
import info.avalon566.shardingscaling.sync.mysql.binlog.packet.binlog.TableMapEvent;
import lombok.Data;
import lombok.var;
import java.util.HashMap;
/**
* @author avalon566
*/
@Data
public class BinlogContext {
private String fileName;
private HashMap<Long, TableMapEvent> tableMap = new HashMap<>();
public void putTableMapEvent(final long tableId, final TableMapEvent tableMapEvent) {
tableMap.put(tableId, tableMapEvent);
}
public String getFullTableName(final long tableId) {
var tableMapEvent = tableMap.get(tableId);
return String.format("%s.%s", tableMapEvent.getSchemaName(), tableMapEvent.getTableName());
}
public ColumnDef[] getColumnDefs(final long tableId) {
return tableMap.get(tableId).getColumnDefs();
}
}
......@@ -121,6 +121,16 @@ public final class DataTypesCodec {
return in.readUnsignedByte();
}
/**
* Read unsigned big endian byte order 2 byte integer from {@code ByteBuf}.
*
* @param in byte buffer
* @return int value
*/
public static int readUnsignedInt2BE(final ByteBuf in) {
return in.readUnsignedShort();
}
/**
* Read unsigned little endian byte order 2 byte integer from {@code ByteBuf}.
*
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package info.avalon566.shardingscaling.sync.mysql.binlog.codec;
import io.netty.buffer.ByteBuf;
import lombok.var;
import java.io.Serializable;
import java.math.BigDecimal;
/**
* @author avalon566
*/
public class DecimalValueDecoder {
private static final int DIG_PER_DEC = 9;
private static final int[] DIG_TO_BYTES = {0, 1, 1, 2, 2, 3, 3, 4, 4, 4};
public static Serializable decodeNewDecimal(final int meta, final ByteBuf in) {
var precision = meta >> 8;
var scale = meta & 0xFF;
var x = precision - scale;
var ipd = x / DIG_PER_DEC;
var fpd = scale / DIG_PER_DEC;
var decimalLength = (ipd << 2) + DIG_TO_BYTES[x - ipd * DIG_PER_DEC] + (fpd << 2) + DIG_TO_BYTES[scale - fpd * DIG_PER_DEC];
return toDecimal(precision, scale, DataTypesCodec.readBytes(decimalLength, in));
}
private static BigDecimal toDecimal(final int precision, final int scale, final byte[] value) {
var positive = (value[0] & 0x80) == 0x80;
value[0] ^= 0x80;
if (!positive) {
for (int i = 0; i < value.length; i++) {
value[i] ^= 0xFF;
}
}
var x = precision - scale;
var ipDigits = x / DIG_PER_DEC;
var ipDigitsX = x - ipDigits * DIG_PER_DEC;
var ipSize = (ipDigits << 2) + DIG_TO_BYTES[ipDigitsX];
var offset = DIG_TO_BYTES[ipDigitsX];
var ip = offset > 0 ? BigDecimal.valueOf(readFixedLengthIntBE(value, 0, offset)) : BigDecimal.ZERO;
for (; offset < ipSize; offset += 4) {
int i = readFixedLengthIntBE(value, offset, 4);
ip = ip.movePointRight(DIG_PER_DEC).add(BigDecimal.valueOf(i));
}
var shift = 0;
var fp = BigDecimal.ZERO;
for (; shift + DIG_PER_DEC <= scale; shift += DIG_PER_DEC, offset += 4) {
var i = readFixedLengthIntBE(value, offset, 4);
fp = fp.add(BigDecimal.valueOf(i).movePointLeft(shift + DIG_PER_DEC));
}
if (shift < scale) {
var i = readFixedLengthIntBE(value, offset, DIG_TO_BYTES[scale - shift]);
fp = fp.add(BigDecimal.valueOf(i).movePointLeft(scale));
}
var result = ip.add(fp);
return positive ? result : result.negate();
}
private static int readFixedLengthIntBE(final byte[] bytes, final int offset, final int length) {
var result = 0;
for (var i = offset; i < (offset + length); i++) {
result = (result << 8) | (short) (0xff & bytes[i]);
}
return result;
}
}
......@@ -17,11 +17,15 @@
package info.avalon566.shardingscaling.sync.mysql.binlog.codec;
import info.avalon566.shardingscaling.sync.mysql.binlog.packet.binlog.BinlogEventHeader;
import lombok.extern.slf4j.Slf4j;
import info.avalon566.shardingscaling.sync.mysql.binlog.event.DeleteRowsEvent;
import info.avalon566.shardingscaling.sync.mysql.binlog.event.UpdateRowsEvent;
import info.avalon566.shardingscaling.sync.mysql.binlog.event.WriteRowsEvent;
import info.avalon566.shardingscaling.sync.mysql.binlog.packet.binlog.*;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import java.util.List;
......@@ -33,14 +37,97 @@ import java.util.List;
*/
@Slf4j
public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
private final BinlogContext binlogContext = new BinlogContext();
@Override
protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) {
in.readByte();
BinlogEventHeader eventHeader = new BinlogEventHeader();
eventHeader.fromBytes(in);
log.info(Byte.toString(eventHeader.getTypeCode()));
log.info("readable:{},length:{}", in.readableBytes(), eventHeader.getEventLength() - 19);
in.readBytes(eventHeader.getEventLength() - 19);
var binlogEventHeader = new BinlogEventHeader();
binlogEventHeader.fromBytes(in);
handleChecksum(in);
switch (binlogEventHeader.getTypeCode()) {
case EventTypes.ROTATE_EVENT:
decodeRotateEvent(in);
break;
case EventTypes.FORMAT_DESCRIPTION_EVENT:
decodeFormatDescriptionEvent(in);
break;
case EventTypes.TABLE_MAP_EVENT:
decodeTableMapEvent(in);
break;
case EventTypes.WRITE_ROWS_EVENTv2:
WriteRowsEvent writeRowsEvent = decodeWriteRowsEventV2(binlogEventHeader, in);
out.add(writeRowsEvent);
break;
case EventTypes.UPDATE_ROWS_EVENTv2:
UpdateRowsEvent updateRowsEvent = decodeUpdateRowsEventV2(binlogEventHeader, in);
out.add(updateRowsEvent);
break;
case EventTypes.DELETE_ROWS_EVENTv2:
DeleteRowsEvent deleteRowsEvent = decodeDeleteRowsEventV2(binlogEventHeader, in);
out.add(deleteRowsEvent);
break;
default:
in.readBytes(in.readableBytes());
}
if (in.isReadable()) {
throw new UnsupportedOperationException();
}
}
private void handleChecksum(final ByteBuf in) {
//TODO: detect checksum length
var checksumLength = 4;
in.writerIndex(in.writerIndex() - 4);
}
private void decodeRotateEvent(final ByteBuf in) {
var rotateEvent = new RotateEvent();
rotateEvent.parse(in);
binlogContext.setFileName(rotateEvent.getNextFileName());
}
private DeleteRowsEvent decodeDeleteRowsEventV2(final BinlogEventHeader binlogEventHeader, final ByteBuf in) {
var rowsEvent = new RowsEvent(binlogEventHeader);
rowsEvent.parsePostHeader(in);
rowsEvent.parsePaylod(binlogContext, in);
var deleteRowsEvent = new DeleteRowsEvent();
deleteRowsEvent.setTableName(binlogContext.getFullTableName(rowsEvent.getTableId()));
deleteRowsEvent.setBeforeColumns(rowsEvent.getColumnValues1());
return deleteRowsEvent;
}
private UpdateRowsEvent decodeUpdateRowsEventV2(final BinlogEventHeader binlogEventHeader, final ByteBuf in) {
var rowsEvent = new RowsEvent(binlogEventHeader);
rowsEvent.parsePostHeader(in);
rowsEvent.parsePaylod(binlogContext, in);
var updateRowsEvent = new UpdateRowsEvent();
updateRowsEvent.setTableName(binlogContext.getFullTableName(rowsEvent.getTableId()));
updateRowsEvent.setBeforeColumns(rowsEvent.getColumnValues1());
updateRowsEvent.setAfterColumns(rowsEvent.getColumnValues2());
return updateRowsEvent;
}
private WriteRowsEvent decodeWriteRowsEventV2(final BinlogEventHeader binlogEventHeader, final ByteBuf in) {
var rowsEvent = new RowsEvent(binlogEventHeader);
rowsEvent.parsePostHeader(in);
rowsEvent.parsePaylod(binlogContext, in);
var writeRowsEvent = new WriteRowsEvent();
writeRowsEvent.setTableName(binlogContext.getFullTableName(rowsEvent.getTableId()));
writeRowsEvent.setAfterColumns(rowsEvent.getColumnValues1());
return writeRowsEvent;
}
private void decodeTableMapEvent(final ByteBuf in) {
var tableMapLogEvent = new TableMapEvent();
tableMapLogEvent.parsePostHeader(in);
tableMapLogEvent.parsePayload(in);
binlogContext.putTableMapEvent(tableMapLogEvent.getTableId(), tableMapLogEvent);
}
private void decodeFormatDescriptionEvent(final ByteBuf in) {
var formatDescriptionEvent = new FormatDescriptionEvent();
formatDescriptionEvent.parse(in);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package info.avalon566.shardingscaling.sync.mysql.binlog.event;
import lombok.Data;
/**
* @author avalon566
*/
@Data
public abstract class AbstractBinlogEvent {
private String fileName;
private long position;
private EventTypes eventType;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package info.avalon566.shardingscaling.sync.mysql.binlog.event;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* @author avalon566
*/
@Data
public class DeleteRowsEvent extends AbstractBinlogEvent {
private String tableName;
private List<Serializable[]> beforeColumns;
public DeleteRowsEvent() {
setEventType(EventTypes.DELETE_ROWS);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package info.avalon566.shardingscaling.sync.mysql.binlog.event;
/**
* @author avalon566
*/
public enum EventTypes {
WRITE_ROWS,
DELETE_ROWS,
UPDATE_ROWS
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package info.avalon566.shardingscaling.sync.mysql.binlog.event;
import lombok.Data;
import java.io.Serializable;
import java.util.BitSet;
import java.util.List;
/**
* @author avalon566
*/
@Data
public class UpdateRowsEvent extends AbstractBinlogEvent {
private String tableName;
private List<Serializable[]> beforeColumns;
private List<Serializable[]> afterColumns;
private BitSet changedBitmap;
public UpdateRowsEvent() {
setEventType(EventTypes.UPDATE_ROWS);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package info.avalon566.shardingscaling.sync.mysql.binlog.event;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* @author avalon566
*/
@Data
public class WriteRowsEvent extends AbstractBinlogEvent {
private String tableName;
private List<Serializable[]> afterColumns;
public WriteRowsEvent() {
setEventType(EventTypes.WRITE_ROWS);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package info.avalon566.shardingscaling.sync.mysql.binlog.packet.binlog;
import lombok.Data;
@Data
public class ColumnDef {
private int type;
private int meta;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package info.avalon566.shardingscaling.sync.mysql.binlog.packet.binlog;
/**
* @author avalon566
*/
public final class ColumnTypes {
public static final int MYSQL_TYPE_DECIMAL = 0;
public static final int MYSQL_TYPE_TINY = 1;
public static final int MYSQL_TYPE_SHORT = 2;
public static final int MYSQL_TYPE_LONG = 3;
public static final int MYSQL_TYPE_FLOAT = 4;
public static final int MYSQL_TYPE_DOUBLE = 5;
public static final int MYSQL_TYPE_NULL = 6;
public static final int MYSQL_TYPE_TIMESTAMP = 7;
public static final int MYSQL_TYPE_LONGLONG = 8;
public static final int MYSQL_TYPE_INT24 = 9;
public static final int MYSQL_TYPE_DATE = 10;
public static final int MYSQL_TYPE_TIME = 11;
public static final int MYSQL_TYPE_DATETIME = 12;
public static final int MYSQL_TYPE_YEAR = 13;
public static final int MYSQL_TYPE_NEWDATE = 14;
public static final int MYSQL_TYPE_VARCHAR = 15;
public static final int MYSQL_TYPE_BIT = 16;
public static final int MYSQL_TYPE_TIMESTAMP2 = 17;
public static final int MYSQL_TYPE_DATETIME2 = 18;
public static final int MYSQL_TYPE_TIME2 = 19;
public static final int MYSQL_TYPE_JSON = 245;
public static final int MYSQL_TYPE_NEWDECIMAL = 246;
public static final int MYSQL_TYPE_ENUM = 247;
public static final int MYSQL_TYPE_SET = 248;
public static final int MYSQL_TYPE_TINY_BLOB = 249;
public static final int MYSQL_TYPE_MEDIUM_BLOB = 250;
public static final int MYSQL_TYPE_LONG_BLOB = 251;
public static final int MYSQL_TYPE_BLOB = 252;
public static final int MYSQL_TYPE_VAR_STRING = 253;
public static final int MYSQL_TYPE_STRING = 254;
public static final int MYSQL_TYPE_GEOMETRY = 255;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package info.avalon566.shardingscaling.sync.mysql.binlog.packet.binlog;
/**
* https://dev.mysql.com/doc/shorternals/en/binlog-event-type.html
*
* @author avalon566
*/
public final class EventTypes {
public static final short UNKNOWN_EVENT = 0;
public static final short START_EVENT_V3 = 1;
public static final short QUERY_EVENT = 2;
public static final short STOP_EVENT = 3;
public static final short ROTATE_EVENT = 4;
public static final short INTVAR_EVENT = 5;
public static final short LOAD_EVENT = 6;
public static final short SLAVE_EVENT = 7;
public static final short CREATE_FILE_EVENT = 8;
public static final short APPEND_BLOCK_EVENT = 9;
public static final short EXEC_LOAD_EVENT = 10;
public static final short DELETE_FILE_EVENT = 11;
public static final short NEW_LOAD_EVENT = 12;
public static final short RAND_EVENT = 13;
public static final short USER_VAR_EVENT = 14;
public static final short FORMAT_DESCRIPTION_EVENT = 15;
public static final short XID_EVENT = 16;
public static final short BEGIN_LOAD_QUERY_EVENT = 17;
public static final short EXECUTE_LOAD_QUERY_EVENT = 18;
public static final short TABLE_MAP_EVENT = 19;
public static final short WRITE_ROWS_EVENTv0 = 20;
public static final short UPDATE_ROWS_EVENTv0 = 21;
public static final short DELETE_ROWS_EVENTv0 = 22;
public static final short WRITE_ROWS_EVENTv1 = 23;
public static final short UPDATE_ROWS_EVENTv1 = 24;
public static final short DELETE_ROWS_EVENTv1 = 25;
public static final short INCIDENT_EVENT = 26;
public static final short HEARTBEAT_EVENT = 27;
public static final short IGNORABLE_EVENT = 28;
public static final short ROWS_QUERY_EVENT = 29;
public static final short WRITE_ROWS_EVENTv2 = 30;
public static final short UPDATE_ROWS_EVENTv2 = 31;
public static final short DELETE_ROWS_EVENTv2 = 32;
public static final short GTID_EVENT = 33;
public static final short ANONYMOUS_GTID_EVENT = 34;
public static final short PREVIOUS_GTIDS_EVENT = 35;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package info.avalon566.shardingscaling.sync.mysql.binlog.packet.binlog;
import info.avalon566.shardingscaling.sync.mysql.binlog.codec.DataTypesCodec;
import io.netty.buffer.ByteBuf;
import lombok.Data;
/**
* https://dev.mysql.com/doc/internals/en/format-description-event.html
*
* @author avalon566
*/
@Data
public class FormatDescriptionEvent {
private int binglogVersion;
private String mysqlServerVersion;
private long createTimestamp;
private short eventHeaderLength;
private short checksumType = 1;
public void parse(final ByteBuf in) {
binglogVersion = DataTypesCodec.readUnsignedInt2LE(in);
mysqlServerVersion = DataTypesCodec.readFixedLengthString(50, in);
createTimestamp = DataTypesCodec.readUnsignedInt4LE(in);
eventHeaderLength = DataTypesCodec.readUnsignedInt1(in);
// skip remain data
in.readBytes(in.readableBytes());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package info.avalon566.shardingscaling.sync.mysql.binlog.packet.binlog;
import info.avalon566.shardingscaling.sync.mysql.binlog.codec.DataTypesCodec;
import io.netty.buffer.ByteBuf;
import lombok.Data;
/**
* @author avalon566
*/
@Data
public class RotateEvent {
private long position;
private String nextFileName;
public void parse(final ByteBuf in) {
position = DataTypesCodec.readInt8LE(in);
nextFileName = DataTypesCodec.readFixedLengthString(in.readableBytes(), in);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package info.avalon566.shardingscaling.sync.mysql.binlog.packet.binlog;
import info.avalon566.shardingscaling.sync.mysql.binlog.codec.BinlogContext;
import info.avalon566.shardingscaling.sync.mysql.binlog.codec.DataTypesCodec;
import info.avalon566.shardingscaling.sync.mysql.binlog.codec.DecimalValueDecoder;
import io.netty.buffer.ByteBuf;
import lombok.Data;
import lombok.var;
import java.io.Serializable;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
/**
* Rows event.
* https://dev.mysql.com/doc/internals/en/rows-event.html
*
* @author avalon566
*/
@Data
public class RowsEvent {
private final BinlogEventHeader binlogEventHeader;
private long tableId;
private int flags;
private BitSet columnsPresentBitmap;
private BitSet columnsPresentBitmap2;
private List<Serializable[]> columnValues1 = new ArrayList<>();
private List<Serializable[]> columnValues2 = new ArrayList<>();
public RowsEvent(final BinlogEventHeader binlogEventHeader) {
this.binlogEventHeader = binlogEventHeader;
}
public void parsePostHeader(final ByteBuf in) {
tableId = DataTypesCodec.readUnsignedInt6LE(in);
flags = DataTypesCodec.readUnsignedInt2LE(in);
var extraDataLength = DataTypesCodec.readUnsignedInt2LE(in) - 2;
// skip data
DataTypesCodec.readBytes(extraDataLength, in);
}
public void parsePaylod(final BinlogContext binlogContext, final ByteBuf in) {
var columnsLength = (int) DataTypesCodec.readLengthCodedIntLE(in);
columnsPresentBitmap = DataTypesCodec.readBitmap(columnsLength, in);
if (EventTypes.UPDATE_ROWS_EVENTv2 == binlogEventHeader.getTypeCode()) {
columnsPresentBitmap2 = DataTypesCodec.readBitmap(columnsLength, in);
}
var columnDefs = binlogContext.getColumnDefs(tableId);
while (in.isReadable()) {
//TODO: support minimal binlog row image
var nullBitmap = DataTypesCodec.readBitmap(columnsLength, in);
var columnValues = new Serializable[columnsLength];
for (int i = 0; i < columnsLength; i++) {
if (!nullBitmap.get(i)) {
columnValues[i] = decodeValue(columnDefs[i], in);
} else {
columnValues[i] = null;
}
}
columnValues1.add(columnValues);
if (EventTypes.UPDATE_ROWS_EVENTv2 == binlogEventHeader.getTypeCode()) {
nullBitmap = DataTypesCodec.readBitmap(columnsLength, in);
columnValues = new Serializable[columnsLength];
for (int i = 0; i < columnsLength; i++) {
if (!nullBitmap.get(i)) {
columnValues[i] = decodeValue(columnDefs[i], in);
} else {
columnValues[i] = null;
}
}
columnValues2.add(columnValues);
}
}
}
private Serializable decodeValue(final ColumnDef columnDef, final ByteBuf in) {
switch (columnDef.getType()) {
case ColumnTypes.MYSQL_TYPE_LONG:
return decodeLong(columnDef.getMeta(), in);
case ColumnTypes.MYSQL_TYPE_TINY:
return docodeTiny(columnDef.getMeta(), in);
case ColumnTypes.MYSQL_TYPE_SHORT:
return decodeShort(columnDef.getMeta(), in);
case ColumnTypes.MYSQL_TYPE_INT24:
return decodeInt24(columnDef.getMeta(), in);
case ColumnTypes.MYSQL_TYPE_LONGLONG:
return decodeLonglong(columnDef.getMeta(), in);
case ColumnTypes.MYSQL_TYPE_FLOAT:
return decodeFloat(columnDef.getMeta(), in);
case ColumnTypes.MYSQL_TYPE_NEWDECIMAL:
return decodeNewdecimal(columnDef.getMeta(), in);
case ColumnTypes.MYSQL_TYPE_DOUBLE:
return DataTypesCodec.readDoubleLE(in);
case ColumnTypes.MYSQL_TYPE_TIMESTAMP2:
return decodeTimestamp2(columnDef.getMeta(), in);
case ColumnTypes.MYSQL_TYPE_DATETIME2:
return decodeDatetime2(columnDef.getMeta(), in);
case ColumnTypes.MYSQL_TYPE_TIME2:
return decodeTime2(columnDef.getMeta(), in);
case ColumnTypes.MYSQL_TYPE_DATE:
return decodeDate(columnDef.getMeta(), in);
case ColumnTypes.MYSQL_TYPE_YEAR:
return decodeYear(columnDef.getMeta(), in);
case ColumnTypes.MYSQL_TYPE_BLOB:
return decodeBlob(columnDef.getMeta(), in);
case ColumnTypes.MYSQL_TYPE_VARCHAR:
case ColumnTypes.MYSQL_TYPE_VAR_STRING:
return decodeVarString(columnDef.getMeta(), in);
case ColumnTypes.MYSQL_TYPE_STRING:
return decodeString(columnDef.getMeta(), in);
case ColumnTypes.MYSQL_TYPE_JSON:
return decodeJson(columnDef.getMeta(), in);
default:
throw new UnsupportedOperationException();
}
}
private Serializable decodeLong(final int meta, final ByteBuf in) {
return DataTypesCodec.readUnsignedInt4LE(in);
}
private Serializable docodeTiny(final int meta, final ByteBuf in) {
return DataTypesCodec.readUnsignedInt1(in);
}
private Serializable decodeShort(final int meta, final ByteBuf in) {
return DataTypesCodec.readUnsignedInt2LE(in);
}
private Serializable decodeInt24(final int meta, final ByteBuf in) {
return DataTypesCodec.readUnsignedInt3LE(in);
}
private Serializable decodeLonglong(final int meta, final ByteBuf in) {
return DataTypesCodec.readInt8LE(in);
}
private Serializable decodeFloat(final int meta, final ByteBuf in) {
return DataTypesCodec.readFloatLE(in);
}
private Serializable decodeTimestamp2(final int meta, final ByteBuf in) {
var second = DataTypesCodec.readUnsignedInt4BE(in);
var secondStr = new Timestamp(second * 1000).toString();
// remove millsecond data
secondStr = secondStr.substring(0, secondStr.length() - 2);
if (0 < meta) {
secondStr += "." + readAndAlignMillisecond(meta, in);
}
return secondStr;
}
private Serializable decodeDatetime2(final int meta, final ByteBuf in) {
var datetime = DataTypesCodec.readUnsignedInt5BE(in) - 0x8000000000L;
long ymd = datetime >> 17;
long ym = ymd >> 5;
long hms = datetime % (1 << 17);
return String.format("%d-%02d-%02d %02d:%02d:%02d%s",
ym / 13,
ym % 13,
ymd % (1 << 5),
hms >> 12,
(hms >> 6) % (1 << 6),
hms % (1 << 6),
0 < meta ? "." + readAndAlignMillisecond(meta, in) : "");
}
private Serializable decodeTime2(final int meta, final ByteBuf in) {
var time = DataTypesCodec.readUnsignedInt3BE(in) - 0x800000L;
return String.format("%02d:%02d:%02d",
(time >> 12) % (1 << 10),
(time >> 6) % (1 << 6),
time % (1 << 6));
}
private Serializable decodeDate(final int meta, final ByteBuf in) {
var date = DataTypesCodec.readUnsignedInt3LE(in);
return String.format("%d-%02d-%02d",
date / 16 / 32,
date / 32 % 16,
date % 32);
}
private Serializable decodeYear(final int meta, final ByteBuf in) {
return DataTypesCodec.readUnsignedInt1(in) + 1900;
}
private Serializable decodeNewdecimal(final int meta, final ByteBuf in) {
return DecimalValueDecoder.decodeNewDecimal(meta, in);
}
private Serializable decodeEnumVale(final int meta, final ByteBuf in) {
switch (meta) {
case 1:
return DataTypesCodec.readUnsignedInt1(in);
case 2:
return DataTypesCodec.readUnsignedInt2LE(in);
default:
throw new UnsupportedOperationException();
}
}
private Serializable decodeBlob(final int meta, final ByteBuf in) {
switch (meta) {
case 1:
return DataTypesCodec.readBytes(DataTypesCodec.readUnsignedInt1(in), in);
case 2:
return DataTypesCodec.readBytes(DataTypesCodec.readUnsignedInt2LE(in), in);
case 3:
return DataTypesCodec.readBytes(DataTypesCodec.readUnsignedInt3LE(in), in);
case 4:
return DataTypesCodec.readBytes((int) DataTypesCodec.readUnsignedInt4LE(in), in);
default:
throw new UnsupportedOperationException();
}
}
private Serializable decodeVarString(final int meta, final ByteBuf in) {
var length = 0;
if (256 > meta) {
length = DataTypesCodec.readUnsignedInt1(in);
} else {
length = DataTypesCodec.readUnsignedInt2LE(in);
}
return new String(DataTypesCodec.readBytes(length, in));
}
private Serializable decodeString(final int meta, final ByteBuf in) {
switch (meta >> 8) {
case ColumnTypes.MYSQL_TYPE_ENUM:
return decodeEnumVale(meta & 0xff, in);
case ColumnTypes.MYSQL_TYPE_SET:
// hardcode
return in.readByte();
case 254:
var length = DataTypesCodec.readUnsignedInt1(in);
return new String(DataTypesCodec.readBytes(length, in));
default:
throw new UnsupportedOperationException();
}
}
private Serializable decodeJson(final int meta, final ByteBuf in) {
//TODO: decode json data to string
var length = 0;
switch (meta) {
case 1:
length = DataTypesCodec.readUnsignedInt1(in);
break;
case 2:
length = DataTypesCodec.readUnsignedInt2LE(in);
break;
case 3:
length = DataTypesCodec.readUnsignedInt3LE(in);
break;
case 4:
length = (int) DataTypesCodec.readUnsignedInt4LE(in);
break;
default:
throw new UnsupportedOperationException();
}
if (0 == length) {
return "";
} else {
return DataTypesCodec.readBytes(length, in);
}
}
private String readAndAlignMillisecond(final int meta, final ByteBuf in) {
var fraction = 0;
switch (meta) {
case 1:
case 2:
fraction = DataTypesCodec.readUnsignedInt1(in) * 10000;
break;
case 3:
case 4:
fraction = DataTypesCodec.readUnsignedInt2BE(in) * 100;
break;
case 5:
case 6:
fraction = DataTypesCodec.readUnsignedInt3BE(in);
break;
default:
throw new UnsupportedOperationException();
}
return alignMillisecond(meta, fraction);
}
private String alignMillisecond(final int meta, final int fraction) {
var result = new StringBuilder(6);
var str = Integer.toString(fraction);
var append = 6 - str.length();
for (int i = 0; i < append; i++) {
result.append("0");
}
result.append(str);
return result.substring(0, meta);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package info.avalon566.shardingscaling.sync.mysql.binlog.packet.binlog;
import info.avalon566.shardingscaling.sync.mysql.binlog.codec.DataTypesCodec;
import io.netty.buffer.ByteBuf;
import lombok.Data;
import lombok.var;
/**
* Table map event.
* https://dev.mysql.com/doc/internals/en/table-map-event.html
* @author avalon566
*/
@Data
public class TableMapEvent {
private long tableId;
private int flags;
private String schemaName;
private String tableName;
private ColumnDef[] columnDefs;
/**
* Parse post header.
*
* @param in byte buff
*/
public void parsePostHeader(final ByteBuf in) {
tableId = DataTypesCodec.readUnsignedInt6LE(in);
flags = DataTypesCodec.readUnsignedInt2LE(in);
}
/**
* Parse payload.
*
* @param in byte buffer
*/
public void parsePayload(final ByteBuf in) {
schemaName = DataTypesCodec.readFixedLengthString(DataTypesCodec.readUnsignedInt1(in), in);
DataTypesCodec.readNul(in);
tableName = DataTypesCodec.readFixedLengthString(DataTypesCodec.readUnsignedInt1(in), in);
DataTypesCodec.readNul(in);
var columnCount = DataTypesCodec.readLengthCodedIntLE(in);
initColumnDefs((int) columnCount);
decodeColumnType(in);
columnCount = DataTypesCodec.readLengthCodedIntLE(in);
decodeColumMeta(in);
// skip null bitmap
DataTypesCodec.readBitmap((int) columnCount, in);
}
private void initColumnDefs(final int columnCount) {
columnDefs = new ColumnDef[columnCount];
for (int i = 0; i < columnCount; i++) {
columnDefs[i] = new ColumnDef();
}
}
private void decodeColumnType(final ByteBuf in) {
for (int i = 0; i < columnDefs.length; i++) {
columnDefs[i].setType(DataTypesCodec.readUnsignedInt1(in));
}
}
private void decodeColumMeta(final ByteBuf in) {
for (int i = 0; i < columnDefs.length; i++) {
var columnDef = columnDefs[i];
switch (columnDef.getType()) {
case ColumnTypes.MYSQL_TYPE_TINY_BLOB:
case ColumnTypes.MYSQL_TYPE_BLOB:
case ColumnTypes.MYSQL_TYPE_MEDIUM_BLOB:
case ColumnTypes.MYSQL_TYPE_LONG_BLOB:
case ColumnTypes.MYSQL_TYPE_DOUBLE:
case ColumnTypes.MYSQL_TYPE_FLOAT:
case ColumnTypes.MYSQL_TYPE_GEOMETRY:
case ColumnTypes.MYSQL_TYPE_JSON:
columnDef.setMeta(DataTypesCodec.readUnsignedInt1(in));
break;
case ColumnTypes.MYSQL_TYPE_STRING:
columnDef.setMeta(DataTypesCodec.readUnsignedInt2BE(in));
break;
case ColumnTypes.MYSQL_TYPE_BIT:
columnDef.setMeta(DataTypesCodec.readUnsignedInt2LE(in));
break;
case ColumnTypes.MYSQL_TYPE_VARCHAR:
columnDef.setMeta(DataTypesCodec.readUnsignedInt2LE(in));
break;
case ColumnTypes.MYSQL_TYPE_NEWDECIMAL:
columnDef.setMeta(DataTypesCodec.readUnsignedInt2BE(in));
break;
case ColumnTypes.MYSQL_TYPE_TIME2:
case ColumnTypes.MYSQL_TYPE_DATETIME2:
case ColumnTypes.MYSQL_TYPE_TIMESTAMP2:
columnDef.setMeta(DataTypesCodec.readUnsignedInt1(in));
break;
default:
columnDef.setMeta(0);
break;
}
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册