From 93143369ce58750a77a3cf4402de7406670e430c Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Thu, 13 Dec 2012 09:32:08 +0000 Subject: [PATCH] clickhouse: added support for non-UInt32 sampling columns [#6201]. --- dbms/src/Storages/StorageMergeTree.cpp | 73 ++++++++++++++------------ 1 file changed, 38 insertions(+), 35 deletions(-) diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 9c4c1be70b..7bea70a380 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -825,9 +825,7 @@ BlockInputStreams StorageMergeTree::read( /// Семплирование. Names column_names_to_read = column_names_to_return; - size_t count_limit = std::numeric_limits::max(); - bool sample_by_value = false; - UInt64 sample_column_value_limit = 0; + UInt64 sampling_column_value_limit = 0; typedef Poco::SharedPtr ASTFunctionPtr; ASTFunctionPtr filter_function; ExpressionPtr filter_expression; @@ -835,31 +833,51 @@ BlockInputStreams StorageMergeTree::read( ASTSelectQuery & select = *dynamic_cast(&*query); if (select.sample_size) { - double size = boost::apply_visitor(FieldVisitorConvertToNumber(), dynamic_cast(&*select.sample_size)->value); + double size = boost::apply_visitor(FieldVisitorConvertToNumber(), dynamic_cast(*select.sample_size).value); if (size < 0) throw Exception("Negative sample size", ErrorCodes::ARGUMENT_OUT_OF_BOUND); if (size > 1) { - count_limit = boost::apply_visitor(FieldVisitorConvertToNumber(), dynamic_cast(&*select.sample_size)->value); + size_t requested_count = boost::apply_visitor(FieldVisitorConvertToNumber(), dynamic_cast(*select.sample_size).value); + + /// Узнаем количество строк в таблице. + size_t total_count = 0; + { + Poco::ScopedLock lock(data_parts_mutex); + + for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it) + total_count += (*it)->size; + } + total_count *= index_granularity; + + size = std::min(1., static_cast(requested_count) / total_count); } + + UInt64 sampling_column_max = 0; + DataTypePtr type = Expression(sampling_expression, context).getReturnTypes()[0]; + + if (type->getName() == "UInt64") + sampling_column_max = std::numeric_limits::max(); + else if (type->getName() == "UInt32") + sampling_column_max = std::numeric_limits::max(); + else if (type->getName() == "UInt16") + sampling_column_max = std::numeric_limits::max(); + else if (type->getName() == "UInt8") + sampling_column_max = std::numeric_limits::max(); else - { - sample_by_value = true; - sample_column_value_limit = static_cast(size * std::numeric_limits::max()); - if (!key_condition.addCondition(sampling_expression->getColumnName(), - Range::RightBounded(sample_column_value_limit, true))) - throw Exception("Invalid sampling column in storage parameters", ErrorCodes::ILLEGAL_COLUMN); - } - } - - if (sample_by_value) - { - /// Выражение для фильтрации: sampling_column_name <= sample_column_value_limit + throw Exception("Invalid sampling column type in storage parameters: " + type->getName() + ". Must be unsigned integer type.", ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER); + + sampling_column_value_limit = static_cast(size * sampling_column_max); + if (!key_condition.addCondition(sampling_expression->getColumnName(), + Range::RightBounded(sampling_column_value_limit, true))) + throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN); + + /// Выражение для фильтрации: sampling_expression <= sampling_column_value_limit ASTPtr filter_function_args = new ASTExpressionList; filter_function_args->children.push_back(sampling_expression); - filter_function_args->children.push_back(new ASTLiteral(StringRange(), sample_column_value_limit)); - + filter_function_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_limit)); + filter_function = new ASTFunction; filter_function->name = "lessOrEquals"; filter_function->arguments = filter_function_args; @@ -891,7 +909,6 @@ BlockInputStreams StorageMergeTree::read( RangesInDataParts parts_with_ranges; /// Найдем, какой диапазон читать из каждого куска. - size_t mark_count_limit = (count_limit - 1) / index_granularity + 1; size_t sum_marks = 0; size_t sum_ranges = 0; for (size_t i = 0; i < parts.size(); ++i) @@ -910,21 +927,7 @@ BlockInputStreams StorageMergeTree::read( for (size_t j = 0; j < ranges.ranges.size(); ++j) { sum_marks += ranges.ranges[j].end - ranges.ranges[j].begin; - - /// Если нашли достаточно строк. - if (sum_marks >= mark_count_limit) - { - MarkRanges & new_ranges = parts_with_ranges.back().ranges; - /// Обрежем этот отрезок. - new_ranges[j].end -= mark_count_limit - sum_marks; - /// Удалим вссе последующие отрезки. - new_ranges.erase(new_ranges.begin() + j + 1, new_ranges.end()); - } } - - /// Если нашли достаточно строк, дальше можено не смотреть. - if (sum_marks * index_granularity >= count_limit) - break; } } @@ -933,7 +936,7 @@ BlockInputStreams StorageMergeTree::read( BlockInputStreams res = spreadMarkRangesAmongThreads(parts_with_ranges, threads, column_names_to_read, max_block_size); - if (sample_by_value) + if (select.sample_size) { for (size_t i = 0; i < res.size(); ++i) { -- GitLab