Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
lijianghuflute
canal
提交
cfa90af3
canal
项目概览
lijianghuflute
/
canal
与 Fork 源项目一致
从无法访问的项目Fork
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
canal
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
cfa90af3
编写于
2月 12, 2018
作者:
A
agapple
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fixed unique and upgrade druid
上级
b1431f88
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
44 addition
and
9 deletion
+44
-9
example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java
.../alibaba/otter/canal/example/AbstractCanalClientTest.java
+10
-5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/TableMeta.java
...java/com/alibaba/otter/canal/parse/inbound/TableMeta.java
+16
-3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java
...tter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java
+1
-0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java
...ter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java
+4
-0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java
...otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java
+12
-0
pom.xml
pom.xml
+1
-1
未找到文件。
example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java
浏览文件 @
cfa90af3
...
...
@@ -56,10 +56,10 @@ public class AbstractCanalClientTest {
context_format
+=
"****************************************************"
+
SEP
;
row_format
=
SEP
+
"----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {} , delay : {}ms"
+
"----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {}
({})
, delay : {}ms"
+
SEP
;
transaction_format
=
SEP
+
"================> binlog[{}:{}] , executeTime : {} , delay : {}ms"
+
SEP
;
transaction_format
=
SEP
+
"================> binlog[{}:{}] , executeTime : {}
({})
, delay : {}ms"
+
SEP
;
}
...
...
@@ -165,6 +165,8 @@ public class AbstractCanalClientTest {
for
(
Entry
entry
:
entrys
)
{
long
executeTime
=
entry
.
getHeader
().
getExecuteTime
();
long
delayTime
=
new
Date
().
getTime
()
-
executeTime
;
Date
date
=
new
Date
(
entry
.
getHeader
().
getExecuteTime
());
SimpleDateFormat
simpleDateFormat
=
new
SimpleDateFormat
(
"yyyy-MM-dd HH:mm:ss"
);
if
(
entry
.
getEntryType
()
==
EntryType
.
TRANSACTIONBEGIN
||
entry
.
getEntryType
()
==
EntryType
.
TRANSACTIONEND
)
{
if
(
entry
.
getEntryType
()
==
EntryType
.
TRANSACTIONBEGIN
)
{
...
...
@@ -178,7 +180,8 @@ public class AbstractCanalClientTest {
logger
.
info
(
transaction_format
,
new
Object
[]
{
entry
.
getHeader
().
getLogfileName
(),
String
.
valueOf
(
entry
.
getHeader
().
getLogfileOffset
()),
String
.
valueOf
(
entry
.
getHeader
().
getExecuteTime
()),
String
.
valueOf
(
delayTime
)
});
String
.
valueOf
(
entry
.
getHeader
().
getExecuteTime
()),
simpleDateFormat
.
format
(
date
),
String
.
valueOf
(
delayTime
)
});
logger
.
info
(
" BEGIN ----> Thread id: {}"
,
begin
.
getThreadId
());
}
else
if
(
entry
.
getEntryType
()
==
EntryType
.
TRANSACTIONEND
)
{
TransactionEnd
end
=
null
;
...
...
@@ -193,7 +196,8 @@ public class AbstractCanalClientTest {
logger
.
info
(
transaction_format
,
new
Object
[]
{
entry
.
getHeader
().
getLogfileName
(),
String
.
valueOf
(
entry
.
getHeader
().
getLogfileOffset
()),
String
.
valueOf
(
entry
.
getHeader
().
getExecuteTime
()),
String
.
valueOf
(
delayTime
)
});
String
.
valueOf
(
entry
.
getHeader
().
getExecuteTime
()),
simpleDateFormat
.
format
(
date
),
String
.
valueOf
(
delayTime
)
});
}
continue
;
...
...
@@ -213,7 +217,8 @@ public class AbstractCanalClientTest {
new
Object
[]
{
entry
.
getHeader
().
getLogfileName
(),
String
.
valueOf
(
entry
.
getHeader
().
getLogfileOffset
()),
entry
.
getHeader
().
getSchemaName
(),
entry
.
getHeader
().
getTableName
(),
eventType
,
String
.
valueOf
(
entry
.
getHeader
().
getExecuteTime
()),
String
.
valueOf
(
delayTime
)
});
String
.
valueOf
(
entry
.
getHeader
().
getExecuteTime
()),
simpleDateFormat
.
format
(
date
),
String
.
valueOf
(
delayTime
)
});
if
(
eventType
==
EventType
.
QUERY
||
rowChage
.
getIsDdl
())
{
logger
.
info
(
" sql ----> "
+
rowChage
.
getSql
()
+
SEP
);
...
...
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/TableMeta.java
浏览文件 @
cfa90af3
...
...
@@ -3,9 +3,10 @@ package com.alibaba.otter.canal.parse.inbound;
import
java.util.ArrayList
;
import
java.util.List
;
import
com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent
;
import
org.apache.commons.lang.StringUtils
;
import
com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent
;
/**
* 描述数据meta对象,mysql binlog中对应的{@linkplain TableMapLogEvent}包含的信息不全
*
...
...
@@ -127,6 +128,7 @@ public class TableMeta {
private
boolean
key
;
private
String
defaultValue
;
private
String
extra
;
private
boolean
unique
;
public
String
getColumnName
()
{
return
columnName
;
...
...
@@ -180,10 +182,21 @@ public class TableMeta {
this
.
extra
=
extra
;
}
public
boolean
isUnique
()
{
return
unique
;
}
public
void
setUnique
(
boolean
unique
)
{
this
.
unique
=
unique
;
}
@Override
public
String
toString
()
{
return
"FieldMeta [columnName="
+
columnName
+
", columnType="
+
columnType
+
", defaultValue="
+
defaultValue
+
", nullable="
+
nullable
+
", key="
+
key
+
"]"
;
return
"FieldMeta [columnName="
+
columnName
+
", columnType="
+
columnType
+
", nullable="
+
nullable
+
", key="
+
key
+
", defaultValue="
+
defaultValue
+
", extra="
+
extra
+
", unique="
+
unique
+
"]"
;
}
}
}
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java
浏览文件 @
cfa90af3
...
...
@@ -104,6 +104,7 @@ public class TableMetaCache {
*
size
),
"YES"
));
meta
.
setKey
(
"PRI"
.
equalsIgnoreCase
(
packet
.
getFieldValues
().
get
(
nameMaps
.
get
(
COLUMN_KEY
)
+
i
*
size
)));
meta
.
setUnique
(
"UNI"
.
equalsIgnoreCase
(
packet
.
getFieldValues
().
get
(
nameMaps
.
get
(
COLUMN_KEY
)
+
i
*
size
)));
// 特殊处理引号
meta
.
setDefaultValue
(
DruidDdlParser
.
unescapeQuotaName
(
packet
.
getFieldValues
()
.
get
(
nameMaps
.
get
(
COLUMN_DEFAULT
)
+
i
*
size
)));
...
...
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java
浏览文件 @
cfa90af3
...
...
@@ -446,6 +446,10 @@ public class DatabaseTableMeta implements TableMetaTSDB {
if
(
sourceField
.
isKey
()
!=
targetField
.
isKey
())
{
return
false
;
}
if
(
sourceField
.
isUnique
()
!=
targetField
.
isUnique
())
{
return
false
;
}
}
return
true
;
...
...
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java
浏览文件 @
cfa90af3
...
...
@@ -21,12 +21,14 @@ import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
import
com.alibaba.druid.sql.ast.statement.SQLColumnConstraint
;
import
com.alibaba.druid.sql.ast.statement.SQLColumnDefinition
;
import
com.alibaba.druid.sql.ast.statement.SQLColumnPrimaryKey
;
import
com.alibaba.druid.sql.ast.statement.SQLColumnUniqueKey
;
import
com.alibaba.druid.sql.ast.statement.SQLCreateTableStatement
;
import
com.alibaba.druid.sql.ast.statement.SQLNotNullConstraint
;
import
com.alibaba.druid.sql.ast.statement.SQLNullConstraint
;
import
com.alibaba.druid.sql.ast.statement.SQLSelectOrderByItem
;
import
com.alibaba.druid.sql.ast.statement.SQLTableElement
;
import
com.alibaba.druid.sql.dialect.mysql.ast.MySqlPrimaryKey
;
import
com.alibaba.druid.sql.dialect.mysql.ast.MySqlUnique
;
import
com.alibaba.druid.sql.repository.Schema
;
import
com.alibaba.druid.sql.repository.SchemaObject
;
import
com.alibaba.druid.sql.repository.SchemaRepository
;
...
...
@@ -204,6 +206,8 @@ public class MemoryTableMeta implements TableMetaTSDB {
fieldMeta
.
setNullable
(
true
);
}
else
if
(
constraint
instanceof
SQLColumnPrimaryKey
)
{
fieldMeta
.
setKey
(
true
);
}
else
if
(
constraint
instanceof
SQLColumnUniqueKey
)
{
fieldMeta
.
setUnique
(
true
);
}
}
tableMeta
.
addFieldMeta
(
fieldMeta
);
...
...
@@ -215,6 +219,14 @@ public class MemoryTableMeta implements TableMetaTSDB {
FieldMeta
field
=
tableMeta
.
getFieldMetaByName
(
name
);
field
.
setKey
(
true
);
}
}
else
if
(
element
instanceof
MySqlUnique
)
{
MySqlUnique
column
=
(
MySqlUnique
)
element
;
List
<
SQLSelectOrderByItem
>
uks
=
column
.
getColumns
();
for
(
SQLSelectOrderByItem
uk
:
uks
)
{
String
name
=
getSqlName
(
uk
.
getExpr
());
FieldMeta
field
=
tableMeta
.
getFieldMetaByName
(
name
);
field
.
setUnique
(
true
);
}
}
}
...
...
pom.xml
浏览文件 @
cfa90af3
...
...
@@ -254,7 +254,7 @@
<dependency>
<groupId>
com.alibaba
</groupId>
<artifactId>
druid
</artifactId>
<version>
1.1.
7-preview_0
</version>
<version>
1.1.
8
</version>
</dependency>
<!-- log -->
<dependency>
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录