提交 5211a42c 编写于 作者: A Alexey Milovidov

Remove leader election, step 3

上级 1a3a1939
......@@ -31,7 +31,6 @@
M(SendExternalTables, "Number of connections that are sending data for external tables to remote servers. External tables are used to implement GLOBAL IN and GLOBAL JOIN operators with distributed subqueries.") \
M(QueryThread, "Number of query processing threads") \
M(ReadonlyReplica, "Number of Replicated tables that are currently in readonly state due to re-initialization after ZooKeeper session loss or due to startup without ZooKeeper configured.") \
M(LeaderReplica, "Number of Replicated tables that are leaders. Leader replica is responsible for assigning merges, cleaning old blocks for deduplications and a few more bookkeeping tasks. There may be no more than one leader across all replicas at one moment of time. If there is no leader it will be elected soon or it indicate an issue.") \
M(MemoryTracking, "Total amount of memory (bytes) allocated by the server.") \
M(MemoryTrackingInBackgroundProcessingPool, "Total amount of memory (bytes) allocated in background processing pool (that is dedicated for backround merges, mutations and fetches). Note that this value may include a drift when the memory was allocated in a context of background processing pool and freed in other context or vice-versa. This happens naturally due to caches for tables indexes and doesn't indicate memory leaks.") \
M(MemoryTrackingInBackgroundMoveProcessingPool, "Total amount of memory (bytes) allocated in background processing pool (that is dedicated for backround moves). Note that this value may include a drift when the memory was allocated in a context of background processing pool and freed in other context or vice-versa. This happens naturally due to caches for tables indexes and doesn't indicate memory leaks.") \
......@@ -39,7 +38,6 @@
M(MemoryTrackingInBackgroundBufferFlushSchedulePool, "Total amount of memory (bytes) allocated in background buffer flushes pool (that is dedicated for background buffer flushes).") \
M(MemoryTrackingInBackgroundDistributedSchedulePool, "Total amount of memory (bytes) allocated in background distributed schedule pool (that is dedicated for distributed sends).") \
M(MemoryTrackingForMerges, "Total amount of memory (bytes) allocated for background merges. Included in MemoryTrackingInBackgroundProcessingPool. Note that this value may include a drift when the memory was allocated in a context of background processing pool and freed in other context or vice-versa. This happens naturally due to caches for tables indexes and doesn't indicate memory leaks.") \
M(LeaderElection, "Number of Replicas participating in leader election. Equals to total number of replicas in usual cases.") \
M(EphemeralNode, "Number of ephemeral nodes hold in ZooKeeper.") \
M(ZooKeeperSession, "Number of sessions (connections) to ZooKeeper. Should be no more than one, because using more than one connection to ZooKeeper may lead to bugs due to lack of linearizability (stale reads) that ZooKeeper consistency model allows.") \
M(ZooKeeperWatch, "Number of watches (event subscriptions) in ZooKeeper.") \
......
......@@ -277,7 +277,6 @@ namespace ErrorCodes
extern const int AIO_READ_ERROR = 274;
extern const int AIO_WRITE_ERROR = 275;
extern const int INDEX_NOT_USED = 277;
extern const int LEADERSHIP_LOST = 278;
extern const int ALL_CONNECTION_TRIES_FAILED = 279;
extern const int NO_AVAILABLE_DATA = 280;
extern const int DICTIONARY_IS_EMPTY = 281;
......@@ -291,7 +290,6 @@ namespace ErrorCodes
extern const int REPLICA_IS_NOT_IN_QUORUM = 289;
extern const int LIMIT_EXCEEDED = 290;
extern const int DATABASE_ACCESS_DENIED = 291;
extern const int LEADERSHIP_CHANGED = 292;
extern const int MONGODB_CANNOT_AUTHENTICATE = 293;
extern const int INVALID_BLOCK_EXTRA_INFO = 294;
extern const int RECEIVED_EMPTY_DATA = 295;
......@@ -496,6 +494,7 @@ namespace ErrorCodes
extern const int INCORRECT_DISK_INDEX = 525;
extern const int UNKNOWN_VOLUME_TYPE = 526;
extern const int CASSANDRA_INTERNAL_ERROR = 527;
extern const int NOT_A_LEADER = 528;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;
......
......@@ -112,7 +112,6 @@
M(SlowRead, "Number of reads from a file that were slow. This indicate system overload. Thresholds are controlled by read_backoff_* settings.") \
M(ReadBackoff, "Number of times the number of query processing threads was lowered due to slow reads.") \
\
M(ReplicaYieldLeadership, "Number of times Replicated table was yielded its leadership due to large replication lag relative to other replicas.") \
M(ReplicaPartialShutdown, "How many times Replicated table has to deinitialize its state due to session expiration in ZooKeeper. The state is reinitialized every time when ZooKeeper is available again.") \
\
M(SelectedParts, "Number of data parts selected to read from a MergeTree table.") \
......@@ -131,7 +130,6 @@
M(MergeTreeDataWriterBlocksAlreadySorted, "Number of blocks INSERTed to MergeTree tables that appeared to be already sorted.") \
\
M(CannotRemoveEphemeralNode, "Number of times an error happened while trying to remove ephemeral node. This is not an issue, because our implementation of ZooKeeper library guarantee that the session will expire and the node will be removed.") \
M(LeaderElectionAcquiredLeadership, "Number of times a ReplicatedMergeTree table became a leader. Leader replica is responsible for assigning merges, cleaning old blocks for deduplications and a few more bookkeeping tasks.") \
\
M(RegexpCreated, "Compiled regular expressions. Identical regular expressions compiled just once and cached forever.") \
M(ContextLock, "Number of times the lock of Context was acquired or tried to acquire. This is global lock.") \
......
......@@ -821,7 +821,6 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
executed_by_leader = true;
break;
}
}
/// Does nothing if wasn't previously locked
......
......@@ -9,17 +9,6 @@
#include <Core/BackgroundSchedulePool.h>
namespace ProfileEvents
{
extern const Event LeaderElectionAcquiredLeadership;
}
namespace CurrentMetrics
{
extern const Metric LeaderElection;
}
namespace zkutil
{
......@@ -91,8 +80,6 @@ private:
std::atomic<bool> shutdown_called {false};
CurrentMetrics::Increment metric_increment{CurrentMetrics::LeaderElection};
void createNode()
{
shutdown_called = false;
......@@ -128,7 +115,6 @@ private:
#if !defined(ARCADIA_BUILD) /// C++20; Replicated tables are unused in Arcadia.
if (value.ends_with(suffix))
{
ProfileEvents::increment(ProfileEvents::LeaderElectionAcquiredLeadership);
handler();
return;
}
......
......@@ -71,10 +71,9 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
M(SettingSeconds, zookeeper_session_expiration_check_period, 60, "ZooKeeper session expiration check period, in seconds.", 0) \
\
/** Check delay of replicas settings. */ \
M(SettingUInt64, check_delay_period, 60, "Period to check replication delay and compare with other replicas.", 0) \
M(SettingUInt64, min_relative_delay_to_measure, 120, "Calculate relative replica delay only if absolute delay is not less that this value.", 0) \
M(SettingUInt64, cleanup_delay_period, 30, "Period to clean old queue logs, blocks hashes and parts.", 0) \
M(SettingUInt64, cleanup_delay_period_random_add, 10, "Add uniformly distributed value from 0 to x seconds to cleanup_delay_period to avoid thundering herd effect and subsequent DoS of ZooKeeper in case of very large number of tables.", 0) \
M(SettingUInt64, min_relative_delay_to_yield_leadership, 120, "Minimal delay from other replicas to yield leadership. Here and further 0 means unlimited.", 0) \
M(SettingUInt64, min_relative_delay_to_close, 300, "Minimal delay from other replicas to close, stop serving requests and not return Ok during status check.", 0) \
M(SettingUInt64, min_absolute_delay_to_close, 0, "Minimal absolute delay to close, stop serving requests and not return Ok during status check.", 0) \
M(SettingUInt64, enable_vertical_merge_algorithm, 1, "Enable usage of Vertical merge algorithm.", 0) \
......@@ -95,7 +94,11 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
M(SettingMaxThreads, max_part_loading_threads, 0, "The number of threads to load data parts at startup.", 0) \
M(SettingMaxThreads, max_part_removal_threads, 0, "The number of threads for concurrent removal of inactive data parts. One is usually enough, but in 'Google Compute Environment SSD Persistent Disks' file removal (unlink) operation is extraordinarily slow and you probably have to increase this number (recommended is up to 16).", 0) \
M(SettingUInt64, concurrent_part_removal_threshold, 100, "Activate concurrent part removal (see 'max_part_removal_threads') only if the number of inactive data parts is at least this.", 0) \
M(SettingString, storage_policy, "default", "Name of storage disk policy", 0)
M(SettingString, storage_policy, "default", "Name of storage disk policy", 0) \
\
/** Obsolete settings. Kept for backward compatibility only. */ \
M(SettingUInt64, min_relative_delay_to_yield_leadership, 120, "Obsolete setting, does nothing.", 0) \
M(SettingUInt64, check_delay_period, 60, "Obsolete setting, does nothing.", 0) \
DECLARE_SETTINGS_COLLECTION(LIST_OF_MERGE_TREE_SETTINGS)
......
......@@ -10,7 +10,6 @@
namespace ProfileEvents
{
extern const Event ReplicaYieldLeadership;
extern const Event ReplicaPartialShutdown;
}
......@@ -48,10 +47,6 @@ ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(Storage
const auto storage_settings = storage.getSettings();
check_period_ms = storage_settings->zookeeper_session_expiration_check_period.totalSeconds() * 1000;
/// Periodicity of checking lag of replica.
if (check_period_ms > static_cast<Int64>(storage_settings->check_delay_period) * 1000)
check_period_ms = storage_settings->check_delay_period * 1000;
task = storage.global_context.getSchedulePool().createTask(log_name, [this]{ run(); });
}
......@@ -121,37 +116,6 @@ void ReplicatedMergeTreeRestartingThread::run()
first_time = false;
}
time_t current_time = time(nullptr);
const auto storage_settings = storage.getSettings();
if (current_time >= prev_time_of_check_delay + static_cast<time_t>(storage_settings->check_delay_period))
{
/// Find out lag of replicas.
time_t absolute_delay = 0;
time_t relative_delay = 0;
storage.getReplicaDelays(absolute_delay, relative_delay);
if (absolute_delay)
LOG_TRACE(log, "Absolute delay: {}. Relative delay: {}.", absolute_delay, relative_delay);
prev_time_of_check_delay = current_time;
/// We give up leadership if the relative lag is greater than threshold.
if (storage.is_leader
&& relative_delay > static_cast<time_t>(storage_settings->min_relative_delay_to_yield_leadership))
{
LOG_INFO(log, "Relative replica delay ({} seconds) is bigger than threshold ({}). Will yield leadership.", relative_delay, storage_settings->min_relative_delay_to_yield_leadership);
ProfileEvents::increment(ProfileEvents::ReplicaYieldLeadership);
storage.exitLeaderElection();
/// NOTE: enterLeaderElection() can throw if node creation in ZK fails.
/// This is bad because we can end up without a leader on any replica.
/// In this case we rely on the fact that the session will expire and we will reconnect.
storage.enterLeaderElection();
}
}
}
catch (...)
{
......
......@@ -42,7 +42,6 @@ private:
BackgroundSchedulePool::TaskHolder task;
Int64 check_period_ms; /// The frequency of checking expiration of session in ZK.
bool first_time = true; /// Activate replica for the first time.
time_t prev_time_of_check_delay = 0;
bool startup_completed = false;
void run();
......
......@@ -77,11 +77,6 @@ namespace ProfileEvents
extern const Event NotCreatedLogEntryForMutation;
}
namespace CurrentMetrics
{
extern const Metric LeaderReplica;
}
namespace DB
{
......@@ -102,7 +97,7 @@ namespace ErrorCodes
extern const int TABLE_IS_READ_ONLY;
extern const int NOT_FOUND_NODE;
extern const int NO_ACTIVE_REPLICAS;
extern const int LEADERSHIP_CHANGED;
extern const int NOT_A_LEADER;
extern const int TABLE_WAS_NOT_DROPPED;
extern const int PARTITION_ALREADY_EXISTS;
extern const int TOO_MANY_RETRIES_TO_FETCH_PARTS;
......@@ -2736,7 +2731,6 @@ void StorageReplicatedMergeTree::enterLeaderElection()
{
auto callback = [this]()
{
CurrentMetrics::add(CurrentMetrics::LeaderReplica);
LOG_INFO(log, "Became leader");
is_leader = true;
......@@ -2771,7 +2765,6 @@ void StorageReplicatedMergeTree::exitLeaderElection()
if (is_leader)
{
CurrentMetrics::sub(CurrentMetrics::LeaderReplica);
LOG_INFO(log, "Stopped being leader");
is_leader = false;
......@@ -3464,15 +3457,12 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/,
bool StorageReplicatedMergeTree::optimize(
const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & query_context)
const ASTPtr &, const ASTPtr & partition, bool final, bool deduplicate, const Context & query_context)
{
assertNotReadonly();
if (!is_leader)
{
sendRequestToLeaderReplica(query, query_context);
return true;
}
throw Exception("OPTIMIZE cannot be done on this replica because it is not a leader", ErrorCodes::NOT_A_LEADER);
constexpr size_t max_retries = 10;
......@@ -3957,20 +3947,14 @@ bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const St
}
void StorageReplicatedMergeTree::dropPartition(const ASTPtr & query, const ASTPtr & partition, bool detach, const Context & query_context)
void StorageReplicatedMergeTree::dropPartition(const ASTPtr &, const ASTPtr & partition, bool detach, const Context & query_context)
{
assertNotReadonly();
if (!is_leader)
throw Exception("DROP PARTITION cannot be done on this replica because it is not a leader", ErrorCodes::NOT_A_LEADER);
zkutil::ZooKeeperPtr zookeeper = getZooKeeper();
if (!is_leader)
{
// TODO: we can manually reconstruct the query from outside the |dropPartition()| and remove the |query| argument from interface.
// It's the only place where we need this argument.
sendRequestToLeaderReplica(query, query_context);
return;
}
String partition_id = getPartitionIDFromQuery(partition, query_context);
LogEntry entry;
......@@ -3991,20 +3975,16 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & query, const ASTPt
}
void StorageReplicatedMergeTree::truncate(const ASTPtr & query, const Context & query_context, TableStructureWriteLockHolder & table_lock)
void StorageReplicatedMergeTree::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder & table_lock)
{
table_lock.release(); /// Truncate is done asynchronously.
assertNotReadonly();
if (!is_leader)
throw Exception("TRUNCATE cannot be done on this replica because it is not a leader", ErrorCodes::NOT_A_LEADER);
zkutil::ZooKeeperPtr zookeeper = getZooKeeper();
if (!is_leader)
{
sendRequestToLeaderReplica(query, query_context);
return;
}
Strings partitions = zookeeper->getChildren(zookeeper_path + "/block_numbers");
for (String & partition_id : partitions)
......@@ -4389,74 +4369,6 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
}
/// TODO: Probably it is better to have queue in ZK with tasks for leader (like DDL)
void StorageReplicatedMergeTree::sendRequestToLeaderReplica(const ASTPtr & query, const Context & query_context)
{
auto live_replicas = getZooKeeper()->getChildren(zookeeper_path + "/leader_election");
if (live_replicas.empty())
throw Exception("No active replicas", ErrorCodes::NO_ACTIVE_REPLICAS);
std::sort(live_replicas.begin(), live_replicas.end());
const auto leader = getZooKeeper()->get(zookeeper_path + "/leader_election/" + live_replicas.front());
if (leader == replica_name)
throw Exception("Leader was suddenly changed or logical error.", ErrorCodes::LEADERSHIP_CHANGED);
/// SECONDARY_QUERY here means, that we received query from DDLWorker
/// there is no sense to send query to leader, because he will receive it from own DDLWorker
if (query_context.getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
{
throw Exception("Cannot execute DDL query, because leader was suddenly changed or logical error.", ErrorCodes::LEADERSHIP_CHANGED);
}
ReplicatedMergeTreeAddress leader_address(getZooKeeper()->get(zookeeper_path + "/replicas/" + leader + "/host"));
/// TODO: add setters and getters interface for database and table fields of AST
auto new_query = query->clone();
if (auto * alter = new_query->as<ASTAlterQuery>())
{
alter->database = leader_address.database;
alter->table = leader_address.table;
}
else if (auto * optimize = new_query->as<ASTOptimizeQuery>())
{
optimize->database = leader_address.database;
optimize->table = leader_address.table;
}
else if (auto * drop = new_query->as<ASTDropQuery>(); drop->kind == ASTDropQuery::Kind::Truncate)
{
drop->database = leader_address.database;
drop->table = leader_address.table;
}
else
throw Exception("Can't proxy this query. Unsupported query type", ErrorCodes::NOT_IMPLEMENTED);
const auto & query_settings = query_context.getSettingsRef();
const auto & query_client_info = query_context.getClientInfo();
String user = query_client_info.current_user;
String password = query_client_info.current_password;
if (auto address = findClusterAddress(leader_address); address)
{
user = address->user;
password = address->password;
}
Connection connection(
leader_address.host,
leader_address.queries_port,
leader_address.database,
user, password, "Follower replica");
std::stringstream new_query_ss;
formatAST(*new_query, new_query_ss, false, true);
RemoteBlockInputStream stream(connection, new_query_ss.str(), {}, global_context, &query_settings);
NullBlockOutputStream output({});
copyData(stream, output);
}
std::optional<Cluster::Address> StorageReplicatedMergeTree::findClusterAddress(const ReplicatedMergeTreeAddress & leader_address) const
{
for (auto & iter : global_context.getClusters().getContainer())
......@@ -4535,7 +4447,7 @@ void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, t
* Calculated only if the absolute delay is large enough.
*/
if (out_absolute_delay < static_cast<time_t>(storage_settings_ptr->min_relative_delay_to_yield_leadership))
if (out_absolute_delay < static_cast<time_t>(storage_settings_ptr->min_relative_delay_to_measure))
return;
auto zookeeper = getZooKeeper();
......
......@@ -162,7 +162,7 @@ public:
/// Get replica delay relative to current time.
time_t getAbsoluteDelay() const;
/// If the absolute delay is greater than min_relative_delay_to_yield_leadership,
/// If the absolute delay is greater than min_relative_delay_to_measure,
/// will also calculate the difference from the unprocessed time of the best replica.
/// NOTE: Will communicate to ZooKeeper to calculate relative delay.
void getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay);
......@@ -497,10 +497,6 @@ private:
*/
bool waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true);
/// Choose leader replica, send requst to it and wait.
/// Only makes sense when old ClickHouse versions are working on the same cluster, because now we allow multiple leaders.
void sendRequestToLeaderReplica(const ASTPtr & query, const Context & query_context);
/// Throw an exception if the table is readonly.
void assertNotReadonly() const;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册