From 566160c0486e4d1a7a9adf7f357cf7653a13d858 Mon Sep 17 00:00:00 2001 From: Vitaliy Lyudvichenko Date: Fri, 1 Sep 2017 18:05:23 +0300 Subject: [PATCH] Use FIFO lock in IStorage. [#CLICKHOUSE-3246] --- dbms/src/Common/RWLockFIFO.cpp | 42 +++++++++++++++---- dbms/src/Common/RWLockFIFO.h | 29 +++++++------ .../Common/tests/gtest_rw_lock_fifo.cpp.cpp | 5 ++- .../PushingToViewsBlockOutputStream.h | 2 +- .../Interpreters/InterpreterCreateQuery.cpp | 4 +- .../Interpreters/InterpreterDescribeQuery.cpp | 2 +- .../src/Interpreters/InterpreterDropQuery.cpp | 4 +- .../Interpreters/InterpreterInsertQuery.cpp | 2 +- .../Interpreters/InterpreterOptimizeQuery.cpp | 2 +- .../Interpreters/InterpreterRenameQuery.cpp | 2 +- .../Interpreters/InterpreterSelectQuery.cpp | 2 +- dbms/src/Storages/IStorage.cpp | 8 ++-- dbms/src/Storages/IStorage.h | 37 +++++++++------- .../Storages/MergeTree/DataPartsExchange.cpp | 2 +- .../ReplicatedMergeTreeAlterThread.cpp | 4 +- .../ReplicatedMergeTreePartCheckThread.cpp | 2 +- .../MergeTree/ShardedPartitionUploader.cpp | 2 +- dbms/src/Storages/StorageBuffer.cpp | 2 +- dbms/src/Storages/StorageDistributed.cpp | 2 +- dbms/src/Storages/StorageMerge.cpp | 4 +- dbms/src/Storages/StorageMergeTree.cpp | 12 +++--- .../Storages/StorageReplicatedMergeTree.cpp | 10 ++--- dbms/src/Storages/StorageTrivialBuffer.cpp | 2 +- .../Storages/System/StorageSystemColumns.cpp | 2 +- .../Storages/System/StorageSystemParts.cpp | 2 +- 25 files changed, 113 insertions(+), 74 deletions(-) diff --git a/dbms/src/Common/RWLockFIFO.cpp b/dbms/src/Common/RWLockFIFO.cpp index c46dfea346..2367ee6f76 100644 --- a/dbms/src/Common/RWLockFIFO.cpp +++ b/dbms/src/Common/RWLockFIFO.cpp @@ -1,5 +1,8 @@ -#include #include "RWLockFIFO.h" +#include +#include +#include + namespace DB { @@ -12,7 +15,6 @@ RWLockFIFO::LockHandler RWLockFIFO::getLock(RWLockFIFO::Type type, RWLockFIFO::C std::unique_lock lock(mutex); - if (type == Type::Write || queue.empty() || queue.back().type == Type::Write) { /// Create new group of clients @@ -38,15 +40,41 @@ RWLockFIFO::LockHandler RWLockFIFO::getLock(RWLockFIFO::Type type, RWLockFIFO::C throw; } + it_client->thread_number = Poco::ThreadNumber::get(); + it_client->enqueue_time = time(nullptr); + it_client->type = type; + LockHandler res = std::make_unique(shared_from_this(), it_group, it_client); /// We are first, we should not wait anything /// If we are not the first client in the group, a notification could be already sent if (it_group == queue.begin()) + { + it_client->start_time = it_client->enqueue_time; return res; + } /// Wait a notification - it_group->cv.wait(lock, [&it_group] () { return it_group->awakened; } ); + it_group->cv.wait(lock, [&] () { return it_group == queue.begin(); } ); + + it_client->start_time = time(nullptr); + return res; +} + + +RWLockFIFO::Clients RWLockFIFO::getClientsInTheQueue() const +{ + std::unique_lock lock(mutex); + + Clients res; + for (const auto & group : queue) + { + for (const auto & client : group.clients) + { + res.emplace_back(client); + } + } + return res; } @@ -64,17 +92,17 @@ void RWLockFIFO::LockHandlerImpl::unlock() queue.erase(it_group); if (!queue.empty()) - { - queue.front().awakened = true; queue.front().cv.notify_all(); - } } + + parent.reset(); } RWLockFIFO::LockHandlerImpl::~LockHandlerImpl() { - unlock(); + if (parent) + unlock(); } diff --git a/dbms/src/Common/RWLockFIFO.h b/dbms/src/Common/RWLockFIFO.h index 2e3005438b..0cdcbe67ec 100644 --- a/dbms/src/Common/RWLockFIFO.h +++ b/dbms/src/Common/RWLockFIFO.h @@ -1,25 +1,20 @@ #pragma once +#include #include +#include #include #include -#include namespace DB { - -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; -} - - struct RWLockFIFO; using RWLockFIFOPtr = std::shared_ptr; /// Implements shared lock with FIFO service +/// It does not work as recursive mutex, so a deadlock will occur if you try to acquire 2 locks in the same thread class RWLockFIFO : public std::enable_shared_from_this { public: @@ -38,21 +33,31 @@ public: /// Client is that who wants to acquire the lock. struct Client { - explicit Client(const std::string & info = "Anonymous client") : info{info} {} + explicit Client(const std::string & info = {}) : info{info} {} + std::string info; + int thread_number = 0; + std::time_t enqueue_time = 0; + std::time_t start_time = 0; + Type type; }; class LockHandlerImpl; using LockHandler = std::unique_ptr; /// Waits in the queue and returns appropriate lock - LockHandler getLock(Type type, Client client); + LockHandler getLock(Type type, Client client = Client{}); LockHandler getLock(Type type, const std::string & who) { return getLock(type, Client(who)); } + using Clients = std::vector; + + /// Returns list of executing and waiting clients + Clients getClientsInTheQueue() const; + private: RWLockFIFO() = default; @@ -69,7 +74,6 @@ private: ClientsContainer clients; std::condition_variable cv; /// all clients of the group wait group condvar - bool awakened{false}; /// just only to handle spurious wake ups explicit Group(Type type) : type{type} {} }; @@ -88,6 +92,7 @@ public: LockHandlerImpl(const LockHandlerImpl & other) = delete; + /// Unlocks acquired lock void unlock(); ~LockHandlerImpl(); @@ -97,7 +102,7 @@ public: private: - std::mutex mutex; + mutable std::mutex mutex; GroupsContainer queue; }; diff --git a/dbms/src/Common/tests/gtest_rw_lock_fifo.cpp.cpp b/dbms/src/Common/tests/gtest_rw_lock_fifo.cpp.cpp index 827058a2d2..63a29bbe56 100644 --- a/dbms/src/Common/tests/gtest_rw_lock_fifo.cpp.cpp +++ b/dbms/src/Common/tests/gtest_rw_lock_fifo.cpp.cpp @@ -24,7 +24,7 @@ static void execute_1(size_t threads, int round, int cycles) for (int i = 0; i < cycles; ++i) { auto type = (std::uniform_int_distribution<>(0, 9)(gen) >= round) ? RWLockFIFO::Read : RWLockFIFO::Write; - auto sleep_for = std::chrono::duration(std::uniform_int_distribution<>(1, 5)(gen)); + auto sleep_for = std::chrono::duration(std::uniform_int_distribution<>(1, 1000)(gen)); auto lock = fifo_lock->getLock(type, "RW"); @@ -57,8 +57,9 @@ static void execute_1(size_t threads, int round, int cycles) TEST(Common, RWLockFIFO_1) { constexpr int cycles = 10000; + const std::vector pool_sizes{1, 2, 4, 8}; - for (size_t pool_size = 1; pool_size < 8; ++pool_size) + for (auto pool_size : pool_sizes) { for (int round = 0; round < 10; ++round) { diff --git a/dbms/src/DataStreams/PushingToViewsBlockOutputStream.h b/dbms/src/DataStreams/PushingToViewsBlockOutputStream.h index 3236d44a2d..dba48d34cd 100644 --- a/dbms/src/DataStreams/PushingToViewsBlockOutputStream.h +++ b/dbms/src/DataStreams/PushingToViewsBlockOutputStream.h @@ -26,7 +26,7 @@ public: * Although now any insertion into the table is done via PushingToViewsBlockOutputStream, * but it's clear that here is not the best place for this functionality. */ - addTableLock(storage->lockStructure(true)); + addTableLock(storage->lockStructure(true, __PRETTY_FUNCTION__)); Dependencies dependencies = context.getDependencies(database, table); for (const auto & database_table : dependencies) diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.cpp b/dbms/src/Interpreters/InterpreterCreateQuery.cpp index 80536a76a1..3be60614bd 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.cpp +++ b/dbms/src/Interpreters/InterpreterCreateQuery.cpp @@ -506,7 +506,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) if (!as_table_name.empty()) { as_storage = context.getTable(as_database_name, as_table_name); - as_storage_lock = as_storage->lockStructure(false); + as_storage_lock = as_storage->lockStructure(false, __PRETTY_FUNCTION__); } /// Set and retrieve list of columns. @@ -557,7 +557,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) /// If the CREATE SELECT query is, insert the data into the table if (create.select && storage_name != "View" && (storage_name != "MaterializedView" || create.is_populate)) { - auto table_lock = res->lockStructure(true); + auto table_lock = res->lockStructure(true, __PRETTY_FUNCTION__); /// Also see InterpreterInsertQuery. BlockOutputStreamPtr out = diff --git a/dbms/src/Interpreters/InterpreterDescribeQuery.cpp b/dbms/src/Interpreters/InterpreterDescribeQuery.cpp index 12983d0705..11e8ca92fe 100644 --- a/dbms/src/Interpreters/InterpreterDescribeQuery.cpp +++ b/dbms/src/Interpreters/InterpreterDescribeQuery.cpp @@ -60,7 +60,7 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl() { StoragePtr table = context.getTable(ast.database, ast.table); - auto table_lock = table->lockStructure(false); + auto table_lock = table->lockStructure(false, __PRETTY_FUNCTION__); columns = table->getColumnsList(); columns.insert(std::end(columns), std::begin(table->alias_columns), std::end(table->alias_columns)); column_defaults = table->column_defaults; diff --git a/dbms/src/Interpreters/InterpreterDropQuery.cpp b/dbms/src/Interpreters/InterpreterDropQuery.cpp index ae25c53ea1..cefb092820 100644 --- a/dbms/src/Interpreters/InterpreterDropQuery.cpp +++ b/dbms/src/Interpreters/InterpreterDropQuery.cpp @@ -53,7 +53,7 @@ BlockIO InterpreterDropQuery::execute() { table->shutdown(); /// If table was already dropped by anyone, an exception will be thrown - auto table_lock = table->lockForAlter(); + auto table_lock = table->lockForAlter(__PRETTY_FUNCTION__); /// Delete table data table->drop(); table->is_dropped = true; @@ -116,7 +116,7 @@ BlockIO InterpreterDropQuery::execute() table.first->shutdown(); /// If table was already dropped by anyone, an exception will be thrown - auto table_lock = table.first->lockForAlter(); + auto table_lock = table.first->lockForAlter(__PRETTY_FUNCTION__); String current_table_name = table.first->getTableName(); diff --git a/dbms/src/Interpreters/InterpreterInsertQuery.cpp b/dbms/src/Interpreters/InterpreterInsertQuery.cpp index 72d093c940..e3ceec26f7 100644 --- a/dbms/src/Interpreters/InterpreterInsertQuery.cpp +++ b/dbms/src/Interpreters/InterpreterInsertQuery.cpp @@ -86,7 +86,7 @@ BlockIO InterpreterInsertQuery::execute() ASTInsertQuery & query = typeid_cast(*query_ptr); StoragePtr table = getTable(); - auto table_lock = table->lockStructure(true); + auto table_lock = table->lockStructure(true, __PRETTY_FUNCTION__); NamesAndTypesListPtr required_columns = std::make_shared(table->getColumnsList()); diff --git a/dbms/src/Interpreters/InterpreterOptimizeQuery.cpp b/dbms/src/Interpreters/InterpreterOptimizeQuery.cpp index 42c6901acc..c9bce30271 100644 --- a/dbms/src/Interpreters/InterpreterOptimizeQuery.cpp +++ b/dbms/src/Interpreters/InterpreterOptimizeQuery.cpp @@ -22,7 +22,7 @@ BlockIO InterpreterOptimizeQuery::execute() throw Exception("FINAL flag for OPTIMIZE query is meaningful only with specified PARTITION", ErrorCodes::BAD_ARGUMENTS); StoragePtr table = context.getTable(ast.database, ast.table); - auto table_lock = table->lockStructure(true); + auto table_lock = table->lockStructure(true, __PRETTY_FUNCTION__); table->optimize(query_ptr, ast.partition, ast.final, ast.deduplicate, context.getSettings()); return {}; } diff --git a/dbms/src/Interpreters/InterpreterRenameQuery.cpp b/dbms/src/Interpreters/InterpreterRenameQuery.cpp index 6e33d06864..d626e80a06 100644 --- a/dbms/src/Interpreters/InterpreterRenameQuery.cpp +++ b/dbms/src/Interpreters/InterpreterRenameQuery.cpp @@ -100,7 +100,7 @@ BlockIO InterpreterRenameQuery::execute() for (const auto & names : unique_tables_from) if (auto table = context.tryGetTable(names.database_name, names.table_name)) - locks.emplace_back(table->lockForAlter()); + locks.emplace_back(table->lockForAlter(__PRETTY_FUNCTION__)); /** All tables are locked. If there are more than one rename in chain, * we need to hold global lock while doing all renames. Order matters to avoid deadlocks. diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index f5f2659afb..c5518e7de3 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -156,7 +156,7 @@ void InterpreterSelectQuery::basicInit(BlockInputStreamPtr input_) storage = context.getTable(database_name, table_name); } - table_lock = storage->lockStructure(false); + table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__); if (table_column_names.empty()) table_column_names = storage->getColumnsListNonMaterialized(); } diff --git a/dbms/src/Storages/IStorage.cpp b/dbms/src/Storages/IStorage.cpp index 628d0e34d9..70d016566d 100644 --- a/dbms/src/Storages/IStorage.cpp +++ b/dbms/src/Storages/IStorage.cpp @@ -4,13 +4,13 @@ namespace DB { -TableStructureReadLock::TableStructureReadLock(StoragePtr storage_, bool lock_structure, bool lock_data) - : storage(storage_), data_lock(storage->data_lock, std::defer_lock), structure_lock(storage->structure_lock, std::defer_lock) +TableStructureReadLock::TableStructureReadLock(StoragePtr storage_, bool lock_structure, bool lock_data, const std::string & who) + : storage(storage_) { if (lock_data) - data_lock.lock(); + data_lock = storage->data_lock->getLock(RWLockFIFO::Read, who); if (lock_structure) - structure_lock.lock(); + structure_lock = storage->structure_lock->getLock(RWLockFIFO::Read, who); } } diff --git a/dbms/src/Storages/IStorage.h b/dbms/src/Storages/IStorage.h index b0a87f74ca..685480177a 100644 --- a/dbms/src/Storages/IStorage.h +++ b/dbms/src/Storages/IStorage.h @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -22,6 +23,9 @@ class Context; class IBlockInputStream; class IBlockOutputStream; +struct RWLockFIFO; +using RWLockFIFOPtr = std::shared_ptr; + using BlockOutputStreamPtr = std::shared_ptr; using BlockInputStreamPtr = std::shared_ptr; using BlockInputStreams = std::vector; @@ -54,18 +58,19 @@ private: StoragePtr storage; /// Order is important. - std::shared_lock data_lock; - std::shared_lock structure_lock; + RWLockFIFO::LockHandler data_lock; + RWLockFIFO::LockHandler structure_lock; public: - TableStructureReadLock(StoragePtr storage_, bool lock_structure, bool lock_data); + TableStructureReadLock(StoragePtr storage_, bool lock_structure, bool lock_data, const std::string & who); }; + using TableStructureReadLockPtr = std::shared_ptr; using TableStructureReadLocks = std::vector; -using TableStructureWriteLock = std::unique_lock; -using TableDataWriteLock = std::unique_lock; +using TableStructureWriteLock = RWLockFIFO::LockHandler; +using TableDataWriteLock = RWLockFIFO::LockHandler; using TableFullWriteLock = std::pair; @@ -107,9 +112,9 @@ public: * WARNING: You need to call methods from ITableDeclaration under such a lock. Without it, they are not thread safe. * WARNING: To avoid deadlocks, this method must not be called under lock of Context. */ - TableStructureReadLockPtr lockStructure(bool will_modify_data) + TableStructureReadLockPtr lockStructure(bool will_modify_data, const std::string & who = "Anonymous") { - TableStructureReadLockPtr res = std::make_shared(shared_from_this(), true, will_modify_data); + TableStructureReadLockPtr res = std::make_shared(shared_from_this(), true, will_modify_data, who); if (is_dropped) throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED); return res; @@ -117,11 +122,11 @@ public: /** Does not allow reading the table structure. It is taken for ALTER, RENAME and DROP. */ - TableFullWriteLock lockForAlter() + TableFullWriteLock lockForAlter(const std::string & who = "Alter") { /// The calculation order is important. - auto data_lock = lockDataForAlter(); - auto structure_lock = lockStructureForAlter(); + auto data_lock = lockDataForAlter(who); + auto structure_lock = lockStructureForAlter(who); return {std::move(data_lock), std::move(structure_lock)}; } @@ -130,17 +135,17 @@ public: * It is taken during write temporary data in ALTER MODIFY. * Under this lock, you can take lockStructureForAlter() to change the structure of the table. */ - TableDataWriteLock lockDataForAlter() + TableDataWriteLock lockDataForAlter(const std::string & who = "Alter") { - std::unique_lock res(data_lock); + auto res = data_lock->getLock(RWLockFIFO::Write, who); if (is_dropped) throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED); return res; } - TableStructureWriteLock lockStructureForAlter() + TableStructureWriteLock lockStructureForAlter(const std::string & who = "Alter") { - std::unique_lock res(structure_lock); + auto res = structure_lock->getLock(RWLockFIFO::Write, who); if (is_dropped) throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED); return res; @@ -316,7 +321,7 @@ private: * 2) all changes to the data after releasing the lock will be based on the structure of the table at the time after the lock was released. * You need to take for read for the entire time of the operation that changes the data. */ - mutable std::shared_mutex data_lock; + mutable RWLockFIFOPtr data_lock = RWLockFIFO::create(); /** Lock for multiple columns and path to table. It is taken for write at RENAME, ALTER (for ALTER MODIFY for a while) and DROP. * It is taken for read for the whole time of SELECT, INSERT and merge parts (for MergeTree). @@ -325,7 +330,7 @@ private: * That is, if this lock is taken for write, you should not worry about `parts_writing_lock`. * parts_writing_lock is only needed for cases when you do not want to take `table_structure_lock` for long operations (ALTER MODIFY). */ - mutable std::shared_mutex structure_lock; + mutable RWLockFIFOPtr structure_lock = RWLockFIFO::create(); }; /// table name -> table diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp index b68a0207c6..f914da23ff 100644 --- a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp @@ -77,7 +77,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body try { - auto storage_lock = owned_storage->lockStructure(false); + auto storage_lock = owned_storage->lockStructure(false, __PRETTY_FUNCTION__); MergeTreeData::DataPartPtr part; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp index 766a9c4f3c..8de62e1885 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp @@ -82,7 +82,7 @@ void ReplicatedMergeTreeAlterThread::run() LOG_INFO(log, "Changed version of 'columns' node in ZooKeeper. Waiting for structure write lock."); - auto table_lock = storage.lockStructureForAlter(); + auto table_lock = storage.lockStructureForAlter(__PRETTY_FUNCTION__); const auto columns_changed = columns != storage.data.getColumnsListNonMaterialized(); const auto materialized_columns_changed = materialized_columns != storage.data.materialized_columns; @@ -140,7 +140,7 @@ void ReplicatedMergeTreeAlterThread::run() /// Update parts. if (changed_version || force_recheck_parts) { - auto table_lock = storage.lockStructure(false); + auto table_lock = storage.lockStructure(false, __PRETTY_FUNCTION__); if (changed_version) LOG_INFO(log, "ALTER-ing parts"); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp index bb7ae636b7..e17c87e030 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp @@ -218,7 +218,7 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name) else if (part->name == part_name) { auto zookeeper = storage.getZooKeeper(); - auto table_lock = storage.lockStructure(false); + auto table_lock = storage.lockStructure(false, __PRETTY_FUNCTION__); /// If the part is in ZooKeeper, check its data with its checksums, and them with ZooKeeper. if (zookeeper->exists(storage.replica_path + "/parts/" + part_name)) diff --git a/dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp b/dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp index 7d3dd4aa6c..73f0da0912 100644 --- a/dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp +++ b/dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp @@ -143,7 +143,7 @@ bool Client::send(const std::string & part_name, size_t shard_no, LOG_TRACE(log, "Sending part " << part_name); - auto storage_lock = storage.lockStructure(false); + auto storage_lock = storage.lockStructure(false, __PRETTY_FUNCTION__); MergeTreeData::DataPartPtr part = findShardedPart(part_name, shard_no); diff --git a/dbms/src/Storages/StorageBuffer.cpp b/dbms/src/Storages/StorageBuffer.cpp index 90ffaab12a..e8241fec0a 100644 --- a/dbms/src/Storages/StorageBuffer.cpp +++ b/dbms/src/Storages/StorageBuffer.cpp @@ -592,7 +592,7 @@ void StorageBuffer::alter(const AlterCommands & params, const String & database_ if (param.type == AlterCommand::MODIFY_PRIMARY_KEY) throw Exception("Storage engine " + getName() + " doesn't support primary key.", ErrorCodes::NOT_IMPLEMENTED); - auto lock = lockStructureForAlter(); + auto lock = lockStructureForAlter(__PRETTY_FUNCTION__); /// So that no blocks of the old structure remain. optimize({} /*query*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, context.getSettings()); diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp index 7b75cb5449..f389c57f8b 100644 --- a/dbms/src/Storages/StorageDistributed.cpp +++ b/dbms/src/Storages/StorageDistributed.cpp @@ -246,7 +246,7 @@ void StorageDistributed::alter(const AlterCommands & params, const String & data if (param.type == AlterCommand::MODIFY_PRIMARY_KEY) throw Exception("Storage engine " + getName() + " doesn't support primary key.", ErrorCodes::NOT_IMPLEMENTED); - auto lock = lockStructureForAlter(); + auto lock = lockStructureForAlter(__PRETTY_FUNCTION__); params.apply(*columns, materialized_columns, alias_columns, column_defaults); context.getDatabase(database_name)->alterTable( diff --git a/dbms/src/Storages/StorageMerge.cpp b/dbms/src/Storages/StorageMerge.cpp index 5d1502daed..d5a33ba132 100644 --- a/dbms/src/Storages/StorageMerge.cpp +++ b/dbms/src/Storages/StorageMerge.cpp @@ -335,7 +335,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables() const { auto & table = iterator->table(); if (table.get() != this) - selected_tables.emplace_back(table, table->lockStructure(false)); + selected_tables.emplace_back(table, table->lockStructure(false, __PRETTY_FUNCTION__)); } iterator->next(); @@ -351,7 +351,7 @@ void StorageMerge::alter(const AlterCommands & params, const String & database_n if (param.type == AlterCommand::MODIFY_PRIMARY_KEY) throw Exception("Storage engine " + getName() + " doesn't support primary key.", ErrorCodes::NOT_IMPLEMENTED); - auto lock = lockStructureForAlter(); + auto lock = lockStructureForAlter(__PRETTY_FUNCTION__); params.apply(*columns, materialized_columns, alias_columns, column_defaults); context.getDatabase(database_name)->alterTable( diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 4525dd667d..41caba6cf1 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -151,7 +151,7 @@ void StorageMergeTree::alter( /// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time. auto merge_blocker = merger.cancel(); - auto table_soft_lock = lockDataForAlter(); + auto table_soft_lock = lockDataForAlter(__PRETTY_FUNCTION__); data.checkAlter(params); @@ -194,7 +194,7 @@ void StorageMergeTree::alter( transactions.push_back(std::move(transaction)); } - auto table_hard_lock = lockStructureForAlter(); + auto table_hard_lock = lockStructureForAlter(__PRETTY_FUNCTION__); IDatabase::ASTModifier engine_modifier; if (primary_key_is_modified) @@ -293,7 +293,7 @@ bool StorageMergeTree::merge( data.clearOldTemporaryDirectories(); } - auto structure_lock = lockStructure(true); + auto structure_lock = lockStructure(true, __PRETTY_FUNCTION__); size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path); @@ -402,8 +402,8 @@ void StorageMergeTree::clearColumnInPartition(const ASTPtr & query, const Field /// This protects against "revival" of data for a removed partition after completion of merge. auto merge_blocker = merger.cancel(); - auto lock_read_structure = lockStructure(false); - auto lock_write_data = lockDataForAlter(); + /// We don't change table structure, only data in some parts, parts are locked inside alterDataPart() function + auto lock_read_structure = lockStructure(false, __PRETTY_FUNCTION__); String partition_id = data.getPartitionIDFromQuery(partition); MergeTreeData::DataParts parts = data.getDataParts(); @@ -452,7 +452,7 @@ void StorageMergeTree::dropPartition(const ASTPtr & query, const Field & partiti /// This protects against "revival" of data for a removed partition after completion of merge. auto merge_blocker = merger.cancel(); /// Waits for completion of merge and does not start new ones. - auto lock = lockForAlter(); + auto lock = lockForAlter(__PRETTY_FUNCTION__); String partition_id = data.getPartitionIDFromQuery(partition); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index c2b5280884..ef5728b9f0 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1065,7 +1065,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) /// Can throw an exception. DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::reserve(full_path, estimated_space_for_merge); - auto table_lock = lockStructure(false); + auto table_lock = lockStructure(false, __PRETTY_FUNCTION__); MergeList::EntryPtr merge_entry = context.getMergeList().insert(database_name, table_name, entry.new_part_name, parts); MergeTreeData::Transaction transaction; @@ -1430,7 +1430,7 @@ void StorageReplicatedMergeTree::executeClearColumnInPartition(const LogEntry & /// We don't change table structure, only data in some parts /// To disable reading from these parts, we will sequentially acquire write lock for each part inside alterDataPart() /// If we will lock the whole table here, a deadlock can occur. For example, if use use Buffer table (CLICKHOUSE-3238) - auto lock_read_structure = lockStructure(false); + auto lock_read_structure = lockStructure(false, __PRETTY_FUNCTION__); auto zookeeper = getZooKeeper(); @@ -2107,7 +2107,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin TableStructureReadLockPtr table_lock; if (!to_detached) - table_lock = lockStructure(true); + table_lock = lockStructure(true, __PRETTY_FUNCTION__); ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host")); @@ -2408,7 +2408,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params, { /// Just to read current structure. Alter will be done in separate thread. - auto table_lock = lockStructure(false); + auto table_lock = lockStructure(false, __PRETTY_FUNCTION__); if (is_readonly) throw Exception("Can't ALTER readonly table", ErrorCodes::TABLE_IS_READ_ONLY); @@ -3788,7 +3788,7 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK(Logger * log_) Logger * log = log_ ? log_ : this->log; - auto table_lock = lockStructure(false); + auto table_lock = lockStructure(false, __PRETTY_FUNCTION__); auto zookeeper = getZooKeeper(); MergeTreeData::DataPartsVector parts = data.grabOldParts(); diff --git a/dbms/src/Storages/StorageTrivialBuffer.cpp b/dbms/src/Storages/StorageTrivialBuffer.cpp index e205905865..9dafe30276 100644 --- a/dbms/src/Storages/StorageTrivialBuffer.cpp +++ b/dbms/src/Storages/StorageTrivialBuffer.cpp @@ -536,7 +536,7 @@ void StorageTrivialBuffer::alter( throw Exception("Storage engine " + getName() + " doesn't support primary key.", ErrorCodes::NOT_IMPLEMENTED); - auto lock = lockStructureForAlter(); + auto lock = lockStructureForAlter(__PRETTY_FUNCTION__); /// To avoid presence of blocks of different structure in the buffer. flush(false); diff --git a/dbms/src/Storages/System/StorageSystemColumns.cpp b/dbms/src/Storages/System/StorageSystemColumns.cpp index 79e11be949..b0df8444fc 100644 --- a/dbms/src/Storages/System/StorageSystemColumns.cpp +++ b/dbms/src/Storages/System/StorageSystemColumns.cpp @@ -130,7 +130,7 @@ BlockInputStreams StorageSystemColumns::read( try { - table_lock = storage->lockStructure(false); + table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__); } catch (const Exception & e) { diff --git a/dbms/src/Storages/System/StorageSystemParts.cpp b/dbms/src/Storages/System/StorageSystemParts.cpp index a93b1a0872..3b053aed16 100644 --- a/dbms/src/Storages/System/StorageSystemParts.cpp +++ b/dbms/src/Storages/System/StorageSystemParts.cpp @@ -163,7 +163,7 @@ BlockInputStreams StorageSystemParts::read( try { - table_lock = storage->lockStructure(false); /// For table not to be dropped. + table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__); /// For table not to be dropped. } catch (const Exception & e) { -- GitLab