ConnectionPoolWithFailover.cpp 7.3 KB
Newer Older
1 2 3 4 5 6
#include <Client/ConnectionPoolWithFailover.h>

#include <Poco/Net/NetException.h>
#include <Poco/Net/DNS.h>

#include <Common/getFQDNOrHostName.h>
7
#include <Common/isLocalAddress.h>
8 9 10 11 12 13
#include <Common/ProfileEvents.h>
#include <Interpreters/Settings.h>


namespace ProfileEvents
{
14
    extern const Event DistributedConnectionMissingTable;
15 16 17 18 19 20
    extern const Event DistributedConnectionStaleReplica;
}

namespace DB
{

21 22 23 24 25 26 27 28
/// Error codes referenced in this file. They are only declared here (extern, no initializer).
namespace ErrorCodes
{
    extern const int NETWORK_ERROR;
    extern const int SOCKET_TIMEOUT;
    /// Checked in the catch handler of tryGetEntry(); declared explicitly so that
    /// this file does not depend on another header happening to declare it.
    extern const int ATTEMPT_TO_READ_AFTER_EOF;
    extern const int LOGICAL_ERROR;
}


29
/// Forwards the nested pools and retry parameters to Base, remembers the default
/// load balancing mode and fills `hostname_differences` with one value per nested pool:
/// the result of getHostNameDifference() between the local host name and the pool's host.
ConnectionPoolWithFailover::ConnectionPoolWithFailover(
        ConnectionPoolPtrs nested_pools_,
        LoadBalancing load_balancing,
        size_t max_tries_,
        time_t decrease_error_period_)
    : Base(std::move(nested_pools_), max_tries_, decrease_error_period_, &Logger::get("ConnectionPoolWithFailover"))
    , default_load_balancing(load_balancing)
{
    const std::string & this_host = getFQDNOrHostName();
    const size_t pool_count = nested_pools.size();

    hostname_differences.resize(pool_count);
    for (size_t pool_index = 0; pool_index != pool_count; ++pool_index)
    {
        /// A dynamic_cast to a reference throws std::bad_cast if the nested pool is not a ConnectionPool.
        auto & concrete_pool = dynamic_cast<ConnectionPool &>(*nested_pools[pool_index]);
        hostname_differences[pool_index] = getHostNameDifference(this_host, concrete_pool.getHost());
    }
}

47
/// Obtains a single connection through Base::get().
/// The load balancing mode is read from `settings` when it is non-null, otherwise the
/// pool's default is used; it selects the priority function handed to Base::get().
/// `force_connected` is not referenced in this body.
IConnectionPool::Entry ConnectionPoolWithFailover::get(const Settings * settings, bool force_connected)
{
    const LoadBalancing balancing_mode = settings ? LoadBalancing(settings->load_balancing) : default_load_balancing;

    /// Left empty for LoadBalancing::RANDOM.
    GetPriorityFunc priority_of;
    if (balancing_mode == LoadBalancing::NEAREST_HOSTNAME)
        priority_of = [this](size_t index) { return hostname_differences[index]; };
    else if (balancing_mode == LoadBalancing::IN_ORDER)
        priority_of = [](size_t index) { return index; };

    TryGetEntryFunc attempt = [this, settings](NestedPool & pool, std::string & fail_message)
    {
        return tryGetEntry(pool, fail_message, settings);
    };

    return Base::get(attempt, priority_of);
}

70
std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(const Settings * settings, PoolMode pool_mode)
71
{
A
Alexey Zatelepin 已提交
72
    TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
73 74 75
    {
        return tryGetEntry(pool, fail_message, settings);
    };
76 77 78 79 80 81 82 83

    std::vector<TryResult> results = getManyImpl(settings, pool_mode, try_get_entry);

    std::vector<Entry> entries;
    entries.reserve(results.size());
    for (auto & result : results)
        entries.emplace_back(std::move(result.entry));
    return entries;
84 85
}

86
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyChecked(
87 88
        const Settings * settings, PoolMode pool_mode, const QualifiedTableName & table_to_check)
{
A
Alexey Zatelepin 已提交
89
    TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
90 91 92 93 94 95
    {
        return tryGetEntry(pool, fail_message, settings, &table_to_check);
    };
    return getManyImpl(settings, pool_mode, try_get_entry);
}

96
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyImpl(
97 98
        const Settings * settings,
        PoolMode pool_mode,
A
Alexey Zatelepin 已提交
99
        const TryGetEntryFunc & try_get_entry)
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
{
    size_t min_entries = (settings && settings->skip_unavailable_shards) ? 0 : 1;
    size_t max_entries;
    if (pool_mode == PoolMode::GET_ALL)
    {
        min_entries = nested_pools.size();
        max_entries = nested_pools.size();
    }
    else if (pool_mode == PoolMode::GET_ONE)
        max_entries = 1;
    else if (pool_mode == PoolMode::GET_MANY)
        max_entries = settings ? size_t(settings->max_parallel_replicas) : 1;
    else
        throw DB::Exception("Unknown pool allocation mode", DB::ErrorCodes::LOGICAL_ERROR);

A
Alexey Zatelepin 已提交
115
    GetPriorityFunc get_priority;
116
    switch (settings ? LoadBalancing(settings->load_balancing) : default_load_balancing)
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
    {
    case LoadBalancing::NEAREST_HOSTNAME:
        get_priority = [&](size_t i) { return hostname_differences[i]; };
        break;
    case LoadBalancing::IN_ORDER:
        get_priority = [](size_t i) { return i; };
        break;
    case LoadBalancing::RANDOM:
        break;
    }

    bool fallback_to_stale_replicas = settings ? bool(settings->fallback_to_stale_replicas_for_distributed_queries) : true;

    return Base::getMany(min_entries, max_entries, try_get_entry, get_priority, fallback_to_stale_replicas);
}

A
Alexey Zatelepin 已提交
133
/// Tries to take a connection from `pool` and classifies it.
///
/// Without `table_to_check`, or when the server revision is below
/// DBMS_MIN_REVISION_WITH_TABLES_STATUS, the connection is forced to be established
/// and reported as usable and up to date.
/// Otherwise the status of `*table_to_check` is requested from the server:
///  - no status for the table: `fail_message` is set and the result is returned with is_usable left unset;
///  - status present: is_usable is set; is_up_to_date is set when the allowed delay
///    (max_replica_delay_for_distributed_queries, 0 without settings) is zero or the
///    reported delay is below it; otherwise `staleness` receives the delay.
///
/// Exceptions with code NETWORK_ERROR, SOCKET_TIMEOUT or ATTEMPT_TO_READ_AFTER_EOF are
/// caught: their text goes to `fail_message`, and a non-null entry is disconnected
/// and the result reset. Any other Exception is rethrown.
ConnectionPoolWithFailover::TryResult
ConnectionPoolWithFailover::tryGetEntry(
        IConnectionPool & pool,
        std::string & fail_message,
        const Settings * settings,
        const QualifiedTableName * table_to_check)
{
    TryResult result;
    try
    {
        result.entry = pool.get(settings, /* force_connected = */ false);

        /// Filled in only when a table check was requested; zero-initialized so that
        /// no indeterminate value can ever be read.
        String server_name;
        UInt64 server_version_major = 0;
        UInt64 server_version_minor = 0;
        UInt64 server_revision = 0;
        if (table_to_check)
            result.entry->getServerVersion(server_name, server_version_major, server_version_minor, server_revision);

        if (!table_to_check || server_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS)
        {
            result.entry->forceConnected();
            result.is_usable = true;
            result.is_up_to_date = true;
            return result;
        }

        /// Only status of the remote table corresponding to the Distributed table is taken into account.
        /// TODO: request status for joined tables also.
        TablesStatusRequest status_request;
        status_request.tables.emplace(*table_to_check);

        TablesStatusResponse status_response = result.entry->getTablesStatus(status_request);
        auto table_status_it = status_response.table_states_by_id.find(*table_to_check);
        if (table_status_it == status_response.table_states_by_id.end())
        {
            fail_message = "There is no table " + table_to_check->database + "." + table_to_check->table
                + " on server: " + result.entry->getDescription();
            LOG_WARNING(log, fail_message);
            ProfileEvents::increment(ProfileEvents::DistributedConnectionMissingTable);

            return result;
        }

        result.is_usable = true;

        /// A zero limit skips the delay check entirely.
        UInt64 max_allowed_delay = settings ? UInt64(settings->max_replica_delay_for_distributed_queries) : 0;
        if (!max_allowed_delay)
        {
            result.is_up_to_date = true;
            return result;
        }

        UInt32 delay = table_status_it->second.absolute_delay;

        if (delay < max_allowed_delay)
            result.is_up_to_date = true;
        else
        {
            result.is_up_to_date = false;
            result.staleness = delay;

            LOG_TRACE(
                    log, "Server " << result.entry->getDescription() << " has unacceptable replica delay "
                    << "for table " << table_to_check->database << "." << table_to_check->table
                    << ": "  << delay);
            ProfileEvents::increment(ProfileEvents::DistributedConnectionStaleReplica);
        }
    }
    catch (const Exception & e)
    {
        /// Only the listed network-level failures are reported via `fail_message`; everything else propagates.
        if (e.code() != ErrorCodes::NETWORK_ERROR && e.code() != ErrorCodes::SOCKET_TIMEOUT
            && e.code() != ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF)
            throw;

        fail_message = getCurrentExceptionMessage(/* with_stacktrace = */ false);

        if (!result.entry.isNull())
        {
            result.entry->disconnect();
            result.reset();
        }
    }
    return result;
}

}