Commit bc09b513 authored by Anastasiya Rodigina

Add reading in pk_order

Parent af4fff97
@@ -764,13 +764,15 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
     auto lock = lockParts();
     data_parts_indexes.clear();
+    size_t data_parts_counter = 0;
     for (const String & file_name : part_file_names)
     {
         MergeTreePartInfo part_info;
         if (!MergeTreePartInfo::tryParsePartName(file_name, &part_info, format_version))
             continue;
-        MutableDataPartPtr part = std::make_shared<DataPart>(*this, file_name, part_info);
+        MutableDataPartPtr part = std::make_shared<DataPart>(*this, file_name, part_info, data_parts_counter);
+        ++data_parts_counter;
         part->relative_path = file_name;
         bool broken = false;
......
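The hunk above numbers each part sequentially as loadDataParts discovers it on disk. A minimal standalone sketch of that counter pattern follows; ToyPart and loadParts are illustrative stand-ins, not the real ClickHouse types:

#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Stand-in for MergeTreeDataPart: each part remembers the position
// at which it was discovered during loading.
struct ToyPart
{
    std::string name;
    size_t data_part_id = 0;

    ToyPart(std::string name_, size_t id_) : name(std::move(name_)), data_part_id(id_) {}
};

std::vector<std::shared_ptr<ToyPart>> loadParts(const std::vector<std::string> & file_names)
{
    std::vector<std::shared_ptr<ToyPart>> parts;
    size_t data_parts_counter = 0;  // same idea as the counter added to loadDataParts
    for (const auto & file_name : file_names)
    {
        // The real code skips names that fail MergeTreePartInfo::tryParsePartName;
        // this sketch accepts every name to stay short.
        parts.push_back(std::make_shared<ToyPart>(file_name, data_parts_counter));
        ++data_parts_counter;
    }
    return parts;
}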
@@ -135,17 +135,23 @@ void MergeTreeDataPart::MinMaxIndex::merge(const MinMaxIndex & other)
 }
 
-MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const String & name_)
+MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const String & name_, size_t data_part_id_)
     : storage(storage_)
     , name(name_)
     , info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
+    , data_part_id(data_part_id_)
 {
 }
 
-MergeTreeDataPart::MergeTreeDataPart(const MergeTreeData & storage_, const String & name_, const MergeTreePartInfo & info_)
+MergeTreeDataPart::MergeTreeDataPart(
+    const MergeTreeData & storage_,
+    const String & name_,
+    const MergeTreePartInfo & info_,
+    size_t data_part_id_)
     : storage(storage_)
     , name(name_)
     , info(info_)
+    , data_part_id(data_part_id_)
 {
 }
......
@@ -30,9 +30,9 @@ struct MergeTreeDataPart
     using Checksums = MergeTreeDataPartChecksums;
     using Checksum = MergeTreeDataPartChecksums::Checksum;
 
-    MergeTreeDataPart(const MergeTreeData & storage_, const String & name_, const MergeTreePartInfo & info_);
+    MergeTreeDataPart(const MergeTreeData & storage_, const String & name_, const MergeTreePartInfo & info_, size_t data_part_id_ = 0);
 
-    MergeTreeDataPart(MergeTreeData & storage_, const String & name_);
+    MergeTreeDataPart(MergeTreeData & storage_, const String & name_, size_t data_part_id_ = 0);
 
     /// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
     /// If no checksums are present returns the name of the first physically existing column.
@@ -92,6 +92,9 @@ struct MergeTreeDataPart
     /// Examples: 'detached/tmp_fetch_<name>', 'tmp_<name>', '<name>'
     mutable String relative_path;
 
+    /// Used when reading in pk_order.
+    size_t data_part_id = 0;
+
     size_t rows_count = 0;
     std::atomic<UInt64> bytes_on_disk {0};  /// 0 - if not counted;
     /// Is used from several threads without locks (it is changed with ALTER).
@@ -144,6 +147,11 @@ struct MergeTreeDataPart
         return name + " (state " + stateString() + ")";
     }
 
+    void setDataPartId(size_t data_part_id_)
+    {
+        data_part_id = data_part_id_;
+    }
+
     /// Returns true if state of part is one of affordable_states
     bool checkState(const std::initializer_list<State> & affordable_states) const
    {
......
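Both constructor overloads above default the new parameter to 0, so existing call sites that construct parts outside of loadDataParts keep compiling unchanged; the setDataPartId setter presumably lets such parts receive an id later. A toy illustration of that defaulted-parameter pattern (ToyPart is a stand-in, not the real class):

#include <cstddef>
#include <string>
#include <utility>

struct ToyPart
{
    std::string name;
    size_t data_part_id = 0;

    // Defaulted id: old single-argument call sites still compile unchanged.
    explicit ToyPart(std::string name_, size_t data_part_id_ = 0)
        : name(std::move(name_)), data_part_id(data_part_id_) {}

    void setDataPartId(size_t data_part_id_) { data_part_id = data_part_id_; }
};

int main()
{
    ToyPart old_style("20190101_1_1_0");     // id defaults to 0
    ToyPart new_style("20190101_2_2_0", 7);  // id supplied at load time
    old_style.setDataPartId(3);              // or assigned after construction
}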
@@ -587,6 +587,28 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
             virt_column_names,
             settings);
     }
+    else if (settings.optimize_pk_order)
+    {
+        std::vector<String> add_columns = data.sorting_key_expr->getRequiredColumns();
+        column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
+
+        if (!data.merging_params.sign_column.empty())
+            column_names_to_read.push_back(data.merging_params.sign_column);
+        if (!data.merging_params.version_column.empty())
+            column_names_to_read.push_back(data.merging_params.version_column);
+
+        std::sort(column_names_to_read.begin(), column_names_to_read.end());
+        column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());
+
+        res = spreadMarkRangesAmongStreamsPKOrder(
+            std::move(parts_with_ranges),
+            column_names_to_read,
+            max_block_size,
+            settings.use_uncompressed_cache,
+            query_info,
+            virt_column_names,
+            settings);
+    }
     else
     {
         res = spreadMarkRangesAmongStreams(
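The new optimize_pk_order branch widens the set of columns to read so that the sorting key expression (and the engine's sign and version columns, when defined) can be evaluated on every stream, then removes duplicates with the standard sort + std::unique + erase idiom. A self-contained sketch of just that dedup step, with made-up column names:

#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

int main()
{
    // Columns requested by the query plus columns required by the
    // sorting key; "date" appears twice on purpose.
    std::vector<std::string> columns{"value", "date", "id", "date", "sign"};

    // Same idiom as the patch: sort, collapse adjacent duplicates, trim.
    std::sort(columns.begin(), columns.end());
    columns.erase(std::unique(columns.begin(), columns.end()), columns.end());

    for (const auto & name : columns)
        std::cout << name << '\n';  // date, id, sign, value
}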
@@ -790,6 +812,49 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
     return res;
 }
 
+BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsPKOrder(
+    RangesInDataParts && parts,
+    const Names & column_names,
+    UInt64 max_block_size,
+    bool use_uncompressed_cache,
+    const SelectQueryInfo & query_info,
+    const Names & virt_columns,
+    const Settings & settings) const
+{
+    const PrewhereInfoPtr prewhere_info = query_info.prewhere_info;
+
+    const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
+        settings.merge_tree_max_rows_to_use_cache,
+        settings.merge_tree_max_bytes_to_use_cache,
+        data.index_granularity_info);
+
+    size_t sum_marks = 0;
+    for (size_t i = 0; i < parts.size(); ++i)
+        for (size_t j = 0; j < parts[i].ranges.size(); ++j)
+            sum_marks += parts[i].ranges[j].end - parts[i].ranges[j].begin;
+
+    if (sum_marks > max_marks_to_use_cache)
+        use_uncompressed_cache = false;
+
+    BlockInputStreams to_merge;
+
+    for (size_t part_index = 0; part_index < parts.size(); ++part_index)
+    {
+        RangesInDataPart & part = parts[part_index];
+
+        BlockInputStreamPtr source_stream = std::make_shared<MergeTreeSelectBlockInputStream>(
+            data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
+            settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
+            prewhere_info, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true,
+            virt_columns, part.part_index_in_query);
+
+        to_merge.emplace_back(std::make_shared<ExpressionBlockInputStream>(source_stream, data.sorting_key_expr));
+    }
+
+    return to_merge;
+}
+
 BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
     RangesInDataParts && parts,
     const Names & column_names,
......
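Unlike spreadMarkRangesAmongStreams, the new function does not union parts into a fixed number of threads: it returns one ExpressionBlockInputStream per data part, each already ordered by the primary key, so the caller can combine them with a merge sort rather than a full re-sort (in ClickHouse of that era, presumably via something like MergingSortedBlockInputStream). A rough sketch of that k-way merge over per-part sorted sequences, using plain vectors of ints in place of block streams:

#include <functional>
#include <iostream>
#include <queue>
#include <tuple>
#include <vector>

// Merge individually sorted sequences into one sorted output, the way
// per-part streams that are already in primary-key order can be merged.
std::vector<int> mergeSorted(const std::vector<std::vector<int>> & parts)
{
    using Cursor = std::tuple<int, size_t, size_t>;  // (value, part index, offset)
    std::priority_queue<Cursor, std::vector<Cursor>, std::greater<>> heap;

    for (size_t p = 0; p < parts.size(); ++p)
        if (!parts[p].empty())
            heap.emplace(parts[p][0], p, 0);

    std::vector<int> result;
    while (!heap.empty())
    {
        auto [value, p, i] = heap.top();
        heap.pop();
        result.push_back(value);
        if (i + 1 < parts[p].size())
            heap.emplace(parts[p][i + 1], p, i + 1);
    }
    return result;
}

int main()
{
    // Each inner vector plays the role of one part, already in PK order.
    std::vector<std::vector<int>> parts{{1, 4, 9}, {2, 3, 10}, {5, 6}};
    for (int v : mergeSorted(parts))
        std::cout << v << ' ';  // prints: 1 2 3 4 5 6 9 10
}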
@@ -56,6 +56,15 @@ private:
         const Names & virt_columns,
         const Settings & settings) const;
 
+    BlockInputStreams spreadMarkRangesAmongStreamsPKOrder(
+        RangesInDataParts && parts,
+        const Names & column_names,
+        UInt64 max_block_size,
+        bool use_uncompressed_cache,
+        const SelectQueryInfo & query_info,
+        const Names & virt_columns,
+        const Settings & settings) const;
+
     BlockInputStreams spreadMarkRangesAmongStreamsFinal(
         RangesInDataParts && parts,
         const Names & column_names,
......