Commit 4f00f5ea authored by agapple

fixed tsdb test

Parent 19707f92
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba.otter</groupId>
......@@ -34,10 +35,6 @@
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
......@@ -63,10 +60,37 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-orm</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring_version}</version>
<scope>test</scope>
</dependency>
<!-- junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
package com.taobao.tddl.dbsync.binlog;
/**
* Position inside binlog file
*
* @author <a href="mailto:seppo.jaakola@continuent.com">Seppo Jaakola</a>
* @version 1.0
*/
public class BinlogPosition extends LogPosition {
/* The source server_id of the position; 0 means invalid */
protected final long masterId;
/* The timestamp, in seconds; 0 means invalid */
protected final long timestamp;
public BinlogPosition(String fileName, long position, long masterId, long timestamp){
super(fileName, position);
this.masterId = masterId;
this.timestamp = timestamp;
}
public BinlogPosition(LogPosition logPosition, long masterId, long timestamp){
super(logPosition.getFileName(), logPosition.getPosition());
this.masterId = masterId;
this.timestamp = timestamp;
}
public BinlogPosition(BinlogPosition binlogPosition){
super(binlogPosition.getFileName(), binlogPosition.getPosition());
this.masterId = binlogPosition.masterId;
this.timestamp = binlogPosition.timestamp;
}
private final static long[] pow10 = { 1, 10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000,
10000000000L, 100000000000L, 1000000000000L, 10000000000000L, 100000000000000L, 1000000000000000L,
10000000000000000L, 100000000000000000L, 1000000000000000000L };
public static String placeHolder(int bit, long number) {
if (bit > 18) {
throw new IllegalArgumentException("bit must not exceed 18, but given " + bit);
}
final long max = pow10[bit];
if (number >= max) {
// when the width is smaller than the number of digits, return the value as-is
return String.valueOf(number);
}
return String.valueOf(max + number).substring(1);
}
public String format2String(final int positionMaxLen) {
String binlogSuffix = fileName;
String binlogOffset = placeHolder(positionMaxLen, position);
// produces e.g. '000001:0000000004#12.12314130'
StringBuffer buf = new StringBuffer(40);
buf.append(binlogSuffix);
buf.append(':');
buf.append(binlogOffset);
if (masterId != 0) {
buf.append('#');
buf.append(masterId);
}
if (timestamp != 0) {
buf.append('.');
buf.append(timestamp);
}
return buf.toString();
}
public static BinlogPosition parseFromString(String source) {
int colonIndex = source.indexOf(':');
int miscIndex = colonIndex + 1;
int sharpIndex = source.indexOf('#', miscIndex);
int semicolonIndex = source.indexOf(';', miscIndex); // NOTE: backward compatibility
int dotIndex = source.lastIndexOf('.');
if (colonIndex == -1) {
return null; // NOTE: malformed position
}
String binlogSuffix = source.substring(0, colonIndex);
long binlogPosition;
if (sharpIndex != -1) {
binlogPosition = Long.parseLong(source.substring(miscIndex, sharpIndex));
} else if (semicolonIndex != -1) {
binlogPosition = Long.parseLong(source.substring(miscIndex, semicolonIndex)); // NOTE: backward compatibility
} else if (dotIndex != -1) {
binlogPosition = Long.parseLong(source.substring(miscIndex, dotIndex));
} else {
binlogPosition = Long.parseLong(source.substring(miscIndex));
}
long masterId = 0; // NOTE: defaults to 0
if (sharpIndex != -1) {
if (dotIndex != -1) {
masterId = Long.parseLong(source.substring(sharpIndex + 1, dotIndex));
} else {
masterId = Long.parseLong(source.substring(sharpIndex + 1));
}
}
long timestamp = 0; // NOTE: defaults to 0
if (dotIndex != -1 && dotIndex > colonIndex) {
timestamp = Long.parseLong(source.substring(dotIndex + 1));
}
return new BinlogPosition(binlogSuffix, binlogPosition, // NL
masterId,
timestamp);
}
public String getFilePattern() {
final int index = fileName.indexOf('.');
if (index != -1) {
return fileName.substring(0, index);
}
return null;
}
public void setFilePattern(String filePattern) {
// We tolerate the event ID with or without the binlog prefix.
if (fileName.indexOf('.') < 0) {
fileName = filePattern + '.' + fileName;
}
}
public long getMasterId() {
return masterId;
}
public long getTimestamp() {
return timestamp;
}
@Override
public String toString() {
return format2String(10);
}
}
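As a quick sanity check, the string form produced by format2String above round-trips through parseFromString; a minimal sketch (all values below are made up for illustration):

BinlogPosition pos = new BinlogPosition("000001", 4L, 12L, 12314130L);
String text = pos.format2String(10);   // "000001:0000000004#12.12314130"
BinlogPosition parsed = BinlogPosition.parseFromString(text);
// parsed.getPosition() == 4, parsed.getMasterId() == 12, parsed.getTimestamp() == 12314130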
......@@ -36,6 +36,7 @@ canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
......@@ -59,6 +60,8 @@ canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.tsdb.spring.xml=classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
......
......@@ -3,14 +3,14 @@
canal.instance.mysql.slaveId=1234
# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=mysql-bin.000004
canal.instance.master.position=25678
canal.instance.master.timestamp=1506088042
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=68240
canal.instance.master.timestamp=
# tsdb info
# table meta tsdb info
canal.instance.tsdb.enable=true
canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/tsdb
canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal
......@@ -25,7 +25,7 @@ canal.instance.dbPassword=canal
canal.instance.defaultDatabaseName=test
canal.instance.connectionCharset=UTF-8
# table regex
canal.instance.filter.regex=.*\\..*
canal.instance.filter.regex=test\\..*
# table black regex
canal.instance.filter.black.regex=tsdb\\..*
canal.instance.filter.black.regex=canal_tsdb\\..*
#################################################
\ No newline at end of file
......@@ -178,9 +178,14 @@
<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
<property name="useDruidDdlFilter" value="${canal.instance.filter.druid.ddl:true}" />
<property name="filterRows" value="${canal.instance.filter.rows:false}" />
<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
<!-- table meta (tsdb) settings -->
<property name="enableTsdb" value="${canal.instance.tsdb.enable:false}"/>
<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
</bean>
</beans>
......@@ -163,48 +163,14 @@
<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
<property name="useDruidDdlFilter" value="${canal.instance.filter.druid.ddl:true}" />
<property name="filterRows" value="${canal.instance.filter.rows:false}" />
<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
<!-- table meta (tsdb) settings -->
<property name="tableMetaManager">
<bean class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaManager">
<property name="metaHistoryDAO" ref="metaHistoryDAO"/>
<property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
</bean>
</property>
</bean>
<!--tsdb related-->
<bean id="dataSource" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DataSourceFactoryTSDB"
factory-method="getDataSource">
<constructor-arg index="0" value="${canal.instance.tsdb.url}"/>
<constructor-arg index="1" value="${canal.instance.tsdb.dbUsername}"/>
<constructor-arg index="2" value="${canal.instance.tsdb.dbPassword}"/>
<constructor-arg index="3" value="${canal.instance.tsdb.enable:true}"/>
</bean>
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
<bean id="txTemplate" class="org.springframework.transaction.support.TransactionTemplate">
<property name="transactionManager" ref="transactionManager"></property>
<property name="propagationBehaviorName" value="PROPAGATION_REQUIRED"></property>
</bean>
<bean id="sqlMapClient" class="org.springframework.orm.ibatis.SqlMapClientFactoryBean">
<property name="dataSource" ref="dataSource"/>
<property name="configLocation" value="classpath:sqlmap-config.xml"/>
</bean>
<bean id="metaHistoryDAO" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaHistoryDAO">
<property name="sqlMapClient" ref="sqlMapClient"/>
</bean>
<bean id="metaSnapshotDAO" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDAO">
<property name="sqlMapClient" ref="sqlMapClient"/>
<property name="enableTsdb" value="${canal.instance.tsdb.enable:false}"/>
<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
</bean>
</beans>
......@@ -160,6 +160,7 @@
<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
<property name="useDruidDdlFilter" value="${canal.instance.filter.druid.ddl:true}" />
<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
......@@ -250,6 +251,7 @@
<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
<property name="useDruidDdlFilter" value="${canal.instance.filter.druid.ddl:true}" />
<property name="filterRows" value="${canal.instance.filter.rows:false}" />
<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
......
......@@ -155,9 +155,14 @@
<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
<property name="useDruidDdlFilter" value="${canal.instance.filter.druid.ddl:true}" />
<property name="filterRows" value="${canal.instance.filter.rows:false}" />
<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
<property name="needWait" value="${canal.instance.parser.needWait:true}"/>
<property name="directory" value="${canal.instance.parser.directory}"/>
<!-- table meta (tsdb) settings -->
<property name="enableTsdb" value="${canal.instance.tsdb.enable:false}"/>
<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
</bean>
</beans>
......@@ -151,9 +151,14 @@
<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
<property name="filterQueryDcl" value="${canal.instance.filter.query.dcl:false}" />
<property name="filterQueryDdl" value="${canal.instance.filter.query.ddl:false}" />
<property name="useDruidDdlFilter" value="${canal.instance.filter.druid.ddl:true}" />
<property name="filterRows" value="${canal.instance.filter.rows:false}" />
<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />
<!-- table meta (tsdb) settings -->
<property name="enableTsdb" value="${canal.instance.tsdb.enable:false}"/>
<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
</bean>
</beans>
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
<environments default="development">
<environment id="development">
<transactionManager type="JDBC"/>
<dataSource type="POOLED">
<property name="driver" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://127.0.0.1:3306/tsdb"/>
<property name="username" value="canal"/>
<property name="password" value="canal"/>
</dataSource>
</environment>
</environments>
<mappers>
<package name="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.mapper"/>
</mappers>
</configuration>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:lang="http://www.springframework.org/schema/lang"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-2.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"
default-autowire="byName">
<!-- properties -->
<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
<property name="ignoreResourceNotFound" value="true" />
<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 -->
<property name="locationNames">
<list>
<value>classpath:canal.properties</value>
<value>classpath:${canal.instance.destination:}/instance.properties</value>
</list>
</property>
</bean>
<!-- DB-based implementation -->
<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaManager">
<property name="metaHistoryDAO" ref="metaHistoryDAO"/>
<property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
</bean>
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url" value="${canal.instance.tsdb.url:}" />
<property name="username" value="${canal.instance.tsdb.dbUsername:canal}" />
<property name="password" value="${canal.instance.tsdb.dbPassword:canal}" />
<property name="maxActive" value="30" />
<property name="initialSize" value="0" />
<property name="minIdle" value="1" />
<property name="maxWait" value="10000" />
<property name="timeBetweenEvictionRunsMillis" value="60000" />
<property name="minEvictableIdleTimeMillis" value="300000" />
<property name="validationQuery" value="SELECT 1" />
<property name="exceptionSorterClassName" value="com.alibaba.druid.pool.vendor.MySqlExceptionSorter" />
<property name="validConnectionCheckerClassName" value="com.alibaba.druid.pool.vendor.MySqlValidConnectionChecker" />
<property name="testWhileIdle" value="true" />
<property name="testOnBorrow" value="false" />
<property name="testOnReturn" value="false" />
<property name="useUnfairLock" value="true" />
</bean>
<bean id="sqlMapClient" class="org.springframework.orm.ibatis.SqlMapClientFactoryBean">
<property name="dataSource" ref="dataSource"/>
<property name="configLocation" value="classpath:spring/tsdb/sql-map/sqlmap-config.xml"/>
</bean>
<bean id="metaHistoryDAO" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaHistoryDAO">
<property name="sqlMapClient" ref="sqlMapClient"/>
</bean>
<bean id="metaSnapshotDAO" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDAO">
<property name="sqlMapClient" ref="sqlMapClient"/>
</bean>
</beans>
......@@ -3,7 +3,6 @@
"http://www.ibatis.com/dtd/sql-map-config-2.dtd">
<sqlMapConfig>
<settings useStatementNamespaces="true"/>
<sqlMap resource="sql-map/sqlmap_history.xml"/>
<sqlMap resource="sql-map/sqlmap_snapshot.xml"/>
<sqlMap resource="spring/tsdb/sql-map/sqlmap_history.xml"/>
<sqlMap resource="spring/tsdb/sql-map/sqlmap_snapshot.xml"/>
</sqlMapConfig>
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE sqlMap PUBLIC "-//ibatis.apache.org//DTD SQL Map 2.0//EN" "http://ibatis.apache.org/dtd/sql-map-2.dtd" >
<sqlMap namespace="table_meta_history">
<typeAlias alias="metaHistoryDO" type="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.model.MetaHistoryDO"/>
<sqlMap namespace="meta_history">
<typeAlias alias="metaHistoryDO" type="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaHistoryDO"/>
<sql id="allColumns">
<![CDATA[
gmt_create,gmt_modified,binlog_file,binlog_offest,binlog_master_id,binlog_timestamp,use_schema,`schema`,`table`,`sql`,`type`,`extra`
gmt_create,gmt_modified,destination,binlog_file,binlog_offest,binlog_master_id,binlog_timestamp,use_schema,sql_schema,sql_table,sql_text,sql_type,extra
]]>
</sql>
<sql id="allVOColumns">
<![CDATA[
a.id as id,a.gmt_create as gmtCreate,a.gmt_modified as gmtModified,
a.binlog_file as binlogFile,a.binlog_offest as binlogOffest,a.binlog_master_id as binlogMasterId,a.binlog_timestamp as binlogTimestamp,
a.use_schema as useSchema,a.`schema` as `schema`,a.`table` as `table`,a.`sql` as `sql`,a.`type` as `type`,a.`extra` as `extra`
a.destination as destination,a.binlog_file as binlogFile,a.binlog_offest as binlogOffest,a.binlog_master_id as binlogMasterId,a.binlog_timestamp as binlogTimestamp,
a.use_schema as useSchema,a.sql_schema as sqlSchema,a.sql_table as sqlTable,a.sql_text as sqlText,a.sql_type as sqlType,a.extra as extra
]]>
</sql>
<select id="findByTimestamp" parameterClass="java.util.Map" resultClass="metaHistoryDO">
select
<include refid="allVOColumns"/>
from `canal_table_meta_history$env$` a
from meta_history a
<![CDATA[
where binlog_timestamp >= #snapshotTimestamp# and binlog_timestamp <= #timestamp#
where destination = #destination# and binlog_timestamp >= #snapshotTimestamp# and binlog_timestamp <= #timestamp#
order by binlog_timestamp asc,id asc
]]>
</select>
<insert id="insert" parameterClass="metaHistoryDO">
insert into `canal_table_meta_history` (<include refid="allColumns"/>)
values(now(),now(),#binlogFile#,#binlogOffest#,#binlogMasterId#,#binlogTimestamp#,#useSchema#,#schema#,#table#,#sql#,#type#,#extra#);
<selectKey resultClass="java.lang.Long" keyProperty="id">
SELECT last_insert_id()
</selectKey>
insert into meta_history (<include refid="allColumns"/>)
values(CURRENT_TIMESTAMP,CURRENT_TIMESTAMP,#destination#,#binlogFile#,#binlogOffest#,#binlogMasterId#,#binlogTimestamp#,#useSchema#,#sqlSchema#,#sqlTable#,#sqlText#,#sqlType#,#extra#)
</insert>
<delete id="deleteByName" parameterClass="java.util.Map">
delete from meta_history
where destination=#destination#
</delete>
<delete id="deleteByGmtModified" parameterClass="java.util.Map">
<![CDATA[
delete from `canal_table_meta_history`
where gmt_modified < date_sub(now(),interval #interval# second)
delete from meta_history
where gmt_modified < timestamp(#timestamp#)
]]>
</delete>
<select id="getAll" resultClass="metaHistoryDO">
select * from canal_table_meta_history
</select>
</sqlMap>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE sqlMap PUBLIC "-//ibatis.apache.org//DTD SQL Map 2.0//EN" "http://ibatis.apache.org/dtd/sql-map-2.dtd" >
<sqlMap namespace="table_meta_snapshot">
<typeAlias alias="metaSnapshotDO" type="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.model.MetaSnapshotDO"/>
<sqlMap namespace="meta_snapshot">
<typeAlias alias="metaSnapshotDO" type="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDO"/>
<typeAlias alias="tableMetaSnapshotDO"
type="com.alibaba.middleware.jingwei.biz.dataobject.CanalTableMetaSnapshotDO"/>
<sql id="allColumns">
<![CDATA[
gmt_create,gmt_modified,binlog_file,binlog_offest,binlog_master_id,binlog_timestamp,data,extra
gmt_create,gmt_modified,destination,binlog_file,binlog_offest,binlog_master_id,binlog_timestamp,data,extra
]]>
</sql>
<sql id="allVOColumns">
<![CDATA[
a.id as id,a.gmt_create as gmtCreate,a.gmt_modified as gmtModified,
a.binlog_file as binlogFile,a.binlog_offest as binlogOffest,a.binlog_master_id as binlogMaster_id,a.binlog_timestamp as binlogTimestamp,a.data as data,a.extra as extra
a.destination as destination,a.binlog_file as binlogFile,a.binlog_offest as binlogOffest,a.binlog_master_id as binlogMasterId,a.binlog_timestamp as binlogTimestamp,a.data as data,a.extra as extra
]]>
</sql>
<select id="findByTimestamp" parameterClass="java.util.Map" resultClass="metaSnapshotDO">
select
<include refid="allVOColumns"/>
from `canal_table_meta_snapshot$env$` a
<![CDATA[
where binlog_timestamp < #timestamp#
select <include refid="allVOColumns"/>
<![CDATA[
from meta_snapshot a
where destination = #destination# and binlog_timestamp < #timestamp#
order by binlog_timestamp desc,id desc
limit 1
]]>
</select>
<insert id="insert" parameterClass="metaSnapshotDO">
insert into `canal_table_meta_snapshot` (<include refid="allColumns"/>)
values(now(),now(),#binlogFile#,#binlogOffest#,#binlogMasterId#,#binlogTimestamp#,#data#,#extra#);
<selectKey resultClass="java.lang.Long" keyProperty="id">
SELECT last_insert_id()
</selectKey>
insert into meta_snapshot (<include refid="allColumns"/>)
values(CURRENT_TIMESTAMP,CURRENT_TIMESTAMP,#destination#,#binlogFile#,#binlogOffest#,#binlogMasterId#,#binlogTimestamp#,#data#,#extra#)
</insert>
<update id="update" parameterClass="metaSnapshotDO">
update `canal_table_meta_snapshot` set gmt_modified=now(),
update meta_snapshot set gmt_modified=now(),
binlog_file=#binlogFile#,binlog_offest=#binlogOffest#,binlog_master_id=#binlogMasterId#,binlog_timestamp=#binlogTimestamp#,data=#data#,extra=#extra#
where binlog_timestamp=0
where destination=#destination# and binlog_timestamp=0
</update>
<delete id="deleteByName" parameterClass="java.util.Map">
delete from meta_snapshot
where destination=#destination#
</delete>
<delete id="deleteByGmtModified" parameterClass="java.util.Map">
<![CDATA[
delete from `canal_table_meta_snapshot`
where gmt_modified < date_sub(now(),interval #interval# second)
delete from meta_snapshot
where gmt_modified < timestamp(#timestamp#)
]]>
</delete>
<select id="getAll" resultClass="metaSnapshotDO">
select * from canal_table_meta_snapshot
</select>
</sqlMap>
\ No newline at end of file
CREATE TABLE IF NOT EXISTS `meta_snapshot` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key',
`gmt_create` datetime NOT NULL COMMENT 'creation time',
`gmt_modified` datetime NOT NULL COMMENT 'last modified time',
`destination` varchar(128) DEFAULT NULL COMMENT 'destination (channel) name',
`binlog_file` varchar(64) DEFAULT NULL COMMENT 'binlog file name',
`binlog_offest` bigint(20) DEFAULT NULL COMMENT 'binlog offset',
`binlog_master_id` varchar(64) DEFAULT NULL COMMENT 'binlog server id',
`binlog_timestamp` bigint(20) DEFAULT NULL COMMENT 'binlog execute timestamp',
`data` longtext DEFAULT NULL COMMENT 'table meta data',
`extra` text DEFAULT NULL COMMENT 'extra info',
PRIMARY KEY (`id`),
UNIQUE KEY binlog_file_offest(`destination`,`binlog_master_id`,`binlog_file`,`binlog_offest`),
KEY `destination` (`destination`),
KEY `destination_timestamp` (`destination`,`binlog_timestamp`),
KEY `gmt_modified` (`gmt_modified`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='table meta snapshot';
CREATE TABLE IF NOT EXISTS `meta_history` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key',
`gmt_create` datetime NOT NULL COMMENT 'creation time',
`gmt_modified` datetime NOT NULL COMMENT 'last modified time',
`destination` varchar(128) DEFAULT NULL COMMENT 'destination (channel) name',
`binlog_file` varchar(64) DEFAULT NULL COMMENT 'binlog file name',
`binlog_offest` bigint(20) DEFAULT NULL COMMENT 'binlog offset',
`binlog_master_id` varchar(64) DEFAULT NULL COMMENT 'binlog server id',
`binlog_timestamp` bigint(20) DEFAULT NULL COMMENT 'binlog execute timestamp',
`use_schema` varchar(1024) DEFAULT NULL COMMENT 'schema in use when the sql ran',
`schema` varchar(1024) DEFAULT NULL COMMENT 'schema referenced by the sql',
`table` varchar(1024) DEFAULT NULL COMMENT 'table referenced by the sql',
`sql` longtext DEFAULT NULL COMMENT 'executed sql',
`type` varchar(256) DEFAULT NULL COMMENT 'sql type',
`extra` text DEFAULT NULL COMMENT 'extra info',
PRIMARY KEY (`id`),
UNIQUE KEY binlog_file_offest(`destination`,`binlog_master_id`,`binlog_file`,`binlog_offest`),
KEY `destination` (`destination`),
KEY `destination_timestamp` (`destination`,`binlog_timestamp`),
KEY `gmt_modified` (`gmt_modified`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='table meta change history';
\ No newline at end of file
......@@ -35,21 +35,24 @@ public class SocketChannel {
}
public void writeChannel(byte[]... buf) throws IOException {
if (channel != null && channel.isWritable()) channel.writeAndFlush(Unpooled.copiedBuffer(buf));
else throw new IOException("write failed! please check!");
if (channel != null && channel.isWritable()) {
channel.writeAndFlush(Unpooled.copiedBuffer(buf));
} else {
throw new IOException("write failed ! please checking !");
}
}
public byte[] read(int readSize) throws IOException {
do {
if (readSize > cache.readableBytes()) {
if (null == channel) {
throw new IOException("socket has Interrupted !");
throw new java.nio.channels.ClosedByInterruptException();
}
synchronized (this) {
try {
wait(100);
} catch (InterruptedException e) {
throw new IOException("socket has Interrupted !");
throw new java.nio.channels.ClosedByInterruptException();
}
}
} else {
......
......@@ -367,7 +367,7 @@ public class FileMixedMetaManager extends MemoryMetaManager implements CanalMeta
this.dataDir = new File(dataDir);
}
public void setDataDir(File dataDir) {
public void setDataDirByFile(File dataDir) {
this.dataDir = dataDir;
}
......
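For context, the class keeps a String-based setter (its body is visible above), so after this rename the pair looks as follows; the commit does not state the motivation, but it reads like a fix for the setDataDir overload ambiguity:

public void setDataDir(String dataDir) {
    this.dataDir = new File(dataDir);
}

public void setDataDirByFile(File dataDir) {
    this.dataDir = dataDir;
}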
......@@ -31,7 +31,7 @@ public class FileMixedMetaManagerTest extends AbstractMetaManagerTest {
@Test
public void testSubscribeAll() {
FileMixedMetaManager metaManager = new FileMixedMetaManager();
metaManager.setDataDir(dataDir);
metaManager.setDataDirByFile(dataDir);
metaManager.setPeriod(100);
metaManager.start();
......@@ -40,7 +40,7 @@ public class FileMixedMetaManagerTest extends AbstractMetaManagerTest {
sleep(2000L);
// rebuild; it should recover the records persisted by the previous run
FileMixedMetaManager metaManager2 = new FileMixedMetaManager();
metaManager2.setDataDir(dataDir);
metaManager2.setDataDirByFile(dataDir);
metaManager2.setPeriod(100);
metaManager2.start();
......@@ -52,7 +52,7 @@ public class FileMixedMetaManagerTest extends AbstractMetaManagerTest {
@Test
public void testBatchAll() {
FileMixedMetaManager metaManager = new FileMixedMetaManager();
metaManager.setDataDir(dataDir);
metaManager.setDataDirByFile(dataDir);
metaManager.setPeriod(100);
metaManager.start();
......@@ -67,7 +67,7 @@ public class FileMixedMetaManagerTest extends AbstractMetaManagerTest {
@Test
public void testCursorAll() {
FileMixedMetaManager metaManager = new FileMixedMetaManager();
metaManager.setDataDir(dataDir);
metaManager.setDataDirByFile(dataDir);
metaManager.setPeriod(100);
metaManager.start();
......@@ -76,7 +76,7 @@ public class FileMixedMetaManagerTest extends AbstractMetaManagerTest {
sleep(1000L);
// rebuild; it should recover the records persisted by the previous run
FileMixedMetaManager metaManager2 = new FileMixedMetaManager();
metaManager2.setDataDir(dataDir);
metaManager2.setDataDirByFile(dataDir);
metaManager2.setPeriod(100);
metaManager2.start();
......
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba.otter</groupId>
......@@ -56,7 +57,6 @@
<dependency>
<groupId>org.apache.ibatis</groupId>
<artifactId>ibatis-sqlmap</artifactId>
<version>2.3.4.726</version>
</dependency>
<!-- test dependency -->
<dependency>
......
......@@ -96,6 +96,10 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
protected void preDump(ErosaConnection connection) {
}
protected boolean processTableMeta(EntryPosition position) {
return true;
}
protected void afterDump(ErosaConnection connection) {
}
......@@ -145,7 +149,6 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
ErosaConnection erosaConnection = null;
while (running) {
try {
// start replication
// 1. build the Erosa connection
erosaConnection = buildErosaConnection();
......@@ -163,6 +166,11 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
if (startPosition == null) {
throw new CanalParseException("can't find start position for " + destination);
}
if (!processTableMeta(startPosition)) {
throw new CanalParseException("can't find init table meta for " + destination
+ " with position : " + startPosition);
}
logger.info("find start position : {}", startPosition.toString());
// reconnect: locating the position may leave connection state, so drop and rebuild the link
erosaConnection.reconnect();
......
......@@ -2,24 +2,27 @@ package com.alibaba.otter.canal.parse.inbound.mysql;
import java.nio.charset.Charset;
import javax.annotation.Resource;
import com.taobao.tddl.dbsync.binlog.BinlogPosition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.otter.canal.filter.CanalEventFilter;
import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
import com.alibaba.otter.canal.parse.exception.CanalParseException;
import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
import com.alibaba.otter.canal.parse.inbound.BinlogParser;
import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaManager;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDB;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDBBuilder;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
public abstract class AbstractMysqlEventParser extends AbstractEventParser {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
protected static final long BINLOG_START_OFFEST = 4L;
protected boolean enableTsdb = false;
protected String tsdbSpringXml;
protected TableMetaTSDB tableMetaTSDB;
// charset info
protected byte connectionCharsetNumber = (byte) 33;
protected Charset connectionCharset = Charset.forName("UTF-8");
......@@ -28,9 +31,6 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
protected boolean filterQueryDdl = false;
protected boolean filterRows = false;
protected boolean filterTableError = false;
@Resource
protected TableMetaManager tableMetaManager;
protected boolean useDruidDdlFilter = true;
protected BinlogParser buildParser() {
......@@ -49,9 +49,6 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
convert.setFilterQueryDdl(filterQueryDdl);
convert.setFilterRows(filterRows);
convert.setFilterTableError(filterTableError);
// when initializing the parser, also initialize the mysql table meta manager
tableMetaManager.init();
return convert;
}
......@@ -70,14 +67,34 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
* @param position
* @return
*/
protected boolean processTableMeta(BinlogPosition position) {
if (tableMetaManager != null) {
return tableMetaManager.rollback(position);
protected boolean processTableMeta(EntryPosition position) {
if (tableMetaTSDB != null) {
return tableMetaTSDB.rollback(position);
}
return true;
}
public void start() throws CanalParseException {
if (enableTsdb) {
if (tableMetaTSDB == null) {
// initialize
tableMetaTSDB = TableMetaTSDBBuilder.build(destination, tsdbSpringXml);
}
}
super.start();
}
public void stop() throws CanalParseException {
if (enableTsdb) {
TableMetaTSDBBuilder.destory(destination);
tableMetaTSDB = null;
}
super.stop();
}
public void setEventBlackFilter(CanalEventFilter eventBlackFilter) {
super.setEventBlackFilter(eventBlackFilter);
......@@ -122,14 +139,6 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
this.filterTableError = filterTableError;
}
public TableMetaManager getTableMetaManager() {
return tableMetaManager;
}
public void setTableMetaManager(TableMetaManager tableMetaManager) {
this.tableMetaManager = tableMetaManager;
}
public boolean isUseDruidDdlFilter() {
return useDruidDdlFilter;
}
......@@ -137,4 +146,24 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
public void setUseDruidDdlFilter(boolean useDruidDdlFilter) {
this.useDruidDdlFilter = useDruidDdlFilter;
}
public void setEnableTsdb(boolean enableTsdb) {
this.enableTsdb = enableTsdb;
if (this.enableTsdb) {
if (tableMetaTSDB == null) {
// initialize
tableMetaTSDB = TableMetaTSDBBuilder.build(destination, tsdbSpringXml);
}
}
}
public void setTsdbSpringXml(String tsdbSpringXml) {
this.tsdbSpringXml = tsdbSpringXml;
if (tableMetaTSDB == null) {
// initialize
tableMetaTSDB = TableMetaTSDBBuilder.build(destination, tsdbSpringXml);
}
}
}
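TableMetaTSDBBuilder itself is not part of this diff. Judging from the call sites above (build(destination, tsdbSpringXml) on start, destory(destination) on stop) and from spring/tsdb/mysql-tsdb.xml, a plausible sketch is a per-destination registry of Spring contexts; everything below except the build/destory signatures is an assumption:

// Hypothetical sketch only; the real TableMetaTSDBBuilder is not shown in this commit.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class TableMetaTSDBBuilder {

    // one Spring context per destination, so each instance gets its own tsdb wiring
    private static final Map<String, ClassPathXmlApplicationContext> CONTEXTS = new ConcurrentHashMap<String, ClassPathXmlApplicationContext>();

    public static TableMetaTSDB build(String destination, String springXml) {
        ClassPathXmlApplicationContext context = CONTEXTS.get(destination);
        if (context == null) {
            // mysql-tsdb.xml resolves ${canal.instance.destination:} via system properties
            System.setProperty("canal.instance.destination", destination);
            context = new ClassPathXmlApplicationContext(springXml);
            CONTEXTS.put(destination, context);
        }
        // "tableMetaTSDB" is the bean id declared in spring/tsdb/mysql-tsdb.xml
        TableMetaTSDB tableMetaTSDB = (TableMetaTSDB) context.getBean("tableMetaTSDB");
        tableMetaTSDB.init(destination);
        return tableMetaTSDB;
    }

    public static void destory(String destination) { // spelling matches the call site above
        ClassPathXmlApplicationContext context = CONTEXTS.remove(destination);
        if (context != null) {
            context.close();
        }
    }
}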
......@@ -50,7 +50,7 @@ public class LocalBinlogEventParser extends AbstractMysqlEventParser implements
throw new CanalParseException(e);
}
tableMetaCache = new TableMetaCache(metaConnection, tableMetaManager);
tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB);
((LogEventConvert) binlogParser).setTableMetaCache(tableMetaCache);
}
......
......@@ -26,6 +26,7 @@ import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.BinlogFormat;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.BinlogImage;
import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert;
import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaManager;
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
......@@ -117,12 +118,13 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
}
}
if (tableMetaManager != null) {
tableMetaManager.setConnection(metaConnection);
tableMetaManager.setFilter(eventFilter);
if (tableMetaTSDB != null && tableMetaTSDB instanceof TableMetaManager) {
((TableMetaManager) tableMetaTSDB).setConnection(metaConnection);
((TableMetaManager) tableMetaTSDB).setFilter(eventFilter);
((TableMetaManager) tableMetaTSDB).setBlackFilter(eventBlackFilter);
}
tableMetaCache = new TableMetaCache(metaConnection, tableMetaManager);
tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB);
((LogEventConvert) binlogParser).setTableMetaCache(tableMetaCache);
}
}
......@@ -337,6 +339,34 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
return endPosition;
}
protected EntryPosition findEndPositionWithMasterIdAndTimestamp(MysqlConnection connection) {
MysqlConnection mysqlConnection = (MysqlConnection) connection;
final EntryPosition endPosition = findEndPosition(mysqlConnection);
if (tableMetaTSDB != null) {
long startTimestamp = System.currentTimeMillis();
return findAsPerTimestampInSpecificLogFile(mysqlConnection,
startTimestamp,
endPosition,
endPosition.getJournalName());
} else {
return endPosition;
}
}
protected EntryPosition findPositionWithMasterIdAndTimestamp(MysqlConnection connection, EntryPosition fixedPosition) {
MysqlConnection mysqlConnection = (MysqlConnection) connection;
if (tableMetaTSDB != null && (fixedPosition.getTimestamp() == null || fixedPosition.getTimestamp() <= 0)) {
// use a timestamp far in the future so the lookup is driven purely by the fixed position
long startTimestamp = System.currentTimeMillis() + 102L * 365 * 24 * 3600 * 1000; // roughly 102 years from now
return findAsPerTimestampInSpecificLogFile(mysqlConnection,
startTimestamp,
fixedPosition,
fixedPosition.getJournalName());
} else {
return fixedPosition;
}
}
protected EntryPosition findStartPositionInternal(ErosaConnection connection) {
MysqlConnection mysqlConnection = (MysqlConnection) connection;
LogPosition logPosition = logPositionManager.getLatestIndexBy(destination);
......@@ -350,7 +380,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
}
if (entryPosition == null) {
entryPosition = findEndPosition(mysqlConnection); // by default, consume from the current end position
entryPosition = findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // by default, consume from the current end position
}
// check whether we need to subscribe by timestamp
......@@ -362,13 +392,15 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
return findByStartTimeStamp(mysqlConnection, entryPosition.getTimestamp());
} else {
logger.warn("prepare to find start position just show master status");
return findEndPosition(mysqlConnection); // by default, consume from the current end position
return findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // by default, consume from the current end position
}
} else {
if (entryPosition.getPosition() != null && entryPosition.getPosition() > 0L) {
// if binlog name + offset were specified, return directly
entryPosition = findPositionWithMasterIdAndTimestamp(mysqlConnection, entryPosition);
logger.warn("prepare to find start position {}:{}:{}",
new Object[] { entryPosition.getJournalName(), entryPosition.getPosition(), "" });
new Object[] { entryPosition.getJournalName(), entryPosition.getPosition(),
entryPosition.getTimestamp() });
return entryPosition;
} else {
EntryPosition specificLogFilePosition = null;
......@@ -695,6 +727,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
String logfilename = entry.getHeader().getLogfileName();
Long logfileoffset = entry.getHeader().getLogfileOffset();
Long logposTimestamp = entry.getHeader().getExecuteTime();
Long serverId = entry.getHeader().getServerId();
if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())
|| CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
......@@ -715,13 +748,13 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
// position = current + data.length, i.e. the offset right after this
// transaction, to avoid replaying it
if (CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp);
entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp, serverId);
logger.debug("set {} to be pending start position before finding another proper one...",
entryPosition);
logPosition.setPostion(entryPosition);
} else if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())) {
// start position of the current transaction
entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp);
entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp, serverId);
logger.debug("set {} to be pending start position before finding another proper one...",
entryPosition);
logPosition.setPostion(entryPosition);
......
......@@ -35,8 +35,8 @@ import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionEnd;
import com.alibaba.otter.canal.protocol.CanalEntry.Type;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
import com.google.protobuf.ByteString;
import com.taobao.tddl.dbsync.binlog.BinlogPosition;
import com.taobao.tddl.dbsync.binlog.LogEvent;
import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
import com.taobao.tddl.dbsync.binlog.event.IntvarLogEvent;
......@@ -189,8 +189,8 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
if (!isSeek) {
// use the new table meta management
BinlogPosition position = createPosition(event.getHeader());
tableMetaCache.apply(position, event.getDbName(), queryString);
EntryPosition position = createPosition(event.getHeader());
tableMetaCache.apply(position, event.getDbName(), queryString, null);
}
Header header = createHeader(binlogFileName, event.getHeader(), schemaName, tableName, type);
......@@ -386,7 +386,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
table.getDbName(),
table.getTableName(),
eventType);
BinlogPosition position = createPosition(event.getHeader());
EntryPosition position = createPosition(event.getHeader());
RowChange.Builder rowChangeBuider = RowChange.newBuilder();
rowChangeBuider.setTableId(event.getTableId());
rowChangeBuider.setIsDdl(false);
......@@ -444,8 +444,11 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
}
}
private BinlogPosition createPosition(LogHeader logHeader) {
return new BinlogPosition(binlogFileName, logHeader.getLogPos(), logHeader.getServerId(), logHeader.getWhen()); // second precision
private EntryPosition createPosition(LogHeader logHeader) {
return new EntryPosition(binlogFileName,
logHeader.getLogPos(),
logHeader.getWhen() * 1000L,
logHeader.getServerId()); // getWhen() is in seconds; stored as milliseconds
}
private boolean parseOneRow(RowData.Builder rowDataBuilder, RowsLogEvent event, RowsLogBuffer buffer, BitSet cols,
......@@ -468,7 +471,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
}
}
BinlogPosition position = createPosition(event.getHeader());
EntryPosition position = createPosition(event.getHeader());
if (!existRDSNoPrimaryKey) {
// online ddl add-column steps:
// 1. create a temp table and fully import the data of the table being altered
......@@ -716,7 +719,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
return true;
}
private TableMeta getTableMeta(String dbName, String tbName, boolean useCache, BinlogPosition position) {
private TableMeta getTableMeta(String dbName, String tbName, boolean useCache, EntryPosition position) {
try {
return tableMetaCache.getTableMeta(dbName, tbName, useCache, position);
} catch (Exception e) {
......
......@@ -14,11 +14,11 @@ import com.alibaba.otter.canal.parse.exception.CanalParseException;
import com.alibaba.otter.canal.parse.inbound.TableMeta;
import com.alibaba.otter.canal.parse.inbound.TableMeta.FieldMeta;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaManager;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDB;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.taobao.tddl.dbsync.binlog.BinlogPosition;
/**
* Handles table meta parsing and caching
......@@ -37,15 +37,15 @@ public class TableMetaCache {
private MysqlConnection connection;
private boolean isOnRDS = false;
private TableMetaManager tableMetaManager;
private TableMetaTSDB tableMetaTSDB;
// first level: tableId; second level: schema.table; handles tableId reuse across multiple tables
private LoadingCache<String, TableMeta> tableMetaDB;
public TableMetaCache(MysqlConnection con, TableMetaManager tableMetaManager){
public TableMetaCache(MysqlConnection con, TableMetaTSDB tableMetaTSDB){
this.connection = con;
this.tableMetaManager = tableMetaManager;
this.tableMetaTSDB = tableMetaTSDB;
// if the persisted table meta is empty, fetch it from the db
if (tableMetaManager == null) {
if (tableMetaTSDB == null) {
this.tableMetaDB = CacheBuilder.newBuilder().build(new CacheLoader<String, TableMeta>() {
@Override
......@@ -124,14 +124,14 @@ public class TableMetaCache {
return tableMetaDB.getUnchecked(getFullName(schema, table));
}
public TableMeta getTableMeta(String schema, String table, BinlogPosition position) {
public TableMeta getTableMeta(String schema, String table, EntryPosition position) {
return getTableMeta(schema, table, true, position);
}
public TableMeta getTableMeta(String schema, String table, boolean useCache, BinlogPosition position) {
public TableMeta getTableMeta(String schema, String table, boolean useCache, EntryPosition position) {
TableMeta tableMeta = null;
if (tableMetaManager != null) {
tableMeta = tableMetaManager.find(schema, table);
if (tableMetaTSDB != null) {
tableMeta = tableMetaTSDB.find(schema, table);
if (tableMeta == null) {
// conditions may have changed and the first lookup can miss; fetch once from the db and record it into the snapshot
String fullName = getFullName(schema, table);
......@@ -142,8 +142,8 @@ public class TableMetaCache {
createDDL = packet.getFieldValues().get(1);
}
// force-overwrite the in-memory value
tableMetaManager.apply(position, schema, createDDL);
tableMeta = tableMetaManager.find(schema, table);
tableMetaTSDB.apply(position, schema, createDDL, "first");
tableMeta = tableMetaTSDB.find(schema, table);
} catch (IOException e) {
throw new CanalParseException("fetch failed by table meta:" + fullName, e);
}
......@@ -159,7 +159,7 @@ public class TableMetaCache {
}
public void clearTableMeta(String schema, String table) {
if (tableMetaManager != null) {
if (tableMetaTSDB != null) {
// nothing to do for tsdb; it cleans up automatically based on the ddl sql
} else {
tableMetaDB.invalidate(getFullName(schema, table));
......@@ -167,7 +167,7 @@ public class TableMetaCache {
}
public void clearTableMetaWithSchemaName(String schema) {
if (tableMetaManager != null) {
if (tableMetaTSDB != null) {
// nothing to do for tsdb; it cleans up automatically based on the ddl sql
} else {
for (String name : tableMetaDB.asMap().keySet()) {
......@@ -180,7 +180,7 @@ public class TableMetaCache {
}
public void clearTableMeta() {
if (tableMetaManager != null) {
if (tableMetaTSDB != null) {
// nothing to do for tsdb; it cleans up automatically based on the ddl sql
} else {
tableMetaDB.invalidateAll();
......@@ -195,9 +195,9 @@ public class TableMetaCache {
* @param ddl
* @return
*/
public boolean apply(BinlogPosition position, String schema, String ddl) {
if (tableMetaManager != null) {
return tableMetaManager.apply(position, schema, ddl);
public boolean apply(EntryPosition position, String schema, String ddl, String extra) {
if (tableMetaTSDB != null) {
return tableMetaTSDB.apply(position, schema, ddl, extra);
} else {
// ignore
return true;
......
package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
import javax.sql.DataSource;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.vendor.MySqlExceptionSorter;
import com.alibaba.druid.pool.vendor.MySqlValidConnectionChecker;
import com.alibaba.otter.canal.parse.exception.CanalParseException;
/**
* Created by wanshao Date: 2017/9/22 Time: 2:46 PM
**/
public class DataSourceFactoryTSDB {
public static DataSource getDataSource(String url, String userName, String password) {
try {
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setUrl(url);
druidDataSource.setUsername(userName);
druidDataSource.setPassword(password);
druidDataSource.setTestWhileIdle(true);
druidDataSource.setTestOnBorrow(false);
druidDataSource.setTestOnReturn(false);
druidDataSource.setNotFullTimeoutRetryCount(2);
druidDataSource.setValidConnectionCheckerClassName(MySqlValidConnectionChecker.class.getName());
druidDataSource.setExceptionSorterClassName(MySqlExceptionSorter.class.getName());
druidDataSource.setValidationQuery("SELECT 1");
druidDataSource.setInitialSize(1);
druidDataSource.setMinIdle(1);
druidDataSource.setMaxActive(30);
druidDataSource.setMaxWait(10 * 1000);
druidDataSource.setTimeBetweenEvictionRunsMillis(60 * 1000);
druidDataSource.setMinEvictableIdleTimeMillis(50 * 1000);
druidDataSource.setUseUnfairLock(true);
druidDataSource.init();
return druidDataSource;
} catch (Throwable e) {
throw new CanalParseException("init druidDataSource failed", e);
}
}
}
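For reference, calling the factory directly would look as follows; the URL and credentials are the sample values from instance.properties, and note that the new mysql-tsdb.xml wires a DruidDataSource bean instead of going through this factory:

// Illustrative call only; the Spring config in mysql-tsdb.xml now builds the pool.
DataSource dataSource = DataSourceFactoryTSDB.getDataSource(
    "jdbc:mysql://127.0.0.1:3306/canal_tsdb", "canal", "canal");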
......@@ -6,14 +6,15 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.taobao.tddl.dbsync.binlog.BinlogPosition;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.druid.sql.ast.SQLDataType;
import com.alibaba.druid.sql.ast.SQLDataTypeImpl;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLCharExpr;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
import com.alibaba.druid.sql.ast.expr.SQLNullExpr;
import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
......@@ -30,11 +31,10 @@ import com.alibaba.druid.sql.repository.Schema;
import com.alibaba.druid.sql.repository.SchemaObject;
import com.alibaba.druid.sql.repository.SchemaRepository;
import com.alibaba.druid.util.JdbcConstants;
import com.alibaba.otter.canal.parse.inbound.TableMeta;
import com.alibaba.otter.canal.parse.inbound.TableMeta.FieldMeta;
import com.alibaba.otter.canal.parse.inbound.mysql.ddl.DruidDdlParser;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDB;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
/**
* In-memory table meta maintained from DDL statements
......@@ -44,15 +44,19 @@ import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDB;
*/
public class MemoryTableMeta implements TableMetaTSDB {
private Logger logger = LoggerFactory.getLogger(MemoryTableMeta.class);
private Map<List<String>, TableMeta> tableMetas = new ConcurrentHashMap<List<String>, TableMeta>();
private SchemaRepository repository = new SchemaRepository(JdbcConstants.MYSQL);
private Logger logger = LoggerFactory.getLogger(MemoryTableMeta.class);
public MemoryTableMeta(Logger logger){
this.logger = logger;
public MemoryTableMeta(){
}
@Override
public boolean init(String destination) {
return true;
}
public boolean apply(BinlogPosition position, String schema, String ddl) {
public boolean apply(EntryPosition position, String schema, String ddl, String extra) {
tableMetas.clear();
synchronized (this) {
if (StringUtils.isNotEmpty(schema)) {
......@@ -116,7 +120,7 @@ public class MemoryTableMeta implements TableMetaTSDB {
}
@Override
public boolean rollback(BinlogPosition position) {
public boolean rollback(EntryPosition position) {
throw new RuntimeException("not support for memory");
}
......@@ -169,6 +173,17 @@ public class MemoryTableMeta implements TableMetaTSDB {
dataTypStr += ")";
}
if (dataType instanceof SQLDataTypeImpl) {
SQLDataTypeImpl dataTypeImpl = (SQLDataTypeImpl) dataType;
if (dataTypeImpl.isUnsigned()) {
dataTypStr += " unsigned";
}
if (dataTypeImpl.isZerofill()) {
dataTypStr += " zerofill";
}
}
if (column.getDefaultExpr() == null || column.getDefaultExpr() instanceof SQLNullExpr) {
fieldMeta.setDefaultValue(null);
} else {
......@@ -211,6 +226,8 @@ public class MemoryTableMeta implements TableMetaTSDB {
+ DruidDdlParser.unescapeName(((SQLPropertyExpr) sqlName).getName());
} else if (sqlName instanceof SQLIdentifierExpr) {
return DruidDdlParser.unescapeName(((SQLIdentifierExpr) sqlName).getName());
} else if (sqlName instanceof SQLCharExpr) {
return ((SQLCharExpr) sqlName).getText();
} else {
return sqlName.toString();
}
......
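The unsigned/zerofill branch and the new SQLCharExpr case above change what a lookup reports; a minimal sketch of the observable effect (the schema name and DDL are invented for illustration):

MemoryTableMeta memoryTableMeta = new MemoryTableMeta();
memoryTableMeta.init("example");
memoryTableMeta.apply(new EntryPosition("0", 0L, -2L, -1L), "test",
    "create table t1 (id int(10) unsigned zerofill primary key)", null);
TableMeta meta = memoryTableMeta.find("test", "t1");
// with the branch above, the id column's type reads "int(10) unsigned zerofill"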
......@@ -11,13 +11,12 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.annotation.Resource;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang.ObjectUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import com.alibaba.druid.sql.repository.Schema;
import com.alibaba.fastjson.JSON;
......@@ -32,10 +31,10 @@ import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.TableMetaCache;
import com.alibaba.otter.canal.parse.inbound.mysql.ddl.DdlResult;
import com.alibaba.otter.canal.parse.inbound.mysql.ddl.DruidDdlParser;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaHistoryDAO;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaHistoryDO;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDAO;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.model.MetaHistoryDO;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.model.MetaSnapshotDO;
import com.taobao.tddl.dbsync.binlog.BinlogPosition;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDO;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
/**
* Remote management via console; see internal classes: CanalTableMeta, ConsoleTableMetaTSDB
......@@ -45,23 +44,27 @@ import com.taobao.tddl.dbsync.binlog.BinlogPosition;
*/
public class TableMetaManager implements TableMetaTSDB {
private static Logger logger = LoggerFactory.getLogger(TableMetaManager.class);
private static Pattern pattern = Pattern.compile("Duplicate entry '.*' for key '*'");
private static final BinlogPosition INIT_POSITION = BinlogPosition.parseFromString("0:0#-2.-1");
private MemoryTableMeta memoryTableMeta;
private MysqlConnection connection; // connection used to query meta info
private CanalEventFilter filter;
private BinlogPosition lastPosition;
private ScheduledExecutorService scheduler;
private static Logger logger = LoggerFactory.getLogger(TableMetaManager.class);
private static Pattern pattern = Pattern.compile("Duplicate entry '.*' for key '*'");
private static final EntryPosition INIT_POSITION = new EntryPosition("0", 0L, -2L, -1L);
private String destination;
private MemoryTableMeta memoryTableMeta;
private MysqlConnection connection; // connection used to query meta info
private CanalEventFilter filter;
private CanalEventFilter blackFilter;
private EntryPosition lastPosition;
private ScheduledExecutorService scheduler;
private MetaHistoryDAO metaHistoryDAO;
private MetaSnapshotDAO metaSnapshotDAO;
@Resource
private MetaHistoryDAO metaHistoryDAO;
@Resource
private MetaSnapshotDAO metaSnapshotDAO;
public TableMetaManager(){
}
public void init() {
this.memoryTableMeta = new MemoryTableMeta(logger);
@Override
public boolean init(final String destination) {
this.destination = destination;
this.memoryTableMeta = new MemoryTableMeta();
this.scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
......@@ -76,17 +79,14 @@ public class TableMetaManager implements TableMetaTSDB {
@Override
public void run() {
try {
logger.info("-------- begin to produce snapshot for table meta");
MDC.put("destination", destination);
applySnapshotToDB(lastPosition, false);
} catch (Throwable e) {
logger.error("scheudle faield", e);
logger.error("scheudle applySnapshotToDB faield", e);
}
}
}, 24, 24, TimeUnit.SECONDS);
}
public TableMetaManager(){
}, 24, 24, TimeUnit.HOURS);
return true;
}
@Override
......@@ -97,13 +97,13 @@ public class TableMetaManager implements TableMetaTSDB {
}
@Override
public boolean apply(BinlogPosition position, String schema, String ddl) {
public boolean apply(EntryPosition position, String schema, String ddl, String extra) {
// record into the in-memory structure first
synchronized (memoryTableMeta) {
if (memoryTableMeta.apply(position, schema, ddl)) {
if (memoryTableMeta.apply(position, schema, ddl, extra)) {
this.lastPosition = position;
// sync each change to the remote side as history
return applyHistoryToDB(position, schema, ddl);
return applyHistoryToDB(position, schema, ddl, extra);
} else {
throw new RuntimeException("apply to memory is failed");
}
......@@ -111,11 +111,11 @@ public class TableMetaManager implements TableMetaTSDB {
}
@Override
public boolean rollback(BinlogPosition position) {
public boolean rollback(EntryPosition position) {
// each rollback rebuilds the in-memory data from scratch
this.memoryTableMeta = new MemoryTableMeta(logger);
this.memoryTableMeta = new MemoryTableMeta();
boolean flag = false;
BinlogPosition snapshotPosition = buildMemFromSnapshot(position);
EntryPosition snapshotPosition = buildMemFromSnapshot(position);
if (snapshotPosition != null) {
applyHistoryOnMemory(snapshotPosition, position);
flag = true;
......@@ -145,20 +145,25 @@ public class TableMetaManager implements TableMetaTSDB {
ResultSetPacket packet = connection.query("show databases");
List<String> schemas = new ArrayList<String>();
for (String schema : packet.getFieldValues()) {
if (!filter.filter(schema)) {
schemas.add(schema);
}
schemas.add(schema);
}
for (String schema : schemas) {
packet = connection.query("show tables from `" + schema + "`");
List<String> tables = new ArrayList<String>();
for (String table : packet.getFieldValues()) {
if (!filter.filter(table)) {
tables.add(table);
String fullName = schema + "." + table;
if (blackFilter == null || !blackFilter.filter(fullName)) {
if (filter == null || filter.filter(fullName)) {
tables.add(table);
}
}
}
if (tables.isEmpty()) {
continue;
}
StringBuilder sql = new StringBuilder();
for (String table : tables) {
sql.append("show create table `" + schema + "`.`" + table + "`;");
......@@ -168,7 +173,7 @@ public class TableMetaManager implements TableMetaTSDB {
for (ResultSetPacket onePacket : packets) {
if (onePacket.getFieldValues().size() > 1) {
String oneTableCreateSql = onePacket.getFieldValues().get(1);
memoryTableMeta.apply(INIT_POSITION, schema, oneTableCreateSql);
memoryTableMeta.apply(INIT_POSITION, schema, oneTableCreateSql, null);
}
}
}
......@@ -179,25 +184,26 @@ public class TableMetaManager implements TableMetaTSDB {
}
}
private boolean applyHistoryToDB(BinlogPosition position, String schema, String ddl) {
private boolean applyHistoryToDB(EntryPosition position, String schema, String ddl, String extra) {
Map<String, String> content = new HashMap<String, String>();
content.put("binlogFile", position.getFileName());
content.put("destination", destination);
content.put("binlogFile", position.getJournalName());
content.put("binlogOffest", String.valueOf(position.getPosition()));
content.put("binlogMasterId", String.valueOf(position.getMasterId()));
content.put("binlogMasterId", String.valueOf(position.getServerId()));
content.put("binlogTimestamp", String.valueOf(position.getTimestamp()));
content.put("useSchema", schema);
if (content.isEmpty()) {
throw new RuntimeException("apply failed caused by content is empty in applyHistoryToDB");
}
// TODO: to be completed
List<DdlResult> ddlResults = DruidDdlParser.parse(schema, ddl);
List<DdlResult> ddlResults = DruidDdlParser.parse(ddl, schema);
if (ddlResults.size() > 0) {
DdlResult ddlResult = ddlResults.get(0);
content.put("schema", ddlResult.getSchemaName());
content.put("table", ddlResult.getTableName());
content.put("type", ddlResult.getType().name());
content.put("sql", ddl);
// content.put("extra", "");
content.put("sqlSchema", ddlResult.getSchemaName());
content.put("sqlTable", ddlResult.getTableName());
content.put("sqlType", ddlResult.getType().name());
content.put("sqlText", ddl);
content.put("extra", extra);
}
MetaHistoryDO metaDO = new MetaHistoryDO();
......@@ -212,7 +218,7 @@ public class TableMetaManager implements TableMetaTSDB {
// ignore the duplicated position
logger.warn("dup apply for sql : " + ddl);
} else {
throw new CanalParseException("apply history to db failed caused by : " + e.getMessage());
throw new CanalParseException("apply history to db failed caused by : " + e.getMessage(), e);
}
}
......@@ -222,9 +228,9 @@ public class TableMetaManager implements TableMetaTSDB {
/**
* Publish the data to the console
*/
private boolean applySnapshotToDB(BinlogPosition position, boolean init) {
private boolean applySnapshotToDB(EntryPosition position, boolean init) {
// take a snapshot
MemoryTableMeta tmpMemoryTableMeta = new MemoryTableMeta(logger);
MemoryTableMeta tmpMemoryTableMeta = new MemoryTableMeta();
Map<String, String> schemaDdls = null;
synchronized (memoryTableMeta) {
if (!init && position == null) {
......@@ -233,7 +239,7 @@ public class TableMetaManager implements TableMetaTSDB {
}
schemaDdls = memoryTableMeta.snapshot();
for (Map.Entry<String, String> entry : schemaDdls.entrySet()) {
tmpMemoryTableMeta.apply(position, entry.getKey(), entry.getValue());
tmpMemoryTableMeta.apply(position, entry.getKey(), entry.getValue(), null);
}
}
......@@ -248,9 +254,10 @@ public class TableMetaManager implements TableMetaTSDB {
}
if (compareAll) {
Map<String, String> content = new HashMap<String, String>();
content.put("binlogFile", position.getFileName());
content.put("destination", destination);
content.put("binlogFile", position.getJournalName());
content.put("binlogOffest", String.valueOf(position.getPosition()));
content.put("binlogMasterId", String.valueOf(position.getMasterId()));
content.put("binlogMasterId", String.valueOf(position.getServerId()));
content.put("binlogTimestamp", String.valueOf(position.getTimestamp()));
content.put("data", JSON.toJSONString(schemaDdls));
if (content.isEmpty()) {
......@@ -264,9 +271,9 @@ public class TableMetaManager implements TableMetaTSDB {
} catch (Throwable e) {
if (isUkDuplicateException(e)) {
// ignore the duplicated position
logger.warn("dup apply snapshot for data : " + snapshotDO.getData());
logger.info("dup apply snapshot use position : " + position + " , just ignore");
} else {
throw new CanalParseException("apply failed caused by : " + e.getMessage());
throw new CanalParseException("apply failed caused by : " + e.getMessage(), e);
}
}
return true;
......@@ -296,18 +303,21 @@ public class TableMetaManager implements TableMetaTSDB {
return result;
}
private BinlogPosition buildMemFromSnapshot(BinlogPosition position) {
private EntryPosition buildMemFromSnapshot(EntryPosition position) {
try {
MetaSnapshotDO snapshotDO = metaSnapshotDAO.findByTimestamp(position.getTimestamp());
MetaSnapshotDO snapshotDO = metaSnapshotDAO.findByTimestamp(destination, position.getTimestamp());
if (snapshotDO == null) {
return null;
}
String binlogFile = snapshotDO.getBinlogFile();
Long binlogOffest = snapshotDO.getBinlogOffest();
String binlogMasterId = snapshotDO.getBinlogMasterId();
Long binlogTimestamp = snapshotDO.getBinlogTimestamp();
BinlogPosition snapshotPosition = new BinlogPosition(binlogFile,
EntryPosition snapshotPosition = new EntryPosition(binlogFile,
binlogOffest == null ? 0l : binlogOffest,
Long.valueOf(binlogMasterId == null ? "-2" : binlogMasterId),
binlogTimestamp == null ? 0l : binlogTimestamp);
binlogTimestamp == null ? 0l : binlogTimestamp,
Long.valueOf(binlogMasterId == null ? "-2" : binlogMasterId));
// data is stored as Map<String,String>, one set of CREATE TABLE statements per sharded database
String sqlData = snapshotDO.getData();
JSONObject jsonObj = JSON.parseObject(sqlData);
......@@ -315,43 +325,49 @@ public class TableMetaManager implements TableMetaTSDB {
// record into memory
if (!memoryTableMeta.apply(snapshotPosition,
ObjectUtils.toString(entry.getKey()),
ObjectUtils.toString(entry.getValue()))) {
ObjectUtils.toString(entry.getValue()),
null)) {
return null;
}
}
return snapshotPosition;
} catch (Throwable e) {
throw new CanalParseException("apply failed caused by : " + e.getMessage());
throw new CanalParseException("apply failed caused by : " + e.getMessage(), e);
}
}
private boolean applyHistoryOnMemory(BinlogPosition position, BinlogPosition rollbackPosition) {
private boolean applyHistoryOnMemory(EntryPosition position, EntryPosition rollbackPosition) {
try {
List<MetaHistoryDO> metaHistoryDOList = metaHistoryDAO.findByTimestamp(position.getTimestamp(),
List<MetaHistoryDO> metaHistoryDOList = metaHistoryDAO.findByTimestamp(destination,
position.getTimestamp(),
rollbackPosition.getTimestamp());
if (metaHistoryDOList == null) {
return true;
}
for (MetaHistoryDO metaHistoryDO : metaHistoryDOList) {
String binlogFile = metaHistoryDO.getBinlogFile();
Long binlogOffest = metaHistoryDO.getBinlogOffest();
String binlogMasterId = metaHistoryDO.getBinlogMasterId();
Long binlogTimestamp = metaHistoryDO.getBinlogTimestamp();
String useSchema = metaHistoryDO.getUseSchema();
String sqlData = metaHistoryDO.getSql();
BinlogPosition snapshotPosition = new BinlogPosition(binlogFile,
String sqlData = metaHistoryDO.getSqlText();
EntryPosition snapshotPosition = new EntryPosition(binlogFile,
binlogOffest == null ? 0L : binlogOffest,
Long.valueOf(binlogMasterId == null ? "-2" : binlogMasterId),
binlogTimestamp == null ? 0L : binlogTimestamp);
binlogTimestamp == null ? 0L : binlogTimestamp,
Long.valueOf(binlogMasterId == null ? "-2" : binlogMasterId));
// within the same second, compare the history position; if it is greater than the expected position, skip it
if (snapshotPosition.getTimestamp() > rollbackPosition.getTimestamp()) {
continue;
} else if (rollbackPosition.getMasterId() == snapshotPosition.getMasterId()
} else if (rollbackPosition.getServerId() == snapshotPosition.getServerId()
&& snapshotPosition.compareTo(rollbackPosition) > 0) {
continue;
}
// record into memory
if (!memoryTableMeta.apply(snapshotPosition, useSchema, sqlData)) {
if (!memoryTableMeta.apply(snapshotPosition, useSchema, sqlData, null)) {
return false;
}
......@@ -441,6 +457,10 @@ public class TableMetaManager implements TableMetaTSDB {
this.metaSnapshotDAO = metaSnapshotDAO;
}
public void setBlackFilter(CanalEventFilter blackFilter) {
this.blackFilter = blackFilter;
}
public MysqlConnection getConnection() {
return connection;
}
......
......@@ -3,17 +3,21 @@ package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
import java.util.Map;
import com.alibaba.otter.canal.parse.inbound.TableMeta;
import com.taobao.tddl.dbsync.binlog.BinlogPosition;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
/**
* Time-series storage of table schemas
*
* @author agapple Jul 27, 2017 4:06:30 PM
* @since 3.2.5
* @since 1.0.25
*/
public interface TableMetaTSDB {
/**
* Initialize
*/
public boolean init(String destination);
/**
* Get the current table schema
*/
......@@ -22,12 +26,12 @@ public interface TableMetaTSDB {
/**
* Add a DDL statement to the time-series store
*/
public boolean apply(BinlogPosition position, String schema, String ddl);
public boolean apply(EntryPosition position, String schema, String ddl, String extra);
/**
* Roll back the table schema to the given position
*/
public boolean rollback(BinlogPosition position);
public boolean rollback(EntryPosition position);
/**
* Generate the snapshot content
......
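For orientation, a minimal usage sketch of this interface follows; it is not part of the commit, the class name, destination, DDL text, and position values are invented, and the call order mirrors how TableMetaManager drives it (init once, apply per DDL, rollback on restart):
package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
/**
 * Editor's sketch (not part of the commit): driving the TableMetaTSDB lifecycle.
 */
public class TableMetaTSDBUsageSketch {
    public static void demo(TableMetaTSDB tsdb) {
        // bind the store to one destination (channel) before any other call
        tsdb.init("example");
        // record a DDL change together with the binlog position it was observed at
        EntryPosition pos = new EntryPosition("mysql-bin.000001", 104606L);
        tsdb.apply(pos, "test", "create table test.demo (id bigint primary key)", null);
        // on restart, rebuild the in-memory schema as it was at the given position
        tsdb.rollback(pos);
    }
}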
package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.google.common.collect.Maps;
/**
* @author agapple Oct 11, 2017 8:45:40 PM
* @since 1.0.25
*/
public class TableMetaTSDBBuilder {
protected final static Logger logger = LoggerFactory.getLogger(TableMetaTSDBBuilder.class);
private static ConcurrentMap<String, ClassPathXmlApplicationContext> contexts = Maps.newConcurrentMap();
/**
* Proxies the acquisition of TableMetaTSDB, using an isolated Spring definition per destination
*/
public static TableMetaTSDB build(String destination, String springXml) {
if (StringUtils.isNotEmpty(springXml)) {
ClassPathXmlApplicationContext applicationContext = contexts.get(destination);
if (applicationContext == null) {
synchronized (contexts) {
// re-check inside the lock: the local variable is stale, so read the map again
applicationContext = contexts.get(destination);
if (applicationContext == null) {
applicationContext = new ClassPathXmlApplicationContext(springXml);
contexts.put(destination, applicationContext);
}
}
}
TableMetaTSDB tableMetaTSDB = (TableMetaTSDB) applicationContext.getBean("tableMetaTSDB");
tableMetaTSDB.init(destination);
logger.info("{} init TableMetaTSDB with {}", destination, springXml);
return tableMetaTSDB;
} else {
return null;
}
}
public static void destory(String destination) {
ClassPathXmlApplicationContext context = contexts.remove(destination);
if (context != null) {
logger.info("{} destory TableMetaTSDB", destination);
context.close();
}
}
}
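A hedged usage sketch of the builder, not part of the commit: one isolated Spring context per destination, paired with destory() when the instance is disposed. The destination name is invented; the XML path matches the tests below.
package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
/**
 * Editor's sketch (not part of the commit): typical build/destory pairing.
 */
public class TableMetaTSDBBuilderUsageSketch {
    public static void main(String[] args) {
        // builds (or reuses) the Spring context keyed by destination and returns its tableMetaTSDB bean
        TableMetaTSDB tsdb = TableMetaTSDBBuilder.build("example", "classpath:tsdb/mysql-tsdb.xml");
        try {
            // ... feed DDL changes via tsdb.apply(...) while parsing the binlog ...
        } finally {
            // closes and removes the per-destination context
            TableMetaTSDBBuilder.destory("example");
        }
    }
}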
package com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import javax.sql.DataSource;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.springframework.orm.ibatis.support.SqlMapClientDaoSupport;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.model.MetaHistoryDO;
import com.google.common.collect.Maps;
/**
......@@ -14,22 +22,25 @@ import com.google.common.collect.Maps;
* @author wanshao Jul 27, 2017 10:51:55 PM
* @since 3.2.5
*/
@SuppressWarnings("deprecation")
public class MetaHistoryDAO extends SqlMapClientDaoSupport {
public List<MetaHistoryDO> getAll() {
return getSqlMapClientTemplate().queryForList("table_meta_history.getAll");
public Long insert(MetaHistoryDO metaDO) {
return (Long) getSqlMapClientTemplate().insert("meta_history.insert", metaDO);
}
public Long insert(MetaHistoryDO metaDO) {
return (Long) getSqlMapClientTemplate().insert("table_meta_history.insert", metaDO);
public List<MetaHistoryDO> findByTimestamp(String destination, Long snapshotTimestamp, Long timestamp) {
HashMap params = Maps.newHashMapWithExpectedSize(2);
params.put("destination", destination);
params.put("snapshotTimestamp", snapshotTimestamp == null ? 0L : snapshotTimestamp);
params.put("timestamp", timestamp == null ? 0L : timestamp);
return (List<MetaHistoryDO>) getSqlMapClientTemplate().queryForList("meta_history.findByTimestamp", params);
}
public List<MetaHistoryDO> findByTimestamp(long snapshotTimestamp, long timestamp) {
public Integer deleteByName(String destination) {
HashMap params = Maps.newHashMapWithExpectedSize(2);
params.put("snapshotTimestamp", snapshotTimestamp);
params.put("timestamp", timestamp);
return (List<MetaHistoryDO>) getSqlMapClientTemplate().queryForList("table_meta_history.findByTimestamp",
params);
params.put("destination", destination);
return getSqlMapClientTemplate().delete("meta_history.deleteByName", params);
}
/**
......@@ -37,7 +48,35 @@ public class MetaHistoryDAO extends SqlMapClientDaoSupport {
*/
public Integer deleteByGmtModified(int interval) {
HashMap params = Maps.newHashMapWithExpectedSize(2);
params.put("interval", interval);
return getSqlMapClientTemplate().delete("table_meta_history.deleteByGmtModified", params);
long timestamp = System.currentTimeMillis() - interval * 1000L; // long literal avoids int overflow for large intervals
Date date = new Date(timestamp);
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
params.put("timestamp", format.format(date));
return getSqlMapClientTemplate().delete("meta_history.deleteByGmtModified", params);
}
protected void initDao() throws Exception {
Connection conn = null;
InputStream input = null;
try {
DataSource dataSource = getDataSource();
conn = dataSource.getConnection();
input = Thread.currentThread().getContextClassLoader().getResourceAsStream("ddl/mysql/meta_history.sql");
if (input == null) {
return;
}
String sql = StringUtils.join(IOUtils.readLines(input), "\n");
Statement stmt = conn.createStatement();
stmt.execute(sql);
stmt.close();
} catch (Throwable e) {
logger.warn("init meta_history failed", e);
} finally {
IOUtils.closeQuietly(input);
if (conn != null) {
conn.close();
}
}
}
}
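For context, a hedged sketch of how a caller might replay the history window this DAO serves (compare applyHistoryOnMemory above); the destination and timestamps are invented:
package com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao;
import java.util.List;
/**
 * Editor's sketch (not part of the commit): replaying the DDL history window
 * between a snapshot position and a rollback position.
 */
public class MetaHistoryDAOUsageSketch {
    public static void replay(MetaHistoryDAO dao) {
        long snapshotTs = 1501660815000L; // timestamp of the snapshot rebuilt from (assumed)
        long rollbackTs = 1501660915000L; // timestamp to roll back to (assumed)
        List<MetaHistoryDO> history = dao.findByTimestamp("example", snapshotTs, rollbackTs);
        if (history == null) {
            return;
        }
        for (MetaHistoryDO record : history) {
            // each record carries the DDL text plus the binlog position it was applied at
            System.out.println(record.getBinlogFile() + ":" + record.getBinlogOffest() + " -> " + record.getSqlText());
        }
    }
}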
package com.alibaba.otter.canal.parse.inbound.mysql.tsdb.model;
package com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao;
import java.util.Date;
/**
* @author agapple Jul 27, 2017 11:09:41 PM
* @since 3.2.5
* @since 1.0.25
*/
public class MetaHistoryDO {
/**
* Primary key
*/
private Long id;
/**
* Creation time
*/
private Date gmtCreate;
/**
* Modification time
*/
private Date gmtModified;
private String destination;
private String binlogFile;
private Long binlogOffest;
private String binlogMasterId;
private Long binlogTimestamp;
private String useSchema;
private String schema;
private String table;
private String sql;
private String type;
private String sqlSchema;
private String sqlTable;
private String sqlText;
private String sqlType;
private String extra;
public Long getId() {
......@@ -98,44 +87,62 @@ public class MetaHistoryDO {
this.useSchema = useSchema;
}
public String getSchema() {
return schema;
public String getExtra() {
return extra;
}
public void setSchema(String schema) {
this.schema = schema;
public void setExtra(String extra) {
this.extra = extra;
}
public String getTable() {
return table;
public String getDestination() {
return destination;
}
public void setTable(String table) {
this.table = table;
public void setDestination(String destination) {
this.destination = destination;
}
public String getSql() {
return sql;
public String getSqlSchema() {
return sqlSchema;
}
public void setSql(String sql) {
this.sql = sql;
public void setSqlSchema(String sqlSchema) {
this.sqlSchema = sqlSchema;
}
public String getType() {
return type;
public String getSqlTable() {
return sqlTable;
}
public void setType(String type) {
this.type = type;
public void setSqlTable(String sqlTable) {
this.sqlTable = sqlTable;
}
public String getExtra() {
return extra;
public String getSqlText() {
return sqlText;
}
public void setExtra(String extra) {
this.extra = extra;
public void setSqlText(String sqlText) {
this.sqlText = sqlText;
}
public String getSqlType() {
return sqlType;
}
public void setSqlType(String sqlType) {
this.sqlType = sqlType;
}
@Override
public String toString() {
return "MetaHistoryDO [id=" + id + ", gmtCreate=" + gmtCreate + ", gmtModified=" + gmtModified
+ ", destination=" + destination + ", binlogFile=" + binlogFile + ", binlogOffest=" + binlogOffest
+ ", binlogMasterId=" + binlogMasterId + ", binlogTimestamp=" + binlogTimestamp + ", useSchema="
+ useSchema + ", sqlSchema=" + sqlSchema + ", sqlTable=" + sqlTable + ", sqlText=" + sqlText
+ ", sqlType=" + sqlType + ", extra=" + extra + "]";
}
}
package com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import javax.sql.DataSource;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.springframework.orm.ibatis.support.SqlMapClientDaoSupport;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.model.MetaSnapshotDO;
import com.google.common.collect.Maps;
/**
......@@ -13,27 +21,29 @@ import com.google.common.collect.Maps;
* @author wanshao Jul 27, 2017 10:51:55 PM
* @since 3.2.5
*/
@SuppressWarnings("deprecation")
public class MetaSnapshotDAO extends SqlMapClientDaoSupport {
public Long insert(MetaSnapshotDO snapshotDO) {
return (Long) getSqlMapClientTemplate().insert("table_meta_snapshot.insert", snapshotDO);
return (Long) getSqlMapClientTemplate().insert("meta_snapshot.insert", snapshotDO);
}
public Long update(MetaSnapshotDO snapshotDO) {
return (Long) getSqlMapClientTemplate().insert("table_meta_snapshot.update", snapshotDO);
return (Long) getSqlMapClientTemplate().insert("meta_snapshot.update", snapshotDO);
}
public MetaSnapshotDO findByTimestamp(long timestamp) {
public MetaSnapshotDO findByTimestamp(String destination, Long timestamp) {
HashMap params = Maps.newHashMapWithExpectedSize(2);
params.put("timestamp", timestamp);
return (MetaSnapshotDO) getSqlMapClientTemplate().queryForObject("table_meta_snapshot.findByTimestamp", params);
params.put("timestamp", timestamp == null ? 0L : timestamp);
params.put("destination", destination);
return (MetaSnapshotDO) getSqlMapClientTemplate().queryForObject("meta_snapshot.findByTimestamp", params);
}
public Integer deleteByTask(String taskName) {
public Integer deleteByName(String destination) {
HashMap params = Maps.newHashMapWithExpectedSize(2);
params.put("taskName", taskName);
return getSqlMapClientTemplate().delete("table_meta_snapshot.deleteByTaskName", params);
params.put("destination", destination);
return getSqlMapClientTemplate().delete("meta_snapshot.deleteByName", params);
}
/**
......@@ -41,8 +51,35 @@ public class MetaSnapshotDAO extends SqlMapClientDaoSupport {
*/
public Integer deleteByGmtModified(int interval) {
HashMap params = Maps.newHashMapWithExpectedSize(2);
params.put("interval", interval);
return getSqlMapClientTemplate().delete("table_meta_snapshot.deleteByGmtModified", params);
long timestamp = System.currentTimeMillis() - interval * 1000L; // long literal avoids int overflow for large intervals
Date date = new Date(timestamp);
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
params.put("timestamp", format.format(date));
return getSqlMapClientTemplate().delete("meta_snapshot.deleteByGmtModified", params);
}
protected void initDao() throws Exception {
Connection conn = null;
InputStream input = null;
try {
DataSource dataSource = getDataSource();
conn = dataSource.getConnection();
input = Thread.currentThread().getContextClassLoader().getResourceAsStream("ddl/mysql/meta_snapshot.sql");
if (input == null) {
return;
}
String sql = StringUtils.join(IOUtils.readLines(input), "\n");
Statement stmt = conn.createStatement();
stmt.execute(sql);
stmt.close();
} catch (Throwable e) {
logger.warn("init meta_history failed", e);
} finally {
IOUtils.closeQuietly(input);
if (conn != null) {
conn.close();
}
}
}
}
package com.alibaba.otter.canal.parse.inbound.mysql.tsdb.model;
package com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao;
import java.util.Date;
/**
* @author agapple Jul 27, 2017 11:09:41 PM
* @since 3.2.5
* @since 1.0.25
*/
public class MetaSnapshotDO {
/**
* Primary key
*/
private Long id;
/**
* Creation time
*/
private Date gmtCreate;
/**
* Modification time
*/
private Date gmtModified;
private String destination;
private String binlogFile;
private Long binlogOffest;
private String binlogMasterId;
......@@ -102,4 +91,20 @@ public class MetaSnapshotDO {
this.extra = extra;
}
public String getDestination() {
return destination;
}
public void setDestination(String destination) {
this.destination = destination;
}
@Override
public String toString() {
return "MetaSnapshotDO [id=" + id + ", gmtCreate=" + gmtCreate + ", gmtModified=" + gmtModified
+ ", destination=" + destination + ", binlogFile=" + binlogFile + ", binlogOffest=" + binlogOffest
+ ", binlogMasterId=" + binlogMasterId + ", binlogTimestamp=" + binlogTimestamp + ", data=" + data
+ ", extra=" + extra + "]";
}
}
CREATE TABLE meta_history (
id bigint GENERATED ALWAYS AS IDENTITY NOT NULL,
gmt_create timestamp NOT NULL,
gmt_modified timestamp NOT NULL,
destination varchar(128) DEFAULT NULL,
binlog_file varchar(64) DEFAULT NULL,
binlog_offest bigint DEFAULT NULL,
binlog_master_id varchar(64) DEFAULT NULL,
binlog_timestamp bigint DEFAULT NULL,
use_schema varchar(1024) DEFAULT NULL,
sql_schema varchar(1024) DEFAULT NULL,
sql_table varchar(1024) DEFAULT NULL,
sql_text clob(16 M) DEFAULT NULL,
sql_type varchar(1024) DEFAULT NULL,
extra varchar(512) DEFAULT NULL,
PRIMARY KEY (id),
CONSTRAINT meta_history_binlog_file_offest UNIQUE (destination,binlog_master_id,binlog_file,binlog_offest)
);
create index meta_history_destination on meta_history(destination);
create index meta_history_destination_timestamp on meta_history(destination,binlog_timestamp);
create index meta_history_gmt_modified on meta_history(gmt_modified);
\ No newline at end of file
CREATE TABLE meta_snapshot (
id bigint GENERATED ALWAYS AS IDENTITY NOT NULL,
gmt_create timestamp NOT NULL,
gmt_modified timestamp NOT NULL,
destination varchar(128) DEFAULT NULL,
binlog_file varchar(64) DEFAULT NULL,
binlog_offest bigint DEFAULT NULL,
binlog_master_id varchar(64) DEFAULT NULL,
binlog_timestamp bigint DEFAULT NULL,
data clob(16 M) DEFAULT NULL,
extra varchar(512) DEFAULT NULL,
PRIMARY KEY (id),
CONSTRAINT meta_snapshot_binlog_file_offest UNIQUE (destination,binlog_master_id,binlog_file,binlog_offest)
);
create index meta_snapshot_destination on meta_snapshot(destination);
create index meta_snapshot_destination_timestamp on meta_snapshot(destination,binlog_timestamp);
create index meta_snapshot_gmt_modified on meta_snapshot(gmt_modified);
\ No newline at end of file
CREATE TABLE IF NOT EXISTS `meta_history` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key',
`gmt_create` datetime NOT NULL COMMENT 'creation time',
`gmt_modified` datetime NOT NULL COMMENT 'modification time',
`destination` varchar(128) DEFAULT NULL COMMENT 'channel name',
`binlog_file` varchar(64) DEFAULT NULL COMMENT 'binlog file name',
`binlog_offest` bigint(20) DEFAULT NULL COMMENT 'binlog offset',
`binlog_master_id` varchar(64) DEFAULT NULL COMMENT 'binlog master node id',
`binlog_timestamp` bigint(20) DEFAULT NULL COMMENT 'timestamp when the binlog event applied',
`use_schema` varchar(1024) DEFAULT NULL COMMENT 'schema in use when the sql was executed',
`sql_schema` varchar(1024) DEFAULT NULL COMMENT 'schema referenced by the sql',
`sql_table` varchar(1024) DEFAULT NULL COMMENT 'table referenced by the sql',
`sql_text` longtext DEFAULT NULL COMMENT 'executed sql',
`sql_type` varchar(256) DEFAULT NULL COMMENT 'sql type',
`extra` text DEFAULT NULL COMMENT 'extra extension info',
PRIMARY KEY (`id`),
UNIQUE KEY binlog_file_offest(`destination`,`binlog_master_id`,`binlog_file`,`binlog_offest`),
KEY `destination` (`destination`),
KEY `destination_timestamp` (`destination`,`binlog_timestamp`),
KEY `gmt_modified` (`gmt_modified`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='table schema change history';
\ No newline at end of file
CREATE TABLE IF NOT EXISTS `meta_snapshot` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key',
`gmt_create` datetime NOT NULL COMMENT 'creation time',
`gmt_modified` datetime NOT NULL COMMENT 'modification time',
`destination` varchar(128) DEFAULT NULL COMMENT 'channel name',
`binlog_file` varchar(64) DEFAULT NULL COMMENT 'binlog file name',
`binlog_offest` bigint(20) DEFAULT NULL COMMENT 'binlog offset',
`binlog_master_id` varchar(64) DEFAULT NULL COMMENT 'binlog master node id',
`binlog_timestamp` bigint(20) DEFAULT NULL COMMENT 'timestamp when the binlog event applied',
`data` longtext DEFAULT NULL COMMENT 'table schema data',
`extra` text DEFAULT NULL COMMENT 'extra extension info',
PRIMARY KEY (`id`),
UNIQUE KEY binlog_file_offest(`destination`,`binlog_master_id`,`binlog_file`,`binlog_offest`),
KEY `destination` (`destination`),
KEY `destination_timestamp` (`destination`,`binlog_timestamp`),
KEY `gmt_modified` (`gmt_modified`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='table schema snapshot';
\ No newline at end of file
......@@ -31,7 +31,7 @@ public class LocalBinlogEventParserTest {
public void setUp() {
URL url = Thread.currentThread().getContextClassLoader().getResource("dummy.txt");
File dummyFile = new File(url.getFile());
directory = new File("/Users/wanshao/projects/canal/parse/src/test/resources/binlog/tsdb").getPath();
directory = new File(dummyFile + "/binlog/tsdb").getPath();
}
@Test
......
......@@ -7,6 +7,7 @@ import java.util.List;
import org.junit.Assert;
import org.junit.Test;
import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
import com.alibaba.otter.canal.parse.exception.CanalParseException;
import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
import com.alibaba.otter.canal.parse.stub.AbstractCanalEventSinkTest;
......@@ -26,13 +27,18 @@ public class MysqlDumpTest {
@Test
public void testSimple() {
final MysqlEventParser controller = new MysqlEventParser();
final EntryPosition startPosition = new EntryPosition("mysql-bin.000003", 123L);
final EntryPosition startPosition = new EntryPosition("mysql-bin.000001", 104606L);
controller.setConnectionCharset(Charset.forName("UTF-8"));
controller.setSlaveId(3344L);
controller.setDetectingEnable(false);
controller.setMasterInfo(new AuthenticationInfo(new InetSocketAddress("127.0.0.1", 3306), "canal", "canal"));
controller.setMasterPosition(startPosition);
controller.setEnableTsdb(true);
controller.setDestination("example");
controller.setTsdbSpringXml("classpath:tsdb/mysql-tsdb.xml");
controller.setEventFilter(new AviaterRegexFilter("test\\..*"));
controller.setEventBlackFilter(new AviaterRegexFilter("canal_tsdb\\..*"));
controller.setEventSink(new AbstractCanalEventSinkTest<List<Entry>>() {
public boolean sink(List<Entry> entrys, InetSocketAddress remoteAddress, String destination)
......@@ -100,7 +106,7 @@ public class MysqlDumpTest {
controller.start();
try {
Thread.sleep(100 * 1000L);
Thread.sleep(100 * 1000 * 1000L);
} catch (InterruptedException e) {
Assert.fail(e.getMessage());
}
......
......@@ -4,32 +4,31 @@ import java.io.File;
import java.io.FileInputStream;
import java.net.URL;
import com.alibaba.otter.canal.parse.inbound.TableMeta;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.alibaba.otter.canal.parse.inbound.TableMeta;
/**
* @author agapple Aug 1, 2017 7:15:54 PM
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "/dal-dao.xml" })
@ContextConfiguration(locations = { "/tsdb/mysql-tsdb.xml" })
public class MemoryTableMetaTest {
@Test
public void testSimple() throws Throwable {
MemoryTableMeta memoryTableMeta = new MemoryTableMeta(null);
MemoryTableMeta memoryTableMeta = new MemoryTableMeta();
URL url = Thread.currentThread().getContextClassLoader().getResource("dummy.txt");
File dummyFile = new File(url.getFile());
File create = new File(dummyFile.getParent() + "/ddl", "create.sql");
String sql = StringUtils.join(IOUtils.readLines(new FileInputStream(create)), "\n");
memoryTableMeta.apply(null, "test", sql);
memoryTableMeta.apply(null, "test", sql, null);
TableMeta meta = memoryTableMeta.find("test", "test");
System.out.println(meta);
......
......@@ -4,27 +4,27 @@ import java.util.List;
import javax.annotation.Resource;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaHistoryDAO;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.model.MetaHistoryDO;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaHistoryDAO;
import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaHistoryDO;
/**
* Created by wanshao Date: 2017/9/20 Time: 5:00 PM
**/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "/dal-dao.xml" })
@ContextConfiguration(locations = { "/tsdb/mysql-tsdb.xml" })
public class MetaHistoryDAOTest {
@Resource
MetaHistoryDAO metaHistoryDAO;
@Test
public void testGetAll() {
List<MetaHistoryDO> metaHistoryDOList = metaHistoryDAO.getAll();
public void testSimple() {
List<MetaHistoryDO> metaHistoryDOList = metaHistoryDAO.findByTimestamp("test", 0L, 0L);
for (MetaHistoryDO metaHistoryDO : metaHistoryDOList) {
System.out.println(metaHistoryDO.getId());
}
......
package com.alibaba.otter.canal.parse.inbound.mysql.tsdb;
import org.junit.Test;
import org.springframework.util.Assert;
/**
* @author agapple Oct 12, 2017 10:50:00 AM
* @since 1.0.25
*/
public class TableMetaManagerBuilderTest {
@Test
public void testSimple() {
TableMetaTSDB tableMetaTSDB = TableMetaTSDBBuilder.build("test", "classpath:tsdb/mysql-tsdb.xml");
Assert.notNull(tableMetaTSDB);
TableMetaTSDBBuilder.destory("test");
}
}
......@@ -8,7 +8,6 @@ import java.net.URL;
import javax.annotation.Resource;
import com.taobao.tddl.dbsync.binlog.BinlogPosition;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.junit.Test;
......@@ -16,12 +15,14 @@ import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
/**
* @author wanshao Aug 2, 2017 4:11:45 PM
* @since 3.2.5
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "/dal-dao.xml" })
@ContextConfiguration(locations = { "/tsdb/mysql-tsdb.xml" })
public class TableMetaManagerTest {
@Resource
......@@ -29,17 +30,18 @@ public class TableMetaManagerTest {
@Test
public void testSimple() throws FileNotFoundException, IOException {
tableMetaManager.init("test");
URL url = Thread.currentThread().getContextClassLoader().getResource("dummy.txt");
File dummyFile = new File(url.getFile());
File create = new File(dummyFile.getParent() + "/ddl", "create.sql");
BinlogPosition position = BinlogPosition.parseFromString("001115:0139177334#3065927853.1501660815000");
EntryPosition position = new EntryPosition("mysql-bin.001115", 139177334L, 3065927853L, 1501660815000L);
String createSql = StringUtils.join(IOUtils.readLines(new FileInputStream(create)), "\n");
tableMetaManager.apply(position, "tddl5_00", createSql);
tableMetaManager.apply(position, "tddl5_00", createSql, null);
String alterSql = "alter table `test` add column name varchar(32) after c_varchar";
position = BinlogPosition.parseFromString("001115:0139177334#3065927853.1501660816000");
tableMetaManager.apply(position, "tddl5_00", alterSql);
position = new EntryPosition("mysql-bin.001115", 139177334L, 3065927853L, 1501660815000L);
tableMetaManager.apply(position, "tddl5_00", alterSql, null);
}
}
CREATE TABLE `test` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`id` bigint(20) zerofill unsigNed NOT NULL AUTO_INCREMENT COMMENT 'id',
`c_tinyint` tinyint(4) DEFAULT '1' COMMENT 'tinyint',
`c_smallint` smallint(6) DEFAULT 0 COMMENT 'smallint',
`c_mediumint` mediumint(9) DEFAULT NULL COMMENT 'mediumint',
......
......@@ -5,28 +5,33 @@
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-2.0.xsd"
default-autowire="byName">
<tx:annotation-driven/>
<bean id="dataSource" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DataSourceFactoryTSDB"
factory-method="getDataSource">
<constructor-arg index="0" value="${canal.instance.tsdb.url}"/>
<constructor-arg index="1" value="${canal.instance.tsdb.dbUsername}"/>
<constructor-arg index="2" value="${canal.instance.tsdb.dbPassword}"/>
<constructor-arg index="3" value="${canal.instance.tsdb.enable:true}"/>
</bean>
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
<bean id="txTemplate" class="org.springframework.transaction.support.TransactionTemplate">
<property name="transactionManager" ref="transactionManager"></property>
<property name="propagationBehaviorName" value="PROPAGATION_REQUIRED"></property>
</bean>
<!-- DB-based implementation -->
<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaManager">
<property name="metaHistoryDAO" ref="metaHistoryDAO"/>
<property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
</bean>
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
<property name="driverClassName" value="org.apache.derby.jdbc.EmbeddedDriver" />
<property name="url" value="jdbc:derby:derbyTest;create=true" />
<property name="username" value="canal" />
<property name="password" value="canal" />
<property name="maxActive" value="30" />
<property name="initialSize" value="0" />
<property name="minIdle" value="1" />
<property name="maxWait" value="10000" />
<property name="timeBetweenEvictionRunsMillis" value="60000" />
<property name="minEvictableIdleTimeMillis" value="300000" />
<property name="testWhileIdle" value="true" />
<property name="testOnBorrow" value="false" />
<property name="testOnReturn" value="false" />
<property name="useUnfairLock" value="true" />
</bean>
<bean id="sqlMapClient" class="org.springframework.orm.ibatis.SqlMapClientFactoryBean">
<property name="dataSource" ref="dataSource"/>
<property name="configLocation" value="classpath:sqlmap-config.xml"/>
<property name="configLocation" value="classpath:tsdb/sql-map/sqlmap-config.xml"/>
</bean>
<bean id="metaHistoryDAO" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaHistoryDAO">
......
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
<environments default="development">
<environment id="development">
<transactionManager type="JDBC"/>
<dataSource type="POOLED">
<property name="driver" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://127.0.0.1:3306/tsdb"/>
<property name="username" value="canal"/>
<property name="password" value="canal"/>
</dataSource>
</environment>
</environments>
<mappers>
<package name="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.mapper"/>
</mappers>
</configuration>
\ No newline at end of file
......@@ -5,28 +5,36 @@
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-2.0.xsd"
default-autowire="byName">
<tx:annotation-driven/>
<bean id="dataSource" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DataSourceFactoryTSDB"
factory-method="getDataSource">
<constructor-arg index="0" value="${canal.instance.tsdb.url}"/>
<constructor-arg index="1" value="${canal.instance.tsdb.dbUsername}"/>
<constructor-arg index="2" value="${canal.instance.tsdb.dbPassword}"/>
<constructor-arg index="3" value="${canal.instance.tsdb.enable:true}"/>
</bean>
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
<bean id="txTemplate" class="org.springframework.transaction.support.TransactionTemplate">
<property name="transactionManager" ref="transactionManager"></property>
<property name="propagationBehaviorName" value="PROPAGATION_REQUIRED"></property>
</bean>
<!-- DB-based implementation -->
<bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaManager">
<property name="metaHistoryDAO" ref="metaHistoryDAO"/>
<property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
</bean>
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://127.0.0.1:3306/canal_tsdb" />
<property name="username" value="canal" />
<property name="password" value="canal" />
<property name="maxActive" value="30" />
<property name="initialSize" value="0" />
<property name="minIdle" value="1" />
<property name="maxWait" value="10000" />
<property name="timeBetweenEvictionRunsMillis" value="60000" />
<property name="minEvictableIdleTimeMillis" value="300000" />
<property name="validationQuery" value="SELECT 1" />
<property name="exceptionSorterClassName" value="com.alibaba.druid.pool.vendor.MySqlExceptionSorter" />
<property name="validConnectionCheckerClassName" value="com.alibaba.druid.pool.vendor.MySqlValidConnectionChecker" />
<property name="testWhileIdle" value="true" />
<property name="testOnBorrow" value="false" />
<property name="testOnReturn" value="false" />
<property name="useUnfairLock" value="true" />
</bean>
<bean id="sqlMapClient" class="org.springframework.orm.ibatis.SqlMapClientFactoryBean">
<property name="dataSource" ref="dataSource"/>
<property name="configLocation" value="classpath:sqlmap-config.xml"/>
<property name="configLocation" value="classpath:tsdb/sql-map/sqlmap-config.xml"/>
</bean>
<bean id="metaHistoryDAO" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaHistoryDAO">
......
......@@ -3,7 +3,6 @@
"http://www.ibatis.com/dtd/sql-map-config-2.dtd">
<sqlMapConfig>
<settings useStatementNamespaces="true"/>
<sqlMap resource="sql-map/sqlmap_history.xml"/>
<sqlMap resource="sql-map/sqlmap_snapshot.xml"/>
<sqlMap resource="tsdb/sql-map/sqlmap_history.xml"/>
<sqlMap resource="tsdb/sql-map/sqlmap_snapshot.xml"/>
</sqlMapConfig>
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE sqlMap PUBLIC "-//ibatis.apache.org//DTD SQL Map 2.0//EN" "http://ibatis.apache.org/dtd/sql-map-2.dtd" >
<sqlMap namespace="table_meta_history">
<typeAlias alias="metaHistoryDO" type="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.model.MetaHistoryDO"/>
<sqlMap namespace="meta_history">
<typeAlias alias="metaHistoryDO" type="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaHistoryDO"/>
<sql id="allColumns">
<![CDATA[
gmt_create,gmt_modified,binlog_file,binlog_offest,binlog_master_id,binlog_timestamp,use_schema,`schema`,`table`,`sql`,`type`,`extra`
gmt_create,gmt_modified,destination,binlog_file,binlog_offest,binlog_master_id,binlog_timestamp,use_schema,sql_schema,sql_table,sql_text,sql_type,extra
]]>
</sql>
<sql id="allVOColumns">
<![CDATA[
a.id as id,a.gmt_create as gmtCreate,a.gmt_modified as gmtModified,
a.binlog_file as binlogFile,a.binlog_offest as binlogOffest,a.binlog_master_id as binlogMasterId,a.binlog_timestamp as binlogTimestamp,
a.use_schema as useSchema,a.`schema` as `schema`,a.`table` as `table`,a.`sql` as `sql`,a.`type` as `type`,a.`extra` as `extra`
a.destination as destination,a.binlog_file as binlogFile,a.binlog_offest as binlogOffest,a.binlog_master_id as binlogMasterId,a.binlog_timestamp as binlogTimestamp,
a.use_schema as useSchema,a.sql_schema as sqlSchema,a.sql_table as sqlTable,a.sql_text as sqlText,a.sql_type as sqlType,a.extra as extra
]]>
</sql>
<select id="findByTimestamp" parameterClass="java.util.Map" resultClass="metaHistoryDO">
select
<include refid="allVOColumns"/>
from `canal_table_meta_history$env$` a
from meta_history a
<![CDATA[
where binlog_timestamp >= #snapshotTimestamp# and binlog_timestamp <= #timestamp#
where destination = #destination# and binlog_timestamp >= #snapshotTimestamp# and binlog_timestamp <= #timestamp#
order by binlog_timestamp asc,id asc
]]>
</select>
<insert id="insert" parameterClass="metaHistoryDO">
insert into `canal_table_meta_history` (<include refid="allColumns"/>)
values(now(),now(),#binlogFile#,#binlogOffest#,#binlogMasterId#,#binlogTimestamp#,#useSchema#,#schema#,#table#,#sql#,#type#,#extra#);
<selectKey resultClass="java.lang.Long" keyProperty="id">
SELECT last_insert_id()
</selectKey>
insert into meta_history (<include refid="allColumns"/>)
values(CURRENT_TIMESTAMP,CURRENT_TIMESTAMP,#destination#,#binlogFile#,#binlogOffest#,#binlogMasterId#,#binlogTimestamp#,#useSchema#,#sqlSchema#,#sqlTable#,#sqlText#,#sqlType#,#extra#)
</insert>
<delete id="deleteByName" parameterClass="java.util.Map">
delete from meta_history
where destination=#destination#
</delete>
<delete id="deleteByGmtModified" parameterClass="java.util.Map">
<![CDATA[
delete from `canal_table_meta_history`
where gmt_modified < date_sub(now(),interval #interval# second)
delete from meta_history
where gmt_modified < timestamp(#timestamp#)
]]>
</delete>
<select id="getAll" resultClass="metaHistoryDO">
select * from canal_table_meta_history
</select>
</sqlMap>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE sqlMap PUBLIC "-//ibatis.apache.org//DTD SQL Map 2.0//EN" "http://ibatis.apache.org/dtd/sql-map-2.dtd" >
<sqlMap namespace="table_meta_snapshot">
<typeAlias alias="metaSnapshotDO" type="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.model.MetaSnapshotDO"/>
<sqlMap namespace="meta_snapshot">
<typeAlias alias="metaSnapshotDO" type="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDO"/>
<typeAlias alias="tableMetaSnapshotDO"
type="com.alibaba.middleware.jingwei.biz.dataobject.CanalTableMetaSnapshotDO"/>
<sql id="allColumns">
<![CDATA[
gmt_create,gmt_modified,binlog_file,binlog_offest,binlog_master_id,binlog_timestamp,data,extra
gmt_create,gmt_modified,destination,binlog_file,binlog_offest,binlog_master_id,binlog_timestamp,data,extra
]]>
</sql>
<sql id="allVOColumns">
<![CDATA[
a.id as id,a.gmt_create as gmtCreate,a.gmt_modified as gmtModified,
a.binlog_file as binlogFile,a.binlog_offest as binlogOffest,a.binlog_master_id as binlogMaster_id,a.binlog_timestamp as binlogTimestamp,a.data as data,a.extra as extra
a.destination as destination,a.binlog_file as binlogFile,a.binlog_offest as binlogOffest,a.binlog_master_id as binlogMasterId,a.binlog_timestamp as binlogTimestamp,a.data as data,a.extra as extra
]]>
</sql>
<select id="findByTimestamp" parameterClass="java.util.Map" resultClass="metaSnapshotDO">
select
<include refid="allVOColumns"/>
from `canal_table_meta_snapshot$env$` a
<![CDATA[
where binlog_timestamp < #timestamp#
select <include refid="allVOColumns"/>
<![CDATA[
from meta_snapshot a
where destination = #destination# and binlog_timestamp < #timestamp#
order by binlog_timestamp desc,id desc
limit 1
]]>
</select>
<select id="findByTimestampOnDerby" parameterClass="java.util.Map" resultClass="metaSnapshotDO">
select * FROM (
select ROW_NUMBER() OVER() AS rownum, <include refid="allVOColumns"/>
<![CDATA[
from meta_snapshot a
where destination = #destination# and binlog_timestamp < #timestamp#
order by binlog_timestamp desc,id desc
) AS tmp
WHERE rownum <= 5
]]>
</select>
<insert id="insert" parameterClass="metaSnapshotDO">
insert into `canal_table_meta_snapshot` (<include refid="allColumns"/>)
values(now(),now(),#binlogFile#,#binlogOffest#,#binlogMasterId#,#binlogTimestamp#,#data#,#extra#);
<selectKey resultClass="java.lang.Long" keyProperty="id">
SELECT last_insert_id()
</selectKey>
insert into meta_snapshot (<include refid="allColumns"/>)
values(CURRENT_TIMESTAMP,CURRENT_TIMESTAMP,#destination#,#binlogFile#,#binlogOffest#,#binlogMasterId#,#binlogTimestamp#,#data#,#extra#)
</insert>
<update id="update" parameterClass="metaSnapshotDO">
update `canal_table_meta_snapshot` set gmt_modified=now(),
update meta_snapshot set gmt_modified=CURRENT_TIMESTAMP,
binlog_file=#binlogFile#,binlog_offest=#binlogOffest#,binlog_master_id=#binlogMasterId#,binlog_timestamp=#binlogTimestamp#,data=#data#,extra=#extra#
where binlog_timestamp=0
where destination=#destination# and binlog_timestamp=0
</update>
<delete id="deleteByName" parameterClass="java.util.Map">
delete from meta_snapshot
where destination=#destination#
</delete>
<delete id="deleteByGmtModified" parameterClass="java.util.Map">
<![CDATA[
delete from `canal_table_meta_snapshot`
where gmt_modified < date_sub(now(),interval #interval# second)
delete from meta_snapshot
where gmt_modified < timestamp(#timestamp#)
]]>
</delete>
<select id="getAll" resultClass="metaSnapshotDO">
select * from canal_table_meta_snapshot
</select>
</sqlMap>
\ No newline at end of file
......@@ -109,6 +109,7 @@
<java_source_version>1.6</java_source_version>
<java_target_version>1.6</java_target_version>
<file_encoding>UTF-8</file_encoding>
<spring_version>3.2.9.RELEASE</spring_version>
</properties>
<modules>
......@@ -131,14 +132,34 @@
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring</artifactId>
<version>2.5.6</version>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring_version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>${spring_version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring_version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring_version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-orm</artifactId>
<version>${spring_version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>2.5.6</version>
<version>${spring_version}</version>
<scope>test</scope>
</dependency>
<!-- external -->
......@@ -218,7 +239,7 @@
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.3</version>
<version>1.1.5-preview_14</version>
</dependency>
<!-- log -->
<dependency>
......@@ -251,9 +272,10 @@
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.33</version>
<version>5.1.40</version>
<!--<scope>test</scope>-->
</dependency>
</dependencies>
</dependencyManagement>
......
package com.alibaba.otter.canal.protocol.position;
/**
* Unique identifier of a database object
*
......@@ -121,4 +122,18 @@ public class EntryPosition extends TimePosition {
return true;
}
/**
* {@inheritDoc}
*
* @see java.lang.Comparable#compareTo(java.lang.Object)
*/
public int compareTo(EntryPosition o) {
final int val = journalName.compareTo(o.journalName);
if (val == 0) {
// compare explicitly: casting (position - o.position) to int can overflow for large offsets
return (position < o.position) ? -1 : ((position == o.position) ? 0 : 1);
}
return val;
}
}
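A small illustrative check of the ordering compareTo defines, not part of the commit and with invented values: the binlog file name is compared first, so any position in a later file sorts after every position in an earlier file, regardless of offset.
import com.alibaba.otter.canal.protocol.position.EntryPosition;
/**
 * Editor's sketch (not part of the commit): ordering semantics of EntryPosition.compareTo.
 */
public class EntryPositionOrderSketch {
    public static void main(String[] args) {
        EntryPosition a = new EntryPosition("mysql-bin.000001", 999999L);
        EntryPosition b = new EntryPosition("mysql-bin.000002", 4L);
        System.out.println(a.compareTo(b) < 0); // true: the binlog file name dominates the comparison
    }
}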