Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
351a0905
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
351a0905
编写于
8月 10, 2017
作者:
V
Vitaliy Lyudvichenko
提交者:
alexey-milovidov
8月 10, 2017
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Implemented fast block and parts cleaning. [#CLICKHOUSE-3207]
上级
e6739cc3
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
322 addition
and
109 deletion
+322
-109
dbms/src/Common/ZooKeeper/Types.h
dbms/src/Common/ZooKeeper/Types.h
+1
-1
dbms/src/Common/ZooKeeper/ZooKeeper.cpp
dbms/src/Common/ZooKeeper/ZooKeeper.cpp
+60
-5
dbms/src/Common/ZooKeeper/ZooKeeper.h
dbms/src/Common/ZooKeeper/ZooKeeper.h
+20
-2
dbms/src/Common/ZooKeeper/tests/CMakeLists.txt
dbms/src/Common/ZooKeeper/tests/CMakeLists.txt
+1
-1
dbms/src/Common/ZooKeeper/tests/zkutil_test_multi_exception.cpp
...rc/Common/ZooKeeper/tests/zkutil_test_multi_exception.cpp
+50
-15
dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
...c/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
+132
-37
dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h
...src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h
+8
-6
dbms/src/Storages/StorageReplicatedMergeTree.cpp
dbms/src/Storages/StorageReplicatedMergeTree.cpp
+48
-39
dbms/src/Storages/StorageReplicatedMergeTree.h
dbms/src/Storages/StorageReplicatedMergeTree.h
+2
-3
未找到文件。
dbms/src/Common/ZooKeeper/Types.h
浏览文件 @
351a0905
...
@@ -11,7 +11,7 @@ namespace zkutil
...
@@ -11,7 +11,7 @@ namespace zkutil
{
{
using
ACLPtr
=
const
ACL_vector
*
;
using
ACLPtr
=
const
ACL_vector
*
;
using
Stat
=
Stat
;
using
Stat
=
::
Stat
;
struct
Op
struct
Op
{
{
...
...
dbms/src/Common/ZooKeeper/ZooKeeper.cpp
浏览文件 @
351a0905
...
@@ -555,7 +555,7 @@ int32_t ZooKeeper::multiImpl(const Ops & ops_, OpResultsPtr * out_results_)
...
@@ -555,7 +555,7 @@ int32_t ZooKeeper::multiImpl(const Ops & ops_, OpResultsPtr * out_results_)
for
(
const
auto
&
op
:
ops_
)
for
(
const
auto
&
op
:
ops_
)
ops
.
push_back
(
*
(
op
->
data
));
ops
.
push_back
(
*
(
op
->
data
));
int32_t
code
=
zoo_multi
(
impl
,
ops
.
size
(
),
ops
.
data
(),
out_results
->
data
());
int32_t
code
=
zoo_multi
(
impl
,
static_cast
<
int
>
(
ops
.
size
()
),
ops
.
data
(),
out_results
->
data
());
ProfileEvents
::
increment
(
ProfileEvents
::
ZooKeeperMulti
);
ProfileEvents
::
increment
(
ProfileEvents
::
ZooKeeperMulti
);
ProfileEvents
::
increment
(
ProfileEvents
::
ZooKeeperTransactions
);
ProfileEvents
::
increment
(
ProfileEvents
::
ZooKeeperTransactions
);
...
@@ -612,15 +612,13 @@ int32_t ZooKeeper::tryMultiWithRetries(const Ops & ops, OpResultsPtr * out_resul
...
@@ -612,15 +612,13 @@ int32_t ZooKeeper::tryMultiWithRetries(const Ops & ops, OpResultsPtr * out_resul
return
code
;
return
code
;
}
}
static
const
int
BATCH_SIZE
=
100
;
void
ZooKeeper
::
removeChildrenRecursive
(
const
std
::
string
&
path
)
void
ZooKeeper
::
removeChildrenRecursive
(
const
std
::
string
&
path
)
{
{
Strings
children
=
getChildren
(
path
);
Strings
children
=
getChildren
(
path
);
while
(
!
children
.
empty
())
while
(
!
children
.
empty
())
{
{
zkutil
::
Ops
ops
;
zkutil
::
Ops
ops
;
for
(
size_t
i
=
0
;
i
<
BATCH_SIZE
&&
!
children
.
empty
();
++
i
)
for
(
size_t
i
=
0
;
i
<
MULTI_
BATCH_SIZE
&&
!
children
.
empty
();
++
i
)
{
{
removeChildrenRecursive
(
path
+
"/"
+
children
.
back
());
removeChildrenRecursive
(
path
+
"/"
+
children
.
back
());
ops
.
emplace_back
(
std
::
make_unique
<
Op
::
Remove
>
(
path
+
"/"
+
children
.
back
(),
-
1
));
ops
.
emplace_back
(
std
::
make_unique
<
Op
::
Remove
>
(
path
+
"/"
+
children
.
back
(),
-
1
));
...
@@ -639,7 +637,7 @@ void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path)
...
@@ -639,7 +637,7 @@ void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path)
{
{
zkutil
::
Ops
ops
;
zkutil
::
Ops
ops
;
Strings
batch
;
Strings
batch
;
for
(
size_t
i
=
0
;
i
<
BATCH_SIZE
&&
!
children
.
empty
();
++
i
)
for
(
size_t
i
=
0
;
i
<
MULTI_
BATCH_SIZE
&&
!
children
.
empty
();
++
i
)
{
{
batch
.
push_back
(
path
+
"/"
+
children
.
back
());
batch
.
push_back
(
path
+
"/"
+
children
.
back
());
children
.
pop_back
();
children
.
pop_back
();
...
@@ -904,4 +902,61 @@ ZooKeeper::RemoveFuture ZooKeeper::asyncRemove(const std::string & path)
...
@@ -904,4 +902,61 @@ ZooKeeper::RemoveFuture ZooKeeper::asyncRemove(const std::string & path)
return
future
;
return
future
;
}
}
/// Submits `ops_` to ZooKeeper as a single asynchronous multi request and returns a future for its outcome.
///
/// The future yields an OpResultsAndCode holding the completion code and the per-op results buffer.
/// When `throw_exception` is true, a completion code other than ZOK is raised from the task as a
/// KeeperException instead of being returned.
///
/// Independently of `throw_exception`, this function itself throws KeeperException when the session
/// is expired or when zoo_amulti returns a code other than ZOK.
ZooKeeper::MultiFuture ZooKeeper::asyncMultiImpl(const zkutil::Ops & ops_, bool throw_exception)
{
    /// One result slot per requested operation; the buffer is shared with the completion task.
    OpResultsPtr results(new OpResults(ops_.size()));

    MultiFuture future{
        [throw_exception, results] (int rc)
        {
            if (throw_exception && rc != ZOK)
                throw zkutil::KeeperException(rc);

            OpResultsAndCode outcome;
            outcome.code = rc;
            outcome.results = results;
            return outcome;
        }};

    /// Nothing to send: run the task right away with a success code.
    if (ops_.empty())
    {
        auto & task = **future.task;
        task(ZOK);
        return future;
    }

    /// Workaround of the libzookeeper bug.
    /// TODO: check if the bug is fixed in the latest version of libzookeeper.
    if (expired())
        throw KeeperException(ZINVALIDSTATE);

    /// There is no need to hold these ops until the end of the passed callback
    std::vector<zoo_op_t> raw_ops;
    for (const auto & op_ptr : ops_)
        raw_ops.push_back(*(op_ptr->data));

    /// Completion handler: moves the task out of the slot whose address was passed as `data`
    /// and runs it with the return code.
    auto completion = [] (int rc, const void * data)
    {
        auto & task_slot = const_cast<MultiFuture::TaskPtr &>(*static_cast<const MultiFuture::TaskPtr *>(data));
        MultiFuture::TaskPtr owned_task = std::move(task_slot);
        (*owned_task)(rc);
    };

    int32_t code = zoo_amulti(
        impl,
        static_cast<int>(raw_ops.size()),
        raw_ops.data(),
        results->data(),
        completion,
        future.task.get());

    ProfileEvents::increment(ProfileEvents::ZooKeeperMulti);
    ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);

    if (code != ZOK)
        throw KeeperException(code);

    return future;
}
/// Asynchronous multi request whose future does not throw on a non-ZOK completion code;
/// the code is reported in the `code` member of the returned result instead.
ZooKeeper::MultiFuture ZooKeeper::tryAsyncMulti(const zkutil::Ops & ops)
{
    const bool throw_exception = false;
    return asyncMultiImpl(ops, throw_exception);
}
/// Asynchronous multi request whose task raises KeeperException when the completion code is not ZOK.
ZooKeeper::MultiFuture ZooKeeper::asyncMulti(const zkutil::Ops & ops)
{
    const bool throw_exception = true;
    return asyncMultiImpl(ops, throw_exception);
}
}
}
dbms/src/Common/ZooKeeper/ZooKeeper.h
浏览文件 @
351a0905
...
@@ -31,6 +31,9 @@ const UInt32 DEFAULT_SESSION_TIMEOUT = 30000;
...
@@ -31,6 +31,9 @@ const UInt32 DEFAULT_SESSION_TIMEOUT = 30000;
const
UInt32
MEDIUM_SESSION_TIMEOUT
=
120000
;
const
UInt32
MEDIUM_SESSION_TIMEOUT
=
120000
;
const
UInt32
BIG_SESSION_TIMEOUT
=
600000
;
const
UInt32
BIG_SESSION_TIMEOUT
=
600000
;
/// Preferred size of multi() command (in number of ops)
constexpr
size_t
MULTI_BATCH_SIZE
=
100
;
struct
WatchContext
;
struct
WatchContext
;
...
@@ -46,7 +49,7 @@ struct WatchContext;
...
@@ -46,7 +49,7 @@ struct WatchContext;
/// Modifying methods do not retry, because it leads to problems of the double-delete type.
/// Modifying methods do not retry, because it leads to problems of the double-delete type.
///
///
/// Methods with names not starting at try- raise KeeperException on any error.
/// Methods with names not starting at try- raise KeeperException on any error.
class
ZooKeeper
class
ZooKeeper
{
{
public:
public:
using
Ptr
=
std
::
shared_ptr
<
ZooKeeper
>
;
using
Ptr
=
std
::
shared_ptr
<
ZooKeeper
>
;
...
@@ -241,7 +244,7 @@ public:
...
@@ -241,7 +244,7 @@ public:
/// The caller is responsible for ensuring that the context lives until the callback
/// The caller is responsible for ensuring that the context lives until the callback
/// is finished and we can't simply pass ownership of the context into function object.
/// is finished and we can't simply pass ownership of the context into function object.
/// Instead, we save the context in a Future object and return it to the caller.
/// Instead, we save the context in a Future object and return it to the caller.
/// The c
a
ntext will live until the Future lives.
/// The c
o
ntext will live until the Future lives.
/// Context data is wrapped in an unique_ptr so that its address (which is passed to
/// Context data is wrapped in an unique_ptr so that its address (which is passed to
/// libzookeeper) remains unchanged after the Future is returned from the function.
/// libzookeeper) remains unchanged after the Future is returned from the function.
///
///
...
@@ -320,6 +323,19 @@ public:
...
@@ -320,6 +323,19 @@ public:
RemoveFuture
asyncRemove
(
const
std
::
string
&
path
);
RemoveFuture
asyncRemove
(
const
std
::
string
&
path
);
/// Outcome of an asynchronous multi request: the per-op results together with the overall return code.
struct
OpResultsAndCode
{
/// Per-operation results buffer that was handed to the multi request.
OpResultsPtr
results
;
/// NOTE(review): asyncMultiImpl sets only `code` and `results`; this member appears to stay empty — confirm whether it is needed.
Ops
ops
;
/// Return code passed to the completion task (compared against ZOK by callers).
int
code
;
};
using
MultiFuture
=
Future
<
OpResultsAndCode
,
int
>
;
MultiFuture
asyncMulti
(
const
zkutil
::
Ops
&
ops
);
/// Like the previous one, but does not throw any exceptions on future.get()
MultiFuture
tryAsyncMulti
(
const
zkutil
::
Ops
&
ops
);
static
std
::
string
error2string
(
int32_t
code
);
static
std
::
string
error2string
(
int32_t
code
);
/// Max size of node contents in bytes.
/// Max size of node contents in bytes.
...
@@ -378,6 +394,8 @@ private:
...
@@ -378,6 +394,8 @@ private:
int32_t
multiImpl
(
const
Ops
&
ops
,
OpResultsPtr
*
out_results
=
nullptr
);
int32_t
multiImpl
(
const
Ops
&
ops
,
OpResultsPtr
*
out_results
=
nullptr
);
int32_t
existsImpl
(
const
std
::
string
&
path
,
Stat
*
stat_
,
WatchCallback
watch_callback
);
int32_t
existsImpl
(
const
std
::
string
&
path
,
Stat
*
stat_
,
WatchCallback
watch_callback
);
MultiFuture
asyncMultiImpl
(
const
zkutil
::
Ops
&
ops_
,
bool
throw_exception
);
std
::
string
hosts
;
std
::
string
hosts
;
int32_t
session_timeout_ms
;
int32_t
session_timeout_ms
;
...
...
dbms/src/Common/ZooKeeper/tests/CMakeLists.txt
浏览文件 @
351a0905
...
@@ -17,4 +17,4 @@ add_executable (zk_many_watches_reconnect zk_many_watches_reconnect.cpp)
...
@@ -17,4 +17,4 @@ add_executable (zk_many_watches_reconnect zk_many_watches_reconnect.cpp)
target_link_libraries
(
zk_many_watches_reconnect dbms
)
target_link_libraries
(
zk_many_watches_reconnect dbms
)
add_executable
(
zkutil_test_multi_exception zkutil_test_multi_exception.cpp
)
add_executable
(
zkutil_test_multi_exception zkutil_test_multi_exception.cpp
)
target_link_libraries
(
zkutil_test_multi_exception dbms
)
target_link_libraries
(
zkutil_test_multi_exception dbms
gtest_main
)
dbms/src/Common/ZooKeeper/tests/zkutil_test_multi_exception.cpp
浏览文件 @
351a0905
#include <iostream>
#include <iostream>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/Exception.h>
#include <Common/Exception.h>
#include <gtest/gtest.h>
using
namespace
DB
;
using
namespace
DB
;
int
main
(
)
TEST
(
zkutil
,
multi_nice_exception_msg
)
{
{
auto
zookeeper
=
std
::
make_unique
<
zkutil
::
ZooKeeper
>
(
"localhost:2181"
);
auto
zookeeper
=
std
::
make_unique
<
zkutil
::
ZooKeeper
>
(
"localhost:2181"
);
try
auto
acl
=
zookeeper
->
getDefaultACL
();
{
zkutil
::
Ops
ops
;
auto
acl
=
zookeeper
->
getDefaultACL
();
zkutil
::
Ops
ops
;
ASSERT_NO_THROW
(
zookeeper
->
tryRemoveRecursive
(
"/clickhouse_test_zkutil_multi"
);
zookeeper
->
tryRemoveRecursive
(
"/clickhouse_test_zkutil_multi"
);
ops
.
emplace_back
(
new
zkutil
::
Op
::
Create
(
"/clickhouse_test_zkutil_multi"
,
"_"
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
emplace_back
(
new
zkutil
::
Op
::
Create
(
"/clickhouse_test_zkutil_multi"
,
"_"
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
emplace_back
(
new
zkutil
::
Op
::
Create
(
"/clickhouse_test_zkutil_multi/a"
,
"_"
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
emplace_back
(
new
zkutil
::
Op
::
Create
(
"/clickhouse_test_zkutil_multi/a"
,
"_"
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
zookeeper
->
multi
(
ops
);
zookeeper
->
multi
(
ops
);
);
try
{
ops
.
clear
();
ops
.
clear
();
ops
.
emplace_back
(
new
zkutil
::
Op
::
Create
(
"/clickhouse_test_zkutil_multi/c"
,
"_"
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
emplace_back
(
new
zkutil
::
Op
::
Create
(
"/clickhouse_test_zkutil_multi/c"
,
"_"
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
emplace_back
(
new
zkutil
::
Op
::
Remove
(
"/clickhouse_test_zkutil_multi/c"
,
-
1
));
ops
.
emplace_back
(
new
zkutil
::
Op
::
Remove
(
"/clickhouse_test_zkutil_multi/c"
,
-
1
));
...
@@ -27,6 +30,7 @@ int main()
...
@@ -27,6 +30,7 @@ int main()
ops
.
emplace_back
(
new
zkutil
::
Op
::
Create
(
"/clickhouse_test_zkutil_multi/a"
,
"_"
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
emplace_back
(
new
zkutil
::
Op
::
Create
(
"/clickhouse_test_zkutil_multi/a"
,
"_"
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
zookeeper
->
multi
(
ops
);
zookeeper
->
multi
(
ops
);
FAIL
();
}
}
catch
(...)
catch
(...)
{
{
...
@@ -34,16 +38,47 @@ int main()
...
@@ -34,16 +38,47 @@ int main()
String
msg
=
getCurrentExceptionMessage
(
false
);
String
msg
=
getCurrentExceptionMessage
(
false
);
if
(
msg
.
find
(
"/clickhouse_test_zkutil_multi/a"
)
==
std
::
string
::
npos
||
msg
.
find
(
"#2"
)
==
std
::
string
::
npos
)
bool
msg_has_reqired_patterns
=
msg
.
find
(
"/clickhouse_test_zkutil_multi/a"
)
!=
std
::
string
::
npos
&&
msg
.
find
(
"#2"
)
!=
std
::
string
::
npos
;
{
EXPECT_TRUE
(
msg_has_reqired_patterns
)
<<
msg
;
std
::
cerr
<<
"Wrong: "
<<
msg
;
}
return
-
1
;
}
}
std
::
cout
<<
"Ok: "
<<
msg
;
return
0
;
TEST
(
zkutil
,
multi_async
)
{
auto
zookeeper
=
std
::
make_unique
<
zkutil
::
ZooKeeper
>
(
"localhost:2181"
);
auto
acl
=
zookeeper
->
getDefaultACL
();
zkutil
::
Ops
ops
;
zookeeper
->
tryRemoveRecursive
(
"/clickhouse_test_zkutil_multi"
);
{
ops
.
clear
();
auto
fut
=
zookeeper
->
asyncMulti
(
ops
);
}
}
std
::
cerr
<<
"Unexpected"
;
{
return
-
1
;
ops
.
clear
();
}
ops
.
emplace_back
(
new
zkutil
::
Op
::
Create
(
"/clickhouse_test_zkutil_multi"
,
""
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
emplace_back
(
new
zkutil
::
Op
::
Create
(
"/clickhouse_test_zkutil_multi/a"
,
""
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
auto
fut
=
zookeeper
->
tryAsyncMulti
(
ops
);
ops
.
clear
();
auto
res
=
fut
.
get
();
ASSERT_TRUE
(
res
.
code
==
ZOK
);
}
{
ops
.
clear
();
ops
.
emplace_back
(
new
zkutil
::
Op
::
Create
(
"/clickhouse_test_zkutil_multi"
,
"_"
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
emplace_back
(
new
zkutil
::
Op
::
Create
(
"/clickhouse_test_zkutil_multi/a"
,
"_"
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
auto
fut
=
zookeeper
->
tryAsyncMulti
(
ops
);
ops
.
clear
();
auto
res
=
fut
.
get
();
ASSERT_TRUE
(
res
.
code
==
ZNODEEXISTS
);
ASSERT_EQ
(
res
.
results
->
size
(),
2
);
}
}
\ No newline at end of file
dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
浏览文件 @
351a0905
...
@@ -16,7 +16,8 @@ namespace ErrorCodes
...
@@ -16,7 +16,8 @@ namespace ErrorCodes
ReplicatedMergeTreeCleanupThread
::
ReplicatedMergeTreeCleanupThread
(
StorageReplicatedMergeTree
&
storage_
)
ReplicatedMergeTreeCleanupThread
::
ReplicatedMergeTreeCleanupThread
(
StorageReplicatedMergeTree
&
storage_
)
:
storage
(
storage_
),
:
storage
(
storage_
),
log
(
&
Logger
::
get
(
storage
.
database_name
+
"."
+
storage
.
table_name
+
" (StorageReplicatedMergeTree, CleanupThread)"
)),
log
(
&
Logger
::
get
(
storage
.
database_name
+
"."
+
storage
.
table_name
+
" (StorageReplicatedMergeTree, CleanupThread)"
)),
thread
([
this
]
{
run
();
})
{}
thread
([
this
]
{
run
();
}),
cached_block_stats
(
std
::
make_unique
<
NodesStatCache
>
())
{}
void
ReplicatedMergeTreeCleanupThread
::
run
()
void
ReplicatedMergeTreeCleanupThread
::
run
()
...
@@ -108,11 +109,117 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
...
@@ -108,11 +109,117 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
LOG_DEBUG
(
log
,
"Removed "
<<
entries
.
size
()
<<
" old log entries: "
<<
entries
.
front
()
<<
" - "
<<
entries
.
back
());
LOG_DEBUG
(
log
,
"Removed "
<<
entries
.
size
()
<<
" old log entries: "
<<
entries
.
front
()
<<
" - "
<<
entries
.
back
());
}
}
namespace
{

/// The subset of a ZooKeeper node stat that block cleanup needs: creation time and number of children.
struct RequiredStat
{
    /// Both members have default initializers so that no constructor leaves a field indeterminate.
    Int64 ctime = 0;
    int numChildren = 0;

    RequiredStat() = default;
    RequiredStat(const RequiredStat &) = default;

    /// Copies the two needed fields out of a full ZooKeeper stat.
    explicit RequiredStat(const zkutil::Stat & s) : ctime(s.ctime), numChildren(s.numChildren) {}

    /// Builds a stat that carries only a creation time (numChildren stays 0).
    explicit RequiredStat(Int64 ctime_) : ctime(ctime_) {}
};

}
/// Map from node name to its RequiredStat; defined here so that the header only needs a forward declaration.
class
ReplicatedMergeTreeCleanupThread
::
NodesStatCache
:
public
std
::
map
<
String
,
RequiredStat
>
{
};
/// A node name paired with the stat fields required for cleanup.
struct ReplicatedMergeTreeCleanupThread::NodeWithStat
{
    String node;
    RequiredStat stat;

    NodeWithStat() = default;

    NodeWithStat(const String & node_, const RequiredStat & stat_)
        : node(node_), stat(stat_)
    {
    }

    /// Descending order: larger ctime first; for equal ctime, larger node name first.
    static bool greaterByTime(const NodeWithStat & lhs, const NodeWithStat & rhs)
    {
        if (lhs.stat.ctime == rhs.stat.ctime)
            return lhs.node > rhs.node;

        return lhs.stat.ctime > rhs.stat.ctime;
    }
};
void
ReplicatedMergeTreeCleanupThread
::
clearOldBlocks
()
void
ReplicatedMergeTreeCleanupThread
::
clearOldBlocks
()
{
{
auto
zookeeper
=
storage
.
getZooKeeper
();
auto
zookeeper
=
storage
.
getZooKeeper
();
std
::
vector
<
NodeWithStat
>
timed_blocks
;
getBlocksSortedByTime
(
zookeeper
,
timed_blocks
);
if
(
timed_blocks
.
empty
())
return
;
/// Use ZooKeeper's first node (last according to time) timestamp as "current" time.
Int64
current_time
=
timed_blocks
.
front
().
stat
.
ctime
;
Int64
time_threshold
=
std
::
max
(
0L
,
current_time
-
static_cast
<
Int64
>
(
storage
.
data
.
settings
.
replicated_deduplication_window_seconds
));
NodeWithStat
block_threshold
(
""
,
RequiredStat
(
time_threshold
));
size_t
current_deduplication_window
=
std
::
min
(
timed_blocks
.
size
(),
storage
.
data
.
settings
.
replicated_deduplication_window
);
auto
first_outdated_block_fixed_threshold
=
timed_blocks
.
begin
()
+
current_deduplication_window
;
auto
first_outdated_block_time_threshold
=
std
::
upper_bound
(
timed_blocks
.
begin
(),
timed_blocks
.
end
(),
block_threshold
,
NodeWithStat
::
greaterByTime
);
auto
first_outdated_block
=
std
::
min
(
first_outdated_block_fixed_threshold
,
first_outdated_block_time_threshold
);
/// TODO After about half a year, we could remain only multi op, because there will be no obsolete children nodes.
std
::
vector
<
zkutil
::
ZooKeeper
::
MultiFuture
>
multi_futures
;
zkutil
::
Ops
ops
;
for
(
auto
it
=
first_outdated_block
;
it
!=
timed_blocks
.
end
();
++
it
)
{
String
path
=
storage
.
zookeeper_path
+
"/blocks/"
+
it
->
node
;
if
(
it
->
stat
.
numChildren
==
0
)
{
ops
.
emplace_back
(
new
zkutil
::
Op
::
Remove
(
path
,
-
1
));
if
(
ops
.
size
()
>=
zkutil
::
MULTI_BATCH_SIZE
)
{
multi_futures
.
emplace_back
(
zookeeper
->
tryAsyncMulti
(
ops
));
ops
.
clear
();
}
}
else
zookeeper
->
removeRecursive
(
path
);
}
if
(
!
ops
.
empty
())
{
multi_futures
.
emplace_back
(
zookeeper
->
tryAsyncMulti
(
ops
));
ops
.
clear
();
}
auto
num_nodes_to_delete
=
timed_blocks
.
end
()
-
first_outdated_block
;
size_t
num_nodes_not_deleted
=
0
;
int
last_error_code
=
ZOK
;
for
(
auto
&
future
:
multi_futures
)
{
auto
res
=
future
.
get
();
if
(
res
.
code
!=
ZOK
)
{
num_nodes_not_deleted
+=
res
.
results
->
size
();
last_error_code
=
res
.
code
;
}
}
if
(
num_nodes_not_deleted
)
{
LOG_ERROR
(
log
,
"There was a problem with deleting "
<<
num_nodes_not_deleted
<<
" (from "
<<
num_nodes_to_delete
<<
")"
<<
" old blocks from ZooKeeper, error: "
<<
zkutil
::
ZooKeeper
::
error2string
(
last_error_code
));
}
else
LOG_TRACE
(
log
,
"Cleared "
<<
num_nodes_to_delete
<<
" old blocks from ZooKeeper"
);
}
void
ReplicatedMergeTreeCleanupThread
::
getBlocksSortedByTime
(
zkutil
::
ZooKeeperPtr
&
zookeeper
,
std
::
vector
<
NodeWithStat
>
&
timed_blocks
)
{
timed_blocks
.
clear
();
Strings
blocks
;
Strings
blocks
;
zkutil
::
Stat
stat
;
zkutil
::
Stat
stat
;
if
(
ZOK
!=
zookeeper
->
tryGetChildren
(
storage
.
zookeeper_path
+
"/blocks"
,
blocks
,
&
stat
))
if
(
ZOK
!=
zookeeper
->
tryGetChildren
(
storage
.
zookeeper_path
+
"/blocks"
,
blocks
,
&
stat
))
...
@@ -121,66 +228,54 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
...
@@ -121,66 +228,54 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
/// Clear already deleted blocks from the cache, cached_block_ctime should be subset of blocks
/// Clear already deleted blocks from the cache, cached_block_ctime should be subset of blocks
{
{
NameSet
blocks_set
(
blocks
.
begin
(),
blocks
.
end
());
NameSet
blocks_set
(
blocks
.
begin
(),
blocks
.
end
());
for
(
auto
it
=
cached_block_
ctime
.
begin
();
it
!=
cached_block_ctime
.
end
();)
for
(
auto
it
=
cached_block_
stats
->
begin
();
it
!=
cached_block_stats
->
end
();)
{
{
if
(
!
blocks_set
.
count
(
it
->
first
))
if
(
!
blocks_set
.
count
(
it
->
first
))
it
=
cached_block_
ctime
.
erase
(
it
);
it
=
cached_block_
stats
->
erase
(
it
);
else
else
++
it
;
++
it
;
}
}
}
}
auto
not_cached_blocks
=
stat
.
numChildren
-
cached_block_
ctime
.
size
();
auto
not_cached_blocks
=
stat
.
numChildren
-
cached_block_
stats
->
size
();
LOG_TRACE
(
log
,
"Checking "
<<
stat
.
numChildren
<<
" blocks ("
<<
not_cached_blocks
<<
" are not cached)"
LOG_TRACE
(
log
,
"Checking "
<<
stat
.
numChildren
<<
" blocks ("
<<
not_cached_blocks
<<
" are not cached)"
<<
" to clear old ones from ZooKeeper. This might take several minutes."
);
<<
" to clear old ones from ZooKeeper. This might take several minutes."
);
/// Time -> block hash from ZooKeeper (from node name)
std
::
vector
<
std
::
pair
<
String
,
zkutil
::
ZooKeeper
::
ExistsFuture
>>
exists_futures
;
using
TimedBlock
=
std
::
pair
<
Int64
,
String
>
;
using
TimedBlocksComparator
=
std
::
greater
<
TimedBlock
>
;
std
::
vector
<
TimedBlock
>
timed_blocks
;
for
(
const
String
&
block
:
blocks
)
for
(
const
String
&
block
:
blocks
)
{
{
auto
it
=
cached_block_ctime
.
find
(
block
);
auto
it
=
cached_block_stats
->
find
(
block
);
if
(
it
==
cached_block_stats
->
end
())
if
(
it
==
cached_block_ctime
.
end
())
{
{
/// New block. Fetch its stat and put it into the cache
/// New block. Fetch its stat asynchronously
zkutil
::
Stat
block_stat
;
exists_futures
.
emplace_back
(
block
,
zookeeper
->
asyncExists
(
storage
.
zookeeper_path
+
"/blocks/"
+
block
));
zookeeper
->
exists
(
storage
.
zookeeper_path
+
"/blocks/"
+
block
,
&
block_stat
);
cached_block_ctime
.
emplace
(
block
,
block_stat
.
ctime
);
timed_blocks
.
emplace_back
(
block_stat
.
ctime
,
block
);
}
}
else
else
{
{
/// Cached block
/// Cached block
timed_blocks
.
emplace_back
(
it
->
second
,
block
);
timed_blocks
.
emplace_back
(
block
,
it
->
second
);
}
}
}
}
if
(
timed_blocks
.
empty
())
/// Put fetched stats into the cache
return
;
for
(
auto
&
elem
:
exists_futures
)
{
std
::
sort
(
timed_blocks
.
begin
(),
timed_blocks
.
end
(),
TimedBlocksComparator
());
zkutil
::
ZooKeeper
::
StatAndExists
status
=
elem
.
second
.
get
();
if
(
!
status
.
exists
)
throw
zkutil
::
KeeperException
(
"A block node was suddenly deleted"
,
ZNONODE
);
/// Use ZooKeeper's first node (last according to time) timestamp as "current" time.
cached_block_stats
->
emplace
(
elem
.
first
,
status
.
stat
);
Int64
current_time
=
timed_blocks
.
front
().
first
;
timed_blocks
.
emplace_back
(
elem
.
first
,
RequiredStat
(
status
.
stat
));
Int64
time_threshold
=
std
::
max
(
0L
,
current_time
-
static_cast
<
Int64
>
(
storage
.
data
.
settings
.
replicated_deduplication_window_seconds
));
}
TimedBlock
block_threshold
(
time_threshold
,
""
);
size_t
current_deduplication_window
=
std
::
min
(
timed_blocks
.
size
(),
storage
.
data
.
settings
.
replicated_deduplication_window
);
std
::
sort
(
timed_blocks
.
begin
(),
timed_blocks
.
end
(),
NodeWithStat
::
greaterByTime
);
auto
first_outdated_block_fixed_threshold
=
timed_blocks
.
begin
()
+
current_deduplication_window
;
}
auto
first_outdated_block_time_threshold
=
std
::
upper_bound
(
timed_blocks
.
begin
(),
timed_blocks
.
end
(),
block_threshold
,
TimedBlocksComparator
());
auto
first_outdated_block
=
std
::
min
(
first_outdated_block_fixed_threshold
,
first_outdated_block_time_threshold
);
for
(
auto
it
=
first_outdated_block
;
it
!=
timed_blocks
.
end
();
++
it
)
{
/// TODO After about half a year, we could replace this to multi op, because there will be no obsolete children nodes.
zookeeper
->
removeRecursive
(
storage
.
zookeeper_path
+
"/blocks/"
+
it
->
second
);
cached_block_ctime
.
erase
(
it
->
second
);
}
LOG_TRACE
(
log
,
"Cleared "
<<
timed_blocks
.
end
()
-
first_outdated_block
<<
" old blocks from ZooKeeper"
);
/// Joins the background thread if it is still joinable.
ReplicatedMergeTreeCleanupThread
::~
ReplicatedMergeTreeCleanupThread
()
{
if
(
thread
.
joinable
())
thread
.
join
();
}
}
}
}
dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h
浏览文件 @
351a0905
#pragma once
#pragma once
#include <Core/Types.h>
#include <Core/Types.h>
#include <Common/ZooKeeper/Types.h>
#include <common/logger_useful.h>
#include <common/logger_useful.h>
#include <thread>
#include <thread>
#include <map>
#include <map>
...
@@ -19,11 +20,7 @@ class ReplicatedMergeTreeCleanupThread
...
@@ -19,11 +20,7 @@ class ReplicatedMergeTreeCleanupThread
public:
public:
ReplicatedMergeTreeCleanupThread
(
StorageReplicatedMergeTree
&
storage_
);
ReplicatedMergeTreeCleanupThread
(
StorageReplicatedMergeTree
&
storage_
);
~
ReplicatedMergeTreeCleanupThread
()
~
ReplicatedMergeTreeCleanupThread
();
{
if
(
thread
.
joinable
())
thread
.
join
();
}
private:
private:
StorageReplicatedMergeTree
&
storage
;
StorageReplicatedMergeTree
&
storage
;
...
@@ -39,7 +36,12 @@ private:
...
@@ -39,7 +36,12 @@ private:
/// Remove old block hashes from ZooKeeper. This makes a leading replica.
/// Remove old block hashes from ZooKeeper. This makes a leading replica.
void
clearOldBlocks
();
void
clearOldBlocks
();
std
::
map
<
String
,
Int64
>
cached_block_ctime
;
class
NodesStatCache
;
struct
NodeWithStat
;
std
::
unique_ptr
<
NodesStatCache
>
cached_block_stats
;
/// Returns list of blocks with stat sorted by ctime
void
getBlocksSortedByTime
(
std
::
shared_ptr
<
zkutil
::
ZooKeeper
>
&
zookeeper
,
std
::
vector
<
NodeWithStat
>
&
timed_blocks
);
/// TODO Removing old quorum/failed_parts
/// TODO Removing old quorum/failed_parts
/// TODO Removing old nonincrement_block_numbers
/// TODO Removing old nonincrement_block_numbers
...
...
dbms/src/Storages/StorageReplicatedMergeTree.cpp
浏览文件 @
351a0905
...
@@ -833,13 +833,11 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
...
@@ -833,13 +833,11 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
}
}
/// Remove from ZK information about the parts covered by the newly added ones.
/// Remove from ZK information about the parts covered by the newly added ones.
for
(
const
String
&
name
:
expected_parts
)
{
{
LOG_ERROR
(
log
,
"Removing unexpectedly merged local part from ZooKeeper: "
<<
name
);
for
(
const
String
&
name
:
expected_parts
)
LOG_ERROR
(
log
,
"Removing unexpectedly merged local part from ZooKeeper: "
<<
name
);
zkutil
::
Ops
ops
;
removePartsFromZooKeeper
(
zookeeper
,
Strings
(
expected_parts
.
begin
(),
expected_parts
.
end
()));
removePossiblyIncompletePartNodeFromZooKeeper
(
name
,
ops
,
zookeeper
);
zookeeper
->
multi
(
ops
);
}
}
/// Add to the queue job to pick up the missing parts from other replicas and remove from ZK the information that we have them.
/// Add to the queue job to pick up the missing parts from other replicas and remove from ZK the information that we have them.
...
@@ -855,7 +853,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
...
@@ -855,7 +853,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
/// We assume that this occurs before the queue is loaded (queue.initialize).
/// We assume that this occurs before the queue is loaded (queue.initialize).
zkutil
::
Ops
ops
;
zkutil
::
Ops
ops
;
removeP
ossiblyIncompletePartNodeFromZooKeeper
(
name
,
ops
,
zookeeper
);
removeP
artFromZooKeeper
(
name
,
ops
);
ops
.
emplace_back
(
std
::
make_unique
<
zkutil
::
Op
::
Create
>
(
ops
.
emplace_back
(
std
::
make_unique
<
zkutil
::
Op
::
Create
>
(
replica_path
+
"/queue/queue-"
,
log_entry
.
toString
(),
zookeeper
->
getDefaultACL
(),
zkutil
::
CreateMode
::
PersistentSequential
));
replica_path
+
"/queue/queue-"
,
log_entry
.
toString
(),
zookeeper
->
getDefaultACL
(),
zkutil
::
CreateMode
::
PersistentSequential
));
zookeeper
->
multi
(
ops
);
zookeeper
->
multi
(
ops
);
...
@@ -1879,25 +1877,6 @@ void StorageReplicatedMergeTree::removePartFromZooKeeper(const String & part_nam
...
@@ -1879,25 +1877,6 @@ void StorageReplicatedMergeTree::removePartFromZooKeeper(const String & part_nam
}
}
/// Workaround for a known ZooKeeper problem, see CLICKHOUSE-3040 and ZOOKEEPER-2362
/// Multi operation was non-atomic on special wrongly-patched version of ZooKeeper
/// (occasionally used in AdFox) in case of exceeded quota.
void
StorageReplicatedMergeTree
::
removePossiblyIncompletePartNodeFromZooKeeper
(
const
String
&
part_name
,
zkutil
::
Ops
&
ops
,
const
zkutil
::
ZooKeeperPtr
&
zookeeper
)
{
String
part_path
=
replica_path
+
"/parts/"
+
part_name
;
Names
children_
=
zookeeper
->
getChildren
(
part_path
);
NameSet
children
(
children_
.
begin
(),
children_
.
end
());
if
(
children
.
size
()
!=
2
)
LOG_WARNING
(
log
,
"Will remove incomplete part node "
<<
part_path
<<
" from ZooKeeper"
);
if
(
children
.
count
(
"checksums"
))
ops
.
emplace_back
(
std
::
make_unique
<
zkutil
::
Op
::
Remove
>
(
part_path
+
"/checksums"
,
-
1
));
if
(
children
.
count
(
"columns"
))
ops
.
emplace_back
(
std
::
make_unique
<
zkutil
::
Op
::
Remove
>
(
part_path
+
"/columns"
,
-
1
));
ops
.
emplace_back
(
std
::
make_unique
<
zkutil
::
Op
::
Remove
>
(
part_path
,
-
1
));
}
void
StorageReplicatedMergeTree
::
removePartAndEnqueueFetch
(
const
String
&
part_name
)
void
StorageReplicatedMergeTree
::
removePartAndEnqueueFetch
(
const
String
&
part_name
)
{
{
auto
zookeeper
=
getZooKeeper
();
auto
zookeeper
=
getZooKeeper
();
...
@@ -3812,26 +3791,27 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK(Logger * log_)
...
@@ -3812,26 +3791,27 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK(Logger * log_)
try
try
{
{
LOG_DEBUG
(
log
,
"Removing "
<<
parts
.
size
()
<<
" old parts from file system"
);
Strings
part_names
;
while
(
!
parts
.
empty
())
while
(
!
parts
.
empty
())
{
{
MergeTreeData
::
DataPartPtr
&
part
=
parts
.
back
();
MergeTreeData
::
DataPartPtr
&
part
=
parts
.
back
();
LOG_DEBUG
(
log
,
"Removing "
<<
part
->
name
);
try
{
zkutil
::
Ops
ops
;
removePossiblyIncompletePartNodeFromZooKeeper
(
part
->
name
,
ops
,
zookeeper
);
zookeeper
->
multi
(
ops
);
}
catch
(
const
zkutil
::
KeeperException
&
e
)
{
LOG_WARNING
(
log
,
"Couldn't remove "
<<
part
->
name
<<
" from ZooKeeper: "
<<
zkutil
::
ZooKeeper
::
error2string
(
e
.
code
));
}
part
->
remove
();
part
->
remove
();
part_names
.
emplace_back
(
part
->
name
);
parts
.
pop_back
();
parts
.
pop_back
();
}
}
LOG_DEBUG
(
log
,
"Removed "
<<
part_names
.
size
()
<<
" old parts from file system. Removing them from ZooKeeper."
);
try
{
removePartsFromZooKeeper
(
zookeeper
,
part_names
);
}
catch
(
const
zkutil
::
KeeperException
&
e
)
{
LOG_ERROR
(
log
,
"There is a problem with deleting parts from ZooKeeper: "
<<
getCurrentExceptionMessage
(
false
));
}
}
}
catch
(...)
catch
(...)
{
{
...
@@ -3844,4 +3824,33 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK(Logger * log_)
...
@@ -3844,4 +3824,33 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK(Logger * log_)
}
}
/// Removes the given parts from ZooKeeper using batched asynchronous multi requests.
/// All batches are submitted first and awaited afterwards; if any batch finished with a code
/// other than ZOK, a KeeperException carrying the code of the last failed batch is thrown.
void StorageReplicatedMergeTree::removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names)
{
    zkutil::Ops batch;
    std::vector<zkutil::ZooKeeper::MultiFuture> pending;

    const auto names_end = part_names.cend();
    for (auto name_it = part_names.cbegin(); name_it != names_end; ++name_it)
    {
        removePartFromZooKeeper(*name_it, batch);

        /// Flush when the batch is large enough or when this was the final part.
        const bool batch_is_full = batch.size() >= zkutil::MULTI_BATCH_SIZE;
        const bool is_last_part = std::next(name_it) == names_end;
        if (!batch_is_full && !is_last_part)
            continue;

        pending.emplace_back(zookeeper->tryAsyncMulti(batch));
        batch.clear();
    }

    /// Wait for every batch, remembering the most recent failure code.
    int last_error_code = ZOK;
    for (auto & request : pending)
    {
        auto outcome = request.get();
        if (outcome.code != ZOK)
            last_error_code = outcome.code;
    }

    if (last_error_code != ZOK)
        throw zkutil::KeeperException(last_error_code);
}
}
}
dbms/src/Storages/StorageReplicatedMergeTree.h
浏览文件 @
351a0905
...
@@ -374,9 +374,8 @@ private:
...
@@ -374,9 +374,8 @@ private:
/// Adds actions to `ops` that remove a part from ZooKeeper.
/// Adds actions to `ops` that remove a part from ZooKeeper.
void
removePartFromZooKeeper
(
const
String
&
part_name
,
zkutil
::
Ops
&
ops
);
void
removePartFromZooKeeper
(
const
String
&
part_name
,
zkutil
::
Ops
&
ops
);
/// Like removePartFromZooKeeper, but handles absence of some nodes and remove other nodes anyway, see CLICKHOUSE-3040
/// Quickly removes big set of parts from ZooKeeper (using async multi queries)
/// Use it only in non-critical places for cleaning.
void
removePartsFromZooKeeper
(
zkutil
::
ZooKeeperPtr
&
zookeeper
,
const
Strings
&
part_names
);
void
removePossiblyIncompletePartNodeFromZooKeeper
(
const
String
&
part_name
,
zkutil
::
Ops
&
ops
,
const
zkutil
::
ZooKeeperPtr
&
zookeeper
);
/// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts.
/// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts.
void
removePartAndEnqueueFetch
(
const
String
&
part_name
);
void
removePartAndEnqueueFetch
(
const
String
&
part_name
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录