提交 d2847f22 编写于 作者: A Alexey Zatelepin 提交者: alexey-milovidov

PR fixes [#CLICKHOUSE-3000]

上级 61987df1
......@@ -7,6 +7,7 @@
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTNameTypePair.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <IO/WriteBufferFromFile.h>
......@@ -198,14 +199,9 @@ void MergeTreeData::initPrimaryKey()
void MergeTreeData::initPartitionKey()
{
String partition_expr_str = "toYYYYMM(" + date_column_name + ")";
Tokens tokens(partition_expr_str.data(), partition_expr_str.data() + partition_expr_str.size());
ParserNotEmptyExpressionList parser(/* allow_alias_without_as_keyword = */ false);
TokenIterator token_it(tokens);
Expected expected;
bool parsed = parser.parse(token_it, partition_expr_ast, expected);
if (!parsed || !token_it->isEnd())
throw Exception("Can't parse partition expression: `" + partition_expr_str + "`", ErrorCodes::SYNTAX_ERROR);
partition_expr_ast = parseQuery(
parser, partition_expr_str.data(), partition_expr_str.data() + partition_expr_str.length(), "partition expression");
partition_expr = ExpressionAnalyzer(partition_expr_ast, context, nullptr, getColumnsList()).getActions(false);
for (const ASTPtr & ast : partition_expr_ast->children)
partition_expr_columns.emplace_back(ast->getColumnName());
......
......@@ -289,8 +289,8 @@ void MinMaxIndex::update(const Block & block, const Names & column_names)
{
if (!initialized)
{
min_column_values.resize(column_names.size(), /* dont_init_elems = */ true);
max_column_values.resize(column_names.size(), /* dont_init_elems = */ true);
min_column_values.resize(column_names.size());
max_column_values.resize(column_names.size());
}
for (size_t i = 0; i < column_names.size(); ++i)
......@@ -302,8 +302,8 @@ void MinMaxIndex::update(const Block & block, const Names & column_names)
if (!initialized)
{
new (min_column_values.place(i)) Field(min_value);
new (max_column_values.place(i)) Field(max_value);
min_column_values[i] = Field(min_value);
max_column_values[i] = Field(max_value);
}
else
{
......
......@@ -80,6 +80,10 @@ struct MergeTreeDataPartChecksums
};
/// Index that for each part stores min and max values of a set of columns. This allows quickly excluding
/// parts based on conditions on these columns imposed by a query.
/// Currently this index is built using only columns required by partition expression, but in principle it
/// can be built using any set of columns.
struct MinMaxIndex
{
void update(const Block & block, const Names & column_names);
......
......@@ -24,10 +24,10 @@ namespace
void buildScatterSelector(
const ConstColumnPlainPtrs & columns,
PODArray<size_t> & partitions_rows,
PODArray<size_t> & partition_num_to_first_row,
IColumn::Selector & selector)
{
/// Use generic hashed variant since partitioning is unlikely to be a bottleneck
/// Use generic hashed variant since partitioning is unlikely to be a bottleneck.
using Data = HashMap<UInt128, size_t, UInt128TrivialHash>;
Data partitions_map;
......@@ -42,7 +42,7 @@ void buildScatterSelector(
if (inserted)
{
partitions_rows.push_back(i);
partition_num_to_first_row.push_back(i);
it->second = partitions_count;
++partitions_count;
......@@ -85,18 +85,18 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block
for (const String & name : data.partition_expr_columns)
partition_columns.emplace_back(block_copy.getByName(name).column.get());
PODArray<size_t> partitions_rows;
PODArray<size_t> partition_num_to_first_row;
IColumn::Selector selector;
buildScatterSelector(partition_columns, partitions_rows, selector);
buildScatterSelector(partition_columns, partition_num_to_first_row, selector);
size_t partitions_count = partitions_rows.size();
size_t partitions_count = partition_num_to_first_row.size();
result.reserve(partitions_count);
auto get_partition = [&](size_t num)
{
Row partition(partition_columns.size(), DontInitElemsTag{});
Row partition(partition_columns.size());
for (size_t i = 0; i < partition_columns.size(); ++i)
new (partition.place(i)) Field((*partition_columns[i])[partitions_rows[num]]);
partition[i] = Field((*partition_columns[i])[partition_num_to_first_row[num]]);
return partition;
};
......
......@@ -1074,14 +1074,15 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
/// Logging
Stopwatch stopwatch;
MergeTreeDataMerger::FuturePart future_part(parts);
if (future_part.name != entry.new_part_name)
MergeTreeDataMerger::FuturePart future_merged_part(parts);
if (future_merged_part.name != entry.new_part_name)
throw Exception(
"Future merged part name `" + future_part.name + "` differs from part name in log entry: `" + entry.new_part_name + "`",
"Future merged part name `" + future_merged_part.name +
"` differs from part name in log entry: `" + entry.new_part_name + "`",
ErrorCodes::BAD_DATA_PART_NAME);
auto part = merger.mergePartsToTemporaryPart(
future_part, *merge_entry, aio_threshold, entry.create_time, reserved_space.get(), entry.deduplicate);
future_merged_part, *merge_entry, aio_threshold, entry.create_time, reserved_space.get(), entry.deduplicate);
zkutil::Ops ops;
......@@ -1783,7 +1784,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
}
else
{
MergeTreeDataMerger::FuturePart future_part;
MergeTreeDataMerger::FuturePart future_merged_part;
size_t max_parts_size_for_merge = merger.getMaxPartsSizeForMerge(data.settings.max_replicated_merges_in_queue, merges_queued);
......@@ -1791,10 +1792,10 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
if (max_parts_size_for_merge > 0
&& merger.selectPartsToMerge(
future_part, false,
future_merged_part, false,
max_parts_size_for_merge,
can_merge)
&& createLogEntryToMergeParts(future_part.parts, future_part.name, deduplicate))
&& createLogEntryToMergeParts(future_merged_part.parts, future_merged_part.name, deduplicate))
{
success = true;
need_pull = true;
......@@ -2369,22 +2370,23 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const String & p
size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
MergeTreeDataMerger::FuturePart future_part;
MergeTreeDataMerger::FuturePart future_merged_part;
bool selected = false;
if (partition_id.empty())
{
selected = merger.selectPartsToMerge(future_part, false, data.settings.max_bytes_to_merge_at_max_space_in_pool, can_merge);
selected = merger.selectPartsToMerge(
future_merged_part, false, data.settings.max_bytes_to_merge_at_max_space_in_pool, can_merge);
}
else
{
selected = merger.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final);
selected = merger.selectAllPartsToMergeWithinPartition(future_merged_part, disk_space, can_merge, partition_id, final);
}
if (!selected)
return false;
if (!createLogEntryToMergeParts(future_part.parts, future_part.name, deduplicate, &merge_entry))
if (!createLogEntryToMergeParts(future_merged_part.parts, future_merged_part.name, deduplicate, &merge_entry))
return false;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册