Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
09076b30
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
09076b30
编写于
12月 12, 2014
作者:
A
Alexey Milovidov
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Merge
上级
97ec6ecf
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
111 addition
and
39 deletion
+111
-39
dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h
...Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h
+13
-11
dbms/include/DB/Storages/StorageReplicatedMergeTree.h
dbms/include/DB/Storages/StorageReplicatedMergeTree.h
+15
-1
dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
...c/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
+15
-10
dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
...torages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
+12
-9
dbms/src/Storages/StorageReplicatedMergeTree.cpp
dbms/src/Storages/StorageReplicatedMergeTree.cpp
+56
-8
未找到文件。
dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h
浏览文件 @
09076b30
...
...
@@ -22,12 +22,14 @@ public:
void
write
(
const
Block
&
block
)
override
{
assertSessionIsNotExpired
();
auto
zookeeper
=
storage
.
getZooKeeper
();
assertSessionIsNotExpired
(
zookeeper
);
auto
part_blocks
=
storage
.
writer
.
splitBlockIntoParts
(
block
);
for
(
auto
&
current_block
:
part_blocks
)
{
assertSessionIsNotExpired
();
assertSessionIsNotExpired
(
zookeeper
);
++
block_index
;
String
block_id
=
insert_id
.
empty
()
?
""
:
insert_id
+
"__"
+
toString
(
block_index
);
...
...
@@ -60,29 +62,29 @@ public:
ops
.
push_back
(
new
zkutil
::
Op
::
Create
(
storage
.
zookeeper_path
+
"/blocks/"
+
block_id
,
""
,
storage
.
zookeeper
->
getDefaultACL
(),
zookeeper
->
getDefaultACL
(),
zkutil
::
CreateMode
::
Persistent
));
ops
.
push_back
(
new
zkutil
::
Op
::
Create
(
storage
.
zookeeper_path
+
"/blocks/"
+
block_id
+
"/columns"
,
part
->
columns
.
toString
(),
storage
.
zookeeper
->
getDefaultACL
(),
zookeeper
->
getDefaultACL
(),
zkutil
::
CreateMode
::
Persistent
));
ops
.
push_back
(
new
zkutil
::
Op
::
Create
(
storage
.
zookeeper_path
+
"/blocks/"
+
block_id
+
"/checksums"
,
part
->
checksums
.
toString
(),
storage
.
zookeeper
->
getDefaultACL
(),
zookeeper
->
getDefaultACL
(),
zkutil
::
CreateMode
::
Persistent
));
ops
.
push_back
(
new
zkutil
::
Op
::
Create
(
storage
.
zookeeper_path
+
"/blocks/"
+
block_id
+
"/number"
,
toString
(
part_number
),
storage
.
zookeeper
->
getDefaultACL
(),
zookeeper
->
getDefaultACL
(),
zkutil
::
CreateMode
::
Persistent
));
}
storage
.
checkPartAndAddToZooKeeper
(
part
,
ops
,
part_name
);
ops
.
push_back
(
new
zkutil
::
Op
::
Create
(
storage
.
zookeeper_path
+
"/log/log-"
,
log_entry
.
toString
(),
storage
.
zookeeper
->
getDefaultACL
(),
zookeeper
->
getDefaultACL
(),
zkutil
::
CreateMode
::
PersistentSequential
));
block_number_lock
.
getUnlockOps
(
ops
);
...
...
@@ -91,7 +93,7 @@ public:
try
{
auto
code
=
storage
.
zookeeper
->
tryMulti
(
ops
);
auto
code
=
zookeeper
->
tryMulti
(
ops
);
if
(
code
==
ZOK
)
{
transaction
.
commit
();
...
...
@@ -101,7 +103,7 @@ public:
{
/// Если блок с таким ID уже есть в таблице, откатим его вставку.
String
expected_checksums_str
;
if
(
!
block_id
.
empty
()
&&
storage
.
zookeeper
->
tryGet
(
if
(
!
block_id
.
empty
()
&&
zookeeper
->
tryGet
(
storage
.
zookeeper_path
+
"/blocks/"
+
block_id
+
"/checksums"
,
expected_checksums_str
))
{
LOG_INFO
(
log
,
"Block with ID "
<<
block_id
<<
" already exists; ignoring it (removing part "
<<
part
->
name
<<
")"
);
...
...
@@ -149,9 +151,9 @@ private:
/// Позволяет проверить, что сессия в ZooKeeper ещё жива.
void
assertSessionIsNotExpired
()
void
assertSessionIsNotExpired
(
zkutil
::
ZooKeeperPtr
&
zookeeper
)
{
if
(
storage
.
zookeeper
->
expired
())
if
(
zookeeper
->
expired
())
throw
Exception
(
"ZooKeeper session has been expired."
,
ErrorCodes
::
NO_ZOOKEEPER
);
}
};
...
...
dbms/include/DB/Storages/StorageReplicatedMergeTree.h
浏览文件 @
09076b30
...
...
@@ -146,7 +146,21 @@ private:
typedef
std
::
list
<
String
>
StringList
;
Context
&
context
;
zkutil
::
ZooKeeperPtr
zookeeper
;
zkutil
::
ZooKeeperPtr
current_zookeeper
;
/// Используйте только с помощью методов ниже.
std
::
mutex
current_zookeeper_mutex
;
/// Для пересоздания сессии в фоновом потоке.
zkutil
::
ZooKeeperPtr
getZooKeeper
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
current_zookeeper_mutex
);
return
current_zookeeper
;
}
void
setZooKeeper
(
zkutil
::
ZooKeeperPtr
zookeeper
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
current_zookeeper_mutex
);
current_zookeeper
=
zookeeper
;
}
/// Если true, таблица в офлайновом режиме, и в нее нельзя писать.
bool
is_readonly
=
false
;
...
...
dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
浏览文件 @
09076b30
...
...
@@ -51,6 +51,7 @@ void ReplicatedMergeTreeCleanupThread::iterate()
void
ReplicatedMergeTreeCleanupThread
::
clearOldParts
()
{
auto
table_lock
=
storage
.
lockStructure
(
false
);
auto
zookeeper
=
storage
.
getZooKeeper
();
MergeTreeData
::
DataPartsVector
parts
=
storage
.
data
.
grabOldParts
();
size_t
count
=
parts
.
size
();
...
...
@@ -73,7 +74,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldParts()
ops
.
push_back
(
new
zkutil
::
Op
::
Remove
(
storage
.
replica_path
+
"/parts/"
+
part
->
name
+
"/columns"
,
-
1
));
ops
.
push_back
(
new
zkutil
::
Op
::
Remove
(
storage
.
replica_path
+
"/parts/"
+
part
->
name
+
"/checksums"
,
-
1
));
ops
.
push_back
(
new
zkutil
::
Op
::
Remove
(
storage
.
replica_path
+
"/parts/"
+
part
->
name
,
-
1
));
auto
code
=
storage
.
zookeeper
->
tryMulti
(
ops
);
auto
code
=
zookeeper
->
tryMulti
(
ops
);
if
(
code
!=
ZOK
)
LOG_WARNING
(
log
,
"Couldn't remove "
<<
part
->
name
<<
" from ZooKeeper: "
<<
zkutil
::
ZooKeeper
::
error2string
(
code
));
...
...
@@ -94,8 +95,10 @@ void ReplicatedMergeTreeCleanupThread::clearOldParts()
void
ReplicatedMergeTreeCleanupThread
::
clearOldLogs
()
{
auto
zookeeper
=
storage
.
getZooKeeper
();
zkutil
::
Stat
stat
;
if
(
!
storage
.
zookeeper
->
exists
(
storage
.
zookeeper_path
+
"/log"
,
&
stat
))
if
(
!
zookeeper
->
exists
(
storage
.
zookeeper_path
+
"/log"
,
&
stat
))
throw
Exception
(
storage
.
zookeeper_path
+
"/log doesn't exist"
,
ErrorCodes
::
NOT_FOUND_NODE
);
int
children_count
=
stat
.
numChildren
;
...
...
@@ -104,17 +107,17 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
if
(
static_cast
<
double
>
(
children_count
)
<
storage
.
data
.
settings
.
replicated_logs_to_keep
*
1.1
)
return
;
Strings
replicas
=
storage
.
zookeeper
->
getChildren
(
storage
.
zookeeper_path
+
"/replicas"
,
&
stat
);
Strings
replicas
=
zookeeper
->
getChildren
(
storage
.
zookeeper_path
+
"/replicas"
,
&
stat
);
UInt64
min_pointer
=
std
::
numeric_limits
<
UInt64
>::
max
();
for
(
const
String
&
replica
:
replicas
)
{
String
pointer
=
storage
.
zookeeper
->
get
(
storage
.
zookeeper_path
+
"/replicas/"
+
replica
+
"/log_pointer"
);
String
pointer
=
zookeeper
->
get
(
storage
.
zookeeper_path
+
"/replicas/"
+
replica
+
"/log_pointer"
);
if
(
pointer
.
empty
())
return
;
min_pointer
=
std
::
min
(
min_pointer
,
parse
<
UInt64
>
(
pointer
));
}
Strings
entries
=
storage
.
zookeeper
->
getChildren
(
storage
.
zookeeper_path
+
"/log"
);
Strings
entries
=
zookeeper
->
getChildren
(
storage
.
zookeeper_path
+
"/log"
);
std
::
sort
(
entries
.
begin
(),
entries
.
end
());
/// Не будем трогать последние replicated_logs_to_keep записей.
...
...
@@ -134,7 +137,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
{
/// Одновременно с очисткой лога проверим, не добавилась ли реплика с тех пор, как мы получили список реплик.
ops
.
push_back
(
new
zkutil
::
Op
::
Check
(
storage
.
zookeeper_path
+
"/replicas"
,
stat
.
version
));
storage
.
zookeeper
->
multi
(
ops
);
zookeeper
->
multi
(
ops
);
ops
.
clear
();
}
}
...
...
@@ -145,8 +148,10 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
void
ReplicatedMergeTreeCleanupThread
::
clearOldBlocks
()
{
auto
zookeeper
=
storage
.
getZooKeeper
();
zkutil
::
Stat
stat
;
if
(
!
storage
.
zookeeper
->
exists
(
storage
.
zookeeper_path
+
"/blocks"
,
&
stat
))
if
(
!
zookeeper
->
exists
(
storage
.
zookeeper_path
+
"/blocks"
,
&
stat
))
throw
Exception
(
storage
.
zookeeper_path
+
"/blocks doesn't exist"
,
ErrorCodes
::
NOT_FOUND_NODE
);
int
children_count
=
stat
.
numChildren
;
...
...
@@ -158,14 +163,14 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
LOG_TRACE
(
log
,
"Clearing about "
<<
static_cast
<
size_t
>
(
children_count
)
-
storage
.
data
.
settings
.
replicated_deduplication_window
<<
" old blocks from ZooKeeper. This might take several minutes."
);
Strings
blocks
=
storage
.
zookeeper
->
getChildren
(
storage
.
zookeeper_path
+
"/blocks"
);
Strings
blocks
=
zookeeper
->
getChildren
(
storage
.
zookeeper_path
+
"/blocks"
);
std
::
vector
<
std
::
pair
<
Int64
,
String
>
>
timed_blocks
;
for
(
const
String
&
block
:
blocks
)
{
zkutil
::
Stat
stat
;
storage
.
zookeeper
->
exists
(
storage
.
zookeeper_path
+
"/blocks/"
+
block
,
&
stat
);
zookeeper
->
exists
(
storage
.
zookeeper_path
+
"/blocks/"
+
block
,
&
stat
);
timed_blocks
.
push_back
(
std
::
make_pair
(
stat
.
czxid
,
block
));
}
...
...
@@ -180,7 +185,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
if
(
ops
.
size
()
>
400
||
i
+
1
==
timed_blocks
.
size
())
{
storage
.
zookeeper
->
multi
(
ops
);
zookeeper
->
multi
(
ops
);
ops
.
clear
();
}
}
...
...
dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
浏览文件 @
09076b30
...
...
@@ -38,7 +38,7 @@ void ReplicatedMergeTreeRestartingThread::run()
/// Запуск реплики при старте сервера/создании таблицы. Перезапуск реплики при истечении сессии с ZK.
while
(
!
need_stop
)
{
if
(
first_time
||
storage
.
zookeeper
->
expired
())
if
(
first_time
||
storage
.
getZooKeeper
()
->
expired
())
{
if
(
first_time
)
{
...
...
@@ -56,8 +56,7 @@ void ReplicatedMergeTreeRestartingThread::run()
{
try
{
/// TODO race condition при присваивании?
storage
.
zookeeper
=
storage
.
context
.
getZooKeeper
();
storage
.
setZooKeeper
(
storage
.
context
.
getZooKeeper
());
}
catch
(
const
zkutil
::
KeeperException
&
e
)
{
...
...
@@ -112,7 +111,8 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
storage
.
leader_election
=
new
zkutil
::
LeaderElection
(
storage
.
zookeeper_path
+
"/leader_election"
,
*
storage
.
zookeeper
,
*
storage
.
current_zookeeper
,
/// current_zookeeper живёт в течение времени жизни leader_election,
/// так как до изменения current_zookeeper, объект leader_election уничтожается в методе partialShutdown.
[
this
]
{
storage
.
becomeLeader
();
},
storage
.
replica_name
);
...
...
@@ -166,6 +166,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
void
ReplicatedMergeTreeRestartingThread
::
activateReplica
()
{
auto
host_port
=
storage
.
context
.
getInterserverIOAddress
();
auto
zookeeper
=
storage
.
getZooKeeper
();
std
::
string
address
;
{
...
...
@@ -181,18 +182,18 @@ void ReplicatedMergeTreeRestartingThread::activateReplica()
* но он крайне маловероятен при нормальном использовании.
*/
String
data
;
if
(
storage
.
zookeeper
->
tryGet
(
storage
.
replica_path
+
"/is_active"
,
data
)
&&
data
==
active_node_identifier
)
storage
.
zookeeper
->
tryRemove
(
storage
.
replica_path
+
"/is_active"
);
if
(
zookeeper
->
tryGet
(
storage
.
replica_path
+
"/is_active"
,
data
)
&&
data
==
active_node_identifier
)
zookeeper
->
tryRemove
(
storage
.
replica_path
+
"/is_active"
);
/// Одновременно объявим, что эта реплика активна, и обновим хост.
zkutil
::
Ops
ops
;
ops
.
push_back
(
new
zkutil
::
Op
::
Create
(
storage
.
replica_path
+
"/is_active"
,
active_node_identifier
,
storage
.
zookeeper
->
getDefaultACL
(),
zkutil
::
CreateMode
::
Ephemeral
));
active_node_identifier
,
zookeeper
->
getDefaultACL
(),
zkutil
::
CreateMode
::
Ephemeral
));
ops
.
push_back
(
new
zkutil
::
Op
::
SetData
(
storage
.
replica_path
+
"/host"
,
address
,
-
1
));
try
{
storage
.
zookeeper
->
multi
(
ops
);
zookeeper
->
multi
(
ops
);
}
catch
(
const
zkutil
::
KeeperException
&
e
)
{
...
...
@@ -203,7 +204,9 @@ void ReplicatedMergeTreeRestartingThread::activateReplica()
throw
;
}
storage
.
replica_is_active_node
=
zkutil
::
EphemeralNodeHolder
::
existing
(
storage
.
replica_path
+
"/is_active"
,
*
storage
.
zookeeper
);
/// current_zookeeper живёт в течение времени жизни replica_is_active_node,
/// так как до изменения current_zookeeper, объект replica_is_active_node уничтожается в методе partialShutdown.
storage
.
replica_is_active_node
=
zkutil
::
EphemeralNodeHolder
::
existing
(
storage
.
replica_path
+
"/is_active"
,
*
storage
.
current_zookeeper
);
}
...
...
dbms/src/Storages/StorageReplicatedMergeTree.cpp
浏览文件 @
09076b30
...
...
@@ -39,7 +39,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const
Names
&
columns_to_sum_
,
const
MergeTreeSettings
&
settings_
)
:
IStorage
{
materialized_columns_
,
alias_columns_
,
column_defaults_
},
context
(
context_
),
zookeeper
(
context
.
getZooKeeper
()),
database_name
(
database_name_
),
current_
zookeeper
(
context
.
getZooKeeper
()),
database_name
(
database_name_
),
table_name
(
name_
),
full_path
(
path_
+
escapeForFileName
(
table_name
)
+
'/'
),
zookeeper_path
(
context
.
getMacros
().
expand
(
zookeeper_path_
)),
replica_name
(
context
.
getMacros
().
expand
(
replica_name_
)),
...
...
@@ -61,10 +61,10 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
try
{
if
(
zookeeper
&&
zookeeper
->
exists
(
replica_path
+
"/flags/force_restore_data"
))
if
(
current_zookeeper
&&
current_
zookeeper
->
exists
(
replica_path
+
"/flags/force_restore_data"
))
{
skip_sanity_checks
=
true
;
zookeeper
->
remove
(
replica_path
+
"/flags/force_restore_data"
);
current_
zookeeper
->
remove
(
replica_path
+
"/flags/force_restore_data"
);
LOG_WARNING
(
log
,
"Skipping the limits on severity of changes to data parts and columns (flag "
<<
replica_path
<<
"/flags/force_restore_data)."
);
...
...
@@ -76,7 +76,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
if
(
e
.
code
==
ZCONNECTIONLOSS
)
{
tryLogCurrentException
(
__PRETTY_FUNCTION__
);
zookeeper
=
nullptr
;
current_
zookeeper
=
nullptr
;
}
else
throw
;
...
...
@@ -84,7 +84,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
data
.
loadDataParts
(
skip_sanity_checks
);
if
(
!
zookeeper
)
if
(
!
current_
zookeeper
)
{
if
(
!
attach
)
throw
Exception
(
"Can't create replicated table without ZooKeeper"
,
ErrorCodes
::
NO_ZOOKEEPER
);
...
...
@@ -157,11 +157,11 @@ StoragePtr StorageReplicatedMergeTree::create(
columns_
,
materialized_columns_
,
alias_columns_
,
column_defaults_
,
context_
,
primary_expr_ast_
,
date_column_name_
,
sampling_expression_
,
index_granularity_
,
mode_
,
sign_column_
,
columns_to_sum_
,
settings_
};
sign_column_
,
columns_to_sum_
,
settings_
};
StoragePtr
res_ptr
=
res
->
thisPtr
();
if
(
res
->
zookeeper
)
if
(
res
->
getZooKeeper
()
)
{
String
endpoint_name
=
"ReplicatedMergeTree:"
+
res
->
replica_path
;
InterserverIOEndpointPtr
endpoint
=
new
ReplicatedMergeTreePartsServer
(
res
->
data
,
*
res
);
...
...
@@ -184,6 +184,8 @@ static String formattedAST(const ASTPtr & ast)
void
StorageReplicatedMergeTree
::
createTableIfNotExists
()
{
auto
zookeeper
=
getZooKeeper
();
if
(
zookeeper
->
exists
(
zookeeper_path
))
return
;
...
...
@@ -236,6 +238,8 @@ void StorageReplicatedMergeTree::createTableIfNotExists()
*/
void
StorageReplicatedMergeTree
::
checkTableStructure
(
bool
skip_sanity_checks
,
bool
allow_alter
)
{
auto
zookeeper
=
getZooKeeper
();
String
metadata_str
=
zookeeper
->
get
(
zookeeper_path
+
"/metadata"
);
ReadBufferFromString
buf
(
metadata_str
);
assertString
(
"metadata format version: 1"
,
buf
);
...
...
@@ -297,6 +301,8 @@ void StorageReplicatedMergeTree::checkTableStructure(bool skip_sanity_checks, bo
void
StorageReplicatedMergeTree
::
createReplica
()
{
auto
zookeeper
=
getZooKeeper
();
LOG_DEBUG
(
log
,
"Creating replica "
<<
replica_path
);
/// Создадим пустую реплику. Ноду columns создадим в конце - будем использовать ее в качестве признака, что создание реплики завершено.
...
...
@@ -431,6 +437,8 @@ void StorageReplicatedMergeTree::createReplica()
void
StorageReplicatedMergeTree
::
checkParts
(
bool
skip_sanity_checks
)
{
auto
zookeeper
=
getZooKeeper
();
Strings
expected_parts_vec
=
zookeeper
->
getChildren
(
replica_path
+
"/parts"
);
/// Куски в ZK.
...
...
@@ -574,6 +582,8 @@ void StorageReplicatedMergeTree::initVirtualParts()
void
StorageReplicatedMergeTree
::
checkPartAndAddToZooKeeper
(
const
MergeTreeData
::
DataPartPtr
&
part
,
zkutil
::
Ops
&
ops
,
String
part_name
)
{
auto
zookeeper
=
getZooKeeper
();
if
(
part_name
.
empty
())
part_name
=
part
->
name
;
...
...
@@ -641,6 +651,8 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(const MergeTreeData:
void
StorageReplicatedMergeTree
::
loadQueue
()
{
auto
zookeeper
=
getZooKeeper
();
std
::
lock_guard
<
std
::
mutex
>
lock
(
queue_mutex
);
Strings
children
=
zookeeper
->
getChildren
(
replica_path
+
"/queue"
);
...
...
@@ -660,6 +672,8 @@ void StorageReplicatedMergeTree::loadQueue()
void
StorageReplicatedMergeTree
::
pullLogsToQueue
(
zkutil
::
EventPtr
next_update_event
)
{
auto
zookeeper
=
getZooKeeper
();
std
::
lock_guard
<
std
::
mutex
>
lock
(
queue_mutex
);
String
index_str
=
zookeeper
->
get
(
replica_path
+
"/log_pointer"
);
...
...
@@ -754,6 +768,8 @@ bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry)
bool
StorageReplicatedMergeTree
::
executeLogEntry
(
const
LogEntry
&
entry
,
BackgroundProcessingPool
::
Context
&
pool_context
)
{
auto
zookeeper
=
getZooKeeper
();
if
(
entry
.
type
==
LogEntry
::
DROP_RANGE
)
{
executeDropRange
(
entry
);
...
...
@@ -976,6 +992,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
void
StorageReplicatedMergeTree
::
executeDropRange
(
const
StorageReplicatedMergeTree
::
LogEntry
&
entry
)
{
auto
zookeeper
=
getZooKeeper
();
LOG_INFO
(
log
,
(
entry
.
detach
?
"Detaching"
:
"Removing"
)
<<
" parts inside "
<<
entry
.
new_part_name
<<
"."
);
{
...
...
@@ -1065,6 +1083,8 @@ void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
bool
StorageReplicatedMergeTree
::
executeAttachPart
(
const
StorageReplicatedMergeTree
::
LogEntry
&
entry
)
{
auto
zookeeper
=
getZooKeeper
();
String
source_path
=
(
entry
.
attach_unreplicated
?
"unreplicated/"
:
"detached/"
)
+
entry
.
source_part_name
;
LOG_INFO
(
log
,
"Attaching part "
<<
entry
.
source_part_name
<<
" from "
<<
source_path
<<
" as "
<<
entry
.
new_part_name
);
...
...
@@ -1177,6 +1197,7 @@ bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & p
{
if
(
executeLogEntry
(
*
entry
,
pool_context
))
{
auto
zookeeper
=
getZooKeeper
();
auto
code
=
zookeeper
->
tryRemove
(
replica_path
+
"/queue/"
+
entry
->
znode_name
);
if
(
code
!=
ZOK
)
...
...
@@ -1259,6 +1280,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
return
true
;
String
month_name
=
left
->
name
.
substr
(
0
,
6
);
auto
zookeeper
=
getZooKeeper
();
/// Можно слить куски, если все номера между ними заброшены - не соответствуют никаким блокам.
for
(
UInt64
number
=
left
->
right
+
1
;
number
<=
right
->
left
-
1
;
++
number
)
/// Номера блоков больше нуля.
...
...
@@ -1340,6 +1362,8 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
do
{
auto
zookeeper
=
getZooKeeper
();
if
(
merges_queued
>=
data
.
settings
.
max_replicated_merges_in_queue
)
{
LOG_TRACE
(
log
,
"Number of queued merges is greater than max_replicated_merges_in_queue, so won't select new parts to merge."
);
...
...
@@ -1445,6 +1469,8 @@ void StorageReplicatedMergeTree::alterThread()
* TODO: Слишком сложно, всё переделать.
*/
auto
zookeeper
=
getZooKeeper
();
zkutil
::
Stat
stat
;
const
String
columns_str
=
zookeeper
->
get
(
zookeeper_path
+
"/columns"
,
&
stat
,
alter_thread_event
);
auto
columns_desc
=
ColumnsDescription
<
true
>::
parse
(
columns_str
,
context
.
getDataTypeFactory
());
...
...
@@ -1603,6 +1629,8 @@ void StorageReplicatedMergeTree::alterThread()
void
StorageReplicatedMergeTree
::
removePartAndEnqueueFetch
(
const
String
&
part_name
)
{
auto
zookeeper
=
getZooKeeper
();
String
part_path
=
replica_path
+
"/parts/"
+
part_name
;
LogEntryPtr
log_entry
=
new
LogEntry
;
...
...
@@ -1648,6 +1676,8 @@ void StorageReplicatedMergeTree::partCheckThread()
{
try
{
auto
zookeeper
=
getZooKeeper
();
/// Достанем из очереди кусок для проверки.
String
part_name
;
{
...
...
@@ -1865,6 +1895,7 @@ void StorageReplicatedMergeTree::becomeLeader()
String
StorageReplicatedMergeTree
::
findReplicaHavingPart
(
const
String
&
part_name
,
bool
active
)
{
auto
zookeeper
=
getZooKeeper
();
Strings
replicas
=
zookeeper
->
getChildren
(
zookeeper_path
+
"/replicas"
);
/// Из реплик, у которых есть кусок, выберем одну равновероятно.
...
...
@@ -1883,6 +1914,8 @@ String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_nam
void
StorageReplicatedMergeTree
::
fetchPart
(
const
String
&
part_name
,
const
String
&
replica_path
,
bool
to_detached
)
{
auto
zookeeper
=
getZooKeeper
();
LOG_DEBUG
(
log
,
"Fetching part "
<<
part_name
<<
" from "
<<
replica_path
);
TableStructureReadLockPtr
table_lock
;
...
...
@@ -2066,6 +2099,8 @@ bool StorageReplicatedMergeTree::optimize()
void
StorageReplicatedMergeTree
::
alter
(
const
AlterCommands
&
params
,
const
String
&
database_name
,
const
String
&
table_name
,
Context
&
context
)
{
auto
zookeeper
=
getZooKeeper
();
LOG_DEBUG
(
log
,
"Doing ALTER"
);
NamesAndTypesList
new_columns
;
...
...
@@ -2183,6 +2218,7 @@ static String getFakePartNameForDrop(const String & month_name, UInt64 left, UIn
void
StorageReplicatedMergeTree
::
dropPartition
(
const
Field
&
field
,
bool
detach
,
const
Settings
&
settings
)
{
auto
zookeeper
=
getZooKeeper
();
String
month_name
=
MergeTreeData
::
getMonthName
(
field
);
/// TODO: Делать запрос в лидера по TCP.
...
...
@@ -2241,6 +2277,7 @@ void StorageReplicatedMergeTree::dropPartition(const Field & field, bool detach,
void
StorageReplicatedMergeTree
::
attachPartition
(
const
Field
&
field
,
bool
unreplicated
,
bool
attach_part
,
const
Settings
&
settings
)
{
auto
zookeeper
=
getZooKeeper
();
String
partition
;
if
(
attach_part
)
...
...
@@ -2349,6 +2386,8 @@ void StorageReplicatedMergeTree::drop()
if
(
is_readonly
)
throw
Exception
(
"Can't drop readonly replicated table (need to drop data in ZooKeeper as well)"
,
ErrorCodes
::
TABLE_IS_READ_ONLY
);
auto
zookeeper
=
getZooKeeper
();
shutdown
();
LOG_INFO
(
log
,
"Removing replica "
<<
replica_path
);
...
...
@@ -2385,6 +2424,8 @@ void StorageReplicatedMergeTree::rename(const String & new_path_to_db, const Str
AbandonableLockInZooKeeper
StorageReplicatedMergeTree
::
allocateBlockNumber
(
const
String
&
month_name
)
{
auto
zookeeper
=
getZooKeeper
();
String
month_path
=
zookeeper_path
+
"/block_numbers/"
+
month_name
;
if
(
!
zookeeper
->
exists
(
month_path
))
{
...
...
@@ -2410,6 +2451,7 @@ AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const
void
StorageReplicatedMergeTree
::
waitForAllReplicasToProcessLogEntry
(
const
LogEntry
&
entry
)
{
auto
zookeeper
=
getZooKeeper
();
LOG_DEBUG
(
log
,
"Waiting for all replicas to process "
<<
entry
.
znode_name
);
Strings
replicas
=
zookeeper
->
getChildren
(
zookeeper_path
+
"/replicas"
);
...
...
@@ -2422,6 +2464,8 @@ void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const LogEn
void
StorageReplicatedMergeTree
::
waitForReplicaToProcessLogEntry
(
const
String
&
replica
,
const
LogEntry
&
entry
)
{
auto
zookeeper
=
getZooKeeper
();
UInt64
log_index
=
parse
<
UInt64
>
(
entry
.
znode_name
.
substr
(
entry
.
znode_name
.
size
()
-
10
));
String
log_entry_str
=
entry
.
toString
();
...
...
@@ -2479,6 +2523,8 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
void
StorageReplicatedMergeTree
::
getStatus
(
Status
&
res
,
bool
with_zk_fields
)
{
auto
zookeeper
=
getZooKeeper
();
res
.
is_leader
=
is_leader_node
;
res
.
is_readonly
=
is_readonly
;
res
.
is_session_expired
=
!
zookeeper
||
zookeeper
->
expired
();
...
...
@@ -2551,6 +2597,8 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
void
StorageReplicatedMergeTree
::
fetchPartition
(
const
Field
&
partition
,
const
String
&
from_
,
const
Settings
&
settings
)
{
auto
zookeeper
=
getZooKeeper
();
String
partition_str
=
MergeTreeData
::
getMonthName
(
partition
);
String
from
=
from_
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录