Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
4322b0c5
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
4322b0c5
编写于
4月 17, 2017
作者:
V
Vitaliy Lyudvichenko
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Implemented global ZK queue for DDLs. [#CLICKHOUSE-5]
上级
20901557
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
169 addition
and
182 deletion
+169
-182
dbms/src/Interpreters/DDLWorker.cpp
dbms/src/Interpreters/DDLWorker.cpp
+156
-173
dbms/src/Interpreters/DDLWorker.h
dbms/src/Interpreters/DDLWorker.h
+13
-9
未找到文件。
dbms/src/Interpreters/DDLWorker.cpp
浏览文件 @
4322b0c5
...
...
@@ -5,6 +5,11 @@
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTAlterQuery.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/Cluster.h>
#include <DataStreams/OneBlockInputStream.h>
...
...
@@ -27,36 +32,51 @@ namespace ErrorCodes
{
extern
const
int
UNKNOWN_ELEMENT_IN_CONFIG
;
extern
const
int
INVALID_CONFIG_PARAMETER
;
extern
const
int
UNKNOWN_FORMAT_VERSION
;
}
namespace
{
/// Helper class which extracts from the ClickHouse configuration file
/// the parameters we need for operating the resharding thread.
// struct Arguments
// {
// public:
// Arguments(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
// {
// Poco::Util::AbstractConfiguration::Keys keys;
// config.keys(config_name, keys);
//
// for (const auto & key : keys)
// {
// if (key == "distributed_ddl_root")
// ddl_queries_root = config.getString(config_name + "." + key);
// else
// throw Exception{"Unknown parameter in distributed DDL configuration", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG};
// }
//
// if (ddl_queries_root.empty())
// throw Exception{"Distributed DDL: missing parameter distributed_ddl_root", ErrorCodes::INVALID_CONFIG_PARAMETER};
// }
//
// std::string ddl_queries_root;
// };
}
struct
DDLLogEntry
{
String
query
;
Strings
hosts
;
String
initiator
;
static
constexpr
char
CURRENT_VERSION
=
'1'
;
String
toString
()
{
String
res
;
{
WriteBufferFromString
wb
(
res
);
writeChar
(
CURRENT_VERSION
,
wb
);
wb
<<
"
\n
"
;
wb
<<
"query: "
<<
double_quote
<<
query
<<
"
\n
"
;
wb
<<
"hosts: "
<<
double_quote
<<
hosts
<<
"
\n
"
;
wb
<<
"initiator: "
<<
double_quote
<<
initiator
<<
"
\n
"
;
}
return
res
;
}
void
parse
(
const
String
&
data
)
{
ReadBufferFromString
rb
(
data
);
char
version
;
readChar
(
version
,
rb
);
if
(
version
!=
CURRENT_VERSION
)
throw
Exception
(
"Unknown DDLLogEntry format version: "
+
version
,
ErrorCodes
::
UNKNOWN_FORMAT_VERSION
);
rb
>>
"
\n
"
;
rb
>>
"query: "
>>
double_quote
>>
query
>>
"
\n
"
;
rb
>>
"hosts: "
>>
double_quote
>>
hosts
>>
"
\n
"
;
rb
>>
"initiator: "
>>
double_quote
>>
initiator
>>
"
\n
"
;
assertEOF
(
rb
);
}
};
DDLWorker
::
DDLWorker
(
const
std
::
string
&
zk_root_dir
,
Context
&
context_
)
...
...
@@ -67,9 +87,11 @@ DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_)
root_dir
.
resize
(
root_dir
.
size
()
-
1
);
hostname
=
getFQDNOrHostName
()
+
':'
+
DB
::
toString
(
context
.
getTCPPort
());
assign_dir
=
getAssignsDir
()
+
hostname
;
master_dir
=
getMastersDir
()
+
hostname
;
zookeeper
=
context
.
getZooKeeper
();
event_queue_updated
=
std
::
make_shared
<
Poco
::
Event
>
();
thread
=
std
::
thread
(
&
DDLWorker
::
run
,
this
);
}
...
...
@@ -84,135 +106,134 @@ DDLWorker::~DDLWorker()
void
DDLWorker
::
processTasks
()
{
auto
zookeeper
=
context
.
getZooKeeper
();
Strings
queue_nodes
;
int
code
=
zookeeper
->
tryGetChildren
(
root_dir
,
queue_nodes
,
nullptr
,
event_queue_updated
);
if
(
code
!=
ZNONODE
)
throw
zkutil
::
KeeperException
(
code
);
if
(
!
zookeeper
->
exists
(
assign_dir
))
/// Threre are no tasks
if
(
code
==
ZNONODE
||
queue_nodes
.
empty
())
return
;
Strings
tasks
=
zookeeper
->
getChildren
(
assign_dir
);
if
(
tasks
.
empty
())
return
;
bool
server_startup
=
last_processed_node_name
.
empty
();
String
current_task
=
*
std
::
min_element
(
tasks
.
cbegin
(),
tasks
.
cend
());
std
::
sort
(
queue_nodes
.
begin
(),
queue_nodes
.
end
());
auto
begin_node
=
server_startup
?
queue_nodes
.
begin
()
:
std
::
upper_bound
(
queue_nodes
.
begin
(),
queue_nodes
.
end
(),
last_processed_node_name
);
try
{
processTask
(
current_task
);
}
catch
(...)
for
(
auto
it
=
begin_node
;
it
!=
queue_nodes
.
end
();
++
it
)
{
tryLogCurrentException
(
log
,
"An unexpected error occurred during task "
+
current_task
);
throw
;
String
node_data
,
node_name
=
*
it
,
node_path
=
root_dir
+
"/"
+
node_name
;
code
=
zookeeper
->
tryGet
(
node_path
,
node_data
);
/// It is Ok that node could be deleted just now. It means that there are no current host in node's host list.
if
(
code
!=
ZNONODE
)
throw
zkutil
::
KeeperException
(
code
);
DDLLogEntry
node
;
node
.
parse
(
node_data
);
bool
host_in_hostlist
=
std
::
find
(
node
.
hosts
.
cbegin
(),
node
.
hosts
.
cend
(),
hostname
)
!=
node
.
hosts
.
cend
();
bool
already_processed
=
!
zookeeper
->
exists
(
node_path
+
"/failed/"
+
hostname
)
&&
!
zookeeper
->
exists
(
node_path
+
"/sucess/"
+
hostname
);
if
(
!
server_startup
&&
already_processed
)
{
throw
Exception
(
"Server expects that DDL node "
+
node_name
+
" should be processed, but it was already processed according to ZK"
,
ErrorCodes
::
LOGICAL_ERROR
);
}
if
(
host_in_hostlist
&&
!
already_processed
)
{
try
{
processTask
(
node
,
node_name
);
}
catch
(...)
{
/// It could be network error, but we mark node as processed anyway.
last_processed_node_name
=
node_name
;
tryLogCurrentException
(
log
,
"An unexpected error occurred during processing DLL query "
+
node
.
query
+
" ("
+
node_name
+
")"
);
throw
;
}
}
last_processed_node_name
=
node_name
;
}
}
bool
DDLWorker
::
processTask
(
const
std
::
string
&
task
)
/// Try to create unexisting "status" dirs for a node
void
DDLWorker
::
createStatusDirs
(
const
std
::
string
&
node_path
)
{
auto
zookeeper
=
context
.
getZooKeeper
();
auto
acl
=
zookeeper
->
getDefaultACL
();
String
query_dir
=
root_dir
+
"/"
+
task
;
String
assign_node
=
assign_dir
+
"/"
+
task
;
String
active_node
=
query_dir
+
"/active/"
+
hostname
;
String
sucsess_node
=
query_dir
+
"/sucess/"
+
hostname
;
String
fail_node
=
query_dir
+
"/failed/"
+
hostname
;
zkutil
::
Ops
ops
;
ops
.
emplace_back
(
std
::
make_unique
<
zkutil
::
Op
::
Create
>
(
node_path
+
"/active"
,
""
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
emplace_back
(
std
::
make_unique
<
zkutil
::
Op
::
Create
>
(
node_path
+
"/sucess"
,
""
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
emplace_back
(
std
::
make_unique
<
zkutil
::
Op
::
Create
>
(
node_path
+
"/failed"
,
""
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
String
query
=
zookeeper
->
get
(
query_dir
);
int
code
=
zookeeper
->
tryMulti
(
ops
);
if
(
code
!=
ZOK
&&
code
!=
ZNODEEXISTS
)
throw
zkutil
::
KeeperException
(
code
);
}
if
(
zookeeper
->
exists
(
sucsess_node
)
||
zookeeper
->
exists
(
fail_node
))
{
throw
Exception
(
"Task "
+
task
+
" (query "
+
query
+
") was already processed by node "
+
hostname
,
ErrorCodes
::
LOGICAL_ERROR
);
}
/// Create active flag
zookeeper
->
create
(
active_node
,
""
,
zkutil
::
CreateMode
::
Ephemeral
);
bool
DDLWorker
::
processTask
(
const
DDLLogEntry
&
node
,
const
std
::
string
&
node_name
)
{
String
node_path
=
root_dir
+
"/"
+
node_name
;
createStatusDirs
(
node_path
);
String
active_flag_path
=
node_path
+
"/active/"
+
hostname
;
zookeeper
->
create
(
active_flag_path
,
""
,
zkutil
::
CreateMode
::
Ephemeral
);
///
Will delete task from host's tasks list,
delete active flag and ...
///
At the end we will
delete active flag and ...
zkutil
::
Ops
ops
;
ops
.
emplace_back
(
std
::
make_unique
<
zkutil
::
Op
::
Remove
>
(
assign_node
,
-
1
)
);
ops
.
emplace_back
(
std
::
make_unique
<
zkutil
::
Op
::
Remove
>
(
active_
node
,
-
1
));
auto
acl
=
zookeeper
->
getDefaultACL
(
);
ops
.
emplace_back
(
std
::
make_unique
<
zkutil
::
Op
::
Remove
>
(
active_
flag_path
,
-
1
));
try
{
executeQuery
(
query
,
context
);
executeQuery
(
node
.
query
,
context
);
}
catch
(...)
{
/// ... and create fail flag
String
fail_flag_path
=
node_path
+
"/failed/"
+
hostname
;
String
exception_msg
=
getCurrentExceptionMessage
(
false
,
true
);
ops
.
emplace_back
(
std
::
make_unique
<
zkutil
::
Op
::
Create
>
(
fail_node
,
exception_msg
,
nullptr
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
emplace_back
(
std
::
make_unique
<
zkutil
::
Op
::
Create
>
(
fail_flag_path
,
exception_msg
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
zookeeper
->
multi
(
ops
);
tryLogCurrentException
(
log
,
"Query "
+
query
+
" wasn't finished successfully"
);
tryLogCurrentException
(
log
,
"Query "
+
node
.
query
+
" wasn't finished successfully"
);
return
false
;
}
/// .. and create sucess flag
ops
.
emplace_back
(
std
::
make_unique
<
zkutil
::
Op
::
Create
>
(
sucsess_node
,
""
,
nullptr
,
zkutil
::
CreateMode
::
Persistent
));
/// ... and create sucess flag
String
fail_flag_path
=
node_path
+
"/sucess/"
+
hostname
;
ops
.
emplace_back
(
std
::
make_unique
<
zkutil
::
Op
::
Create
>
(
fail_flag_path
,
""
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
zookeeper
->
multi
(
ops
);
return
true
;
}
void
DDLWorker
::
enqueueQuery
(
const
String
&
query
,
const
std
::
vector
<
Cluster
::
Address
>
&
addrs
)
void
DDLWorker
::
enqueueQuery
(
DDLLogEntry
&
entry
)
{
auto
zookeeper
=
context
.
getZooKeeper
();
if
(
entry
.
hosts
.
empty
())
return
;
String
assigns_dir
=
getAssignsDir
();
String
master_dir
=
getCurrentMasterDir
();
String
query_path_prefix
=
getRoot
()
+
"/query-"
;
zookeeper
->
createAncestors
(
query_path_prefix
);
zookeeper
->
createAncestors
(
assigns_dir
+
"/"
);
zookeeper
->
createAncestors
(
master_dir
+
"/"
);
String
query_path
=
zookeeper
->
create
(
query_path_prefix
,
query
,
zkutil
::
CreateMode
::
PersistentSequential
);
String
query_node
=
query_path
.
substr
(
query_path
.
find_last_of
(
'/'
)
+
1
);
zkutil
::
Ops
ops
;
auto
acl
=
zookeeper
->
getDefaultACL
();
constexpr
size_t
max_ops_per_call
=
100
;
/// Create /root/masters/query_node and /root/query-node/* to monitor status of the tasks initiated by us
{
String
num_hosts
=
toString
(
addrs
.
size
());
String
master_query_node
=
master_dir
+
"/"
+
query_node
;
ops
.
emplace_back
(
std
::
make_unique
<
zkutil
::
Op
::
Create
>
(
master_query_node
,
""
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
emplace_back
(
std
::
make_unique
<
zkutil
::
Op
::
Create
>
(
query_path
+
"/active"
,
""
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
emplace_back
(
std
::
make_unique
<
zkutil
::
Op
::
Create
>
(
query_path
+
"/sucess"
,
num_hosts
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
emplace_back
(
std
::
make_unique
<
zkutil
::
Op
::
Create
>
(
query_path
+
"/failed"
,
""
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
zookeeper
->
multi
(
ops
);
ops
.
clear
();
}
/// Create hosts's taks dir /root/assigns/host (if not exists)
for
(
auto
it
=
addrs
.
cbegin
();
it
!=
addrs
.
cend
();
++
it
)
{
String
cur_assign_dir
=
assigns_dir
+
"/"
+
it
->
toString
();
ops
.
emplace_back
(
std
::
make_unique
<
zkutil
::
Op
::
Create
>
(
cur_assign_dir
,
""
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
if
(
ops
.
size
()
>
max_ops_per_call
||
std
::
next
(
it
)
==
addrs
.
cend
())
{
int
code
=
zookeeper
->
tryMulti
(
ops
);
ops
.
clear
();
if
(
code
!=
ZOK
&&
code
!=
ZNODEEXISTS
)
throw
zkutil
::
KeeperException
(
code
);
}
}
/// Asssign tasks to hosts /root/assigns/host/query_node
for
(
auto
it
=
addrs
.
cbegin
();
it
!=
addrs
.
cend
();
++
it
)
{
String
cur_task_path
=
assigns_dir
+
"/"
+
it
->
toString
()
+
"/"
+
query_node
;
ops
.
emplace_back
(
std
::
make_unique
<
zkutil
::
Op
::
Create
>
(
cur_task_path
,
""
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
if
(
ops
.
size
()
>
max_ops_per_call
||
std
::
next
(
it
)
==
addrs
.
cend
())
{
zookeeper
->
multi
(
ops
);
ops
.
clear
();
}
}
String
node_path
=
zookeeper
->
create
(
query_path_prefix
,
entry
.
toString
(),
zkutil
::
CreateMode
::
PersistentSequential
);
createStatusDirs
(
node_path
);
}
...
...
@@ -226,83 +247,45 @@ void DDLWorker::run()
{
processTasks
();
}
catch
(
const
std
::
exception
&
ex
)
catch
(
...
)
{
LOG_ERROR
(
log
,
ex
.
what
()
);
tryLogCurrentException
(
log
);
}
std
::
unique_lock
<
std
::
mutex
>
g
(
lock
);
cond_var
.
wait_for
(
g
,
10s
);
//std::unique_lock<std::mutex> g(lock);
//cond_var.wait_for(g, 10s);
event_queue_updated
->
wait
();
}
}
static
bool
getRemoteQueryExecutionStatus
(
const
Cluster
::
Address
&
addr
,
const
std
::
string
&
query
,
Exception
&
out_exception
)
class
DDLQueryStatusInputSream
:
IProfilingBlockInputStream
{
Connection
conn
(
addr
.
host_name
,
addr
.
port
,
""
,
addr
.
user
,
addr
.
password
);
conn
.
sendQuery
(
query
);
while
(
true
)
{
Connection
::
Packet
packet
=
conn
.
receivePacket
();
if
(
packet
.
type
==
Protocol
::
Server
::
Exception
)
{
out_exception
=
*
packet
.
exception
;
return
false
;
}
else
if
(
packet
.
type
==
Protocol
::
Server
::
EndOfStream
)
break
;
}
return
true
;
}
};
BlockIO
executeDDLQueryOnCluster
(
const
String
&
query
,
const
String
&
cluster_name
,
Context
&
context
)
{
ClusterPtr
cluster
=
context
.
getCluster
(
cluster_name
);
Cluster
::
AddressesWithFailover
shards
=
cluster
->
getShardsWithFailoverAddresses
();
std
::
vector
<
Cluster
::
Address
>
pending_hosts
;
Array
hosts_names_failed
,
hosts_errors
,
hosts_names_pending
;
size_t
num_hosts_total
=
0
;
size_t
num_hosts_finished_successfully
=
0
;
DDLWorker
&
ddl_worker
=
context
.
getDDLWorker
();
DDLLogEntry
entry
;
entry
.
query
=
query
;
entry
.
initiator
=
ddl_worker
.
getHostName
();
for
(
const
auto
&
shard
:
shards
)
{
for
(
const
auto
&
addr
:
shard
)
{
try
{
Exception
ex
;
if
(
!
getRemoteQueryExecutionStatus
(
addr
,
query
,
ex
))
{
/// Normal error
String
exception_msg
=
getExceptionMessage
(
ex
,
false
,
true
);
hosts_names_failed
.
emplace_back
(
addr
.
host_name
);
hosts_errors
.
emplace_back
(
exception_msg
);
LOG_INFO
(
&
Logger
::
get
(
"DDLWorker"
),
"Query "
<<
query
<<
" failed on "
<<
addr
.
host_name
<<
": "
<<
exception_msg
);
}
else
{
++
num_hosts_finished_successfully
;
}
}
catch
(...)
{
/// Network error
pending_hosts
.
emplace_back
(
addr
);
hosts_names_pending
.
emplace_back
(
addr
.
host_name
);
}
}
num_hosts_total
+=
shard
.
size
();
}
entry
.
hosts
.
emplace_back
(
addr
.
toString
());
if
(
!
pending_hosts
.
empty
())
context
.
getDDLWorker
().
enqueueQuery
(
query
,
pending_hosts
);
ddl_worker
.
enqueueQuery
(
entry
);
auto
aray_of_strings
=
std
::
make_shared
<
DataTypeArray
>
(
std
::
make_shared
<
DataTypeString
>
());
...
...
@@ -316,12 +299,12 @@ BlockIO executeDDLQueryOnCluster(const String & query, const String & cluster_na
{
aray_of_strings
->
clone
(),
"hosts_pending"
}
};
size_t
num_hosts_finished
=
num_hosts_total
-
pending_hosts
.
size
()
;
size_t
num_hosts_finished
=
num_hosts_total
;
size_t
num_hosts_finished_unsuccessfully
=
num_hosts_finished
-
num_hosts_finished_successfully
;
block
.
getByName
(
"num_hosts_total"
).
column
->
insert
(
num_hosts_total
);
block
.
getByName
(
"num_hosts_finished_successfully"
).
column
->
insert
(
num_hosts_finished_successfully
);
block
.
getByName
(
"num_hosts_finished_unsuccessfully"
).
column
->
insert
(
num_hosts_finished_unsuccessfully
);
block
.
getByName
(
"num_hosts_pending"
).
column
->
insert
(
pending_hosts
.
size
()
);
block
.
getByName
(
"num_hosts_pending"
).
column
->
insert
(
0LU
);
block
.
getByName
(
"hosts_finished_unsuccessfully"
).
column
->
insert
(
hosts_names_failed
);
block
.
getByName
(
"hosts_finished_unsuccessfully_errors"
).
column
->
insert
(
hosts_errors
);
block
.
getByName
(
"hosts_pending"
).
column
->
insert
(
hosts_names_pending
);
...
...
dbms/src/Interpreters/DDLWorker.h
浏览文件 @
4322b0c5
...
...
@@ -13,17 +13,19 @@
namespace
DB
{
BlockIO
executeDDLQueryOnCluster
(
const
String
&
query
,
const
String
&
cluster_name
,
Context
&
context
);
struct
DDLLogEntry
;
class
DDLWorker
{
public:
DDLWorker
(
const
std
::
string
&
zk_root_dir
,
Context
&
context_
);
~
DDLWorker
();
void
enqueueQuery
(
const
String
&
query
,
const
std
::
vector
<
Cluster
::
Address
>
&
addrs
);
void
enqueueQuery
(
DDLLogEntry
&
entry
);
/// Returns root/ path in ZooKeeper
std
::
string
getRoot
()
const
...
...
@@ -31,11 +33,6 @@ public:
return
root_dir
;
}
std
::
string
getAssignsDir
()
const
{
return
root_dir
+
"/assigns"
;
}
std
::
string
getMastersDir
()
const
{
return
root_dir
+
"/masters"
;
...
...
@@ -53,7 +50,9 @@ public:
private:
void
processTasks
();
bool
processTask
(
const
std
::
string
&
task
);
bool
processTask
(
const
DDLLogEntry
&
node
,
const
std
::
string
&
node_path
);
void
createStatusDirs
(
const
std
::
string
&
node_name
);
void
processQueries
();
bool
processQuery
(
const
std
::
string
&
task
);
...
...
@@ -67,7 +66,12 @@ private:
std
::
string
hostname
;
std
::
string
root_dir
;
/// common dir with queue of queries
std
::
string
assign_dir
;
/// dir with tasks assigned to the server
std
::
string
master_dir
;
/// dir with queries was initiated by the server
std
::
string
master_dir
;
/// dir with queries was initiated by the server
std
::
string
last_processed_node_name
;
std
::
shared_ptr
<
zkutil
::
ZooKeeper
>
zookeeper
;
std
::
shared_ptr
<
Poco
::
Event
>
queue_updated
;
std
::
atomic
<
bool
>
stop_flag
;
std
::
condition_variable
cond_var
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录