Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
5d91b4f2
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
5d91b4f2
编写于
5月 14, 2018
作者:
S
sundy-li
提交者:
alexey-milovidov
5月 15, 2018
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix some bugs, fix some code styles
上级
200076b5
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
113 addition
and
87 deletion
+113
-87
dbms/src/Storages/StorageMySQL.cpp
dbms/src/Storages/StorageMySQL.cpp
+103
-82
dbms/src/Storages/StorageMySQL.h
dbms/src/Storages/StorageMySQL.h
+1
-1
dbms/src/TableFunctions/TableFunctionMySQL.cpp
dbms/src/TableFunctions/TableFunctionMySQL.cpp
+9
-4
未找到文件。
dbms/src/Storages/StorageMySQL.cpp
浏览文件 @
5d91b4f2
...
...
@@ -25,6 +25,7 @@ namespace DB
namespace
ErrorCodes
{
extern
const
int
NUMBER_OF_ARGUMENTS_DOESNT_MATCH
;
extern
const
int
BAD_ARGUMENTS
;
}
...
...
@@ -32,7 +33,7 @@ StorageMySQL::StorageMySQL(const std::string & name,
mysqlxx
::
Pool
&&
pool
,
const
std
::
string
&
remote_database_name
,
const
std
::
string
&
remote_table_name
,
const
bool
&
replace_query
,
const
bool
replace_query
,
const
std
::
string
&
on_duplicate_clause
,
const
ColumnsDescription
&
columns_
,
const
Context
&
context
)
...
...
@@ -74,12 +75,11 @@ BlockInputStreams StorageMySQL::read(
class
StorageMySQLBlockOutputStream
:
public
IBlockOutputStream
{
public:
explicit
StorageMySQLBlockOutputStream
(
const
StorageMySQL
&
storage
,
const
std
::
string
&
remote_database_name
,
const
std
::
string
&
remote_table_name
,
const
mysqlxx
::
PoolWithFailover
::
Entry
&
entry
,
const
size_t
&
mysql_max_rows_to_insert
)
explicit
StorageMySQLBlockOutputStream
(
const
StorageMySQL
&
storage
,
const
std
::
string
&
remote_database_name
,
const
std
::
string
&
remote_table_name
,
const
mysqlxx
::
PoolWithFailover
::
Entry
&
entry
,
const
size_t
&
mysql_max_rows_to_insert
)
:
storage
{
storage
}
,
remote_database_name
{
remote_database_name
}
,
remote_table_name
{
remote_table_name
}
...
...
@@ -88,81 +88,95 @@ public:
{
}
Block
getHeader
()
const
override
{
return
storage
.
getSampleBlock
();
}
void
write
(
const
Block
&
block
)
override
{
auto
blocks
=
splitBlocks
(
block
,
max_batch_rows
);
mysqlxx
::
Transaction
trans
(
entry
);
try
{
for
(
const
Block
&
batch_data
:
blocks
)
{
writeBlockData
(
batch_data
);
}
trans
.
commit
();
}
catch
(...)
{
trans
.
rollback
();
throw
;
}
}
void
writeBlockData
(
const
Block
&
block
)
{
WriteBufferFromOwnString
sqlbuf
;
// If both `replace_query` and `on_duplicate_clause` are specified, only use the `on_duplicate_clause`.
sqlbuf
<<
(
(
storage
.
replace_query
&&
storage
.
on_duplicate_clause
.
empty
())
?
"REPLACE"
:
"INSERT"
);
sqlbuf
<<
" INTO `"
<<
remote_database_name
<<
"`.`"
<<
remote_table_name
<<
"`"
<<
" ( "
<<
block
.
dumpNames
()
<<
" ) VALUES "
;
auto
writer
=
FormatFactory
().
getOutput
(
"Values"
,
sqlbuf
,
storage
.
getSampleBlock
(),
storage
.
context
);
writer
->
write
(
block
);
if
(
!
storage
.
on_duplicate_clause
.
empty
())
sqlbuf
<<
" ON DUPLICATE KEY "
<<
storage
.
on_duplicate_clause
;
sqlbuf
<<
";"
;
auto
query
=
this
->
entry
->
query
(
sqlbuf
.
str
());
query
.
execute
();
}
Blocks
splitBlocks
(
const
Block
&
block
,
const
size_t
&
max_rows
)
const
{
// Avoid Excessive copy when block is small enough
if
(
block
.
rows
()
<=
max_rows
)
return
Blocks
{
std
::
move
(
block
)};
const
size_t
splited_block_size
=
ceil
(
block
.
rows
()
*
1.0
/
max_rows
);
Blocks
splitted_blocks
(
splited_block_size
);
for
(
size_t
idx
=
0
;
idx
<
splited_block_size
;
++
idx
)
splitted_blocks
[
idx
]
=
block
.
cloneEmpty
();
const
size_t
columns
=
block
.
columns
();
const
size_t
rows
=
block
.
rows
();
size_t
offsets
=
0
;
size_t
limits
=
max_batch_rows
;
for
(
size_t
idx
=
0
;
idx
<
splited_block_size
;
++
idx
)
{
// For last batch, limits should be the remain size
if
(
idx
==
splited_block_size
-
1
)
limits
=
rows
-
offsets
;
for
(
size_t
col_idx
=
0
;
col_idx
<
columns
;
++
col_idx
)
{
splitted_blocks
[
idx
].
getByPosition
(
col_idx
).
column
=
block
.
getByPosition
(
col_idx
).
column
->
cut
(
offsets
,
limits
);
}
offsets
+=
max_batch_rows
;
}
return
splitted_blocks
;
}
Block
getHeader
()
const
override
{
return
storage
.
getSampleBlock
();
}
void
write
(
const
Block
&
block
)
override
{
auto
blocks
=
splitBlocks
(
block
,
max_batch_rows
);
mysqlxx
::
Transaction
trans
(
entry
);
try
{
for
(
const
Block
&
batch_data
:
blocks
)
{
writeBlockData
(
batch_data
);
}
trans
.
commit
();
}
catch
(...)
{
trans
.
rollback
();
throw
;
}
}
void
writeBlockData
(
const
Block
&
block
)
{
WriteBufferFromOwnString
sqlbuf
;
sqlbuf
<<
(
storage
.
replace_query
?
"REPLACE"
:
"INSERT"
)
<<
" INTO "
;
sqlbuf
<<
backQuoteIfNeed
(
remote_database_name
)
<<
"."
<<
backQuoteIfNeed
(
remote_table_name
);
sqlbuf
<<
" ( "
<<
dumpNamesWithBackQuote
(
block
)
<<
" ) VALUES "
;
auto
writer
=
FormatFactory
().
getOutput
(
"Values"
,
sqlbuf
,
storage
.
getSampleBlock
(),
storage
.
context
);
writer
->
write
(
block
);
if
(
!
storage
.
on_duplicate_clause
.
empty
())
sqlbuf
<<
" ON DUPLICATE KEY "
<<
storage
.
on_duplicate_clause
;
sqlbuf
<<
";"
;
auto
query
=
this
->
entry
->
query
(
sqlbuf
.
str
());
query
.
execute
();
}
Blocks
splitBlocks
(
const
Block
&
block
,
const
size_t
&
max_rows
)
const
{
/// Avoid Excessive copy when block is small enough
if
(
block
.
rows
()
<=
max_rows
)
return
Blocks
{
std
::
move
(
block
)};
const
size_t
splited_block_size
=
ceil
(
block
.
rows
()
*
1.0
/
max_rows
);
Blocks
splitted_blocks
(
splited_block_size
);
for
(
size_t
idx
=
0
;
idx
<
splited_block_size
;
++
idx
)
splitted_blocks
[
idx
]
=
block
.
cloneEmpty
();
const
size_t
columns
=
block
.
columns
();
const
size_t
rows
=
block
.
rows
();
size_t
offsets
=
0
;
size_t
limits
=
max_batch_rows
;
for
(
size_t
idx
=
0
;
idx
<
splited_block_size
;
++
idx
)
{
/// For last batch, limits should be the remain size
if
(
idx
==
splited_block_size
-
1
)
limits
=
rows
-
offsets
;
for
(
size_t
col_idx
=
0
;
col_idx
<
columns
;
++
col_idx
)
{
splitted_blocks
[
idx
].
getByPosition
(
col_idx
).
column
=
block
.
getByPosition
(
col_idx
).
column
->
cut
(
offsets
,
limits
);
}
offsets
+=
max_batch_rows
;
}
return
splitted_blocks
;
}
std
::
string
dumpNamesWithBackQuote
(
const
Block
&
block
)
const
{
WriteBufferFromOwnString
out
;
for
(
auto
it
=
block
.
begin
();
it
!=
block
.
end
();
++
it
)
{
if
(
it
!=
block
.
begin
())
out
<<
", "
;
out
<<
backQuoteIfNeed
(
it
->
name
);
}
return
out
.
str
();
}
private:
const
StorageMySQL
&
storage
;
std
::
string
remote_database_name
;
std
::
string
remote_table_name
;
mysqlxx
::
PoolWithFailover
::
Entry
entry
;
size_t
max_batch_rows
;
const
StorageMySQL
&
storage
;
std
::
string
remote_database_name
;
std
::
string
remote_table_name
;
mysqlxx
::
PoolWithFailover
::
Entry
entry
;
size_t
max_batch_rows
;
};
...
...
@@ -180,7 +194,7 @@ void registerStorageMySQL(StorageFactory & factory)
if
(
engine_args
.
size
()
<
5
||
engine_args
.
size
()
>
7
)
throw
Exception
(
"Storage MySQL requires 5-7 parameters: MySQL('host:port', database, table, 'user', 'password'[, replace_query, 'on_duplicate_clause'
])."
,
"Storage MySQL requires 5-7 parameters: MySQL('host:port', database, table, 'user', 'password'[, replace_query, 'on_duplicate_clause'])."
,
ErrorCodes
::
NUMBER_OF_ARGUMENTS_DOESNT_MATCH
);
for
(
size_t
i
=
0
;
i
<
engine_args
.
size
();
++
i
)
...
...
@@ -198,8 +212,15 @@ void registerStorageMySQL(StorageFactory & factory)
bool
replace_query
=
false
;
std
::
string
on_duplicate_clause
;
if
(
engine_args
.
size
()
>=
6
)
replace_query
=
static_cast
<
const
ASTLiteral
&>
(
*
engine_args
[
5
]).
value
.
safeGet
<
UInt64
>
()
>
0
;
if
(
engine_args
.
size
()
==
7
)
on_duplicate_clause
=
static_cast
<
const
ASTLiteral
&>
(
*
engine_args
[
6
]).
value
.
safeGet
<
String
>
();
if
(
engine_args
.
size
()
>=
6
)
replace_query
=
static_cast
<
const
ASTLiteral
&>
(
*
engine_args
[
5
]).
value
.
safeGet
<
UInt64
>
()
>
0
;
if
(
engine_args
.
size
()
==
7
)
on_duplicate_clause
=
static_cast
<
const
ASTLiteral
&>
(
*
engine_args
[
6
]).
value
.
safeGet
<
String
>
();
if
(
replace_query
&&
!
on_duplicate_clause
.
empty
())
throw
Exception
(
"Only one of 'replace_query' and 'on_duplicate_clause' can be specified, or none of them"
,
ErrorCodes
::
BAD_ARGUMENTS
);
return
StorageMySQL
::
create
(
args
.
table_name
,
...
...
dbms/src/Storages/StorageMySQL.h
浏览文件 @
5d91b4f2
...
...
@@ -24,7 +24,7 @@ public:
mysqlxx
::
Pool
&&
pool
,
const
std
::
string
&
remote_database_name
,
const
std
::
string
&
remote_table_name
,
const
bool
&
replace_query
,
const
bool
replace_query
,
const
std
::
string
&
on_duplicate_clause
,
const
ColumnsDescription
&
columns
,
const
Context
&
context
);
...
...
dbms/src/TableFunctions/TableFunctionMySQL.cpp
浏览文件 @
5d91b4f2
...
...
@@ -29,8 +29,8 @@ namespace DB
namespace
ErrorCodes
{
extern
const
int
LOGICAL_ERROR
;
extern
const
int
NUMBER_OF_ARGUMENTS_DOESNT_MATCH
;
extern
const
int
BAD_ARGUMENTS
;;
}
...
...
@@ -90,7 +90,7 @@ StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Co
ASTs
&
args
=
typeid_cast
<
ASTExpressionList
&>
(
*
args_func
.
arguments
).
children
;
if
(
args
.
size
()
<
5
||
args
.
size
()
>
7
)
throw
Exception
(
"
Storage MySQL requires 5-7 parameters: MySQL('host:port', database, table, 'user', 'password'[, replace_query, 'on_duplicate_clause'
])."
,
throw
Exception
(
"
Table function 'mysql' requires 5-7 parameters: MySQL('host:port', database, table, 'user', 'password'[, replace_query, 'on_duplicate_clause'
])."
,
ErrorCodes
::
NUMBER_OF_ARGUMENTS_DOESNT_MATCH
);
for
(
size_t
i
=
0
;
i
<
args
.
size
();
++
i
)
...
...
@@ -104,11 +104,16 @@ StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Co
bool
replace_query
=
false
;
std
::
string
on_duplicate_clause
;
if
(
args
.
size
()
>=
6
)
if
(
args
.
size
()
>=
6
)
replace_query
=
static_cast
<
const
ASTLiteral
&>
(
*
args
[
5
]).
value
.
safeGet
<
UInt64
>
()
>
0
;
if
(
args
.
size
()
==
7
)
if
(
args
.
size
()
==
7
)
on_duplicate_clause
=
static_cast
<
const
ASTLiteral
&>
(
*
args
[
6
]).
value
.
safeGet
<
String
>
();
if
(
replace_query
&&
!
on_duplicate_clause
.
empty
())
throw
Exception
(
"Only one of 'replace_query' and 'on_duplicate_clause' can be specified, or none of them"
,
ErrorCodes
::
BAD_ARGUMENTS
);
/// 3306 is the default MySQL port number
auto
parsed_host_port
=
parseAddress
(
host_port
,
3306
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录