From 3165cb9640dffab410ac49b486a943ac1717e0c4 Mon Sep 17 00:00:00 2001 From: dataccs Date: Wed, 24 Apr 2019 09:56:32 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E7=94=A8=E6=88=B7=E8=87=AA?= =?UTF-8?q?=E5=AE=9A=E4=B9=89=E7=9A=84CanalAlarmHandler=20(#1736)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 支持用户自定义的CanalAlarmHandler * RowsLogEvent增加对TableMapLogEvent判空检查,防止NPE异常 --- .../dbsync/binlog/event/RowsLogEvent.java | 5 +++ .../exception/TableIdNotFoundException.java | 2 +- .../manager/CanalInstanceWithManager.java | 33 ++++++++++++++++++- .../manager/model/CanalParameter.java | 21 ++++++++++++ .../parse/inbound/AbstractEventParser.java | 2 +- .../inbound/mysql/dbsync/LogEventConvert.java | 2 +- 6 files changed, 61 insertions(+), 4 deletions(-) rename {parse/src/main/java/com/alibaba/otter/canal/parse => dbsync/src/main/java/com/taobao/tddl/dbsync/binlog}/exception/TableIdNotFoundException.java (93%) diff --git a/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogEvent.java b/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogEvent.java index f11b0734..e1e78cd1 100644 --- a/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogEvent.java +++ b/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogEvent.java @@ -2,6 +2,7 @@ package com.taobao.tddl.dbsync.binlog.event; import java.util.BitSet; +import com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException; import com.taobao.tddl.dbsync.binlog.LogBuffer; import com.taobao.tddl.dbsync.binlog.LogContext; import com.taobao.tddl.dbsync.binlog.LogEvent; @@ -179,6 +180,10 @@ public abstract class RowsLogEvent extends LogEvent { public final void fillTable(LogContext context) { table = context.getTable(tableId); + if (table == null) { + throw new TableIdNotFoundException("not found tableId:" + tableId); + } + // end of statement check: if ((flags & RowsLogEvent.STMT_END_F) != 0) { // Now is safe to clear ignored map (clear_tables will also diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/exception/TableIdNotFoundException.java b/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/exception/TableIdNotFoundException.java similarity index 93% rename from parse/src/main/java/com/alibaba/otter/canal/parse/exception/TableIdNotFoundException.java rename to dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/exception/TableIdNotFoundException.java index aff34d7f..ba670352 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/exception/TableIdNotFoundException.java +++ b/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/exception/TableIdNotFoundException.java @@ -1,4 +1,4 @@ -package com.alibaba.otter.canal.parse.exception; +package com.taobao.tddl.dbsync.binlog.exception; import com.alibaba.otter.canal.common.CanalException; diff --git a/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java b/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java index 768caa22..43eaaf33 100644 --- a/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java +++ b/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java @@ -1,6 +1,9 @@ package com.alibaba.otter.canal.instance.manager; +import java.io.File; import java.net.InetSocketAddress; +import java.net.URL; +import java.net.URLClassLoader; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collections; @@ -8,6 +11,7 @@ import java.util.List; import org.apache.commons.lang.BooleanUtils; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.exception.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.CollectionUtils; @@ -106,7 +110,34 @@ public class CanalInstanceWithManager extends AbstractCanalInstance { protected void initAlarmHandler() { logger.info("init alarmHandler begin..."); - alarmHandler = new LogAlarmHandler(); + String alarmHandlerClass = parameters.getAlarmHandlerClass(); + String alarmHandlerPluginDir = parameters.getAlarmHandlerPluginDir(); + if (alarmHandlerClass == null || alarmHandlerPluginDir == null) { + alarmHandler = new LogAlarmHandler(); + } else { + try { + File externalLibDir = new File(alarmHandlerPluginDir); + File[] jarFiles = externalLibDir.listFiles((dir1, name) -> name.endsWith(".jar")); + if (jarFiles == null || jarFiles.length == 0) { + throw new IllegalStateException(String.format("alarmHandlerPluginDir [%s] can't find any name endswith \".jar\" file.", + alarmHandlerPluginDir)); + } + URL[] urls = new URL[jarFiles.length]; + for (int i = 0; i < jarFiles.length; i++) { + urls[i] = jarFiles[i].toURI().toURL(); + } + ClassLoader currentClassLoader = new URLClassLoader(urls, CanalInstanceWithManager.class.getClassLoader()); + Class _alarmClass = + (Class)currentClassLoader.loadClass(alarmHandlerClass); + alarmHandler = _alarmClass.newInstance(); + logger.info("init [{}] alarm handler success.", alarmHandlerClass); + } catch (Throwable e) { + String errorMsg = String.format("init alarmHandlerPluginDir [%s] alarm handler [%s] error: %s", + alarmHandlerPluginDir, alarmHandlerClass, ExceptionUtils.getFullStackTrace(e)); + logger.error(errorMsg); + throw new CanalException(errorMsg, e); + } + } logger.info("init alarmHandler end! \n\t load CanalAlarmHandler:{} ", alarmHandler.getClass().getName()); } diff --git a/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java b/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java index 30e1fb9d..1a6cb99f 100644 --- a/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java +++ b/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java @@ -120,6 +120,11 @@ public class CanalParameter implements Serializable { private Long standbyTimestamp = null; private Boolean parallel = Boolean.FALSE; + //自定义alarmHandler类全路径 + private String alarmHandlerClass = null; + //自定义alarmHandler插件文件夹路径 + private String alarmHandlerPluginDir = null; + public static enum RunMode { /** 嵌入式 */ @@ -991,6 +996,22 @@ public class CanalParameter implements Serializable { this.parallel = parallel; } + public String getAlarmHandlerClass() { + return alarmHandlerClass; + } + + public void setAlarmHandlerClass(String alarmHandlerClass) { + this.alarmHandlerClass = alarmHandlerClass; + } + + public String getAlarmHandlerPluginDir() { + return alarmHandlerPluginDir; + } + + public void setAlarmHandlerPluginDir(String alarmHandlerPluginDir) { + this.alarmHandlerPluginDir = alarmHandlerPluginDir; + } + public String toString() { return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE); } diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java index cd6e25be..73b35df6 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java @@ -23,7 +23,7 @@ import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet; import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet; import com.alibaba.otter.canal.parse.exception.CanalParseException; import com.alibaba.otter.canal.parse.exception.PositionNotFoundException; -import com.alibaba.otter.canal.parse.exception.TableIdNotFoundException; +import com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException; import com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer.TransactionFlushCallback; import com.alibaba.otter.canal.parse.inbound.mysql.MysqlMultiStageCoprocessor; import com.alibaba.otter.canal.parse.index.CanalLogPositionManager; diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java index 5a18b9eb..d022a255 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java @@ -18,7 +18,7 @@ import org.slf4j.LoggerFactory; import com.alibaba.otter.canal.common.AbstractCanalLifeCycle; import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter; import com.alibaba.otter.canal.parse.exception.CanalParseException; -import com.alibaba.otter.canal.parse.exception.TableIdNotFoundException; +import com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException; import com.alibaba.otter.canal.parse.inbound.BinlogParser; import com.alibaba.otter.canal.parse.inbound.TableMeta; import com.alibaba.otter.canal.parse.inbound.TableMeta.FieldMeta; -- GitLab