Unverified commit fecc1121 authored by rewerma, committed by GitHub

Merge pull request #21 from alibaba/master

merge
......@@ -2,13 +2,15 @@ package com.alibaba.otter.canal.client.adapter.support;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Paths;
import java.util.*;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Pattern;
......@@ -178,8 +180,9 @@ public class ExtensionLoader<T> {
@SuppressWarnings("unchecked")
private T createExtension(String name, String key) {
System.out.println("xxxxxxxxxxxxx");
getExtensionClasses().forEach((k, v) -> logger.info("fffff: " + k + " " + v.getName()));
// System.out.println("xxxxxxxxxxxxx");
// getExtensionClasses().forEach((k, v) -> logger.info("fffff: " + k +
// " " + v.getName()));
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type
......
......@@ -45,6 +45,8 @@ public class CanalConstants {
public static final String CANAL_MQ_CANALBATCHSIZE = ROOT + "." + "mq.canalBatchSize";
public static final String CANAL_MQ_CANALGETTIMEOUT = ROOT + "." + "mq.canalGetTimeout";
public static final String CANAL_MQ_FLATMESSAGE = ROOT + "." + "mq.flatMessage";
public static final String CANAL_MQ_COMPRESSION_TYPE = ROOT + "." + "mq.compressionType";
public static final String CANAL_MQ_ACKS = ROOT + "." + "mq.acks";
public static String getInstanceModeKey(String destination) {
return MessageFormat.format(INSTANCE_MODE_TEMPLATE, destination);
......
......@@ -100,6 +100,8 @@ public class CanalLauncher {
CanalConstants.CANAL_MQ_CANALGETTIMEOUT)));
mqProperties.setFlatMessage(Boolean.valueOf(CanalController.getProperty(properties,
CanalConstants.CANAL_MQ_FLATMESSAGE)));
mqProperties.setCompressionType(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_COMPRESSION_TYPE));
mqProperties.setAcks(CanalController.getProperty(properties, CanalConstants.CANAL_MQ_ACKS));
return mqProperties;
}
......
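With the two new constants, canal.mq.compressionType and canal.mq.acks can be set in canal.properties and are copied onto MQProperties at startup (MQProperties declares defaults of "none" and "all"). A minimal standalone sketch of the lookup-with-default idea, independent of the CanalController helper; the class name and sample values below are illustrative only:

import java.util.Properties;

public class MqPropertyLookupSketch {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // values that would normally be read from canal.properties
        properties.setProperty("canal.mq.compressionType", "gzip");
        properties.setProperty("canal.mq.acks", "1");

        // fall back to the MQProperties defaults when a key is missing
        String compressionType = properties.getProperty("canal.mq.compressionType", "none");
        String acks = properties.getProperty("canal.mq.acks", "all");

        System.out.println("compression.type=" + compressionType + ", acks=" + acks);
    }
}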
......@@ -304,7 +304,6 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
}
boolean tsdbEnable = BooleanUtils.toBoolean(parameters.getTsdbEnable());
if (tsdbEnable) {
mysqlEventParser.setEnableTsdb(tsdbEnable);
mysqlEventParser.setTableMetaTSDBFactory(new DefaultTableMetaTSDBFactory() {
@Override
......@@ -327,6 +326,7 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
}
}
});
mysqlEventParser.setEnableTsdb(tsdbEnable);
}
eventParser = mysqlEventParser;
} else if (type.isLocalBinlog()) {
......
......@@ -57,6 +57,7 @@ public class LocalBinlogEventParser extends AbstractMysqlEventParser implements
((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter);
((DatabaseTableMeta) tableMetaTSDB).setSnapshotInterval(tsdbSnapshotInterval);
((DatabaseTableMeta) tableMetaTSDB).setSnapshotExpire(tsdbSnapshotExpire);
((DatabaseTableMeta) tableMetaTSDB).init(destination);
}
tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB);
......
......@@ -124,6 +124,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter);
((DatabaseTableMeta) tableMetaTSDB).setSnapshotInterval(tsdbSnapshotInterval);
((DatabaseTableMeta) tableMetaTSDB).setSnapshotExpire(tsdbSnapshotExpire);
((DatabaseTableMeta) tableMetaTSDB).init(destination);
}
tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB);
......
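In both parsers the added line calls init(destination) only after the black filter and the snapshot interval/expire have been applied, so the snapshot scheduler created inside init already sees the configured values (the builder no longer calls init, see the TableMetaTSDBBuilder hunk below). A small configure-then-init sketch with hypothetical names:

// TableMetaStore and its members are illustrative stand-ins, not the real DatabaseTableMeta API.
public class TableMetaStore {

    private int snapshotInterval = 24; // hours, default

    public void setSnapshotInterval(int snapshotInterval) {
        this.snapshotInterval = snapshotInterval;
    }

    // init reads the already-configured interval, so callers configure first, then init
    public boolean init(String destination) {
        System.out.println(destination + " will snapshot every " + snapshotInterval + "h");
        return true;
    }

    public static void main(String[] args) {
        TableMetaStore store = new TableMetaStore();
        store.setSnapshotInterval(12); // configure first
        store.init("example");         // then initialize
    }
}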
......@@ -10,6 +10,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
......@@ -62,9 +63,10 @@ public class DatabaseTableMeta implements TableMetaTSDB {
}
});
private ReadWriteLock lock = new ReentrantReadWriteLock();
private AtomicBoolean initialized = new AtomicBoolean(false);
private String destination;
private MemoryTableMeta memoryTableMeta;
private MysqlConnection connection; // connection used to query meta info
private volatile MysqlConnection connection; // connection used to query meta info
private CanalEventFilter filter;
private CanalEventFilter blackFilter;
private EntryPosition lastPosition;
......@@ -74,40 +76,42 @@ public class DatabaseTableMeta implements TableMetaTSDB {
private int snapshotInterval = 24;
private int snapshotExpire = 360;
private ScheduledFuture<?> scheduleSnapshotFuture;
public DatabaseTableMeta(){
}
@Override
public boolean init(final String destination) {
this.destination = destination;
this.memoryTableMeta = new MemoryTableMeta();
// generate a snapshot every 24 hours
if (snapshotInterval > 0) {
scheduleSnapshotFuture = scheduler.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
boolean applyResult = false;
try {
MDC.put("destination", destination);
applyResult = applySnapshotToDB(lastPosition, false);
} catch (Throwable e) {
logger.error("scheudle applySnapshotToDB faield", e);
}
if (initialized.compareAndSet(false, true)) {
this.destination = destination;
this.memoryTableMeta = new MemoryTableMeta();
// generate a snapshot every 24 hours
if (snapshotInterval > 0) {
scheduleSnapshotFuture = scheduler.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
boolean applyResult = false;
try {
MDC.put("destination", destination);
applyResult = applySnapshotToDB(lastPosition, false);
} catch (Throwable e) {
logger.error("scheudle applySnapshotToDB faield", e);
}
try {
MDC.put("destination", destination);
if (applyResult) {
snapshotExpire((int) TimeUnit.HOURS.toSeconds(snapshotExpire));
try {
MDC.put("destination", destination);
if (applyResult) {
snapshotExpire((int) TimeUnit.HOURS.toSeconds(snapshotExpire));
}
} catch (Throwable e) {
logger.error("scheudle snapshotExpire faield", e);
}
} catch (Throwable e) {
logger.error("scheudle snapshotExpire faield", e);
}
}
}, snapshotInterval, snapshotInterval, TimeUnit.HOURS);
}, snapshotInterval, snapshotInterval, TimeUnit.HOURS);
}
}
return true;
}
......
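The hunk above wraps the body of init(destination) in an AtomicBoolean compare-and-set, so the multiple init calls now issued by the parsers schedule the periodic snapshot task only once. A minimal sketch of that idempotent-init pattern (class and method names are illustrative):

import java.util.concurrent.atomic.AtomicBoolean;

public class OnceInitializer {

    private final AtomicBoolean initialized = new AtomicBoolean(false);

    // compareAndSet flips false -> true atomically, so even concurrent callers
    // run the one-time setup exactly once; every call still returns true.
    public boolean init(String destination) {
        if (initialized.compareAndSet(false, true)) {
            System.out.println("one-time setup for " + destination);
        }
        return true;
    }

    public static void main(String[] args) {
        OnceInitializer initializer = new OnceInitializer();
        initializer.init("example"); // runs the setup
        initializer.init("example"); // no-op, already initialized
    }
}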
......@@ -36,7 +36,6 @@ public class TableMetaTSDBBuilder {
}
}
TableMetaTSDB tableMetaTSDB = (TableMetaTSDB) applicationContext.getBean("tableMetaTSDB");
tableMetaTSDB.init(destination);
logger.info("{} init TableMetaTSDB with {}", destination, springXml);
return tableMetaTSDB;
} else {
......
......@@ -20,6 +20,8 @@ public class MQProperties {
private int canalBatchSize = 50;
private Long canalGetTimeout = 100L;
private boolean flatMessage = true;
private String compressionType = "none";
private String acks = "all";
public static class CanalDestination {
......@@ -149,4 +151,21 @@ public class MQProperties {
public void setProducerGroup(String producerGroup) {
this.producerGroup = producerGroup;
}
public String getCompressionType() {
return compressionType;
}
public void setCompressionType(String compressionType) {
this.compressionType = compressionType;
}
public String getAcks() {
return acks;
}
public void setAcks(String acks) {
this.acks = acks;
}
}
......@@ -37,7 +37,8 @@ public class CanalKafkaProducer implements CanalMQProducer {
this.kafkaProperties = kafkaProperties;
Properties properties = new Properties();
properties.put("bootstrap.servers", kafkaProperties.getServers());
properties.put("acks", "all");
properties.put("acks", kafkaProperties.getAcks());
properties.put("compression.type",kafkaProperties.getCompressionType());
properties.put("retries", kafkaProperties.getRetries());
properties.put("batch.size", kafkaProperties.getBatchSize());
properties.put("linger.ms", kafkaProperties.getLingerMs());
......
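The producer now passes the configurable acks and compression.type values through to the Kafka client instead of hard-coding acks=all. A self-contained sketch of the same producer configuration (broker address, topic, and serializers are placeholder choices, not what CanalKafkaProducer actually uses):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerConfigSketch {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092"); // placeholder broker
        properties.put("acks", "all");                         // from canal.mq.acks
        properties.put("compression.type", "none");            // from canal.mq.compressionType
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());

        // try-with-resources closes the producer and flushes pending records
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            producer.send(new ProducerRecord<>("example_topic", "key", "value"));
        }
    }
}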