diff --git a/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/RefresherConfig.java b/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/RefresherConfig.java deleted file mode 100644 index 127b7ceb2fc1a65a3315396475f5fc5bb9683f67..0000000000000000000000000000000000000000 --- a/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/config/RefresherConfig.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.alibaba.otter.canal.adapter.launcher.config; - -import org.springframework.cloud.context.refresh.ContextRefresher; -import org.springframework.cloud.context.scope.refresh.RefreshScope; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -@Configuration -public class RefresherConfig { - - @Bean - public RefreshScope refreshScope() { - return new RefreshScope(); - } - - @Bean - public ContextRefresher contextRefresher(ConfigurableApplicationContext configurableApplicationContext, - RefreshScope refreshScope) { - return new ContextRefresher(configurableApplicationContext, refreshScope); - } -} diff --git a/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java b/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java index 11fe5c2fb6998110a2bd869fa35627a475779a95..78001c0314790a7bee5c25c155b8a1b42cba22bb 100644 --- a/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java +++ b/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java @@ -5,6 +5,8 @@ import java.sql.SQLException; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -20,6 +22,7 @@ import com.alibaba.fastjson.serializer.SerializerFeature; import com.alibaba.otter.canal.client.adapter.OuterAdapter; import com.alibaba.otter.canal.client.adapter.rdb.config.ConfigLoader; import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig; +import com.alibaba.otter.canal.client.adapter.rdb.monitor.RdbConfigMonitor; import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService; import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService; import com.alibaba.otter.canal.client.adapter.rdb.support.SimpleDml; @@ -44,8 +47,11 @@ public class RdbAdapter implements OuterAdapter { private List dmlList = Collections .synchronizedList(new ArrayList<>()); private Lock syncLock = new ReentrantLock(); + private Condition condition = syncLock.newCondition(); private ExecutorService executor = Executors.newFixedThreadPool(1); + private RdbConfigMonitor rdbConfigMonitor; + public Map getRdbMapping() { return rdbMapping; } @@ -107,18 +113,21 @@ public class RdbAdapter implements OuterAdapter { executor.submit(() -> { while (running) { try { - int size1 = dmlList.size(); - Thread.sleep(3000); - int size2 = dmlList.size(); - if (size1 == size2) { + syncLock.lock(); + if (!condition.await(3, TimeUnit.SECONDS)) { // 超时提交 sync(); } } catch (Exception e) { logger.error(e.getMessage(), e); + } finally { + syncLock.unlock(); } } }); + + rdbConfigMonitor = new RdbConfigMonitor(); + rdbConfigMonitor.init(configuration.getKey(), this); } @Override @@ -131,10 +140,9 @@ public class RdbAdapter implements OuterAdapter { if (configMap != null) { configMap.values().forEach(config -> { List simpleDmlList = SimpleDml.dml2SimpleDml(dml, config); - dmlList.addAll(simpleDmlList); - if (dmlList.size() > commitSize) { + if (dmlList.size() >= commitSize) { sync(); } }); @@ -148,6 +156,7 @@ public class RdbAdapter implements OuterAdapter { try { syncLock.lock(); if (!dmlList.isEmpty()) { + condition.signal(); rdbSyncService.sync(dmlList); dmlList.clear(); } @@ -251,6 +260,10 @@ public class RdbAdapter implements OuterAdapter { @Override public void destroy() { running = false; + if (rdbConfigMonitor != null) { + rdbConfigMonitor.destroy(); + } + executor.shutdown(); if (rdbSyncService != null) { diff --git a/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java b/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java index 9467b8b98e365d1d1cb5495d4b799b887a8cbac1..9f1f5469b67bb27219484e85a319705e47ad9263 100644 --- a/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java +++ b/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java @@ -10,6 +10,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import javax.sql.DataSource;