Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
Shardingsphere
提交
0cce3825
Shardingsphere
项目概览
apache
/
Shardingsphere
通知
56
Star
3
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
Shardingsphere
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
0cce3825
编写于
10月 09, 2019
作者:
A
avalon566
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
For checkstyle
上级
b6957099
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
122 addition
and
39 deletion
+122
-39
src/main/java/info/avalon566/shardingscaling/job/DatabaseSyncJob.java
...a/info/avalon566/shardingscaling/job/DatabaseSyncJob.java
+5
-1
src/main/java/info/avalon566/shardingscaling/sync/core/AbstractRunner.java
...o/avalon566/shardingscaling/sync/core/AbstractRunner.java
+14
-1
src/main/java/info/avalon566/shardingscaling/sync/core/Channel.java
...ava/info/avalon566/shardingscaling/sync/core/Channel.java
+11
-1
src/main/java/info/avalon566/shardingscaling/sync/jdbc/AbstractJdbcReader.java
...alon566/shardingscaling/sync/jdbc/AbstractJdbcReader.java
+1
-1
src/main/java/info/avalon566/shardingscaling/sync/jdbc/AbstractJdbcWriter.java
...alon566/shardingscaling/sync/jdbc/AbstractJdbcWriter.java
+1
-1
src/main/java/info/avalon566/shardingscaling/sync/jdbc/Column.java
...java/info/avalon566/shardingscaling/sync/jdbc/Column.java
+3
-1
src/main/java/info/avalon566/shardingscaling/sync/jdbc/ColumnMetaData.java
...o/avalon566/shardingscaling/sync/jdbc/ColumnMetaData.java
+1
-0
src/main/java/info/avalon566/shardingscaling/sync/jdbc/DataRecord.java
.../info/avalon566/shardingscaling/sync/jdbc/DataRecord.java
+27
-5
src/main/java/info/avalon566/shardingscaling/sync/jdbc/DbMetaDataUtil.java
...o/avalon566/shardingscaling/sync/jdbc/DbMetaDataUtil.java
+49
-27
src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/packet/AbstractPacket.java
...rdingscaling/sync/mysql/binlog/packet/AbstractPacket.java
+8
-0
src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/packet/binlog/ColumnTypes.java
...gscaling/sync/mysql/binlog/packet/binlog/ColumnTypes.java
+1
-0
src/test/java/info/avalon566/shardingscaling/sync/mysql/binlog/codec/MySQLBinlogEventPacketDecoderTest.java
...mysql/binlog/codec/MySQLBinlogEventPacketDecoderTest.java
+1
-1
未找到文件。
src/main/java/info/avalon566/shardingscaling/job/DatabaseSyncJob.java
浏览文件 @
0cce3825
...
...
@@ -21,6 +21,7 @@ import info.avalon566.shardingscaling.job.config.SyncConfiguration;
import
lombok.extern.slf4j.Slf4j
;
/**
* Database sync job.
* @author avalon566
*/
@Slf4j
...
...
@@ -32,12 +33,15 @@ public class DatabaseSyncJob {
private
final
RealtimeDataSyncer
realtimeDataSyncer
;
public
DatabaseSyncJob
(
SyncConfiguration
syncConfiguration
)
{
public
DatabaseSyncJob
(
final
SyncConfiguration
syncConfiguration
)
{
this
.
syncConfiguration
=
syncConfiguration
;
this
.
historyDataSyncer
=
new
HistoryDataSyncer
(
syncConfiguration
);
this
.
realtimeDataSyncer
=
new
RealtimeDataSyncer
(
syncConfiguration
);
}
/**
* Run.
*/
public
void
run
()
{
realtimeDataSyncer
.
preRun
();
historyDataSyncer
.
run
();
...
...
src/main/java/info/avalon566/shardingscaling/sync/core/AbstractRunner.java
浏览文件 @
0cce3825
...
...
@@ -17,18 +17,31 @@
package
info.avalon566.shardingscaling.sync.core
;
import
lombok.AccessLevel
;
import
lombok.Getter
;
import
lombok.Setter
;
/**
* Abstract runner.
* @author avalon566
*/
public
abstract
class
AbstractRunner
implements
Runner
{
protected
boolean
running
=
false
;
@Setter
(
AccessLevel
.
PROTECTED
)
@Getter
(
AccessLevel
.
PROTECTED
)
private
boolean
running
;
/**
* generic start implement.
*/
@Override
public
void
start
()
{
running
=
true
;
}
/**
* generic stop implement.
*/
@Override
public
void
stop
()
{
running
=
false
;
...
...
src/main/java/info/avalon566/shardingscaling/sync/core/Channel.java
浏览文件 @
0cce3825
...
...
@@ -18,10 +18,20 @@
package
info.avalon566.shardingscaling.sync.core
;
/**
* Channel.
* @author avalon566
*/
public
interface
Channel
{
/**
* push a {@code DataRecord} to channel.
* @param dataRecord data
*/
void
pushRecord
(
Record
dataRecord
);
/**
* pop a {@code DataRecord} from channel.
* @return dataRecord
*/
Record
popRecord
();
}
\ No newline at end of file
}
src/main/java/info/avalon566/shardingscaling/sync/jdbc/AbstractJdbcReader.java
浏览文件 @
0cce3825
...
...
@@ -73,7 +73,7 @@ public abstract class AbstractJdbcReader extends AbstractRunner implements Reade
ps
.
setFetchDirection
(
ResultSet
.
FETCH_REVERSE
);
var
rs
=
ps
.
executeQuery
();
var
metaData
=
rs
.
getMetaData
();
while
(
running
&&
rs
.
next
())
{
while
(
isRunning
()
&&
rs
.
next
())
{
var
record
=
new
DataRecord
(
metaData
.
getColumnCount
());
record
.
setType
(
"bootstrap-insert"
);
record
.
setFullTableName
(
String
.
format
(
"%s.%s"
,
conn
.
getCatalog
(),
rdbmsConfiguration
.
getTableName
()));
...
...
src/main/java/info/avalon566/shardingscaling/sync/jdbc/AbstractJdbcWriter.java
浏览文件 @
0cce3825
...
...
@@ -66,7 +66,7 @@ public abstract class AbstractJdbcWriter extends AbstractRunner implements Write
var
buffer
=
new
ArrayList
<
DataRecord
>(
2000
);
var
lastFlushTime
=
System
.
currentTimeMillis
();
try
{
while
(
running
)
{
while
(
isRunning
()
)
{
var
record
=
channel
.
popRecord
();
if
(
null
==
record
)
{
try
{
...
...
src/main/java/info/avalon566/shardingscaling/sync/jdbc/Column.java
浏览文件 @
0cce3825
...
...
@@ -21,11 +21,13 @@ import lombok.AllArgsConstructor;
import
lombok.Data
;
/**
* Column.
* @author avalon566
*/
@Data
@AllArgsConstructor
public
class
Column
{
private
Object
value
;
private
boolean
updated
;
}
\ No newline at end of file
}
src/main/java/info/avalon566/shardingscaling/sync/jdbc/ColumnMetaData.java
浏览文件 @
0cce3825
...
...
@@ -20,6 +20,7 @@ package info.avalon566.shardingscaling.sync.jdbc;
import
lombok.Data
;
/**
* Column meta data.
* @author avalon566
*/
@Data
...
...
src/main/java/info/avalon566/shardingscaling/sync/jdbc/DataRecord.java
浏览文件 @
0cce3825
...
...
@@ -24,32 +24,54 @@ import java.util.ArrayList;
import
java.util.List
;
/**
* Data record.
* @author avalon566
*/
@Data
public
class
DataRecord
implements
Record
{
private
String
type
;
private
String
tableName
;
private
String
fullTableName
;
private
final
List
<
Column
>
columns
;
public
DataRecord
(
int
columnCount
)
{
columns
=
new
ArrayList
<
Column
>(
columnCount
);
public
DataRecord
(
final
int
columnCount
)
{
columns
=
new
ArrayList
<>(
columnCount
);
}
public
void
addColumn
(
Column
data
)
{
/**
* Add a column to record.
* @param data column
*/
public
void
addColumn
(
final
Column
data
)
{
columns
.
add
(
data
);
}
/**
* Return column count.
* @return count
*/
public
int
getColumnCount
()
{
return
columns
.
size
();
}
public
Column
getColumn
(
int
index
)
{
/**
* Get column by index.
* @param index of column
* @return column
*/
public
Column
getColumn
(
final
int
index
)
{
return
columns
.
get
(
index
);
}
/**
* Get table name.
* @return tableName
*/
public
String
getTableName
()
{
return
fullTableName
.
split
(
"\\."
)[
1
];
}
}
\ No newline at end of file
}
src/main/java/info/avalon566/shardingscaling/sync/jdbc/DbMetaDataUtil.java
浏览文件 @
0cce3825
...
...
@@ -26,12 +26,15 @@ import lombok.var;
import
java.sql.DriverManager
;
import
java.sql.ResultSet
;
import
java.sql.SQLException
;
import
java.util.ArrayList
;
import
java.util.LinkedList
;
import
java.util.List
;
import
java.util.concurrent.ExecutionException
;
/**
* Database meta util.
*
* @author avalon566
*/
@Slf4j
...
...
@@ -51,14 +54,14 @@ public final class DbMetaDataUtil {
private
LoadingCache
<
String
,
List
<
ColumnMetaData
>>
cmdCache
;
public
DbMetaDataUtil
(
RdbmsConfiguration
rdbmsConfiguration
)
{
public
DbMetaDataUtil
(
final
RdbmsConfiguration
rdbmsConfiguration
)
{
this
.
rdbmsConfiguration
=
rdbmsConfiguration
;
pksCache
=
CacheBuilder
.
newBuilder
()
.
maximumSize
(
64
)
.
build
(
new
CacheLoader
<
String
,
List
<
String
>>()
{
@Override
public
List
<
String
>
load
(
String
key
)
{
public
List
<
String
>
load
(
final
String
key
)
{
return
getPrimaryKeysInternal
(
key
);
}
});
...
...
@@ -67,13 +70,19 @@ public final class DbMetaDataUtil {
.
maximumSize
(
64
)
.
build
(
new
CacheLoader
<
String
,
List
<
ColumnMetaData
>>()
{
@Override
public
List
<
ColumnMetaData
>
load
(
String
key
)
{
public
List
<
ColumnMetaData
>
load
(
final
String
key
)
{
return
getColumNamesInternal
(
key
);
}
});
}
public
List
<
String
>
getPrimaryKeys
(
String
tableName
)
{
/**
* Get primary key column name by table name.
*
* @param tableName table name
* @return list of table name
*/
public
List
<
String
>
getPrimaryKeys
(
final
String
tableName
)
{
try
{
return
pksCache
.
get
(
tableName
);
}
catch
(
ExecutionException
e
)
{
...
...
@@ -81,7 +90,7 @@ public final class DbMetaDataUtil {
}
}
private
List
<
String
>
getPrimaryKeysInternal
(
String
tableName
)
{
private
List
<
String
>
getPrimaryKeysInternal
(
final
String
tableName
)
{
try
{
try
(
var
connection
=
DriverManager
.
getConnection
(
rdbmsConfiguration
.
getJdbcUrl
(),
rdbmsConfiguration
.
getUsername
(),
rdbmsConfiguration
.
getPassword
()))
{
var
rs
=
connection
.
getMetaData
().
getPrimaryKeys
(
connection
.
getCatalog
(),
null
,
tableName
);
...
...
@@ -91,11 +100,16 @@ public final class DbMetaDataUtil {
}
return
primaryKeys
;
}
}
catch
(
Exception
e
)
{
}
catch
(
SQL
Exception
e
)
{
throw
new
RuntimeException
(
"getTableNames error"
,
e
);
}
}
/**
* Get all table names in current database.
*
* @return list of table name
*/
public
List
<
String
>
getTableNames
()
{
try
{
try
(
var
connection
=
DriverManager
.
getConnection
(
rdbmsConfiguration
.
getJdbcUrl
(),
rdbmsConfiguration
.
getUsername
(),
rdbmsConfiguration
.
getPassword
()))
{
...
...
@@ -106,12 +120,18 @@ public final class DbMetaDataUtil {
}
return
tableNames
;
}
}
catch
(
Exception
e
)
{
}
catch
(
SQL
Exception
e
)
{
throw
new
RuntimeException
(
"getTableNames error"
,
e
);
}
}
public
List
<
ColumnMetaData
>
getColumNames
(
String
tableName
)
{
/**
* Get all column meta data by table name.
*
* @param tableName table name
* @return list of column meta data
*/
public
List
<
ColumnMetaData
>
getColumNames
(
final
String
tableName
)
{
try
{
return
cmdCache
.
get
(
tableName
);
}
catch
(
ExecutionException
e
)
{
...
...
@@ -119,36 +139,38 @@ public final class DbMetaDataUtil {
}
}
p
ublic
List
<
ColumnMetaData
>
getColumNamesInternal
(
String
tableName
)
{
p
rivate
List
<
ColumnMetaData
>
getColumNamesInternal
(
final
String
tableName
)
{
try
{
try
(
var
connection
=
DriverManager
.
getConnection
(
rdbmsConfiguration
.
getJdbcUrl
(),
rdbmsConfiguration
.
getUsername
(),
rdbmsConfiguration
.
getPassword
()))
{
var
result
=
new
ArrayList
<
ColumnMetaData
>();
try
(
ResultSet
resultSet
=
connection
.
getMetaData
().
getColumns
(
connection
.
getCatalog
(),
connection
.
getSchema
(),
tableName
,
"%"
))
{
while
(
resultSet
.
next
())
{
var
columnMetaData
=
new
ColumnMetaData
();
columnMetaData
.
setColumnName
(
resultSet
.
getString
(
COLUMN_NAME
));
columnMetaData
.
setColumnType
(
resultSet
.
getInt
(
DATA_TYPE
));
columnMetaData
.
setColumnTypeName
(
resultSet
.
getString
(
TYPE_NAME
));
result
.
add
(
columnMetaData
);
}
ResultSet
resultSet
=
connection
.
getMetaData
().
getColumns
(
connection
.
getCatalog
(),
connection
.
getSchema
(),
tableName
,
"%"
);
while
(
resultSet
.
next
())
{
var
columnMetaData
=
new
ColumnMetaData
();
columnMetaData
.
setColumnName
(
resultSet
.
getString
(
COLUMN_NAME
));
columnMetaData
.
setColumnType
(
resultSet
.
getInt
(
DATA_TYPE
));
columnMetaData
.
setColumnTypeName
(
resultSet
.
getString
(
TYPE_NAME
));
result
.
add
(
columnMetaData
);
}
return
result
;
}
}
catch
(
Exception
e
)
{
}
catch
(
SQL
Exception
e
)
{
throw
new
RuntimeException
(
"getTableNames error"
,
e
);
}
}
public
static
int
findColumnIndex
(
List
<
ColumnMetaData
>
metaData
,
String
columnName
)
{
try
{
for
(
int
i
=
0
;
i
<
metaData
.
size
();
i
++)
{
if
(
metaData
.
get
(
i
).
getColumnName
().
equals
(
columnName
))
{
return
i
;
}
/**
* Find column index by column name.
*
* @param metaData meta data list
* @param columnName table name
* @return index
*/
public
static
int
findColumnIndex
(
final
List
<
ColumnMetaData
>
metaData
,
final
String
columnName
)
{
for
(
int
i
=
0
;
i
<
metaData
.
size
();
i
++)
{
if
(
metaData
.
get
(
i
).
getColumnName
().
equals
(
columnName
))
{
return
i
;
}
return
-
1
;
}
catch
(
Exception
e
)
{
throw
new
RuntimeException
(
"findColumnIndex error"
,
e
);
}
return
-
1
;
}
}
src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/packet/AbstractPacket.java
浏览文件 @
0cce3825
...
...
@@ -35,11 +35,19 @@ import lombok.Data;
public
abstract
class
AbstractPacket
implements
Packet
{
private
byte
sequenceNumber
;
/**
* empty implement method,throw {@code UnsupportedOperationException}.
* @param data buffer
*/
@Override
public
void
fromByteBuf
(
final
ByteBuf
data
)
{
throw
new
UnsupportedOperationException
();
}
/**
* empty implement method,throw {@code UnsupportedOperationException}.
* @return data buffer
*/
@Override
public
ByteBuf
toByteBuf
()
{
throw
new
UnsupportedOperationException
();
...
...
src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/packet/binlog/ColumnTypes.java
浏览文件 @
0cce3825
...
...
@@ -18,6 +18,7 @@
package
info.avalon566.shardingscaling.sync.mysql.binlog.packet.binlog
;
/**
* Mysql column types.
* @author avalon566
*/
public
final
class
ColumnTypes
{
...
...
src/test/java/info/avalon566/shardingscaling/sync/mysql/binlog/codec/MySQLBinlogEventPacketDecoderTest.java
浏览文件 @
0cce3825
...
...
@@ -50,7 +50,7 @@ public class MySQLBinlogEventPacketDecoderTest {
@Before
public
void
setUp
()
throws
Exception
{
binlogEventPacketDecoder
=
new
MySQLBinlogEventPacketDecoder
();
binlogEventPacketDecoder
=
new
MySQLBinlogEventPacketDecoder
(
0
);
binlogContext
=
ReflectionUtil
.
getFieldValueFromClass
(
binlogEventPacketDecoder
,
"binlogContext"
,
BinlogContext
.
class
);
binlogContext
.
setChecksumLength
(
4
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录