diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp index 032d1e90ff54e783509cc7a65f9074a3065a57df..2e94cfe99927b6c8881fb88c0de220c27b55127e 100644 --- a/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/src/Common/ZooKeeper/ZooKeeper.cpp @@ -629,51 +629,54 @@ namespace { struct WaitForDisappearState { - int32_t code = 0; - int32_t event_type = 0; + std::atomic_int32_t code = 0; + std::atomic_int32_t event_type = 0; Poco::Event event; }; using WaitForDisappearStatePtr = std::shared_ptr; } -void ZooKeeper::waitForDisappear(const std::string & path) +bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition & condition) { WaitForDisappearStatePtr state = std::make_shared(); - while (true) + auto callback = [state](const Coordination::ExistsResponse & response) { - auto callback = [state](const Coordination::ExistsResponse & response) - { - state->code = response.error; - if (state->code) - state->event.set(); - }; + state->code = response.error; + if (state->code) + state->event.set(); + }; - auto watch = [state](const Coordination::WatchResponse & response) + auto watch = [state](const Coordination::WatchResponse & response) + { + if (!state->code) { + state->code = response.error; if (!state->code) - { - state->code = response.error; - if (!state->code) - state->event_type = response.type; - state->event.set(); - } - }; + state->event_type = response.type; + state->event.set(); + } + }; + while (!condition || !condition()) + { /// NOTE: if the node doesn't exist, the watch will leak. - impl->exists(path, callback, watch); - state->event.wait(); + if (!condition) + state->event.wait(); + else if (!state->event.tryWait(1000)) + continue; if (state->code == Coordination::ZNONODE) - return; + return true; if (state->code) throw KeeperException(state->code, path); if (state->event_type == Coordination::DELETED) - return; + return true; } + return false; } ZooKeeperPtr ZooKeeper::startNewSession() const diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h index db166314a07e714c41bca3783814196bfbc140f2..e8ab06c2182eab4a9237ca9d865526caba76bb49 100644 --- a/src/Common/ZooKeeper/ZooKeeper.h +++ b/src/Common/ZooKeeper/ZooKeeper.h @@ -185,8 +185,11 @@ public: /// Remove all children nodes (non recursive). void removeChildren(const std::string & path); + using WaitCondition = std::function; /// Wait for the node to disappear or return immediately if it doesn't exist. - void waitForDisappear(const std::string & path); + /// If condition is speficied, it is used to return early (when condition returns false) + /// The function returns true if waited and false if waiting was interrupted by condition. + bool waitForDisappear(const std::string & path, const WaitCondition & condition = {}); /// Async interface (a small subset of operations is implemented). /// diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 6c9d6c88332d2d754119edd8505a4bd31d6bf60b..1317b05d9fe01675781487daa94c4c985b4d4256 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -361,8 +361,9 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas( else if (mutation_pointer_value >= mutation_id) /// Maybe we already processed more fresh mutation break; /// (numbers like 0000000000 and 0000000001) - /// We wait without timeout. - wait_event->wait(); + /// Replica can become inactive, so wait with timeout and recheck it + if (wait_event->tryWait(1000)) + break; } if (partial_shutdown_called) @@ -3841,7 +3842,8 @@ Strings StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const Re { if (wait_for_non_active || zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active")) { - waitForReplicaToProcessLogEntry(replica, entry); + if (!waitForReplicaToProcessLogEntry(replica, entry, wait_for_non_active)) + unwaited.push_back(replica); } else { @@ -3854,7 +3856,7 @@ Strings StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const Re } -void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & replica, const ReplicatedMergeTreeLogEntryData & entry) +bool StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & replica, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active) { String entry_str = entry.toString(); String log_node_name; @@ -3875,6 +3877,12 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & * To do this, check its node `log_pointer` - the maximum number of the element taken from `log` + 1. */ + const auto & check_replica_become_inactive = [this, &replica]() + { + return !getZooKeeper()->exists(zookeeper_path + "/replicas/" + replica + "/is_active"); + }; + constexpr auto event_wait_timeout_ms = 1000; + if (startsWith(entry.znode_name, "log-")) { /** In this case, just take the number from the node name `log-xxxxxxxxxx`. @@ -3886,7 +3894,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & LOG_DEBUG(log, "Waiting for " << replica << " to pull " << log_node_name << " to queue"); /// Let's wait until entry gets into the replica queue. - while (true) + while (wait_for_non_active || !check_replica_become_inactive()) { zkutil::EventPtr event = std::make_shared(); @@ -3894,7 +3902,10 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & if (!log_pointer.empty() && parse(log_pointer) > log_index) break; - event->wait(); + if (wait_for_non_active) + event->wait(); + else + event->tryWait(event_wait_timeout_ms); } } else if (startsWith(entry.znode_name, "queue-")) @@ -3931,7 +3942,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & LOG_DEBUG(log, "Waiting for " << replica << " to pull " << log_node_name << " to queue"); /// Let's wait until the entry gets into the replica queue. - while (true) + while (wait_for_non_active || !check_replica_become_inactive()) { zkutil::EventPtr event = std::make_shared(); @@ -3939,7 +3950,10 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & if (!log_pointer_new.empty() && parse(log_pointer_new) > log_index) break; - event->wait(); + if (wait_for_non_active) + event->wait(); + else + event->tryWait(event_wait_timeout_ms); } } } @@ -3974,13 +3988,17 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & if (queue_entry_to_wait_for.empty()) { LOG_DEBUG(log, "No corresponding node found. Assuming it has been already processed." " Found " << queue_entries.size() << " nodes."); - return; + return true; } LOG_DEBUG(log, "Waiting for " << queue_entry_to_wait_for << " to disappear from " << replica << " queue"); - /// Third - wait until the entry disappears from the replica queue. - getZooKeeper()->waitForDisappear(zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for); + /// Third - wait until the entry disappears from the replica queue or replica become inactive. + String path_to_wait_on = zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for; + if (wait_for_non_active) + return getZooKeeper()->waitForDisappear(path_to_wait_on); + + return getZooKeeper()->waitForDisappear(path_to_wait_on, check_replica_become_inactive); } diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 70fb48e9b3582adaf0dc94d3d3c4b0ce051aa78d..f01e51bd769076869234cc2452422febbf66bbc5 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -486,7 +486,7 @@ private: /** Wait until the specified replica executes the specified action from the log. * NOTE: See comment about locks above. */ - void waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry); + bool waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true); /// Choose leader replica, send requst to it and wait. void sendRequestToLeaderReplica(const ASTPtr & query, const Context & query_context); diff --git a/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.referece b/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.referece deleted file mode 100644 index be7168312124a6cd4fc2c7200b68b8540825259b..0000000000000000000000000000000000000000 --- a/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.referece +++ /dev/null @@ -1,12 +0,0 @@ -1725 -1725 -1725 -1725 -1725 -Starting alters -Finishing alters -1 -1 -1 -1 -1