diff --git a/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/AbstractCanalInstance.java b/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/AbstractCanalInstance.java index b1b64d5b423c25b33ff7be631f875840b935da69..6993a55a039c2b41a805d5b87bebc9f5c5d62ac4 100644 --- a/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/AbstractCanalInstance.java +++ b/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/AbstractCanalInstance.java @@ -53,10 +53,14 @@ public class AbstractCanalInstance extends AbstractCanalLifeCycle implements Can // 处理group的模式 List eventParsers = ((GroupEventParser) eventParser).getEventParsers(); for (CanalEventParser singleEventParser : eventParsers) {// 需要遍历启动 - ((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter); + if(singleEventParser instanceof AbstractEventParser) { + ((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter); + } } } else { - ((AbstractEventParser) eventParser).setEventFilter(aviaterFilter); + if(eventParser instanceof AbstractEventParser) { + ((AbstractEventParser) eventParser).setEventFilter(aviaterFilter); + } } } diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java index 73b35df6e9efb71e03af4fa3fd4e423c5c6bf0be..409c15d2e3c7925ed73ba17e52e9f54c1a35b04c 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java @@ -242,7 +242,7 @@ public abstract class AbstractEventParser extends AbstractCanalLifeCycle if (parallel) { // build stage processor multiStageCoprocessor = buildMultiStageCoprocessor(); - if (isGTIDMode()) { + if (isGTIDMode() && StringUtils.isNotEmpty(startPosition.getGtid())) { // 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据 GTIDSet gtidSet = MysqlGTIDSet.parse(startPosition.getGtid()); ((MysqlMultiStageCoprocessor) multiStageCoprocessor).setGtidSet(gtidSet); @@ -260,7 +260,7 @@ public abstract class AbstractEventParser extends AbstractCanalLifeCycle } } } else { - if (isGTIDMode()) { + if (isGTIDMode() && StringUtils.isNotEmpty(startPosition.getGtid())) { // 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据 erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()), sinkHandler); } else { diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java index d2ce8356835a4c538bdc72de6c86e11ca4f6f37f..68eb506234334586230e4906c86d5d9075c1b6e5 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java @@ -355,11 +355,14 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE // GTID模式下,CanalLogPositionManager里取最后的gtid,没有则取instanc配置中的 LogPosition logPosition = getLogPositionManager().getLatestIndexBy(destination); if (logPosition != null) { - return logPosition.getPostion(); - } - - if (masterPosition != null && StringUtils.isNotEmpty(masterPosition.getGtid())) { - return masterPosition; + // 如果以前是非GTID模式,后来调整为了GTID模式,那么为了保持兼容,需要判断gtid是否为空 + if (StringUtils.isNotEmpty(logPosition.getPostion().getGtid())) { + return logPosition.getPostion(); + } + }else { + if (masterPosition != null && StringUtils.isNotEmpty(masterPosition.getGtid())) { + return masterPosition; + } } }