未验证 提交 19e4a33f 编写于 作者: N Nikolai Kochetov 提交者: GitHub

Merge pull request #19544 from amosbird/limitconcurrency

Per MergeTree table query limit
......@@ -105,6 +105,8 @@ Pipe::Holder & Pipe::Holder::operator=(Holder && rhs)
for (auto & plan : rhs.query_plans)
query_plans.emplace_back(std::move(plan));
query_id_holder = std::move(rhs.query_id_holder);
return *this;
}
......
#pragma once
#include <Processors/IProcessor.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/QueryPlan/QueryIdHolder.h>
#include <Processors/QueryPlan/QueryPlan.h>
namespace DB
......@@ -108,6 +109,7 @@ public:
/// This methods are from QueryPipeline. Needed to make conversion from pipeline to pipe possible.
void addInterpreterContext(std::shared_ptr<Context> context) { holder.interpreter_context.emplace_back(std::move(context)); }
void addStorageHolder(StoragePtr storage) { holder.storage_holders.emplace_back(std::move(storage)); }
void addQueryIdHolder(std::shared_ptr<QueryIdHolder> query_id_holder) { holder.query_id_holder = std::move(query_id_holder); }
/// For queries with nested interpreters (i.e. StorageDistributed)
void addQueryPlan(std::unique_ptr<QueryPlan> plan) { holder.query_plans.emplace_back(std::move(plan)); }
......@@ -128,6 +130,7 @@ private:
std::vector<StoragePtr> storage_holders;
std::vector<TableLockHolder> table_locks;
std::vector<std::unique_ptr<QueryPlan>> query_plans;
std::shared_ptr<QueryIdHolder> query_id_holder;
};
Holder holder;
......
#include <Processors/QueryPlan/QueryIdHolder.h>
#include <Storages/MergeTree/MergeTreeData.h>
namespace DB
{
QueryIdHolder::QueryIdHolder(const String & query_id_, const MergeTreeData & data_) : query_id(query_id_), data(data_)
{
}
QueryIdHolder::~QueryIdHolder()
{
data.removeQueryId(query_id);
}
}
#pragma once
#include <string>
namespace DB
{
class MergeTreeData;
/// Holds the current query id and do something meaningful in destructor.
/// Currently it's used for cleaning query id in the MergeTreeData query set.
struct QueryIdHolder
{
QueryIdHolder(const std::string & query_id_, const MergeTreeData & data_);
~QueryIdHolder();
std::string query_id;
const MergeTreeData & data;
};
}
......@@ -122,6 +122,7 @@ SRCS(
QueryPlan/Optimizations/optimizeTree.cpp
QueryPlan/Optimizations/splitFilter.cpp
QueryPlan/PartialSortingStep.cpp
QueryPlan/QueryIdHolder.cpp
QueryPlan/QueryPlan.cpp
QueryPlan/ReadFromPreparedSource.cpp
QueryPlan/ReadNothingStep.cpp
......
......@@ -114,6 +114,7 @@ namespace ErrorCodes
extern const int NOT_ENOUGH_SPACE;
extern const int ALTER_OF_COLUMN_IS_FORBIDDEN;
extern const int SUPPORT_IS_DISABLED;
extern const int TOO_MANY_SIMULTANEOUS_QUERIES;
}
......@@ -3988,4 +3989,24 @@ void MergeTreeData::setDataVolume(size_t bytes, size_t rows, size_t parts)
total_active_size_rows.store(rows, std::memory_order_release);
total_active_size_parts.store(parts, std::memory_order_release);
}
void MergeTreeData::insertQueryIdOrThrow(const String & query_id, size_t max_queries) const
{
std::lock_guard lock(query_id_set_mutex);
if (query_id_set.find(query_id) != query_id_set.end())
return;
if (query_id_set.size() >= max_queries)
throw Exception(
ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES, "Too many simultaneous queries for table {}. Maximum is: {}", log_name, max_queries);
query_id_set.insert(query_id);
}
void MergeTreeData::removeQueryId(const String & query_id) const
{
std::lock_guard lock(query_id_set_mutex);
if (query_id_set.find(query_id) == query_id_set.end())
LOG_WARNING(log, "We have query_id removed but it's not recorded. This is a bug");
else
query_id_set.erase(query_id);
}
}
......@@ -702,6 +702,12 @@ public:
/// section from config.xml.
CompressionCodecPtr getCompressionCodecForPart(size_t part_size_compressed, const IMergeTreeDataPart::TTLInfos & ttl_infos, time_t current_time) const;
/// Record current query id where querying the table. Throw if there are already `max_queries` queries accessing the same table.
void insertQueryIdOrThrow(const String & query_id, size_t max_queries) const;
/// Remove current query id after query finished.
void removeQueryId(const String & query_id) const;
/// Limiting parallel sends per one table, used in DataPartsExchange
std::atomic_uint current_table_sends {0};
......@@ -958,6 +964,10 @@ private:
std::atomic<size_t> total_active_size_bytes = 0;
std::atomic<size_t> total_active_size_rows = 0;
std::atomic<size_t> total_active_size_parts = 0;
// Record all query ids which access the table. It's guarded by `query_id_set_mutex` and is always mutable.
mutable std::set<String> query_id_set;
mutable std::mutex query_id_set_mutex;
};
}
......@@ -33,6 +33,7 @@
#include <Processors/QueryPlan/MergingSortedStep.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Processors/QueryPlan/MergingFinal.h>
#include <Processors/QueryPlan/ReadNothingStep.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeEnum.h>
......@@ -708,8 +709,9 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
if (parts_with_ranges.empty())
return std::make_unique<QueryPlan>();
const auto data_settings = data.getSettings();
auto max_partitions_to_read
= settings.max_partitions_to_read.changed ? settings.max_partitions_to_read : data.getSettings()->max_partitions_to_read;
= settings.max_partitions_to_read.changed ? settings.max_partitions_to_read : data_settings->max_partitions_to_read;
if (max_partitions_to_read > 0)
{
std::set<String> partitions;
......@@ -723,6 +725,18 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
max_partitions_to_read);
}
String query_id;
if (data_settings->max_concurrent_queries > 0)
{
if (data_settings->min_marks_to_honor_max_concurrent_queries > 0
&& sum_marks >= data_settings->min_marks_to_honor_max_concurrent_queries)
{
query_id = context.getCurrentQueryId();
if (!query_id.empty())
data.insertQueryIdOrThrow(query_id, data_settings->max_concurrent_queries);
}
}
ProfileEvents::increment(ProfileEvents::SelectedParts, parts_with_ranges.size());
ProfileEvents::increment(ProfileEvents::SelectedRanges, sum_ranges);
ProfileEvents::increment(ProfileEvents::SelectedMarks, sum_marks);
......@@ -759,7 +773,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
virt_column_names,
settings,
reader_settings,
result_projection);
result_projection,
query_id);
}
else if ((settings.optimize_read_in_order || settings.optimize_aggregation_in_order) && query_info.input_order_info)
{
......@@ -782,7 +797,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
virt_column_names,
settings,
reader_settings,
result_projection);
result_projection,
query_id);
}
else
{
......@@ -796,7 +812,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
query_info,
virt_column_names,
settings,
reader_settings);
reader_settings,
query_id);
}
if (!plan)
......@@ -896,7 +913,7 @@ size_t minMarksForConcurrentRead(
}
static QueryPlanPtr createPlanFromPipe(Pipe pipe, const std::string & description = "")
static QueryPlanPtr createPlanFromPipe(Pipe pipe, const String & query_id, const MergeTreeData & data, const std::string & description = "")
{
auto plan = std::make_unique<QueryPlan>();
......@@ -904,6 +921,10 @@ static QueryPlanPtr createPlanFromPipe(Pipe pipe, const std::string & descriptio
if (!description.empty())
storage_name += ' ' + description;
// Attach QueryIdHolder if needed
if (!query_id.empty())
pipe.addQueryIdHolder(std::make_shared<QueryIdHolder>(query_id, data));
auto step = std::make_unique<ReadFromStorageStep>(std::move(pipe), storage_name);
plan->addStep(std::move(step));
return plan;
......@@ -919,7 +940,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
const SelectQueryInfo & query_info,
const Names & virt_columns,
const Settings & settings,
const MergeTreeReaderSettings & reader_settings) const
const MergeTreeReaderSettings & reader_settings,
const String & query_id) const
{
/// Count marks for each part.
std::vector<size_t> sum_marks_in_parts(parts.size());
......@@ -1004,7 +1026,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
res.emplace_back(std::move(source));
}
return createPlanFromPipe(Pipe::unitePipes(std::move(res)));
return createPlanFromPipe(Pipe::unitePipes(std::move(res)), query_id, data);
}
else
{
......@@ -1028,7 +1050,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
if (pipe.numOutputPorts() > 1)
pipe.addTransform(std::make_shared<ConcatProcessor>(pipe.getHeader(), pipe.numOutputPorts()));
return createPlanFromPipe(std::move(pipe));
return createPlanFromPipe(std::move(pipe), query_id, data);
}
}
......@@ -1052,7 +1074,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
const Names & virt_columns,
const Settings & settings,
const MergeTreeReaderSettings & reader_settings,
ActionsDAGPtr & out_projection) const
ActionsDAGPtr & out_projection,
const String & query_id) const
{
size_t sum_marks = 0;
const InputOrderInfoPtr & input_order_info = query_info.input_order_info;
......@@ -1243,7 +1266,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
}
}
auto plan = createPlanFromPipe(Pipe::unitePipes(std::move(pipes)), " with order");
auto plan = createPlanFromPipe(Pipe::unitePipes(std::move(pipes)), query_id, data, " with order");
if (input_order_info->direction != 1)
{
......@@ -1311,7 +1334,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
const Names & virt_columns,
const Settings & settings,
const MergeTreeReaderSettings & reader_settings,
ActionsDAGPtr & out_projection) const
ActionsDAGPtr & out_projection,
const String & query_id) const
{
const auto data_settings = data.getSettings();
size_t sum_marks = 0;
......@@ -1407,7 +1431,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
if (!out_projection)
out_projection = createProjection(pipe.getHeader());
plan = createPlanFromPipe(std::move(pipe), "with final");
plan = createPlanFromPipe(std::move(pipe), query_id, data, "with final");
}
/// If do_not_merge_across_partitions_select_final is true and there is only one part in partition
......
......@@ -58,7 +58,8 @@ private:
const SelectQueryInfo & query_info,
const Names & virt_columns,
const Settings & settings,
const MergeTreeReaderSettings & reader_settings) const;
const MergeTreeReaderSettings & reader_settings,
const String & query_id) const;
/// out_projection - save projection only with columns, requested to read
QueryPlanPtr spreadMarkRangesAmongStreamsWithOrder(
......@@ -73,7 +74,8 @@ private:
const Names & virt_columns,
const Settings & settings,
const MergeTreeReaderSettings & reader_settings,
ActionsDAGPtr & out_projection) const;
ActionsDAGPtr & out_projection,
const String & query_id) const;
QueryPlanPtr spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts,
......@@ -86,7 +88,8 @@ private:
const Names & virt_columns,
const Settings & settings,
const MergeTreeReaderSettings & reader_settings,
ActionsDAGPtr & out_projection) const;
ActionsDAGPtr & out_projection,
const String & query_id) const;
/// Get the approximate value (bottom estimate - only by full marks) of the number of rows falling under the index.
size_t getApproximateTotalRowsToRead(
......
......@@ -111,6 +111,8 @@ struct Settings;
M(Bool, remove_empty_parts, true, "Remove empty parts after they were pruned by TTL, mutation, or collapsing merge algorithm", 0) \
M(Bool, assign_part_uuids, false, "Generate UUIDs for parts. Before enabling check that all replicas support new format.", 0) \
M(Int64, max_partitions_to_read, -1, "Limit the max number of partitions that can be accessed in one query. <= 0 means unlimited. This setting is the default that can be overridden by the query-level setting with the same name.", 0) \
M(UInt64, max_concurrent_queries, 0, "Max number of concurrently executed queries related to the MergeTree table (0 - disabled). Queries will still be limited by other max_concurrent_queries settings.", 0) \
M(UInt64, min_marks_to_honor_max_concurrent_queries, 0, "Minimal number of marks to honor the MergeTree-level's max_concurrent_queries (0 - disabled). Queries will still be limited by other max_concurrent_queries settings.", 0) \
\
/** Obsolete settings. Kept for backward compatibility only. */ \
M(UInt64, min_relative_delay_to_yield_leadership, 120, "Obsolete setting, does nothing.", 0) \
......
Spin up a long running query
Check if another query with some marks to read is throttled
yes
Check if another query with less marks to read is passed
0 100
Modify min_marks_to_honor_max_concurrent_queries to 1
Check if another query with less marks to read is throttled
yes
Modify max_concurrent_queries to 2
Check if another query is passed
0 100
Modify max_concurrent_queries back to 1
Check if another query with less marks to read is throttled
yes
finished long_running_query default select sleepEachRow(0.01) from simple settings max_block_size = 1 format Null
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
function wait_for_query_to_start()
{
while [[ $($CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL" -d "SELECT sum(read_rows) FROM system.processes WHERE query_id = '$1'") == 0 ]]; do sleep 0.1; done
}
${CLICKHOUSE_CLIENT} --multiline --multiquery --query "
drop table if exists simple;
create table simple (i int, j int) engine = MergeTree order by i
settings index_granularity = 1, max_concurrent_queries = 1, min_marks_to_honor_max_concurrent_queries = 2;
insert into simple select number, number + 100 from numbers(1000);
"
echo "Spin up a long running query"
${CLICKHOUSE_CLIENT} --query "select sleepEachRow(0.01) from simple settings max_block_size = 1 format Null" --query_id "long_running_query" > /dev/null 2>&1 &
wait_for_query_to_start 'long_running_query'
# query which reads marks >= min_marks_to_honor_max_concurrent_queries is throttled
echo "Check if another query with some marks to read is throttled"
${CLICKHOUSE_CLIENT} --query "select * from simple" 2> /dev/null;
CODE=$?
[ "$CODE" -ne "202" ] && echo "Expected error code: 202 but got: $CODE" && exit 1;
echo "yes"
# query which reads marks less than min_marks_to_honor_max_concurrent_queries is allowed
echo "Check if another query with less marks to read is passed"
${CLICKHOUSE_CLIENT} --query "select * from simple where i = 0"
# We can modify the settings to take effect for future queries
echo "Modify min_marks_to_honor_max_concurrent_queries to 1"
${CLICKHOUSE_CLIENT} --query "alter table simple modify setting min_marks_to_honor_max_concurrent_queries = 1"
# Now smaller queries are also throttled
echo "Check if another query with less marks to read is throttled"
${CLICKHOUSE_CLIENT} --query "select * from simple where i = 0" 2> /dev/null;
CODE=$?
[ "$CODE" -ne "202" ] && echo "Expected error code: 202 but got: $CODE" && exit 1;
echo "yes"
echo "Modify max_concurrent_queries to 2"
${CLICKHOUSE_CLIENT} --query "alter table simple modify setting max_concurrent_queries = 2"
# Now more queries are accepted
echo "Check if another query is passed"
${CLICKHOUSE_CLIENT} --query "select * from simple where i = 0"
echo "Modify max_concurrent_queries back to 1"
${CLICKHOUSE_CLIENT} --query "alter table simple modify setting max_concurrent_queries = 1"
# Now queries are throttled again
echo "Check if another query with less marks to read is throttled"
${CLICKHOUSE_CLIENT} --query "select * from simple where i = 0" 2> /dev/null;
CODE=$?
[ "$CODE" -ne "202" ] && echo "Expected error code: 202 but got: $CODE" && exit 1;
echo "yes"
${CLICKHOUSE_CLIENT} --query "KILL QUERY WHERE query_id = 'long_running_query' SYNC"
wait
${CLICKHOUSE_CLIENT} --multiline --multiquery --query "
drop table simple
"
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册