Commit 2daef385 authored by Nikolai Kochetov

Use SortedTransforms in merge.

Parent 20fc52f0
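
Note on the first group of hunks below: every sorted-merge transform constructor is changed to take the output header before the number of inputs, matching the argument order already used by MergingSortedTransform at the new call sites. A hypothetical call site (variable names are illustrative, trailing defaulted arguments omitted) changes roughly like this:

    // Before: input count first, header second.
    auto transform = std::make_unique<ReplacingSortedTransform>(
        num_inputs, header, sort_description, version_column, max_block_size);

    // After: header first, input count second.
    auto transform = std::make_unique<ReplacingSortedTransform>(
        header, num_inputs, sort_description, version_column, max_block_size);
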
......@@ -19,7 +19,7 @@ class AggregatingSortedTransform : public IMergingTransform
{
public:
AggregatingSortedTransform(
size_t num_inputs, const Block & header,
const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size);
struct SimpleAggregateDescription;
......
......@@ -22,7 +22,7 @@ static GraphiteRollupSortedTransform::ColumnsDefinition defineColumns(
}
GraphiteRollupSortedTransform::GraphiteRollupSortedTransform(
size_t num_inputs, const Block & header,
const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size,
Graphite::Params params_, time_t time_of_merge_)
: IMergingTransform(num_inputs, header, header, true)
......
......@@ -151,7 +151,7 @@ class GraphiteRollupSortedTransform : public IMergingTransform
{
public:
GraphiteRollupSortedTransform(
size_t num_inputs, const Block & header,
const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size,
Graphite::Params params_, time_t time_of_merge_);
......
......@@ -10,7 +10,7 @@ namespace ErrorCodes
}
ReplacingSortedTransform::ReplacingSortedTransform(
size_t num_inputs, const Block & header,
const Block & header, size_t num_inputs,
SortDescription description_, const String & version_column,
size_t max_block_size,
WriteBuffer * out_row_sources_buf_,
......
......@@ -17,7 +17,7 @@ class ReplacingSortedTransform final : public IMergingTransform
{
public:
ReplacingSortedTransform(
size_t num_inputs, const Block & header,
const Block & header, size_t num_inputs,
SortDescription description_, const String & version_column,
size_t max_block_size,
WriteBuffer * out_row_sources_buf_ = nullptr,
......
......@@ -366,7 +366,7 @@ namespace
}
SummingSortedTransform::SummingSortedTransform(
size_t num_inputs, const Block & header,
const Block & header, size_t num_inputs,
SortDescription description_,
/// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
const Names & column_names_to_sum,
......
......@@ -19,7 +19,7 @@ class SummingSortedTransform final : public IMergingTransform
public:
SummingSortedTransform(
size_t num_inputs, const Block & header,
const Block & header, size_t num_inputs,
SortDescription description_,
/// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
const Names & column_names_to_sum,
......
......@@ -8,7 +8,7 @@ namespace DB
static const size_t MAX_ROWS_IN_MULTIVERSION_QUEUE = 8192;
VersionedCollapsingTransform::VersionedCollapsingTransform(
size_t num_inputs, const Block & header,
const Block & header, size_t num_inputs,
SortDescription description_, const String & sign_column_,
size_t max_block_size,
WriteBuffer * out_row_sources_buf_,
......
......@@ -20,11 +20,11 @@ class VersionedCollapsingTransform final : public IMergingTransform
public:
/// Don't need version column. It's in primary key.
VersionedCollapsingTransform(
size_t num_inputs, const Block & header,
SortDescription description_, const String & sign_column_,
size_t max_block_size,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);
const Block & header, size_t num_inputs,
SortDescription description_, const String & sign_column_,
size_t max_block_size,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);
String getName() const override { return "VersionedCollapsingTransform"; }
void work() override;
......
......@@ -30,7 +30,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartWide.h>
#include <Storages/MergeTree/MergeTreeSequentialBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
#include <Storages/MergeTree/checkDataPart.h>
......
#include "MergeTreeDataMergerMutator.h"
#include <Storages/MergeTree/MergeTreeSequentialBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
#include <Disks/DiskSpaceMonitor.h>
......@@ -12,16 +12,19 @@
#include <DataStreams/TTLBlockInputStream.h>
#include <DataStreams/DistinctSortedBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/CollapsingSortedBlockInputStream.h>
#include <DataStreams/SummingSortedBlockInputStream.h>
#include <DataStreams/ReplacingSortedBlockInputStream.h>
#include <DataStreams/GraphiteRollupSortedBlockInputStream.h>
#include <DataStreams/AggregatingSortedBlockInputStream.h>
#include <DataStreams/VersionedCollapsingSortedBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Merges/CollapsingSortedTransform.h>
#include <Processors/Merges/SummingSortedTransform.h>
#include <Processors/Merges/ReplacingSortedTransform.h>
#include <Processors/Merges/GraphiteRollupSortedTransform.h>
#include <Processors/Merges/AggregatingSortedTransform.h>
#include <Processors/Merges/VersionedCollapsingTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
#include <Interpreters/MutationsInterpreter.h>
#include <Common/SimpleIncrement.h>
#include <Common/interpolate.h>
......@@ -660,7 +663,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
/** Read from all parts, merge and write into a new one.
* In passing, we calculate expression for sorting.
*/
BlockInputStreams src_streams;
Pipes pipes;
UInt64 watch_prev_elapsed = 0;
/// We count total amount of bytes in parts
......@@ -687,18 +690,24 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
for (const auto & part : parts)
{
auto input = std::make_unique<MergeTreeSequentialBlockInputStream>(
auto input = std::make_unique<MergeTreeSequentialSource>(
data, part, merging_column_names, read_with_direct_io, true);
input->setProgressCallback(
MergeProgressCallback(merge_entry, watch_prev_elapsed, horizontal_stage_progress));
BlockInputStreamPtr stream = std::move(input);
Pipe pipe(std::move(input));
if (data.hasPrimaryKey() || data.hasSkipIndices())
stream = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(stream, data.sorting_key_and_skip_indices_expr));
{
auto expr = std::make_shared<ExpressionTransform>(pipe.getHeader(), data.sorting_key_and_skip_indices_expr);
pipe.addSimpleTransform(std::move(expr));
src_streams.emplace_back(stream);
auto materializing = std::make_shared<MaterializingTransform>(pipe.getHeader());
pipe.addSimpleTransform(std::move(materializing));
}
pipes.emplace_back(std::move(pipe));
}
Names sort_columns = data.sorting_key_columns;
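
For readability, the new per-part reading path assembled by the hunk above looks roughly like this (a consolidated sketch of only the added lines, using the names from the diff):

    for (const auto & part : parts)
    {
        auto input = std::make_unique<MergeTreeSequentialSource>(
            data, part, merging_column_names, read_with_direct_io, true);

        input->setProgressCallback(
            MergeProgressCallback(merge_entry, watch_prev_elapsed, horizontal_stage_progress));

        Pipe pipe(std::move(input));

        if (data.hasPrimaryKey() || data.hasSkipIndices())
        {
            /// Calculate the sorting-key (and skip-index) expressions while reading,
            /// then materialize so the merging transforms see full columns.
            pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(
                pipe.getHeader(), data.sorting_key_and_skip_indices_expr));
            pipe.addSimpleTransform(std::make_shared<MaterializingTransform>(pipe.getHeader()));
        }

        pipes.emplace_back(std::move(pipe));
    }
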
......@@ -706,14 +715,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
Block header = src_streams.at(0)->getHeader();
Block header = pipes.at(0).getHeader();
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
/// The order of the streams is important: when the key is matched, the elements go in the order of the source stream number.
/// In the merged part, the lines with the same key must be in the ascending order of the identifier of original part,
/// that is going in insertion order.
std::shared_ptr<IBlockInputStream> merged_stream;
ProcessorPtr merged_transform;
/// If merge is vertical we cannot calculate it
bool blocks_are_granules_size = (merge_alg == MergeAlgorithm::Vertical);
......@@ -722,45 +731,48 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
switch (data.merging_params.mode)
{
case MergeTreeData::MergingParams::Ordinary:
merged_stream = std::make_unique<MergingSortedBlockInputStream>(
src_streams, sort_description, merge_block_size, 0, rows_sources_write_buf.get(), true, blocks_are_granules_size);
merged_transform = std::make_unique<MergingSortedTransform>(
header, pipes.size(), sort_description, merge_block_size, 0, rows_sources_write_buf.get(), true, blocks_are_granules_size);
break;
case MergeTreeData::MergingParams::Collapsing:
merged_stream = std::make_unique<CollapsingSortedBlockInputStream>(
src_streams, sort_description, data.merging_params.sign_column,
merged_transform = std::make_unique<CollapsingSortedTransform>(
header, pipes.size(), sort_description, data.merging_params.sign_column,
merge_block_size, rows_sources_write_buf.get(), blocks_are_granules_size);
break;
case MergeTreeData::MergingParams::Summing:
merged_stream = std::make_unique<SummingSortedBlockInputStream>(
src_streams, sort_description, data.merging_params.columns_to_sum, merge_block_size);
merged_transform = std::make_unique<SummingSortedTransform>(
header, pipes.size(), sort_description, data.merging_params.columns_to_sum, merge_block_size);
break;
case MergeTreeData::MergingParams::Aggregating:
merged_stream = std::make_unique<AggregatingSortedBlockInputStream>(
src_streams, sort_description, merge_block_size);
merged_transform = std::make_unique<AggregatingSortedTransform>(
header, pipes.size(), sort_description, merge_block_size);
break;
case MergeTreeData::MergingParams::Replacing:
merged_stream = std::make_unique<ReplacingSortedBlockInputStream>(
src_streams, sort_description, data.merging_params.version_column,
merged_transform = std::make_unique<ReplacingSortedTransform>(
header, pipes.size(), sort_description, data.merging_params.version_column,
merge_block_size, rows_sources_write_buf.get(), blocks_are_granules_size);
break;
case MergeTreeData::MergingParams::Graphite:
merged_stream = std::make_unique<GraphiteRollupSortedBlockInputStream>(
src_streams, sort_description, merge_block_size,
merged_transform = std::make_unique<GraphiteRollupSortedTransform>(
header, pipes.size(), sort_description, merge_block_size,
data.merging_params.graphite_params, time_of_merge);
break;
case MergeTreeData::MergingParams::VersionedCollapsing:
merged_stream = std::make_unique<VersionedCollapsingSortedBlockInputStream>(
src_streams, sort_description, data.merging_params.sign_column,
merged_transform = std::make_unique<VersionedCollapsingTransform>(
header, pipes.size(), sort_description, data.merging_params.sign_column,
merge_block_size, rows_sources_write_buf.get(), blocks_are_granules_size);
break;
}
Pipe merged_pipe(std::move(pipes), std::move(merged_transform));
BlockInputStreamPtr merged_stream = std::make_shared<TreeExecutorBlockInputStream>(std::move(merged_pipe));
if (deduplicate)
merged_stream = std::make_shared<DistinctSortedBlockInputStream>(merged_stream, SizeLimits(), 0 /*limit_hint*/, Names());
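
The chosen transform is then attached to all input pipes and, because the rest of the merge code still works with block streams, wrapped back into an IBlockInputStream. A sketch of the pattern, as it appears in the hunk above:

    /// One Pipe whose pipes.size() inputs feed the selected merging transform.
    Pipe merged_pipe(std::move(pipes), std::move(merged_transform));

    /// Bridge back to the stream-based interface used by the remaining merge code
    /// (deduplication, TTL, MergedBlockOutputStream).
    BlockInputStreamPtr merged_stream = std::make_shared<TreeExecutorBlockInputStream>(std::move(merged_pipe));

    if (deduplicate)
        merged_stream = std::make_shared<DistinctSortedBlockInputStream>(
            merged_stream, SizeLimits(), 0 /*limit_hint*/, Names());
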
......@@ -857,13 +869,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
MergeStageProgress column_progress(progress_before, column_sizes->columnWeight(column_name));
for (size_t part_num = 0; part_num < parts.size(); ++part_num)
{
auto column_part_stream = std::make_shared<MergeTreeSequentialBlockInputStream>(
auto column_part_source = std::make_shared<MergeTreeSequentialSource>(
data, parts[part_num], column_names, read_with_direct_io, true);
column_part_stream->setProgressCallback(
column_part_source->setProgressCallback(
MergeProgressCallback(merge_entry, watch_prev_elapsed, column_progress));
column_part_streams[part_num] = std::move(column_part_stream);
column_part_streams[part_num] = std::make_shared<TreeExecutorBlockInputStream>(
Pipe(std::move(column_part_source)));
}
rows_sources_read_buf.seek(0, 0);
......
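
The vertical merge stage keeps its stream-based column gathering, so each per-column source is wrapped the same way. A consolidated view of the new code from the hunk above:

    for (size_t part_num = 0; part_num < parts.size(); ++part_num)
    {
        auto column_part_source = std::make_shared<MergeTreeSequentialSource>(
            data, parts[part_num], column_names, read_with_direct_io, true);

        column_part_source->setProgressCallback(
            MergeProgressCallback(merge_entry, watch_prev_elapsed, column_progress));

        /// Wrap the processor-based source so downstream stream-based code
        /// (e.g. the column gatherer) can consume it unchanged.
        column_part_streams[part_num] = std::make_shared<TreeExecutorBlockInputStream>(
            Pipe(std::move(column_part_source)));
    }
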
#include <Storages/MergeTree/MergeTreeSequentialBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
namespace DB
......@@ -8,16 +8,17 @@ namespace ErrorCodes
extern const int MEMORY_LIMIT_EXCEEDED;
}
MergeTreeSequentialBlockInputStream::MergeTreeSequentialBlockInputStream(
MergeTreeSequentialSource::MergeTreeSequentialSource(
const MergeTreeData & storage_,
const MergeTreeData::DataPartPtr & data_part_,
MergeTreeData::DataPartPtr data_part_,
Names columns_to_read_,
bool read_with_direct_io_,
bool take_column_types_from_storage,
bool quiet)
: storage(storage_)
, data_part(data_part_)
, columns_to_read(columns_to_read_)
: SourceWithProgress(storage_.getSampleBlockForColumns(columns_to_read))
, storage(storage_)
, data_part(std::move(data_part_))
, columns_to_read(std::move(columns_to_read_))
, read_with_direct_io(read_with_direct_io_)
, mark_cache(storage.global_context.getMarkCache())
{
......@@ -33,8 +34,6 @@ MergeTreeSequentialBlockInputStream::MergeTreeSequentialBlockInputStream(
addTotalRowsApprox(data_part->rows_count);
header = storage.getSampleBlockForColumns(columns_to_read);
/// Add columns because we don't want to read empty blocks
injectRequiredColumns(storage, data_part, columns_to_read);
NamesAndTypesList columns_for_reader;
......@@ -62,33 +61,11 @@ MergeTreeSequentialBlockInputStream::MergeTreeSequentialBlockInputStream(
/* uncompressed_cache = */ nullptr, mark_cache.get(), reader_settings);
}
void MergeTreeSequentialBlockInputStream::fixHeader(Block & header_block) const
{
/// Types may be different during ALTER (when this stream is used to perform an ALTER).
for (const auto & name_type : data_part->getColumns())
{
if (header_block.has(name_type.name))
{
auto & elem = header_block.getByName(name_type.name);
if (!elem.type->equals(*name_type.type))
{
elem.type = name_type.type;
elem.column = elem.type->createColumn();
}
}
}
}
Block MergeTreeSequentialBlockInputStream::getHeader() const
{
return header;
}
Block MergeTreeSequentialBlockInputStream::readImpl()
Chunk MergeTreeSequentialSource::generate()
try
{
Block res;
auto & header = getPort().getHeader();
if (!isCancelled() && current_row < data_part->rows_count)
{
size_t rows_to_read = data_part->index_granularity.getMarkRows(current_mark);
......@@ -96,15 +73,15 @@ try
auto & sample = reader->getColumns();
Columns columns(sample.size());
size_t rows_readed = reader->readRows(current_mark, continue_reading, rows_to_read, columns);
size_t rows_read = reader->readRows(current_mark, continue_reading, rows_to_read, columns);
if (rows_readed)
if (rows_read)
{
current_row += rows_readed;
current_mark += (rows_to_read == rows_readed);
current_row += rows_read;
current_mark += (rows_to_read == rows_read);
bool should_evaluate_missing_defaults = false;
reader->fillMissingColumns(columns, should_evaluate_missing_defaults, rows_readed);
reader->fillMissingColumns(columns, should_evaluate_missing_defaults, rows_read);
if (should_evaluate_missing_defaults)
{
......@@ -113,20 +90,21 @@ try
reader->performRequiredConversions(columns);
res = header.cloneEmpty();
/// Reorder columns and fill result block.
size_t num_columns = sample.size();
Columns res_columns;
res_columns.reserve(num_columns);
auto it = sample.begin();
for (size_t i = 0; i < num_columns; ++i)
{
if (res.has(it->name))
res.getByName(it->name).column = std::move(columns[i]);
if (header.has(it->name))
res_columns.emplace_back(std::move(columns[i]));
++it;
}
res.checkNumberOfRows();
return Chunk(std::move(res_columns), rows_read);
}
}
else
......@@ -134,7 +112,7 @@ try
finish();
}
return res;
return {};
}
catch (...)
{
......@@ -144,8 +122,7 @@ catch (...)
throw;
}
void MergeTreeSequentialBlockInputStream::finish()
void MergeTreeSequentialSource::finish()
{
/** Close the files (before destroying the object).
* When many sources are created, but simultaneously reading only a few of them,
......@@ -155,7 +132,6 @@ void MergeTreeSequentialBlockInputStream::finish()
data_part.reset();
}
MergeTreeSequentialBlockInputStream::~MergeTreeSequentialBlockInputStream() = default;
MergeTreeSequentialSource::~MergeTreeSequentialSource() = default;
}
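
The readImpl()-to-generate() change above follows the pull contract the new code relies on: each call returns a Chunk of columns (here Chunk(std::move(res_columns), rows_read)) and an empty Chunk ("return {};") once the part is exhausted. Below is a minimal, self-contained sketch of that contract with simplified stand-in types; CountingSource and the toy Chunk struct are illustrative only, not the actual ClickHouse classes:

    #include <algorithm>
    #include <cstddef>
    #include <utility>
    #include <vector>

    /// Simplified stand-ins: a chunk is a set of columns plus a row count.
    using Column = std::vector<int>;

    struct Chunk
    {
        std::vector<Column> columns;
        std::size_t num_rows = 0;
        explicit operator bool() const { return num_rows != 0; }
    };

    /// A source produces chunks until it returns an empty one.
    class CountingSource
    {
    public:
        CountingSource(std::size_t total_rows_, std::size_t rows_per_chunk_)
            : total_rows(total_rows_), rows_per_chunk(rows_per_chunk_) {}

        Chunk generate()
        {
            if (current_row >= total_rows)
                return {};  /// empty chunk: the source is finished

            std::size_t rows = std::min(rows_per_chunk, total_rows - current_row);
            Column col(rows);
            for (std::size_t i = 0; i < rows; ++i)
                col[i] = static_cast<int>(current_row + i);
            current_row += rows;

            Chunk chunk;
            chunk.columns.push_back(std::move(col));
            chunk.num_rows = rows;
            return chunk;
        }

    private:
        std::size_t total_rows;
        std::size_t rows_per_chunk;
        std::size_t current_row = 0;
    };

    int main()
    {
        CountingSource source(10, 4);
        std::size_t total = 0;
        while (Chunk chunk = source.generate())  /// pull until an empty chunk is returned
            total += chunk.num_rows;
        return total == 10 ? 0 : 1;
    }
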
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/MarkRange.h>
......@@ -9,40 +9,33 @@ namespace DB
{
/// Lightweight (in terms of logic) stream for reading single part from MergeTree
class MergeTreeSequentialBlockInputStream : public IBlockInputStream
class MergeTreeSequentialSource : public SourceWithProgress
{
public:
MergeTreeSequentialBlockInputStream(
MergeTreeSequentialSource(
const MergeTreeData & storage_,
const MergeTreeData::DataPartPtr & data_part_,
MergeTreeData::DataPartPtr data_part_,
Names columns_to_read_,
bool read_with_direct_io_,
bool take_column_types_from_storage,
bool quiet = false
);
~MergeTreeSequentialBlockInputStream() override;
~MergeTreeSequentialSource() override;
String getName() const override { return "MergeTreeSequentialBlockInputStream"; }
Block getHeader() const override;
/// Closes readers and unlock part locks
void finish();
String getName() const override { return "MergeTreeSequentialSource"; }
size_t getCurrentMark() const { return current_mark; }
size_t getCurrentRow() const { return current_row; }
protected:
Block readImpl() override;
Chunk generate() override;
private:
const MergeTreeData & storage;
Block header;
/// Data part will not be removed if the pointer owns it
MergeTreeData::DataPartPtr data_part;
......@@ -52,7 +45,7 @@ private:
/// Should read using direct IO
bool read_with_direct_io;
Logger * log = &Logger::get("MergeTreeSequentialBlockInputStream");
Logger * log = &Logger::get("MergeTreeSequentialSource");
std::shared_ptr<MarkCache> mark_cache;
using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
......@@ -65,8 +58,8 @@ private:
size_t current_row = 0;
private:
void fixHeader(Block & header_block) const;
/// Closes readers and unlock part locks
void finish();
};
}