diff --git a/dbms/include/DB/Interpreters/Aggregator.h b/dbms/include/DB/Interpreters/Aggregator.h index 2951a4f179c9bad91eb1b4742b2903c1c7130986..ecd8762585472791b06309016a617380826c9d3f 100644 --- a/dbms/include/DB/Interpreters/Aggregator.h +++ b/dbms/include/DB/Interpreters/Aggregator.h @@ -767,39 +767,43 @@ using ManyAggregatedDataVariants = std::vector; */ -/** Агрегирует источник блоков. +/** Aggregates a stream of blocks. */ class Aggregator { public: struct Params { - /// Что считать. + /// What to calculate. Names key_names; - ColumnNumbers keys; /// Номера столбцов - вычисляются позже. + ColumnNumbers keys; /// Column numbers - calculated later. AggregateDescriptions aggregates; size_t keys_size; size_t aggregates_size; - /// Настройки приближённого вычисления GROUP BY. - const bool overflow_row; /// Нужно ли класть в AggregatedDataVariants::without_key агрегаты для ключей, не попавших в max_rows_to_group_by. + /// Settings of approximate calculation of GROUP BY. + /// Whether to accumulate into AggregatedDataVariants::without_key the aggregates for keys that did not get into the first 'max_rows_to_group_by'. + const bool overflow_row; const size_t max_rows_to_group_by; const OverflowMode group_by_overflow_mode; - /// Для динамической компиляции. + /// Whether to return an empty result instead of a single row for queries like SELECT count() FROM empty_table. + bool empty_result_for_empty_data = false; + + /// For runtime compilation. Compiler * compiler; const UInt32 min_count_to_compile; - /// Настройки двухуровневой агрегации (используется для большого количества ключей). - /** При каком количестве ключей или размере состояния агрегации в байтах, - * начинает использоваться двухуровневая агрегация. Достаточно срабатывания хотя бы одного из порогов. - * 0 - соответствующий порог не задан. + /// Settings for two-level aggregation (used for a large number of keys). + /** Number of keys or size of aggregation state in bytes at which + * two-level aggregation starts to be used. Reaching either threshold is enough. 
+ * 0 means threshold is not set. */ const size_t group_by_two_level_threshold; const size_t group_by_two_level_threshold_bytes; - /// Настройки для сброса временных данных в файловую систему (внешняя агрегация). - const size_t max_bytes_before_external_group_by; /// 0 - не использовать внешнюю агрегацию. + /// Settings for storing temporary data in filesystem (aggregation in external memory). + const size_t max_bytes_before_external_group_by; /// 0 - don't use aggregation in external memory. const std::string tmp_path; Params( @@ -807,9 +811,11 @@ public: bool overflow_row_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_, Compiler * compiler_, UInt32 min_count_to_compile_, size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_, - size_t max_bytes_before_external_group_by_, const std::string & tmp_path_) + size_t max_bytes_before_external_group_by_, const std::string & tmp_path_, + bool empty_result_for_empty_data_) : key_names(key_names_), aggregates(aggregates_), aggregates_size(aggregates.size()), overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_), + empty_result_for_empty_data(empty_result_for_empty_data_), compiler(compiler_), min_count_to_compile(min_count_to_compile_), group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_), max_bytes_before_external_group_by(max_bytes_before_external_group_by_), tmp_path(tmp_path_) @@ -819,11 +825,11 @@ public: keys_size = key_names.size(); } - /// Только параметры, имеющие значение при мердже. - Params(const Names & key_names_, const AggregateDescriptions & aggregates_, bool overflow_row_) - : Params(key_names_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0, 0, 0, "") {} + /// Only params meaningful for merging states. 
+ Params(const Names & key_names_, const AggregateDescriptions & aggregates_, bool overflow_row_, bool empty_result_for_empty_data_) + : Params(key_names_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0, 0, 0, "", empty_result_for_empty_data_) {} - /// Вычислить номера столбцов в keys и aggregates. + /// Calculate column numbers from its names into 'keys' and 'aggregates'. void calculateColumnNumbers(const Block & block); }; diff --git a/dbms/include/DB/Interpreters/Settings.h b/dbms/include/DB/Interpreters/Settings.h index 17cd528d19a5e38b4089c959280065564a6eabbb..7ba58206d6c735535f70cd530951df44ca6122c2 100644 --- a/dbms/include/DB/Interpreters/Settings.h +++ b/dbms/include/DB/Interpreters/Settings.h @@ -217,6 +217,9 @@ struct Settings \ /** What aggregate function to use for implementation of count(DISTINCT ...) */ \ M(SettingString, count_distinct_implementation, "uniq") \ + \ + /** When aggregating without keys (without GROUP BY), return empty result for empty data instead of single row with default values. */ \ + M(SettingBool, return_empty_result_when_aggregating_empty_data_without_keys, 0) \ /// Всевозможные ограничения на выполнение запроса. 
Limits limits; diff --git a/dbms/src/DataStreams/tests/aggregating_stream.cpp b/dbms/src/DataStreams/tests/aggregating_stream.cpp index fc88b1ebc5956c8570c8a096db2dfd393318c25c..f40a09a7499c428d5d2815f595c7e414e50960a0 100644 --- a/dbms/src/DataStreams/tests/aggregating_stream.cpp +++ b/dbms/src/DataStreams/tests/aggregating_stream.cpp @@ -89,7 +89,7 @@ int main(int argc, char ** argv) sample.insert(std::move(col)); } - Aggregator::Params params(key_column_names, aggregate_descriptions, false); + Aggregator::Params params(key_column_names, aggregate_descriptions, false, false); BlockInputStreamPtr stream = std::make_shared(block); stream = std::make_shared(stream, params, true); diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 09763b5acbace694946f7dc0fe19d23af9623464..96bc5cce0109f764328282435e6459c6045ebfeb 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -1248,6 +1249,21 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl( } +/// Returns an empty clone of 'sample' with one default value inserted into every column, +/// i.e. a single-row block used as the result of aggregation without keys over empty data. +static Block createResultWithDefaultValues(const Block & sample) +{ + Block res = sample.cloneEmpty(); + + /// Insert default value for each column. 
+ size_t columns = res.columns(); + for (size_t i = 0; i < columns; ++i) + res.unsafeGetByPosition(i).column->insertDefault(); + + return res; +} + + BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) const { if (isCancelled()) @@ -1261,7 +1277,16 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b /// В какой структуре данных агрегированы данные? if (data_variants.empty()) + { + /// When aggregating without keys over empty data, we must return a result with one row containing default states of aggregate functions. + /// For example, "SELECT count() FROM empty_table" should return one row with 0 value. + if (!params.empty_result_for_empty_data && params.keys_size == 0) + { + blocks.push_back(createResultWithDefaultValues(sample)); + } + return blocks; + } std::unique_ptr thread_pool; if (max_threads > 1 && data_variants.sizeWithoutOverflowRow() > 100000 /// TODO Сделать настраиваемый порог. @@ -1684,7 +1709,16 @@ std::unique_ptr Aggregator::mergeAndConvertToBlocks( non_empty_data.push_back(data); if (non_empty_data.empty()) - return std::make_unique(); + { + /// When aggregating without keys over empty data, we must return a result with one row containing default states of aggregate functions. + /// For example, "SELECT count() FROM empty_table" should return one row with 0 value. 
+ if (!params.empty_result_for_empty_data && params.keys_size == 0) + { + return std::make_unique(createResultWithDefaultValues(sample)); + } + else + return std::make_unique(); + } if (non_empty_data.size() > 1) { diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index b82ac5cb323b43078b6b484358535dfe92ceb7be..785081fa813ffebfef7747f4f6719c08546bdab6 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -892,7 +892,8 @@ void InterpreterSelectQuery::executeAggregation(ExpressionActionsPtr expression, settings.compile ? &context.getCompiler() : nullptr, settings.min_count_to_compile, allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0), allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0), - settings.limits.max_bytes_before_external_group_by, context.getTemporaryPath()); + settings.limits.max_bytes_before_external_group_by, context.getTemporaryPath(), + settings.return_empty_result_when_aggregating_empty_data_without_keys); /// Если источников несколько, то выполняем параллельную агрегацию if (streams.size() > 1) @@ -946,7 +947,7 @@ void InterpreterSelectQuery::executeMergeAggregated(bool overflow_row, bool fina * но при этом может работать медленнее. 
*/ - Aggregator::Params params(key_names, aggregates, overflow_row); + Aggregator::Params params(key_names, aggregates, overflow_row, settings.return_empty_result_when_aggregating_empty_data_without_keys); if (!settings.distributed_aggregation_memory_efficient) { diff --git a/dbms/src/Interpreters/tests/aggregate.cpp b/dbms/src/Interpreters/tests/aggregate.cpp index d10473c67585dc6049076cc740c82a3238993eaf..83293dc8d09a4524a9d39e2f8146a8332500943d 100644 --- a/dbms/src/Interpreters/tests/aggregate.cpp +++ b/dbms/src/Interpreters/tests/aggregate.cpp @@ -73,7 +73,7 @@ int main(int argc, char ** argv) DataTypes empty_list_of_types; aggregate_descriptions[0].function = factory.get("count", empty_list_of_types); - Aggregator::Params params(key_column_names, aggregate_descriptions, false); + Aggregator::Params params(key_column_names, aggregate_descriptions, false, false); Aggregator aggregator(params); {