提交 ba9315d2 作者: Michael Kolupaev

Merge

上级 d5e95781
......@@ -14,7 +14,7 @@ namespace DB
class CollapsingFinalBlockInputStream : public IProfilingBlockInputStream
{
public:
CollapsingFinalBlockInputStream(BlockInputStreams inputs_, SortDescription & description_,
CollapsingFinalBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_,
const String & sign_column_)
: description(description_), sign_column(sign_column_),
log(&Logger::get("CollapsingSortedBlockInputStream")),
......
......@@ -23,7 +23,7 @@ namespace DB
class CollapsingSortedBlockInputStream : public MergingSortedBlockInputStream
{
public:
CollapsingSortedBlockInputStream(BlockInputStreams inputs_, SortDescription & description_,
CollapsingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_,
const String & sign_column_, size_t max_block_size_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_),
sign_column(sign_column_), sign_column_number(0),
......
......@@ -19,7 +19,7 @@ class MergingSortedBlockInputStream : public IProfilingBlockInputStream
{
public:
/// limit - если не 0, то можно выдать только первые limit строк в сортированном порядке.
MergingSortedBlockInputStream(BlockInputStreams inputs_, SortDescription & description_, size_t max_block_size_, size_t limit_ = 0)
MergingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, size_t max_block_size_, size_t limit_ = 0)
: description(description_), max_block_size(max_block_size_), limit(limit_), total_merged_rows(0), first(true), has_collation(false),
num_columns(0), source_blocks(inputs_.size()), cursors(inputs_.size()), log(&Logger::get("MergingSortedBlockInputStream"))
{
......
......@@ -18,7 +18,7 @@ namespace DB
class SummingSortedBlockInputStream : public MergingSortedBlockInputStream
{
public:
SummingSortedBlockInputStream(BlockInputStreams inputs_, SortDescription & description_, size_t max_block_size_)
SummingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, size_t max_block_size_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_),
log(&Logger::get("SummingSortedBlockInputStream")), current_row_is_zero(false)
{
......
......@@ -245,6 +245,8 @@ public:
*/
/// Records the storage that owns this object by assigning the given
/// pointer to the `owning_storage` member.
void setOwningStorage(StoragePtr storage)
{
    owning_storage = storage;
}
/// Returns the value of the `owning_storage` member as a StoragePtr.
/// Does not modify the object.
StoragePtr getOwningStorage() const
{
    return owning_storage;
}
/// Declaration only; the definition lives outside this view.
/// NOTE(review): presumably returns a textual prefix that depends on `mode` — confirm against the definition.
std::string getModePrefix() const;
/// Returns a copy of the `sign_column` member.
std::string getSignColumnName() const
{
    return sign_column;
}
......@@ -341,26 +343,29 @@ public:
*/
void alter(const ASTAlterQuery::Parameters & params);
/// Эти поля не нужно изменять снаружи. NOTE нужно спрятать их и сделать методы get*.
/// Read-only accessor for the `primary_expr` member; the pointer is returned by value.
ExpressionActionsPtr getPrimaryExpression() const
{
    return primary_expr;
}
/// Read-only accessor for the `sort_descr` member.
/// The description is returned by value, so every call produces a copy.
SortDescription getSortDescription() const
{
    return sort_descr;
}
const Context & context;
String date_column_name;
ASTPtr sampling_expression;
size_t index_granularity;
const String date_column_name;
const ASTPtr sampling_expression;
const size_t index_granularity;
/// Режим работы - какие дополнительные действия делать при мердже.
Mode mode;
const Mode mode;
/// Для схлопывания записей об изменениях, если используется Collapsing режим работы.
String sign_column;
const String sign_column;
MergeTreeSettings settings;
const MergeTreeSettings settings;
private:
ExpressionActionsPtr primary_expr;
SortDescription sort_descr;
Block primary_key_sample;
StorageWeakPtr owning_storage;
private:
ASTPtr primary_expr_ast;
String full_path;
......
......@@ -224,11 +224,11 @@ public:
typedef std::vector<const ColumnWithNameAndType *> PrimaryColumns;
PrimaryColumns primary_columns;
for (size_t i = 0, size = storage.sort_descr.size(); i < size; ++i)
for (const auto & descr : storage.getSortDescription())
primary_columns.push_back(
!storage.sort_descr[i].column_name.empty()
? &block.getByName(storage.sort_descr[i].column_name)
: &block.getByPosition(storage.sort_descr[i].column_number));
!descr.column_name.empty()
? &block.getByName(descr.column_name)
: &block.getByPosition(descr.column_number));
for (size_t i = index_offset; i < rows; i += storage.index_granularity)
{
......
......@@ -269,7 +269,7 @@ String MergeTreeDataMerger::mergeParts(const MergeTreeData::DataPartsVector & pa
MarkRanges ranges(1, MarkRange(0, parts[i]->size));
src_streams.push_back(new ExpressionBlockInputStream(new MergeTreeBlockInputStream(
structure->getFullPath() + parts[i]->name + '/', structure, DEFAULT_MERGE_BLOCK_SIZE, all_column_names, data, parts[i], ranges,
StoragePtr(), false, NULL, ""), data.primary_expr));
StoragePtr(), false, NULL, ""), data.getPrimaryExpression()));
}
/// Порядок потоков важен: при совпадении ключа элементы идут в порядке номера потока-источника.
......@@ -279,15 +279,15 @@ String MergeTreeDataMerger::mergeParts(const MergeTreeData::DataPartsVector & pa
switch (data.mode)
{
case MergeTreeData::Ordinary:
merged_stream = new MergingSortedBlockInputStream(src_streams, data.sort_descr, DEFAULT_MERGE_BLOCK_SIZE);
merged_stream = new MergingSortedBlockInputStream(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Collapsing:
merged_stream = new CollapsingSortedBlockInputStream(src_streams, data.sort_descr, data.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
merged_stream = new CollapsingSortedBlockInputStream(src_streams, data.getSortDescription(), data.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Summing:
merged_stream = new SummingSortedBlockInputStream(src_streams, data.sort_descr, DEFAULT_MERGE_BLOCK_SIZE);
merged_stream = new SummingSortedBlockInputStream(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break;
default:
......
......@@ -32,7 +32,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
structure->check(column_names_to_return);
processed_stage = QueryProcessingStage::FetchColumns;
PKCondition key_condition(query, data.context, structure->getColumnsList(), data.sort_descr);
PKCondition key_condition(query, data.context, structure->getColumnsList(), data.getSortDescription());
PKCondition date_condition(query, data.context, structure->getColumnsList(), SortDescription(1, SortColumnDescription(data.date_column_name, 1)));
MergeTreeData::DataPartsVector parts;
......@@ -90,7 +90,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
}
UInt64 sampling_column_max = 0;
DataTypePtr type = data.primary_expr->getSampleBlock().getByName(data.sampling_expression->getColumnName()).type;
DataTypePtr type = data.getPrimaryExpression()->getSampleBlock().getByName(data.sampling_expression->getColumnName()).type;
if (type->getName() == "UInt64")
sampling_column_max = std::numeric_limits<UInt64>::max();
......@@ -173,7 +173,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
if (select.final)
{
/// Добавим столбцы, нужные для вычисления первичного ключа и знака.
std::vector<String> add_columns = data.primary_expr->getRequiredColumns();
std::vector<String> add_columns = data.getPrimaryExpression()->getRequiredColumns();
column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
column_names_to_read.push_back(data.sign_column);
std::sort(column_names_to_read.begin(), column_names_to_read.end());
......@@ -284,7 +284,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
streams.push_back(new MergeTreeBlockInputStream(
structure->getFullPath() + part.data_part->name + '/', structure, max_block_size, column_names, data,
part.data_part, part.ranges, data.owning_storage, use_uncompressed_cache,
part.data_part, part.ranges, data.getOwningStorage(), use_uncompressed_cache,
prewhere_actions, prewhere_column));
need_marks -= marks_in_part;
parts.pop_back();
......@@ -314,7 +314,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
streams.push_back(new MergeTreeBlockInputStream(
structure->getFullPath() + part.data_part->name + '/', structure, max_block_size, column_names, data,
part.data_part, ranges_to_get_from_part, data.owning_storage, use_uncompressed_cache,
part.data_part, ranges_to_get_from_part, data.getOwningStorage(), use_uncompressed_cache,
prewhere_actions, prewhere_column));
}
......@@ -361,17 +361,17 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal
BlockInputStreamPtr source_stream = new MergeTreeBlockInputStream(
structure->getFullPath() + part.data_part->name + '/', structure, max_block_size, column_names, data,
part.data_part, part.ranges, data.owning_storage, use_uncompressed_cache,
part.data_part, part.ranges, data.getOwningStorage(), use_uncompressed_cache,
prewhere_actions, prewhere_column);
to_collapse.push_back(new ExpressionBlockInputStream(source_stream, data.primary_expr));
to_collapse.push_back(new ExpressionBlockInputStream(source_stream, data.getPrimaryExpression()));
}
BlockInputStreams res;
if (to_collapse.size() == 1)
res.push_back(new FilterBlockInputStream(new ExpressionBlockInputStream(to_collapse[0], sign_filter_expression), sign_filter_column));
else if (to_collapse.size() > 1)
res.push_back(new CollapsingFinalBlockInputStream(to_collapse, data.sort_descr, data.sign_column));
res.push_back(new CollapsingFinalBlockInputStream(to_collapse, data.getSortDescription(), data.sign_column));
return res;
}
......@@ -413,7 +413,7 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPkRange(const MergeTreeDat
{
MarkRanges res;
size_t key_size = data.sort_descr.size();
size_t key_size = data.getSortDescription().size();
size_t marks_count = index.size() / key_size;
/// Если индекс не используется.
......
......@@ -93,19 +93,21 @@ MergeTreeData::DataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDateInter
LOG_TRACE(log, "Calculating primary expression.");
/// Если для сортировки надо вычислить некоторые столбцы - делаем это.
data.primary_expr->execute(block);
data.getPrimaryExpression()->execute(block);
LOG_TRACE(log, "Sorting by primary key.");
SortDescription sort_descr = data.getSortDescription();
/// Сортируем.
stableSortBlock(block, data.sort_descr);
stableSortBlock(block, sort_descr);
/// Наконец-то можно писать данные на диск.
LOG_TRACE(log, "Writing index.");
/// Сначала пишем индекс. Индекс содержит значение PK для каждой index_granularity строки.
MergeTreeData::DataPart::Index index_vec;
index_vec.reserve(part_size * data.sort_descr.size());
index_vec.reserve(part_size * sort_descr.size());
{
WriteBufferFromFile index(part_tmp_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, flags);
......@@ -113,11 +115,11 @@ MergeTreeData::DataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDateInter
typedef std::vector<const ColumnWithNameAndType *> PrimaryColumns;
PrimaryColumns primary_columns;
for (size_t i = 0, size = data.sort_descr.size(); i < size; ++i)
for (size_t i = 0, size = sort_descr.size(); i < size; ++i)
primary_columns.push_back(
!data.sort_descr[i].column_name.empty()
? &block.getByName(data.sort_descr[i].column_name)
: &block.getByPosition(data.sort_descr[i].column_number));
!sort_descr[i].column_name.empty()
? &block.getByName(sort_descr[i].column_name)
: &block.getByPosition(sort_descr[i].column_number));
for (size_t i = 0; i < rows; i += data.index_granularity)
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册