#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/LazyBlockInputStream.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>

#include <common/logger_useful.h>

/// Profile counters incremented when the local replica of a shard cannot be used as-is.
namespace ProfileEvents
{
    /// Local replica's delay is not below max_replica_delay_for_distributed_queries.
    extern const Event DistributedConnectionStaleReplica;
    /// The main table does not exist on the local server.
    extern const Event DistributedConnectionMissingTable;
}

namespace DB
{

/// Error codes referenced by this file (defined elsewhere in the project).
namespace ErrorCodes
{
    /// Thrown when the local replica is too stale and no other replica is configured.
    extern const int ALL_REPLICAS_ARE_STALE;
    /// Caught in the lazy local-shard stream when no remote replica could be reached.
    extern const int ALL_CONNECTION_TRIES_FAILED;
}

namespace ClusterProxy
{

SelectStreamFactory::SelectStreamFactory(
31 32 33 34 35 36
        QueryProcessingStage::Enum processed_stage_,
        QualifiedTableName main_table_,
        const Tables & external_tables_)
    : processed_stage{processed_stage_}
    , main_table(std::move(main_table_))
    , external_tables{external_tables_}
A
Merge  
Alexey Milovidov 已提交
37 38 39
{
}

namespace
{

/// Executes `query_ast` on this server up to `processed_stage` and returns the
/// resulting stream wrapped in a MaterializingBlockInputStream.
///
/// Why materialize: remote servers send constant columns already materialized.
/// Without the wrapper, the same column could come out as Const from a local
/// stream and as non-Const from a remote one, in different threads. All the
/// downstream code assumes every block of a stream has identical column types.
BlockInputStreamPtr createLocalStream(const ASTPtr & query_ast, const Context & context, QueryProcessingStage::Enum processed_stage)
{
    InterpreterSelectQuery select_interpreter{query_ast, context, processed_stage};
    const BlockInputStreamPtr source = select_interpreter.execute().in;
    return std::make_shared<MaterializingBlockInputStream>(source);
}

}

/// Appends to `res` the stream that reads `query` from one shard.
///
/// Decision tree:
///  - Shard is not local: a RemoteBlockInputStream over the shard's pool.
///  - Shard is local but the main table is missing locally: remote replicas if the
///    shard has a pool; otherwise a local stream, which then fails the usual way.
///  - Local main table is not a StorageReplicatedMergeTree, or
///    max_replica_delay_for_distributed_queries is 0, or the local delay is below
///    that limit: a local stream.
///  - Local replica is stale (delay >= limit):
///      * fallback_to_stale_replicas_for_distributed_queries is off: remote replicas,
///        or throw ALL_REPLICAS_ARE_STALE when the shard has no pool;
///      * fallback allowed and no pool: the stale local stream;
///      * otherwise: a LazyBlockInputStream that queries the remote replicas and
///        uses them unless the local replica is less stale. This is lazy so the
///        connection attempts do not happen in the calling thread.
void SelectStreamFactory::createForShard(
        const Cluster::ShardInfo & shard_info,
        const String & query, const ASTPtr & query_ast,
        const Context & context, const ThrottlerPtr & throttler,
        BlockInputStreams & res)
{
    /// Execute the query on this server.
    auto emplace_local_stream = [&]()
    {
        res.emplace_back(createLocalStream(query_ast, context, processed_stage));
    };

    /// Send the query to the shard's replicas through its connection pool.
    auto emplace_remote_stream = [&]()
    {
        auto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, context, nullptr, throttler, external_tables, processed_stage);
        stream->setPoolMode(PoolMode::GET_MANY);
        stream->setMainTable(main_table);
        res.emplace_back(std::move(stream));
    };

    if (shard_info.isLocal())
    {
        StoragePtr main_table_storage = context.tryGetTable(main_table.database, main_table.table);
        if (!main_table_storage) /// Table is absent on a local server.
        {
            ProfileEvents::increment(ProfileEvents::DistributedConnectionMissingTable);
            if (shard_info.pool)
            {
                LOG_WARNING(
                        &Logger::get("ClusterProxy::SelectStreamFactory"),
                        "There is no table " << main_table.database << "." << main_table.table
                        << " on local replica of shard " << shard_info.shard_num << ", will try remote replicas.");

                emplace_remote_stream();
                return;
            }
            else
            {
                /// Let it fail the usual way.
                emplace_local_stream();
                return;
            }
        }

        const auto * replicated_storage = dynamic_cast<const StorageReplicatedMergeTree *>(main_table_storage.get());

        if (!replicated_storage)
        {
            /// Table is not replicated, use local server.
            emplace_local_stream();
            return;
        }

        const Settings & settings = context.getSettingsRef();
        UInt64 max_allowed_delay = settings.max_replica_delay_for_distributed_queries;

        /// A zero limit disables the staleness check entirely.
        if (!max_allowed_delay)
        {
            emplace_local_stream();
            return;
        }

        UInt32 local_delay = replicated_storage->getAbsoluteDelay();

        if (local_delay < max_allowed_delay)
        {
            emplace_local_stream();
            return;
        }

        /// If we reached this point, local replica is stale.
        ProfileEvents::increment(ProfileEvents::DistributedConnectionStaleReplica);
        LOG_WARNING(
            &Logger::get("ClusterProxy::SelectStreamFactory"),
            "Local replica of shard " << shard_info.shard_num << " is stale (delay: " << local_delay << "s.)");

        if (!settings.fallback_to_stale_replicas_for_distributed_queries)
        {
            if (shard_info.pool)
            {
                /// If we cannot fallback, then we cannot use local replica. Try our luck with remote replicas.
                emplace_remote_stream();
                return;
            }
            else
                throw Exception(
                    "Local replica of shard " + toString(shard_info.shard_num)
                    + " is stale (delay: " + toString(local_delay) + "s.), but no other replica configured",
                    ErrorCodes::ALL_REPLICAS_ARE_STALE);
        }

        if (!shard_info.pool)
        {
            /// There are no remote replicas but we are allowed to fall back to stale local replica.
            emplace_local_stream();
            return;
        }

        /// Try our luck with remote replicas, but if they are stale too, then fallback to local replica.
        /// Do it lazily to avoid connecting in the main thread.
        ///
        /// Everything is captured by value. The lambda is stored inside `res` and may run
        /// after this function (and the references it received) is gone.
        auto lazily_create_stream = [
                pool = shard_info.pool, shard_num = shard_info.shard_num, query, query_ast, context, throttler,
                main_table = main_table, external_tables = external_tables, stage = processed_stage,
                local_delay]()
            -> BlockInputStreamPtr
        {
            std::vector<ConnectionPoolWithFailover::TryResult> try_results;
            try
            {
                try_results = pool->getManyChecked(&context.getSettingsRef(), PoolMode::GET_MANY, main_table);
            }
            catch (const Exception & ex)
            {
                /// Unreachable remotes are not fatal here: the stale local replica is still usable.
                if (ex.code() == ErrorCodes::ALL_CONNECTION_TRIES_FAILED)
                    LOG_WARNING(
                        &Logger::get("ClusterProxy::SelectStreamFactory"),
                        "Connections to remote replicas of local shard " << shard_num << " failed, will use stale local replica");
                else
                    throw;
            }

            /// Worst staleness among the stale remote replicas we got connections to.
            double max_remote_delay = 0.0;
            for (const auto & try_result : try_results)
            {
                if (!try_result.is_up_to_date)
                    max_remote_delay = std::max(try_result.staleness, max_remote_delay);
            }

            if (try_results.empty() || local_delay < max_remote_delay)
                return createLocalStream(query_ast, context, stage);
            else
            {
                std::vector<IConnectionPool::Entry> connections;
                connections.reserve(try_results.size());
                for (auto & try_result : try_results)
                    connections.emplace_back(std::move(try_result.entry));

                return std::make_shared<RemoteBlockInputStream>(
                        std::move(connections), query, context, nullptr, throttler, external_tables, stage);
            }
        };

        res.emplace_back(std::make_shared<LazyBlockInputStream>("LazyShardWithLocalReplica", lazily_create_stream));
    }
    else
        emplace_remote_stream();
}

}
}