提交 93143369 编写于 作者: M Michael Kolupaev

clickhouse: added support for non-UInt32 sampling columns [#6201].

上级 17612f45
......@@ -825,9 +825,7 @@ BlockInputStreams StorageMergeTree::read(
/// Семплирование.
Names column_names_to_read = column_names_to_return;
size_t count_limit = std::numeric_limits<size_t>::max();
bool sample_by_value = false;
UInt64 sample_column_value_limit = 0;
UInt64 sampling_column_value_limit = 0;
typedef Poco::SharedPtr<ASTFunction> ASTFunctionPtr;
ASTFunctionPtr filter_function;
ExpressionPtr filter_expression;
......@@ -835,31 +833,51 @@ BlockInputStreams StorageMergeTree::read(
ASTSelectQuery & select = *dynamic_cast<ASTSelectQuery*>(&*query);
if (select.sample_size)
{
double size = boost::apply_visitor(FieldVisitorConvertToNumber<double>(), dynamic_cast<ASTLiteral*>(&*select.sample_size)->value);
double size = boost::apply_visitor(FieldVisitorConvertToNumber<double>(), dynamic_cast<ASTLiteral&>(*select.sample_size).value);
if (size < 0)
throw Exception("Negative sample size", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
if (size > 1)
{
count_limit = boost::apply_visitor(FieldVisitorConvertToNumber<UInt64>(), dynamic_cast<ASTLiteral*>(&*select.sample_size)->value);
size_t requested_count = boost::apply_visitor(FieldVisitorConvertToNumber<UInt64>(), dynamic_cast<ASTLiteral&>(*select.sample_size).value);
/// Узнаем количество строк в таблице.
size_t total_count = 0;
{
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it)
total_count += (*it)->size;
}
total_count *= index_granularity;
size = std::min(1., static_cast<double>(requested_count) / total_count);
}
UInt64 sampling_column_max = 0;
DataTypePtr type = Expression(sampling_expression, context).getReturnTypes()[0];
if (type->getName() == "UInt64")
sampling_column_max = std::numeric_limits<UInt64>::max();
else if (type->getName() == "UInt32")
sampling_column_max = std::numeric_limits<UInt32>::max();
else if (type->getName() == "UInt16")
sampling_column_max = std::numeric_limits<UInt16>::max();
else if (type->getName() == "UInt8")
sampling_column_max = std::numeric_limits<UInt8>::max();
else
{
sample_by_value = true;
sample_column_value_limit = static_cast<UInt64>(size * std::numeric_limits<UInt32>::max());
if (!key_condition.addCondition(sampling_expression->getColumnName(),
Range::RightBounded(sample_column_value_limit, true)))
throw Exception("Invalid sampling column in storage parameters", ErrorCodes::ILLEGAL_COLUMN);
}
}
if (sample_by_value)
{
/// Выражение для фильтрации: sampling_column_name <= sample_column_value_limit
throw Exception("Invalid sampling column type in storage parameters: " + type->getName() + ". Must be unsigned integer type.", ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
sampling_column_value_limit = static_cast<UInt64>(size * sampling_column_max);
if (!key_condition.addCondition(sampling_expression->getColumnName(),
Range::RightBounded(sampling_column_value_limit, true)))
throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);
/// Выражение для фильтрации: sampling_expression <= sampling_column_value_limit
ASTPtr filter_function_args = new ASTExpressionList;
filter_function_args->children.push_back(sampling_expression);
filter_function_args->children.push_back(new ASTLiteral(StringRange(), sample_column_value_limit));
filter_function_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_limit));
filter_function = new ASTFunction;
filter_function->name = "lessOrEquals";
filter_function->arguments = filter_function_args;
......@@ -891,7 +909,6 @@ BlockInputStreams StorageMergeTree::read(
RangesInDataParts parts_with_ranges;
/// Найдем, какой диапазон читать из каждого куска.
size_t mark_count_limit = (count_limit - 1) / index_granularity + 1;
size_t sum_marks = 0;
size_t sum_ranges = 0;
for (size_t i = 0; i < parts.size(); ++i)
......@@ -910,21 +927,7 @@ BlockInputStreams StorageMergeTree::read(
for (size_t j = 0; j < ranges.ranges.size(); ++j)
{
sum_marks += ranges.ranges[j].end - ranges.ranges[j].begin;
/// Если нашли достаточно строк.
if (sum_marks >= mark_count_limit)
{
MarkRanges & new_ranges = parts_with_ranges.back().ranges;
/// Обрежем этот отрезок.
new_ranges[j].end -= mark_count_limit - sum_marks;
/// Удалим вссе последующие отрезки.
new_ranges.erase(new_ranges.begin() + j + 1, new_ranges.end());
}
}
/// Если нашли достаточно строк, дальше можено не смотреть.
if (sum_marks * index_granularity >= count_limit)
break;
}
}
......@@ -933,7 +936,7 @@ BlockInputStreams StorageMergeTree::read(
BlockInputStreams res = spreadMarkRangesAmongThreads(parts_with_ranges, threads, column_names_to_read, max_block_size);
if (sample_by_value)
if (select.sample_size)
{
for (size_t i = 0; i < res.size(); ++i)
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册