From 5ad9cf3f163d73ceaf84f66959978f0e6d5c044f Mon Sep 17 00:00:00 2001 From: wuwo Date: Tue, 13 Nov 2018 20:54:36 +0800 Subject: [PATCH] let tsdbSnapshotInterval config work correctly --- .../manager/CanalInstanceWithManager.java | 2 +- .../inbound/mysql/LocalBinlogEventParser.java | 1 + .../parse/inbound/mysql/MysqlEventParser.java | 1 + .../inbound/mysql/tsdb/DatabaseTableMeta.java | 56 ++++++++++--------- .../mysql/tsdb/TableMetaTSDBBuilder.java | 1 - 5 files changed, 33 insertions(+), 28 deletions(-) diff --git a/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java b/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java index 154cd53f..11091695 100644 --- a/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java +++ b/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java @@ -304,7 +304,6 @@ public class CanalInstanceWithManager extends AbstractCanalInstance { } boolean tsdbEnable = BooleanUtils.toBoolean(parameters.getTsdbEnable()); if (tsdbEnable) { - mysqlEventParser.setEnableTsdb(tsdbEnable); mysqlEventParser.setTableMetaTSDBFactory(new DefaultTableMetaTSDBFactory() { @Override @@ -327,6 +326,7 @@ public class CanalInstanceWithManager extends AbstractCanalInstance { } } }); + mysqlEventParser.setEnableTsdb(tsdbEnable); } eventParser = mysqlEventParser; } else if (type.isLocalBinlog()) { diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java index 4f6ace4f..db235d9b 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java @@ -57,6 +57,7 @@ public class LocalBinlogEventParser extends AbstractMysqlEventParser implements ((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter); ((DatabaseTableMeta) tableMetaTSDB).setSnapshotInterval(tsdbSnapshotInterval); ((DatabaseTableMeta) tableMetaTSDB).setSnapshotExpire(tsdbSnapshotExpire); + ((DatabaseTableMeta) tableMetaTSDB).init(destination); } tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB); diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java index e8ca45e3..167d2460 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java @@ -124,6 +124,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE ((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter); ((DatabaseTableMeta) tableMetaTSDB).setSnapshotInterval(tsdbSnapshotInterval); ((DatabaseTableMeta) tableMetaTSDB).setSnapshotExpire(tsdbSnapshotExpire); + ((DatabaseTableMeta) tableMetaTSDB).init(destination); } tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB); diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java index 068db5ed..dd90ffbf 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java @@ -10,6 +10,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Pattern; @@ -62,9 +63,10 @@ public class DatabaseTableMeta implements TableMetaTSDB { } }); private ReadWriteLock lock = new ReentrantReadWriteLock(); + private AtomicBoolean initialized = new AtomicBoolean(false); private String destination; private MemoryTableMeta memoryTableMeta; - private MysqlConnection connection; // 查询meta信息的链接 + private volatile MysqlConnection connection; // 查询meta信息的链接 private CanalEventFilter filter; private CanalEventFilter blackFilter; private EntryPosition lastPosition; @@ -74,40 +76,42 @@ public class DatabaseTableMeta implements TableMetaTSDB { private int snapshotInterval = 24; private int snapshotExpire = 360; private ScheduledFuture scheduleSnapshotFuture; - + public DatabaseTableMeta(){ } @Override public boolean init(final String destination) { - this.destination = destination; - this.memoryTableMeta = new MemoryTableMeta(); - - // 24小时生成一份snapshot - if (snapshotInterval > 0) { - scheduleSnapshotFuture = scheduler.scheduleWithFixedDelay(new Runnable() { - - @Override - public void run() { - boolean applyResult = false; - try { - MDC.put("destination", destination); - applyResult = applySnapshotToDB(lastPosition, false); - } catch (Throwable e) { - logger.error("scheudle applySnapshotToDB faield", e); - } + if (initialized.compareAndSet(false, true)) { + this.destination = destination; + this.memoryTableMeta = new MemoryTableMeta(); + + // 24小时生成一份snapshot + if (snapshotInterval > 0) { + scheduleSnapshotFuture = scheduler.scheduleWithFixedDelay(new Runnable() { + + @Override + public void run() { + boolean applyResult = false; + try { + MDC.put("destination", destination); + applyResult = applySnapshotToDB(lastPosition, false); + } catch (Throwable e) { + logger.error("scheudle applySnapshotToDB faield", e); + } - try { - MDC.put("destination", destination); - if (applyResult) { - snapshotExpire((int) TimeUnit.HOURS.toSeconds(snapshotExpire)); + try { + MDC.put("destination", destination); + if (applyResult) { + snapshotExpire((int) TimeUnit.HOURS.toSeconds(snapshotExpire)); + } + } catch (Throwable e) { + logger.error("scheudle snapshotExpire faield", e); } - } catch (Throwable e) { - logger.error("scheudle snapshotExpire faield", e); } - } - }, snapshotInterval, snapshotInterval, TimeUnit.HOURS); + }, snapshotInterval, snapshotInterval, TimeUnit.HOURS); + } } return true; } diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaTSDBBuilder.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaTSDBBuilder.java index 107e1013..75b97f14 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaTSDBBuilder.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/TableMetaTSDBBuilder.java @@ -36,7 +36,6 @@ public class TableMetaTSDBBuilder { } } TableMetaTSDB tableMetaTSDB = (TableMetaTSDB) applicationContext.getBean("tableMetaTSDB"); - tableMetaTSDB.init(destination); logger.info("{} init TableMetaTSDB with {}", destination, springXml); return tableMetaTSDB; } else { -- GitLab