Commit 587cde85 authored by Azat Khuzhin

Avoid skipping unused shards twice (for query processing stage and read itself)

Parent 77c9f7af
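In short: with optimize_skip_unused_shards enabled, StorageDistributed used to compute the optimized cluster twice per query, once in getQueryProcessingStage() and again in read(). This commit computes it once, in getQueryProcessingStage(), and caches the result in the new SelectQueryInfo::cluster field, which read() then hands to ClusterProxy::executeQuery(). A condensed sketch of the resulting flow, using only names that appear in the diff below (logging and the distributed_group_by_no_merge branches omitted):

    // StorageDistributed::getQueryProcessingStage(): resolve the cluster once.
    query_info.cluster = getCluster();
    if (settings.optimize_skip_unused_shards)
        if (ClusterPtr optimized_cluster = getOptimizedCluster(context, metadata_snapshot, query_info.query))
            query_info.cluster = optimized_cluster;

    // StorageDistributed::read() no longer recomputes anything;
    // ClusterProxy::executeQuery() iterates the shards cached above.
    for (const auto & shard_info : query_info.cluster->getShardsInfo())
        stream_factory.createForShard(shard_info, query, query_ast, new_context, throttler, query_info, res);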
@@ -5,6 +5,7 @@
#include <Processors/Pipe.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Storages/IStorage.h>
+ #include <Storages/SelectQueryInfo.h>
#include <Interpreters/castColumn.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/InternalTextLogsQueue.h>
@@ -314,6 +315,8 @@ void RemoteQueryExecutor::sendScalars()
void RemoteQueryExecutor::sendExternalTables()
{
+ SelectQueryInfo query_info;
size_t count = multiplexed_connections->size();
{
@@ -328,7 +331,8 @@ void RemoteQueryExecutor::sendExternalTables()
{
StoragePtr cur = table.second;
auto metadata_snapshot = cur->getInMemoryMetadataPtr();
- QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage(context);
+ QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage(
+ context, QueryProcessingStage::Complete, query_info);
Pipe pipe = cur->read(
metadata_snapshot->getColumns().getNamesOfPhysical(),
......
@@ -4,9 +4,10 @@
#include <Interpreters/Context.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/IInterpreter.h>
- #include <Parsers/queryToString.h>
#include <Interpreters/ProcessList.h>
+ #include <Parsers/queryToString.h>
#include <Processors/Pipe.h>
+ #include <Storages/SelectQueryInfo.h>
namespace DB
@@ -75,12 +76,13 @@ Context removeUserRestrictionsFromSettings(const Context & context, const Settin
}
Pipe executeQuery(
- IStreamFactory & stream_factory, const ClusterPtr & cluster, Poco::Logger * log,
- const ASTPtr & query_ast, const Context & context, const Settings & settings, const SelectQueryInfo & query_info)
+ IStreamFactory & stream_factory, Poco::Logger * log,
+ const ASTPtr & query_ast, const Context & context, const SelectQueryInfo & query_info)
{
assert(log);
Pipes res;
+ const Settings & settings = context.getSettingsRef();
const std::string query = queryToString(query_ast);
@@ -103,7 +105,7 @@ Pipe executeQuery(
else
throttler = user_level_throttler;
- for (const auto & shard_info : cluster->getShardsInfo())
+ for (const auto & shard_info : query_info.cluster->getShardsInfo())
stream_factory.createForShard(shard_info, query, query_ast, new_context, throttler, query_info, res);
return Pipe::unitePipes(std::move(res));
......
#pragma once
#include <Parsers/IAST.h>
- #include <Interpreters/Cluster.h>
namespace DB
{
@@ -26,8 +25,7 @@ Context removeUserRestrictionsFromSettings(const Context & context, const Settin
/// `stream_factory` object encapsulates the logic of creating streams for a different type of query
/// (currently SELECT, DESCRIBE).
Pipe executeQuery(
- IStreamFactory & stream_factory, const ClusterPtr & cluster, Poco::Logger * log,
- const ASTPtr & query_ast, const Context & context, const Settings & settings, const SelectQueryInfo & query_info);
+ IStreamFactory & stream_factory, Poco::Logger * log, const ASTPtr & query_ast, const Context & context, const SelectQueryInfo & query_info);
}
......
@@ -489,8 +489,10 @@ BlockIO InterpreterSelectQuery::execute()
Block InterpreterSelectQuery::getSampleBlockImpl()
{
+ query_info.query = query_ptr;
if (storage && !options.only_analyze)
- from_stage = storage->getQueryProcessingStage(*context, options.to_stage, query_ptr);
+ from_stage = storage->getQueryProcessingStage(*context, options.to_stage, query_info);
/// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
bool first_stage = from_stage < QueryProcessingStage::WithMergeableState
@@ -1415,7 +1417,6 @@ void InterpreterSelectQuery::executeFetchColumns(
if (max_streams > 1 && !is_remote)
max_streams *= settings.max_streams_to_max_threads_ratio;
- query_info.query = query_ptr;
query_info.syntax_analyzer_result = syntax_analyzer_result;
query_info.sets = query_analyzer->getPreparedSets();
query_info.prewhere_info = prewhere_info;
......
@@ -7,7 +7,6 @@
#include <Interpreters/CancellationCode.h>
#include <Storages/IStorage_fwd.h>
#include <Interpreters/StorageID.h>
- #include <Storages/SelectQueryInfo.h>
#include <Storages/TableLockHolder.h>
#include <Storages/CheckResults.h>
#include <Storages/StorageInMemoryMetadata.h>
@@ -55,6 +54,7 @@ using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>;
struct StreamLocalLimits;
class EnabledQuota;
+ struct SelectQueryInfo;
struct ColumnSize
{
@@ -212,15 +212,12 @@ public:
*
* SelectQueryInfo is required since the stage can depends on the query
* (see Distributed() engine and optimize_skip_unused_shards).
+ * And to store optimized cluster (after optimize_skip_unused_shards).
*
* QueryProcessingStage::Enum required for Distributed over Distributed,
* since it cannot return Complete for intermediate queries never.
*/
- QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const
- {
- return getQueryProcessingStage(context, QueryProcessingStage::Complete, {});
- }
- virtual QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const
+ virtual QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const SelectQueryInfo &) const
{
return QueryProcessingStage::FetchColumns;
}
......
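Since the one-argument convenience overload is removed, call sites that only need the default stage now construct an empty SelectQueryInfo themselves, as the RemoteQueryExecutor and unit-test hunks in this commit do:

    SelectQueryInfo query_info;
    QueryProcessingStage::Enum stage = storage->getQueryProcessingStage(
        context, QueryProcessingStage::Complete, query_info);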
@@ -33,7 +33,7 @@ public:
bool supportsSampling() const override { return true; }
bool supportsFinal() const override { return true; }
- QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const override { return to_stage; }
+ QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const SelectQueryInfo &) const override { return to_stage; }
Pipe read(
const Names & /*column_names*/,
......
@@ -63,6 +63,8 @@ using TreeRewriterResultPtr = std::shared_ptr<const TreeRewriterResult>;
class ReadInOrderOptimizer;
using ReadInOrderOptimizerPtr = std::shared_ptr<const ReadInOrderOptimizer>;
+ class Cluster;
+ using ClusterPtr = std::shared_ptr<Cluster>;
/** Query along with some additional data,
* that can be used during query processing
@@ -73,6 +75,10 @@ struct SelectQueryInfo
ASTPtr query;
ASTPtr view_query; /// Optimized VIEW query
+ /// For optimize_skip_unused_shards
+ /// We can modify it in getQueryProcessingStage()
+ mutable ClusterPtr cluster;
TreeRewriterResultPtr syntax_analyzer_result;
PrewhereInfoPtr prewhere_info;
......
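The mutable qualifier is what makes the caching possible: getQueryProcessingStage() receives the query info by const reference, yet must record the resolved cluster for read() to reuse. A minimal illustration (cacheCluster is a hypothetical helper, not part of the commit):

    void cacheCluster(const SelectQueryInfo & query_info, ClusterPtr resolved)
    {
        /// Compiles only because `cluster` is declared mutable.
        query_info.cluster = resolved;
    }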
@@ -130,7 +130,7 @@ private:
};
- QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context & context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
+ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context & context, QueryProcessingStage::Enum to_stage, const SelectQueryInfo & query_info) const
{
if (destination_id)
{
@@ -139,7 +139,7 @@ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context
if (destination.get() == this)
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
- return destination->getQueryProcessingStage(context, to_stage, query_ptr);
+ return destination->getQueryProcessingStage(context, to_stage, query_info);
}
return QueryProcessingStage::FetchColumns;
......
@@ -54,7 +54,7 @@ public:
std::string getName() const override { return "Buffer"; }
- QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const override;
+ QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const SelectQueryInfo &) const override;
Pipe read(
const Names & column_names,
......
@@ -446,11 +446,31 @@ StoragePtr StorageDistributed::createWithOwnCluster(
return res;
}
- QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
+ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context & context, QueryProcessingStage::Enum to_stage, const SelectQueryInfo & query_info) const
{
const auto & settings = context.getSettingsRef();
auto metadata_snapshot = getInMemoryMetadataPtr();
+ ClusterPtr cluster = getCluster();
+ query_info.cluster = cluster;
+ /// Always calculate optimized cluster here, to avoid conditions during read()
+ /// (Anyway it will be calculated in the read())
+ if (settings.optimize_skip_unused_shards)
+ {
+ ClusterPtr optimized_cluster = getOptimizedCluster(context, metadata_snapshot, query_info.query);
+ if (optimized_cluster)
+ {
+ LOG_DEBUG(log, "Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): {}", makeFormattedListOfShards(optimized_cluster));
+ cluster = optimized_cluster;
+ query_info.cluster = cluster;
+ }
+ else
+ {
+ LOG_DEBUG(log, "Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - the query will be sent to all shards of the cluster{}", has_sharding_key ? "" : " (no sharding key)");
+ }
+ }
if (settings.distributed_group_by_no_merge)
{
if (settings.distributed_group_by_no_merge == DISTRIBUTED_GROUP_BY_NO_MERGE_AFTER_AGGREGATION)
@@ -464,14 +484,6 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Con
if (to_stage == QueryProcessingStage::WithMergeableState)
return QueryProcessingStage::WithMergeableState;
- ClusterPtr cluster = getCluster();
- if (settings.optimize_skip_unused_shards)
- {
- ClusterPtr optimized_cluster = getOptimizedCluster(context, metadata_snapshot, query_ptr);
- if (optimized_cluster)
- cluster = optimized_cluster;
- }
/// If there is only one node, the query can be fully processed by the
/// shard, initiator will work as a proxy only.
if (getClusterQueriedNodes(settings, cluster) == 1)
@@ -483,7 +495,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Con
sharding_key_is_deterministic)
{
Block sharding_key_block = sharding_key_expr->getSampleBlock();
- auto stage = getOptimizedQueryProcessingStage(query_ptr, settings.extremes, sharding_key_block);
+ auto stage = getOptimizedQueryProcessingStage(query_info.query, settings.extremes, sharding_key_block);
if (stage)
{
LOG_DEBUG(log, "Force processing stage to {}", QueryProcessingStage::toString(*stage));
@@ -503,23 +515,6 @@ Pipe StorageDistributed::read(
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
- const auto & settings = context.getSettingsRef();
- ClusterPtr cluster = getCluster();
- if (settings.optimize_skip_unused_shards)
- {
- ClusterPtr optimized_cluster = getOptimizedCluster(context, metadata_snapshot, query_info.query);
- if (optimized_cluster)
- {
- LOG_DEBUG(log, "Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): {}", makeFormattedListOfShards(optimized_cluster));
- cluster = optimized_cluster;
- }
- else
- {
- LOG_DEBUG(log, "Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - the query will be sent to all shards of the cluster{}", has_sharding_key ? "" : " (no sharding key)");
- }
- }
const auto & modified_query_ast = rewriteSelectQuery(
query_info.query, remote_database, remote_table, remote_table_function_ptr);
@@ -538,8 +533,7 @@ Pipe StorageDistributed::read(
: ClusterProxy::SelectStreamFactory(
header, processed_stage, StorageID{remote_database, remote_table}, scalars, has_virtual_shard_num_column, context.getExternalTables());
- return ClusterProxy::executeQuery(select_stream_factory, cluster, log,
- modified_query_ast, context, context.getSettingsRef(), query_info);
+ return ClusterProxy::executeQuery(select_stream_factory, log, modified_query_ast, context, query_info);
}
......
@@ -66,7 +66,7 @@ public:
bool isRemote() const override { return true; }
- QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum to_stage, const ASTPtr &) const override;
+ QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const SelectQueryInfo &) const override;
Pipe read(
const Names & column_names,
......
@@ -101,9 +101,9 @@ StorageMaterializedView::StorageMaterializedView(
DatabaseCatalog::instance().addDependency(select.select_table_id, getStorageID());
}
- QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(const Context & context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
+ QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(const Context & context, QueryProcessingStage::Enum to_stage, const SelectQueryInfo & query_info) const
{
- return getTargetTable()->getQueryProcessingStage(context, to_stage, query_ptr);
+ return getTargetTable()->getQueryProcessingStage(context, to_stage, query_info);
}
Pipe StorageMaterializedView::read(
......
@@ -64,7 +64,7 @@ public:
void checkTableCanBeDropped() const override;
void checkPartitionCanBeDropped(const ASTPtr & partition) override;
- QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const override;
+ QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const SelectQueryInfo &) const override;
StoragePtr getTargetTable() const;
StoragePtr tryGetTargetTable() const;
......
@@ -149,7 +149,7 @@ bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, cons
}
- QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context & context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
+ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context & context, QueryProcessingStage::Enum to_stage, const SelectQueryInfo & query_info) const
{
auto stage_in_source_tables = QueryProcessingStage::FetchColumns;
@@ -163,7 +163,7 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &
if (table && table.get() != this)
{
++selected_table_size;
- stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(context, to_stage, query_ptr));
+ stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(context, to_stage, query_info));
}
iterator->next();
@@ -308,7 +308,7 @@ Pipe StorageMerge::createSources(
return pipe;
}
- auto storage_stage = storage->getQueryProcessingStage(*modified_context, QueryProcessingStage::Complete, modified_query_info.query);
+ auto storage_stage = storage->getQueryProcessingStage(*modified_context, QueryProcessingStage::Complete, modified_query_info);
if (processed_stage <= storage_stage)
{
/// If there are only virtual columns in query, you must request at least one other column.
......
@@ -27,7 +27,7 @@ public:
bool supportsFinal() const override { return true; }
bool supportsIndexForIn() const override { return true; }
- QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const override;
+ QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const SelectQueryInfo &) const override;
Pipe read(
const Names & column_names,
......
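The StorageBuffer, StorageMaterializedView, and StorageMerge changes above are plumbing: each forwards the same SelectQueryInfo to the table(s) it wraps, so a Distributed table reached through any of them still gets its cluster resolved during stage analysis and reused by read(). For example, with a hypothetical Buffer table whose destination is Distributed:

    // Buffer forwards to its destination, which caches query_info.cluster.
    auto stage = buffer->getQueryProcessingStage(context, QueryProcessingStage::Complete, query_info);
    // query_info.cluster now holds the (possibly pruned) cluster for read().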
@@ -10,6 +10,7 @@
#include <IO/WriteBufferFromOStream.h>
#include <Interpreters/Context.h>
#include <Storages/StorageLog.h>
+ #include <Storages/SelectQueryInfo.h>
#include <Common/typeid_cast.h>
#include <Common/tests/gtest_global_context.h>
@@ -114,7 +115,9 @@ std::string readData(DB::StoragePtr & table, const DB::Context & context)
Names column_names;
column_names.push_back("a");
- QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);
+ SelectQueryInfo query_info;
+ QueryProcessingStage::Enum stage = table->getQueryProcessingStage(
+ context, QueryProcessingStage::Complete, query_info);
QueryPipeline pipeline;
pipeline.init(table->read(column_names, metadata_snapshot, {}, context, stage, 8192, 1));
......