diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index bebb6ee79c3ca4d865d9ec9159f7a7c6277ab73f..72044ab832cb52f2d8f564bf4eeb05ebcc9b9e36 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -16,6 +16,7 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; extern const int UNEXPECTED_NODE_IN_ZOOKEEPER; extern const int UNFINISHED; + extern const int ABORTED; } @@ -426,6 +427,8 @@ bool ReplicatedMergeTreeQueue::removeFromVirtualParts(const MergeTreePartInfo & void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback) { std::lock_guard lock(pull_logs_to_queue_mutex); + if (pull_log_blocker.isCancelled()) + throw Exception("Log pulling is cancelled", ErrorCodes::ABORTED); String index_str = zookeeper->get(replica_path + "/log_pointer"); UInt64 index; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index fcb3dfb4b86b9e509c69aa5093f1b721c3ecca70..2191104a29181a31da48f64d07c10c1ae45e5343 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -356,6 +356,9 @@ public: /// A blocker that stops selects from the queue ActionBlocker actions_blocker; + /// A blocker that stops pulling entries from replication log to queue + ActionBlocker pull_log_blocker; + /// Adds a subscriber SubscriberHandler addSubscriber(SubscriberCallBack && callback); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index a19a424c643044a65a3c8b38f4e89db34c8d1a3f..3f907541a3cb855cdb9376500ea6bff4f6aa8e01 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -300,8 +300,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( } createNewZooKeeperNodes(); - - } @@ -2905,6 +2903,7 @@ void StorageReplicatedMergeTree::shutdown() fetcher.blocker.cancelForever(); merger_mutator.merges_blocker.cancelForever(); parts_mover.moves_blocker.cancelForever(); + queue.pull_log_blocker.cancelForever(); restarting_thread.shutdown(); @@ -3641,7 +3640,11 @@ void StorageReplicatedMergeTree::drop(TableStructureWriteLockHolder &) LOG_INFO(log, "Removing replica " << replica_path); replica_is_active_node = nullptr; + /// It may left some garbage if replica_path subtree are concurently modified zookeeper->tryRemoveRecursive(replica_path); + if (zookeeper->exists(replica_path)) + LOG_ERROR(log, "Replica was not completely removed from ZooKeeper, " + << replica_path << " still exists and may contain some garbage."); /// Check that `zookeeper_path` exists: it could have been deleted by another replica after execution of previous line. Strings replicas; @@ -3649,6 +3652,9 @@ void StorageReplicatedMergeTree::drop(TableStructureWriteLockHolder &) { LOG_INFO(log, "Removing table " << zookeeper_path << " (this might take several minutes)"); zookeeper->tryRemoveRecursive(zookeeper_path); + if (zookeeper->exists(zookeeper_path)) + LOG_ERROR(log, "Table was not completely removed from ZooKeeper, " + << zookeeper_path << " still exists and may contain some garbage."); } } diff --git a/tests/queries/0_stateless/00993_system_parts_race_condition_drop_zookeeper.sh b/tests/queries/0_stateless/00993_system_parts_race_condition_drop_zookeeper.sh index 2d458ba7e280d34d1d207d7cf785db43be708506..01127a3ef7bd26bb1127ae9f3a2928d33fd5d59a 100755 --- a/tests/queries/0_stateless/00993_system_parts_race_condition_drop_zookeeper.sh +++ b/tests/queries/0_stateless/00993_system_parts_race_condition_drop_zookeeper.sh @@ -72,28 +72,28 @@ timeout $TIMEOUT bash -c thread2 2> /dev/null & timeout $TIMEOUT bash -c thread3 2> /dev/null & timeout $TIMEOUT bash -c thread4 2> /dev/null & timeout $TIMEOUT bash -c thread5 2> /dev/null & -timeout $TIMEOUT bash -c thread6 2> /dev/null & +timeout $TIMEOUT bash -c thread6 2>&1 | grep "was not completely removed from ZooKeeper" & timeout $TIMEOUT bash -c thread1 2> /dev/null & timeout $TIMEOUT bash -c thread2 2> /dev/null & timeout $TIMEOUT bash -c thread3 2> /dev/null & timeout $TIMEOUT bash -c thread4 2> /dev/null & timeout $TIMEOUT bash -c thread5 2> /dev/null & -timeout $TIMEOUT bash -c thread6 2> /dev/null & +timeout $TIMEOUT bash -c thread6 2>&1 | grep "was not completely removed from ZooKeeper" & timeout $TIMEOUT bash -c thread1 2> /dev/null & timeout $TIMEOUT bash -c thread2 2> /dev/null & timeout $TIMEOUT bash -c thread3 2> /dev/null & timeout $TIMEOUT bash -c thread4 2> /dev/null & timeout $TIMEOUT bash -c thread5 2> /dev/null & -timeout $TIMEOUT bash -c thread6 2> /dev/null & +timeout $TIMEOUT bash -c thread6 2>&1 | grep "was not completely removed from ZooKeeper" & timeout $TIMEOUT bash -c thread1 2> /dev/null & timeout $TIMEOUT bash -c thread2 2> /dev/null & timeout $TIMEOUT bash -c thread3 2> /dev/null & timeout $TIMEOUT bash -c thread4 2> /dev/null & timeout $TIMEOUT bash -c thread5 2> /dev/null & -timeout $TIMEOUT bash -c thread6 2> /dev/null & +timeout $TIMEOUT bash -c thread6 2>&1 | grep "was not completely removed from ZooKeeper" & wait