未验证 提交 d5850557 编写于 作者: V Vitaly Baranov 提交者: GitHub

Merge pull request #17422 from ClickHouse/backport/20.12/17254

Backport #17254 to 20.12: Fix "Unexpected packet Data received from client"  for Distributed queries
......@@ -151,6 +151,17 @@ void RemoteQueryExecutor::sendQuery()
if (settings.skip_unavailable_shards && 0 == multiplexed_connections->size())
return;
/// Query cannot be canceled in the middle of the send query,
/// since there are multiple packages:
/// - Query
/// - Data (multiple times)
///
/// And after the Cancel packet none Data packet can be sent, otherwise the remote side will throw:
///
/// Unexpected packet Data received from client
///
std::lock_guard guard(was_cancelled_mutex);
established = true;
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
......
......@@ -36,7 +36,8 @@ public:
const SelectQueryInfo & query_info,
std::vector<QueryPlanPtr> & res,
Pipes & remote_pipes,
Pipes & delayed_pipes) = 0;
Pipes & delayed_pipes,
Poco::Logger * log) = 0;
};
}
......
......@@ -117,7 +117,8 @@ void SelectStreamFactory::createForShard(
const SelectQueryInfo &,
std::vector<QueryPlanPtr> & plans,
Pipes & remote_pipes,
Pipes & delayed_pipes)
Pipes & delayed_pipes,
Poco::Logger * log)
{
bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
bool add_totals = false;
......@@ -143,6 +144,8 @@ void SelectStreamFactory::createForShard(
{
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard_info.pool, modified_query, header, context, nullptr, throttler, scalars, external_tables, processed_stage);
remote_query_executor->setLogger(log);
remote_query_executor->setPoolMode(PoolMode::GET_MANY);
if (!table_func_ptr)
remote_query_executor->setMainTable(main_table);
......
......@@ -41,7 +41,8 @@ public:
const SelectQueryInfo & query_info,
std::vector<QueryPlanPtr> & plans,
Pipes & remote_pipes,
Pipes & delayed_pipes) override;
Pipes & delayed_pipes,
Poco::Logger * log) override;
private:
const Block header;
......
......@@ -119,7 +119,11 @@ void executeQuery(
throttler = user_level_throttler;
for (const auto & shard_info : query_info.cluster->getShardsInfo())
stream_factory.createForShard(shard_info, query, query_ast, new_context, throttler, query_info, plans, remote_pipes, delayed_pipes);
{
stream_factory.createForShard(shard_info, query, query_ast,
new_context, throttler, query_info, plans,
remote_pipes, delayed_pipes, log);
}
if (!remote_pipes.empty())
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册