Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
e1bc499e
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
e1bc499e
编写于
2月 13, 2020
作者:
A
alesapin
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Remove comments
上级
a6938cf5
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
14 addition
and
74 deletion
+14
-74
dbms/src/Storages/StorageReplicatedMergeTree.cpp
dbms/src/Storages/StorageReplicatedMergeTree.cpp
+14
-74
未找到文件。
dbms/src/Storages/StorageReplicatedMergeTree.cpp
浏览文件 @
e1bc499e
...
...
@@ -3229,38 +3229,28 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer
auto
metadata_from_entry
=
ReplicatedMergeTreeTableMetadata
::
parse
(
entry
.
metadata_str
);
auto
metadata_diff
=
ReplicatedMergeTreeTableMetadata
(
*
this
).
checkAndFindDiff
(
metadata_from_entry
,
/* allow_alter = */
true
);
//////std::cerr << "Metadata received\n";
MergeTreeData
::
DataParts
parts
;
/// If metadata nodes have changed, we will update table structure locally.
{
LOG_INFO
(
log
,
"Version of metadata nodes in ZooKeeper changed. Waiting for structure write lock."
);
Coordination
::
Requests
requests
;
requests
.
emplace_back
(
zkutil
::
makeSetRequest
(
replica_path
+
"/columns"
,
entry
.
columns_str
,
-
1
));
requests
.
emplace_back
(
zkutil
::
makeSetRequest
(
replica_path
+
"/metadata"
,
entry
.
metadata_str
,
-
1
));
zookeeper
->
multi
(
requests
);
{
auto
table_lock
=
lockExclusively
(
RWLockImpl
::
NO_QUERY
);
Coordination
::
Requests
requests
;
requests
.
emplace_back
(
zkutil
::
makeSetRequest
(
replica_path
+
"/columns"
,
entry
.
columns_str
,
-
1
));
requests
.
emplace_back
(
zkutil
::
makeSetRequest
(
replica_path
+
"/metadata"
,
entry
.
metadata_str
,
-
1
));
LOG_INFO
(
log
,
"Metadata changed in ZooKeeper. Applying changes locally."
);
zookeeper
->
multi
(
requests
);
setTableStructure
(
std
::
move
(
columns_from_entry
),
metadata_diff
);
metadata_version
=
entry
.
alter_version
;
{
auto
table_lock
=
lockExclusively
(
RWLockImpl
::
NO_QUERY
)
;
LOG_INFO
(
log
,
"Applied changes to the metadata of the table. Setting metadata version:"
<<
metadata_version
);
}
LOG_INFO
(
log
,
"Metadata changed in ZooKeeper. Applying changes locally."
);
//////std::cerr << "Recalculating columns sizes\n";
recalculateColumnSizes
();
/// Update metadata ZK nodes for a specific replica.
setTableStructure
(
std
::
move
(
columns_from_entry
),
metadata_diff
);
metadata_version
=
entry
.
alter_version
;
////std::cerr << "Nodes in zk updated\n"
;
LOG_INFO
(
log
,
"Applied changes to the metadata of the table. Setting metadata version:"
<<
metadata_version
)
;
}
recalculateColumnSizes
();
return
true
;
}
...
...
@@ -3270,21 +3260,14 @@ void StorageReplicatedMergeTree::alter(
{
assertNotReadonly
();
LOG_DEBUG
(
log
,
"Doing ALTER FROM "
<<
metadata_version
);
auto
maybe_mutation_commands
=
params
.
getMutationCommands
(
getInMemoryMetadata
());
auto
table_id
=
getStorageID
();
/// We cannot check this alter commands with method isModifyingData()
/// because ReplicatedMergeTree stores both columns and metadata for
/// each replica. So we have to wait AlterThread even with lightweight
/// metadata alter.
if
(
params
.
isSettingsAlter
())
{
lockStructureExclusively
(
table_lock_holder
,
query_context
.
getCurrentQueryId
());
/// We don't replicate storage_settings_ptr ALTER. It's local operation.
/// Also we don't upgrade alter lock to table structure lock.
LOG_DEBUG
(
log
,
"ALTER storage_settings_ptr only"
);
StorageInMemoryMetadata
metadata
=
getInMemoryMetadata
();
params
.
apply
(
metadata
);
...
...
@@ -3303,8 +3286,6 @@ void StorageReplicatedMergeTree::alter(
auto
zookeeper
=
getZooKeeper
();
////std::cerr << " Columns preparation to alter:" << getColumns().getAllPhysical().toString() << std::endl;
ReplicatedMergeTreeLogEntryData
entry
;
std
::
optional
<
String
>
mutation_znode
;
...
...
@@ -3321,10 +3302,6 @@ void StorageReplicatedMergeTree::alter(
StorageInMemoryMetadata
metadata
=
getInMemoryMetadata
();
params
.
apply
(
metadata
);
////std::cerr << "Path for alter:" << path << std::endl;
//int alter_version = columns_stat.version;
//std::cerr << "Alter version:" << alter_version << std::endl;
String
new_columns_str
=
metadata
.
columns
.
toString
();
ops
.
emplace_back
(
zkutil
::
makeSetRequest
(
zookeeper_path
+
"/columns"
,
new_columns_str
,
metadata_version
));
...
...
@@ -3347,8 +3324,6 @@ void StorageReplicatedMergeTree::alter(
String
new_metadata_str
=
new_metadata
.
toString
();
ops
.
emplace_back
(
zkutil
::
makeSetRequest
(
zookeeper_path
+
"/metadata"
,
new_metadata_str
,
metadata_stat
.
version
));
/// Perform settings update locally
{
lockStructureExclusively
(
table_lock_holder
,
query_context
.
getCurrentQueryId
());
auto
old_metadata
=
getInMemoryMetadata
();
...
...
@@ -3372,7 +3347,6 @@ void StorageReplicatedMergeTree::alter(
ops
.
emplace_back
(
zkutil
::
makeCreateRequest
(
zookeeper_path
+
"/log/log-"
,
entry
.
toString
(),
zkutil
::
CreateMode
::
PersistentSequential
));
bool
have_mutation
=
false
;
std
::
optional
<
EphemeralLocksInAllPartitions
>
lock_holder
;
size_t
partitions_count
=
0
;
if
(
!
maybe_mutation_commands
.
empty
())
...
...
@@ -3386,8 +3360,6 @@ void StorageReplicatedMergeTree::alter(
Coordination
::
Stat
mutations_stat
;
zookeeper
->
get
(
mutations_path
,
&
mutations_stat
);
//Coordination::Stat intention_counter_stat;
//zookeeper->get(zookeeper_path + "/alter_intention_counter", &intention_counter_stat);
Strings
partitions
=
zookeeper
->
getChildren
(
zookeeper_path
+
"/block_numbers"
);
std
::
vector
<
std
::
future
<
Coordination
::
GetResponse
>>
partition_futures
;
...
...
@@ -3400,7 +3372,6 @@ void StorageReplicatedMergeTree::alter(
auto
stat
=
partition_futures
[
i
].
get
().
stat
;
auto
partition
=
partitions
[
i
];
partition_versions
[
partition
]
=
stat
.
version
;
LOG_DEBUG
(
log
,
"Partition version:"
<<
partition
<<
" stat version "
<<
stat
.
version
);
}
lock_holder
.
emplace
(
...
...
@@ -3409,13 +3380,8 @@ void StorageReplicatedMergeTree::alter(
for
(
const
auto
&
lock
:
lock_holder
->
getLocks
())
{
mutation_entry
.
block_numbers
[
lock
.
partition_id
]
=
lock
.
number
;
//ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/alter_intention_counter", intention_counter_stat.version));
ops
.
emplace_back
(
zkutil
::
makeCheckRequest
(
zookeeper_path
+
"/block_numbers/"
+
lock
.
partition_id
,
partition_versions
[
lock
.
partition_id
]));
partitions_count
++
;
LOG_DEBUG
(
log
,
"ALLOCATED:"
<<
lock
.
number
<<
" FOR VERSION:"
<<
metadata_version
+
1
<<
" Partition version:"
<<
partition_versions
[
lock
.
partition_id
]
+
1
<<
" for partition "
<<
lock
.
partition_id
);
}
mutation_entry
.
create_time
=
time
(
nullptr
);
...
...
@@ -3424,37 +3390,25 @@ void StorageReplicatedMergeTree::alter(
ops
.
emplace_back
(
zkutil
::
makeCreateRequest
(
mutations_path
+
"/"
,
mutation_entry
.
toString
(),
zkutil
::
CreateMode
::
PersistentSequential
));
have_mutation
=
true
;
}
Coordination
::
Responses
results
;
int32_t
rc
=
zookeeper
->
tryMulti
(
ops
,
results
);
LOG_DEBUG
(
log
,
"ALTER REQUESTED TO:"
<<
entry
.
alter_version
);
//std::cerr << "Results size:" << results.size() << std::endl;
//std::cerr << "Have mutation:" << have_mutation << std::endl;
if
(
rc
==
Coordination
::
ZOK
)
{
queue
.
pullLogsToQueue
(
zookeeper
);
if
(
have_mutation
)
if
(
entry
.
have_mutation
)
{
//std::cerr << "In have mutation\n";
//std::cerr << "INDEX:" << results.size() - 2 << std::endl;
String
alter_path
=
dynamic_cast
<
const
Coordination
::
CreateResponse
&>
(
*
results
[
results
.
size
()
-
3
-
partitions_count
]).
path_created
;
//std::cerr << "Alter path:" << alter_path << std::endl;
entry
.
znode_name
=
alter_path
.
substr
(
alter_path
.
find_last_of
(
'/'
)
+
1
);
String
mutation_path
=
dynamic_cast
<
const
Coordination
::
CreateResponse
&>
(
*
results
.
back
()).
path_created
;
//std::cerr << "Mutations path:" << mutation_path << std::endl;
mutation_znode
=
mutation_path
.
substr
(
mutation_path
.
find_last_of
(
'/'
)
+
1
);
}
else
{
//std::cerr << "Results size:" << results.size() << std::endl;
String
alter_path
=
dynamic_cast
<
const
Coordination
::
CreateResponse
&>
(
*
results
.
back
()).
path_created
;
//std::cerr << "Alters path:" << alter_path << std::endl;
entry
.
znode_name
=
alter_path
.
substr
(
alter_path
.
find_last_of
(
'/'
)
+
1
);
}
}
...
...
@@ -3462,37 +3416,23 @@ void StorageReplicatedMergeTree::alter(
{
throw
Coordination
::
Exception
(
"Cannot alter"
,
rc
);
}
}
LOG_DEBUG
(
log
,
"Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes."
);
table_lock_holder
.
release
();
std
::
vector
<
String
>
unwaited
;
//std::cerr << "Started wait for alter\n";
if
(
query_context
.
getSettingsRef
().
replication_alter_partitions_sync
==
2
)
{
LOG_DEBUG
(
log
,
"Start waiting for metadata alter"
);
unwaited
=
waitForAllReplicasToProcessLogEntry
(
entry
,
false
);
LOG_DEBUG
(
log
,
"Finished waiting for metadata alter"
);
}
else
if
(
query_context
.
getSettingsRef
().
replication_alter_partitions_sync
==
1
)
waitForReplicaToProcessLogEntry
(
replica_name
,
entry
);
//std::cerr << "FInished wait for alter\n";
if
(
!
unwaited
.
empty
())
{
throw
Exception
(
"Some replicas doesn't finish metadata alter"
,
ErrorCodes
::
UNFINISHED
);
}
if
(
mutation_znode
)
{
//std::cerr << "Started wait for mutation:" << *mutation_znode << std::endl;
LOG_DEBUG
(
log
,
"Start waiting for mutation"
);
waitMutation
(
*
mutation_znode
,
query_context
.
getSettingsRef
().
replication_alter_partitions_sync
);
LOG_DEBUG
(
log
,
"Finished waiting for mutation"
);
//std::cerr << "FInished wait for mutation\n";
}
}
void
StorageReplicatedMergeTree
::
alterPartition
(
const
ASTPtr
&
query
,
const
PartitionCommands
&
commands
,
const
Context
&
query_context
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录