提交 a34f86cc 编写于 作者: M Michael Kolupaev

clickhouse: FINAL only returning positive rows [#CONV-7363].

上级 e3c0352b
......@@ -8,8 +8,9 @@
namespace DB
{
/// То же, что CollapsingSortedBlockInputStream, но выдает строки в произвольном порядке.
/// Входные потоки по-прежнему должны быть упорядочены.
/// Схлопывает одинаковые строки с противоположным знаком примерно как CollapsingSortedBlockInputStream.
/// Выдает строки в произвольном порядке (входные потоки по-прежнему должны быть упорядочены).
/// Выдает только строки с положительным знаком.
class CollapsingFinalBlockInputStream : public IProfilingBlockInputStream
{
public:
......@@ -226,9 +227,7 @@ private:
Queue queue;
Cursor previous;
Cursor first_negative; /// Первая отрицательная строка для текущего первичного ключа.
Cursor previous; /// Текущий первичный ключ.
Cursor last_positive; /// Последняя положительная строка для текущего первичного ключа.
size_t count_positive; /// Количество положительных строк для текущего первичного ключа.
......
......@@ -301,6 +301,9 @@ private:
BlockInputStreams spreadMarkRangesAmongThreads(RangesInDataParts parts, size_t threads, const Names & column_names, size_t max_block_size);
BlockInputStreams spreadMarkRangesAmongThreadsFinal(RangesInDataParts parts, size_t threads, const Names & column_names, size_t max_block_size);
/// Создать выражение "Sign == 1".
void createPositiveSignCondition(ExpressionPtr & out_expression, String & out_column);
/// Загрузить множество кусков с данными с диска. Вызывается один раз - при создании объекта.
void loadDataParts();
......
......@@ -8,7 +8,6 @@ CollapsingFinalBlockInputStream::~CollapsingFinalBlockInputStream()
{
/// Нужно обезвредить все MergingBlockPtr, чтобы они не пытались класть блоки в output_blocks.
previous.block.cancel();
first_negative.block.cancel();
last_positive.block.cancel();
while (!queue.empty())
......@@ -49,10 +48,6 @@ void CollapsingFinalBlockInputStream::commitCurrent()
{
if (count_positive || count_negative)
{
if (count_positive <= count_negative)
{
first_negative.addToFilter();
}
if (count_positive >= count_negative)
{
last_positive.addToFilter();
......@@ -61,7 +56,6 @@ void CollapsingFinalBlockInputStream::commitCurrent()
if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1))
reportBadCounts();
first_negative = Cursor();
last_positive = Cursor();
previous = Cursor();
}
......@@ -108,8 +102,6 @@ Block CollapsingFinalBlockInputStream::readImpl()
}
else if (sign == -1)
{
if (!count_negative)
first_negative = current;
++count_negative;
}
else
......
......@@ -391,6 +391,10 @@ BlockInputStreams StorageMergeTree::spreadMarkRangesAmongThreads(RangesInDataPar
/// Распределить засечки между потоками и сделать, чтобы в ответе (почти) все данные были сколлапсированы (модификатор FINAL).
BlockInputStreams StorageMergeTree::spreadMarkRangesAmongThreadsFinal(RangesInDataParts parts, size_t threads, const Names & column_names, size_t max_block_size)
{
ExpressionPtr sign_filter_expression;
String sign_filter_column;
createPositiveSignCondition(sign_filter_expression, sign_filter_column);
BlockInputStreams res;
BlockInputStreams to_collapse;
......@@ -403,7 +407,7 @@ BlockInputStreams StorageMergeTree::spreadMarkRangesAmongThreadsFinal(RangesInDa
part.data_part, part.ranges, thisPtr());
if (part.data_part->size * index_granularity >= settings.min_rows_to_skip_collapsing)
res.push_back(source_stream);
res.push_back(new FilterBlockInputStream(new ExpressionBlockInputStream(source_stream, sign_filter_expression), sign_filter_column));
else
to_collapse.push_back(new ExpressionBlockInputStream(source_stream, primary_expr));
}
......@@ -417,6 +421,38 @@ BlockInputStreams StorageMergeTree::spreadMarkRangesAmongThreadsFinal(RangesInDa
}
void StorageMergeTree::createPositiveSignCondition(ExpressionPtr & out_expression, String & out_column)
{
ASTFunction * function = new ASTFunction;
ASTPtr function_ptr = function;
ASTExpressionList * arguments = new ASTExpressionList;
ASTPtr arguments_ptr = arguments;
ASTIdentifier * sign = new ASTIdentifier;
ASTPtr sign_ptr = sign;
ASTLiteral * one = new ASTLiteral;
ASTPtr one_ptr = one;
function->name = "equals";
function->arguments = arguments_ptr;
function->children.push_back(arguments_ptr);
arguments->children.push_back(sign_ptr);
arguments->children.push_back(one_ptr);
sign->name = sign_column;
sign->kind = ASTIdentifier::Column;
one->type = new DataTypeInt8;
one->value = Field(static_cast<Int64>(1));
out_expression = new Expression(function_ptr, context);
out_column = function->getColumnName();
}
String StorageMergeTree::getPartName(Yandex::DayNum_t left_date, Yandex::DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level)
{
Yandex::DateLUTSingleton & date_lut = Yandex::DateLUTSingleton::instance();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册