Commit 16f335c3 authored by rewerma, committed by agapple

Fix SQL injection risk in the REST ETL interface (#1685)

* kafka mode: enable transactional asynchronous send by default

* fix adapter etl rest SQL injection bug
Parent a8bf2b64
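The fix in a nutshell: the old ETL endpoints substituted the REST params directly into the etlCondition string ({0}, {1}, ...), so a crafted parameter became part of the SQL text; the new AbstractEtlService/Util path turns each {} placeholder into a ? and binds the value through a PreparedStatement. A minimal sketch of the two patterns follows; the class and method names are illustrative and not part of the commit.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.List;
import javax.sql.DataSource;

public class EtlConditionSketch {

    // Pre-fix pattern: the raw param is concatenated into the SQL text, so a value
    // such as "2018-10-21' OR '1'='1" rewrites the query (SQL injection).
    static String unsafeSql(String baseSql, String etlCondition, String param) {
        return baseSql + " " + etlCondition.replace("{0}", param);
    }

    // Post-fix pattern: "{}" becomes a "?" placeholder and the values are bound
    // separately, so the driver never parses user input as SQL.
    static long safeCount(DataSource ds, String baseSql, String etlCondition,
                          List<String> params) throws Exception {
        String sql = baseSql + " " + etlCondition.replace("{}", "?");
        String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
        try (Connection conn = ds.getConnection();
             PreparedStatement pstmt = conn.prepareStatement(countSql)) {
            for (int i = 0; i < params.size(); i++) {
                pstmt.setObject(i + 1, params.get(i)); // positional binding
            }
            try (ResultSet rs = pstmt.executeQuery()) {
                return rs.next() ? rs.getLong(1) : 0L;
            }
        }
    }
}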
package com.alibaba.otter.canal.client.adapter.support;
import com.alibaba.druid.pool.DruidDataSource;
import com.google.common.base.Joiner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
public abstract class AbstractEtlService {
protected Logger logger = LoggerFactory.getLogger(this.getClass());
private String type;
private AdapterConfig config;
public AbstractEtlService(String type, AdapterConfig config){
this.type = type;
this.config = config;
}
protected EtlResult importData(String sql, List<String> params) {
EtlResult etlResult = new EtlResult();
AtomicLong impCount = new AtomicLong();
List<String> errMsg = new ArrayList<>();
if (config == null) {
logger.warn("{} mapping config is null, etl go end ", type);
etlResult.setErrorMessage(type + "mapping config is null, etl go end ");
return etlResult;
}
long start = System.currentTimeMillis();
try {
DruidDataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
List<Object> values = new ArrayList<>();
// Append the etl condition, turning each "{}" placeholder into a "?" bind parameter
if (config.getMapping().getEtlCondition() != null && params != null) {
String etlCondition = config.getMapping().getEtlCondition();
for (String param : params) {
etlCondition = etlCondition.replace("{}", "?");
values.add(param);
}
sql += " " + etlCondition;
}
if (logger.isDebugEnabled()) {
logger.debug("etl sql : {}", sql);
}
// Get the total row count
String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
long cnt = (Long) Util.sqlRS(dataSource, countSql, values, rs -> {
Long count = null;
try {
if (rs.next()) {
count = ((Number) rs.getObject(1)).longValue();
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return count == null ? 0L : count;
});
// Use multiple threads when there are 10,000 or more rows
if (cnt >= 10000) {
int threadCount = 3; // read from configuration, default 3
long perThreadCnt = cnt / threadCount;
ExecutorService executor = Util.newFixedThreadPool(threadCount, 5000L);
List<Future<Boolean>> futures = new ArrayList<>(threadCount);
for (int i = 0; i < threadCount; i++) {
long offset = i * perThreadCnt;
Long size = null;
if (i != threadCount - 1) {
size = perThreadCnt;
}
String sqlFinal;
if (size != null) {
sqlFinal = sql + " LIMIT " + offset + "," + size;
} else {
sqlFinal = sql + " LIMIT " + offset + "," + cnt;
}
Future<Boolean> future = executor.submit(
() -> executeSqlImport(dataSource, sqlFinal, values, config.getMapping(), impCount, errMsg));
futures.add(future);
}
for (Future<Boolean> future : futures) {
future.get();
}
executor.shutdown();
} else {
executeSqlImport(dataSource, sql, values, config.getMapping(), impCount, errMsg);
}
logger.info("数据全量导入完成, 一共导入 {} 条数据, 耗时: {}", impCount.get(), System.currentTimeMillis() - start);
etlResult.setResultMessage("导入" + type + " 数据:" + impCount.get() + " 条");
} catch (Exception e) {
logger.error(e.getMessage(), e);
errMsg.add(type + " 数据导入异常 =>" + e.getMessage());
}
if (errMsg.isEmpty()) {
etlResult.setSucceeded(true);
} else {
etlResult.setErrorMessage(Joiner.on("\n").join(errMsg));
}
return etlResult;
}
protected abstract boolean executeSqlImport(DataSource ds, String sql, List<Object> values,
AdapterConfig.AdapterMapping mapping, AtomicLong impCount,
List<String> errMsg);
}
package com.alibaba.otter.canal.client.adapter.support;
public interface AdapterConfig {
String getDataSourceKey();
AdapterMapping getMapping();
interface AdapterMapping {
String getEtlCondition();
}
}
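After this refactor every adapter follows the same template: its config implements AdapterConfig (exposing an AdapterMapping with the etlCondition), the ETL service extends AbstractEtlService, and only executeSqlImport is adapter-specific. A hedged sketch of a hypothetical adapter wired into this template; LogEtlService and the table name are made up for illustration and are not part of the commit.

package com.alibaba.otter.canal.client.adapter.support;

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import javax.sql.DataSource;

// Hypothetical adapter used only to illustrate the template.
public class LogEtlService extends AbstractEtlService {

    public LogEtlService(AdapterConfig config) {
        super("LOG", config);
    }

    public EtlResult importData(List<String> params) {
        // The shared base class appends the etlCondition as bind parameters, computes the
        // row count, splits the work across threads and calls executeSqlImport per slice.
        return importData("SELECT * FROM mytest.user t", params);
    }

    @Override
    protected boolean executeSqlImport(DataSource ds, String sql, List<Object> values,
                                       AdapterConfig.AdapterMapping mapping, AtomicLong impCount,
                                       List<String> errMsg) {
        Util.sqlRS(ds, sql, values, rs -> {
            try {
                while (rs.next()) {
                    impCount.incrementAndGet(); // a real adapter would write the row out here
                }
            } catch (Exception e) {
                errMsg.add("LOG etl failed ==> " + e.getMessage());
            }
            return null;
        });
        return errMsg.isEmpty();
    }
}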
......@@ -2,13 +2,11 @@ package com.alibaba.otter.canal.client.adapter.support;
import java.io.File;
import java.net.URL;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.*;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
......@@ -37,9 +35,9 @@ public class Util {
*/
public static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun) {
try (Connection conn = ds.getConnection();
Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);) {
Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
stmt.setFetchSize(Integer.MIN_VALUE);
try (ResultSet rs = stmt.executeQuery(sql);) {
try (ResultSet rs = stmt.executeQuery(sql)) {
return fun.apply(rs);
}
} catch (Exception e) {
......@@ -48,6 +46,26 @@ public class Util {
}
}
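/**
 * Parameterized variant added in this commit: the values are bound to "?" placeholders
 * through a PreparedStatement, so caller-supplied params are never spliced into the SQL text,
 * and the streaming ResultSet is handed to fun.
 */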
public static Object sqlRS(DataSource ds, String sql, List<Object> values, Function<ResultSet, Object> fun) {
try (Connection conn = ds.getConnection()) {
try (PreparedStatement pstmt = conn
.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
pstmt.setFetchSize(Integer.MIN_VALUE);
if (values != null) {
for (int i = 0; i < values.size(); i++) {
pstmt.setObject(i + 1, values.get(i));
}
}
try (ResultSet rs = pstmt.executeQuery()) {
return fun.apply(rs);
}
}
} catch (Exception e) {
logger.error("sqlRs has error, sql: {} ", sql);
throw new RuntimeException(e);
}
}
/**
* Execute sql and obtain the ResultSet
*
......
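A usage sketch of the new parameterized overload, roughly what the ETL services now do for the count query; the table name and timestamp are illustrative, and dataSource is assumed to come from DatasourceConfig.DATA_SOURCES.

import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import com.alibaba.otter.canal.client.adapter.support.Util;

public class SqlRsUsageSketch {

    static long countSince(DataSource dataSource, String time) {
        List<Object> values = new ArrayList<>();
        values.add(time); // bound as a JDBC parameter, never spliced into the SQL text
        return (Long) Util.sqlRS(dataSource,
            "SELECT COUNT(1) FROM ( SELECT * FROM mytest.user t WHERE t.c_time >= ? ) _CNT",
            values,
            rs -> {
                try {
                    return rs.next() ? ((Number) rs.getObject(1)).longValue() : 0L;
                } catch (Exception e) {
                    return 0L;
                }
            });
    }
}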
......@@ -17,7 +17,6 @@ import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.MDC;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.otter.canal.client.adapter.OuterAdapter;
......@@ -30,12 +29,7 @@ import com.alibaba.otter.canal.client.adapter.es.monitor.ESConfigMonitor;
import com.alibaba.otter.canal.client.adapter.es.service.ESEtlService;
import com.alibaba.otter.canal.client.adapter.es.service.ESSyncService;
import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
import com.alibaba.otter.canal.client.adapter.support.Dml;
import com.alibaba.otter.canal.client.adapter.support.EtlResult;
import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
import com.alibaba.otter.canal.client.adapter.support.SPI;
import com.alibaba.otter.canal.client.adapter.support.*;
/**
* ES external adapter
......@@ -194,7 +188,6 @@ public class ESAdapter implements OuterAdapter {
} else {
StringBuilder resultMsg = new StringBuilder();
boolean resSuccess = true;
// a non-null ds means a datasourceKey was passed in
for (ESSyncConfig configTmp : esSyncConfig.values()) {
// take every config whose destination equals the task
if (configTmp.getDestination().equals(task)) {
......
package com.alibaba.otter.canal.client.adapter.es.config;
import com.alibaba.otter.canal.client.adapter.support.AdapterConfig;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
......@@ -11,7 +13,7 @@ import java.util.Map;
* @author rewerma 2018-11-01
* @version 1.0.0
*/
public class ESSyncConfig {
public class ESSyncConfig implements AdapterConfig {
private String dataSourceKey; // data source key
......@@ -78,7 +80,11 @@ public class ESSyncConfig {
this.esMapping = esMapping;
}
public static class ESMapping {
public ESMapping getMapping(){
return esMapping;
}
public static class ESMapping implements AdapterMapping{
private String _index;
private String _type;
......
package com.alibaba.otter.canal.client.adapter.es.service;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.sql.DataSource;
......@@ -25,18 +20,15 @@ import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
import com.alibaba.otter.canal.client.adapter.support.AbstractEtlService;
import com.alibaba.otter.canal.client.adapter.support.AdapterConfig;
import com.alibaba.otter.canal.client.adapter.support.EtlResult;
import com.alibaba.otter.canal.client.adapter.support.Util;
import com.google.common.base.Joiner;
/**
* ES ETL Service
......@@ -44,116 +36,24 @@ import com.google.common.base.Joiner;
* @author rewerma 2018-11-01
* @version 1.0.0
*/
public class ESEtlService {
private static Logger logger = LoggerFactory.getLogger(ESEtlService.class);
public class ESEtlService extends AbstractEtlService {
private TransportClient transportClient;
private ESTemplate esTemplate;
private ESSyncConfig config;
public ESEtlService(TransportClient transportClient, ESSyncConfig config){
super("ES", config);
this.transportClient = transportClient;
this.esTemplate = new ESTemplate(transportClient);
this.config = config;
}
public EtlResult importData(List<String> params) {
EtlResult etlResult = new EtlResult();
AtomicLong impCount = new AtomicLong();
List<String> errMsg = new ArrayList<>();
String esIndex = "";
if (config == null) {
logger.warn("esSycnCofnig is null, etl go end !");
etlResult.setErrorMessage("esSycnCofnig is null, etl go end !");
return etlResult;
}
ESMapping mapping = config.getEsMapping();
esIndex = mapping.get_index();
DruidDataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
Pattern pattern = Pattern.compile(".*:(.*)://.*/(.*)\\?.*$");
Matcher matcher = pattern.matcher(dataSource.getUrl());
if (!matcher.find()) {
throw new RuntimeException("Not found the schema of jdbc-url: " + config.getDataSourceKey());
}
String schema = matcher.group(2);
logger.info("etl from db: {}, to es index: {}", schema, esIndex);
long start = System.currentTimeMillis();
try {
String sql = mapping.getSql();
// append the etl condition
if (mapping.getEtlCondition() != null && params != null) {
String etlCondition = mapping.getEtlCondition();
int size = params.size();
for (int i = 0; i < size; i++) {
etlCondition = etlCondition.replace("{" + i + "}", params.get(i));
}
sql += " " + etlCondition;
}
if (logger.isDebugEnabled()) {
logger.debug("etl sql : {}", mapping.getSql());
}
// get the total row count
String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
long cnt = (Long) Util.sqlRS(dataSource, countSql, rs -> {
Long count = null;
try {
if (rs.next()) {
count = ((Number) rs.getObject(1)).longValue();
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return count == null ? 0L : count;
});
// Use multiple threads when there are 10,000 or more rows
if (cnt >= 10000) {
int threadCount = 3; // read from configuration, default 3
long perThreadCnt = cnt / threadCount;
ExecutorService executor = Util.newFixedThreadPool(threadCount, 5000L);
for (int i = 0; i < threadCount; i++) {
long offset = i * perThreadCnt;
Long size = null;
if (i != threadCount - 1) {
size = perThreadCnt;
}
String sqlFinal;
if (size != null) {
sqlFinal = sql + " LIMIT " + offset + "," + size;
} else {
sqlFinal = sql + " LIMIT " + offset + "," + cnt;
}
executor.execute(() -> executeSqlImport(dataSource, sqlFinal, mapping, impCount, errMsg));
}
executor.shutdown();
while (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
// ignore
}
} else {
executeSqlImport(dataSource, sql, mapping, impCount, errMsg);
}
logger.info("数据全量导入完成,一共导入 {} 条数据, 耗时: {}", impCount.get(), System.currentTimeMillis() - start);
etlResult.setResultMessage("导入ES索引 " + esIndex + " 数据:" + impCount.get() + " 条");
} catch (Exception e) {
logger.error(e.getMessage(), e);
errMsg.add(esIndex + " etl failed! ==>" + e.getMessage());
}
if (errMsg.isEmpty()) {
etlResult.setSucceeded(true);
} else {
etlResult.setErrorMessage(Joiner.on("\n").join(errMsg));
}
return etlResult;
logger.info("start etl to import data to index: {}", mapping.get_index());
String sql = mapping.getSql();
return importData(sql, params);
}
private void processFailBulkResponse(BulkResponse bulkResponse) {
......@@ -171,10 +71,12 @@ public class ESEtlService {
}
}
private boolean executeSqlImport(DataSource ds, String sql, ESMapping mapping, AtomicLong impCount,
List<String> errMsg) {
protected boolean executeSqlImport(DataSource ds, String sql, List<Object> values,
AdapterConfig.AdapterMapping adapterMapping, AtomicLong impCount,
List<String> errMsg) {
try {
Util.sqlRS(ds, sql, rs -> {
ESMapping mapping = (ESMapping) adapterMapping;
Util.sqlRS(ds, sql, values, rs -> {
int count = 0;
try {
BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
......
......@@ -17,5 +17,5 @@ esMapping:
from biz_order t"
skips:
- customer_id
etlCondition: "where t.c_time>='{0}'"
etlCondition: "where t.c_time>={}"
commitBatch: 3000
......@@ -9,7 +9,7 @@ esMapping:
customer_order:
name: customer
sql: "select t.id, t.name, t.email from customer t"
etlCondition: "where t.c_time>='{0}'"
etlCondition: "where t.c_time>={}"
commitBatch: 3000
......
......@@ -12,5 +12,5 @@ esMapping:
left join role b on b.id=a.role_id"
# objFields:
# _labels: array:;
etlCondition: "where a.c_time>='{0}'"
etlCondition: "where a.c_time>={}"
commitBatch: 3000
......@@ -4,8 +4,6 @@ import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import javax.sql.DataSource;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
......@@ -25,7 +23,10 @@ import com.alibaba.otter.canal.client.adapter.hbase.monitor.HbaseConfigMonitor;
import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseEtlService;
import com.alibaba.otter.canal.client.adapter.hbase.service.HbaseSyncService;
import com.alibaba.otter.canal.client.adapter.hbase.support.HbaseTemplate;
import com.alibaba.otter.canal.client.adapter.support.*;
import com.alibaba.otter.canal.client.adapter.support.Dml;
import com.alibaba.otter.canal.client.adapter.support.EtlResult;
import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
import com.alibaba.otter.canal.client.adapter.support.SPI;
/**
* HBase external adapter
......@@ -147,27 +148,16 @@ public class HbaseAdapter implements OuterAdapter {
public EtlResult etl(String task, List<String> params) {
EtlResult etlResult = new EtlResult();
MappingConfig config = hbaseMapping.get(task);
HbaseEtlService hbaseEtlService = new HbaseEtlService(hbaseTemplate, config);
if (config != null) {
DataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
if (dataSource != null) {
return HbaseEtlService.importData(dataSource, hbaseTemplate, config, params);
} else {
etlResult.setSucceeded(false);
etlResult.setErrorMessage("DataSource not found");
return etlResult;
}
return hbaseEtlService.importData(params);
} else {
StringBuilder resultMsg = new StringBuilder();
boolean resSucc = true;
// a non-null ds means a datasourceKey was passed in
for (MappingConfig configTmp : hbaseMapping.values()) {
// take every config whose destination equals the task
if (configTmp.getDestination().equals(task)) {
DataSource dataSource = DatasourceConfig.DATA_SOURCES.get(configTmp.getDataSourceKey());
if (dataSource == null) {
continue;
}
EtlResult etlRes = HbaseEtlService.importData(dataSource, hbaseTemplate, configTmp, params);
EtlResult etlRes = hbaseEtlService.importData(params);
if (!etlRes.getSucceeded()) {
resSucc = false;
resultMsg.append(etlRes.getErrorMessage()).append("\n");
......
package com.alibaba.otter.canal.client.adapter.hbase.config;
import com.alibaba.otter.canal.client.adapter.support.AdapterConfig;
import java.util.*;
/**
......@@ -8,7 +10,7 @@ import java.util.*;
* @author rewerma 2018-8-21 下午06:45:49
* @version 1.0.0
*/
public class MappingConfig {
public class MappingConfig implements AdapterConfig {
private String dataSourceKey; // data source key
......@@ -60,6 +62,10 @@ public class MappingConfig {
this.hbaseMapping = hbaseMapping;
}
public AdapterMapping getMapping() {
return hbaseMapping;
}
public void validate() {
if (hbaseMapping.database == null || hbaseMapping.database.isEmpty()) {
throw new NullPointerException("hbaseMapping.database");
......@@ -179,7 +185,7 @@ public class MappingConfig {
}
}
public static class HbaseMapping {
public static class HbaseMapping implements AdapterMapping {
private Mode mode = Mode.STRING; // default HBase value conversion format
private String database; // database name or schema name
......
package com.alibaba.otter.canal.client.adapter.hbase.service;
import java.sql.ResultSetMetaData;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import javax.sql.DataSource;
import com.alibaba.otter.canal.client.adapter.support.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfig;
import com.alibaba.otter.canal.client.adapter.hbase.support.HRow;
import com.alibaba.otter.canal.client.adapter.hbase.support.HbaseTemplate;
import com.alibaba.otter.canal.client.adapter.hbase.support.PhType;
import com.alibaba.otter.canal.client.adapter.hbase.support.PhTypeUtil;
import com.alibaba.otter.canal.client.adapter.hbase.support.Type;
import com.alibaba.otter.canal.client.adapter.hbase.support.TypeUtil;
import com.alibaba.otter.canal.client.adapter.support.EtlResult;
import com.alibaba.otter.canal.client.adapter.support.JdbcTypeUtil;
import com.alibaba.otter.canal.client.adapter.support.Util;
import com.alibaba.otter.canal.client.adapter.hbase.support.*;
import com.google.common.base.Joiner;
/**
......@@ -34,17 +26,21 @@ import com.google.common.base.Joiner;
* @author rewerma @ 2018-10-20
* @version 1.0.0
*/
public class HbaseEtlService {
public class HbaseEtlService extends AbstractEtlService {
private static Logger logger = LoggerFactory.getLogger(HbaseEtlService.class);
private HbaseTemplate hbaseTemplate;
private MappingConfig config;
public HbaseEtlService(HbaseTemplate hbaseTemplate, MappingConfig config){
super("HBase", config);
this.hbaseTemplate = hbaseTemplate;
this.config = config;
}
/**
* Create the HBase table
*
* @param hbaseTemplate
* @param config
*/
public static void createTable(HbaseTemplate hbaseTemplate, MappingConfig config) {
private void createTable() {
try {
// Check whether the HBase table exists; create it if it does not
MappingConfig.HbaseMapping hbaseMapping = config.getHbaseMapping();
......@@ -60,29 +56,14 @@ public class HbaseEtlService {
/**
* Import data
*
* @param ds data source
* @param hbaseTemplate hbaseTemplate
* @param config mapping config
* @param params filter parameters
* @return import result
*/
public static EtlResult importData(DataSource ds, HbaseTemplate hbaseTemplate, MappingConfig config,
List<String> params) {
public EtlResult importData(List<String> params) {
EtlResult etlResult = new EtlResult();
AtomicLong successCount = new AtomicLong();
List<String> errMsg = new ArrayList<>();
String hbaseTable = "";
try {
if (config == null) {
logger.error("Config is null!");
etlResult.setSucceeded(false);
etlResult.setErrorMessage("Config is null!");
return etlResult;
}
MappingConfig.HbaseMapping hbaseMapping = config.getHbaseMapping();
hbaseTable = hbaseMapping.getHbaseTable();
long start = System.currentTimeMillis();
if (params != null && params.size() == 1 && "rebuild".equalsIgnoreCase(params.get(0))) {
logger.info(hbaseMapping.getHbaseTable() + " rebuild is starting!");
......@@ -95,119 +76,28 @@ public class HbaseEtlService {
} else {
logger.info(hbaseMapping.getHbaseTable() + " etl is starting!");
}
createTable(hbaseTemplate, config);
createTable();
// Build the sql
String sql = "SELECT * FROM " + config.getHbaseMapping().getDatabase() + "." + hbaseMapping.getTable();
// Append the condition
if (params != null && params.size() == 1 && hbaseMapping.getEtlCondition() == null) {
AtomicBoolean stExists = new AtomicBoolean(false);
// Check whether a SYS_TIME column exists
Util.sqlRS(ds, sql, rs -> {
try {
ResultSetMetaData rsmd = rs.getMetaData();
int cnt = rsmd.getColumnCount();
for (int i = 1; i <= cnt; i++) {
String columnName = rsmd.getColumnName(i);
if ("SYS_TIME".equalsIgnoreCase(columnName)) {
stExists.set(true);
break;
}
}
} catch (Exception e) {
// ignore
}
return null;
});
if (stExists.get()) {
sql += " WHERE SYS_TIME >= '" + params.get(0) + "' ";
}
} else if (hbaseMapping.getEtlCondition() != null && params != null) {
String etlCondition = hbaseMapping.getEtlCondition();
int size = params.size();
for (int i = 0; i < size; i++) {
etlCondition = etlCondition.replace("{" + i + "}", params.get(i));
}
sql += " " + etlCondition;
}
// Get the total row count
String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
long cnt = (Long) Util.sqlRS(ds, countSql, rs -> {
Long count = null;
try {
if (rs.next()) {
count = ((Number) rs.getObject(1)).longValue();
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return count == null ? 0 : count;
});
// Use multiple threads when there are 10,000 or more rows
if (cnt >= 10000) {
int threadCount = 3;
long perThreadCnt = cnt / threadCount;
ExecutorService executor = Util.newFixedThreadPool(threadCount, 5000L);
for (int i = 0; i < threadCount; i++) {
long offset = i * perThreadCnt;
Long size = null;
if (i != threadCount - 1) {
size = perThreadCnt;
}
String sqlFinal;
if (size != null) {
sqlFinal = sql + " LIMIT " + offset + "," + size;
} else {
sqlFinal = sql + " LIMIT " + offset + "," + cnt;
}
executor.submit(
() -> executeSqlImport(ds, sqlFinal, hbaseMapping, hbaseTemplate, successCount, errMsg));
}
executor.shutdown();
while (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
// ignore
}
} else {
executeSqlImport(ds, sql, hbaseMapping, hbaseTemplate, successCount, errMsg);
}
logger.info(hbaseMapping.getHbaseTable() + " etl completed in: "
+ (System.currentTimeMillis() - start) / 1000 + "s!");
etlResult.setResultMessage("导入HBase表 " + hbaseMapping.getHbaseTable() + " 数据:" + successCount.get() + " 条");
return super.importData(sql, params);
} catch (Exception e) {
logger.error(e.getMessage(), e);
errMsg.add(hbaseTable + " etl failed! ==>" + e.getMessage());
}
if (errMsg.isEmpty()) {
etlResult.setSucceeded(true);
} else {
etlResult.setErrorMessage(Joiner.on("\n").join(errMsg));
errMsg.add("HBase etl error ==>" + e.getMessage());
}
etlResult.setErrorMessage(Joiner.on("\n").join(errMsg));
return etlResult;
}
/**
* Execute the import
*
* @param ds
* @param sql
* @param hbaseMapping
* @param hbaseTemplate
* @param successCount
* @param errMsg
* @return
*/
private static boolean executeSqlImport(DataSource ds, String sql, MappingConfig.HbaseMapping hbaseMapping,
HbaseTemplate hbaseTemplate, AtomicLong successCount, List<String> errMsg) {
protected boolean executeSqlImport(DataSource ds, String sql, List<Object> values,
AdapterConfig.AdapterMapping mapping, AtomicLong impCount, List<String> errMsg) {
MappingConfig.HbaseMapping hbaseMapping = (MappingConfig.HbaseMapping) mapping;
try {
Util.sqlRS(ds, sql, rs -> {
Util.sqlRS(ds, sql, values, rs -> {
int i = 1;
try {
......@@ -353,9 +243,9 @@ public class HbaseEtlService {
complete = true;
}
i++;
successCount.incrementAndGet();
impCount.incrementAndGet();
if (logger.isDebugEnabled()) {
logger.debug("successful import count:" + successCount.get());
logger.debug("successful import count:" + impCount.get());
}
}
......
......@@ -8,6 +8,7 @@ hbaseMapping:
hbaseTable: MYTEST.PERSON2 # HBase table name
family: CF # default unified column family name
uppercaseQualifier: true # convert field names to upper case, default true
etlCondition: "where c_time>={}"
commitBatch: 3000 # batch commit size
#rowKey: id,type # a composite rowKey must not duplicate the rowKey defined in columns
columns:
......
package com.alibaba.otter.canal.client.adapter.hbase.test;
import com.alibaba.otter.canal.client.adapter.hbase.support.HbaseTemplate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.junit.Test;
public class HBaseConnectionTest {
@Test
public void test01() {
Configuration hbaseConfig = HBaseConfiguration.create();
hbaseConfig.set("hbase.zookeeper.quorum", "127.0.0.1");
hbaseConfig.set("hbase.zookeeper.property.clientPort", "2181");
hbaseConfig.set("zookeeper.znode.parent", "/hbase");
HbaseTemplate hbaseTemplate = new HbaseTemplate(hbaseConfig);
System.out.println(hbaseTemplate.tableExists("ttt"));
}
}
......@@ -8,8 +8,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import javax.sql.DataSource;
import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
......@@ -181,27 +179,16 @@ public class RdbAdapter implements OuterAdapter {
public EtlResult etl(String task, List<String> params) {
EtlResult etlResult = new EtlResult();
MappingConfig config = rdbMapping.get(task);
RdbEtlService rdbEtlService = new RdbEtlService(dataSource, config);
if (config != null) {
DataSource srcDataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());
if (srcDataSource != null) {
return RdbEtlService.importData(srcDataSource, dataSource, config, params);
} else {
etlResult.setSucceeded(false);
etlResult.setErrorMessage("DataSource not found");
return etlResult;
}
return rdbEtlService.importData(params);
} else {
StringBuilder resultMsg = new StringBuilder();
boolean resSucc = true;
// a non-null ds means a destination was passed in
for (MappingConfig configTmp : rdbMapping.values()) {
// take every config whose destination equals the task
if (configTmp.getDestination().equals(task)) {
DataSource srcDataSource = DatasourceConfig.DATA_SOURCES.get(configTmp.getDataSourceKey());
if (srcDataSource == null) {
continue;
}
EtlResult etlRes = RdbEtlService.importData(srcDataSource, dataSource, configTmp, params);
EtlResult etlRes = rdbEtlService.importData(params);
if (!etlRes.getSucceeded()) {
resSucc = false;
resultMsg.append(etlRes.getErrorMessage()).append("\n");
......
......@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.client.adapter.rdb.config;
import java.util.LinkedHashMap;
import java.util.Map;
import com.alibaba.otter.canal.client.adapter.support.AdapterConfig;
import org.apache.commons.lang.StringUtils;
/**
......@@ -11,19 +12,19 @@ import org.apache.commons.lang.StringUtils;
* @author rewerma 2018-11-07 下午02:41:34
* @version 1.0.0
*/
public class MappingConfig {
public class MappingConfig implements AdapterConfig {
private String dataSourceKey; // data source key
private String destination; // canal instance or MQ topic
private String groupId; // groupId
private String outerAdapterKey; // key of the corresponding outer adapter
private boolean concurrent = false; // whether to sync concurrently
private DbMapping dbMapping; // db mapping config
public String getDataSourceKey() {
return dataSourceKey;
......@@ -73,6 +74,10 @@ public class MappingConfig {
this.destination = destination;
}
public AdapterMapping getMapping() {
return dbMapping;
}
public void validate() {
if (dbMapping.database == null || dbMapping.database.isEmpty()) {
throw new NullPointerException("dbMapping.database");
......@@ -85,7 +90,7 @@ public class MappingConfig {
}
}
public static class DbMapping {
public static class DbMapping implements AdapterMapping {
private boolean mirrorDb = false; // whether this is a mirror database
private String database; // database name or schema name
......
package com.alibaba.otter.canal.client.adapter.rdb.service;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.*;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig.DbMapping;
import com.alibaba.otter.canal.client.adapter.rdb.support.SyncUtil;
import com.alibaba.otter.canal.client.adapter.support.AbstractEtlService;
import com.alibaba.otter.canal.client.adapter.support.AdapterConfig;
import com.alibaba.otter.canal.client.adapter.support.EtlResult;
import com.alibaba.otter.canal.client.adapter.support.Util;
import com.google.common.base.Joiner;
/**
* RDB ETL service class
......@@ -32,139 +23,33 @@ import com.google.common.base.Joiner;
* @author rewerma @ 2018-11-7
* @version 1.0.0
*/
public class RdbEtlService {
public class RdbEtlService extends AbstractEtlService {
private DataSource targetDS;
private MappingConfig config;
private static final Logger logger = LoggerFactory.getLogger(RdbEtlService.class);
public RdbEtlService(DataSource targetDS, MappingConfig config){
super("RDB", config);
this.targetDS = targetDS;
this.config = config;
}
/**
* Import data
*/
public static EtlResult importData(DataSource srcDS, DataSource targetDS, MappingConfig config,
List<String> params) {
EtlResult etlResult = new EtlResult();
AtomicLong successCount = new AtomicLong();
List<String> errMsg = new ArrayList<>();
String hbaseTable = "";
try {
if (config == null) {
logger.error("Config is null!");
etlResult.setSucceeded(false);
etlResult.setErrorMessage("Config is null!");
return etlResult;
}
DbMapping dbMapping = config.getDbMapping();
long start = System.currentTimeMillis();
// Build the sql
StringBuilder sql = new StringBuilder(
"SELECT * FROM " + dbMapping.getDatabase() + "." + dbMapping.getTable());
// Append the condition
appendCondition(params, dbMapping, srcDS, sql);
// Get the total row count
String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
long cnt = (Long) Util.sqlRS(srcDS, countSql, rs -> {
Long count = null;
try {
if (rs.next()) {
count = ((Number) rs.getObject(1)).longValue();
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return count == null ? 0 : count;
});
// Use multiple threads when there are 10,000 or more rows
if (cnt >= 10000) {
int threadCount = 3;
long perThreadCnt = cnt / threadCount;
ExecutorService executor = Util.newFixedThreadPool(threadCount, 5000L);
for (int i = 0; i < threadCount; i++) {
long offset = i * perThreadCnt;
Long size = null;
if (i != threadCount - 1) {
size = perThreadCnt;
}
String sqlFinal;
if (size != null) {
sqlFinal = sql + " LIMIT " + offset + "," + size;
} else {
sqlFinal = sql + " LIMIT " + offset + "," + cnt;
}
executor
.execute(() -> executeSqlImport(srcDS, targetDS, sqlFinal, dbMapping, successCount, errMsg));
}
executor.shutdown();
while (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
// ignore
}
} else {
executeSqlImport(srcDS, targetDS, sql.toString(), dbMapping, successCount, errMsg);
}
logger.info(
dbMapping.getTable() + " etl completed in: " + (System.currentTimeMillis() - start) / 1000 + "s!");
etlResult
.setResultMessage("导入目标表 " + SyncUtil.getDbTableName(dbMapping) + " 数据:" + successCount.get() + " 条");
} catch (Exception e) {
logger.error(e.getMessage(), e);
errMsg.add(hbaseTable + " etl failed! ==>" + e.getMessage());
}
if (errMsg.isEmpty()) {
etlResult.setSucceeded(true);
} else {
etlResult.setErrorMessage(Joiner.on("\n").join(errMsg));
}
return etlResult;
}
private static void appendCondition(List<String> params, DbMapping dbMapping, DataSource ds,
StringBuilder sql) throws SQLException {
if (params != null && params.size() == 1 && dbMapping.getEtlCondition() == null) {
AtomicBoolean stExists = new AtomicBoolean(false);
// Check whether a SYS_TIME column exists
Util.sqlRS(ds, sql.toString(), rs -> {
try {
ResultSetMetaData rsmd = rs.getMetaData();
int cnt = rsmd.getColumnCount();
for (int i = 1; i <= cnt; i++) {
String columnName = rsmd.getColumnName(i);
if ("SYS_TIME".equalsIgnoreCase(columnName)) {
stExists.set(true);
break;
}
}
} catch (Exception e) {
// ignore
}
return null;
});
if (stExists.get()) {
sql.append(" WHERE SYS_TIME >= '").append(params.get(0)).append("' ");
}
} else if (dbMapping.getEtlCondition() != null && params != null) {
String etlCondition = dbMapping.getEtlCondition();
int size = params.size();
for (int i = 0; i < size; i++) {
etlCondition = etlCondition.replace("{" + i + "}", params.get(i));
}
sql.append(" ").append(etlCondition);
}
public EtlResult importData(List<String> params) {
DbMapping dbMapping = config.getDbMapping();
String sql = "SELECT * FROM " + dbMapping.getDatabase() + "." + dbMapping.getTable();
return importData(sql, params);
}
/**
* Execute the import
*/
private static boolean executeSqlImport(DataSource srcDS, DataSource targetDS, String sql, DbMapping dbMapping,
AtomicLong successCount, List<String> errMsg) {
protected boolean executeSqlImport(DataSource srcDS, String sql, List<Object> values,
AdapterConfig.AdapterMapping mapping, AtomicLong impCount, List<String> errMsg) {
try {
DbMapping dbMapping = (DbMapping) mapping;
Map<String, String> columnsMap = new LinkedHashMap<>();
Map<String, Integer> columnType = new LinkedHashMap<>();
......@@ -187,18 +72,12 @@ public class RdbEtlService {
}
});
Util.sqlRS(srcDS, sql, rs -> {
Util.sqlRS(srcDS, sql, values, rs -> {
int idx = 1;
try {
boolean completed = false;
// if (dbMapping.isMapAll()) {
// columnsMap = dbMapping.getAllColumns();
// } else {
// columnsMap = dbMapping.getTargetColumns();
// }
StringBuilder insertSql = new StringBuilder();
insertSql.append("INSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
columnsMap
......@@ -222,13 +101,13 @@ public class RdbEtlService {
pstmt.clearParameters();
// Delete the existing target row first
Map<String, Object> values = new LinkedHashMap<>();
Map<String, Object> pkVal = new LinkedHashMap<>();
StringBuilder deleteSql = new StringBuilder(
"DELETE FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE ");
appendCondition(dbMapping, deleteSql, values, rs);
appendCondition(dbMapping, deleteSql, pkVal, rs);
try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
int k = 1;
for (Object val : values.values()) {
for (Object val : pkVal.values()) {
pstmt2.setObject(k++, val);
}
pstmt2.execute();
......@@ -264,9 +143,9 @@ public class RdbEtlService {
completed = true;
}
idx++;
successCount.incrementAndGet();
impCount.incrementAndGet();
if (logger.isDebugEnabled()) {
logger.debug("successful import count:" + successCount.get());
logger.debug("successful import count:" + impCount.get());
}
}
if (!completed) {
......
......@@ -16,6 +16,8 @@ dbMapping:
role_id:
c_time:
test1:
etlCondition: "where c_time>={}"
commitBatch: 3000 # batch commit size
## Mirror schema synchronize config
......
#################################################
######### common argument #############
#################################################
#canal.manager.jdbc.url=jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
#canal.manager.jdbc.username=root
......@@ -82,7 +82,7 @@ canal.aliyun.accessKey =
canal.aliyun.secretKey =
#################################################
######### destinations #############
#################################################
canal.destinations = example
# conf root dir
......@@ -116,5 +116,5 @@ canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# use transaction for kafka flatMessage batch produce
canal.mq.transaction = false
canal.mq.transaction = true
#canal.mq.properties. =
......@@ -104,7 +104,7 @@
<argline>-server -Xms512m -Xmx1024m -Dfile.encoding=UTF-8
-Djava.net.preferIPv4Stack=true -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=256m
</argline>
<maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
<javax.annotation-api.version>1.3.2</javax.annotation-api.version>
</properties>
......
......@@ -203,7 +203,7 @@ public class CanalKafkaProducer implements CanalMQProducer {
null,
JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
if (kafkaProperties.getTransaction()) {
producer2.send(record).get();
producer2.send(record);
} else {
producer2.send(record).get();
}
......
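The producer change above pairs with canal.mq.transaction = true: inside a Kafka transaction each send can be asynchronous, because commitTransaction() flushes and atomically commits the whole batch, whereas the non-transactional path still blocks on send(...).get() per record. A minimal hedged sketch of that transactional pattern with the plain kafka-clients API; the broker address, topic and transactional.id are illustrative.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");          // illustrative address
        props.put("transactional.id", "canal-transactional-id");   // enables transactions
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            for (String msg : new String[] { "m1", "m2", "m3" }) {
                // async send: no .get() per record; the batch becomes visible atomically
                producer.send(new ProducerRecord<>("example", msg));
            }
            producer.commitTransaction(); // flushes and commits all pending sends
        } catch (Exception e) {
            producer.abortTransaction();
            throw new RuntimeException(e);
        } finally {
            producer.close();
        }
    }
}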