diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java index a5c1f9683446804dcd8ef9a1f692ebf3abeb10eb..ce16a232370d6f0ec3a8999953f7b72e6b22f4da 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java @@ -21,7 +21,6 @@ import com.alibaba.otter.canal.parse.driver.mysql.MysqlQueryExecutor; import com.alibaba.otter.canal.parse.driver.mysql.MysqlUpdateExecutor; import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet; import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket; -import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet; import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpCommandPacket; import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpGTIDCommandPacket; import com.alibaba.otter.canal.parse.driver.mysql.packets.client.RegisterSlaveCommandPacket; @@ -444,7 +443,8 @@ public class MysqlConnection implements ErosaConnection { // mysql5.6需要设置slave_uuid避免被server kill链接 update("set @slave_uuid=uuid()"); } catch (Exception e) { - if (!StringUtils.contains(e.getMessage(), "Unknown system variable")) { + if (!StringUtils.contains(e.getMessage(), "Unknown system variable") + && !StringUtils.contains(e.getMessage(), "slave_uuid can't be set")) { logger.warn("update slave_uuid failed", e); } } diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java index a29e88ec00c0bdb37f64dbfce2b06a6f408ee487..b6872b81da5bb66559aeb1ce55fd9705587ae1c8 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java @@ -69,7 +69,8 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE private int dumpErrorCount = 0; // binlogDump失败异常计数 private int dumpErrorCountThreshold = 2; // binlogDump失败异常计数阀值 private boolean rdsOssMode = false; - private boolean autoResetLatestPosMode = false; // true: binlog被删除之后,自动按最新的数据订阅 + private boolean autoResetLatestPosMode = false; // true: + // binlog被删除之后,自动按最新的数据订阅 protected ErosaConnection buildErosaConnection() { return buildMysqlConnection(this.runningInfo); @@ -347,7 +348,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE if (StringUtils.isNotEmpty(logPosition.getPostion().getGtid())) { return logPosition.getPostion(); } - }else { + } else { if (masterPosition != null && StringUtils.isNotEmpty(masterPosition.getGtid())) { return masterPosition; } @@ -401,7 +402,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE fixedPosition.getJournalName(), true); if (entryPosition == null) { - throw new CanalParseException("[fixed timestamp] can't found begin/commit position before with fixed position" + throw new CanalParseException("[fixed timestamp] can't found begin/commit position before with fixed position " + fixedPosition.getJournalName() + ":" + fixedPosition.getPosition()); } return entryPosition; @@ -486,7 +487,8 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE return findPosition; } // 处理 binlog 位点被删除的情况,提供自动重置到当前位点的功能 - // 应用场景: 测试环境不稳定,位点经常被删。强烈不建议在正式环境中开启此控制参数,因为binlog 丢失调到最新位点也即意味着数据丢失 + // 应用场景: 测试环境不稳定,位点经常被删。强烈不建议在正式环境中开启此控制参数,因为binlog + // 丢失调到最新位点也即意味着数据丢失 if (isAutoResetLatestPosMode()) { dumpErrorCount = 0; return findEndPosition(mysqlConnection); @@ -497,9 +499,9 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE return null; } } else if (StringUtils.isBlank(logPosition.getPostion().getJournalName()) - && logPosition.getPostion().getPosition() <= 0 - && logPosition.getPostion().getTimestamp() > 0) { - return fallbackFindByStartTimestamp(logPosition,mysqlConnection); + && logPosition.getPostion().getPosition() <= 0 + && logPosition.getPostion().getTimestamp() > 0) { + return fallbackFindByStartTimestamp(logPosition, mysqlConnection); } // 其余情况 logger.warn("prepare to find start position just last position\n {}", @@ -522,7 +524,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE * @param mysqlConnection * @return */ - protected EntryPosition fallbackFindByStartTimestamp(LogPosition logPosition,MysqlConnection mysqlConnection){ + protected EntryPosition fallbackFindByStartTimestamp(LogPosition logPosition, MysqlConnection mysqlConnection) { long timestamp = logPosition.getPostion().getTimestamp(); long newStartTimestamp = timestamp - fallbackIntervalInSeconds * 1000; logger.warn("prepare to find start position by last position {}:{}:{}", new Object[] { "", "", diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java index 0c0d8ebc12b356ab702a92df2ca46d03c5c200fb..ff0164657cffed3001894681f3d29817fc353faa 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java @@ -627,7 +627,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar } if (tableMeta != null && columnInfo.length > tableMeta.getFields().size()) { - if (tableMetaCache.isOnRDS()) { + if (tableMetaCache.isOnRDS() || tableMetaCache.isOnPolarX()) { // 特殊处理下RDS的场景 List primaryKeys = tableMeta.getPrimaryFields(); if (primaryKeys == null || primaryKeys.isEmpty()) { @@ -680,6 +680,9 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar if (existRDSNoPrimaryKey && i == columnCnt - 1 && info.type == LogEvent.MYSQL_TYPE_LONGLONG) { // 不解析最后一列 String rdsRowIdColumnName = "__#alibaba_rds_row_id#__"; + if (tableMetaCache.isOnPolarX()) { + rdsRowIdColumnName = "_drds_implicit_id_"; + } buffer.nextValue(rdsRowIdColumnName, i, info.type, info.meta, false); Column.Builder columnBuilder = Column.newBuilder(); columnBuilder.setName(rdsRowIdColumnName); diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java index 6afdefdb33e71e3bebb45db1b34447f76de08418..24615f9eb1639d0080a8bdea1a04b6dc52b2d365 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java @@ -39,6 +39,7 @@ public class TableMetaCache { public static final String EXTRA = "EXTRA"; private MysqlConnection connection; private boolean isOnRDS = false; + private boolean isOnPolarX = false; private boolean isOnTSDB = false; private TableMetaTSDB tableMetaTSDB; @@ -79,6 +80,14 @@ public class TableMetaCache { } } catch (IOException e) { } + + try { + ResultSetPacket packet = connection.query("show global variables like 'polarx\\_%'"); + if (packet.getFieldValues().size() > 0) { + isOnPolarX = true; + } + } catch (IOException e) { + } } private synchronized TableMeta getTableMetaByDB(String fullname) throws IOException { @@ -254,7 +263,6 @@ public class TableMetaCache { .toString(); } - public boolean isOnTSDB() { return isOnTSDB; } @@ -271,4 +279,12 @@ public class TableMetaCache { this.isOnRDS = isOnRDS; } + public boolean isOnPolarX() { + return isOnPolarX; + } + + public void setOnPolarX(boolean isOnPolarX) { + this.isOnPolarX = isOnPolarX; + } + } diff --git a/parse/src/test/java/com/alibaba/otter/canal/parse/DirectLogFetcherTest.java b/parse/src/test/java/com/alibaba/otter/canal/parse/DirectLogFetcherTest.java index a6d158925cdc323574813c4d2ed59b6e29dffa84..92c355e6e35992562e16f67f2ed5b76ca809b4b2 100644 --- a/parse/src/test/java/com/alibaba/otter/canal/parse/DirectLogFetcherTest.java +++ b/parse/src/test/java/com/alibaba/otter/canal/parse/DirectLogFetcherTest.java @@ -13,7 +13,6 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang.StringUtils; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +45,6 @@ import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent; import com.taobao.tddl.dbsync.binlog.event.XidLogEvent; import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent; -@Ignore public class DirectLogFetcherTest { protected final Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -58,7 +56,7 @@ public class DirectLogFetcherTest { public void testSimple() { DirectLogFetcher fetcher = new DirectLogFetcher(); try { - MysqlConnector connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "root", "hello"); + MysqlConnector connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "canal", "canal"); connector.connect(); updateSettings(connector); loadBinlogChecksum(connector); @@ -210,7 +208,8 @@ public class DirectLogFetcherTest { // mysql5.6需要设置slave_uuid避免被server kill链接 update("set @slave_uuid=uuid()", connector); } catch (Exception e) { - if (!StringUtils.contains(e.getMessage(), "Unknown system variable")) { + if (!StringUtils.contains(e.getMessage(), "Unknown system variable") + && !StringUtils.contains(e.getMessage(), "slave_uuid can't be set")) { logger.warn("update slave_uuid failed", e); } }