提交 7b85a369 编写于 作者: N Nikolai Kochetov

Added CollapsingSortedAlgorithm.

上级 2aba662d
#include <Processors/Merges/CollapsingSortedTransform.h>
#include <Processors/Merges/CollapsingSortedAlgorithm.h>
#include <Columns/ColumnsNumber.h>
#include <Common/FieldVisitors.h>
#include <IO/WriteBuffer.h>
......@@ -16,70 +17,27 @@ namespace ErrorCodes
extern const int INCORRECT_DATA;
}
CollapsingSortedTransform::CollapsingSortedTransform(
CollapsingSortedAlgorithm::CollapsingSortedAlgorithm(
const Block & header,
size_t num_inputs,
SortDescription description_,
const String & sign_column,
size_t max_block_size,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
: IMergingTransform(num_inputs, header, header, true)
bool use_average_block_sizes,
Logger * log_)
: IMergingAlgorithmWithSharedChunks(num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs)
, merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
, description(std::move(description_))
, sign_column_number(header.getPositionByName(sign_column))
, out_row_sources_buf(out_row_sources_buf_)
, chunk_allocator(num_inputs + max_row_refs)
, source_chunks(num_inputs)
, cursors(num_inputs)
{
}
void CollapsingSortedTransform::initializeInputs()
{
queue = SortingHeap<SortCursor>(cursors);
is_queue_initialized = true;
}
void CollapsingSortedTransform::consume(Chunk chunk, size_t input_number)
, log(log_)
{
updateCursor(std::move(chunk), input_number);
if (is_queue_initialized)
queue.push(cursors[input_number]);
}
void CollapsingSortedTransform::updateCursor(Chunk chunk, size_t source_num)
void CollapsingSortedAlgorithm::reportIncorrectData()
{
auto num_rows = chunk.getNumRows();
auto columns = chunk.detachColumns();
for (auto & column : columns)
column = column->convertToFullColumnIfConst();
chunk.setColumns(std::move(columns), num_rows);
auto & source_chunk = source_chunks[source_num];
if (source_chunk)
{
source_chunk = chunk_allocator.alloc(std::move(chunk));
cursors[source_num].reset(source_chunk->getColumns(), {});
}
else
{
if (cursors[source_num].has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
source_chunk = chunk_allocator.alloc(std::move(chunk));
cursors[source_num] = SortCursorImpl(source_chunk->getColumns(), description, source_num);
}
source_chunk->all_columns = cursors[source_num].all_columns;
source_chunk->sort_columns = cursors[source_num].sort_columns;
}
if (!log)
return;
void CollapsingSortedTransform::reportIncorrectData()
{
std::stringstream s;
s << "Incorrect data: number of rows with sign = 1 (" << count_positive
<< ") differs with number of rows with sign = -1 (" << count_negative
......@@ -102,12 +60,12 @@ void CollapsingSortedTransform::reportIncorrectData()
LOG_WARNING(log, s.rdbuf());
}
void CollapsingSortedTransform::insertRow(RowRef & row)
void CollapsingSortedAlgorithm::insertRow(RowRef & row)
{
merged_data.insertRow(*row.all_columns, row.row_num, row.owned_chunk->getNumRows());
}
void CollapsingSortedTransform::insertRows()
void CollapsingSortedAlgorithm::insertRows()
{
if (count_positive == 0 && count_negative == 0)
{
......@@ -150,13 +108,7 @@ void CollapsingSortedTransform::insertRows()
current_row_sources.size() * sizeof(RowSourcePart));
}
void CollapsingSortedTransform::work()
{
merge();
prepareOutputChunk(merged_data);
}
void CollapsingSortedTransform::merge()
IMergingAlgorithm::Status CollapsingSortedAlgorithm::merge()
{
/// Take rows in required order and put them into `merged_data`, while the rows are no more than `max_block_size`
while (queue.isValid())
......@@ -174,7 +126,7 @@ void CollapsingSortedTransform::merge()
/// if there are enough rows and the last one is calculated completely
if (key_differs && merged_data.hasEnoughRows())
return;
Status(merged_data.pull());
if (key_differs)
{
......@@ -229,13 +181,12 @@ void CollapsingSortedTransform::merge()
{
/// We take next block from the corresponding source, if there is one.
queue.removeTop();
requestDataForInput(current.impl->order);
return;
return Status(current.impl->order);
}
}
insertRows();
is_finished = true;
return Status(merged_data.pull(), true);
}
}
#pragma once
#include <Processors/Merges/IMergingAlgorithmWithSharedChunks.h>
#include <Processors/Merges/MergedData.h>
#include <DataStreams/ColumnGathererStream.h>
namespace Poco
{
class Logger;
}
namespace DB
{
/// Merging algorithm that collapses rows sharing a primary key according to a sign column.
/// Source chunk storage, sort cursors and the sorting heap come from IMergingAlgorithmWithSharedChunks;
/// this class adds the per-key bookkeeping (positive / negative row counters and remembered rows).
class CollapsingSortedAlgorithm : public IMergingAlgorithmWithSharedChunks
{
public:
/// @param header                   Block describing the columns; used to create the empty result columns
///                                 and to look up the position of `sign_column`.
/// @param num_inputs               Number of sources being merged (forwarded to the base class).
/// @param description_             Sort description (forwarded to the base class).
/// @param sign_column              Name of the sign column inside `header`.
/// @param max_block_size           Passed to `merged_data`.
/// @param out_row_sources_buf_     Optional buffer for row sources (forwarded to the base class); may be nullptr.
/// @param use_average_block_sizes  Passed to `merged_data`.
/// @param log_                     Logger for incorrect-data warnings; when nullptr nothing is reported.
CollapsingSortedAlgorithm(
const Block & header,
size_t num_inputs,
SortDescription description_,
const String & sign_column,
size_t max_block_size,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes,
Logger * log_);
/// Performs a merge step and reports the outcome as a Status.
/// NOTE(review): the implementation appears to return either a pulled chunk, the index of the input
/// that must supply more data, or the final chunk together with a "finished" flag - confirm against IMergingAlgorithm::Status.
Status merge() override;
private:
/// Accumulates the rows of the result.
MergedData merged_data;
/// Position of the sign column in the header.
const size_t sign_column_number;
/// Forwarded to the base class, which adds it to num_inputs when sizing the chunk allocator.
static constexpr size_t max_row_refs = 4; /// first_negative, last_positive, last, current.
RowRef first_negative_row;
RowRef last_positive_row;
RowRef last_row;
size_t count_positive = 0; /// The number of positive rows for the current primary key.
size_t count_negative = 0; /// The number of negative rows for the current primary key.
bool last_is_positive = false; /// true if the last row for the current primary key is positive.
/// Fields specific for VERTICAL merge algorithm.
/// Row numbers are relative to the start of current primary key.
size_t current_pos = 0; /// Current row number
size_t first_negative_pos = 0; /// Row number of first_negative
size_t last_positive_pos = 0; /// Row number of last_positive
PODArray<RowSourcePart> current_row_sources; /// Sources of rows with the current primary key
size_t count_incorrect_data = 0; /// To prevent too many error messages from writing to the log.
/// Not owned; may be nullptr, in which case reportIncorrectData() returns without logging.
Logger * log;
/// Writes a warning about a mismatch between positive and negative row counts (only if `log` is set).
void reportIncorrectData();
/// Appends the row referenced by `row` to `merged_data`.
void insertRow(RowRef & row);
/// Flushes the rows selected for the current primary key; does nothing special when both counters are zero
/// beyond what the implementation's early branch handles. NOTE(review): full body is not visible here.
void insertRows();
};
}
#pragma once
#include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/CollapsingSortedAlgorithm.h>
#include <Processors/Merges/RowRef.h>
#include <Processors/Merges/MergedData.h>
#include <Core/SortDescription.h>
......@@ -24,7 +26,7 @@ namespace DB
* If there is exactly one more negative row than positive rows, then only the first negative row is written.
* Otherwise, a logical error.
*/
class CollapsingSortedTransform final : public IMergingTransform
class CollapsingSortedTransform final : public IMergingTransform2<CollapsingSortedAlgorithm>
{
public:
CollapsingSortedTransform(
......@@ -34,64 +36,22 @@ public:
const String & sign_column,
size_t max_block_size,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);
bool use_average_block_sizes = false)
: IMergingTransform2(
CollapsingSortedAlgorithm(
header,
num_inputs,
std::move(description_),
sign_column,
max_block_size,
out_row_sources_buf_,
use_average_block_sizes,
&Logger::get("CollapsingSortedTransform")),
num_inputs, header, header, true)
{
}
String getName() const override { return "CollapsingSortedTransform"; }
void work() override;
protected:
void initializeInputs() override;
void consume(Chunk chunk, size_t input_number) override;
private:
Logger * log = &Logger::get("CollapsingSortedTransform");
MergedData merged_data;
/// Settings
SortDescription description;
const size_t sign_column_number;
/// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step)
/// If it is not nullptr then it should be populated during execution
WriteBuffer * out_row_sources_buf = nullptr;
/// Allocator must be destroyed after all RowRefs.
detail::SharedChunkAllocator chunk_allocator;
/// Chunks currently being merged.
using SourceChunks = std::vector<detail::SharedChunkPtr>;
SourceChunks source_chunks;
SortCursorImpls cursors;
SortingHeap<SortCursor> queue;
bool is_queue_initialized = false;
using RowRef = detail::RowRefWithOwnedChunk;
static constexpr size_t max_row_refs = 4; /// first_negative, last_positive, last, current.
RowRef first_negative_row;
RowRef last_positive_row;
RowRef last_row;
size_t count_positive = 0; /// The number of positive rows for the current primary key.
size_t count_negative = 0; /// The number of negative rows for the current primary key.
bool last_is_positive = false; /// true if the last row for the current primary key is positive.
/// Fields specific for VERTICAL merge algorithm.
/// Row numbers are relative to the start of current primary key.
size_t current_pos = 0; /// Current row number
size_t first_negative_pos = 0; /// Row number of first_negative
size_t last_positive_pos = 0; /// Row number of last_positive
PODArray<RowSourcePart> current_row_sources; /// Sources of rows with the current primary key
size_t count_incorrect_data = 0; /// To prevent too many error messages from writing to the log.
void reportIncorrectData();
void insertRow(RowRef & row);
void insertRows();
void merge();
void updateCursor(Chunk chunk, size_t source_num);
void setRowRef(RowRef & row, SortCursor & cursor) { row.set(cursor, source_chunks[cursor.impl->order]); }
};
}
#include <Processors/Merges/IMergingAlgorithmWithSharedChunks.h>
namespace DB
{
/// Sets up per-input state for `num_inputs` sources.
/// The chunk allocator is sized for one chunk per input plus `max_row_refs` extra slots,
/// presumably so that chunks still referenced by the derived algorithm's RowRefs can stay alive
/// after their input has moved on - TODO confirm against SharedChunkAllocator.
IMergingAlgorithmWithSharedChunks::IMergingAlgorithmWithSharedChunks(
size_t num_inputs,
SortDescription description_,
WriteBuffer * out_row_sources_buf_,
size_t max_row_refs)
: description(std::move(description_))
, chunk_allocator(num_inputs + max_row_refs)
, source_chunks(num_inputs)
, cursors(num_inputs)
, out_row_sources_buf(out_row_sources_buf_)
{
}
/// Rewrites `chunk` in place so that each of its columns has gone through convertToFullColumnIfConst().
/// The row count of the chunk is preserved.
static void prepareChunk(Chunk & chunk)
{
    const auto row_count = chunk.getNumRows();
    auto full_columns = chunk.detachColumns();

    for (size_t pos = 0; pos < full_columns.size(); ++pos)
        full_columns[pos] = full_columns[pos]->convertToFullColumnIfConst();

    chunk.setColumns(std::move(full_columns), row_count);
}
/// Takes the initial chunk of every input, stores it in shared storage, creates a sort cursor
/// for it and finally builds the sorting heap over all cursors.
/// Inputs whose initial chunk is empty are skipped.
void IMergingAlgorithmWithSharedChunks::initialize(Chunks chunks)
{
    const size_t num_sources = chunks.size();
    source_chunks.resize(num_sources);

    for (size_t idx = 0; idx < num_sources; ++idx)
    {
        Chunk & input = chunks[idx];
        if (!input)
            continue;

        prepareChunk(input);

        /// Move the chunk into allocator-managed storage, then point a fresh cursor at its columns.
        auto & shared = source_chunks[idx];
        shared = chunk_allocator.alloc(std::move(input));

        auto & cursor = cursors[idx];
        cursor = SortCursorImpl(shared->getColumns(), description, idx);

        /// The shared chunk remembers the cursor's column views.
        shared->all_columns = cursor.all_columns;
        shared->sort_columns = cursor.sort_columns;
    }

    queue = SortingHeap<SortCursor>(cursors);
}
/// Accepts the next chunk for input `source_num`: stores it in shared storage, re-points the
/// existing cursor of that input at the new columns and pushes the cursor back into the heap.
void IMergingAlgorithmWithSharedChunks::consume(Chunk chunk, size_t source_num)
{
    prepareChunk(chunk);

    auto & shared = source_chunks[source_num];
    auto & cursor = cursors[source_num];

    shared = chunk_allocator.alloc(std::move(chunk));
    cursor.reset(shared->getColumns(), {});

    /// Keep the shared chunk's column views in sync with the cursor.
    shared->all_columns = cursor.all_columns;
    shared->sort_columns = cursor.sort_columns;

    queue.push(cursor);
}
}
#pragma once
#include <Processors/Merges/IMergingAlgorithm.h>
#include <Processors/Merges/RowRef.h>
#include <Core/SortDescription.h>
namespace DB
{
/// Base class for merging algorithms whose rows may be referenced (via RowRef) after the
/// cursor has moved on. Source chunks are kept in allocator-managed shared storage, and a
/// sorting heap over per-input cursors is exposed to derived classes.
class IMergingAlgorithmWithSharedChunks : public IMergingAlgorithm
{
public:
/// @param num_inputs            Number of sources; sizes `source_chunks` and `cursors`.
/// @param description_          Sort description used when creating cursors.
/// @param out_row_sources_buf_  Optional buffer for row sources; may be nullptr.
/// @param max_row_refs          Extra allocator slots added on top of `num_inputs`.
IMergingAlgorithmWithSharedChunks(
size_t num_inputs,
SortDescription description_,
WriteBuffer * out_row_sources_buf_,
size_t max_row_refs);
/// Stores the initial chunk of every input (empty chunks are skipped) and builds `queue`.
void initialize(Chunks chunks) override;
/// Stores the next chunk of input `source_num`, resets its cursor and pushes the cursor into `queue`.
void consume(Chunk chunk, size_t source_num) override;
private:
SortDescription description;
/// Allocator must be destroyed after source_chunks.
/// (Members are destroyed in reverse declaration order, so it is declared before source_chunks.)
detail::SharedChunkAllocator chunk_allocator;
/// Chunks currently being merged.
using SourceChunks = std::vector<detail::SharedChunkPtr>;
SourceChunks source_chunks;
/// One cursor per input, indexed by source number.
SortCursorImpls cursors;
protected:
/// Heap of cursors over the current source chunks; rebuilt in initialize(), extended in consume().
SortingHeap<SortCursor> queue;
/// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step)
/// If it is not nullptr then it should be populated during execution
WriteBuffer * out_row_sources_buf = nullptr;
using RowRef = detail::RowRefWithOwnedChunk;
/// Points `row` at the cursor's current position, passing along the shared chunk of the cursor's input.
void setRowRef(RowRef & row, SortCursor & cursor) { row.set(cursor, source_chunks[cursor.impl->order]); }
};
}
......@@ -48,12 +48,6 @@ static void prepareChunk(Chunk & chunk)
chunk.setColumns(std::move(columns), num_rows);
}
void MergingSortedAlgorithm::updateCursor(size_t source_num)
{
auto & source_chunk = source_chunks[source_num];
cursors[source_num].reset(source_chunk.getColumns(), {});
}
void MergingSortedAlgorithm::initialize(Chunks chunks)
{
source_chunks = std::move(chunks);
......
......@@ -49,7 +49,6 @@ private:
SortingHeap<SortCursor> queue_without_collation;
SortingHeap<SortCursorWithCollation> queue_with_collation;
void updateCursor(size_t source_num);
Status insertFromChunk(size_t source_num);
template <typename TSortingHeap>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册