Unverified · Commit 9a2760d9 authored by alexey-milovidov and committed by GitHub

Merge pull request #9673 from azat/processors-max_distributed_connections

Fix max_distributed_connections
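In short: call sites used to hand the executor settings.max_threads directly, which overrode the smaller limit (such as max_distributed_connections) chosen while planning the query. After this change the pipeline itself carries that limit via setMaxThreads(), and call sites read it back with getNumThreads(). The following is a minimal, self-contained sketch of that pattern only; QueryPipeline and PipelineExecutor here are simplified stand-ins, not the real ClickHouse classes, though the method names mirror the ones in the diff below.

// Sketch with hypothetical stand-in types: the pipeline remembers the thread
// limit chosen during planning, and the executor reads it back instead of
// being passed settings.max_threads by the caller.
#include <cstddef>
#include <iostream>

struct QueryPipeline
{
    // Set while building the plan, e.g. capped to max_distributed_connections
    // when the source is a Distributed table.
    void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }
    size_t getNumThreads() const { return max_threads; }

private:
    size_t max_threads = 1;
};

struct PipelineExecutor
{
    // Runs with whatever limit the pipeline recorded.
    void execute(size_t num_threads)
    {
        std::cout << "executing with " << num_threads << " thread(s)\n";
    }
};

int main()
{
    QueryPipeline pipeline;

    // Planning phase: a read from two remote shards caps the limit to the
    // number of distributed connections rather than to max_threads.
    const size_t max_distributed_connections = 2;
    pipeline.setMaxThreads(max_distributed_connections);

    // Execution phase mirrors the new call sites in this diff:
    //     executor->execute(pipeline.getNumThreads());
    PipelineExecutor executor;
    executor.execute(pipeline.getNumThreads());
}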
@@ -265,7 +265,7 @@ void TCPHandler::runImpl()
state.io.onFinish();
}
else if (state.io.pipeline.initialized())
-processOrdinaryQueryWithProcessors(query_context->getSettingsRef().max_threads);
+processOrdinaryQueryWithProcessors();
else
processOrdinaryQuery();
@@ -544,13 +544,10 @@ void TCPHandler::processOrdinaryQuery()
}
-void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
+void TCPHandler::processOrdinaryQueryWithProcessors()
{
auto & pipeline = state.io.pipeline;
-/// Reduce the number of threads to recommended value.
-num_threads = std::min(num_threads, pipeline.getNumThreads());
/// Send header-block, to allow client to prepare output format for data to send.
{
auto & header = pipeline.getHeader();
@@ -585,7 +582,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
try
{
-executor->execute(num_threads);
+executor->execute(pipeline.getNumThreads());
}
catch (...)
{
......
@@ -171,7 +171,7 @@ private:
/// Process a request that does not require the receiving of data blocks from the client
void processOrdinaryQuery();
-void processOrdinaryQueryWithProcessors(size_t num_threads);
+void processOrdinaryQueryWithProcessors();
void processTablesStatusRequest();
......
@@ -467,7 +467,6 @@ QueryPipeline InterpreterSelectQuery::executeWithProcessors()
{
QueryPipeline query_pipeline;
executeImpl(query_pipeline, input, std::move(input_pipe), query_pipeline);
-query_pipeline.setMaxThreads(max_streams);
query_pipeline.addInterpreterContext(context);
query_pipeline.addStorageHolder(storage);
return query_pipeline;
@@ -1301,6 +1300,7 @@ void InterpreterSelectQuery::executeFetchColumns(
{
is_remote = true;
max_streams = settings.max_distributed_connections;
+pipeline.setMaxThreads(max_streams);
}
UInt64 max_block_size = settings.max_block_size;
@@ -1325,6 +1325,7 @@ void InterpreterSelectQuery::executeFetchColumns(
{
max_block_size = std::max(UInt64(1), limit_length + limit_offset);
max_streams = 1;
+pipeline.setMaxThreads(max_streams);
}
if (!max_block_size)
......
@@ -92,6 +92,8 @@ public:
ASTPtr getQuery() const { return query_ptr; }
+size_t getMaxStreams() const { return max_streams; }
private:
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
@@ -122,6 +124,9 @@ private:
BlockInputStreamPtr stream_with_non_joined_data;
bool union_stream = false;
+/// Cache value of InterpreterSelectQuery::max_streams
+size_t max_threads = 1;
BlockInputStreamPtr & firstStream() { return streams.at(0); }
template <typename Transform>
@@ -147,6 +152,10 @@ private:
bool hasDelayedStream() const { return stream_with_non_joined_data != nullptr; }
bool initialized() const { return !streams.empty(); }
+/// Compatibility with QueryPipeline (Processors)
+void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }
+size_t getNumThreads() const { return max_threads; }
};
template <typename TPipeline>
......
@@ -34,7 +34,8 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
const Names & required_result_column_names)
: options(options_),
query_ptr(query_ptr_),
-context(std::make_shared<Context>(context_))
+context(std::make_shared<Context>(context_)),
+max_streams(context->getSettingsRef().max_threads)
{
const auto & ast = query_ptr->as<ASTSelectWithUnionQuery &>();
@@ -196,14 +197,23 @@ BlockInputStreams InterpreterSelectWithUnionQuery::executeWithMultipleStreams(Qu
parent_pipeline.addInterpreterContext(context);
}
+/// Update max_streams due to:
+/// - max_distributed_connections for Distributed() engine
+/// - max_streams_to_max_threads_ratio
+///
+/// XXX: res.pipeline.getMaxThreads() cannot be used since it is capped to
+/// number of streams, which is empty for non-Processors case.
+max_streams = (*std::min_element(nested_interpreters.begin(), nested_interpreters.end(), [](const auto &a, const auto &b)
+{
+    return a->getMaxStreams() < b->getMaxStreams();
+}))->getMaxStreams();
return nested_streams;
}
BlockIO InterpreterSelectWithUnionQuery::execute()
{
-const Settings & settings = context->getSettingsRef();
BlockIO res;
BlockInputStreams nested_streams = executeWithMultipleStreams(res.pipeline);
BlockInputStreamPtr result_stream;
@@ -219,7 +229,7 @@ BlockIO InterpreterSelectWithUnionQuery::execute()
}
else
{
-result_stream = std::make_shared<UnionBlockInputStream>(nested_streams, nullptr, settings.max_threads);
+result_stream = std::make_shared<UnionBlockInputStream>(nested_streams, nullptr, max_streams);
nested_streams.clear();
}
......
@@ -56,6 +56,8 @@ private:
Block result_header;
+size_t max_streams = 1;
static Block getCommonHeaderForUnion(const Blocks & headers);
};
......
@@ -751,7 +751,7 @@ void executeQuery(
{
auto executor = pipeline.execute();
-executor->execute(context.getSettingsRef().max_threads);
+executor->execute(pipeline.getNumThreads());
}
}
}
......
@@ -427,7 +427,7 @@ Pipes StorageDistributed::read(
if (!smaller_cluster)
{
LOG_DEBUG(log, "Reading from " << table_id.getNameForLogs() <<
-(has_sharding_key ? "" : "(no sharding key)") << ": "
+(has_sharding_key ? "" : " (no sharding key)") << ": "
"Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - "
"the query will be sent to all shards of the cluster");
......
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
opts=(
--max_distributed_connections 2
--max_threads 1
--query "SELECT sleepEachRow(1) FROM remote('127.{2,3}', system.one)"
)
# 1.8s is less than the 2 seconds a serial run would take, but long enough to cover possible load peaks
# "$@" is kept so that extra options (e.g. --experimental_use_processors 0) can be passed during manual testing
timeout 1.8s $CLICKHOUSE_CLIENT "${opts[@]}" "$@"
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
query="SELECT sleepEachRow(1) FROM remote('127.{2,3}', system.one)"
# 1.8s is less than the 2 seconds a serial run would take, but long enough to cover possible load peaks
timeout 1.8s ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_distributed_connections=2&max_threads=1" -d "$query"