提交 57c2cc4c 编写于 作者: 七锋

fixed issue #626 , support mysql xa parse

上级 3c913d1e
......@@ -30,15 +30,19 @@ import com.taobao.tddl.dbsync.binlog.event.RowsQueryLogEvent;
import com.taobao.tddl.dbsync.binlog.event.StartLogEventV3;
import com.taobao.tddl.dbsync.binlog.event.StopLogEvent;
import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent;
import com.taobao.tddl.dbsync.binlog.event.TransactionContextLogEvent;
import com.taobao.tddl.dbsync.binlog.event.UnknownLogEvent;
import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
import com.taobao.tddl.dbsync.binlog.event.UserVarLogEvent;
import com.taobao.tddl.dbsync.binlog.event.ViewChangeEvent;
import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
import com.taobao.tddl.dbsync.binlog.event.XaPrepareLogEvent;
import com.taobao.tddl.dbsync.binlog.event.XidLogEvent;
import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
import com.taobao.tddl.dbsync.binlog.event.mariadb.BinlogCheckPointLogEvent;
import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidListLogEvent;
import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidLogEvent;
import com.taobao.tddl.dbsync.binlog.event.mariadb.StartEncryptionLogEvent;
/**
* Implements a binary-log decoder.
......@@ -366,6 +370,24 @@ public final class LogDecoder {
logPosition.position = header.getLogPos();
return event;
}
case LogEvent.TRANSACTION_CONTEXT_EVENT: {
TransactionContextLogEvent event = new TransactionContextLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
return event;
}
case LogEvent.VIEW_CHANGE_EVENT: {
ViewChangeEvent event = new ViewChangeEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
return event;
}
case LogEvent.XA_PREPARE_LOG_EVENT: {
XaPrepareLogEvent event = new XaPrepareLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
return event;
}
case LogEvent.ANNOTATE_ROWS_EVENT: {
AnnotateRowsEvent event = new AnnotateRowsEvent(header, buffer, descriptionEvent);
/* updating position in context */
......@@ -390,6 +412,12 @@ public final class LogDecoder {
logPosition.position = header.getLogPos();
return event;
}
case LogEvent.START_ENCRYPTION_EVENT: {
StartEncryptionLogEvent event = new StartEncryptionLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
return event;
}
default:
/*
* Create an object of Ignorable_log_event for unrecognized
......@@ -402,9 +430,10 @@ public final class LogDecoder {
logPosition.position = header.getLogPos();
return event;
} else {
if (logger.isWarnEnabled()) logger.warn("Skipping unrecognized binlog event "
+ LogEvent.getTypeName(header.getType()) + " from: "
+ context.getLogPosition());
if (logger.isWarnEnabled()) {
logger.warn("Skipping unrecognized binlog event " + LogEvent.getTypeName(header.getType())
+ " from: " + context.getLogPosition());
}
}
}
......
......@@ -161,9 +161,17 @@ public abstract class LogEvent {
public static final int PREVIOUS_GTIDS_LOG_EVENT = 35;
/* MySQL 5.7 events */
public static final int TRANSACTION_CONTEXT_EVENT = 36;
public static final int VIEW_CHANGE_EVENT = 37;
/* Prepared XA transaction terminal event similar to Xid */
public static final int XA_PREPARE_LOG_EVENT = 38;
// mariaDb 5.5.34
/* New MySQL/Sun events are to be added right above this comment */
public static final int MYSQL_EVENTS_END = 36;
public static final int MYSQL_EVENTS_END = 39;
public static final int MARIA_EVENTS_BEGIN = 160;
/* New Maria event numbers start from here */
......@@ -189,8 +197,10 @@ public abstract class LogEvent {
*/
public static final int GTID_LIST_EVENT = 163;
public static final int START_ENCRYPTION_EVENT = 164;
/** end marker */
public static final int ENUM_END_EVENT = 164;
public static final int ENUM_END_EVENT = 165;
/**
* 1 byte length, 1 byte format Length is total length in bytes, including 2
......@@ -351,6 +361,22 @@ public abstract class LogEvent {
return "Anonymous_Gtid";
case PREVIOUS_GTIDS_LOG_EVENT:
return "Previous_gtids";
case TRANSACTION_CONTEXT_EVENT:
return "Transaction_context";
case VIEW_CHANGE_EVENT:
return "View_change";
case XA_PREPARE_LOG_EVENT:
return "XA_prepare";
case ANNOTATE_ROWS_EVENT:
return "Annotate_rows";
case BINLOG_CHECKPOINT_EVENT:
return "Binlog_checkpoint";
case GTID_EVENT:
return "Gtid";
case GTID_LIST_EVENT:
return "Gtid_list";
case START_ENCRYPTION_EVENT:
return "Start_encryption";
default:
return "Unknown"; /* impossible */
}
......@@ -359,7 +385,7 @@ public abstract class LogEvent {
protected static final Log logger = LogFactory.getLog(LogEvent.class);
protected final LogHeader header;
/**
* mysql半同步semi标识
*
......@@ -369,18 +395,16 @@ public abstract class LogEvent {
* </pre>
*/
protected int semival;
public int getSemival() {
return semival;
}
return semival;
}
public void setSemival(int semival) {
this.semival = semival;
}
public void setSemival(int semival) {
this.semival = semival;
}
protected LogEvent(LogHeader header){
protected LogEvent(LogHeader header){
this.header = header;
}
......
......@@ -59,10 +59,15 @@ public final class FormatDescriptionLogEvent extends StartLogEventV3 {
public static final int HEARTBEAT_HEADER_LEN = 0;
public static final int IGNORABLE_HEADER_LEN = 0;
public static final int ROWS_HEADER_LEN_V2 = 10;
public static final int TRANSACTION_CONTEXT_HEADER_LEN = 18;
public static final int VIEW_CHANGE_HEADER_LEN = 52;
public static final int XA_PREPARE_HEADER_LEN = 0;
public static final int ANNOTATE_ROWS_HEADER_LEN = 0;
public static final int BINLOG_CHECKPOINT_HEADER_LEN = 4;
public static final int GTID_HEADER_LEN = 19;
public static final int GTID_LIST_HEADER_LEN = 4;
public static final int START_ENCRYPTION_HEADER_LEN = 0;
public static final int POST_HEADER_LENGTH = 11;
......@@ -202,11 +207,17 @@ public final class FormatDescriptionLogEvent extends StartLogEventV3 {
postHeaderLen[GTID_LOG_EVENT - 1] = POST_HEADER_LENGTH;
postHeaderLen[ANONYMOUS_GTID_LOG_EVENT - 1] = POST_HEADER_LENGTH;
postHeaderLen[PREVIOUS_GTIDS_LOG_EVENT - 1] = IGNORABLE_HEADER_LEN;
postHeaderLen[TRANSACTION_CONTEXT_EVENT - 1] = TRANSACTION_CONTEXT_HEADER_LEN;
postHeaderLen[VIEW_CHANGE_EVENT - 1] = VIEW_CHANGE_HEADER_LEN;
postHeaderLen[XA_PREPARE_LOG_EVENT - 1] = XA_PREPARE_HEADER_LEN;
// mariadb 10
postHeaderLen[ANNOTATE_ROWS_EVENT - 1] = ANNOTATE_ROWS_HEADER_LEN;
postHeaderLen[BINLOG_CHECKPOINT_EVENT - 1] = BINLOG_CHECKPOINT_HEADER_LEN;
postHeaderLen[GTID_EVENT - 1] = GTID_HEADER_LEN;
postHeaderLen[GTID_LIST_EVENT - 1] = GTID_LIST_HEADER_LEN;
postHeaderLen[START_ENCRYPTION_EVENT - 1] = START_ENCRYPTION_HEADER_LEN;
break;
case 3: /* 4.0.x x>=2 */
......
package com.taobao.tddl.dbsync.binlog.event;
import com.taobao.tddl.dbsync.binlog.LogBuffer;
import com.taobao.tddl.dbsync.binlog.LogEvent;
/**
* @author agapple 2018年5月7日 下午7:05:39
* @version 1.0.26
* @since mysql 5.7
*/
public class TransactionContextLogEvent extends LogEvent {
public TransactionContextLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
super(header);
}
}
package com.taobao.tddl.dbsync.binlog.event;
import com.taobao.tddl.dbsync.binlog.LogBuffer;
import com.taobao.tddl.dbsync.binlog.LogEvent;
/**
* @author agapple 2018年5月7日 下午7:05:39
* @version 1.0.26
* @since mysql 5.7
*/
public class ViewChangeEvent extends LogEvent {
public ViewChangeEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
super(header);
}
}
package com.taobao.tddl.dbsync.binlog.event;
import com.taobao.tddl.dbsync.binlog.LogBuffer;
import com.taobao.tddl.dbsync.binlog.LogEvent;
/**
* @author agapple 2018年5月7日 下午7:05:39
* @version 1.0.26
* @since mysql 5.7
*/
public class XaPrepareLogEvent extends LogEvent {
private boolean onePhase;
private int formatId;
private int gtridLength;
private int bqualLength;
private byte[] data;
public XaPrepareLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
super(header);
final int commonHeaderLen = descriptionEvent.getCommonHeaderLen();
final int postHeaderLen = descriptionEvent.getPostHeaderLen()[header.getType() - 1];
int offset = commonHeaderLen + postHeaderLen;
buffer.position(offset);
onePhase = (buffer.getInt8() == 0x00 ? false : true);
formatId = buffer.getInt32();
gtridLength = buffer.getInt32();
bqualLength = buffer.getInt32();
int MY_XIDDATASIZE = 128;
if (MY_XIDDATASIZE >= gtridLength + bqualLength && gtridLength >= 0 && gtridLength <= 64 && bqualLength >= 0
&& bqualLength <= 64) {
data = buffer.getData(gtridLength + bqualLength);
} else {
formatId = -1;
gtridLength = 0;
bqualLength = 0;
}
}
public boolean isOnePhase() {
return onePhase;
}
public int getFormatId() {
return formatId;
}
public int getGtridLength() {
return gtridLength;
}
public int getBqualLength() {
return bqualLength;
}
public byte[] getData() {
return data;
}
}
package com.taobao.tddl.dbsync.binlog.event.mariadb;
import com.taobao.tddl.dbsync.binlog.LogBuffer;
import com.taobao.tddl.dbsync.binlog.LogEvent;
import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
import com.taobao.tddl.dbsync.binlog.event.LogHeader;
/**
* mariadb的Start_encryption_log_event
*
* @author agapple 2018年5月7日 下午7:23:02
* @version 1.0.26
*/
public class StartEncryptionLogEvent extends LogEvent {
public StartEncryptionLogEvent(LogHeader header, LogBuffer buffer, FormatDescriptionLogEvent descriptionEvent){
super(header);
}
}
......@@ -27,7 +27,8 @@ public class MysqlDumpTest {
@Test
public void testSimple() {
final MysqlEventParser controller = new MysqlEventParser();
final EntryPosition startPosition = new EntryPosition("mysql-bin.000010", 154L);
final EntryPosition startPosition = new EntryPosition("mysql-bin.000010", 154L, 100L);
startPosition.setGtid("f1ceb61a-a5d5-11e7-bdee-107c3dbcf8a7:1-17");
controller.setConnectionCharset(Charset.forName("UTF-8"));
controller.setSlaveId(3344L);
controller.setDetectingEnable(false);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册