Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
2e34c5c7
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
2e34c5c7
编写于
5月 04, 2019
作者:
A
alexey-milovidov
提交者:
GitHub
5月 04, 2019
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #5179 from yandex/merge-tree-data-remove-bad-code
Removed huge chunk of bad code
上级
8dd7d4d4
b2443f6a
变更
24
展开全部
隐藏空白更改
内联
并排
Showing
24 changed file
with
441 addition
and
588 deletion
+441
-588
dbms/src/Interpreters/AsynchronousMetrics.cpp
dbms/src/Interpreters/AsynchronousMetrics.cpp
+2
-2
dbms/src/Interpreters/InterpreterSelectQuery.cpp
dbms/src/Interpreters/InterpreterSelectQuery.cpp
+6
-7
dbms/src/Interpreters/MutationsInterpreter.cpp
dbms/src/Interpreters/MutationsInterpreter.cpp
+3
-8
dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp
dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp
+2
-2
dbms/src/Storages/MergeTree/MergeTreeData.cpp
dbms/src/Storages/MergeTree/MergeTreeData.cpp
+8
-14
dbms/src/Storages/MergeTree/MergeTreeData.h
dbms/src/Storages/MergeTree/MergeTreeData.h
+28
-28
dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
+2
-2
dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp
...src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp
+6
-6
dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp
...orages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp
+5
-5
dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
...c/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
+14
-14
dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
...Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
+8
-8
dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
+7
-7
dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
...torages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
+9
-9
dbms/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h
dbms/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h
+2
-2
dbms/src/Storages/StorageMergeTree.cpp
dbms/src/Storages/StorageMergeTree.cpp
+90
-90
dbms/src/Storages/StorageMergeTree.h
dbms/src/Storages/StorageMergeTree.h
+7
-45
dbms/src/Storages/StorageReplicatedMergeTree.cpp
dbms/src/Storages/StorageReplicatedMergeTree.cpp
+210
-216
dbms/src/Storages/StorageReplicatedMergeTree.h
dbms/src/Storages/StorageReplicatedMergeTree.h
+13
-57
dbms/src/Storages/System/StorageSystemColumns.cpp
dbms/src/Storages/System/StorageSystemColumns.cpp
+8
-16
dbms/src/Storages/System/StorageSystemGraphite.cpp
dbms/src/Storages/System/StorageSystemGraphite.cpp
+3
-15
dbms/src/Storages/System/StorageSystemMutations.cpp
dbms/src/Storages/System/StorageSystemMutations.cpp
+4
-15
dbms/src/Storages/System/StorageSystemParts.cpp
dbms/src/Storages/System/StorageSystemParts.cpp
+0
-2
dbms/src/Storages/System/StorageSystemPartsBase.cpp
dbms/src/Storages/System/StorageSystemPartsBase.cpp
+4
-17
dbms/src/Storages/System/StorageSystemPartsColumns.cpp
dbms/src/Storages/System/StorageSystemPartsColumns.cpp
+0
-1
未找到文件。
dbms/src/Interpreters/AsynchronousMetrics.cpp
浏览文件 @
2e34c5c7
...
...
@@ -191,12 +191,12 @@ void AsynchronousMetrics::update()
"Cannot get replica delay for table: "
+
backQuoteIfNeed
(
db
.
first
)
+
"."
+
backQuoteIfNeed
(
iterator
->
name
()));
}
calculateMax
(
max_part_count_for_partition
,
table_replicated_merge_tree
->
get
Data
().
get
MaxPartsCountForPartition
());
calculateMax
(
max_part_count_for_partition
,
table_replicated_merge_tree
->
getMaxPartsCountForPartition
());
}
if
(
table_merge_tree
)
{
calculateMax
(
max_part_count_for_partition
,
table_merge_tree
->
get
Data
().
get
MaxPartsCountForPartition
());
calculateMax
(
max_part_count_for_partition
,
table_merge_tree
->
getMaxPartsCountForPartition
());
}
}
}
...
...
dbms/src/Interpreters/InterpreterSelectQuery.cpp
浏览文件 @
2e34c5c7
...
...
@@ -30,6 +30,8 @@
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ParserSelectQuery.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
...
...
@@ -43,8 +45,7 @@
#include <Storages/MergeTree/MergeTreeWhereOptimizer.h>
#include <Storages/IStorage.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionFactory.h>
...
...
@@ -590,13 +591,11 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
/// Try transferring some condition from WHERE to PREWHERE if enabled and viable
if
(
settings
.
optimize_move_to_prewhere
&&
query
.
where
()
&&
!
query
.
prewhere
()
&&
!
query
.
final
())
MergeTreeWhereOptimizer
{
query_info
,
context
,
merge_tree
.
getData
()
,
query_analyzer
->
getRequiredSourceColumns
(),
log
};
MergeTreeWhereOptimizer
{
query_info
,
context
,
merge_tree
,
query_analyzer
->
getRequiredSourceColumns
(),
log
};
};
if
(
const
StorageMergeTree
*
merge_tree
=
dynamic_cast
<
const
StorageMergeTree
*>
(
storage
.
get
()))
optimize_prewhere
(
*
merge_tree
);
else
if
(
const
StorageReplicatedMergeTree
*
replicated_merge_tree
=
dynamic_cast
<
const
StorageReplicatedMergeTree
*>
(
storage
.
get
()))
optimize_prewhere
(
*
replicated_merge_tree
);
if
(
const
MergeTreeData
*
merge_tree_data
=
dynamic_cast
<
const
MergeTreeData
*>
(
storage
.
get
()))
optimize_prewhere
(
*
merge_tree_data
);
}
AnalysisResult
expressions
;
...
...
dbms/src/Interpreters/MutationsInterpreter.cpp
浏览文件 @
2e34c5c7
#include <Interpreters/MutationsInterpreter.h>
#include <Interpreters/SyntaxAnalyzer.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/CreatingSetsBlockInputStream.h>
...
...
@@ -86,12 +85,8 @@ bool MutationsInterpreter::isStorageTouchedByMutations() const
static
NameSet
getKeyColumns
(
const
StoragePtr
&
storage
)
{
const
MergeTreeData
*
merge_tree_data
=
nullptr
;
if
(
auto
merge_tree
=
dynamic_cast
<
StorageMergeTree
*>
(
storage
.
get
()))
merge_tree_data
=
&
merge_tree
->
getData
();
else
if
(
auto
replicated_merge_tree
=
dynamic_cast
<
StorageReplicatedMergeTree
*>
(
storage
.
get
()))
merge_tree_data
=
&
replicated_merge_tree
->
getData
();
else
const
MergeTreeData
*
merge_tree_data
=
dynamic_cast
<
const
MergeTreeData
*>
(
storage
.
get
());
if
(
!
merge_tree_data
)
return
{};
NameSet
key_columns
;
...
...
dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp
浏览文件 @
2e34c5c7
...
...
@@ -14,7 +14,7 @@ Block MergeTreeBlockOutputStream::getHeader() const
void
MergeTreeBlockOutputStream
::
write
(
const
Block
&
block
)
{
storage
.
d
ata
.
d
elayInsertOrThrowIfNeeded
();
storage
.
delayInsertOrThrowIfNeeded
();
auto
part_blocks
=
storage
.
writer
.
splitBlockIntoParts
(
block
,
max_parts_per_block
);
for
(
auto
&
current_block
:
part_blocks
)
...
...
@@ -22,7 +22,7 @@ void MergeTreeBlockOutputStream::write(const Block & block)
Stopwatch
watch
;
MergeTreeData
::
MutableDataPartPtr
part
=
storage
.
writer
.
writeTempPart
(
current_block
);
storage
.
data
.
renameTempPartAndAdd
(
part
,
&
storage
.
increment
);
storage
.
renameTempPartAndAdd
(
part
,
&
storage
.
increment
);
PartLog
::
addNewPart
(
storage
.
global_context
,
part
,
watch
.
elapsed
());
...
...
dbms/src/Storages/MergeTree/MergeTreeData.cpp
浏览文件 @
2e34c5c7
...
...
@@ -116,7 +116,7 @@ MergeTreeData::MergeTreeData(
database_name
(
database_
),
table_name
(
table_
),
full_path
(
full_path_
),
broken_part_callback
(
broken_part_callback_
),
log_name
(
database_name
+
"."
+
table_name
),
log
(
&
Logger
::
get
(
log_name
+
" (Data)"
)),
log_name
(
database_name
+
"."
+
table_name
),
log
(
&
Logger
::
get
(
log_name
)),
data_parts_by_info
(
data_parts_indexes
.
get
<
TagByInfo
>
()),
data_parts_by_state_and_info
(
data_parts_indexes
.
get
<
TagByStateAndInfo
>
())
{
...
...
@@ -730,7 +730,7 @@ String MergeTreeData::MergingParams::getModeName() const
}
Int64
MergeTreeData
::
getMaxBlockNumber
()
Int64
MergeTreeData
::
getMaxBlockNumber
()
const
{
auto
lock
=
lockParts
();
...
...
@@ -2665,7 +2665,7 @@ bool MergeTreeData::isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(const A
return
false
;
}
bool
MergeTreeData
::
mayBenefitFromIndexForIn
(
const
ASTPtr
&
left_in_operand
)
const
bool
MergeTreeData
::
mayBenefitFromIndexForIn
(
const
ASTPtr
&
left_in_operand
,
const
Context
&
)
const
{
/// Make sure that the left side of the IN operator contain part of the key.
/// If there is a tuple on the left side of the IN operator, at least one item of the tuple
...
...
@@ -2694,18 +2694,12 @@ bool MergeTreeData::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand) con
}
}
MergeTreeData
*
MergeTreeData
::
checkStructureAndGetMergeTreeData
(
const
StoragePtr
&
source_table
)
const
MergeTreeData
&
MergeTreeData
::
checkStructureAndGetMergeTreeData
(
const
StoragePtr
&
source_table
)
const
{
MergeTreeData
*
src_data
;
if
(
auto
storage_merge_tree
=
dynamic_cast
<
StorageMergeTree
*>
(
source_table
.
get
()))
src_data
=
&
storage_merge_tree
->
data
;
else
if
(
auto
storage_replicated_merge_tree
=
dynamic_cast
<
StorageReplicatedMergeTree
*>
(
source_table
.
get
()))
src_data
=
&
storage_replicated_merge_tree
->
data
;
else
{
throw
Exception
(
"Table "
+
table_name
+
" supports attachPartitionFrom only for MergeTree or ReplicatedMergeTree engines."
MergeTreeData
*
src_data
=
dynamic_cast
<
MergeTreeData
*>
(
source_table
.
get
());
if
(
!
src_data
)
throw
Exception
(
"Table "
+
table_name
+
" supports attachPartitionFrom only for MergeTree family of table engines."
" Got "
+
source_table
->
getName
(),
ErrorCodes
::
NOT_IMPLEMENTED
);
}
if
(
getColumns
().
getAllPhysical
().
sizeOfDifference
(
src_data
->
getColumns
().
getAllPhysical
()))
throw
Exception
(
"Tables have different structure"
,
ErrorCodes
::
INCOMPATIBLE_COLUMNS
);
...
...
@@ -2724,7 +2718,7 @@ MergeTreeData * MergeTreeData::checkStructureAndGetMergeTreeData(const StoragePt
if
(
format_version
!=
src_data
->
format_version
)
throw
Exception
(
"Tables have different format_version"
,
ErrorCodes
::
BAD_ARGUMENTS
);
return
src_data
;
return
*
src_data
;
}
MergeTreeData
::
MutableDataPartPtr
MergeTreeData
::
cloneAndLoadDataPart
(
const
MergeTreeData
::
DataPartPtr
&
src_part
,
...
...
dbms/src/Storages/MergeTree/MergeTreeData.h
浏览文件 @
2e34c5c7
...
...
@@ -3,10 +3,11 @@
#include <Common/SimpleIncrement.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Storages/I
TableDeclaration
.h>
#include <Storages/I
Storage
.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
...
...
@@ -89,7 +90,7 @@ namespace ErrorCodes
/// - MergeTreeDataWriter
/// - MergeTreeDataMergerMutator
class
MergeTreeData
:
public
I
TableDeclaration
class
MergeTreeData
:
public
I
Storage
{
public:
/// Function to call if the part is suspected to contain corrupt data.
...
...
@@ -344,12 +345,21 @@ public:
bool
attach
,
BrokenPartCallback
broken_part_callback_
=
[](
const
String
&
){});
/// Load the set of data parts from disk. Call once - immediately after the object is created.
void
loadDataParts
(
bool
skip_sanity_checks
);
ASTPtr
getPartitionKeyAST
()
const
override
{
return
partition_by_ast
;
}
ASTPtr
getSortingKeyAST
()
const
override
{
return
sorting_key_expr_ast
;
}
ASTPtr
getPrimaryKeyAST
()
const
override
{
return
primary_key_expr_ast
;
}
ASTPtr
getSamplingKeyAST
()
const
override
{
return
sample_by_ast
;
}
bool
supportsPrewhere
()
const
{
return
true
;
}
Names
getColumnsRequiredForPartitionKey
()
const
override
{
return
(
partition_key_expr
?
partition_key_expr
->
getRequiredColumns
()
:
Names
{});
}
Names
getColumnsRequiredForSortingKey
()
const
override
{
return
sorting_key_expr
->
getRequiredColumns
();
}
Names
getColumnsRequiredForPrimaryKey
()
const
override
{
return
primary_key_expr
->
getRequiredColumns
();
}
Names
getColumnsRequiredForSampling
()
const
override
{
return
columns_required_for_sampling
;
}
Names
getColumnsRequiredForFinal
()
const
override
{
return
sorting_key_expr
->
getRequiredColumns
();
}
bool
supportsFinal
()
const
bool
supportsPrewhere
()
const
override
{
return
true
;
}
bool
supportsSampling
()
const
override
{
return
sample_by_ast
!=
nullptr
;
}
bool
supportsFinal
()
const
override
{
return
merging_params
.
mode
==
MergingParams
::
Collapsing
||
merging_params
.
mode
==
MergingParams
::
Summing
...
...
@@ -358,9 +368,7 @@ public:
||
merging_params
.
mode
==
MergingParams
::
VersionedCollapsing
;
}
bool
mayBenefitFromIndexForIn
(
const
ASTPtr
&
left_in_operand
)
const
;
Int64
getMaxBlockNumber
();
bool
mayBenefitFromIndexForIn
(
const
ASTPtr
&
left_in_operand
,
const
Context
&
)
const
override
;
NameAndTypePair
getColumn
(
const
String
&
column_name
)
const
override
{
...
...
@@ -385,14 +393,17 @@ public:
||
column_name
==
"_sample_factor"
;
}
String
getDatabaseName
()
const
{
return
database_name
;
}
String
getDatabaseName
()
const
override
{
return
database_name
;
}
String
getTableName
()
const
override
{
return
table_name
;
}
String
getTableName
()
const
{
return
table_name
;
}
/// Load the set of data parts from disk. Call once - immediately after the object is created.
void
loadDataParts
(
bool
skip_sanity_checks
);
String
getFullPath
()
const
{
return
full_path
;
}
String
getLogName
()
const
{
return
log_name
;
}
Int64
getMaxBlockNumber
()
const
;
/// Returns a copy of the list so that the caller shouldn't worry about locks.
DataParts
getDataParts
(
const
DataPartStates
&
affordable_states
)
const
;
/// Returns sorted list of the parts with specified states
...
...
@@ -539,23 +550,11 @@ public:
*/
static
ASTPtr
extractKeyExpressionList
(
const
ASTPtr
&
node
);
Names
getColumnsRequiredForPartitionKey
()
const
{
return
(
partition_key_expr
?
partition_key_expr
->
getRequiredColumns
()
:
Names
{});
}
bool
hasSortingKey
()
const
{
return
!
sorting_key_columns
.
empty
();
}
bool
hasPrimaryKey
()
const
{
return
!
primary_key_columns
.
empty
();
}
bool
hasSkipIndices
()
const
{
return
!
skip_indices
.
empty
();
}
bool
hasTableTTL
()
const
{
return
ttl_table_ast
!=
nullptr
;
}
ASTPtr
getSortingKeyAST
()
const
{
return
sorting_key_expr_ast
;
}
ASTPtr
getPrimaryKeyAST
()
const
{
return
primary_key_expr_ast
;
}
Names
getColumnsRequiredForSortingKey
()
const
{
return
sorting_key_expr
->
getRequiredColumns
();
}
Names
getColumnsRequiredForPrimaryKey
()
const
{
return
primary_key_expr
->
getRequiredColumns
();
}
bool
supportsSampling
()
const
{
return
sample_by_ast
!=
nullptr
;
}
ASTPtr
getSamplingExpression
()
const
{
return
sample_by_ast
;
}
Names
getColumnsRequiredForSampling
()
const
{
return
columns_required_for_sampling
;
}
/// Check that the part is not broken and calculate the checksums for it if they are not present.
MutableDataPartPtr
loadPartAndFixMetadata
(
const
String
&
relative_path
);
...
...
@@ -592,11 +591,13 @@ public:
/// Extracts MergeTreeData of other *MergeTree* storage
/// and checks that their structure suitable for ALTER TABLE ATTACH PARTITION FROM
/// Tables structure should be locked.
MergeTreeData
*
checkStructureAndGetMergeTreeData
(
const
StoragePtr
&
source_table
)
const
;
MergeTreeData
&
checkStructureAndGetMergeTreeData
(
const
StoragePtr
&
source_table
)
const
;
MergeTreeData
::
MutableDataPartPtr
cloneAndLoadDataPart
(
const
MergeTreeData
::
DataPartPtr
&
src_part
,
const
String
&
tmp_part_prefix
,
const
MergeTreePartInfo
&
dst_part_info
);
virtual
std
::
vector
<
MergeTreeMutationStatus
>
getMutationsStatus
()
const
=
0
;
MergeTreeDataFormatVersion
format_version
;
Context
global_context
;
...
...
@@ -655,13 +656,12 @@ public:
/// For generating names of temporary parts during insertion.
SimpleIncrement
insert_increment
;
pr
ivate
:
pr
otected
:
friend
struct
MergeTreeDataPart
;
friend
class
StorageMergeTree
;
friend
class
StorageReplicatedMergeTree
;
friend
class
MergeTreeDataMergerMutator
;
friend
class
ReplicatedMergeTreeAlterThread
;
friend
struct
ReplicatedMergeTreeTableMetadata
;
friend
class
StorageReplicatedMergeTree
;
ASTPtr
partition_by_ast
;
ASTPtr
order_by_ast
;
...
...
dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
浏览文件 @
2e34c5c7
...
...
@@ -449,7 +449,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
throw
Exception
(
"Sampling column not in primary key"
,
ErrorCodes
::
ILLEGAL_COLUMN
);
ASTPtr
args
=
std
::
make_shared
<
ASTExpressionList
>
();
args
->
children
.
push_back
(
data
.
getSampling
Expression
());
args
->
children
.
push_back
(
data
.
getSampling
KeyAST
());
args
->
children
.
push_back
(
std
::
make_shared
<
ASTLiteral
>
(
lower
));
lower_function
=
std
::
make_shared
<
ASTFunction
>
();
...
...
@@ -466,7 +466,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
throw
Exception
(
"Sampling column not in primary key"
,
ErrorCodes
::
ILLEGAL_COLUMN
);
ASTPtr
args
=
std
::
make_shared
<
ASTExpressionList
>
();
args
->
children
.
push_back
(
data
.
getSampling
Expression
());
args
->
children
.
push_back
(
data
.
getSampling
KeyAST
());
args
->
children
.
push_back
(
std
::
make_shared
<
ASTLiteral
>
(
upper
));
upper_function
=
std
::
make_shared
<
ASTFunction
>
();
...
...
dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp
浏览文件 @
2e34c5c7
...
...
@@ -36,7 +36,7 @@ void ReplicatedMergeTreeAlterThread::run()
try
{
/** We have a description of columns in ZooKeeper, common for all replicas (Example: /clickhouse/tables/02-06/visits/columns),
* as well as a description of columns in local file with metadata (storage.
data.
getColumnsList()).
* as well as a description of columns in local file with metadata (storage.getColumnsList()).
*
* If these descriptions are different - you need to do ALTER.
*
...
...
@@ -83,7 +83,7 @@ void ReplicatedMergeTreeAlterThread::run()
const
String
&
metadata_str
=
metadata_znode
.
contents
;
auto
metadata_in_zk
=
ReplicatedMergeTreeTableMetadata
::
parse
(
metadata_str
);
auto
metadata_diff
=
ReplicatedMergeTreeTableMetadata
(
storage
.
data
).
checkAndFindDiff
(
metadata_in_zk
,
/* allow_alter = */
true
);
auto
metadata_diff
=
ReplicatedMergeTreeTableMetadata
(
storage
).
checkAndFindDiff
(
metadata_in_zk
,
/* allow_alter = */
true
);
/// If you need to lock table structure, then suspend merges.
ActionLock
merge_blocker
=
storage
.
merger_mutator
.
actions_blocker
.
cancel
();
...
...
@@ -123,7 +123,7 @@ void ReplicatedMergeTreeAlterThread::run()
}
/// You need to get a list of parts under table lock to avoid race condition with merge.
parts
=
storage
.
data
.
getDataParts
();
parts
=
storage
.
getDataParts
();
storage
.
columns_version
=
columns_version
;
storage
.
metadata_version
=
metadata_version
;
...
...
@@ -140,7 +140,7 @@ void ReplicatedMergeTreeAlterThread::run()
int
changed_parts
=
0
;
if
(
!
changed_columns_version
)
parts
=
storage
.
data
.
getDataParts
();
parts
=
storage
.
getDataParts
();
const
auto
columns_for_parts
=
storage
.
getColumns
().
getAllPhysical
();
const
auto
indices_for_parts
=
storage
.
getIndices
();
...
...
@@ -150,7 +150,7 @@ void ReplicatedMergeTreeAlterThread::run()
/// Update the part and write result to temporary files.
/// TODO: You can skip checking for too large changes if ZooKeeper has, for example,
/// node /flags/force_alter.
auto
transaction
=
storage
.
data
.
alterDataPart
(
part
,
columns_for_parts
,
indices_for_parts
.
indices
,
false
);
auto
transaction
=
storage
.
alterDataPart
(
part
,
columns_for_parts
,
indices_for_parts
.
indices
,
false
);
if
(
!
transaction
)
continue
;
...
...
@@ -160,7 +160,7 @@ void ReplicatedMergeTreeAlterThread::run()
}
/// Columns sizes could be quietly changed in case of MODIFY/ADD COLUMN
storage
.
data
.
recalculateColumnSizes
();
storage
.
recalculateColumnSizes
();
if
(
changed_columns_version
)
{
...
...
dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp
浏览文件 @
2e34c5c7
...
...
@@ -35,7 +35,7 @@ namespace ErrorCodes
ReplicatedMergeTreeBlockOutputStream
::
ReplicatedMergeTreeBlockOutputStream
(
StorageReplicatedMergeTree
&
storage_
,
size_t
quorum_
,
size_t
quorum_timeout_ms_
,
size_t
max_parts_per_block
,
bool
deduplicate_
)
:
storage
(
storage_
),
quorum
(
quorum_
),
quorum_timeout_ms
(
quorum_timeout_ms_
),
max_parts_per_block
(
max_parts_per_block
),
deduplicate
(
deduplicate_
),
log
(
&
Logger
::
get
(
storage
.
data
.
getLogName
()
+
" (Replicated OutputStream)"
))
log
(
&
Logger
::
get
(
storage
.
getLogName
()
+
" (Replicated OutputStream)"
))
{
/// The quorum value `1` has the same meaning as if it is disabled.
if
(
quorum
==
1
)
...
...
@@ -109,7 +109,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
last_block_is_duplicate
=
false
;
/// TODO Is it possible to not lock the table structure here?
storage
.
d
ata
.
d
elayInsertOrThrowIfNeeded
(
&
storage
.
partial_shutdown_event
);
storage
.
delayInsertOrThrowIfNeeded
(
&
storage
.
partial_shutdown_event
);
auto
zookeeper
=
storage
.
getZooKeeper
();
assertSessionIsNotExpired
(
zookeeper
);
...
...
@@ -297,8 +297,8 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
quorum_info
.
host_node_version
));
}
MergeTreeData
::
Transaction
transaction
(
storage
.
data
);
/// If you can not add a part to ZK, we'll remove it back from the working set.
storage
.
data
.
renameTempPartAndAdd
(
part
,
nullptr
,
&
transaction
);
MergeTreeData
::
Transaction
transaction
(
storage
);
/// If you can not add a part to ZK, we'll remove it back from the working set.
storage
.
renameTempPartAndAdd
(
part
,
nullptr
,
&
transaction
);
Coordination
::
Responses
responses
;
int32_t
multi_code
=
zookeeper
->
tryMultiNoThrow
(
ops
,
responses
);
/// 1 RTT
...
...
@@ -414,7 +414,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
void
ReplicatedMergeTreeBlockOutputStream
::
writePrefix
()
{
storage
.
data
.
throwInsertIfNeeded
();
storage
.
throwInsertIfNeeded
();
}
...
...
dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
浏览文件 @
2e34c5c7
...
...
@@ -27,8 +27,8 @@ ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplic
void
ReplicatedMergeTreeCleanupThread
::
run
()
{
const
auto
CLEANUP_SLEEP_MS
=
storage
.
data
.
settings
.
cleanup_delay_period
*
1000
+
std
::
uniform_int_distribution
<
UInt64
>
(
0
,
storage
.
data
.
settings
.
cleanup_delay_period_random_add
*
1000
)(
rng
);
const
auto
CLEANUP_SLEEP_MS
=
storage
.
settings
.
cleanup_delay_period
*
1000
+
std
::
uniform_int_distribution
<
UInt64
>
(
0
,
storage
.
settings
.
cleanup_delay_period_random_add
*
1000
)(
rng
);
try
{
...
...
@@ -57,7 +57,7 @@ void ReplicatedMergeTreeCleanupThread::iterate()
{
/// TODO: Implement tryLockStructureForShare.
auto
lock
=
storage
.
lockStructureForShare
(
false
,
""
);
storage
.
data
.
clearOldTemporaryDirectories
();
storage
.
clearOldTemporaryDirectories
();
}
/// This is loose condition: no problem if we actually had lost leadership at this moment
...
...
@@ -82,7 +82,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
int
children_count
=
stat
.
numChildren
;
/// We will wait for 1.1 times more records to accumulate than necessary.
if
(
static_cast
<
double
>
(
children_count
)
<
storage
.
data
.
settings
.
min_replicated_logs_to_keep
*
1.1
)
if
(
static_cast
<
double
>
(
children_count
)
<
storage
.
settings
.
min_replicated_logs_to_keep
*
1.1
)
return
;
Strings
replicas
=
zookeeper
->
getChildren
(
storage
.
zookeeper_path
+
"/replicas"
,
&
stat
);
...
...
@@ -100,8 +100,8 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
std
::
sort
(
entries
.
begin
(),
entries
.
end
());
String
min_saved_record_log_str
=
entries
[
entries
.
size
()
>
storage
.
data
.
settings
.
max_replicated_logs_to_keep
.
value
?
entries
.
size
()
-
storage
.
data
.
settings
.
max_replicated_logs_to_keep
.
value
entries
.
size
()
>
storage
.
settings
.
max_replicated_logs_to_keep
.
value
?
entries
.
size
()
-
storage
.
settings
.
max_replicated_logs_to_keep
.
value
:
0
];
/// Replicas that were marked is_lost but are active.
...
...
@@ -203,7 +203,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
min_saved_log_pointer
=
std
::
min
(
min_saved_log_pointer
,
min_log_pointer_lost_candidate
);
/// We will not touch the last `min_replicated_logs_to_keep` records.
entries
.
erase
(
entries
.
end
()
-
std
::
min
<
UInt64
>
(
entries
.
size
(),
storage
.
data
.
settings
.
min_replicated_logs_to_keep
.
value
),
entries
.
end
());
entries
.
erase
(
entries
.
end
()
-
std
::
min
<
UInt64
>
(
entries
.
size
(),
storage
.
settings
.
min_replicated_logs_to_keep
.
value
),
entries
.
end
());
/// We will not touch records that are no less than `min_saved_log_pointer`.
entries
.
erase
(
std
::
lower_bound
(
entries
.
begin
(),
entries
.
end
(),
"log-"
+
padIndex
(
min_saved_log_pointer
)),
entries
.
end
());
...
...
@@ -294,12 +294,12 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
/// Use ZooKeeper's first node (last according to time) timestamp as "current" time.
Int64
current_time
=
timed_blocks
.
front
().
ctime
;
Int64
time_threshold
=
std
::
max
(
static_cast
<
Int64
>
(
0
),
current_time
-
static_cast
<
Int64
>
(
1000
*
storage
.
data
.
settings
.
replicated_deduplication_window_seconds
));
Int64
time_threshold
=
std
::
max
(
static_cast
<
Int64
>
(
0
),
current_time
-
static_cast
<
Int64
>
(
1000
*
storage
.
settings
.
replicated_deduplication_window_seconds
));
/// Virtual node, all nodes that are "greater" than this one will be deleted
NodeWithStat
block_threshold
{{},
time_threshold
};
size_t
current_deduplication_window
=
std
::
min
<
size_t
>
(
timed_blocks
.
size
(),
storage
.
data
.
settings
.
replicated_deduplication_window
.
value
);
size_t
current_deduplication_window
=
std
::
min
<
size_t
>
(
timed_blocks
.
size
(),
storage
.
settings
.
replicated_deduplication_window
.
value
);
auto
first_outdated_block_fixed_threshold
=
timed_blocks
.
begin
()
+
current_deduplication_window
;
auto
first_outdated_block_time_threshold
=
std
::
upper_bound
(
timed_blocks
.
begin
(),
timed_blocks
.
end
(),
block_threshold
,
NodeWithStat
::
greaterByTime
);
auto
first_outdated_block
=
std
::
min
(
first_outdated_block_fixed_threshold
,
first_outdated_block_time_threshold
);
...
...
@@ -392,10 +392,10 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper &
void
ReplicatedMergeTreeCleanupThread
::
clearOldMutations
()
{
if
(
!
storage
.
data
.
settings
.
finished_mutations_to_keep
)
if
(
!
storage
.
settings
.
finished_mutations_to_keep
)
return
;
if
(
storage
.
queue
.
countFinishedMutations
()
<=
storage
.
data
.
settings
.
finished_mutations_to_keep
)
if
(
storage
.
queue
.
countFinishedMutations
()
<=
storage
.
settings
.
finished_mutations_to_keep
)
{
/// Not strictly necessary, but helps to avoid unnecessary ZooKeeper requests.
/// If even this replica hasn't finished enough mutations yet, then we don't need to clean anything.
...
...
@@ -422,10 +422,10 @@ void ReplicatedMergeTreeCleanupThread::clearOldMutations()
/// Do not remove entries that are greater than `min_pointer` (they are not done yet).
entries
.
erase
(
std
::
upper_bound
(
entries
.
begin
(),
entries
.
end
(),
padIndex
(
min_pointer
)),
entries
.
end
());
/// Do not remove last `storage.
data.
settings.finished_mutations_to_keep` entries.
if
(
entries
.
size
()
<=
storage
.
data
.
settings
.
finished_mutations_to_keep
)
/// Do not remove last `storage.settings.finished_mutations_to_keep` entries.
if
(
entries
.
size
()
<=
storage
.
settings
.
finished_mutations_to_keep
)
return
;
entries
.
erase
(
entries
.
end
()
-
storage
.
data
.
settings
.
finished_mutations_to_keep
,
entries
.
end
());
entries
.
erase
(
entries
.
end
()
-
storage
.
settings
.
finished_mutations_to_keep
,
entries
.
end
());
if
(
entries
.
empty
())
return
;
...
...
dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
浏览文件 @
2e34c5c7
...
...
@@ -90,7 +90,7 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPart(const String & par
}
/// If the part is not in ZooKeeper, we'll check if it's at least somewhere.
auto
part_info
=
MergeTreePartInfo
::
fromPartName
(
part_name
,
storage
.
data
.
format_version
);
auto
part_info
=
MergeTreePartInfo
::
fromPartName
(
part_name
,
storage
.
format_version
);
/** The logic is as follows:
* - if some live or inactive replica has such a part, or a part covering it
...
...
@@ -126,7 +126,7 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPart(const String & par
Strings
parts
=
zookeeper
->
getChildren
(
storage
.
zookeeper_path
+
"/replicas/"
+
replica
+
"/parts"
);
for
(
const
String
&
part_on_replica
:
parts
)
{
auto
part_on_replica_info
=
MergeTreePartInfo
::
fromPartName
(
part_on_replica
,
storage
.
data
.
format_version
);
auto
part_on_replica_info
=
MergeTreePartInfo
::
fromPartName
(
part_on_replica
,
storage
.
format_version
);
if
(
part_on_replica_info
.
contains
(
part_info
))
{
...
...
@@ -189,9 +189,9 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
/// If the part is still in the PreCommitted -> Committed transition, it is not lost
/// and there is no need to go searching for it on other replicas. To definitely find the needed part
/// if it exists (or a part containing it) we first search among the PreCommitted parts.
auto
part
=
storage
.
data
.
getPartIfExists
(
part_name
,
{
MergeTreeDataPartState
::
PreCommitted
});
auto
part
=
storage
.
getPartIfExists
(
part_name
,
{
MergeTreeDataPartState
::
PreCommitted
});
if
(
!
part
)
part
=
storage
.
data
.
getActiveContainingPart
(
part_name
);
part
=
storage
.
getActiveContainingPart
(
part_name
);
/// We do not have this or a covering part.
if
(
!
part
)
...
...
@@ -235,8 +235,8 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
checkDataPart
(
part
,
true
,
storage
.
data
.
primary_key_data_types
,
storage
.
data
.
skip_indices
,
storage
.
primary_key_data_types
,
storage
.
skip_indices
,
[
this
]
{
return
need_stop
.
load
();
});
if
(
need_stop
)
...
...
@@ -259,7 +259,7 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
storage
.
removePartAndEnqueueFetch
(
part_name
);
/// Delete part locally.
storage
.
data
.
forgetPartAndMoveToDetached
(
part
,
"broken_"
);
storage
.
forgetPartAndMoveToDetached
(
part
,
"broken_"
);
}
}
else
if
(
part
->
modification_time
+
MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER
<
time
(
nullptr
))
...
...
@@ -270,7 +270,7 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
ProfileEvents
::
increment
(
ProfileEvents
::
ReplicatedPartChecksFailed
);
LOG_ERROR
(
log
,
"Unexpected part "
<<
part_name
<<
" in filesystem. Removing."
);
storage
.
data
.
forgetPartAndMoveToDetached
(
part
,
"unexpected_"
);
storage
.
forgetPartAndMoveToDetached
(
part
,
"unexpected_"
);
}
else
{
...
...
dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
浏览文件 @
2e34c5c7
...
...
@@ -20,7 +20,7 @@ namespace ErrorCodes
ReplicatedMergeTreeQueue
::
ReplicatedMergeTreeQueue
(
StorageReplicatedMergeTree
&
storage_
)
:
storage
(
storage_
)
,
format_version
(
storage
.
data
.
format_version
)
,
format_version
(
storage
.
format_version
)
,
current_parts
(
format_version
)
,
virtual_parts
(
format_version
)
{}
...
...
@@ -62,14 +62,14 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
Strings
children
=
zookeeper
->
getChildren
(
queue_path
);
auto
to_remove_it
=
std
::
remove_if
(
children
.
begin
(),
children
.
end
(),
[
&
](
const
String
&
path
)
{
return
already_loaded_paths
.
count
(
path
);
});
children
.
begin
(),
children
.
end
(),
[
&
](
const
String
&
path
)
{
return
already_loaded_paths
.
count
(
path
);
});
LOG_DEBUG
(
log
,
"Having "
<<
(
to_remove_it
-
children
.
begin
())
<<
" queue entries to load, "
<<
(
children
.
end
()
-
to_remove_it
)
<<
" entries already loaded."
);
"Having "
<<
(
to_remove_it
-
children
.
begin
())
<<
" queue entries to load, "
<<
(
children
.
end
()
-
to_remove_it
)
<<
" entries already loaded."
);
children
.
erase
(
to_remove_it
,
children
.
end
());
std
::
sort
(
children
.
begin
(),
children
.
end
());
...
...
dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
浏览文件 @
2e34c5c7
...
...
@@ -44,11 +44,11 @@ ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(Storage
,
log
(
&
Logger
::
get
(
log_name
))
,
active_node_identifier
(
generateActiveNodeIdentifier
())
{
check_period_ms
=
storage
.
data
.
settings
.
zookeeper_session_expiration_check_period
.
totalSeconds
()
*
1000
;
check_period_ms
=
storage
.
settings
.
zookeeper_session_expiration_check_period
.
totalSeconds
()
*
1000
;
/// Periodicity of checking lag of replica.
if
(
check_period_ms
>
static_cast
<
Int64
>
(
storage
.
data
.
settings
.
check_delay_period
)
*
1000
)
check_period_ms
=
storage
.
data
.
settings
.
check_delay_period
*
1000
;
if
(
check_period_ms
>
static_cast
<
Int64
>
(
storage
.
settings
.
check_delay_period
)
*
1000
)
check_period_ms
=
storage
.
settings
.
check_delay_period
*
1000
;
task
=
storage
.
global_context
.
getSchedulePool
().
createTask
(
log_name
,
[
this
]{
run
();
});
}
...
...
@@ -121,7 +121,7 @@ void ReplicatedMergeTreeRestartingThread::run()
}
time_t
current_time
=
time
(
nullptr
);
if
(
current_time
>=
prev_time_of_check_delay
+
static_cast
<
time_t
>
(
storage
.
data
.
settings
.
check_delay_period
))
if
(
current_time
>=
prev_time_of_check_delay
+
static_cast
<
time_t
>
(
storage
.
settings
.
check_delay_period
))
{
/// Find out lag of replicas.
time_t
absolute_delay
=
0
;
...
...
@@ -136,10 +136,10 @@ void ReplicatedMergeTreeRestartingThread::run()
/// We give up leadership if the relative lag is greater than threshold.
if
(
storage
.
is_leader
&&
relative_delay
>
static_cast
<
time_t
>
(
storage
.
data
.
settings
.
min_relative_delay_to_yield_leadership
))
&&
relative_delay
>
static_cast
<
time_t
>
(
storage
.
settings
.
min_relative_delay_to_yield_leadership
))
{
LOG_INFO
(
log
,
"Relative replica delay ("
<<
relative_delay
<<
" seconds) is bigger than threshold ("
<<
storage
.
data
.
settings
.
min_relative_delay_to_yield_leadership
<<
"). Will yield leadership."
);
<<
storage
.
settings
.
min_relative_delay_to_yield_leadership
<<
"). Will yield leadership."
);
ProfileEvents
::
increment
(
ProfileEvents
::
ReplicaYieldLeadership
);
...
...
@@ -181,7 +181,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
updateQuorumIfWeHavePart
();
if
(
storage
.
data
.
settings
.
replicated_can_become_leader
)
if
(
storage
.
settings
.
replicated_can_become_leader
)
storage
.
enterLeaderElection
();
else
LOG_INFO
(
log
,
"Will not enter leader election because replicated_can_become_leader=0"
);
...
...
@@ -239,13 +239,13 @@ void ReplicatedMergeTreeRestartingThread::removeFailedQuorumParts()
for
(
auto
part_name
:
failed_parts
)
{
auto
part
=
storage
.
data
.
getPartIfExists
(
auto
part
=
storage
.
getPartIfExists
(
part_name
,
{
MergeTreeDataPartState
::
PreCommitted
,
MergeTreeDataPartState
::
Committed
,
MergeTreeDataPartState
::
Outdated
});
if
(
part
)
{
LOG_DEBUG
(
log
,
"Found part "
<<
part_name
<<
" with failed quorum. Moving to detached. This shouldn't happen often."
);
storage
.
data
.
forgetPartAndMoveToDetached
(
part
,
"noquorum_"
);
storage
.
forgetPartAndMoveToDetached
(
part
,
"noquorum_"
);
storage
.
queue
.
removeFromVirtualParts
(
part
->
info
);
}
}
...
...
dbms/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h
浏览文件 @
2e34c5c7
...
...
@@ -32,9 +32,9 @@ public:
bool
supportsIndexForIn
()
const
override
{
return
true
;
}
bool
mayBenefitFromIndexForIn
(
const
ASTPtr
&
left_in_operand
,
const
Context
&
/* query_context */
)
const
override
bool
mayBenefitFromIndexForIn
(
const
ASTPtr
&
left_in_operand
,
const
Context
&
query_context
)
const
override
{
return
part
->
storage
.
mayBenefitFromIndexForIn
(
left_in_operand
);
return
part
->
storage
.
mayBenefitFromIndexForIn
(
left_in_operand
,
query_context
);
}
protected:
...
...
dbms/src/Storages/StorageMergeTree.cpp
浏览文件 @
2e34c5c7
此差异已折叠。
点击以展开。
dbms/src/Storages/StorageMergeTree.h
浏览文件 @
2e34c5c7
...
...
@@ -20,34 +20,18 @@ namespace DB
/** See the description of the data structure in MergeTreeData.
*/
class
StorageMergeTree
:
public
ext
::
shared_ptr_helper
<
StorageMergeTree
>
,
public
IStorage
class
StorageMergeTree
:
public
ext
::
shared_ptr_helper
<
StorageMergeTree
>
,
public
MergeTreeData
{
public:
void
startup
()
override
;
void
shutdown
()
override
;
~
StorageMergeTree
()
override
;
std
::
string
getName
()
const
override
{
return
data
.
merging_params
.
getModeName
()
+
"MergeTree"
;
}
std
::
string
getName
()
const
override
{
return
merging_params
.
getModeName
()
+
"MergeTree"
;
}
std
::
string
getTableName
()
const
override
{
return
table_name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
database_name
;
}
bool
supportsSampling
()
const
override
{
return
data
.
supportsSampling
();
}
bool
supportsPrewhere
()
const
override
{
return
data
.
supportsPrewhere
();
}
bool
supportsFinal
()
const
override
{
return
data
.
supportsFinal
();
}
bool
supportsIndexForIn
()
const
override
{
return
true
;
}
bool
mayBenefitFromIndexForIn
(
const
ASTPtr
&
left_in_operand
,
const
Context
&
/* query_context */
)
const
override
{
return
data
.
mayBenefitFromIndexForIn
(
left_in_operand
);
}
const
ColumnsDescription
&
getColumns
()
const
override
{
return
data
.
getColumns
();
}
void
setColumns
(
ColumnsDescription
columns_
)
override
{
return
data
.
setColumns
(
std
::
move
(
columns_
));
}
virtual
const
IndicesDescription
&
getIndices
()
const
override
{
return
data
.
getIndices
();
}
virtual
void
setIndices
(
IndicesDescription
indices_
)
override
{
data
.
setIndices
(
std
::
move
(
indices_
));
}
NameAndTypePair
getColumn
(
const
String
&
column_name
)
const
override
{
return
data
.
getColumn
(
column_name
);
}
bool
hasColumn
(
const
String
&
column_name
)
const
override
{
return
data
.
hasColumn
(
column_name
);
}
BlockInputStreams
read
(
const
Names
&
column_names
,
...
...
@@ -66,7 +50,7 @@ public:
void
alterPartition
(
const
ASTPtr
&
query
,
const
PartitionCommands
&
commands
,
const
Context
&
context
)
override
;
void
mutate
(
const
MutationCommands
&
commands
,
const
Context
&
context
)
override
;
std
::
vector
<
MergeTreeMutationStatus
>
getMutationsStatus
()
const
;
std
::
vector
<
MergeTreeMutationStatus
>
getMutationsStatus
()
const
override
;
CancellationCode
killMutation
(
const
String
&
mutation_id
)
override
;
void
drop
()
override
;
...
...
@@ -84,32 +68,13 @@ public:
ActionLock
getActionLock
(
StorageActionBlockType
action_type
)
override
;
MergeTreeData
&
getData
()
{
return
data
;
}
const
MergeTreeData
&
getData
()
const
{
return
data
;
}
String
getDataPath
()
const
override
{
return
full_path
;
}
ASTPtr
getPartitionKeyAST
()
const
override
{
return
data
.
partition_by_ast
;
}
ASTPtr
getSortingKeyAST
()
const
override
{
return
data
.
getSortingKeyAST
();
}
ASTPtr
getPrimaryKeyAST
()
const
override
{
return
data
.
getPrimaryKeyAST
();
}
ASTPtr
getSamplingKeyAST
()
const
override
{
return
data
.
getSamplingExpression
();
}
Names
getColumnsRequiredForPartitionKey
()
const
override
{
return
data
.
getColumnsRequiredForPartitionKey
();
}
Names
getColumnsRequiredForSortingKey
()
const
override
{
return
data
.
getColumnsRequiredForSortingKey
();
}
Names
getColumnsRequiredForPrimaryKey
()
const
override
{
return
data
.
getColumnsRequiredForPrimaryKey
();
}
Names
getColumnsRequiredForSampling
()
const
override
{
return
data
.
getColumnsRequiredForSampling
();
}
Names
getColumnsRequiredForFinal
()
const
override
{
return
data
.
getColumnsRequiredForSortingKey
();
}
private:
String
path
;
String
database_name
;
String
table_name
;
String
full_path
;
Context
global_context
;
BackgroundProcessingPool
&
background_pool
;
MergeTreeData
data
;
MergeTreeDataSelectExecutor
reader
;
MergeTreeDataWriter
writer
;
MergeTreeDataMergerMutator
merger_mutator
;
...
...
@@ -121,12 +86,10 @@ private:
AtomicStopwatch
time_after_previous_cleanup
;
mutable
std
::
mutex
currently_merging_mutex
;
MergeTreeData
::
DataParts
currently_merging
;
DataParts
currently_merging
;
std
::
map
<
String
,
MergeTreeMutationEntry
>
current_mutations_by_id
;
std
::
multimap
<
Int64
,
MergeTreeMutationEntry
&>
current_mutations_by_version
;
Logger
*
log
;
std
::
atomic
<
bool
>
shutdown_called
{
false
};
BackgroundProcessingPool
::
TaskHandle
background_task_handle
;
...
...
@@ -137,8 +100,7 @@ private:
* If aggressive - when selects parts don't takes into account their ratio size and novelty (used for OPTIMIZE query).
* Returns true if merge is finished successfully.
*/
bool
merge
(
bool
aggressive
,
const
String
&
partition_id
,
bool
final
,
bool
deduplicate
,
String
*
out_disable_reason
=
nullptr
);
bool
merge
(
bool
aggressive
,
const
String
&
partition_id
,
bool
final
,
bool
deduplicate
,
String
*
out_disable_reason
=
nullptr
);
/// Try and find a single part to mutate and mutate it. If some part was successfully mutated, return true.
bool
tryMutatePart
();
...
...
@@ -146,7 +108,7 @@ private:
BackgroundProcessingPoolTaskResult
backgroundTask
();
Int64
getCurrentMutationVersion
(
const
MergeTreeData
::
DataPartPtr
&
part
,
const
DataPartPtr
&
part
,
std
::
lock_guard
<
std
::
mutex
>
&
/* currently_merging_mutex_lock */
)
const
;
void
clearOldMutations
();
...
...
@@ -182,7 +144,7 @@ protected:
const
ASTPtr
&
primary_key_ast_
,
const
ASTPtr
&
sample_by_ast_
,
/// nullptr, if sampling is not supported.
const
ASTPtr
&
ttl_table_ast_
,
const
Merg
eTreeData
::
Merg
ingParams
&
merging_params_
,
const
MergingParams
&
merging_params_
,
const
MergeTreeSettings
&
settings_
,
bool
has_force_restore_data_flag
);
};
...
...
dbms/src/Storages/StorageReplicatedMergeTree.cpp
浏览文件 @
2e34c5c7
此差异已折叠。
点击以展开。
dbms/src/Storages/StorageReplicatedMergeTree.h
浏览文件 @
2e34c5c7
...
...
@@ -72,36 +72,20 @@ namespace DB
* as the time will take the time of creation the appropriate part on any of the replicas.
*/
class
StorageReplicatedMergeTree
:
public
ext
::
shared_ptr_helper
<
StorageReplicatedMergeTree
>
,
public
IStorage
class
StorageReplicatedMergeTree
:
public
ext
::
shared_ptr_helper
<
StorageReplicatedMergeTree
>
,
public
MergeTreeData
{
public:
void
startup
()
override
;
void
shutdown
()
override
;
~
StorageReplicatedMergeTree
()
override
;
std
::
string
getName
()
const
override
{
return
"Replicated"
+
data
.
merging_params
.
getModeName
()
+
"MergeTree"
;
}
std
::
string
getName
()
const
override
{
return
"Replicated"
+
merging_params
.
getModeName
()
+
"MergeTree"
;
}
std
::
string
getTableName
()
const
override
{
return
table_name
;
}
std
::
string
getDatabaseName
()
const
override
{
return
database_name
;
}
bool
supportsSampling
()
const
override
{
return
data
.
supportsSampling
();
}
bool
supportsFinal
()
const
override
{
return
data
.
supportsFinal
();
}
bool
supportsPrewhere
()
const
override
{
return
data
.
supportsPrewhere
();
}
bool
supportsReplication
()
const
override
{
return
true
;
}
bool
supportsDeduplication
()
const
override
{
return
true
;
}
const
ColumnsDescription
&
getColumns
()
const
override
{
return
data
.
getColumns
();
}
void
setColumns
(
ColumnsDescription
columns_
)
override
{
return
data
.
setColumns
(
std
::
move
(
columns_
));
}
NameAndTypePair
getColumn
(
const
String
&
column_name
)
const
override
{
return
data
.
getColumn
(
column_name
);
}
bool
hasColumn
(
const
String
&
column_name
)
const
override
{
return
data
.
hasColumn
(
column_name
);
}
BlockInputStreams
read
(
const
Names
&
column_names
,
const
SelectQueryInfo
&
query_info
,
...
...
@@ -121,7 +105,7 @@ public:
void
alterPartition
(
const
ASTPtr
&
query
,
const
PartitionCommands
&
commands
,
const
Context
&
query_context
)
override
;
void
mutate
(
const
MutationCommands
&
commands
,
const
Context
&
context
)
override
;
std
::
vector
<
MergeTreeMutationStatus
>
getMutationsStatus
()
const
;
std
::
vector
<
MergeTreeMutationStatus
>
getMutationsStatus
()
const
override
;
CancellationCode
killMutation
(
const
String
&
mutation_id
)
override
;
/** Removes a replica from ZooKeeper. If there are no other replicas, it deletes the entire table from ZooKeeper.
...
...
@@ -133,10 +117,6 @@ public:
void
rename
(
const
String
&
new_path_to_db
,
const
String
&
new_database_name
,
const
String
&
new_table_name
)
override
;
bool
supportsIndexForIn
()
const
override
{
return
true
;
}
bool
mayBenefitFromIndexForIn
(
const
ASTPtr
&
left_in_operand
,
const
Context
&
/* query_context */
)
const
override
{
return
data
.
mayBenefitFromIndexForIn
(
left_in_operand
);
}
void
checkTableCanBeDropped
()
const
override
;
...
...
@@ -148,10 +128,6 @@ public:
/// If timeout is exceeded returns false
bool
waitForShrinkingQueueSize
(
size_t
queue_size
=
0
,
UInt64
max_wait_milliseconds
=
0
);
MergeTreeData
&
getData
()
{
return
data
;
}
const
MergeTreeData
&
getData
()
const
{
return
data
;
}
/** For the system table replicas. */
struct
Status
{
...
...
@@ -194,17 +170,6 @@ public:
String
getDataPath
()
const
override
{
return
full_path
;
}
ASTPtr
getPartitionKeyAST
()
const
override
{
return
data
.
partition_by_ast
;
}
ASTPtr
getSortingKeyAST
()
const
override
{
return
data
.
getSortingKeyAST
();
}
ASTPtr
getPrimaryKeyAST
()
const
override
{
return
data
.
getPrimaryKeyAST
();
}
ASTPtr
getSamplingKeyAST
()
const
override
{
return
data
.
getSamplingExpression
();
}
Names
getColumnsRequiredForPartitionKey
()
const
override
{
return
data
.
getColumnsRequiredForPartitionKey
();
}
Names
getColumnsRequiredForSortingKey
()
const
override
{
return
data
.
getColumnsRequiredForSortingKey
();
}
Names
getColumnsRequiredForPrimaryKey
()
const
override
{
return
data
.
getColumnsRequiredForPrimaryKey
();
}
Names
getColumnsRequiredForSampling
()
const
override
{
return
data
.
getColumnsRequiredForSampling
();
}
Names
getColumnsRequiredForFinal
()
const
override
{
return
data
.
getColumnsRequiredForSortingKey
();
}
private:
/// Delete old parts from disk and from ZooKeeper.
void
clearOldPartsAndRemoveFromZK
();
...
...
@@ -222,8 +187,6 @@ private:
using
LogEntry
=
ReplicatedMergeTreeLogEntry
;
using
LogEntryPtr
=
LogEntry
::
Ptr
;
Context
global_context
;
zkutil
::
ZooKeeperPtr
current_zookeeper
;
/// Use only the methods below.
std
::
mutex
current_zookeeper_mutex
;
/// To recreate the session in the background thread.
...
...
@@ -234,10 +197,6 @@ private:
/// If true, the table is offline and can not be written to it.
std
::
atomic_bool
is_readonly
{
false
};
String
database_name
;
String
table_name
;
String
full_path
;
String
zookeeper_path
;
String
replica_name
;
String
replica_path
;
...
...
@@ -264,7 +223,6 @@ private:
InterserverIOEndpointHolderPtr
data_parts_exchange_endpoint_holder
;
MergeTreeData
data
;
MergeTreeDataSelectExecutor
reader
;
MergeTreeDataWriter
writer
;
MergeTreeDataMergerMutator
merger_mutator
;
...
...
@@ -325,8 +283,6 @@ private:
/// An event that awakens `alter` method from waiting for the completion of the ALTER query.
zkutil
::
EventPtr
alter_query_event
=
std
::
make_shared
<
Poco
::
Event
>
();
Logger
*
log
;
/** Creates the minimum set of nodes in ZooKeeper.
*/
void
createTableIfNotExists
();
...
...
@@ -362,24 +318,24 @@ private:
* Adds actions to `ops` that add data about the part into ZooKeeper.
* Call under TableStructureLock.
*/
void
checkPartChecksumsAndAddCommitOps
(
const
zkutil
::
ZooKeeperPtr
&
zookeeper
,
const
MergeTreeData
::
DataPartPtr
&
part
,
void
checkPartChecksumsAndAddCommitOps
(
const
zkutil
::
ZooKeeperPtr
&
zookeeper
,
const
DataPartPtr
&
part
,
Coordination
::
Requests
&
ops
,
String
part_name
=
""
,
NameSet
*
absent_replicas_paths
=
nullptr
);
String
getChecksumsForZooKeeper
(
const
MergeTreeDataPartChecksums
&
checksums
)
const
;
/// Accepts a PreComitted part, atomically checks its checksums with ones on other replicas and commit the part
MergeTreeData
::
DataPartsVector
checkPartChecksumsAndCommit
(
MergeTreeData
::
Transaction
&
transaction
,
const
MergeTreeData
::
DataPartPtr
&
part
);
DataPartsVector
checkPartChecksumsAndCommit
(
Transaction
&
transaction
,
const
DataPartPtr
&
part
);
void
getCommitPartOps
(
Coordination
::
Requests
&
ops
,
M
ergeTreeData
::
M
utableDataPartPtr
&
part
,
MutableDataPartPtr
&
part
,
const
String
&
block_id_path
=
""
)
const
;
/// Updates info about part columns and checksums in ZooKeeper and commits transaction if successful.
void
updatePartHeaderInZooKeeperAndCommit
(
const
zkutil
::
ZooKeeperPtr
&
zookeeper
,
MergeTreeData
::
AlterDataPartTransaction
&
transaction
);
AlterDataPartTransaction
&
transaction
);
/// Adds actions to `ops` that remove a part from ZooKeeper.
/// Set has_children to true for "old-style" parts (those with /columns and /checksums child znodes).
...
...
@@ -390,7 +346,7 @@ private:
NameSet
*
parts_should_be_retried
=
nullptr
);
bool
tryRemovePartsFromZooKeeperWithRetries
(
const
Strings
&
part_names
,
size_t
max_retries
=
5
);
bool
tryRemovePartsFromZooKeeperWithRetries
(
MergeTreeData
::
DataPartsVector
&
parts
,
size_t
max_retries
=
5
);
bool
tryRemovePartsFromZooKeeperWithRetries
(
DataPartsVector
&
parts
,
size_t
max_retries
=
5
);
/// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts.
void
removePartAndEnqueueFetch
(
const
String
&
part_name
);
...
...
@@ -405,8 +361,8 @@ private:
void
writePartLog
(
PartLogElement
::
Type
type
,
const
ExecutionStatus
&
execution_status
,
UInt64
elapsed_ns
,
const
String
&
new_part_name
,
const
MergeTreeData
::
DataPartPtr
&
result_part
,
const
MergeTreeData
::
DataPartsVector
&
source_parts
,
const
DataPartPtr
&
result_part
,
const
DataPartsVector
&
source_parts
,
const
MergeListEntry
*
merge_entry
);
void
executeDropRange
(
const
LogEntry
&
entry
);
...
...
@@ -463,7 +419,7 @@ private:
*/
bool
createLogEntryToMergeParts
(
zkutil
::
ZooKeeperPtr
&
zookeeper
,
const
MergeTreeData
::
DataPartsVector
&
parts
,
const
DataPartsVector
&
parts
,
const
String
&
merged_name
,
bool
deduplicate
,
ReplicatedMergeTreeLogEntryData
*
out_log_entry
=
nullptr
);
...
...
@@ -564,7 +520,7 @@ protected:
const
ASTPtr
&
primary_key_ast_
,
const
ASTPtr
&
sample_by_ast_
,
const
ASTPtr
&
table_ttl_ast_
,
const
Merg
eTreeData
::
Merg
ingParams
&
merging_params_
,
const
MergingParams
&
merging_params_
,
const
MergeTreeSettings
&
settings_
,
bool
has_force_restore_data_flag
);
};
...
...
dbms/src/Storages/System/StorageSystemColumns.cpp
浏览文件 @
2e34c5c7
#include <optional>
#include <Storages/System/StorageSystemColumns.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
...
...
@@ -38,10 +36,10 @@ StorageSystemColumns::StorageSystemColumns(const std::string & name_)
{
"marks_bytes"
,
std
::
make_shared
<
DataTypeUInt64
>
()
},
{
"comment"
,
std
::
make_shared
<
DataTypeString
>
()
},
{
"is_in_partition_key"
,
std
::
make_shared
<
DataTypeUInt8
>
()
},
{
"is_in_sorting_key"
,
std
::
make_shared
<
DataTypeUInt8
>
()
},
{
"is_in_primary_key"
,
std
::
make_shared
<
DataTypeUInt8
>
()
},
{
"is_in_sampling_key"
,
std
::
make_shared
<
DataTypeUInt8
>
()
},
{
"compression_codec"
,
std
::
make_shared
<
DataTypeString
>
()
},
{
"is_in_sorting_key"
,
std
::
make_shared
<
DataTypeUInt8
>
()
},
{
"is_in_primary_key"
,
std
::
make_shared
<
DataTypeUInt8
>
()
},
{
"is_in_sampling_key"
,
std
::
make_shared
<
DataTypeUInt8
>
()
},
{
"compression_codec"
,
std
::
make_shared
<
DataTypeString
>
()
},
}));
}
...
...
@@ -124,16 +122,10 @@ protected:
cols_required_for_sampling
=
storage
->
getColumnsRequiredForSampling
();
/** Info about sizes of columns for tables of MergeTree family.
* NOTE: It is possible to add getter for this info to IStorage interface.
*/
if
(
auto
storage_concrete_plain
=
dynamic_cast
<
StorageMergeTree
*>
(
storage
.
get
()))
{
column_sizes
=
storage_concrete_plain
->
getData
().
getColumnSizes
();
}
else
if
(
auto
storage_concrete_replicated
=
dynamic_cast
<
StorageReplicatedMergeTree
*>
(
storage
.
get
()))
{
column_sizes
=
storage_concrete_replicated
->
getData
().
getColumnSizes
();
}
* NOTE: It is possible to add getter for this info to IStorage interface.
*/
if
(
auto
storage_concrete
=
dynamic_cast
<
const
MergeTreeData
*>
(
storage
.
get
()))
column_sizes
=
storage_concrete
->
getColumnSizes
();
}
for
(
const
auto
&
column
:
columns
)
...
...
dbms/src/Storages/System/StorageSystemGraphite.cpp
浏览文件 @
2e34c5c7
#include <Storages/System/StorageSystemGraphite.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Interpreters/Context.h>
...
...
@@ -37,20 +35,10 @@ StorageSystemGraphite::Configs StorageSystemGraphite::getConfigs(const Context &
for
(
auto
iterator
=
db
.
second
->
getIterator
(
context
);
iterator
->
isValid
();
iterator
->
next
())
{
auto
&
table
=
iterator
->
table
();
const
MergeTreeData
*
table_data
=
nullptr
;
if
(
const
StorageMergeTree
*
merge_tree
=
dynamic_cast
<
StorageMergeTree
*>
(
table
.
get
()))
{
table_data
=
&
merge_tree
->
getData
();
}
else
if
(
const
StorageReplicatedMergeTree
*
replicated_merge_tree
=
dynamic_cast
<
StorageReplicatedMergeTree
*>
(
table
.
get
()))
{
table_data
=
&
replicated_merge_tree
->
getData
();
}
else
{
const
MergeTreeData
*
table_data
=
dynamic_cast
<
const
MergeTreeData
*>
(
table
.
get
());
if
(
!
table_data
)
continue
;
}
if
(
table_data
->
merging_params
.
mode
==
MergeTreeData
::
MergingParams
::
Graphite
)
{
...
...
dbms/src/Storages/System/StorageSystemMutations.cpp
浏览文件 @
2e34c5c7
...
...
@@ -4,8 +4,8 @@
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeArray.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/
StorageMergeTree
.h>
#include <Storages/
StorageReplicatedMergeTree
.h>
#include <Storages/
MergeTree/MergeTreeData
.h>
#include <Storages/
MergeTree/MergeTreeMutationStatus
.h>
#include <Storages/VirtualColumnUtils.h>
#include <Databases/IDatabase.h>
...
...
@@ -38,19 +38,10 @@ void StorageSystemMutations::fillData(MutableColumns & res_columns, const Contex
/// Collect a set of *MergeTree tables.
std
::
map
<
String
,
std
::
map
<
String
,
StoragePtr
>>
merge_tree_tables
;
for
(
const
auto
&
db
:
context
.
getDatabases
())
{
if
(
context
.
hasDatabaseAccessRights
(
db
.
first
))
{
for
(
auto
iterator
=
db
.
second
->
getIterator
(
context
);
iterator
->
isValid
();
iterator
->
next
())
{
if
(
dynamic_cast
<
const
StorageMergeTree
*>
(
iterator
->
table
().
get
())
||
dynamic_cast
<
const
StorageReplicatedMergeTree
*>
(
iterator
->
table
().
get
()))
{
if
(
dynamic_cast
<
const
MergeTreeData
*>
(
iterator
->
table
().
get
()))
merge_tree_tables
[
db
.
first
][
iterator
->
name
()]
=
iterator
->
table
();
}
}
}
}
MutableColumnPtr
col_database_mut
=
ColumnString
::
create
();
MutableColumnPtr
col_table_mut
=
ColumnString
::
create
();
...
...
@@ -92,10 +83,8 @@ void StorageSystemMutations::fillData(MutableColumns & res_columns, const Contex
std
::
vector
<
MergeTreeMutationStatus
>
statuses
;
{
const
IStorage
*
storage
=
merge_tree_tables
[
database
][
table
].
get
();
if
(
const
auto
*
merge_tree
=
dynamic_cast
<
const
StorageMergeTree
*>
(
storage
))
if
(
const
auto
*
merge_tree
=
dynamic_cast
<
const
MergeTreeData
*>
(
storage
))
statuses
=
merge_tree
->
getMutationsStatus
();
else
if
(
const
auto
*
replicated
=
dynamic_cast
<
const
StorageReplicatedMergeTree
*>
(
storage
))
statuses
=
replicated
->
getMutationsStatus
();
}
for
(
const
MergeTreeMutationStatus
&
status
:
statuses
)
...
...
dbms/src/Storages/System/StorageSystemParts.cpp
浏览文件 @
2e34c5c7
...
...
@@ -6,8 +6,6 @@
#include <DataTypes/DataTypeDate.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/System/StorageSystemParts.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Databases/IDatabase.h>
...
...
dbms/src/Storages/System/StorageSystemPartsBase.cpp
浏览文件 @
2e34c5c7
...
...
@@ -7,8 +7,7 @@
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDate.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/VirtualColumnUtils.h>
#include <Databases/IDatabase.h>
#include <Parsers/queryToString.h>
...
...
@@ -93,8 +92,7 @@ public:
StoragePtr
storage
=
iterator
->
table
();
String
engine_name
=
storage
->
getName
();
if
(
!
dynamic_cast
<
StorageMergeTree
*>
(
&*
storage
)
&&
!
dynamic_cast
<
StorageReplicatedMergeTree
*>
(
&*
storage
))
if
(
!
dynamic_cast
<
MergeTreeData
*>
(
storage
.
get
()))
continue
;
storages
[
std
::
make_pair
(
database_name
,
iterator
->
name
())]
=
storage
;
...
...
@@ -184,20 +182,9 @@ public:
info
.
engine
=
info
.
storage
->
getName
();
info
.
data
=
nullptr
;
if
(
auto
merge_tree
=
dynamic_cast
<
StorageMergeTree
*>
(
&*
info
.
storage
))
{
info
.
data
=
&
merge_tree
->
getData
();
}
else
if
(
auto
replicated_merge_tree
=
dynamic_cast
<
StorageReplicatedMergeTree
*>
(
&*
info
.
storage
))
{
info
.
data
=
&
replicated_merge_tree
->
getData
();
}
else
{
info
.
data
=
dynamic_cast
<
MergeTreeData
*>
(
info
.
storage
.
get
());
if
(
!
info
.
data
)
throw
Exception
(
"Unknown engine "
+
info
.
engine
,
ErrorCodes
::
LOGICAL_ERROR
);
}
using
State
=
MergeTreeDataPart
::
State
;
auto
&
all_parts_state
=
info
.
all_parts_state
;
...
...
dbms/src/Storages/System/StorageSystemPartsColumns.cpp
浏览文件 @
2e34c5c7
...
...
@@ -6,7 +6,6 @@
#include <DataTypes/DataTypeDate.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/System/StorageSystemPartsColumns.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Databases/IDatabase.h>
#include <Parsers/queryToString.h>
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录