Commit b41421cb authored by roman

[settings]: introduce new query complexity settings for leaf-nodes

The new settings allow controlling query complexity on the leaf nodes,
excluding the final merging stage on the root node. For example, a distributed
query that reads 1k rows from each of 5 shards will breach `max_rows_to_read=5000`,
even though effectively every shard reads only 1k rows. With the setting `max_rows_to_read_leaf=1500`
this limit won't be reached and the query will succeed, since every shard reads
no more than ~1k rows.
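
As a minimal sketch of that scenario (assuming a hypothetical Distributed table `dist_events` spread over 5 shards, each holding roughly 1k matching rows):

```sql
-- Hypothetical table; roughly 1k matching rows live on each of the 5 shards.
-- The classic limit counts all rows arriving at the initiator (~5k), so it trips:
SELECT count() FROM (SELECT * FROM dist_events)
SETTINGS max_rows_to_read = 5000; -- may fail with TOO_MANY_ROWS

-- The leaf limit is checked per shard during local reads only (~1k each), so it passes:
SELECT count() FROM (SELECT * FROM dist_events)
SETTINGS max_rows_to_read_leaf = 1500;
```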
Parent 97baa67f
......@@ -60,6 +60,31 @@ A maximum number of bytes (uncompressed data) that can be read from a table when
What to do when the volume of data read exceeds one of the limits: ‘throw’ or ‘break’. By default, throw.
## max\_rows\_to\_read\_leaf {#max-rows-to-read-leaf}
The following restrictions can be checked on each block (instead of on each row). That is, the restrictions can be broken a little.
A maximum number of rows that can be read from a local table on a leaf node when running a distributed query. While
distributed queries can issue multiple sub-queries to each shard (leaf), this limit is checked only at the read
stage on the leaf nodes and ignored at the results-merging stage on the root node. For example, a cluster consists of 2 shards
and each shard contains a table with 100 rows. A distributed query that is supposed to read all the data from both
tables with the setting `max_rows_to_read=150` will fail, since in total there are 200 rows, while a query
with `max_rows_to_read_leaf=150` will succeed, since each leaf node reads at most 100 rows.
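
As an illustration (a sketch assuming a hypothetical Distributed table `dist_table` over that 2-shard cluster, mirroring the example above):

```sql
-- 100 rows per shard, 200 rows in total on the root node:
SELECT count() FROM (SELECT * FROM dist_table)
SETTINGS max_rows_to_read = 150;       -- fails with TOO_MANY_ROWS (code 158)

-- Checked per leaf node (100 rows each), so the limit is never exceeded:
SELECT count() FROM (SELECT * FROM dist_table)
SETTINGS max_rows_to_read_leaf = 150;
```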
## max\_bytes\_to\_read\_leaf {#max-bytes-to-read-leaf}
A maximum number of bytes (uncompressed data) that can be read from a local table on a leaf node when running
a distributed query. While distributed queries can issue multiple sub-queries to each shard (leaf), this limit
is checked only at the read stage on the leaf nodes and ignored at the results-merging stage on the root node.
For example, a cluster consists of 2 shards and each shard contains a table with 100 bytes of data.
A distributed query that is supposed to read all the data from both tables with the setting `max_bytes_to_read=150` will fail,
since in total there are 200 bytes, while a query with `max_bytes_to_read_leaf=150` will succeed, since each leaf node
reads at most 100 bytes.
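
The same pattern, again with the hypothetical `dist_table` holding 100 bytes of data per shard:

```sql
-- 200 bytes in total reach the root node, exceeding the classic limit:
SELECT count() FROM (SELECT * FROM dist_table)
SETTINGS max_bytes_to_read = 150;      -- fails with TOO_MANY_BYTES (code 307)

-- Each shard reads only 100 bytes locally, so the leaf limit passes:
SELECT count() FROM (SELECT * FROM dist_table)
SETTINGS max_bytes_to_read_leaf = 150;
```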
## read\_overflow\_mode\_leaf {#read-overflow-mode-leaf}
What to do when the volume of data read exceeds one of the leaf limits: ‘throw’ or ‘break’. By default, throw.
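
For example, combining a leaf limit with `break` should let each shard stop reading once the limit is hit and return partial results instead of throwing (a sketch, again using the hypothetical `dist_table`):

```sql
-- Each shard cancels its local read once the leaf limit is hit,
-- and the query completes with whatever rows were read so far:
SELECT count() FROM (SELECT * FROM dist_table)
SETTINGS max_rows_to_read_leaf = 50, read_overflow_mode_leaf = 'break';
```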
## max\_rows\_to\_group\_by {#settings-max-rows-to-group-by}
A maximum number of unique keys received from aggregation. This setting lets you limit memory consumption when aggregating.
......
......@@ -56,6 +56,32 @@
What to do when the amount of data read exceeds one of the limits: throw or break. By default: throw.
## max\_rows\_to\_read\_leaf {#max-rows-to-read-leaf}
The following restrictions can be checked on each block (instead of on each row). That is, the restrictions can be broken a little.
The maximum number of rows that can be read from a local table on a remote server when running
a distributed query. Distributed queries can issue several sub-queries to each of the shards in the cluster, and
this limit is then applied when the read is performed on the remote servers (including the initiator server) and ignored
on the query initiator server while the received results are merged. For example, a cluster consists of 2 shards and each
of them stores a table with 100 rows. Then a distributed query that reads all the data from these tables with
the setting `max_rows_to_read=150` will throw an exception, since in total it reads 200 rows, while a query
with the setting `max_rows_to_read_leaf=150` will finish successfully, because each of the shards reads at most 100 rows.
## max\_bytes\_to\_read\_leaf {#max-bytes-to-read-leaf}
The maximum number of bytes (uncompressed data) that can be read from a local table on a remote server when
running a distributed query. Distributed queries can issue several sub-queries to each of the shards in the
cluster, and this limit is then applied when the read is performed on the remote servers (including the initiator server)
and ignored on the query initiator server while the received results are merged. For example, a cluster consists
of 2 shards and each of them stores a table with 100 bytes of data. Then a distributed query that reads all the data from these tables
with the setting `max_bytes_to_read=150` will throw an exception, since in total it reads 200 bytes, while a query
with the setting `max_bytes_to_read_leaf=150` will finish successfully, because each of the shards reads at most 100 bytes.
## read\_overflow\_mode\_leaf {#read-overflow-mode-leaf}
What to do when the amount of data read on a remote server exceeds one of the limits: throw or break. By default: throw.
## max\_rows\_to\_group\_by {#settings-max-rows-to-group-by}
The maximum number of unique keys received during aggregation. This setting lets you limit memory consumption when aggregating.
......
......@@ -232,6 +232,10 @@ class IColumn;
M(UInt64, max_bytes_to_read, 0, "Limit on read bytes (after decompression) from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it is only checked on a remote server.", 0) \
M(OverflowMode, read_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
\
M(UInt64, max_rows_to_read_leaf, 0, "Limit on read rows on the leaf nodes for distributed queries. Limit is applied for local reads only, excluding the final merge stage on the root node.", 0) \
M(UInt64, max_bytes_to_read_leaf, 0, "Limit on read bytes (after decompression) on the leaf nodes for distributed queries. Limit is applied for local reads only, excluding the final merge stage on the root node.", 0) \
M(OverflowMode, read_overflow_mode_leaf, OverflowMode::THROW, "What to do when the leaf limit is exceeded.", 0) \
\
M(UInt64, max_rows_to_group_by, 0, "", 0) \
M(OverflowModeGroupBy, group_by_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
M(UInt64, max_bytes_before_external_group_by, 0, "", 0) \
......
......@@ -1441,16 +1441,21 @@ void InterpreterSelectQuery::executeFetchColumns(
}
StreamLocalLimits limits;
SizeLimits leaf_limits;
std::shared_ptr<const EnabledQuota> quota;
/// Set the limits and quota for reading data, the speed and time of the query.
if (!options.ignore_limits)
if (!options.ignore_limits) {
limits = getLimitsForStorage(settings, options);
leaf_limits = SizeLimits(settings.max_rows_to_read_leaf, settings.max_bytes_to_read_leaf,
settings.read_overflow_mode_leaf);
}
if (!options.ignore_quota && (options.to_stage == QueryProcessingStage::Complete))
quota = context->getQuota();
storage->read(query_plan, table_lock, metadata_snapshot, limits, std::move(quota),
storage->read(query_plan, table_lock, metadata_snapshot, limits, leaf_limits, std::move(quota),
required_columns, query_info, context, processing_stage, max_block_size, max_streams);
}
else
......
......@@ -788,6 +788,15 @@ void Pipe::setLimits(const StreamLocalLimits & limits)
}
}
void Pipe::setLeafLimits(const SizeLimits & leaf_limits)
{
for (auto & processor : processors)
{
if (auto * source_with_progress = dynamic_cast<ISourceWithProgress *>(processor.get()))
source_with_progress->setLeafLimits(leaf_limits);
}
}
void Pipe::setQuota(const std::shared_ptr<const EnabledQuota> & quota)
{
for (auto & processor : processors)
......
......@@ -97,6 +97,7 @@ public:
/// Specify quotas and limits for every ISourceWithProgress.
void setLimits(const StreamLocalLimits & limits);
void setLeafLimits(const SizeLimits & leaf_limits);
void setQuota(const std::shared_ptr<const EnabledQuota> & quota);
/// Do not allow to change the table while the processors of pipe are alive.
......
......@@ -15,6 +15,7 @@ ReadFromStorageStep::ReadFromStorageStep(
TableLockHolder table_lock_,
StorageMetadataPtr metadata_snapshot_,
StreamLocalLimits & limits_,
SizeLimits & leaf_limits_,
std::shared_ptr<const EnabledQuota> quota_,
StoragePtr storage_,
const Names & required_columns_,
......@@ -26,6 +27,7 @@ ReadFromStorageStep::ReadFromStorageStep(
: table_lock(std::move(table_lock_))
, metadata_snapshot(std::move(metadata_snapshot_))
, limits(limits_)
, leaf_limits(leaf_limits_)
, quota(std::move(quota_))
, storage(std::move(storage_))
, required_columns(required_columns_)
......@@ -86,6 +88,16 @@ ReadFromStorageStep::ReadFromStorageStep(
pipe.setLimits(limits);
/**
 * Leaf size limits should be applied only for local processing of distributed queries.
 * Such limits make it possible to control the read stage on leaf nodes and exclude the merging stage.
 * Consider the case when a distributed query needs to read from multiple shards. Then the leaf
 * limits will be applied on the shards only (including the root node) but will be ignored
 * at the results merging stage.
 */
if (!storage->isRemote())
pipe.setLeafLimits(leaf_limits);
if (quota)
pipe.setQuota(quota);
......
......@@ -26,6 +26,7 @@ public:
TableLockHolder table_lock,
StorageMetadataPtr metadata_snapshot,
StreamLocalLimits & limits,
SizeLimits & leaf_limits,
std::shared_ptr<const EnabledQuota> quota,
StoragePtr storage,
const Names & required_columns,
......@@ -47,6 +48,7 @@ private:
TableLockHolder table_lock;
StorageMetadataPtr metadata_snapshot;
StreamLocalLimits limits;
SizeLimits leaf_limits;
std::shared_ptr<const EnabledQuota> quota;
StoragePtr storage;
......
......@@ -33,6 +33,7 @@ public:
/// Implementation for methods from ISourceWithProgress.
void setLimits(const StreamLocalLimits & limits_) final { stream->setLimits(limits_); }
void setLeafLimits(const SizeLimits &) final { }
void setQuota(const std::shared_ptr<const EnabledQuota> & quota_) final { stream->setQuota(quota_); }
void setProcessListElement(QueryStatus * elem) final { stream->setProcessListElement(elem); }
void setProgressCallback(const ProgressCallback & callback) final { stream->setProgressCallback(callback); }
......
......@@ -93,6 +93,12 @@ void SourceWithProgress::progress(const Progress & value)
}
}
if (!leaf_limits.check(rows_to_check_limit, progress.read_bytes, "rows or bytes to read on leaf node",
ErrorCodes::TOO_MANY_ROWS, ErrorCodes::TOO_MANY_BYTES))
{
cancel();
}
size_t total_rows = progress.total_rows_to_read;
constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds
......
......@@ -17,6 +17,9 @@ public:
/// Set limitations that checked on each chunk.
virtual void setLimits(const StreamLocalLimits & limits_) = 0;
/// Set limitations that checked on each chunk for distributed queries on leaf nodes.
virtual void setLeafLimits(const SizeLimits & leaf_limits_) = 0;
/// Set the quota. If you set a quota on the amount of raw data,
/// then you should also set mode = LIMITS_TOTAL to LocalLimits with setLimits.
virtual void setQuota(const std::shared_ptr<const EnabledQuota> & quota_) = 0;
......@@ -46,6 +49,7 @@ public:
SourceWithProgress(Block header, bool enable_auto_progress);
void setLimits(const StreamLocalLimits & limits_) final { limits = limits_; }
void setLeafLimits(const SizeLimits & leaf_limits_) final { leaf_limits = leaf_limits_; }
void setQuota(const std::shared_ptr<const EnabledQuota> & quota_) final { quota = quota_; }
void setProcessListElement(QueryStatus * elem) final { process_list_elem = elem; }
void setProgressCallback(const ProgressCallback & callback) final { progress_callback = callback; }
......@@ -59,6 +63,7 @@ protected:
private:
StreamLocalLimits limits;
SizeLimits leaf_limits;
std::shared_ptr<const EnabledQuota> quota;
ProgressCallback progress_callback;
QueryStatus * process_list_elem = nullptr;
......
......@@ -97,6 +97,7 @@ void IStorage::read(
TableLockHolder table_lock,
StorageMetadataPtr metadata_snapshot,
StreamLocalLimits & limits,
SizeLimits & leaf_limits,
std::shared_ptr<const EnabledQuota> quota,
const Names & column_names,
const SelectQueryInfo & query_info,
......@@ -106,7 +107,7 @@ void IStorage::read(
unsigned num_streams)
{
auto read_step = std::make_unique<ReadFromStorageStep>(
std::move(table_lock), std::move(metadata_snapshot), limits, std::move(quota), shared_from_this(),
std::move(table_lock), std::move(metadata_snapshot), limits, leaf_limits, std::move(quota), shared_from_this(),
column_names, query_info, std::move(context), processed_stage, max_block_size, num_streams);
read_step->setStepDescription("Read from " + getName());
......
......@@ -288,6 +288,7 @@ public:
TableLockHolder table_lock,
StorageMetadataPtr metadata_snapshot,
StreamLocalLimits & limits,
SizeLimits & leaf_limits,
std::shared_ptr<const EnabledQuota> quota,
const Names & column_names,
const SelectQueryInfo & query_info,
......
......@@ -583,6 +583,14 @@ Pipe MergeTreeDataSelectExecutor::readFromParts(
{
std::atomic<size_t> total_rows {0};
SizeLimits limits;
/// The bytes limit is ignored since we can't check it at this stage.
limits = SizeLimits(settings.max_rows_to_read, 0, settings.read_overflow_mode);
SizeLimits leaf_limits;
/// The bytes limit is ignored since we can't check it at this stage.
leaf_limits = SizeLimits(settings.max_rows_to_read_leaf, 0, settings.read_overflow_mode_leaf);
auto process_part = [&](size_t part_index)
{
auto & part = parts[part_index];
......@@ -610,18 +618,14 @@ Pipe MergeTreeDataSelectExecutor::readFromParts(
if (!ranges.ranges.empty())
{
if (settings.read_overflow_mode == OverflowMode::THROW && settings.max_rows_to_read)
if (settings.read_overflow_mode == OverflowMode::THROW && (limits.max_rows || leaf_limits.max_rows))
{
/// Fail fast if estimated number of rows to read exceeds the limit
auto current_rows_estimate = ranges.getRowsCount();
size_t prev_total_rows_estimate = total_rows.fetch_add(current_rows_estimate);
size_t total_rows_estimate = current_rows_estimate + prev_total_rows_estimate;
if (total_rows_estimate > settings.max_rows_to_read)
throw Exception(
"Limit for rows (controlled by 'max_rows_to_read' setting) exceeded, max rows: "
+ formatReadableQuantity(settings.max_rows_to_read)
+ ", estimated rows to read (at least): " + formatReadableQuantity(total_rows_estimate),
ErrorCodes::TOO_MANY_ROWS);
limits.check(total_rows_estimate, 0, "rows (controlled by 'max_rows_to_read' setting)", ErrorCodes::TOO_MANY_ROWS);
leaf_limits.check(total_rows_estimate, 0, "rows (controlled by 'max_rows_to_read_leaf' setting)", ErrorCodes::TOO_MANY_ROWS);
}
parts_with_ranges[part_index] = std::move(ranges);
......
SELECT count() FROM (SELECT * FROM remote('127.0.0.1', system.numbers) LIMIT 100) SETTINGS max_rows_to_read_leaf=1; -- { serverError 158 }
SELECT count() FROM (SELECT * FROM remote('127.0.0.1', system.numbers) LIMIT 100) SETTINGS max_bytes_to_read_leaf=1; -- { serverError 307 }
SELECT count() FROM (SELECT * FROM remote('127.0.0.1', system.numbers) LIMIT 100) SETTINGS max_rows_to_read_leaf=100;
SELECT count() FROM (SELECT * FROM remote('127.0.0.1', system.numbers) LIMIT 100) SETTINGS max_bytes_to_read_leaf=1000;
SELECT count() FROM (SELECT * FROM remote('127.0.0.2', system.numbers) LIMIT 100) SETTINGS max_rows_to_read_leaf=1; -- { serverError 158 }
SELECT count() FROM (SELECT * FROM remote('127.0.0.2', system.numbers) LIMIT 100) SETTINGS max_bytes_to_read_leaf=1; -- { serverError 307 }
SELECT count() FROM (SELECT * FROM remote('127.0.0.2', system.numbers) LIMIT 100) SETTINGS max_rows_to_read_leaf=100;
SELECT count() FROM (SELECT * FROM remote('127.0.0.2', system.numbers) LIMIT 100) SETTINGS max_bytes_to_read_leaf=1000;
DROP TABLE IF EXISTS test_local;
DROP TABLE IF EXISTS test_distributed;
CREATE TABLE test_local (date Date, value UInt32) ENGINE = MergeTree(date, date, 8192);
CREATE TABLE test_distributed AS test_local ENGINE = Distributed(test_cluster_two_shards, currentDatabase(), test_local, rand());
INSERT INTO test_local SELECT '2000-08-01', number as value from numbers(50000);
SELECT count() FROM (SELECT * FROM test_distributed) SETTINGS max_rows_to_read_leaf = 40000; -- { serverError 158 }
SELECT count() FROM (SELECT * FROM test_distributed) SETTINGS max_bytes_to_read_leaf = 40000; -- { serverError 307 }
SELECT count() FROM (SELECT * FROM test_distributed) SETTINGS max_rows_to_read = 60000; -- { serverError 158 }
SELECT count() FROM (SELECT * FROM test_distributed) SETTINGS max_rows_to_read_leaf = 60000;
SELECT count() FROM (SELECT * FROM test_distributed) SETTINGS max_bytes_to_read = 100000; -- { serverError 307 }
SELECT count() FROM (SELECT * FROM test_distributed) SETTINGS max_bytes_to_read_leaf = 100000;
DROP TABLE IF EXISTS test_local;
DROP TABLE IF EXISTS test_distributed;
\ No newline at end of file