提交 991418ce 编写于 作者: M mcy

修改rdb批量提交分批的问题

上级 d34bee33
package com.alibaba.otter.canal.adapter.launcher.config;
import org.springframework.cloud.context.refresh.ContextRefresher;
import org.springframework.cloud.context.scope.refresh.RefreshScope;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RefresherConfig {
@Bean
public RefreshScope refreshScope() {
return new RefreshScope();
}
@Bean
public ContextRefresher contextRefresher(ConfigurableApplicationContext configurableApplicationContext,
RefreshScope refreshScope) {
return new ContextRefresher(configurableApplicationContext, refreshScope);
}
}
......@@ -5,6 +5,8 @@ import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
......@@ -20,6 +22,7 @@ import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.client.adapter.OuterAdapter;
import com.alibaba.otter.canal.client.adapter.rdb.config.ConfigLoader;
import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
import com.alibaba.otter.canal.client.adapter.rdb.monitor.RdbConfigMonitor;
import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService;
import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService;
import com.alibaba.otter.canal.client.adapter.rdb.support.SimpleDml;
......@@ -44,8 +47,11 @@ public class RdbAdapter implements OuterAdapter {
private List<SimpleDml> dmlList = Collections
.synchronizedList(new ArrayList<>());
private Lock syncLock = new ReentrantLock();
private Condition condition = syncLock.newCondition();
private ExecutorService executor = Executors.newFixedThreadPool(1);
private RdbConfigMonitor rdbConfigMonitor;
public Map<String, MappingConfig> getRdbMapping() {
return rdbMapping;
}
......@@ -107,18 +113,21 @@ public class RdbAdapter implements OuterAdapter {
executor.submit(() -> {
while (running) {
try {
int size1 = dmlList.size();
Thread.sleep(3000);
int size2 = dmlList.size();
if (size1 == size2) {
syncLock.lock();
if (!condition.await(3, TimeUnit.SECONDS)) {
// 超时提交
sync();
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
syncLock.unlock();
}
}
});
rdbConfigMonitor = new RdbConfigMonitor();
rdbConfigMonitor.init(configuration.getKey(), this);
}
@Override
......@@ -131,10 +140,9 @@ public class RdbAdapter implements OuterAdapter {
if (configMap != null) {
configMap.values().forEach(config -> {
List<SimpleDml> simpleDmlList = SimpleDml.dml2SimpleDml(dml, config);
dmlList.addAll(simpleDmlList);
if (dmlList.size() > commitSize) {
if (dmlList.size() >= commitSize) {
sync();
}
});
......@@ -148,6 +156,7 @@ public class RdbAdapter implements OuterAdapter {
try {
syncLock.lock();
if (!dmlList.isEmpty()) {
condition.signal();
rdbSyncService.sync(dmlList);
dmlList.clear();
}
......@@ -251,6 +260,10 @@ public class RdbAdapter implements OuterAdapter {
@Override
public void destroy() {
running = false;
if (rdbConfigMonitor != null) {
rdbConfigMonitor.destroy();
}
executor.shutdown();
if (rdbSyncService != null) {
......
......@@ -10,6 +10,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import javax.sql.DataSource;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册