diff --git a/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java b/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java index 154cd53f0ac7d0d6ad7c69587bf120ff45105b65..110916953639599a69696f746b3dae25a1306b5a 100644 --- a/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java +++ b/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java @@ -304,7 +304,6 @@ public class CanalInstanceWithManager extends AbstractCanalInstance { } boolean tsdbEnable = BooleanUtils.toBoolean(parameters.getTsdbEnable()); if (tsdbEnable) { - mysqlEventParser.setEnableTsdb(tsdbEnable); mysqlEventParser.setTableMetaTSDBFactory(new DefaultTableMetaTSDBFactory() { @Override @@ -327,6 +326,7 @@ public class CanalInstanceWithManager extends AbstractCanalInstance { } } }); + mysqlEventParser.setEnableTsdb(tsdbEnable); } eventParser = mysqlEventParser; } else if (type.isLocalBinlog()) { diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java index 4f6ace4f323686b6dff102ae500bf65617fd3cc0..db235d9bf3f1bf49186ea43bc5781b06f844f1d8 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java @@ -57,6 +57,7 @@ public class LocalBinlogEventParser extends AbstractMysqlEventParser implements ((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter); ((DatabaseTableMeta) tableMetaTSDB).setSnapshotInterval(tsdbSnapshotInterval); ((DatabaseTableMeta) tableMetaTSDB).setSnapshotExpire(tsdbSnapshotExpire); + ((DatabaseTableMeta) tableMetaTSDB).init(destination); } tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB); diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java index e8ca45e3e371637eb4702de17ae1b62813fdba1e..167d2460d785a9470e014b390c7eddde79987f8e 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java @@ -124,6 +124,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE ((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter); ((DatabaseTableMeta) tableMetaTSDB).setSnapshotInterval(tsdbSnapshotInterval); ((DatabaseTableMeta) tableMetaTSDB).setSnapshotExpire(tsdbSnapshotExpire); + ((DatabaseTableMeta) tableMetaTSDB).init(destination); } tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB); diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java index 068db5ede4c2749e53a987b3244b39efe703a60e..dd90ffbfdc78de9f37f85d4182f971bfec1842d5 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java @@ -10,6 +10,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Pattern; @@ -62,9 +63,10 @@ public class DatabaseTableMeta implements TableMetaTSDB { } }); private ReadWriteLock lock = new ReentrantReadWriteLock(); + private AtomicBoolean initialized = new AtomicBoolean(false); private String destination; private MemoryTableMeta memoryTableMeta; - private MysqlConnection connection; // 查询meta信息的链接 + private volatile MysqlConnection connection; // 查询meta信息的链接 private CanalEventFilter filter; private CanalEventFilter blackFilter; private EntryPosition lastPosition; @@ -74,40 +76,42 @@ public class DatabaseTableMeta implements TableMetaTSDB { private int snapshotInterval = 24; private int snapshotExpire = 360; private ScheduledFuture scheduleSnapshotFuture; - + public DatabaseTableMeta(){ } @Override public boolean init(final String destination) { - this.destination = destination; - this.memoryTableMeta = new MemoryTableMeta(); - - // 24小时生成一份snapshot - if (snapshotInterval > 0) { - scheduleSnapshotFuture = scheduler.scheduleWithFixedDelay(new Runnable() { - - @Override - public void run() { - boolean applyResult = false; - try { - MDC.put("destination", destination); - applyResult = applySnapshotToDB(lastPosition, false); - } catch (Throwable e) { - logger.error("scheudle applySnapshotToDB faield", e); - } + if (initialized.compareAndSet(false, true)) { + this.destination = destination; + this.memoryTableMeta = new MemoryTableMeta(); + + // 24小时生成一份snapshot + if (snapshotInterval > 0) { + scheduleSnapshotFuture = scheduler.scheduleWithFixedDelay(new Runnable() { + + @Override + public void run() { + boolean applyResult = false; + try { + MDC.put("destination", destination); + applyResult = applySnapshotToDB(lastPosition, false); + } catch (Throwable e) { + logger.error("scheudle applySnapshotToDB faield", e); + } - try { - MDC.put("destination", destination); - if (applyResult) { - snapshotExpire((int) TimeUnit.HOURS.toSeconds(snapshotExpire)); + try { + MDC.put("destination", destination); + if (applyResult) { + snapshotExpire((int) TimeUnit.HOURS.toSeconds(snapshotExpire)); + } + } catch (Throwable e) { + logger.error("scheudle snapshotExpire faield", e); } - } catch (Throwable e) { - logger.error("scheudle snapshotExpire faield", e); } - } - }, snapshotInterval, snapshotInterval, TimeUnit.HOURS); + }, snapshotInterval, snapshotInterval, TimeUnit.HOURS); + } } return true; } diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaTSDBBuilder.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaTSDBBuilder.java index 107e1013f47cc6a4463a309d8e040e1e63e86761..75b97f143e09ffc9b24cc00491eea06045b1b27f 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaTSDBBuilder.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaTSDBBuilder.java @@ -36,7 +36,6 @@ public class TableMetaTSDBBuilder { } } TableMetaTSDB tableMetaTSDB = (TableMetaTSDB) applicationContext.getBean("tableMetaTSDB"); - tableMetaTSDB.init(destination); logger.info("{} init TableMetaTSDB with {}", destination, springXml); return tableMetaTSDB; } else {