Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
d29ae515
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
d29ae515
编写于
1月 26, 2016
作者:
A
Alexey Milovidov
浏览文件
操作
浏览文件
下载
差异文件
Merge
上级
9156262d
ae2f7c76
变更
29
隐藏空白更改
内联
并排
Showing
29 changed file
with
516 addition
and
557 deletion
+516
-557
dbms/include/DB/Common/BlockFilterCreator.h
dbms/include/DB/Common/BlockFilterCreator.h
+64
-0
dbms/include/DB/Interpreters/Context.h
dbms/include/DB/Interpreters/Context.h
+1
-0
dbms/include/DB/Interpreters/InterpreterAlterQuery.h
dbms/include/DB/Interpreters/InterpreterAlterQuery.h
+3
-3
dbms/include/DB/Parsers/ASTAlterQuery.h
dbms/include/DB/Parsers/ASTAlterQuery.h
+1
-1
dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h
...de/DB/Storages/Distributed/DistributedBlockOutputStream.h
+12
-206
dbms/include/DB/Storages/IStorage.h
dbms/include/DB/Storages/IStorage.h
+1
-1
dbms/include/DB/Storages/MergeTree/MergeTreeSharder.h
dbms/include/DB/Storages/MergeTree/MergeTreeSharder.h
+2
-0
dbms/include/DB/Storages/MergeTree/RemoteDiskSpaceMonitor.h
dbms/include/DB/Storages/MergeTree/RemoteDiskSpaceMonitor.h
+5
-3
dbms/include/DB/Storages/MergeTree/ReshardingJob.h
dbms/include/DB/Storages/MergeTree/ReshardingJob.h
+2
-2
dbms/include/DB/Storages/MergeTree/ReshardingWorker.h
dbms/include/DB/Storages/MergeTree/ReshardingWorker.h
+8
-12
dbms/include/DB/Storages/MergeTree/ShardedPartitionSender.h
dbms/include/DB/Storages/MergeTree/ShardedPartitionSender.h
+4
-1
dbms/include/DB/Storages/StorageDistributed.h
dbms/include/DB/Storages/StorageDistributed.h
+1
-1
dbms/include/DB/Storages/StorageReplicatedMergeTree.h
dbms/include/DB/Storages/StorageReplicatedMergeTree.h
+2
-2
dbms/src/Core/ErrorCodes.cpp
dbms/src/Core/ErrorCodes.cpp
+6
-7
dbms/src/Interpreters/Context.cpp
dbms/src/Interpreters/Context.cpp
+9
-6
dbms/src/Interpreters/InterpreterAlterQuery.cpp
dbms/src/Interpreters/InterpreterAlterQuery.cpp
+3
-4
dbms/src/Parsers/ASTAlterQuery.cpp
dbms/src/Parsers/ASTAlterQuery.cpp
+6
-2
dbms/src/Parsers/ParserAlterQuery.cpp
dbms/src/Parsers/ParserAlterQuery.cpp
+2
-5
dbms/src/Server/Server.cpp
dbms/src/Server/Server.cpp
+4
-7
dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp
...src/Storages/Distributed/DistributedBlockOutputStream.cpp
+180
-0
dbms/src/Storages/MergeTree/MergeTreeSharder.cpp
dbms/src/Storages/MergeTree/MergeTreeSharder.cpp
+15
-52
dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.cpp
dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.cpp
+5
-4
dbms/src/Storages/MergeTree/ReshardingJob.cpp
dbms/src/Storages/MergeTree/ReshardingJob.cpp
+24
-4
dbms/src/Storages/MergeTree/ReshardingWorker.cpp
dbms/src/Storages/MergeTree/ReshardingWorker.cpp
+125
-144
dbms/src/Storages/MergeTree/ShardedPartitionSender.cpp
dbms/src/Storages/MergeTree/ShardedPartitionSender.cpp
+18
-3
dbms/src/Storages/StorageDistributed.cpp
dbms/src/Storages/StorageDistributed.cpp
+2
-2
dbms/src/Storages/StorageReplicatedMergeTree.cpp
dbms/src/Storages/StorageReplicatedMergeTree.cpp
+8
-68
libs/libzkutil/include/zkutil/ZooKeeper.h
libs/libzkutil/include/zkutil/ZooKeeper.h
+1
-4
libs/libzkutil/src/ZooKeeper.cpp
libs/libzkutil/src/ZooKeeper.cpp
+2
-13
未找到文件。
dbms/include/DB/Common/BlockFilterCreator.h
0 → 100644
浏览文件 @
d29ae515
#pragma once
#include <DB/Columns/ColumnConst.h>
#include <DB/Columns/ColumnVector.h>
#include <type_traits>
#if defined(__x86_64__)
#define LIBDIVIDE_USE_SSE2 1
#endif
#include <libdivide.h>
namespace
DB
{
/** Builds per-shard row filters from an integer sharding key column.
  *
  * T is the integer type stored in the key column.
  */
template <typename T>
struct BlockFilterCreator
{
    /** Create one filter per shard for a block of `num_rows` rows.
      *
      * @param num_rows    number of rows each resulting filter must cover
      * @param column      sharding key column; it is cast to ColumnConst<T> when
      *                    isConst() is true and to ColumnVector<T> otherwise
      * @param num_shards  number of filters to produce
      * @param slots       maps (key modulo slots.size()) to a shard index;
      *                    must not be empty because its size is used as the divisor
      * @return vector of `num_shards` filters; filters[i][row] is non-zero exactly
      *         when the slot selected by the key of that row equals i
      */
    static std::vector<IColumn::Filter> perform(
        const size_t num_rows,
        const IColumn * column,
        size_t num_shards,
        const std::vector<size_t> & slots)
    {
        const auto total_weight = slots.size();
        std::vector<IColumn::Filter> filters(num_shards);

        /** In C++ the remainder of dividing a negative number by a positive one is negative.
          * That is not suitable for this task, so signed types are processed as unsigned.
          * The result no longer resembles a true remainder, but it is suitable for this task.
          */
        using UnsignedT = typename std::make_unsigned<T>::type;

        /// libdivide supports only UInt32 or UInt64.
        using TUInt32Or64 = typename std::conditional<sizeof(UnsignedT) <= 4, UInt32, UInt64>::type;

        /// const columns contain only one value, therefore we do not need to read it at every iteration
        if (column->isConst())
        {
            const auto data = typeid_cast<const ColumnConst<T> *>(column)->getData();

            /// Convert the key exactly like the non-const branch below (directly to TUInt32Or64),
            /// so that a given key selects the same slot whether or not the column is constant.
            /// Converting through UnsignedT first would give a different value for negative
            /// Int8 / Int16 keys.
            const auto shard_num = slots[static_cast<TUInt32Or64>(data) % total_weight];

            for (size_t i = 0; i < num_shards; ++i)
                filters[i].assign(num_rows, static_cast<UInt8>(shard_num == i));
        }
        else
        {
            libdivide::divider<TUInt32Or64> divider(total_weight);

            const auto & data = typeid_cast<const ColumnVector<T> *>(column)->getData();

            /// NOTE It may be worth swapping the two loops.
            for (size_t i = 0; i < num_shards; ++i)
            {
                filters[i].resize(num_rows);

                for (size_t j = 0; j < num_rows; ++j)
                {
                    const auto value = static_cast<TUInt32Or64>(data[j]);

                    /// value - (value / total_weight) * total_weight is the remainder,
                    /// computed with the precomputed libdivide divider.
                    filters[i][j] = slots[value - (value / divider) * total_weight] == i;
                }
            }
        }

        return filters;
    }
};
}
dbms/include/DB/Interpreters/Context.h
浏览文件 @
d29ae515
...
...
@@ -251,6 +251,7 @@ public:
BackgroundProcessingPool
&
getBackgroundPool
();
void
setReshardingWorker
(
std
::
shared_ptr
<
ReshardingWorker
>
resharding_worker
);
ReshardingWorker
&
getReshardingWorker
();
/** Очистить кэши разжатых блоков и засечек.
...
...
dbms/include/DB/Interpreters/InterpreterAlterQuery.h
浏览文件 @
d29ae515
...
...
@@ -55,7 +55,7 @@ private:
/// Для RESHARD PARTITION.
Field
last_partition
;
WeightedZooKeeperPaths
weighted_zookeeper_paths
;
String
sharding_key
;
ASTPtr
sharding_key_expr
;
static
PartitionCommand
dropPartition
(
const
Field
&
partition
,
bool
detach
,
bool
unreplicated
)
{
...
...
@@ -78,9 +78,9 @@ private:
}
static
PartitionCommand
reshardPartitions
(
const
Field
&
first_partition_
,
const
Field
&
last_partition_
,
const
WeightedZooKeeperPaths
&
weighted_zookeeper_paths_
,
const
String
&
sharding_key_
)
const
WeightedZooKeeperPaths
&
weighted_zookeeper_paths_
,
const
ASTPtr
&
sharding_key_expr
)
{
return
{
RESHARD_PARTITION
,
first_partition_
,
false
,
false
,
false
,
{},
last_partition_
,
weighted_zookeeper_paths_
,
sharding_key_
};
return
{
RESHARD_PARTITION
,
first_partition_
,
false
,
false
,
false
,
{},
last_partition_
,
weighted_zookeeper_paths_
,
sharding_key_
expr
};
}
};
...
...
dbms/include/DB/Parsers/ASTAlterQuery.h
浏览文件 @
d29ae515
...
...
@@ -70,7 +70,7 @@ public:
*/
ASTPtr
last_partition
;
ASTPtr
weighted_zookeeper_paths
;
String
sharding_key
;
ASTPtr
sharding_key_expr
;
/// deep copy
void
clone
(
Parameters
&
p
)
const
;
...
...
dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h
浏览文件 @
d29ae515
#pragma once
#include <DB/Storages/StorageDistributed.h>
#include <DB/Parsers/formatAST.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
#include <DB/Interpreters/InterpreterInsertQuery.h>
#include <DB/Interpreters/Cluster.h>
#include <DB/Common/Increment.h>
#include <memory>
#include <common/Revision.h>
#include <iostream>
#include <type_traits>
#if defined(__x86_64__)
#define LIBDIVIDE_USE_SSE2 1
#endif
#include <libdivide.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/Core/Block.h>
namespace
DB
{
class
StorageDistributed
;
/** Запись асинхронная - данные сначала записываются на локальную файловую систему, а потом отправляются на удалённые серверы.
* Если Distributed таблица использует более одного шарда, то для того, чтобы поддерживалась запись,
* при создании таблицы должен быть указан дополнительный параметр у ENGINE - ключ шардирования.
...
...
@@ -38,198 +20,22 @@ namespace DB
class
DistributedBlockOutputStream
:
public
IBlockOutputStream
{
public:
DistributedBlockOutputStream
(
StorageDistributed
&
storage
,
const
ASTPtr
&
query_ast
)
:
storage
(
storage
),
query_ast
(
query_ast
)
{
}
void
write
(
const
Block
&
block
)
override
{
if
(
storage
.
getShardingKeyExpr
()
&&
(
storage
.
cluster
.
getShardsInfo
().
size
()
>
1
))
return
writeSplit
(
block
);
DistributedBlockOutputStream
(
StorageDistributed
&
storage
,
const
ASTPtr
&
query_ast
);
writeImpl
(
block
);
}
void
write
(
const
Block
&
block
)
override
;
private:
template
<
typename
T
>
static
std
::
vector
<
IColumn
::
Filter
>
createFiltersImpl
(
const
size_t
num_rows
,
const
IColumn
*
column
,
const
Cluster
&
cluster
)
{
const
auto
total_weight
=
cluster
.
slot_to_shard
.
size
();
const
auto
num_shards
=
cluster
.
getShardsInfo
().
size
();
std
::
vector
<
IColumn
::
Filter
>
filters
(
num_shards
);
/** Деление отрицательного числа с остатком на положительное, в C++ даёт отрицательный остаток.
* Для данной задачи это не подходит. Поэтому, будем обрабатывать знаковые типы как беззнаковые.
* Это даёт уже что-то совсем не похожее на деление с остатком, но подходящее для данной задачи.
*/
using
UnsignedT
=
typename
std
::
make_unsigned
<
T
>::
type
;
/// const columns contain only one value, therefore we do not need to read it at every iteration
if
(
column
->
isConst
())
{
const
auto
data
=
typeid_cast
<
const
ColumnConst
<
T
>
*>
(
column
)
->
getData
();
const
auto
shard_num
=
cluster
.
slot_to_shard
[
static_cast
<
UnsignedT
>
(
data
)
%
total_weight
];
for
(
size_t
i
=
0
;
i
<
num_shards
;
++
i
)
filters
[
i
].
assign
(
num_rows
,
static_cast
<
UInt8
>
(
shard_num
==
i
));
}
else
{
/// libdivide поддерживает только UInt32 или UInt64.
using
TUInt32Or64
=
typename
std
::
conditional
<
sizeof
(
UnsignedT
)
<=
4
,
UInt32
,
UInt64
>::
type
;
libdivide
::
divider
<
TUInt32Or64
>
divider
(
total_weight
);
const
auto
&
data
=
typeid_cast
<
const
ColumnVector
<
T
>
*>
(
column
)
->
getData
();
/// NOTE Может быть, стоит поменять местами циклы.
for
(
size_t
i
=
0
;
i
<
num_shards
;
++
i
)
{
filters
[
i
].
resize
(
num_rows
);
for
(
size_t
j
=
0
;
j
<
num_rows
;
++
j
)
filters
[
i
][
j
]
=
cluster
.
slot_to_shard
[
static_cast
<
TUInt32Or64
>
(
data
[
j
])
-
(
static_cast
<
TUInt32Or64
>
(
data
[
j
])
/
divider
)
*
total_weight
]
==
i
;
}
}
return
filters
;
}
std
::
vector
<
IColumn
::
Filter
>
createFilters
(
Block
block
)
{
using
create_filters_sig
=
std
::
vector
<
IColumn
::
Filter
>
(
size_t
,
const
IColumn
*
,
const
Cluster
&
);
/// hashmap of pointers to functions corresponding to each integral type
static
std
::
unordered_map
<
std
::
string
,
create_filters_sig
*>
creators
{
{
TypeName
<
UInt8
>::
get
(),
&
createFiltersImpl
<
UInt8
>
},
{
TypeName
<
UInt16
>::
get
(),
&
createFiltersImpl
<
UInt16
>
},
{
TypeName
<
UInt32
>::
get
(),
&
createFiltersImpl
<
UInt32
>
},
{
TypeName
<
UInt64
>::
get
(),
&
createFiltersImpl
<
UInt64
>
},
{
TypeName
<
Int8
>::
get
(),
&
createFiltersImpl
<
Int8
>
},
{
TypeName
<
Int16
>::
get
(),
&
createFiltersImpl
<
Int16
>
},
{
TypeName
<
Int32
>::
get
(),
&
createFiltersImpl
<
Int32
>
},
{
TypeName
<
Int64
>::
get
(),
&
createFiltersImpl
<
Int64
>
},
};
storage
.
getShardingKeyExpr
()
->
execute
(
block
);
const
auto
&
key_column
=
block
.
getByName
(
storage
.
getShardingKeyColumnName
());
/// check that key column has valid type
const
auto
it
=
creators
.
find
(
key_column
.
type
->
getName
());
return
it
!=
std
::
end
(
creators
)
?
(
*
it
->
second
)(
block
.
rowsInFirstColumn
(),
key_column
.
column
.
get
(),
storage
.
cluster
)
:
throw
Exception
{
"Sharding key expression does not evaluate to an integer type"
,
ErrorCodes
::
TYPE_MISMATCH
};
}
void
writeSplit
(
const
Block
&
block
)
{
const
auto
num_cols
=
block
.
columns
();
/// cache column pointers for later reuse
std
::
vector
<
const
IColumn
*>
columns
(
num_cols
);
for
(
size_t
i
=
0
;
i
<
columns
.
size
();
++
i
)
columns
[
i
]
=
block
.
getByPosition
(
i
).
column
;
std
::
vector
<
IColumn
::
Filter
>
createFilters
(
Block
block
);
auto
filters
=
createFilters
(
block
);
void
writeSplit
(
const
Block
&
block
);
const
auto
num_shards
=
storage
.
cluster
.
getShardsInfo
().
size
(
);
void
writeImpl
(
const
Block
&
block
,
const
size_t
shard_id
=
0
);
ssize_t
size_hint
=
((
block
.
rowsInFirstColumn
()
+
num_shards
-
1
)
/
num_shards
)
*
1.1
;
/// Число 1.1 выбрано наугад.
void
writeToLocal
(
const
Block
&
block
,
const
size_t
repeats
);
for
(
size_t
i
=
0
;
i
<
num_shards
;
++
i
)
{
auto
target_block
=
block
.
cloneEmpty
();
for
(
size_t
col
=
0
;
col
<
num_cols
;
++
col
)
target_block
.
getByPosition
(
col
).
column
=
columns
[
col
]
->
filter
(
filters
[
i
],
size_hint
);
if
(
target_block
.
rowsInFirstColumn
())
writeImpl
(
target_block
,
i
);
}
}
void
writeImpl
(
const
Block
&
block
,
const
size_t
shard_id
=
0
)
{
const
auto
&
shard_info
=
storage
.
cluster
.
getShardsInfo
()[
shard_id
];
if
(
shard_info
.
getLocalNodeCount
()
>
0
)
writeToLocal
(
block
,
shard_info
.
getLocalNodeCount
());
/// dir_names is empty if shard has only local addresses
if
(
!
shard_info
.
dir_names
.
empty
())
writeToShard
(
block
,
shard_info
.
dir_names
);
}
void
writeToLocal
(
const
Block
&
block
,
const
size_t
repeats
)
{
InterpreterInsertQuery
interp
{
query_ast
,
storage
.
context
};
auto
block_io
=
interp
.
execute
();
block_io
.
out
->
writePrefix
();
for
(
size_t
i
=
0
;
i
<
repeats
;
++
i
)
block_io
.
out
->
write
(
block
);
block_io
.
out
->
writeSuffix
();
}
void
writeToShard
(
const
Block
&
block
,
const
std
::
vector
<
std
::
string
>
&
dir_names
)
{
/** tmp directory is used to ensure atomicity of transactions
* and keep monitor thread out from reading incomplete data
*/
std
::
string
first_file_tmp_path
{};
auto
first
=
true
;
const
auto
&
query_string
=
queryToString
(
query_ast
);
/// write first file, hardlink the others
for
(
const
auto
&
dir_name
:
dir_names
)
{
const
auto
&
path
=
storage
.
getPath
()
+
dir_name
+
'/'
;
/// ensure shard subdirectory creation and notify storage
if
(
Poco
::
File
(
path
).
createDirectory
())
storage
.
requireDirectoryMonitor
(
dir_name
);
const
auto
&
file_name
=
toString
(
Increment
{
path
+
"increment.txt"
}.
get
(
true
))
+
".bin"
;
const
auto
&
block_file_path
=
path
+
file_name
;
/** on first iteration write block to a temporary directory for subsequent hardlinking to ensure
* the inode is not freed until we're done */
if
(
first
)
{
first
=
false
;
const
auto
&
tmp_path
=
path
+
"tmp/"
;
Poco
::
File
(
tmp_path
).
createDirectory
();
const
auto
&
block_file_tmp_path
=
tmp_path
+
file_name
;
first_file_tmp_path
=
block_file_tmp_path
;
WriteBufferFromFile
out
{
block_file_tmp_path
};
CompressedWriteBuffer
compress
{
out
};
NativeBlockOutputStream
stream
{
compress
,
Revision
::
get
()};
writeStringBinary
(
query_string
,
out
);
stream
.
writePrefix
();
stream
.
write
(
block
);
stream
.
writeSuffix
();
}
if
(
link
(
first_file_tmp_path
.
data
(),
block_file_path
.
data
()))
throwFromErrno
(
"Could not link "
+
block_file_path
+
" to "
+
first_file_tmp_path
);
}
/** remove the temporary file, enabling the OS to reclaim inode after all threads
* have removed their corresponding files */
Poco
::
File
(
first_file_tmp_path
).
remove
();
}
void
writeToShard
(
const
Block
&
block
,
const
std
::
vector
<
std
::
string
>
&
dir_names
);
private:
StorageDistributed
&
storage
;
ASTPtr
query_ast
;
};
...
...
dbms/include/DB/Storages/IStorage.h
浏览文件 @
d29ae515
...
...
@@ -242,7 +242,7 @@ public:
/** Выполнить запрос RESHARD PARTITION.
*/
virtual
void
reshardPartitions
(
const
String
&
database_name
,
const
Field
&
first_partition
,
const
Field
&
last_partition
,
const
WeightedZooKeeperPaths
&
weighted_zookeeper_paths
,
const
String
&
sharding_key
,
const
WeightedZooKeeperPaths
&
weighted_zookeeper_paths
,
const
ASTPtr
&
sharding_key_expr
,
const
Settings
&
settings
)
{
throw
Exception
(
"Method reshardPartition is not supported by storage "
+
getName
(),
ErrorCodes
::
NOT_IMPLEMENTED
);
...
...
dbms/include/DB/Storages/MergeTree/MergeTreeSharder.h
浏览文件 @
d29ae515
...
...
@@ -56,6 +56,8 @@ private:
const
ReshardingJob
&
job
;
Logger
*
log
;
std
::
vector
<
size_t
>
slots
;
ExpressionActionsPtr
sharding_key_expr
;
std
::
string
sharding_key_column_name
;
};
}
dbms/include/DB/Storages/MergeTree/RemoteDiskSpaceMonitor.h
浏览文件 @
d29ae515
...
...
@@ -7,6 +7,8 @@
namespace
DB
{
class
Context
;
namespace
RemoteDiskSpaceMonitor
{
...
...
@@ -15,14 +17,14 @@ namespace RemoteDiskSpaceMonitor
class
Service
final
:
public
InterserverIOEndpoint
{
public:
Service
(
const
std
::
string
&
path
_
);
Service
(
const
Context
&
context
_
);
Service
(
const
Service
&
)
=
delete
;
Service
&
operator
=
(
const
Service
&
)
=
delete
;
std
::
string
getId
(
const
std
::
string
&
node_id
)
const
override
;
void
processQuery
(
const
Poco
::
Net
::
HTMLForm
&
params
,
WriteBuffer
&
out
)
override
;
private:
const
std
::
string
path
;
const
Context
&
context
;
};
/** Клиент для получения информации о свободном месте на удалённом диске.
...
...
@@ -33,7 +35,7 @@ public:
Client
()
=
default
;
Client
(
const
Client
&
)
=
delete
;
Client
&
operator
=
(
const
Client
&
)
=
delete
;
size_t
getFree
Disk
Space
(
const
InterserverIOEndpointLocation
&
location
)
const
;
size_t
getFreeSpace
(
const
InterserverIOEndpointLocation
&
location
)
const
;
void
cancel
()
{
is_cancelled
=
true
;
}
private:
...
...
dbms/include/DB/Storages/MergeTree/ReshardingJob.h
浏览文件 @
d29ae515
...
...
@@ -16,7 +16,7 @@ public:
ReshardingJob
(
const
std
::
string
&
database_name_
,
const
std
::
string
&
table_name_
,
const
std
::
string
&
partition_
,
const
WeightedZooKeeperPaths
&
paths_
,
const
std
::
string
&
sharding_key
_
);
const
ASTPtr
&
sharding_key_expr
_
);
ReshardingJob
(
const
ReshardingJob
&
)
=
delete
;
ReshardingJob
&
operator
=
(
const
ReshardingJob
&
)
=
delete
;
...
...
@@ -29,7 +29,7 @@ public:
std
::
string
table_name
;
std
::
string
partition
;
WeightedZooKeeperPaths
paths
;
std
::
string
sharding_key
;
ASTPtr
sharding_key_expr
;
};
}
dbms/include/DB/Storages/MergeTree/ReshardingWorker.h
浏览文件 @
d29ae515
#pragma once
#include <DB/Storages/MergeTree/MergeTreeDataMerger.h>
#include <DB/Storages/AlterCommands.h>
#include <common/logger_useful.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Poco/SharedPtr.h>
#include <string>
#include <thread>
...
...
@@ -22,7 +24,8 @@ class ReshardingJob;
class
ReshardingWorker
final
{
public:
ReshardingWorker
(
Context
&
context_
);
ReshardingWorker
(
const
Poco
::
Util
::
AbstractConfiguration
&
config
,
const
std
::
string
&
config_name
,
Context
&
context_
);
ReshardingWorker
(
const
ReshardingWorker
&
)
=
delete
;
ReshardingWorker
&
operator
=
(
const
ReshardingWorker
&
)
=
delete
;
...
...
@@ -37,18 +40,12 @@ public:
const
std
::
string
&
table_name
,
const
std
::
string
&
partition
,
const
WeightedZooKeeperPaths
&
weighted_zookeeper_paths
,
const
std
::
string
&
sharding_key
);
/// Прислать запрос на перешардирование.
void
submitJob
(
const
ReshardingJob
&
job
);
const
ASTPtr
&
sharding_key_expr
);
/// Был ли поток запущен?
bool
isStarted
()
const
;
private:
/// Прислать запрос на перешардирование (внутренняя версия).
void
submitJobImpl
(
const
std
::
string
&
serialized_job
);
/// Следить за появлением новых задач. Выполнить их последовательно.
void
pollAndExecute
();
...
...
@@ -81,18 +78,17 @@ private:
/// Принудительно завершить поток.
void
abortIfRequested
()
const
;
/// Был ли поток завершён?
bool
hasAborted
(
const
Exception
&
ex
)
const
;
private:
Context
&
context
;
Logger
*
log
;
std
::
unique_ptr
<
MergeTreeDataMerger
>
merger
;
std
::
thread
polling_thread
;
mutable
std
::
mutex
cancel_mutex
;
std
::
string
host_task_queue_path
;
std
::
atomic
<
bool
>
is_started
{
false
};
std
::
atomic
<
bool
>
must_stop
{
false
};
};
using
ReshardingWorkerPtr
=
Poco
::
SharedP
tr
<
ReshardingWorker
>
;
using
ReshardingWorkerPtr
=
std
::
shared_p
tr
<
ReshardingWorker
>
;
}
dbms/include/DB/Storages/MergeTree/ShardedPartitionSender.h
浏览文件 @
d29ae515
...
...
@@ -2,6 +2,7 @@
#include <DB/Interpreters/InterserverIOHandler.h>
#include <DB/IO/WriteBuffer.h>
#include <common/logger_useful.h>
namespace
DB
{
...
...
@@ -24,6 +25,7 @@ public:
private:
StorageReplicatedMergeTree
&
storage
;
Logger
*
log
;
};
/** Клиент для отправления кусков из партиции таблицы *MergeTree.
...
...
@@ -31,7 +33,7 @@ private:
class
Client
final
{
public:
Client
()
=
default
;
Client
();
Client
(
const
Client
&
)
=
delete
;
Client
&
operator
=
(
const
Client
&
)
=
delete
;
bool
send
(
const
InterserverIOEndpointLocation
&
to_location
,
const
InterserverIOEndpointLocation
&
from_location
,
...
...
@@ -40,6 +42,7 @@ public:
private:
std
::
atomic
<
bool
>
is_cancelled
{
false
};
Logger
*
log
;
};
}
...
...
dbms/include/DB/Storages/StorageDistributed.h
浏览文件 @
d29ae515
...
...
@@ -77,7 +77,7 @@ public:
void
shutdown
()
override
;
void
reshardPartitions
(
const
String
&
database_name
,
const
Field
&
first_partition
,
const
Field
&
last_partition
,
const
WeightedZooKeeperPaths
&
weighted_zookeeper_paths
,
const
String
&
sharding_key
,
const
WeightedZooKeeperPaths
&
weighted_zookeeper_paths
,
const
ASTPtr
&
sharding_key_expr
,
const
Settings
&
settings
)
override
;
/// От каждой реплики получить описание соответствующей локальной таблицы.
...
...
dbms/include/DB/Storages/StorageReplicatedMergeTree.h
浏览文件 @
d29ae515
...
...
@@ -136,7 +136,7 @@ public:
void
fetchPartition
(
const
Field
&
partition
,
const
String
&
from
,
const
Settings
&
settings
)
override
;
void
freezePartition
(
const
Field
&
partition
,
const
Settings
&
settings
)
override
;
void
reshardPartitions
(
const
String
&
database_name
,
const
Field
&
first_partition
,
const
Field
&
last_partition
,
const
WeightedZooKeeperPaths
&
weighted_zookeeper_paths
,
const
String
&
sharding_key
,
const
WeightedZooKeeperPaths
&
weighted_zookeeper_paths
,
const
ASTPtr
&
sharding_key_expr
,
const
Settings
&
settings
)
override
;
/** Удаляет реплику из ZooKeeper. Если других реплик нет, удаляет всю таблицу из ZooKeeper.
...
...
@@ -257,7 +257,7 @@ private:
MergeTreeDataMerger
merger
;
DataPartsExchange
::
Fetcher
fetcher
;
RemoteDiskSpaceMonitor
::
Client
free_disk_space_checker
;
RemoteDiskSpaceMonitor
::
Client
disk_space_monitor_client
;
ShardedPartitionSender
::
Client
sharded_partition_sender_client
;
RemoteQueryExecutor
::
Client
remote_query_executor_client
;
...
...
dbms/src/Core/ErrorCodes.cpp
浏览文件 @
d29ae515
...
...
@@ -317,13 +317,12 @@ namespace ErrorCodes
extern
const
int
INSUFFICIENT_SPACE_FOR_RESHARDING
=
311
;
extern
const
int
PARTITION_COPY_FAILED
=
312
;
extern
const
int
PARTITION_ATTACH_FAILED
=
313
;
extern
const
int
RESHARDING_CLEANUP_FAILED
=
314
;
extern
const
int
RESHARDING_NO_WORKER
=
315
;
extern
const
int
INVALID_PARTITIONS_INTERVAL
=
316
;
extern
const
int
RESHARDING_INVALID_PARAMETERS
=
317
;
extern
const
int
INVALID_SHARD_WEIGHT
=
318
;
extern
const
int
SHARD_DOESNT_REFERENCE_TABLE
=
319
;
extern
const
int
UNKNOWN_STATUS_OF_INSERT
=
320
;
extern
const
int
RESHARDING_NO_WORKER
=
314
;
extern
const
int
INVALID_PARTITIONS_INTERVAL
=
315
;
extern
const
int
RESHARDING_INVALID_PARAMETERS
=
316
;
extern
const
int
INVALID_SHARD_WEIGHT
=
317
;
extern
const
int
INVALID_CONFIG_PARAMETER
=
318
;
extern
const
int
UNKNOWN_STATUS_OF_INSERT
=
319
;
extern
const
int
KEEPER_EXCEPTION
=
999
;
extern
const
int
POCO_EXCEPTION
=
1000
;
...
...
dbms/src/Interpreters/Context.cpp
浏览文件 @
d29ae515
...
...
@@ -822,16 +822,19 @@ BackgroundProcessingPool & Context::getBackgroundPool()
return
*
shared
->
background_pool
;
}
ReshardingWorker
&
Context
::
getReshardingWorker
(
)
void
Context
::
setReshardingWorker
(
std
::
shared_ptr
<
ReshardingWorker
>
resharding_worker
)
{
Poco
::
ScopedLock
<
Poco
::
Mutex
>
lock
(
shared
->
mutex
);
if
(
shared
->
resharding_worker
)
throw
Exception
(
"Resharding background thread has already been set."
,
ErrorCodes
::
LOGICAL_ERROR
);
shared
->
resharding_worker
=
resharding_worker
;
}
if
(
!
shared
->
zookeeper
)
throw
Exception
(
"Resharding background processing requires ZooKeeper"
,
ErrorCodes
::
LOGICAL_ERROR
);
ReshardingWorker
&
Context
::
getReshardingWorker
(
)
{
Poco
::
ScopedLock
<
Poco
::
Mutex
>
lock
(
shared
->
mutex
);
if
(
!
shared
->
resharding_worker
)
shared
->
resharding_worker
=
new
ReshardingWorker
(
*
this
);
throw
Exception
(
"Resharding background thread not set."
,
ErrorCodes
::
LOGICAL_ERROR
);
return
*
shared
->
resharding_worker
;
}
...
...
dbms/src/Interpreters/InterpreterAlterQuery.cpp
浏览文件 @
d29ae515
...
...
@@ -67,7 +67,7 @@ BlockIO InterpreterAlterQuery::execute()
break
;
case
PartitionCommand
::
RESHARD_PARTITION
:
table
->
reshardPartitions
(
database_name
,
command
.
partition
,
command
.
last_partition
,
command
.
weighted_zookeeper_paths
,
command
.
sharding_key
,
context
.
getSettingsRef
());
table
->
reshardPartitions
(
database_name
,
command
.
partition
,
command
.
last_partition
,
command
.
weighted_zookeeper_paths
,
command
.
sharding_key
_expr
,
context
.
getSettingsRef
());
break
;
default:
...
...
@@ -190,9 +190,8 @@ void InterpreterAlterQuery::parseAlter(
weighted_zookeeper_paths
.
emplace_back
(
weighted_zookeeper_path
.
path
,
weighted_zookeeper_path
.
weight
);
}
const
auto
&
sharding_key
=
params
.
sharding_key
;
out_partition_commands
.
push_back
(
PartitionCommand
::
reshardPartitions
(
first_partition
,
last_partition
,
weighted_zookeeper_paths
,
sharding_key
));
out_partition_commands
.
push_back
(
PartitionCommand
::
reshardPartitions
(
first_partition
,
last_partition
,
weighted_zookeeper_paths
,
params
.
sharding_key_expr
));
}
else
throw
Exception
(
"Wrong parameter type in ALTER query"
,
ErrorCodes
::
LOGICAL_ERROR
);
...
...
dbms/src/Parsers/ASTAlterQuery.cpp
浏览文件 @
d29ae515
...
...
@@ -19,6 +19,7 @@ void ASTAlterQuery::Parameters::clone(Parameters & p) const
if
(
partition
)
p
.
partition
=
partition
->
clone
();
if
(
last_partition
)
p
.
last_partition
=
last_partition
->
clone
();
if
(
weighted_zookeeper_paths
)
p
.
weighted_zookeeper_paths
=
weighted_zookeeper_paths
->
clone
();
if
(
sharding_key_expr
)
p
.
sharding_key_expr
=
sharding_key_expr
->
clone
();
}
void
ASTAlterQuery
::
addParameters
(
const
Parameters
&
params
)
...
...
@@ -34,6 +35,8 @@ void ASTAlterQuery::addParameters(const Parameters & params)
children
.
push_back
(
params
.
last_partition
);
if
(
params
.
weighted_zookeeper_paths
)
children
.
push_back
(
params
.
weighted_zookeeper_paths
);
if
(
params
.
sharding_key_expr
)
children
.
push_back
(
params
.
sharding_key_expr
);
}
ASTAlterQuery
::
ASTAlterQuery
(
StringRange
range_
)
:
IAST
(
range_
)
...
...
@@ -153,8 +156,9 @@ void ASTAlterQuery::formatImpl(const FormatSettings & settings, FormatState & st
settings
.
ostr
<<
settings
.
nl_or_ws
;
settings
.
ostr
<<
(
settings
.
hilite
?
hilite_keyword
:
""
)
<<
indent_str
<<
"USING "
<<
(
settings
.
hilite
?
hilite_none
:
""
)
<<
p
.
sharding_key
;
<<
"USING "
<<
(
settings
.
hilite
?
hilite_none
:
""
);
p
.
sharding_key_expr
->
formatImpl
(
settings
,
state
,
frame
);
}
else
throw
Exception
(
"Unexpected type of ALTER"
,
ErrorCodes
::
UNEXPECTED_AST_STRUCTURE
);
...
...
dbms/src/Parsers/ParserAlterQuery.cpp
浏览文件 @
d29ae515
...
...
@@ -255,7 +255,7 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_pa
else
if
(
s_reshard
.
ignore
(
pos
,
end
,
max_parsed_pos
,
expected
))
{
ParserList
weighted_zookeeper_paths_p
(
ParserPtr
(
new
ParserWeightedZooKeeperPath
),
ParserPtr
(
new
ParserString
(
","
)),
false
);
Parser
Identifier
sharding_key_parser
;
Parser
ExpressionWithOptionalAlias
parser_sharding_key_expr
(
false
)
;
ws
.
ignore
(
pos
,
end
);
...
...
@@ -294,12 +294,9 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_pa
ws
.
ignore
(
pos
,
end
);
ASTPtr
ast_sharding_key
;
if
(
!
sharding_key_parser
.
parse
(
pos
,
end
,
ast_sharding_key
,
max_parsed_pos
,
expected
))
if
(
!
parser_sharding_key_expr
.
parse
(
pos
,
end
,
params
.
sharding_key_expr
,
max_parsed_pos
,
expected
))
return
false
;
params
.
sharding_key
=
typeid_cast
<
const
ASTIdentifier
&>
(
*
ast_sharding_key
).
name
;
ws
.
ignore
(
pos
,
end
);
params
.
type
=
ASTAlterQuery
::
RESHARD_PARTITION
;
...
...
dbms/src/Server/Server.cpp
浏览文件 @
d29ae515
...
...
@@ -320,14 +320,11 @@ int Server::main(const std::vector<std::string> & args)
global_context
->
setCurrentDatabase
(
config
().
getString
(
"default_database"
,
"default"
));
if
(
has_zookeeper
)
if
(
has_zookeeper
&&
config
().
has
(
"resharding"
)
)
{
zkutil
::
ZooKeeperPtr
zookeeper
=
global_context
->
getZooKeeper
();
if
(
!
zookeeper
->
getTaskQueuePath
().
empty
())
{
auto
&
resharding_worker
=
global_context
->
getReshardingWorker
();
resharding_worker
.
start
();
}
auto
resharding_worker
=
std
::
make_shared
<
ReshardingWorker
>
(
config
(),
"resharding"
,
*
global_context
);
global_context
->
setReshardingWorker
(
resharding_worker
);
resharding_worker
->
start
();
}
SCOPE_EXIT
(
...
...
dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp
0 → 100644
浏览文件 @
d29ae515
#include <DB/Storages/Distributed/DistributedBlockOutputStream.h>
#include <DB/Storages/StorageDistributed.h>
#include <DB/Parsers/formatAST.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
#include <DB/Interpreters/InterpreterInsertQuery.h>
#include <DB/Interpreters/Cluster.h>
#include <DB/Common/BlockFilterCreator.h>
#include <DB/Common/Increment.h>
#include <memory>
#include <common/Revision.h>
#include <iostream>
namespace
DB
{
namespace
{
/// Adapter between a Cluster and BlockFilterCreator: passes the number of shards
/// and the slot-to-shard table of the cluster to BlockFilterCreator<T>::perform.
template <typename T>
std::vector<IColumn::Filter> createFiltersImpl(const size_t num_rows, const IColumn * column, const Cluster & cluster)
{
    const auto shard_count = cluster.getShardsInfo().size();
    const auto & slots = cluster.slot_to_shard;

    return BlockFilterCreator<T>::perform(num_rows, column, shard_count, slots);
}
}
/// Constructor: only stores its arguments, the body is empty.
/// `storage` is the Distributed table this stream writes into; `query_ast` is the
/// query AST that writeToLocal passes to InterpreterInsertQuery and writeToShard
/// serializes with queryToString.
/// Note: the parameters have the same names as the members they initialize.
DistributedBlockOutputStream
::
DistributedBlockOutputStream
(
StorageDistributed
&
storage
,
const
ASTPtr
&
query_ast
)
:
storage
(
storage
),
query_ast
(
query_ast
)
{
}
/// Write a block: split it between shards when the storage has a sharding key
/// expression and the cluster has more than one shard; otherwise write it as is
/// (writeImpl is called without a shard id, so the declared default is used).
void DistributedBlockOutputStream::write(const Block & block)
{
    const bool must_split = storage.getShardingKeyExpr() && storage.cluster.getShardsInfo().size() > 1;

    if (must_split)
    {
        writeSplit(block);
    }
    else
    {
        writeImpl(block);
    }
}
/** Evaluate the sharding key expression on (a copy of) the block and build one
  * row filter per shard from the resulting key column.
  *
  * Throws Exception with ErrorCodes::TYPE_MISMATCH when the type name of the key
  * column is not one of the integer types listed in the dispatch table below.
  */
std::vector<IColumn::Filter> DistributedBlockOutputStream::createFilters(Block block)
{
    using FilterCreator = std::vector<IColumn::Filter> (*)(size_t, const IColumn *, const Cluster &);

    /// Dispatch table: type name of the key column -> createFiltersImpl instantiated for that type.
    static std::unordered_map<std::string, FilterCreator> creator_by_type_name{
        { TypeName<UInt8>::get(), &createFiltersImpl<UInt8> },
        { TypeName<UInt16>::get(), &createFiltersImpl<UInt16> },
        { TypeName<UInt32>::get(), &createFiltersImpl<UInt32> },
        { TypeName<UInt64>::get(), &createFiltersImpl<UInt64> },
        { TypeName<Int8>::get(), &createFiltersImpl<Int8> },
        { TypeName<Int16>::get(), &createFiltersImpl<Int16> },
        { TypeName<Int32>::get(), &createFiltersImpl<Int32> },
        { TypeName<Int64>::get(), &createFiltersImpl<Int64> },
    };

    /// The block was taken by value, so executing the expression changes only the local copy.
    storage.getShardingKeyExpr()->execute(block);

    const auto & sharding_column = block.getByName(storage.getShardingKeyColumnName());

    /// Reject key columns whose type has no registered creator.
    const auto found = creator_by_type_name.find(sharding_column.type->getName());
    if (found == creator_by_type_name.end())
        throw Exception{"Sharding key expression does not evaluate to an integer type", ErrorCodes::TYPE_MISMATCH};

    return found->second(block.rowsInFirstColumn(), sharding_column.column.get(), storage.cluster);
}
void
DistributedBlockOutputStream
::
writeSplit
(
const
Block
&
block
)
{
const
auto
num_cols
=
block
.
columns
();
/// cache column pointers for later reuse
std
::
vector
<
const
IColumn
*>
columns
(
num_cols
);
for
(
size_t
i
=
0
;
i
<
columns
.
size
();
++
i
)
columns
[
i
]
=
block
.
getByPosition
(
i
).
column
;
auto
filters
=
createFilters
(
block
);
const
auto
num_shards
=
storage
.
cluster
.
getShardsInfo
().
size
();
ssize_t
size_hint
=
((
block
.
rowsInFirstColumn
()
+
num_shards
-
1
)
/
num_shards
)
*
1.1
;
/// Число 1.1 выбрано наугад.
for
(
size_t
i
=
0
;
i
<
num_shards
;
++
i
)
{
auto
target_block
=
block
.
cloneEmpty
();
for
(
size_t
col
=
0
;
col
<
num_cols
;
++
col
)
target_block
.
getByPosition
(
col
).
column
=
columns
[
col
]
->
filter
(
filters
[
i
],
size_hint
);
if
(
target_block
.
rowsInFirstColumn
())
writeImpl
(
target_block
,
i
);
}
}
/** Writes the block to a single shard, selected by its index in the cluster's shard list.
  * The block goes to the local nodes of the shard (if it has any) and, when the shard has
  * directory names, to those directories via writeToShard().
  */
void DistributedBlockOutputStream::writeImpl(const Block & block, const size_t shard_id)
{
    const auto & shard = storage.cluster.getShardsInfo()[shard_id];

    const auto local_node_count = shard.getLocalNodeCount();
    if (local_node_count > 0)
        writeToLocal(block, local_node_count);

    /// dir_names is empty if the shard has only local addresses.
    const auto & shard_dirs = shard.dir_names;
    if (!shard_dirs.empty())
        writeToShard(block, shard_dirs);
}
void
DistributedBlockOutputStream
::
writeToLocal
(
const
Block
&
block
,
const
size_t
repeats
)
{
InterpreterInsertQuery
interp
{
query_ast
,
storage
.
context
};
auto
block_io
=
interp
.
execute
();
block_io
.
out
->
writePrefix
();
for
(
size_t
i
=
0
;
i
<
repeats
;
++
i
)
block_io
.
out
->
write
(
block
);
block_io
.
out
->
writeSuffix
();
}
/** Stores the block (preceded by the query text) as a file in every listed shard directory.
  *
  * The data is written only once, into the "tmp/" subdirectory of the first directory;
  * every directory, including the first, then receives a hard link to that file.
  * The tmp directory is used to ensure atomicity of transactions and to keep the monitor
  * thread from reading incomplete data. The temporary file is removed at the end.
  *
  * Throws (via throwFromErrno) if a hard link cannot be created.
  */
void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
{
    std::string tmp_file_path;
    bool tmp_file_written = false;

    const std::string query_string = queryToString(query_ast);

    /// Write the first file, hardlink the others.
    for (const auto & dir_name : dir_names)
    {
        const std::string shard_dir = storage.getPath() + dir_name + '/';

        /// Make sure the shard subdirectory exists; notify the storage when it was just created.
        if (Poco::File(shard_dir).createDirectory())
            storage.requireDirectoryMonitor(dir_name);

        const std::string file_name = toString(Increment{shard_dir + "increment.txt"}.get(true)) + ".bin";
        const std::string target_path = shard_dir + file_name;

        /** On the first iteration the block is written into a temporary directory for subsequent
          * hardlinking, so that the inode is not freed until we are done.
          */
        if (!tmp_file_written)
        {
            tmp_file_written = true;

            const std::string tmp_dir = shard_dir + "tmp/";
            Poco::File(tmp_dir).createDirectory();

            tmp_file_path = tmp_dir + file_name;

            WriteBufferFromFile out{tmp_file_path};
            CompressedWriteBuffer compress{out};
            NativeBlockOutputStream stream{compress, Revision::get()};

            /// The query text goes straight into the file buffer; the block goes through the compressing buffer.
            writeStringBinary(query_string, out);

            stream.writePrefix();
            stream.write(block);
            stream.writeSuffix();
        }

        if (link(tmp_file_path.c_str(), target_path.c_str()))
            throwFromErrno("Could not link " + target_path + " to " + tmp_file_path);
    }

    /** Remove the temporary file, enabling the OS to reclaim the inode after all threads
      * have removed their corresponding files.
      */
    Poco::File(tmp_file_path).remove();
}
}
dbms/src/Storages/MergeTree/MergeTreeSharder.cpp
浏览文件 @
d29ae515
...
...
@@ -4,6 +4,8 @@
#include <DB/Common/escapeForFileName.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/IO/HashingWriteBuffer.h>
#include <DB/Common/BlockFilterCreator.h>
#include <DB/Interpreters/ExpressionAnalyzer.h>
#include <ctime>
...
...
@@ -16,47 +18,6 @@ namespace ErrorCodes
extern
const
int
TYPE_MISMATCH
;
}
namespace
{

/** Builds one row filter per shard for a block of num_rows rows.
  *
  * Row j is assigned to shard slots[key_j % slots.size()], where key_j is the value of the
  * key column reinterpreted as unsigned; filters[s][j] is 1 for that shard and 0 for the others.
  *
  * column must be a ColumnConst<T> (when isConst() is true) or a ColumnVector<T>.
  * slots must be non-empty, since its size is used as the modulus.
  * Returns a vector of num_shards filters, each of num_rows elements.
  */
template <typename T>
std::vector<IColumn::Filter> createFiltersImpl(const size_t num_rows, const IColumn * column, size_t num_shards, const std::vector<size_t> & slots)
{
    const auto total_weight = slots.size();
    std::vector<IColumn::Filter> filters(num_shards);

    /** Taking the remainder of a negative number divided by a positive one gives a negative remainder in C++.
      * That does not suit this task, so signed types are processed as unsigned.
      * The result no longer resembles a remainder of division, but it is suitable for this task.
      */
    using UnsignedT = typename std::make_unsigned<T>::type;

    /// Const columns contain only one value, therefore we do not need to read it at every iteration.
    if (column->isConst())
    {
        const auto data = typeid_cast<const ColumnConst<T> *>(column)->getData();
        const auto shard_num = slots[static_cast<UnsignedT>(data) % total_weight];

        for (size_t i = 0; i < num_shards; ++i)
            filters[i].assign(num_rows, static_cast<UInt8>(shard_num == i));
    }
    else
    {
        const auto & data = typeid_cast<const ColumnVector<T> *>(column)->getData();

        /// Start with all-zero filters, then mark each row in the filter of its shard.
        for (size_t i = 0; i < num_shards; ++i)
            filters[i].assign(num_rows, static_cast<UInt8>(0));

        /// The target shard is computed once per row instead of once per (shard, row) pair.
        for (size_t j = 0; j < num_rows; ++j)
        {
            const auto shard_num = slots[static_cast<UnsignedT>(data[j]) % total_weight];

            /// A slot value outside [0, num_shards) matches no shard; the row stays unselected everywhere.
            if (shard_num < num_shards)
                filters[shard_num][j] = 1;
        }
    }

    return filters;
}

}
ShardedBlockWithDateInterval
::
ShardedBlockWithDateInterval
(
const
Block
&
block_
,
size_t
shard_no_
,
UInt16
min_date_
,
UInt16
max_date_
)
:
block
(
block_
),
shard_no
(
shard_no_
),
min_date
(
min_date_
),
max_date
(
max_date_
)
...
...
@@ -64,7 +25,9 @@ ShardedBlockWithDateInterval::ShardedBlockWithDateInterval(const Block & block_,
}
MergeTreeSharder
::
MergeTreeSharder
(
MergeTreeData
&
data_
,
const
ReshardingJob
&
job_
)
:
data
(
data_
),
job
(
job_
),
log
(
&
Logger
::
get
(
data
.
getLogName
()
+
" (Sharder)"
))
:
data
(
data_
),
job
(
job_
),
log
(
&
Logger
::
get
(
data
.
getLogName
()
+
" (Sharder)"
)),
sharding_key_expr
(
ExpressionAnalyzer
(
job
.
sharding_key_expr
,
data
.
context
,
nullptr
,
data
.
getColumnsList
()).
getActions
(
false
)),
sharding_key_column_name
(
job
.
sharding_key_expr
->
getColumnName
())
{
for
(
size_t
shard_no
=
0
;
shard_no
<
job
.
paths
.
size
();
++
shard_no
)
{
...
...
@@ -201,19 +164,19 @@ std::vector<IColumn::Filter> MergeTreeSharder::createFilters(Block block)
using
create_filters_sig
=
std
::
vector
<
IColumn
::
Filter
>
(
size_t
,
const
IColumn
*
,
size_t
num_shards
,
const
std
::
vector
<
size_t
>
&
slots
);
/// hashmap of pointers to functions corresponding to each integral type
static
std
::
unordered_map
<
std
::
string
,
create_filters_sig
*>
creators
{
{
TypeName
<
UInt8
>::
get
(),
&
createFiltersImpl
<
UInt8
>
},
{
TypeName
<
UInt16
>::
get
(),
&
createFiltersImpl
<
UInt16
>
},
{
TypeName
<
UInt32
>::
get
(),
&
createFiltersImpl
<
UInt32
>
},
{
TypeName
<
UInt64
>::
get
(),
&
createFiltersImpl
<
UInt64
>
},
{
TypeName
<
Int8
>::
get
(),
&
createFiltersImpl
<
Int8
>
},
{
TypeName
<
Int16
>::
get
(),
&
createFiltersImpl
<
Int16
>
},
{
TypeName
<
Int32
>::
get
(),
&
createFiltersImpl
<
Int32
>
},
{
TypeName
<
Int64
>::
get
(),
&
createFiltersImpl
<
Int64
>
},
{
TypeName
<
UInt8
>::
get
(),
&
BlockFilterCreator
<
UInt8
>::
perform
},
{
TypeName
<
UInt16
>::
get
(),
&
BlockFilterCreator
<
UInt16
>::
perform
},
{
TypeName
<
UInt32
>::
get
(),
&
BlockFilterCreator
<
UInt32
>::
perform
},
{
TypeName
<
UInt64
>::
get
(),
&
BlockFilterCreator
<
UInt64
>::
perform
},
{
TypeName
<
Int8
>::
get
(),
&
BlockFilterCreator
<
Int8
>::
perform
},
{
TypeName
<
Int16
>::
get
(),
&
BlockFilterCreator
<
Int16
>::
perform
},
{
TypeName
<
Int32
>::
get
(),
&
BlockFilterCreator
<
Int32
>::
perform
},
{
TypeName
<
Int64
>::
get
(),
&
BlockFilterCreator
<
Int64
>::
perform
},
};
data
.
getPrimaryExpression
()
->
execute
(
block
);
sharding_key_expr
->
execute
(
block
);
const
auto
&
key_column
=
block
.
getByName
(
job
.
sharding_key
);
const
auto
&
key_column
=
block
.
getByName
(
sharding_key_column_name
);
/// check that key column has valid type
const
auto
it
=
creators
.
find
(
key_column
.
type
->
getName
());
...
...
dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.cpp
浏览文件 @
d29ae515
#include <DB/Storages/MergeTree/RemoteDiskSpaceMonitor.h>
#include <DB/Storages/MergeTree/DiskSpaceMonitor.h>
#include <DB/Interpreters/Context.h>
#include <DB/IO/ReadBufferFromHTTP.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/ReadHelpers.h>
...
...
@@ -25,8 +26,8 @@ std::string getEndpointId(const std::string & node_id)
}
Service
::
Service
(
const
String
&
path
_
)
:
path
(
path
_
)
Service
::
Service
(
const
Context
&
context
_
)
:
context
(
context
_
)
{
}
...
...
@@ -40,12 +41,12 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out
if
(
is_cancelled
)
throw
Exception
(
"RemoteDiskSpaceMonitor service terminated"
,
ErrorCodes
::
ABORTED
);
size_t
free_space
=
DiskSpaceMonitor
::
getUnreservedFreeSpace
(
path
);
size_t
free_space
=
DiskSpaceMonitor
::
getUnreservedFreeSpace
(
context
.
getPath
()
);
writeBinary
(
free_space
,
out
);
out
.
next
();
}
size_t
Client
::
getFree
Disk
Space
(
const
InterserverIOEndpointLocation
&
location
)
const
size_t
Client
::
getFreeSpace
(
const
InterserverIOEndpointLocation
&
location
)
const
{
ReadBufferFromHTTP
::
Params
params
=
{
...
...
dbms/src/Storages/MergeTree/ReshardingJob.cpp
浏览文件 @
d29ae515
...
...
@@ -3,10 +3,17 @@
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Parsers/ParserQuery.h>
#include <DB/Parsers/ExpressionListParsers.h>
namespace
DB
{
namespace
ErrorCodes
{
extern
const
int
LOGICAL_ERROR
;
}
ReshardingJob
::
ReshardingJob
(
const
std
::
string
&
serialized_job
)
{
ReadBufferFromString
buf
(
serialized_job
);
...
...
@@ -14,7 +21,19 @@ ReshardingJob::ReshardingJob(const std::string & serialized_job)
readBinary
(
database_name
,
buf
);
readBinary
(
table_name
,
buf
);
readBinary
(
partition
,
buf
);
readBinary
(
sharding_key
,
buf
);
std
::
string
expr
;
readBinary
(
expr
,
buf
);
IParser
::
Pos
pos
=
expr
.
data
();
IParser
::
Pos
max_parsed_pos
=
pos
;
const
char
*
end
=
pos
+
expr
.
size
();
ParserExpressionWithOptionalAlias
parser
(
false
);
Expected
expected
=
""
;
if
(
!
parser
.
parse
(
pos
,
end
,
sharding_key_expr
,
max_parsed_pos
,
expected
))
throw
Exception
(
"ReshardingJob: Internal error"
,
ErrorCodes
::
LOGICAL_ERROR
);
while
(
!
buf
.
eof
())
{
std
::
string
path
;
...
...
@@ -29,12 +48,12 @@ ReshardingJob::ReshardingJob(const std::string & serialized_job)
ReshardingJob
::
ReshardingJob
(
const
std
::
string
&
database_name_
,
const
std
::
string
&
table_name_
,
const
std
::
string
&
partition_
,
const
WeightedZooKeeperPaths
&
paths_
,
const
std
::
string
&
sharding_key
_
)
const
ASTPtr
&
sharding_key_expr
_
)
:
database_name
(
database_name_
),
table_name
(
table_name_
),
partition
(
partition_
),
paths
(
paths_
),
sharding_key
(
sharding_key
_
)
sharding_key
_expr
(
sharding_key_expr
_
)
{
}
...
...
@@ -46,7 +65,8 @@ std::string ReshardingJob::toString() const
writeBinary
(
database_name
,
buf
);
writeBinary
(
table_name
,
buf
);
writeBinary
(
partition
,
buf
);
writeBinary
(
sharding_key
,
buf
);
writeBinary
(
queryToString
(
sharding_key_expr
),
buf
);
for
(
const
auto
&
path
:
paths
)
{
writeBinary
(
path
.
first
,
buf
);
...
...
dbms/src/Storages/MergeTree/ReshardingWorker.cpp
浏览文件 @
d29ae515
...
...
@@ -11,6 +11,7 @@
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Common/getFQDNOrHostName.h>
#include <DB/Common/Increment.h>
#include <DB/Interpreters/executeQuery.h>
#include <DB/Interpreters/Context.h>
#include <common/threadpool.hpp>
...
...
@@ -30,7 +31,8 @@ namespace ErrorCodes
extern
const
int
UNEXPECTED_ZOOKEEPER_ERROR
;
extern
const
int
PARTITION_COPY_FAILED
;
extern
const
int
PARTITION_ATTACH_FAILED
;
extern
const
int
RESHARDING_CLEANUP_FAILED
;
extern
const
int
UNKNOWN_ELEMENT_IN_CONFIG
;
extern
const
int
INVALID_CONFIG_PARAMETER
;
}
namespace
...
...
@@ -52,17 +54,52 @@ std::string createMergedPartName(const MergeTreeData::DataPartsVector & parts)
return
ActiveDataPartSet
::
getPartName
(
left_date
,
right_date
,
parts
.
front
()
->
left
,
parts
.
back
()
->
right
,
level
+
1
);
}
/** Parameters of the resharding worker, read from the keys found under config_name
  * in the server configuration.
  *
  * The only recognised key is "task_queue_path". It is mandatory and must be non-empty.
  * Throws Exception(UNKNOWN_ELEMENT_IN_CONFIG) on any other key and
  * Exception(INVALID_CONFIG_PARAMETER) when task_queue_path is empty or missing.
  */
class Arguments final
{
public:
    Arguments(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
    {
        Poco::Util::AbstractConfiguration::Keys keys;
        config.keys(config_name, keys);

        for (const auto & key : keys)
        {
            if (key == "task_queue_path")
            {
                task_queue_path = config.getString(config_name + "." + key);
                if (task_queue_path.empty())
                    throw Exception("Invalid parameter in resharding configuration", ErrorCodes::INVALID_CONFIG_PARAMETER);
            }
            else
                throw Exception("Unknown parameter in resharding configuration", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
        }

        /// The loop above validates the parameter only if its key is present.
        /// Without this check an absent key would silently leave the path empty.
        if (task_queue_path.empty())
            throw Exception("Missing parameter task_queue_path in resharding configuration", ErrorCodes::INVALID_CONFIG_PARAMETER);
    }

    Arguments(const Arguments &) = delete;
    Arguments & operator=(const Arguments &) = delete;

    /// Returns the configured task queue path (never empty after successful construction).
    std::string getTaskQueuePath() const
    {
        return task_queue_path;
    }

private:
    std::string task_queue_path;
};
}
ReshardingWorker
::
ReshardingWorker
(
Context
&
context_
)
ReshardingWorker
::
ReshardingWorker
(
const
Poco
::
Util
::
AbstractConfiguration
&
config
,
const
std
::
string
&
config_name
,
Context
&
context_
)
:
context
(
context_
),
log
(
&
Logger
::
get
(
"ReshardingWorker"
))
{
Arguments
arguments
(
config
,
config_name
);
auto
zookeeper
=
context
.
getZooKeeper
();
host_task_queue_path
=
"/clickhouse"
;
zookeeper
->
createIfNotExists
(
host_task_queue_path
,
""
);
host_task_queue_path
+=
"/"
+
zookeeper
->
getTaskQueuePath
();
host_task_queue_path
+=
"/"
+
arguments
.
getTaskQueuePath
();
zookeeper
->
createIfNotExists
(
host_task_queue_path
,
""
);
host_task_queue_path
+=
"/resharding"
;
...
...
@@ -75,6 +112,11 @@ ReshardingWorker::ReshardingWorker(Context & context_)
/** Stops the worker: raises the must_stop flag, cancels the merger if one currently exists
  * (under cancel_mutex), then joins the polling thread if it is joinable.
  */
ReshardingWorker::~ReshardingWorker()
{
    must_stop = true;

    {
        /// cancel_mutex is held while the merger pointer is checked and used.
        std::lock_guard<std::mutex> cancel_lock(cancel_mutex);
        if (merger != nullptr)
            merger->cancel();
    }

    if (!polling_thread.joinable())
        return;

    polling_thread.join();
}
...
...
@@ -88,16 +130,12 @@ void ReshardingWorker::submitJob(const std::string & database_name,
const
std
::
string
&
table_name
,
const
std
::
string
&
partition
,
const
WeightedZooKeeperPaths
&
weighted_zookeeper_paths
,
const
std
::
string
&
sharding_key
)
const
ASTPtr
&
sharding_key_expr
)
{
auto
str
=
ReshardingJob
(
database_name
,
table_name
,
partition
,
weighted_zookeeper_paths
,
sharding_key
).
toString
();
submitJobImpl
(
str
);
}
void
ReshardingWorker
::
submitJob
(
const
ReshardingJob
&
job
)
{
auto
str
=
job
.
toString
();
submitJobImpl
(
str
);
auto
serialized_job
=
ReshardingJob
(
database_name
,
table_name
,
partition
,
weighted_zookeeper_paths
,
sharding_key_expr
).
toString
();
auto
zookeeper
=
context
.
getZooKeeper
();
(
void
)
zookeeper
->
create
(
host_task_queue_path
+
"/task-"
,
serialized_job
,
zkutil
::
CreateMode
::
PersistentSequential
);
}
bool
ReshardingWorker
::
isStarted
()
const
...
...
@@ -105,23 +143,18 @@ bool ReshardingWorker::isStarted() const
return
is_started
;
}
/** Publishes a serialized resharding job as a new persistent sequential node
  * named "task-<sequence>" under host_task_queue_path in ZooKeeper.
  * The value returned by create() is deliberately ignored.
  */
void ReshardingWorker::submitJobImpl(const std::string & serialized_job)
{
    auto zookeeper = context.getZooKeeper();

    const auto task_node_prefix = host_task_queue_path + "/task-";
    (void) zookeeper->create(task_node_prefix, serialized_job, zkutil::CreateMode::PersistentSequential);
}
void
ReshardingWorker
::
pollAndExecute
()
{
bool
error
=
false
;
try
{
bool
old_val
=
false
;
if
(
!
is_started
.
compare_exchange_strong
(
old_val
,
true
,
std
::
memory_order_seq_cst
,
std
::
memory_order_relaxed
))
throw
Exception
(
"Resharding
worker
thread already started"
,
ErrorCodes
::
LOGICAL_ERROR
);
throw
Exception
(
"Resharding
background
thread already started"
,
ErrorCodes
::
LOGICAL_ERROR
);
LOG_DEBUG
(
log
,
"Started resharding thread."
);
LOG_DEBUG
(
log
,
"Started resharding
background
thread."
);
try
{
...
...
@@ -129,10 +162,10 @@ void ReshardingWorker::pollAndExecute()
}
catch
(
const
Exception
&
ex
)
{
if
(
(
ex
.
code
()
==
ErrorCodes
::
RESHARDING_CLEANUP_FAILED
)
||
hasAborted
(
ex
)
)
if
(
ex
.
code
()
==
ErrorCodes
::
ABORTED
)
throw
;
else
LOG_
INFO
(
log
,
ex
.
message
());
LOG_
ERROR
(
log
,
ex
.
message
());
}
catch
(...)
{
...
...
@@ -166,10 +199,10 @@ void ReshardingWorker::pollAndExecute()
}
catch
(
const
Exception
&
ex
)
{
if
(
(
ex
.
code
()
==
ErrorCodes
::
RESHARDING_CLEANUP_FAILED
)
||
hasAborted
(
ex
)
)
if
(
ex
.
code
()
==
ErrorCodes
::
ABORTED
)
throw
;
else
LOG_
INFO
(
log
,
ex
.
message
());
LOG_
ERROR
(
log
,
ex
.
message
());
}
catch
(...)
{
...
...
@@ -179,11 +212,21 @@ void ReshardingWorker::pollAndExecute()
}
catch
(
const
Exception
&
ex
)
{
if
(
!
hasAborted
(
ex
))
throw
;
if
(
ex
.
code
()
!=
ErrorCodes
::
ABORTED
)
error
=
true
;
}
catch
(...)
{
error
=
true
;
}
LOG_DEBUG
(
log
,
"Resharding thread terminated."
);
if
(
error
)
{
/// Если мы попали сюда, это значит, что где-то кроется баг.
LOG_ERROR
(
log
,
"Resharding background thread terminated with critical error."
);
}
else
LOG_DEBUG
(
log
,
"Resharding background thread terminated."
);
}
void
ReshardingWorker
::
performPendingJobs
()
...
...
@@ -204,8 +247,24 @@ void ReshardingWorker::perform(const Strings & job_nodes)
std
::
string
child_full_path
=
host_task_queue_path
+
"/"
+
child
;
auto
job_descriptor
=
zookeeper
->
get
(
child_full_path
);
ReshardingJob
job
(
job_descriptor
);
try
{
perform
(
job
);
}
catch
(
const
Exception
&
ex
)
{
if
(
ex
.
code
()
!=
ErrorCodes
::
ABORTED
)
zookeeper
->
remove
(
child_full_path
);
throw
;
}
catch
(...)
{
zookeeper
->
remove
(
child_full_path
);
throw
;
}
zookeeper
->
remove
(
child_full_path
);
perform
(
job
);
}
}
...
...
@@ -229,12 +288,8 @@ void ReshardingWorker::perform(const ReshardingJob & job)
{
cleanup
(
storage
,
job
);
if
(
hasAborted
(
ex
))
{
/// Поток завершается. Сохраняем сведения о прерванной задаче.
submitJob
(
job
);
LOG_DEBUG
(
log
,
"Resharding job cancelled then re-submitted for later processing."
);
}
if
(
ex
.
code
()
==
ErrorCodes
::
ABORTED
)
LOG_DEBUG
(
log
,
"Resharding job cancelled."
);
throw
;
}
...
...
@@ -276,15 +331,24 @@ void ReshardingWorker::createShardedPartitions(StorageReplicatedMergeTree & stor
/// Для каждого шарда, куски, которые должны быть слиты.
std
::
unordered_map
<
size_t
,
PartsToBeMerged
>
to_merge
;
/// Для нумерации блоков.
SimpleIncrement
increment
(
storage
.
data
.
getMaxDataPartIndex
());
MergeTreeData
::
PerShardDataParts
&
per_shard_data_parts
=
storage
.
data
.
per_shard_data_parts
;
auto
zookeeper
=
storage
.
getZooKeeper
();
const
auto
&
settings
=
context
.
getSettingsRef
();
(
void
)
settings
;
DayNum_t
month
=
MergeTreeData
::
getMonthFromName
(
job
.
partition
);
auto
parts_from_partition
=
storage
.
merger
.
selectAllPartsFromPartition
(
month
);
{
std
::
lock_guard
<
std
::
mutex
>
guard
(
cancel_mutex
);
merger
=
std
::
make_unique
<
MergeTreeDataMerger
>
(
storage
.
data
);
}
auto
parts_from_partition
=
merger
->
selectAllPartsFromPartition
(
month
);
MergeTreeSharder
sharder
(
storage
.
data
,
job
);
for
(
const
auto
&
part
:
parts_from_partition
)
{
...
...
@@ -305,8 +369,6 @@ void ReshardingWorker::createShardedPartitions(StorageReplicatedMergeTree & stor
DBMS_DEFAULT_BUFFER_SIZE
,
true
);
MergeTreeSharder
sharder
(
storage
.
data
,
job
);
Block
block
;
while
(
block
=
source
.
read
())
{
...
...
@@ -318,57 +380,8 @@ void ReshardingWorker::createShardedPartitions(StorageReplicatedMergeTree & stor
abortIfRequested
();
/// Создать новый кусок соответствующий новому блоку.
std
::
string
month_name
=
toString
(
DateLUT
::
instance
().
toNumYYYYMMDD
(
DayNum_t
(
block_with_dates
.
min_date
))
/
100
);
AbandonableLockInZooKeeper
block_number_lock
=
storage
.
allocateBlockNumber
(
month_name
);
Int64
part_number
=
block_number_lock
.
getNumber
();
MergeTreeData
::
MutableDataPartPtr
block_part
=
sharder
.
writeTempPart
(
block_with_dates
,
part_number
);
/// Добавить в БД ZooKeeper информацию о новом блоке.
SipHash
hash
;
block_part
->
checksums
.
summaryDataChecksum
(
hash
);
union
{
char
bytes
[
16
];
UInt64
lo
;
UInt64
hi
;
}
hash_value
;
hash
.
get128
(
hash_value
.
bytes
);
std
::
string
checksum
(
hash_value
.
bytes
,
16
);
std
::
string
block_id
=
toString
(
hash_value
.
lo
)
+
"_"
+
toString
(
hash_value
.
hi
);
zkutil
::
Ops
ops
;
auto
acl
=
zookeeper
->
getDefaultACL
();
std
::
string
to_path
=
job
.
paths
[
block_with_dates
.
shard_no
].
first
;
ops
.
push_back
(
new
zkutil
::
Op
::
Create
(
to_path
+
"/detached_sharded_blocks/"
+
block_id
,
""
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
push_back
(
new
zkutil
::
Op
::
Create
(
to_path
+
"/detached_sharded_blocks/"
+
block_id
+
"/checksum"
,
checksum
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
push_back
(
new
zkutil
::
Op
::
Create
(
to_path
+
"/detached_sharded_blocks/"
+
block_id
+
"/number"
,
toString
(
part_number
),
acl
,
zkutil
::
CreateMode
::
Persistent
));
block_number_lock
.
getUnlockOps
(
ops
);
auto
code
=
zookeeper
->
tryMulti
(
ops
);
if
(
code
!=
ZOK
)
throw
Exception
(
"Unexpected error while adding block "
+
toString
(
part_number
)
+
" with ID "
+
block_id
+
": "
+
zkutil
::
ZooKeeper
::
error2string
(
code
),
ErrorCodes
::
UNEXPECTED_ZOOKEEPER_ERROR
);
Int64
temp_index
=
increment
.
get
();
MergeTreeData
::
MutableDataPartPtr
block_part
=
sharder
.
writeTempPart
(
block_with_dates
,
temp_index
);
abortIfRequested
();
...
...
@@ -390,7 +403,7 @@ void ReshardingWorker::createShardedPartitions(StorageReplicatedMergeTree & stor
const
auto
&
merge_entry
=
storage
.
data
.
context
.
getMergeList
().
insert
(
job
.
database_name
,
job
.
table_name
,
merged_name
);
MergeTreeData
::
MutableDataPartPtr
new_part
=
storage
.
merger
.
mergeParts
(
parts
,
merged_name
,
*
merge_entry
,
MergeTreeData
::
MutableDataPartPtr
new_part
=
merger
->
mergeParts
(
parts
,
merged_name
,
*
merge_entry
,
storage
.
data
.
context
.
getSettings
().
min_bytes_to_use_direct_io
);
sharded_parts
.
insert
(
new_part
);
...
...
@@ -424,7 +437,7 @@ void ReshardingWorker::createShardedPartitions(StorageReplicatedMergeTree & stor
const
auto
&
merge_entry
=
storage
.
data
.
context
.
getMergeList
().
insert
(
job
.
database_name
,
job
.
table_name
,
merged_name
);
MergeTreeData
::
MutableDataPartPtr
new_part
=
storage
.
merger
.
mergeParts
(
parts
,
merged_name
,
*
merge_entry
,
MergeTreeData
::
MutableDataPartPtr
new_part
=
merger
->
mergeParts
(
parts
,
merged_name
,
*
merge_entry
,
storage
.
data
.
context
.
getSettings
().
min_bytes_to_use_direct_io
);
sharded_parts
.
insert
(
new_part
);
...
...
@@ -466,8 +479,6 @@ void ReshardingWorker::publishShardedPartitions(StorageReplicatedMergeTree & sto
auto
zookeeper
=
storage
.
getZooKeeper
();
MergeTreeData
::
PerShardDataParts
&
per_shard_data_parts
=
storage
.
data
.
per_shard_data_parts
;
struct
TaskInfo
{
TaskInfo
(
const
std
::
string
&
replica_path_
,
...
...
@@ -493,16 +504,20 @@ void ReshardingWorker::publishShardedPartitions(StorageReplicatedMergeTree & sto
/// Количество участвующих локальных реплик. Должно быть <= 1.
size_t
local_count
=
0
;
for
(
size_t
shard_no
=
0
;
shard_no
<
job
.
paths
.
size
();
++
shard_no
)
for
(
const
auto
&
entry
:
storage
.
data
.
per_shard_data_parts
)
{
const
WeightedZooKeeperPath
&
weighted_path
=
job
.
paths
[
shard_no
];
const
std
::
string
&
zookeeper_path
=
weighted_path
.
first
;
size_t
shard_no
=
entry
.
first
;
const
MergeTreeData
::
MutableDataParts
&
sharded_parts
=
entry
.
second
;
if
(
sharded_parts
.
empty
())
continue
;
std
::
vector
<
std
::
string
>
part_names
;
const
MergeTreeData
::
MutableDataParts
&
sharded_parts
=
per_shard_data_parts
.
at
(
shard_no
);
for
(
const
MergeTreeData
::
DataPartPtr
&
sharded_part
:
sharded_parts
)
part_names
.
push_back
(
sharded_part
->
name
);
const
WeightedZooKeeperPath
&
weighted_path
=
job
.
paths
[
shard_no
];
const
std
::
string
&
zookeeper_path
=
weighted_path
.
first
;
auto
children
=
zookeeper
->
getChildren
(
zookeeper_path
+
"/replicas"
);
for
(
const
auto
&
child
:
children
)
{
...
...
@@ -610,9 +625,14 @@ void ReshardingWorker::applyChanges(StorageReplicatedMergeTree & storage, const
using
TaskInfoList
=
std
::
vector
<
TaskInfo
>
;
TaskInfoList
task_info_list
;
for
(
size_t
i
=
0
;
i
<
job
.
paths
.
size
();
++
i
)
for
(
const
auto
&
entry
:
storage
.
data
.
per_shard_data_parts
)
{
const
WeightedZooKeeperPath
&
weighted_path
=
job
.
paths
[
i
];
size_t
shard_no
=
entry
.
first
;
const
MergeTreeData
::
MutableDataParts
&
sharded_parts
=
entry
.
second
;
if
(
sharded_parts
.
empty
())
continue
;
const
WeightedZooKeeperPath
&
weighted_path
=
job
.
paths
[
shard_no
];
const
std
::
string
&
zookeeper_path
=
weighted_path
.
first
;
auto
children
=
zookeeper
->
getChildren
(
zookeeper_path
+
"/replicas"
);
...
...
@@ -668,47 +688,13 @@ void ReshardingWorker::cleanup(StorageReplicatedMergeTree & storage, const Resha
{
LOG_DEBUG
(
log
,
"Performing cleanup."
);
try
{
storage
.
data
.
per_shard_data_parts
.
clear
();
Poco
::
DirectoryIterator
end
;
for
(
Poco
::
DirectoryIterator
it
(
storage
.
full_path
+
"/reshard"
);
it
!=
end
;
++
it
)
{
auto
absolute_path
=
it
.
path
().
absolute
().
toString
();
Poco
::
File
(
absolute_path
).
remove
(
true
);
}
auto
zookeeper
=
storage
.
getZooKeeper
();
zkutil
::
Ops
ops
;
for
(
size_t
i
=
0
;
i
<
job
.
paths
.
size
();
++
i
)
{
const
WeightedZooKeeperPath
&
weighted_path
=
job
.
paths
[
i
];
const
std
::
string
&
zookeeper_path
=
weighted_path
.
first
;
storage
.
data
.
per_shard_data_parts
.
clear
();
auto
children
=
zookeeper
->
getChildren
(
zookeeper_path
+
"/detached_sharded_blocks"
);
if
(
!
children
.
empty
())
{
for
(
const
auto
&
child
:
children
)
{
ops
.
push_back
(
new
zkutil
::
Op
::
Remove
(
zookeeper_path
+
"/detached_sharded_blocks/"
+
child
+
"/number"
,
-
1
));
ops
.
push_back
(
new
zkutil
::
Op
::
Remove
(
zookeeper_path
+
"/detached_sharded_blocks/"
+
child
+
"/checksum"
,
-
1
));
ops
.
push_back
(
new
zkutil
::
Op
::
Remove
(
zookeeper_path
+
"/detached_sharded_blocks/"
+
child
,
-
1
));
}
}
}
zookeeper
->
multi
(
ops
);
}
catch
(...)
Poco
::
DirectoryIterator
end
;
for
(
Poco
::
DirectoryIterator
it
(
storage
.
full_path
+
"/reshard"
);
it
!=
end
;
++
it
)
{
throw
Exception
(
"Failed to perform cleanup during resharding operation"
,
ErrorCodes
::
RESHARDING_CLEANUP_FAILED
);
auto
absolute_path
=
it
.
path
().
absolute
().
toString
();
Poco
::
File
(
absolute_path
).
remove
(
true
);
}
}
...
...
@@ -718,9 +704,4 @@ void ReshardingWorker::abortIfRequested() const
throw
Exception
(
"Cancelled resharding"
,
ErrorCodes
::
ABORTED
);
}
/// Returns true only when a stop has been requested (must_stop is set)
/// and the given exception carries the ABORTED error code.
bool ReshardingWorker::hasAborted(const Exception & ex) const
{
    if (!must_stop)
        return false;

    return ex.code() == ErrorCodes::ABORTED;
}
}
dbms/src/Storages/MergeTree/ShardedPartitionSender.cpp
浏览文件 @
d29ae515
...
...
@@ -51,7 +51,7 @@ std::string getEndpointId(const std::string & node_id)
}
Service
::
Service
(
StorageReplicatedMergeTree
&
storage_
)
:
storage
(
storage_
)
:
storage
(
storage_
)
,
log
(
&
Logger
::
get
(
"ShardedPartitionSender::Service"
))
{
}
...
...
@@ -79,8 +79,18 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out
MergeTreeData
::
MutableDataPartPtr
part
=
storage
.
fetcher
.
fetchShardedPart
(
from_location
,
part_name
,
shard_no
);
part
->
is_temp
=
false
;
const
std
::
string
new_name
=
"detached/"
+
part_name
;
Poco
::
File
(
storage
.
full_path
+
part
->
name
).
renameTo
(
storage
.
full_path
+
new_name
);
const
std
::
string
old_part_path
=
storage
.
full_path
+
part
->
name
;
const
std
::
string
new_part_path
=
storage
.
full_path
+
"detached/"
+
part_name
;
Poco
::
File
new_part_dir
(
new_part_path
);
if
(
new_part_dir
.
exists
())
{
LOG_WARNING
(
log
,
"Directory "
+
new_part_path
+
" already exists. Removing."
);
new_part_dir
.
remove
(
true
);
}
Poco
::
File
(
old_part_path
).
renameTo
(
new_part_path
);
}
bool
flag
=
true
;
...
...
@@ -88,6 +98,11 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out
out
.
next
();
}
/// Constructs the client; the only work done is obtaining the logger
/// registered under the name "ShardedPartitionSender::Client".
Client
::
Client
()
:
log
(
&
Logger
::
get
(
"ShardedPartitionSender::Client"
))
{
}
bool
Client
::
send
(
const
InterserverIOEndpointLocation
&
to_location
,
const
InterserverIOEndpointLocation
&
from_location
,
const
std
::
vector
<
std
::
string
>
&
parts
,
size_t
shard_no
)
{
...
...
dbms/src/Storages/StorageDistributed.cpp
浏览文件 @
d29ae515
...
...
@@ -228,7 +228,7 @@ void StorageDistributed::shutdown()
void
StorageDistributed
::
reshardPartitions
(
const
String
&
database_name
,
const
Field
&
first_partition
,
const
Field
&
last_partition
,
const
WeightedZooKeeperPaths
&
weighted_zookeeper_paths
,
const
String
&
sharding_key
,
const
Settings
&
settings
)
const
ASTPtr
&
sharding_key_expr
,
const
Settings
&
settings
)
{
/// Создать запрос ALTER TABLE xxx.yyy RESHARD PARTITION zzz TO ttt USING uuu.
...
...
@@ -258,7 +258,7 @@ void StorageDistributed::reshardPartitions(const String & database_name, const F
}
parameters
.
weighted_zookeeper_paths
=
expr_list
;
parameters
.
sharding_key
=
sharding_key
;
parameters
.
sharding_key
_expr
=
sharding_key_expr
;
/** Функциональность shard_multiplexing не доделана - выключаем её.
* (Потому что установка соединений с разными шардами в рамках одного потока выполняется не параллельно.)
...
...
dbms/src/Storages/StorageReplicatedMergeTree.cpp
浏览文件 @
d29ae515
...
...
@@ -75,7 +75,6 @@ namespace ErrorCodes
extern
const
int
INVALID_PARTITIONS_INTERVAL
;
extern
const
int
RESHARDING_INVALID_PARAMETERS
;
extern
const
int
INVALID_SHARD_WEIGHT
;
extern
const
int
SHARD_DOESNT_REFERENCE_TABLE
;
}
...
...
@@ -335,7 +334,7 @@ StoragePtr StorageReplicatedMergeTree::create(
/// Сервисы для перешардирования.
{
InterserverIOEndpointPtr
endpoint
=
new
RemoteDiskSpaceMonitor
::
Service
(
res
->
full_path
);
InterserverIOEndpointPtr
endpoint
=
new
RemoteDiskSpaceMonitor
::
Service
(
res
->
context
);
res
->
disk_space_monitor_endpoint_holder
=
get_endpoint_holder
(
endpoint
);
}
...
...
@@ -403,8 +402,6 @@ void StorageReplicatedMergeTree::createTableIfNotExists()
acl
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
push_back
(
new
zkutil
::
Op
::
Create
(
zookeeper_path
+
"/blocks"
,
""
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
push_back
(
new
zkutil
::
Op
::
Create
(
zookeeper_path
+
"/detached_sharded_blocks"
,
""
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
push_back
(
new
zkutil
::
Op
::
Create
(
zookeeper_path
+
"/block_numbers"
,
""
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
push_back
(
new
zkutil
::
Op
::
Create
(
zookeeper_path
+
"/nonincrement_block_numbers"
,
""
,
...
...
@@ -2317,7 +2314,7 @@ void StorageReplicatedMergeTree::shutdown()
fetcher
.
cancel
();
disk_space_monitor_endpoint_holder
=
nullptr
;
free_disk_space_checker
.
cancel
();
disk_space_monitor_client
.
cancel
();
sharded_partition_sender_endpoint_holder
=
nullptr
;
sharded_partition_sender_client
.
cancel
();
...
...
@@ -2891,56 +2888,7 @@ void StorageReplicatedMergeTree::attachPartition(ASTPtr query, const Field & fie
zookeeper_path
+
"/log/log-"
,
entry
.
toString
(),
zookeeper
->
getDefaultACL
(),
zkutil
::
CreateMode
::
PersistentSequential
));
}
std
::
string
log_msg
=
"Adding attaches to log"
;
if
(
is_leader_node
)
{
/// Если ATTACH PART выполняется в рамках перешардирования, обновляем информацию о блоках на шарде.
auto
children
=
zookeeper
->
getChildren
(
zookeeper_path
+
"/detached_sharded_blocks"
);
if
(
!
children
.
empty
())
{
log_msg
+=
". Updating information about blocks in the context of the resharding operation."
;
auto
acl
=
zookeeper
->
getDefaultACL
();
for
(
const
auto
&
child
:
children
)
{
std
::
string
checksum
=
zookeeper
->
get
(
zookeeper_path
+
"/detached_sharded_blocks/"
+
child
+
"/checksum"
);
std
::
string
number
=
zookeeper
->
get
(
zookeeper_path
+
"/detached_sharded_blocks/"
+
child
+
"/number"
);
ops
.
push_back
(
new
zkutil
::
Op
::
Create
(
zookeeper_path
+
"/blocks/"
+
child
,
""
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
push_back
(
new
zkutil
::
Op
::
Create
(
zookeeper_path
+
"/blocks/"
+
child
+
"/checksum"
,
checksum
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
push_back
(
new
zkutil
::
Op
::
Create
(
zookeeper_path
+
"/blocks/"
+
child
+
"/number"
,
number
,
acl
,
zkutil
::
CreateMode
::
Persistent
));
ops
.
push_back
(
new
zkutil
::
Op
::
Remove
(
zookeeper_path
+
"/detached_sharded_blocks/"
+
child
+
"/number"
,
-
1
));
ops
.
push_back
(
new
zkutil
::
Op
::
Remove
(
zookeeper_path
+
"/detached_sharded_blocks/"
+
child
+
"/checksum"
,
-
1
));
ops
.
push_back
(
new
zkutil
::
Op
::
Remove
(
zookeeper_path
+
"/detached_sharded_blocks/"
+
child
,
-
1
));
}
}
}
LOG_DEBUG
(
log
,
log_msg
);
LOG_DEBUG
(
log
,
"Adding attaches to log"
);
zookeeper
->
multi
(
ops
);
...
...
@@ -3481,12 +3429,12 @@ void StorageReplicatedMergeTree::freezePartition(const Field & partition, const
}
void
StorageReplicatedMergeTree
::
reshardPartitions
(
const
String
&
database_name
,
const
Field
&
first_partition
,
const
Field
&
last_partition
,
const
WeightedZooKeeperPaths
&
weighted_zookeeper_paths
,
const
String
&
sharding_key
,
const
WeightedZooKeeperPaths
&
weighted_zookeeper_paths
,
const
ASTPtr
&
sharding_key_expr
,
const
Settings
&
settings
)
{
auto
&
resharding_worker
=
context
.
getReshardingWorker
();
if
(
!
resharding_worker
.
isStarted
())
throw
Exception
(
"Resharding
worker
is not running."
,
ErrorCodes
::
RESHARDING_NO_WORKER
);
throw
Exception
(
"Resharding
background thread
is not running."
,
ErrorCodes
::
RESHARDING_NO_WORKER
);
for
(
const
auto
&
weighted_path
:
weighted_zookeeper_paths
)
{
...
...
@@ -3495,14 +3443,6 @@ void StorageReplicatedMergeTree::reshardPartitions(const String & database_name,
throw
Exception
(
"Shard has invalid weight"
,
ErrorCodes
::
INVALID_SHARD_WEIGHT
);
}
for
(
const
auto
&
weighted_path
:
weighted_zookeeper_paths
)
{
const
std
::
string
&
path
=
weighted_path
.
first
;
if
((
path
.
length
()
<=
getTableName
().
length
())
||
(
path
.
substr
(
path
.
length
()
-
getTableName
().
length
())
!=
getTableName
()))
throw
Exception
(
"Shard does not reference table"
,
ErrorCodes
::
SHARD_DOESNT_REFERENCE_TABLE
);
}
DayNum_t
first_partition_num
=
!
first_partition
.
isNull
()
?
MergeTreeData
::
getMonthDayNum
(
first_partition
)
:
DayNum_t
();
DayNum_t
last_partition_num
=
!
last_partition
.
isNull
()
?
MergeTreeData
::
getMonthDayNum
(
last_partition
)
:
DayNum_t
();
...
...
@@ -3548,7 +3488,7 @@ void StorageReplicatedMergeTree::reshardPartitions(const String & database_name,
/// Зарегистрировать фоновые задачи перешардирования.
for
(
const
auto
&
partition
:
partition_list
)
resharding_worker
.
submitJob
(
database_name
,
getTableName
(),
partition
,
weighted_zookeeper_paths
,
sharding_key
);
resharding_worker
.
submitJob
(
database_name
,
getTableName
(),
partition
,
weighted_zookeeper_paths
,
sharding_key
_expr
);
}
void
StorageReplicatedMergeTree
::
enforceShardsConsistency
(
const
WeightedZooKeeperPaths
&
weighted_zookeeper_paths
)
...
...
@@ -3641,8 +3581,8 @@ StorageReplicatedMergeTree::gatherReplicaSpaceInfo(const WeightedZooKeeperPaths
InterserverIOEndpointLocation
location
(
replica_path
,
address
.
host
,
address
.
replication_port
);
tasks
[
i
]
=
Tasks
::
value_type
(
std
::
bind
(
&
RemoteDiskSpaceMonitor
::
Client
::
getFree
Disk
Space
,
&
free_disk_space_checker
,
location
));
tasks
[
i
]
=
Tasks
::
value_type
(
std
::
bind
(
&
RemoteDiskSpaceMonitor
::
Client
::
getFreeSpace
,
&
disk_space_monitor_client
,
location
));
pool
.
schedule
([
i
,
&
tasks
]{
tasks
[
i
]();
});
}
}
...
...
libs/libzkutil/include/zkutil/ZooKeeper.h
浏览文件 @
d29ae515
...
...
@@ -173,8 +173,6 @@ public:
*/
void
waitForDisappear
(
const
std
::
string
&
path
);
std
::
string
getTaskQueuePath
()
const
;
/** Асинхронный интерфейс (реализовано небольшое подмножество операций).
*
* Использование:
...
...
@@ -300,7 +298,7 @@ private:
friend
struct
WatchWithEvent
;
friend
class
EphemeralNodeHolder
;
void
init
(
const
std
::
string
&
hosts
,
int32_t
session_timeout_ms
,
const
std
::
string
&
task_queue_path_
=
""
);
void
init
(
const
std
::
string
&
hosts
,
int32_t
session_timeout_ms
);
void
removeChildrenRecursive
(
const
std
::
string
&
path
);
void
tryRemoveChildrenRecursive
(
const
std
::
string
&
path
);
void
*
watchForEvent
(
EventPtr
event
);
...
...
@@ -343,7 +341,6 @@ private:
int32_t
existsImpl
(
const
std
::
string
&
path
,
Stat
*
stat_
,
EventPtr
watch
=
nullptr
);
std
::
string
hosts
;
std
::
string
task_queue_path
;
int32_t
session_timeout_ms
;
std
::
mutex
mutex
;
...
...
libs/libzkutil/src/ZooKeeper.cpp
浏览文件 @
d29ae515
...
...
@@ -61,13 +61,12 @@ void ZooKeeper::processEvent(zhandle_t * zh, int type, int state, const char * p
}
}
void
ZooKeeper
::
init
(
const
std
::
string
&
hosts_
,
int32_t
session_timeout_ms_
,
const
std
::
string
&
task_queue_path_
)
void
ZooKeeper
::
init
(
const
std
::
string
&
hosts_
,
int32_t
session_timeout_ms_
)
{
log
=
&
Logger
::
get
(
"ZooKeeper"
);
zoo_set_debug_level
(
ZOO_LOG_LEVEL_ERROR
);
hosts
=
hosts_
;
session_timeout_ms
=
session_timeout_ms_
;
task_queue_path
=
task_queue_path_
;
impl
=
zookeeper_init
(
hosts
.
c_str
(),
nullptr
,
session_timeout_ms
,
nullptr
,
nullptr
,
0
);
ProfileEvents
::
increment
(
ProfileEvents
::
ZooKeeperInit
);
...
...
@@ -105,10 +104,6 @@ struct ZooKeeperArgs
{
session_timeout_ms
=
config
.
getInt
(
config_name
+
"."
+
key
);
}
else
if
(
key
==
"task_queue_path"
)
{
task_queue_path
=
config
.
getString
(
config_name
+
"."
+
key
);
}
else
throw
KeeperException
(
std
::
string
(
"Unknown key "
)
+
key
+
" in config file"
);
}
...
...
@@ -125,13 +120,12 @@ struct ZooKeeperArgs
std
::
string
hosts
;
size_t
session_timeout_ms
;
std
::
string
task_queue_path
;
};
ZooKeeper
::
ZooKeeper
(
const
Poco
::
Util
::
AbstractConfiguration
&
config
,
const
std
::
string
&
config_name
)
{
ZooKeeperArgs
args
(
config
,
config_name
);
init
(
args
.
hosts
,
args
.
session_timeout_ms
,
args
.
task_queue_path
);
init
(
args
.
hosts
,
args
.
session_timeout_ms
);
}
void
*
ZooKeeper
::
watchForEvent
(
EventPtr
event
)
...
...
@@ -584,11 +578,6 @@ void ZooKeeper::waitForDisappear(const std::string & path)
}
}
std
::
string
ZooKeeper
::
getTaskQueuePath
()
const
{
return
task_queue_path
;
}
ZooKeeper
::~
ZooKeeper
()
{
LOG_INFO
(
&
Logger
::
get
(
"~ZooKeeper"
),
"Closing ZooKeeper session"
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录