diff --git a/client-adapter/launcher/src/main/resources/application.yml b/client-adapter/launcher/src/main/resources/application.yml index 1d78aead7eb7e5dd1ef4093e8f5e2c9b27652d1f..3df3f4beb66f345705791f344efb261d1d744085 100644 --- a/client-adapter/launcher/src/main/resources/application.yml +++ b/client-adapter/launcher/src/main/resources/application.yml @@ -28,16 +28,16 @@ canal.conf: - instance: example groups: - outAdapters: -# - name: logger - - name: rdb - key: oracle1 - properties: - jdbc.driverClassName: oracle.jdbc.OracleDriver - jdbc.url: jdbc:oracle:thin:@localhost:49161:XE - jdbc.username: mytest - jdbc.password: m121212 - threads: 5 - commitSize: 5000 + - name: logger +# - name: rdb +# key: oracle1 +# properties: +# jdbc.driverClassName: oracle.jdbc.OracleDriver +# jdbc.url: jdbc:oracle:thin:@localhost:49161:XE +# jdbc.username: mytest +# jdbc.password: m121212 +# threads: 5 +# commitSize: 5000 # - name: rdb # key: postgres1 # properties: diff --git a/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java b/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java index 729b895976b3d80cebbeeff0d8270979b63221b7..4e7e25044b9e73ed7b5681d770fc4b9843407b3a 100644 --- a/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java +++ b/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/RdbAdapter.java @@ -5,9 +5,6 @@ import java.sql.SQLException; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -18,11 +15,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.druid.pool.DruidDataSource; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; import com.alibaba.otter.canal.client.adapter.OuterAdapter; import com.alibaba.otter.canal.client.adapter.rdb.config.ConfigLoader; import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig; import com.alibaba.otter.canal.client.adapter.rdb.service.RdbEtlService; import com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService; +import com.alibaba.otter.canal.client.adapter.rdb.support.SimpleDml; import com.alibaba.otter.canal.client.adapter.support.*; @SPI("rdb") @@ -30,8 +30,8 @@ public class RdbAdapter implements OuterAdapter { private static Logger logger = LoggerFactory.getLogger(RdbAdapter.class); - private Map rdbMapping = new HashMap<>(); // 文件名对应配置 - private Map mappingConfigCache = new HashMap<>(); // 库名-表名对应配置 + private Map rdbMapping = new HashMap<>(); // 文件名对应配置 + private Map mappingConfigCache = new HashMap<>(); // 库名-表名对应配置 private DruidDataSource dataSource; @@ -39,6 +39,12 @@ public class RdbAdapter implements OuterAdapter { private int commitSize = 3000; + private volatile boolean running = false; + + private List dmlList = Collections.synchronizedList(new ArrayList<>()); + private Lock syncLock = new ReentrantLock(); + private ExecutorService executor = Executors.newFixedThreadPool(1); + @Override public void init(OuterAdapterConfig configuration) { Map rdbMappingTmp = ConfigLoader.load(); @@ -81,52 +87,54 @@ public class RdbAdapter implements OuterAdapter { if (commitSize != null) { this.commitSize = Integer.valueOf(commitSize); } - rdbSyncService = new RdbSyncService(this.commitSize, - threads != null ? Integer.valueOf(threads) : null, - dataSource); - } + rdbSyncService = new RdbSyncService(threads != null ? Integer.valueOf(threads) : null, dataSource); - private AtomicInteger batchRowNum = new AtomicInteger(0); - private List dmlList = Collections.synchronizedList(new ArrayList<>()); - private Lock syncLock = new ReentrantLock(); - private Condition condition = syncLock.newCondition(); - private ExecutorService executor = Executors.newFixedThreadPool(1); - - @Override - public void sync(Dml dml) { - boolean first = batchRowNum.get() == 0; - int currentSize = batchRowNum.addAndGet(dml.getData().size()); - dmlList.add(dml); + running = true; - if (first) { - // 开启超时判断 - executor.submit(() -> { + executor.submit(() -> { + while (running) { try { - syncLock.lock(); - if (!condition.await(5, TimeUnit.SECONDS)) { - // 批量超时 + int size1 = dmlList.size(); + Thread.sleep(3000); + int size2 = dmlList.size(); + if (size1 == size2) { + // 超时提交 sync(); } } catch (Exception e) { logger.error(e.getMessage(), e); - } finally { - syncLock.unlock(); } - }); - } + } + }); + } + + @Override + public void sync(Dml dml) { + String destination = StringUtils.trimToEmpty(dml.getDestination()); + String database = dml.getDatabase(); + String table = dml.getTable(); + MappingConfig config = mappingConfigCache.get(destination + "." + database + "." + table); + + List simpleDmlList = SimpleDml.dml2SimpleDml(dml, config); - if (currentSize > commitSize) { + dmlList.addAll(simpleDmlList); + + if (dmlList.size() > commitSize) { sync(); } + + if (logger.isDebugEnabled()) { + logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue)); + } } private void sync() { try { syncLock.lock(); - rdbSyncService.sync(mappingConfigCache, dmlList); - batchRowNum.set(0); - dmlList.clear(); - condition.signal(); + if (!dmlList.isEmpty()) { + rdbSyncService.sync(dmlList); + dmlList.clear(); + } } finally { syncLock.unlock(); } @@ -226,6 +234,7 @@ public class RdbAdapter implements OuterAdapter { @Override public void destroy() { + running = false; executor.shutdown(); if (rdbSyncService != null) { diff --git a/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/BatchExecutor.java b/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/BatchExecutor.java index 5e8c65e82db91d38e99d6443ba36e77952a8a228..3e3d442c18cc630616c49d00938f4f120e7fd169 100644 --- a/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/BatchExecutor.java +++ b/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/BatchExecutor.java @@ -23,18 +23,14 @@ public class BatchExecutor { private Integer key; private Connection conn; - private int commitSize = 3000; private AtomicInteger idx = new AtomicInteger(0); private ExecutorService executor = Executors.newFixedThreadPool(1); private Lock commitLock = new ReentrantLock(); private Condition condition = commitLock.newCondition(); - public BatchExecutor(Integer key, Connection conn, Integer commitSize){ + public BatchExecutor(Integer key, Connection conn){ this.key = key; this.conn = conn; - if (commitSize != null) { - this.commitSize = commitSize; - } try { this.conn.setAutoCommit(false); @@ -71,29 +67,6 @@ public class BatchExecutor { } catch (SQLException e) { logger.error(e.getMessage(), e); } - // int i = idx.incrementAndGet(); - // - // // 批次的第一次执行设置延时 - // if (i == 1) { - // executor.submit(() -> { - // try { - // commitLock.lock(); - // conn.commit(); //直接提交一次 - // if (!condition.await(5, TimeUnit.SECONDS)) { - // // 超时提交 - // commit(); - // } - // } catch (Exception e) { - // logger.error(e.getMessage(), e); - // } finally { - // commitLock.unlock(); - // } - // }); - // } - // - // if (i == commitSize) { - // commit(); - // } } public void commit() { diff --git a/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java b/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java index 04bf79dab8e7b11eca6313aade61ca56f6feae3d..9467b8b98e365d1d1cb5495d4b799b887a8cbac1 100644 --- a/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java +++ b/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java @@ -5,23 +5,22 @@ import java.io.StringReader; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.sql.*; -import java.sql.Date; -import java.util.*; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.*; import javax.sql.DataSource; -import org.apache.commons.lang.StringUtils; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.serializer.SerializerFeature; import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig; import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping; +import com.alibaba.otter.canal.client.adapter.rdb.support.SimpleDml; import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil; -import com.alibaba.otter.canal.client.adapter.support.Dml; import com.alibaba.otter.canal.client.adapter.support.Util; /** @@ -42,7 +41,7 @@ public class RdbSyncService { private ExecutorService[] threadExecutors; - public RdbSyncService(Integer commitSize, Integer threads, DataSource dataSource){ + public RdbSyncService(Integer threads, DataSource dataSource){ try { if (threads != null && threads > 1 && threads <= 10) { this.threads = threads; @@ -51,7 +50,7 @@ public class RdbSyncService { for (int i = 0; i < this.threads; i++) { Connection conn = dataSource.getConnection(); conn.setAutoCommit(false); - this.batchExecutors[i] = new BatchExecutor(i, conn, commitSize); + this.batchExecutors[i] = new BatchExecutor(i, conn); } threadExecutors = new ExecutorService[this.threads]; for (int i = 0; i < this.threads; i++) { @@ -63,96 +62,60 @@ public class RdbSyncService { } @SuppressWarnings("unchecked") - public void sync(Map mappingConfigCache, List dmlList) { - try { - List>[] dmlPartition = new ArrayList[threads]; - for (int i = 0; i < threads; i++) { - dmlPartition[i] = new ArrayList<>(); + private List[] simpleDmls2Partition(List simpleDmlList) { + List[] simpleDmlPartition = new ArrayList[threads]; + for (int i = 0; i < threads; i++) { + simpleDmlPartition[i] = new ArrayList<>(); + } + simpleDmlList.forEach(simpleDml -> { + int hash; + if (simpleDml.getConfig().getConcurrent()) { + hash = pkHash(simpleDml.getConfig().getDbMapping(), simpleDml.getData(), threads); + } else { + hash = Math.abs(Math.abs(simpleDml.getConfig().getDbMapping().getTargetTable().hashCode()) % threads); } - // 根据hash拆分 - dmlList.forEach(dml -> { - String destination = StringUtils.trimToEmpty(dml.getDestination()); - String database = dml.getDatabase(); - String table = dml.getTable(); - MappingConfig config = mappingConfigCache.get(destination + "." + database + "." + table); - - Dml[] dmls = new Dml[threads]; - for (int i = 0; i < threads; i++) { - Dml dmlTmp = new Dml(); - dmlTmp.setDestination(dml.getDestination()); - dmlTmp.setDatabase(dml.getDatabase()); - dmlTmp.setTable(dml.getTable()); - dmlTmp.setType(dml.getType()); - dmlTmp.setTs(dml.getTs()); - dmlTmp.setEs(dml.getEs()); - dmlTmp.setSql(dml.getSql()); - dmlTmp.setData(new ArrayList<>()); - dmlTmp.setOld(new ArrayList<>()); - dmls[i] = dmlTmp; - } - int idx = 0; - for (Map data : dml.getData()) { - int hash; - if (config.getConcurrent()) { - hash = pkHash(config.getDbMapping(), data, threads); - } else { - hash = Math.abs(Math.abs(config.getDbMapping().getTargetTable().hashCode()) % threads); - } - Dml dmlTmp = dmls[hash]; - dmlTmp.getData().add(data); - if (dml.getOld() != null) { - dmlTmp.getOld().add(dml.getOld().get(idx)); - } + simpleDmlPartition[hash].add(simpleDml); + }); + return simpleDmlPartition; + } - idx++; - } - for (int i = 0; i < threads; i++) { - Map item = new HashMap<>(); - item.put("dml", dmls[i]); - item.put("config", config); - dmlPartition[i].add(item); - } + public void sync(List simpleDmlList) { + try { + List[] simpleDmlsPartition = simpleDmls2Partition(simpleDmlList); - }); List> futures = new ArrayList<>(); for (int i = 0; i < threads; i++) { - int j = i; - futures.add(threadExecutors[i].submit(() -> { - dmlPartition[j].forEach(item -> { - MappingConfig config = (MappingConfig) item.get("config"); - Dml dml = (Dml) item.get("dml"); - sync(config, dml); - }); - batchExecutors[j].commit(); - return true; - })); + if (!simpleDmlsPartition[i].isEmpty()) { + int j = i; + futures.add(threadExecutors[i].submit(() -> { + simpleDmlsPartition[j].forEach(simpleDml -> sync(simpleDml, batchExecutors[j])); + batchExecutors[j].commit(); + return true; + })); + } } - for (int i = 0; i < threads; i++) { + + futures.forEach(future -> { try { - futures.get(i).get(); + future.get(); } catch (Exception e) { // ignore } - } + }); } catch (Exception e) { logger.error("Error rdb sync for batch", e); } } - public void sync(MappingConfig config, Dml dml) { + public void sync(SimpleDml dml, BatchExecutor batchExecutor) { try { - if (config != null) { - String type = dml.getType(); - if (type != null && type.equalsIgnoreCase("INSERT")) { - insert(config, dml); - } else if (type != null && type.equalsIgnoreCase("UPDATE")) { - update(config, dml); - } else if (type != null && type.equalsIgnoreCase("DELETE")) { - delete(config, dml); - } - if (logger.isDebugEnabled()) { - logger.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue)); - } + String type = dml.getType(); + if (type != null && type.equalsIgnoreCase("INSERT")) { + insert(dml, batchExecutor); + } else if (type != null && type.equalsIgnoreCase("UPDATE")) { + update(dml, batchExecutor); + } else if (type != null && type.equalsIgnoreCase("DELETE")) { + delete(dml, batchExecutor); } } catch (Exception e) { logger.error(e.getMessage(), e); @@ -162,18 +125,17 @@ public class RdbSyncService { /** * 插入操作 * - * @param config 配置项 - * @param dml DML数据 + * @param simpleDml DML数据 */ - private void insert(MappingConfig config, Dml dml) { - List> data = dml.getData(); + private void insert(SimpleDml simpleDml, BatchExecutor batchExecutor) { + Map data = simpleDml.getData(); if (data == null || data.isEmpty()) { return; } - DbMapping dbMapping = config.getDbMapping(); + DbMapping dbMapping = simpleDml.getConfig().getDbMapping(); - Map columnsMap = SyncUtil.getColumnsMap(dbMapping, data.get(0)); + Map columnsMap = SyncUtil.getColumnsMap(dbMapping, data); StringBuilder insertSql = new StringBuilder(); insertSql.append("INSERT INTO ").append(dbMapping.getTargetTable()).append(" ("); @@ -188,185 +150,121 @@ public class RdbSyncService { len = insertSql.length(); insertSql.delete(len - 1, len).append(")"); - Map ctype = getTargetColumnType(batchExecutors[0].getConn(), config); + Map ctype = getTargetColumnType(batchExecutor.getConn(), simpleDml.getConfig()); String sql = insertSql.toString(); - Integer tbHash = null; - // 如果不是并行同步的表 - if (!config.getConcurrent()) { - // 按表名hash到一个线程 - tbHash = Math.abs(Math.abs(dbMapping.getTargetTable().hashCode()) % threads); - } - for (Map d : data) { - int hash; - if (tbHash != null) { - hash = tbHash; - } else { - hash = pkHash(dbMapping, d, threads); - } - // ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[hash]; - // checkQueue(tpe); - // tpe.submit(() -> { - try { - BatchExecutor batchExecutor = batchExecutors[hash]; - List> values = new ArrayList<>(); - - for (Map.Entry entry : columnsMap.entrySet()) { - String targetColumnName = entry.getKey(); - String srcColumnName = entry.getValue(); - if (srcColumnName == null) { - srcColumnName = targetColumnName; - } - Integer type = ctype.get(targetColumnName.toLowerCase()); - if (type == null) { - throw new RuntimeException("No column: " + targetColumnName + " found in target db"); - } - Object value = d.get(srcColumnName); - BatchExecutor.setValue(values, type, value); + try { + List> values = new ArrayList<>(); + + for (Map.Entry entry : columnsMap.entrySet()) { + String targetColumnName = entry.getKey(); + String srcColumnName = entry.getValue(); + if (srcColumnName == null) { + srcColumnName = targetColumnName; + } + + Integer type = ctype.get(targetColumnName.toLowerCase()); + if (type == null) { + throw new RuntimeException("No column: " + targetColumnName + " found in target db"); } - batchExecutor.execute(sql, values); - } catch (Exception e) { - logger.error(e.getMessage(), e); + Object value = data.get(srcColumnName); + BatchExecutor.setValue(values, type, value); } - // }); + batchExecutor.execute(sql, values); + } catch (Exception e) { + logger.error(e.getMessage(), e); } } /** * 更新操作 * - * @param config 配置项 - * @param dml DML数据 + * @param simpleDml DML数据 */ - private void update(MappingConfig config, Dml dml) { - List> data = dml.getData(); + private void update(SimpleDml simpleDml, BatchExecutor batchExecutor) { + Map data = simpleDml.getData(); if (data == null || data.isEmpty()) { return; } - List> old = dml.getOld(); + Map old = simpleDml.getOld(); if (old == null || old.isEmpty()) { return; } - DbMapping dbMapping = config.getDbMapping(); + DbMapping dbMapping = simpleDml.getConfig().getDbMapping(); - int idx = 1; - Map columnsMap = SyncUtil.getColumnsMap(dbMapping, data.get(0)); + Map columnsMap = SyncUtil.getColumnsMap(dbMapping, data); - Map ctype = getTargetColumnType(batchExecutors[0].getConn(), config); + Map ctype = getTargetColumnType(batchExecutor.getConn(), simpleDml.getConfig()); - Integer tbHash = null; - if (!config.getConcurrent()) { - // 按表名hash到一个线程 - tbHash = Math.abs(Math.abs(dbMapping.getTargetTable().hashCode()) % threads); - } - for (Map o : old) { - Map d = data.get(idx - 1); - - int hash; - if (tbHash != null) { - hash = tbHash; - } else { - hash = pkHash(dbMapping, d, o, threads); - } - - // ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[hash]; - // checkQueue(tpe); - // tpe.submit(() -> { - try { - BatchExecutor batchExecutor = batchExecutors[hash]; - - StringBuilder updateSql = new StringBuilder(); - updateSql.append("UPDATE ").append(dbMapping.getTargetTable()).append(" SET "); - - List> values = new ArrayList<>(); - for (String srcColumnName : o.keySet()) { - List targetColumnNames = new ArrayList<>(); - columnsMap.forEach((targetColumn, srcColumn) -> { - if (srcColumnName.toLowerCase().equals(srcColumn)) { - targetColumnNames.add(targetColumn); - } - }); - if (!targetColumnNames.isEmpty()) { - for (String targetColumnName : targetColumnNames) { - updateSql.append(targetColumnName).append("=?, "); - Integer type = ctype.get(targetColumnName.toLowerCase()); - BatchExecutor.setValue(values, type, d.get(srcColumnName)); - } + try { + StringBuilder updateSql = new StringBuilder(); + updateSql.append("UPDATE ").append(dbMapping.getTargetTable()).append(" SET "); + + List> values = new ArrayList<>(); + for (String srcColumnName : old.keySet()) { + List targetColumnNames = new ArrayList<>(); + columnsMap.forEach((targetColumn, srcColumn) -> { + if (srcColumnName.toLowerCase().equals(srcColumn)) { + targetColumnNames.add(targetColumn); + } + }); + if (!targetColumnNames.isEmpty()) { + for (String targetColumnName : targetColumnNames) { + updateSql.append(targetColumnName).append("=?, "); + Integer type = ctype.get(targetColumnName.toLowerCase()); + BatchExecutor.setValue(values, type, data.get(srcColumnName)); } } - int len = updateSql.length(); - updateSql.delete(len - 2, len).append(" WHERE "); + } + int len = updateSql.length(); + updateSql.delete(len - 2, len).append(" WHERE "); - // 拼接主键 - appendCondition(dbMapping, updateSql, ctype, values, d, o); + // 拼接主键 + appendCondition(dbMapping, updateSql, ctype, values, data, old); - batchExecutor.execute(updateSql.toString(), values); + batchExecutor.execute(updateSql.toString(), values); - if (logger.isTraceEnabled()) { - logger.trace("Execute sql: {}", updateSql); - } - } catch (Exception e) { - logger.error(e.getMessage(), e); + if (logger.isTraceEnabled()) { + logger.trace("Execute sql: {}", updateSql); } - // }); - - idx++; + } catch (Exception e) { + logger.error(e.getMessage(), e); } } /** * 删除操作 * - * @param config - * @param dml - * @throws SQLException + * @param simpleDml */ - private void delete(MappingConfig config, Dml dml) { - List> data = dml.getData(); + private void delete(SimpleDml simpleDml, BatchExecutor batchExecutor) { + Map data = simpleDml.getData(); if (data == null || data.isEmpty()) { return; } - DbMapping dbMapping = config.getDbMapping(); + DbMapping dbMapping = simpleDml.getConfig().getDbMapping(); - Map ctype = getTargetColumnType(batchExecutors[0].getConn(), config); - Integer tbHash = null; - if (!config.getConcurrent()) { - // 按表名hash到一个线程 - tbHash = Math.abs(Math.abs(dbMapping.getTargetTable().hashCode()) % threads); - } - for (Map d : data) { - int hash; - if (tbHash != null) { - hash = tbHash; - } else { - hash = pkHash(dbMapping, d, threads); - } + Map ctype = getTargetColumnType(batchExecutor.getConn(), simpleDml.getConfig()); - // ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadExecutors[hash]; - // checkQueue(tpe); - // tpe.submit(() -> { - try { - BatchExecutor batchExecutor = batchExecutors[hash]; - StringBuilder sql = new StringBuilder(); - sql.append("DELETE FROM ").append(dbMapping.getTargetTable()).append(" WHERE "); + try { + StringBuilder sql = new StringBuilder(); + sql.append("DELETE FROM ").append(dbMapping.getTargetTable()).append(" WHERE "); - List> values = new ArrayList<>(); + List> values = new ArrayList<>(); - // 拼接主键 - appendCondition(dbMapping, sql, ctype, values, d); + // 拼接主键 + appendCondition(dbMapping, sql, ctype, values, data); - batchExecutor.execute(sql.toString(), values); - if (logger.isTraceEnabled()) { - logger.trace("Execute sql: {}", sql); - } - } catch (Exception e) { - logger.error(e.getMessage(), e); + batchExecutor.execute(sql.toString(), values); + if (logger.isTraceEnabled()) { + logger.trace("Execute sql: {}", sql); } - // }); + } catch (Exception e) { + logger.error(e.getMessage(), e); } } @@ -416,7 +314,8 @@ public class RdbSyncService { */ public static void setPStmt(int type, PreparedStatement pstmt, Object value, int i) throws SQLException { if (value == null) { - pstmt.setObject(i, type); + pstmt.setNull(i, type); + return; } switch (type) { case Types.BIT: @@ -638,18 +537,4 @@ public class RdbSyncService { executorService.shutdown(); } } - - private void checkQueue(ThreadPoolExecutor tpe) { - // 防止队列过大 - while (tpe.getQueue().size() > 10000) { - try { - Thread.sleep(3000); - while (tpe.getQueue().size() > 5000) { - Thread.sleep(1000); - } - } catch (InterruptedException e) { - // ignore - } - } - } } diff --git a/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SimpleDml.java b/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SimpleDml.java new file mode 100644 index 0000000000000000000000000000000000000000..cbe45190d44375aa8001af4f359d5dc51237bd35 --- /dev/null +++ b/client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/SimpleDml.java @@ -0,0 +1,102 @@ +package com.alibaba.otter.canal.client.adapter.rdb.support; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig; +import com.alibaba.otter.canal.client.adapter.support.Dml; +import org.apache.commons.lang.StringUtils; + +public class SimpleDml { + + private String destination; + private String database; + private String table; + private String type; + private Map data; + private Map old; + + private MappingConfig config; + + public String getDestination() { + return destination; + } + + public void setDestination(String destination) { + this.destination = destination; + } + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public String getTable() { + return table; + } + + public void setTable(String table) { + this.table = table; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public Map getData() { + return data; + } + + public void setData(Map data) { + this.data = data; + } + + public Map getOld() { + return old; + } + + public void setOld(Map old) { + this.old = old; + } + + public MappingConfig getConfig() { + return config; + } + + public void setConfig(MappingConfig config) { + this.config = config; + } + + public static List dml2SimpleDml(Dml dml, MappingConfig config) { + List simpleDmlList = new ArrayList<>(); + int len = dml.getData().size(); + + String destination = StringUtils.trimToEmpty(dml.getDestination()); + String database = dml.getDatabase(); + String table = dml.getTable(); + + for (int i = 0; i < len; i++) { + SimpleDml simpleDml = new SimpleDml(); + simpleDml.setDestination(dml.getDestination()); + simpleDml.setDatabase(dml.getDatabase()); + simpleDml.setTable(dml.getTable()); + simpleDml.setType(dml.getType()); + simpleDml.setData(dml.getData().get(i)); + if (dml.getOld() != null) { + simpleDml.setOld(dml.getOld().get(i)); + } + simpleDml.setConfig(config); + simpleDmlList.add(simpleDml); + } + + return simpleDmlList; + } +}