提交 a7be9c75 编写于 作者: A agapple

fixed mysql rotate checksum

上级 b6f1e0d1
......@@ -503,40 +503,19 @@ public class MysqlConnection implements ErosaConnection {
* </pre>
*/
private void loadBinlogChecksum() {
if (checkMariaDB()) {
ResultSetPacket rs = null;
try {
rs = query("select @@global.binlog_checksum");
} catch (IOException e) {
throw new CanalParseException(e);
}
List<String> columnValues = rs.getFieldValues();
if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0).toUpperCase().equals("CRC32")) {
binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;
} else {
binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
}
}
}
/**
* 获取是否为mariadb
*/
private boolean checkMariaDB() {
ResultSetPacket rs = null;
try {
rs = query("SELECT @@version");
rs = query("select @@global.binlog_checksum");
} catch (IOException e) {
throw new CanalParseException(e);
}
List<String> columnValues = rs.getFieldValues();
if (columnValues != null && columnValues.size() >= 1) {
return StringUtils.containsIgnoreCase(columnValues.get(0), "MariaDB");
if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0).toUpperCase().equals("CRC32")) {
binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;
} else {
binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
}
return false;
}
private void accumulateReceivedBytes(long x) {
......
......@@ -44,10 +44,9 @@ import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
public class DirectLogFetcherTest {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
protected String binlogFileName = "mysql-bin.000001";
protected Charset charset = Charset.forName("utf-8");
private boolean isMariaDB;
private int binlogChecksum;
@Test
......@@ -193,7 +192,7 @@ public class DirectLogFetcherTest {
// 如果不设置会出现错误: Slave can not handle replication events with the
// checksum that master is configured to log
// 但也不能乱设置,需要和mysql server的checksum配置一致,不然RotateLogEvent会出现乱码
update("set @master_binlog_checksum= '@@global.binlog_checksum'", connector);
update("set @master_binlog_checksum= @@global.binlog_checksum", connector);
} catch (Exception e) {
logger.warn("update master_binlog_checksum failed", e);
}
......@@ -217,35 +216,18 @@ public class DirectLogFetcherTest {
}
private void loadBinlogChecksum(MysqlConnector connector) {
checkMariaDB(connector);
if (isMariaDB) {
ResultSetPacket rs = null;
try {
rs = query("select @@global.binlog_checksum", connector);
} catch (IOException e) {
throw new CanalParseException(e);
}
List<String> columnValues = rs.getFieldValues();
if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0).toUpperCase().equals("CRC32")) {
binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;
} else {
binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
}
}
}
private void checkMariaDB(MysqlConnector connector) {
ResultSetPacket rs = null;
try {
rs = query("SELECT @@version", connector);
rs = query("select @@global.binlog_checksum", connector);
} catch (IOException e) {
throw new CanalParseException(e);
}
List<String> columnValues = rs.getFieldValues();
if (columnValues != null && columnValues.size() >= 1) {
isMariaDB = StringUtils.containsIgnoreCase(columnValues.get(0), "MariaDB");
if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0).toUpperCase().equals("CRC32")) {
binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;
} else {
binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
}
}
......
......@@ -29,12 +29,12 @@ public class MysqlDumpTest {
@Test
public void testSimple() {
final MysqlEventParser controller = new MysqlEventParser();
final EntryPosition startPosition = new EntryPosition("mysql-bin.000012", 34051L, 100L);
final EntryPosition startPosition = new EntryPosition("mysql-bin.000001", 4L);
// startPosition.setGtid("f1ceb61a-a5d5-11e7-bdee-107c3dbcf8a7:1-17");
controller.setConnectionCharset(Charset.forName("UTF-8"));
controller.setSlaveId(3344L);
controller.setDetectingEnable(false);
controller.setMasterInfo(new AuthenticationInfo(new InetSocketAddress("127.0.0.1", 3306), "canal", "canal"));
controller.setMasterInfo(new AuthenticationInfo(new InetSocketAddress("127.0.0.1", 3306), "root", "hello"));
controller.setMasterPosition(startPosition);
controller.setEnableTsdb(true);
controller.setDestination("example");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册