提交 e24e6257 编写于 作者: A Albert Kidrachev

fix

上级 5f013a36
......@@ -87,33 +87,4 @@ struct SharedBlockRowRef
}
};
struct SharedBlockRowWithSortDescriptionRef : SharedBlockRowRef
{
SortDescription * description = nullptr;
void set(SharedBlockPtr & shared_block_, ColumnRawPtrs * columns_, size_t row_num_) = delete;
bool operator< (const SharedBlockRowRef & other) const
{
size_t size = columns->size();
for (size_t i = 0; i < size; ++i)
{
int res = (*description)[i].direction * (*columns)[i]->compareAt(row_num, other.row_num, *(*other.columns)[i], 1);
if (res < 0)
return true;
else if (res > 0)
return false;
}
return false;
}
void set(SharedBlockPtr & shared_block_, ColumnRawPtrs * columns_, size_t row_num_, SortDescription * description_)
{
shared_block = shared_block_;
columns = columns_;
row_num = row_num_;
description = description_;
}
};
}
......@@ -84,7 +84,6 @@
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Processors/Transforms/TotalsHavingTransform.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Transforms/OptimizedPartialSortingTransform.h>
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Processors/Transforms/MergeSortingTransform.h>
#include <Processors/Transforms/DistinctTransform.h>
......@@ -2109,7 +2108,7 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputSorting
if (stream_type != QueryPipeline::StreamType::Main)
return nullptr;
return std::make_shared<OptimizedPartialSortingTransform>(header, output_order_descr, limit);
return std::make_shared<PartialSortingTransform>(header, output_order_descr, limit);
});
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
......
#include <Processors/Transforms/OptimizedPartialSortingTransform.h>
#include <Interpreters/sortBlock.h>
#include <Common/PODArray.h>
namespace DB
{
OptimizedPartialSortingTransform::OptimizedPartialSortingTransform(
const Block & header_, SortDescription & description_, UInt64 limit_)
: ISimpleTransform(header_, header_, false)
, description(description_), limit(limit_)
, threshold_shared_block(nullptr)
{
}
static ColumnRawPtrs extractColumns(const Block & block, const SortDescription& description)
{
size_t size = description.size();
ColumnRawPtrs res;
res.reserve(size);
for (size_t i = 0; i < size; ++i)
{
const IColumn * column = !description[i].column_name.empty()
? block.getByName(description[i].column_name).column.get()
: block.safeGetByPosition(description[i].column_number).column.get();
res.emplace_back(column);
}
return res;
}
void OptimizedPartialSortingTransform::transform(Chunk & chunk)
{
if (read_rows)
read_rows->add(chunk.getNumRows());
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
chunk.clear();
SharedBlockPtr shared_block = new detail::SharedBlock(std::move(block));
UInt64 rows_num = shared_block->rows();
if (threshold_shared_block) {
SharedBlockRowWithSortDescriptionRef row;
IColumn::Filter filter(rows_num);
ColumnRawPtrs shared_block_columns = extractColumns(*shared_block, description);
size_t filtered_count = 0;
for (UInt64 i = 0; i < rows_num; ++i) {
row.set(shared_block, &shared_block_columns, i, &description);
if (threshold_row < row)
{
++filtered_count;
filter[i] = 1;
}
}
if (filtered_count)
{
for (auto & column : shared_block->getColumns())
{
column = column->filter(filter, filtered_count);
}
}
}
sortBlock(*shared_block, description, limit);
if (!threshold_shared_block && limit && limit < rows_num)
{
Block threshold_block = shared_block->cloneWithColumns(shared_block->getColumns());
threshold_shared_block = new detail::SharedBlock(std::move(threshold_block));
threshold_block_columns = extractColumns(*threshold_shared_block, description);
threshold_row.set(threshold_shared_block, &threshold_block_columns, limit - 1, &description);
}
chunk.setColumns(shared_block->getColumns(), shared_block->rows());
}
}
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Processors/RowsBeforeLimitCounter.h>
#include <Common/SharedBlockRowRef.h>
#include <Core/SortDescription.h>
namespace DB
{
class OptimizedPartialSortingTransform : public ISimpleTransform
{
public:
OptimizedPartialSortingTransform(
const Block & header_,
SortDescription & description_,
UInt64 limit_ = 0);
String getName() const override { return "OptimizedPartialSortingTransform"; }
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { read_rows.swap(counter); }
protected:
void transform(Chunk & chunk) override;
private:
SortDescription description;
UInt64 limit;
RowsBeforeLimitCounterPtr read_rows;
SharedBlockRowWithSortDescriptionRef threshold_row;
SharedBlockPtr threshold_shared_block;
ColumnRawPtrs threshold_block_columns;
};
}
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Interpreters/sortBlock.h>
#include <Common/PODArray.h>
namespace DB
{
......@@ -11,6 +12,38 @@ PartialSortingTransform::PartialSortingTransform(
{
}
static ColumnRawPtrs extractColumns(const Block & block, const SortDescription & description)
{
size_t size = description.size();
ColumnRawPtrs res;
res.reserve(size);
for (size_t i = 0; i < size; ++i)
{
const IColumn * column = !description[i].column_name.empty()
? block.getByName(description[i].column_name).column.get()
: block.safeGetByPosition(description[i].column_number).column.get();
res.emplace_back(column);
}
return res;
}
bool less(const ColumnRawPtrs & lhs, UInt64 lhs_row_num,
const ColumnRawPtrs & rhs, UInt64 rhs_row_num, const SortDescription & description)
{
size_t size = description.size();
for (size_t i = 0; i < size; ++i)
{
int res = description[i].direction * lhs[i]->compareAt(lhs_row_num, rhs_row_num, *rhs[i], 1);
if (res < 0)
return true;
else if (res > 0)
return false;
}
return false;
}
void PartialSortingTransform::transform(Chunk & chunk)
{
if (read_rows)
......@@ -19,7 +52,40 @@ void PartialSortingTransform::transform(Chunk & chunk)
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
chunk.clear();
UInt64 rows_num = block.rows();
if (!threshold_block_columns.empty())
{
IColumn::Filter filter(rows_num, 0);
ColumnRawPtrs block_columns = extractColumns(block, description);
size_t filtered_count = 0;
for (UInt64 i = 0; i < rows_num; ++i)
{
if (less(threshold_block_columns, limit - 1, block_columns, i, description))
{
++filtered_count;
filter[i] = 1;
}
}
if (filtered_count)
{
for (auto & column : block.getColumns())
{
column = column->filter(filter, filtered_count);
}
}
}
sortBlock(block, description, limit);
if (threshold_block_columns.empty() && limit && limit < rows_num)
{
threshold_block = block.cloneWithColumns(block.getColumns());
threshold_block_columns = extractColumns(threshold_block, description);
}
chunk.setColumns(block.getColumns(), block.rows());
}
......
......@@ -29,6 +29,8 @@ private:
SortDescription description;
UInt64 limit;
RowsBeforeLimitCounterPtr read_rows;
Block threshold_block;
ColumnRawPtrs threshold_block_columns;
};
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册