提交 514a4d4f 编写于 作者: A Alexey Zatelepin 提交者: alexey-milovidov

use stale local replica if all remote replicas are unavailable [#CLICKHOUSE-3377]

上级 fbef2a98
......@@ -5,17 +5,23 @@
#include <DataStreams/LazyBlockInputStream.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <common/logger_useful.h>
namespace ProfileEvents
{
extern const Event DistributedConnectionMissingTable;
extern const Event DistributedConnectionStaleReplica;
}
namespace DB
{
namespace ErrorCodes
{
extern const int ALL_REPLICAS_ARE_STALE;
extern const int ALL_REPLICAS_ARE_STALE;
}
namespace ClusterProxy
......@@ -72,6 +78,7 @@ void SelectStreamFactory::createForShard(
StoragePtr main_table_storage = context.tryGetTable(main_table.database, main_table.table);
if (!main_table_storage) /// Table is absent on a local server.
{
ProfileEvents::increment(ProfileEvents::DistributedConnectionMissingTable);
if (shard_info.pool)
{
LOG_WARNING(
......@@ -117,6 +124,10 @@ void SelectStreamFactory::createForShard(
}
/// If we reached this point, local replica is stale.
ProfileEvents::increment(ProfileEvents::DistributedConnectionStaleReplica);
LOG_WARNING(
&Logger::get("ClusterProxy::SelectStreamFactory"),
"Local replica of shard " << shard_info.shard_num << " is stale (delay: " << local_delay << "s.)");
if (!settings.fallback_to_stale_replicas_for_distributed_queries)
{
......@@ -128,9 +139,9 @@ void SelectStreamFactory::createForShard(
}
else
throw Exception(
"Local replica for shard " + toString(shard_info.shard_num)
+ " is stale (delay: " + toString(local_delay) + "), but no other replica configured.",
ErrorCodes::ALL_REPLICAS_ARE_STALE);
"Local replica of shard " + toString(shard_info.shard_num)
+ " is stale (delay: " + toString(local_delay) + "s.), but no other replica configured",
ErrorCodes::ALL_REPLICAS_ARE_STALE);
}
if (!shard_info.pool)
......@@ -144,13 +155,25 @@ void SelectStreamFactory::createForShard(
/// Do it lazily to avoid connecting in the main thread.
auto lazily_create_stream = [
pool = shard_info.pool, query, query_ast, context, throttler,
pool = shard_info.pool, shard_num = shard_info.shard_num, query, query_ast, context, throttler,
main_table = main_table, external_tables = external_tables, stage = processed_stage,
local_delay]()
-> BlockInputStreamPtr
{
std::vector<ConnectionPoolWithFailover::TryResult> try_results =
pool->getManyChecked(&context.getSettingsRef(), PoolMode::GET_MANY, main_table);
std::vector<ConnectionPoolWithFailover::TryResult> try_results;
try
{
try_results = pool->getManyChecked(&context.getSettingsRef(), PoolMode::GET_MANY, main_table);
}
catch (const Exception & ex)
{
if (ex.code() == ErrorCodes::ALL_CONNECTION_TRIES_FAILED)
LOG_WARNING(
&Logger::get("ClusterProxy::SelectStreamFactory"),
"Connections to remote replicas of local shard " << shard_num << " failed, will use stale local replica");
else
throw;
}
double max_remote_delay = 0.0;
for (const auto & try_result : try_results)
......@@ -159,7 +182,7 @@ void SelectStreamFactory::createForShard(
max_remote_delay = std::max(try_result.staleness, max_remote_delay);
}
if (local_delay < max_remote_delay)
if (try_results.empty() || local_delay < max_remote_delay)
return createLocalStream(query_ast, context, stage);
else
{
......
......@@ -58,7 +58,7 @@ def test(started_cluster):
assert node_2_1.query("SELECT sum(x) FROM replicated").strip() == ''
assert node_2_2.query("SELECT sum(x) FROM replicated").strip() == '2'
# With in_order balancing first replicas chosen.
# With in_order balancing first replicas are chosen.
assert instance_with_dist_table.query(
"SELECT count() FROM distributed SETTINGS load_balancing='in_order'").strip() == ''
......@@ -89,3 +89,12 @@ SELECT count() FROM distributed SETTINGS
max_replica_delay_for_distributed_queries=1,
fallback_to_stale_replicas_for_distributed_queries=0
''')
# Now partition off the remote replica of the local shard and test that failover still works.
pm.partition_instances(node_1_1, node_1_2, port=9000)
assert instance_with_dist_table.query('''
SELECT sum(x) FROM distributed SETTINGS
load_balancing='in_order',
max_replica_delay_for_distributed_queries=1
''').strip() == '2'
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册