From b41421cb1cdc6c3f86ef46f27521612621eef3a3 Mon Sep 17 00:00:00 2001
From: roman
Date: Fri, 28 Aug 2020 15:50:25 +0100
Subject: [PATCH] [settings]: introduce new query complexity settings for leaf-nodes

The new settings allow controlling query complexity on leaf nodes, excluding the
final merging stage on the root node. For example, a distributed query that reads
1k rows from each of 5 shards will breach `max_rows_to_read=5000`, even though
every shard effectively reads only 1k rows. With the setting
`max_rows_to_read_leaf=1500` this limit is not reached and the query succeeds,
since every shard reads no more than ~1k rows.
---
 .../operations/settings/query-complexity.md   | 25 ++++++++++++++++
 .../operations/settings/query-complexity.md   | 26 +++++++++++++++++
 src/Core/Settings.h                           |  4 +++
 src/Interpreters/InterpreterSelectQuery.cpp   |  9 ++++--
 src/Processors/Pipe.cpp                       |  9 ++++++
 src/Processors/Pipe.h                         |  1 +
 .../QueryPlan/ReadFromStorageStep.cpp         | 12 ++++++++
 .../QueryPlan/ReadFromStorageStep.h           |  2 ++
 .../Sources/SourceFromInputStream.h           |  1 +
 src/Processors/Sources/SourceWithProgress.cpp |  6 ++++
 src/Processors/Sources/SourceWithProgress.h   |  5 ++++
 src/Storages/IStorage.cpp                     |  3 +-
 src/Storages/IStorage.h                       |  1 +
 .../MergeTree/MergeTreeDataSelectExecutor.cpp | 18 +++++++-----
 ...hard_leaf_max_rows_bytes_to_read.reference |  6 ++++
 ...1455_shard_leaf_max_rows_bytes_to_read.sql | 29 +++++++++++++++++++
 16 files changed, 147 insertions(+), 10 deletions(-)
 create mode 100644 tests/queries/0_stateless/01455_shard_leaf_max_rows_bytes_to_read.reference
 create mode 100755 tests/queries/0_stateless/01455_shard_leaf_max_rows_bytes_to_read.sql

diff --git a/docs/en/operations/settings/query-complexity.md b/docs/en/operations/settings/query-complexity.md
index 0486392d25..f803e694eb 100644
--- a/docs/en/operations/settings/query-complexity.md
+++ b/docs/en/operations/settings/query-complexity.md
@@ -60,6 +60,31 @@ A maximum number of bytes (uncompressed data) that can be read from a table when
 
 What to do when the volume of data read exceeds one of the limits: ‘throw’ or ‘break’. By default, throw.
 
+## max\_rows\_to\_read\_leaf {#max-rows-to-read-leaf}
+
+The following restrictions can be checked on each block (instead of on each row). That is, the restrictions can be broken a little.
+
+A maximum number of rows that can be read from a local table on a leaf node when running a distributed query. While
+distributed queries can issue multiple sub-queries to each shard (leaf), this limit is checked only at the read
+stage on the leaf nodes and ignored at the results-merging stage on the root node. For example, a cluster consists
+of 2 shards and each shard contains a table with 100 rows. A distributed query that is supposed to read all the data
+from both tables with the setting `max_rows_to_read=150` will fail, as in total there are 200 rows. A query
+with `max_rows_to_read_leaf=150` will succeed, since the leaf nodes read at most 100 rows each.
+
+## max\_bytes\_to\_read\_leaf {#max-bytes-to-read-leaf}
+
+A maximum number of bytes (uncompressed data) that can be read from a local table on a leaf node when running
+a distributed query. While distributed queries can issue multiple sub-queries to each shard (leaf), this limit is
+checked only at the read stage on the leaf nodes and ignored at the results-merging stage on the root node.
+For example, a cluster consists of 2 shards and each shard contains a table with 100 bytes of data.
+A distributed query that is supposed to read all the data from both tables with the setting `max_bytes_to_read=150`
+will fail, as in total there are 200 bytes. A query with `max_bytes_to_read_leaf=150` will succeed, since the leaf
+nodes read at most 100 bytes each.
+
+## read\_overflow\_mode\_leaf {#read-overflow-mode-leaf}
+
+What to do when the volume of data read exceeds one of the leaf limits: ‘throw’ or ‘break’. By default, throw.
+
 ## max\_rows\_to\_group\_by {#settings-max-rows-to-group-by}
 
 A maximum number of unique keys received from aggregation. This setting lets you limit memory consumption when aggregating.
diff --git a/docs/ru/operations/settings/query-complexity.md b/docs/ru/operations/settings/query-complexity.md
index 74c99968bc..d228732acd 100644
--- a/docs/ru/operations/settings/query-complexity.md
+++ b/docs/ru/operations/settings/query-complexity.md
@@ -56,6 +56,32 @@
 What to do when the amount of read data exceeds one of the limits: throw or break. By default: throw.
 
+## max\_rows\_to\_read\_leaf {#max-rows-to-read-leaf}
+
+The following restrictions can be checked on each block (instead of on each row). That is, the restrictions can be broken a little.
+
+The maximum number of rows that can be read from a table on a remote server when executing a distributed query.
+Distributed queries can issue several sub-queries to each of the shards in the cluster; this limit is applied
+during the read on the remote servers (including the initiator server) and ignored on the initiator server while
+the received results are merged. For example, a cluster consists of 2 shards, each of which stores a table with
+100 rows. A distributed query that fetches all the data from these tables with the setting `max_rows_to_read=150`
+will throw an exception, since in total it reads 200 rows. A query with the setting `max_rows_to_read_leaf=150`
+will succeed, because each shard reads at most 100 rows.
+
+## max\_bytes\_to\_read\_leaf {#max-bytes-to-read-leaf}
+
+The maximum number of bytes (of uncompressed data) that can be read from a table on a remote server when executing
+a distributed query. Distributed queries can issue several sub-queries to each of the shards in the cluster; this
+limit is applied during the read on the remote servers (including the initiator server) and ignored on the initiator
+server while the received results are merged. For example, a cluster consists of 2 shards, each of which stores a
+table with 100 bytes. A distributed query that fetches all the data from these tables with the setting
+`max_bytes_to_read=150` will throw an exception, since in total it reads 200 bytes. A query with the setting
+`max_bytes_to_read_leaf=150` will succeed, because each shard reads at most 100 bytes.
+
+## read\_overflow\_mode\_leaf {#read-overflow-mode-leaf}
+
+What to do when the amount of data read on a remote server exceeds one of the limits: throw or break. By default: throw.
+
 ## max\_rows\_to\_group\_by {#settings-max-rows-to-group-by}
 
 The maximum number of unique keys received during aggregation. This setting lets you limit memory consumption during aggregation.
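The two documentation sections above describe the same pattern from the rows and bytes angles. As a minimal sketch of the documented semantics — assuming a two-shard cluster named `test_cluster_two_shards` (the same cluster the test at the end of this patch relies on) and the hypothetical table names `t_local`/`t_dist`:

```sql
-- Each shard stores 100 rows, so the distributed table sees 200 in total.
CREATE TABLE t_local (x UInt64) ENGINE = MergeTree ORDER BY x;
CREATE TABLE t_dist AS t_local
    ENGINE = Distributed(test_cluster_two_shards, currentDatabase(), t_local);
INSERT INTO t_local SELECT number FROM numbers(100);

-- Fails: 200 rows in total reach the merging stage on the root node.
SELECT count() FROM t_dist SETTINGS max_rows_to_read = 150;

-- Succeeds: each leaf reads only 100 rows, below the per-leaf limit.
SELECT count() FROM t_dist SETTINGS max_rows_to_read_leaf = 150;
```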
diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index b39c223a5e..bf1a44670b 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -232,6 +232,10 @@ class IColumn;
     M(UInt64, max_bytes_to_read, 0, "Limit on read bytes (after decompression) from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it is only checked on a remote server.", 0) \
     M(OverflowMode, read_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
     \
+    M(UInt64, max_rows_to_read_leaf, 0, "Limit on read rows on the leaf nodes for distributed queries. Limit is applied for local reads only, excluding the final merge stage on the root node.", 0) \
+    M(UInt64, max_bytes_to_read_leaf, 0, "Limit on read bytes (after decompression) on the leaf nodes for distributed queries. Limit is applied for local reads only, excluding the final merge stage on the root node.", 0) \
+    M(OverflowMode, read_overflow_mode_leaf, OverflowMode::THROW, "What to do when the leaf limit is exceeded.", 0) \
+    \
     M(UInt64, max_rows_to_group_by, 0, "", 0) \
     M(OverflowModeGroupBy, group_by_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
     M(UInt64, max_bytes_before_external_group_by, 0, "", 0) \
diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index 887f4795bc..3cbbdb576b 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -1441,16 +1441,21 @@ void InterpreterSelectQuery::executeFetchColumns(
         }
 
         StreamLocalLimits limits;
+        SizeLimits leaf_limits;
         std::shared_ptr<const EnabledQuota> quota;
 
         /// Set the limits and quota for reading data, the speed and time of the query.
-        if (!options.ignore_limits)
+        if (!options.ignore_limits)
+        {
             limits = getLimitsForStorage(settings, options);
+            leaf_limits = SizeLimits(settings.max_rows_to_read_leaf, settings.max_bytes_to_read_leaf,
+                                     settings.read_overflow_mode_leaf);
+        }
 
         if (!options.ignore_quota && (options.to_stage == QueryProcessingStage::Complete))
             quota = context->getQuota();
 
-        storage->read(query_plan, table_lock, metadata_snapshot, limits, std::move(quota),
+        storage->read(query_plan, table_lock, metadata_snapshot, limits, leaf_limits, std::move(quota),
                       required_columns, query_info, context, processing_stage, max_block_size, max_streams);
     }
     else
diff --git a/src/Processors/Pipe.cpp b/src/Processors/Pipe.cpp
index 90a8a65ff2..9e9c9cab38 100644
--- a/src/Processors/Pipe.cpp
+++ b/src/Processors/Pipe.cpp
@@ -788,6 +788,15 @@ void Pipe::setLimits(const StreamLocalLimits & limits)
     }
 }
 
+void Pipe::setLeafLimits(const SizeLimits & leaf_limits)
+{
+    for (auto & processor : processors)
+    {
+        if (auto * source_with_progress = dynamic_cast<ISourceWithProgress *>(processor.get()))
+            source_with_progress->setLeafLimits(leaf_limits);
+    }
+}
+
 void Pipe::setQuota(const std::shared_ptr<const EnabledQuota> & quota)
 {
     for (auto & processor : processors)
diff --git a/src/Processors/Pipe.h b/src/Processors/Pipe.h
index 4adb529bb1..f674663154 100644
--- a/src/Processors/Pipe.h
+++ b/src/Processors/Pipe.h
@@ -97,6 +97,7 @@ public:
 
     /// Specify quotas and limits for every ISourceWithProgress.
     void setLimits(const StreamLocalLimits & limits);
+    void setLeafLimits(const SizeLimits & leaf_limits);
     void setQuota(const std::shared_ptr<const EnabledQuota> & quota);
 
     /// Do not allow to change the table while the processors of pipe are alive.
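Since the three settings are declared in Settings.h like any other limit, they can be set per session or per query. A hedged usage sketch (setting names from the hunk above; the values are arbitrary):

```sql
-- Per-session: applies to all subsequent queries in this session.
SET max_rows_to_read_leaf = 1000000;
SET max_bytes_to_read_leaf = 1000000000;
SET read_overflow_mode_leaf = 'break';

-- Inspect the current values via the standard system.settings table.
SELECT name, value FROM system.settings WHERE name LIKE '%_leaf';
```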
diff --git a/src/Processors/QueryPlan/ReadFromStorageStep.cpp b/src/Processors/QueryPlan/ReadFromStorageStep.cpp
index 2f305e7220..b085c177ad 100644
--- a/src/Processors/QueryPlan/ReadFromStorageStep.cpp
+++ b/src/Processors/QueryPlan/ReadFromStorageStep.cpp
@@ -15,6 +15,7 @@ ReadFromStorageStep::ReadFromStorageStep(
     TableLockHolder table_lock_,
     StorageMetadataPtr metadata_snapshot_,
     StreamLocalLimits & limits_,
+    SizeLimits & leaf_limits_,
     std::shared_ptr<const EnabledQuota> quota_,
     StoragePtr storage_,
     const Names & required_columns_,
@@ -26,6 +27,7 @@ ReadFromStorageStep::ReadFromStorageStep(
     : table_lock(std::move(table_lock_))
     , metadata_snapshot(std::move(metadata_snapshot_))
     , limits(limits_)
+    , leaf_limits(leaf_limits_)
     , quota(std::move(quota_))
     , storage(std::move(storage_))
     , required_columns(required_columns_)
@@ -86,6 +88,16 @@ ReadFromStorageStep::ReadFromStorageStep(
 
     pipe.setLimits(limits);
 
+    /**
+     * Leaf size limits should be applied only for local processing of distributed queries.
+     * Such limits allow to control the read stage on leaf nodes and exclude the merging stage.
+     * Consider the case when a distributed query needs to read from multiple shards. Then the leaf
+     * limits will be applied on the shards only (and on the root node if it also reads from a
+     * local table), but will be ignored on the results merging stage.
+     */
+    if (!storage->isRemote())
+        pipe.setLeafLimits(leaf_limits);
+
     if (quota)
         pipe.setQuota(quota);
 
diff --git a/src/Processors/QueryPlan/ReadFromStorageStep.h b/src/Processors/QueryPlan/ReadFromStorageStep.h
index 9c2b9e5645..98cde63a86 100644
--- a/src/Processors/QueryPlan/ReadFromStorageStep.h
+++ b/src/Processors/QueryPlan/ReadFromStorageStep.h
@@ -26,6 +26,7 @@ public:
         TableLockHolder table_lock,
         StorageMetadataPtr metadata_snapshot,
         StreamLocalLimits & limits,
+        SizeLimits & leaf_limits,
         std::shared_ptr<const EnabledQuota> quota,
         StoragePtr storage,
         const Names & required_columns,
@@ -47,6 +48,7 @@ private:
     TableLockHolder table_lock;
     StorageMetadataPtr metadata_snapshot;
     StreamLocalLimits limits;
+    SizeLimits leaf_limits;
     std::shared_ptr<const EnabledQuota> quota;
     StoragePtr storage;
 
diff --git a/src/Processors/Sources/SourceFromInputStream.h b/src/Processors/Sources/SourceFromInputStream.h
index 630c712dae..2e8cf00762 100644
--- a/src/Processors/Sources/SourceFromInputStream.h
+++ b/src/Processors/Sources/SourceFromInputStream.h
@@ -33,6 +33,7 @@ public:
 
     /// Implementation for methods from ISourceWithProgress.
     void setLimits(const StreamLocalLimits & limits_) final { stream->setLimits(limits_); }
+    void setLeafLimits(const SizeLimits &) final { }
     void setQuota(const std::shared_ptr<const EnabledQuota> & quota_) final { stream->setQuota(quota_); }
     void setProcessListElement(QueryStatus * elem) final { stream->setProcessListElement(elem); }
     void setProgressCallback(const ProgressCallback & callback) final { stream->setProgressCallback(callback); }
diff --git a/src/Processors/Sources/SourceWithProgress.cpp b/src/Processors/Sources/SourceWithProgress.cpp
index d6972f9936..e3dd614eec 100644
--- a/src/Processors/Sources/SourceWithProgress.cpp
+++ b/src/Processors/Sources/SourceWithProgress.cpp
@@ -93,6 +93,12 @@ void SourceWithProgress::progress(const Progress & value)
         }
     }
 
+    if (!leaf_limits.check(rows_to_check_limit, progress.read_bytes, "rows or bytes to read on leaf node",
+                           ErrorCodes::TOO_MANY_ROWS, ErrorCodes::TOO_MANY_BYTES))
+    {
+        cancel();
+    }
+
     size_t total_rows = progress.total_rows_to_read;
 
     constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds
diff --git a/src/Processors/Sources/SourceWithProgress.h b/src/Processors/Sources/SourceWithProgress.h
index fdab345548..3aa7a81f41 100644
--- a/src/Processors/Sources/SourceWithProgress.h
+++ b/src/Processors/Sources/SourceWithProgress.h
@@ -17,6 +17,9 @@ public:
     /// Set limitations that checked on each chunk.
     virtual void setLimits(const StreamLocalLimits & limits_) = 0;
 
+    /// Set limitations that are checked on each chunk for distributed queries on leaf nodes.
+    virtual void setLeafLimits(const SizeLimits & leaf_limits_) = 0;
+
     /// Set the quota. If you set a quota on the amount of raw data,
     /// then you should also set mode = LIMITS_TOTAL to LocalLimits with setLimits.
     virtual void setQuota(const std::shared_ptr<const EnabledQuota> & quota_) = 0;
@@ -46,6 +49,7 @@ public:
     SourceWithProgress(Block header, bool enable_auto_progress);
 
     void setLimits(const StreamLocalLimits & limits_) final { limits = limits_; }
+    void setLeafLimits(const SizeLimits & leaf_limits_) final { leaf_limits = leaf_limits_; }
     void setQuota(const std::shared_ptr<const EnabledQuota> & quota_) final { quota = quota_; }
     void setProcessListElement(QueryStatus * elem) final { process_list_elem = elem; }
     void setProgressCallback(const ProgressCallback & callback) final { progress_callback = callback; }
@@ -59,6 +63,7 @@ protected:
 
 private:
     StreamLocalLimits limits;
+    SizeLimits leaf_limits;
    std::shared_ptr<const EnabledQuota> quota;
     ProgressCallback progress_callback;
     QueryStatus * process_list_elem = nullptr;
diff --git a/src/Storages/IStorage.cpp b/src/Storages/IStorage.cpp
index 0711d32d80..50b36ced19 100644
--- a/src/Storages/IStorage.cpp
+++ b/src/Storages/IStorage.cpp
@@ -97,6 +97,7 @@ void IStorage::read(
     TableLockHolder table_lock,
     StorageMetadataPtr metadata_snapshot,
     StreamLocalLimits & limits,
+    SizeLimits & leaf_limits,
     std::shared_ptr<const EnabledQuota> quota,
     const Names & column_names,
     const SelectQueryInfo & query_info,
@@ -106,7 +107,7 @@ void IStorage::read(
     unsigned num_streams)
 {
     auto read_step = std::make_unique<ReadFromStorageStep>(
-        std::move(table_lock), std::move(metadata_snapshot), limits, std::move(quota), shared_from_this(),
+        std::move(table_lock), std::move(metadata_snapshot), limits, leaf_limits, std::move(quota), shared_from_this(),
         column_names, query_info, std::move(context), processed_stage, max_block_size, num_streams);
 
     read_step->setStepDescription("Read from " + getName());
diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h
index dc7c684d5b..dbd18c9558 100644
--- a/src/Storages/IStorage.h
+++ b/src/Storages/IStorage.h
@@ -288,6 +288,7 @@ public:
         TableLockHolder table_lock,
         StorageMetadataPtr metadata_snapshot,
         StreamLocalLimits & limits,
+        SizeLimits & leaf_limits,
         std::shared_ptr<const EnabledQuota> quota,
         const Names & column_names,
         const SelectQueryInfo & query_info,
diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
index e780ebda11..4773652152 100644
--- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
@@ -583,6 +583,14 @@ Pipe MergeTreeDataSelectExecutor::readFromParts(
 {
     std::atomic<size_t> total_rows {0};
 
+    /// The bytes limit is ignored since it cannot be checked at this stage.
+    SizeLimits limits(settings.max_rows_to_read, 0, settings.read_overflow_mode);
+
+    /// The bytes limit is ignored since it cannot be checked at this stage.
+    SizeLimits leaf_limits(settings.max_rows_to_read_leaf, 0, settings.read_overflow_mode_leaf);
+
     auto process_part = [&](size_t part_index)
     {
         auto & part = parts[part_index];
@@ -610,18 +618,14 @@ Pipe MergeTreeDataSelectExecutor::readFromParts(
 
         if (!ranges.ranges.empty())
         {
-            if (settings.read_overflow_mode == OverflowMode::THROW && settings.max_rows_to_read)
+            if (settings.read_overflow_mode == OverflowMode::THROW && (limits.max_rows || leaf_limits.max_rows))
             {
                 /// Fail fast if estimated number of rows to read exceeds the limit
                 auto current_rows_estimate = ranges.getRowsCount();
                 size_t prev_total_rows_estimate = total_rows.fetch_add(current_rows_estimate);
                 size_t total_rows_estimate = current_rows_estimate + prev_total_rows_estimate;
-                if (total_rows_estimate > settings.max_rows_to_read)
-                    throw Exception(
-                        "Limit for rows (controlled by 'max_rows_to_read' setting) exceeded, max rows: "
-                        + formatReadableQuantity(settings.max_rows_to_read)
-                        + ", estimated rows to read (at least): " + formatReadableQuantity(total_rows_estimate),
-                        ErrorCodes::TOO_MANY_ROWS);
+                limits.check(total_rows_estimate, 0, "rows (controlled by 'max_rows_to_read' setting)", ErrorCodes::TOO_MANY_ROWS);
+                leaf_limits.check(total_rows_estimate, 0, "rows (controlled by 'max_rows_to_read_leaf' setting)", ErrorCodes::TOO_MANY_ROWS);
             }
 
             parts_with_ranges[part_index] = std::move(ranges);
diff --git a/tests/queries/0_stateless/01455_shard_leaf_max_rows_bytes_to_read.reference b/tests/queries/0_stateless/01455_shard_leaf_max_rows_bytes_to_read.reference
new file mode 100644
index 0000000000..cccfb12c95
--- /dev/null
+++ b/tests/queries/0_stateless/01455_shard_leaf_max_rows_bytes_to_read.reference
@@ -0,0 +1,6 @@
+100
+100
+100
+100
+100000
+100000
diff --git a/tests/queries/0_stateless/01455_shard_leaf_max_rows_bytes_to_read.sql b/tests/queries/0_stateless/01455_shard_leaf_max_rows_bytes_to_read.sql
new file mode 100755
index 0000000000..fca5c4534f
--- /dev/null
+++ b/tests/queries/0_stateless/01455_shard_leaf_max_rows_bytes_to_read.sql
@@ -0,0 +1,29 @@
+SELECT count() FROM (SELECT * FROM remote('127.0.0.1', system.numbers) LIMIT 100) SETTINGS max_rows_to_read_leaf=1; -- { serverError 158 }
+SELECT count() FROM (SELECT * FROM remote('127.0.0.1', system.numbers) LIMIT 100) SETTINGS max_bytes_to_read_leaf=1; -- { serverError 307 }
+SELECT count() FROM (SELECT * FROM remote('127.0.0.1', system.numbers) LIMIT 100) SETTINGS max_rows_to_read_leaf=100;
+SELECT count() FROM (SELECT * FROM remote('127.0.0.1', system.numbers) LIMIT 100) SETTINGS max_bytes_to_read_leaf=1000;
+
+SELECT count() FROM (SELECT * FROM remote('127.0.0.2', system.numbers) LIMIT 100) SETTINGS max_rows_to_read_leaf=1; -- { serverError 158 }
+SELECT count() FROM (SELECT * FROM remote('127.0.0.2', system.numbers) LIMIT 100) SETTINGS max_bytes_to_read_leaf=1; -- { serverError 307 }
+SELECT count() FROM (SELECT * FROM remote('127.0.0.2', system.numbers) LIMIT 100) SETTINGS max_rows_to_read_leaf=100;
+SELECT count() FROM (SELECT * FROM remote('127.0.0.2', system.numbers) LIMIT 100) SETTINGS max_bytes_to_read_leaf=1000;
+
+DROP TABLE IF EXISTS test_local;
+DROP TABLE IF EXISTS test_distributed;
+
+CREATE TABLE test_local (date Date, value UInt32) ENGINE = MergeTree(date, date, 8192);
+CREATE TABLE test_distributed AS test_local ENGINE = Distributed(test_cluster_two_shards, currentDatabase(), test_local, rand());
+
+INSERT INTO test_local SELECT '2000-08-01', number AS value FROM numbers(50000);
+
+SELECT count() FROM (SELECT * FROM test_distributed) SETTINGS max_rows_to_read_leaf = 40000; -- { serverError 158 }
+SELECT count() FROM (SELECT * FROM test_distributed) SETTINGS max_bytes_to_read_leaf = 40000; -- { serverError 307 }
+
+SELECT count() FROM (SELECT * FROM test_distributed) SETTINGS max_rows_to_read = 60000; -- { serverError 158 }
+SELECT count() FROM (SELECT * FROM test_distributed) SETTINGS max_rows_to_read_leaf = 60000;
+
+SELECT count() FROM (SELECT * FROM test_distributed) SETTINGS max_bytes_to_read = 100000; -- { serverError 307 }
+SELECT count() FROM (SELECT * FROM test_distributed) SETTINGS max_bytes_to_read_leaf = 100000;
+
+DROP TABLE IF EXISTS test_local;
+DROP TABLE IF EXISTS test_distributed;
\ No newline at end of file
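The test exercises only the default ‘throw’ overflow mode. As a hedged sketch of the other documented mode, ‘break’ should stop reading on each leaf once the limit is reached and return a partial result instead of failing (reusing the `test_distributed` table created by the test):

```sql
-- Assumed behaviour of 'break' mode per the docs above: each shard stops
-- reading at roughly the leaf limit instead of throwing, so the query
-- returns a (possibly partial) count rather than error 158.
SELECT count()
FROM (SELECT * FROM test_distributed)
SETTINGS max_rows_to_read_leaf = 40000, read_overflow_mode_leaf = 'break';
```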