未验证 提交 71d30098 编写于 作者: A alesapin 提交者: GitHub

Merge pull request #9986 from ClickHouse/revert-9907-remove-useless-code-locks

Revert "Remove useless code around locks"
......@@ -25,7 +25,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
* Although now any insertion into the table is done via PushingToViewsBlockOutputStream,
* but it's clear that here is not the best place for this functionality.
*/
addTableLock(storage->lockStructureForShare(context.getInitialQueryId()));
addTableLock(storage->lockStructureForShare(true, context.getInitialQueryId()));
/// If the "root" table deduplactes blocks, there are no need to make deduplication for children
/// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks
......@@ -54,7 +54,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(dependent_table.get()))
{
addTableLock(materialized_view->lockStructureForShare(context.getInitialQueryId()));
addTableLock(materialized_view->lockStructureForShare(true, context.getInitialQueryId()));
StoragePtr inner_table = materialized_view->getTargetTable();
auto inner_table_id = inner_table->getStorageID();
......
......@@ -358,7 +358,7 @@ void DatabaseMySQL::cleanOutdatedTables()
++iterator;
else
{
const auto table_lock = (*iterator)->lockAlterIntention();
const auto table_lock = (*iterator)->lockAlterIntention(RWLockImpl::NO_QUERY);
(*iterator)->shutdown();
(*iterator)->is_dropped = true;
......
......@@ -65,7 +65,7 @@ FunctionBaseImplPtr JoinGetOverloadResolver::build(const ColumnsWithTypeAndName
auto join = storage_join->getJoin();
DataTypes data_types(arguments.size());
auto table_lock = storage_join->lockStructureForShare(context.getInitialQueryId());
auto table_lock = storage_join->lockStructureForShare(false, context.getInitialQueryId());
for (size_t i = 0; i < arguments.size(); ++i)
data_types[i] = arguments[i].type;
......
......@@ -82,7 +82,7 @@ BlockIO InterpreterAlterQuery::execute()
if (!mutation_commands.empty())
{
auto table_lock_holder = table->lockStructureForShare(context.getCurrentQueryId());
auto table_lock_holder = table->lockStructureForShare(false /* because mutation is executed asyncronously */, context.getCurrentQueryId());
MutationsInterpreter(table, mutation_commands, context, false).validate(table_lock_holder);
table->mutate(mutation_commands, context);
}
......@@ -101,7 +101,7 @@ BlockIO InterpreterAlterQuery::execute()
switch (command.type)
{
case LiveViewCommand::REFRESH:
live_view->refresh();
live_view->refresh(context);
break;
}
}
......@@ -109,7 +109,7 @@ BlockIO InterpreterAlterQuery::execute()
if (!alter_commands.empty())
{
auto table_lock_holder = table->lockAlterIntention();
auto table_lock_holder = table->lockAlterIntention(context.getCurrentQueryId());
StorageInMemoryMetadata metadata = table->getInMemoryMetadata();
alter_commands.validate(metadata, context);
alter_commands.prepare(metadata);
......
......@@ -403,7 +403,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS
StoragePtr as_storage = DatabaseCatalog::instance().getTable({as_database_name, create.as_table});
/// as_storage->getColumns() and setEngine(...) must be called under structure lock of other_table for CREATE ... AS other_table.
as_storage_lock = as_storage->lockStructureForShare(context.getCurrentQueryId());
as_storage_lock = as_storage->lockStructureForShare(false, context.getCurrentQueryId());
properties.columns = as_storage->getColumns();
/// Secondary indices make sense only for MergeTree family of storage engines.
......
......@@ -89,7 +89,7 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
table = DatabaseCatalog::instance().getTable(table_id);
}
auto table_lock = table->lockStructureForShare(context.getInitialQueryId());
auto table_lock = table->lockStructureForShare(false, context.getInitialQueryId());
columns = table->getColumns();
}
......
......@@ -109,7 +109,7 @@ BlockIO InterpreterInsertQuery::execute()
BlockIO res;
StoragePtr table = getTable(query);
auto table_lock = table->lockStructureForShare(context.getInitialQueryId());
auto table_lock = table->lockStructureForShare(true, context.getInitialQueryId());
auto query_sample_block = getSampleBlock(query, table);
if (!query.table_function)
......
......@@ -255,7 +255,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (storage)
{
table_lock = storage->lockStructureForShare(context->getInitialQueryId());
table_lock = storage->lockStructureForShare(false, context->getInitialQueryId());
table_id = storage->getStorageID();
}
......
......@@ -314,9 +314,11 @@ bool IStorage::isVirtualColumn(const String & column_name) const
return getColumns().get(column_name).is_virtual;
}
TableStructureReadLockHolder IStorage::lockStructureForShare(const String & query_id)
TableStructureReadLockHolder IStorage::lockStructureForShare(bool will_add_new_data, const String & query_id)
{
TableStructureReadLockHolder result;
if (will_add_new_data)
result.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Read, query_id);
result.structure_lock = structure_lock->getLock(RWLockImpl::Read, query_id);
if (is_dropped)
......@@ -324,10 +326,10 @@ TableStructureReadLockHolder IStorage::lockStructureForShare(const String & quer
return result;
}
TableStructureWriteLockHolder IStorage::lockAlterIntention()
TableStructureWriteLockHolder IStorage::lockAlterIntention(const String & query_id)
{
TableStructureWriteLockHolder result;
result.alter_lock = std::unique_lock(alter_lock);
result.alter_intention_lock = alter_intention_lock->getLock(RWLockImpl::Write, query_id);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
......@@ -336,20 +338,23 @@ TableStructureWriteLockHolder IStorage::lockAlterIntention()
void IStorage::lockStructureExclusively(TableStructureWriteLockHolder & lock_holder, const String & query_id)
{
if (!lock_holder.alter_lock)
if (!lock_holder.alter_intention_lock)
throw Exception("Alter intention lock for table " + getStorageID().getNameForLogs() + " was not taken. This is a bug.", ErrorCodes::LOGICAL_ERROR);
if (!lock_holder.new_data_structure_lock)
lock_holder.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id);
lock_holder.structure_lock = structure_lock->getLock(RWLockImpl::Write, query_id);
}
TableStructureWriteLockHolder IStorage::lockExclusively(const String & query_id)
{
TableStructureWriteLockHolder result;
result.alter_lock = std::unique_lock(alter_lock);
result.alter_intention_lock = alter_intention_lock->getLock(RWLockImpl::Write, query_id);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
result.new_data_structure_lock = new_data_structure_lock->getLock(RWLockImpl::Write, query_id);
result.structure_lock = structure_lock->getLock(RWLockImpl::Write, query_id);
return result;
......
......@@ -199,11 +199,11 @@ public:
/// Acquire this lock if you need the table structure to remain constant during the execution of
/// the query. If will_add_new_data is true, this means that the query will add new data to the table
/// (INSERT or a parts merge).
TableStructureReadLockHolder lockStructureForShare(const String & query_id);
TableStructureReadLockHolder lockStructureForShare(bool will_add_new_data, const String & query_id);
/// Acquire this lock at the start of ALTER to lock out other ALTERs and make sure that only you
/// can modify the table structure. It can later be upgraded to the exclusive lock.
TableStructureWriteLockHolder lockAlterIntention();
TableStructureWriteLockHolder lockAlterIntention(const String & query_id);
/// Upgrade alter intention lock to the full exclusive structure lock. This is done by ALTER queries
/// to ensure that no other query uses the table structure and it can be safely changed.
......@@ -490,7 +490,12 @@ private:
/// If you hold this lock exclusively, you can be sure that no other structure modifying queries
/// (e.g. ALTER, DROP) are concurrently executing. But queries that only read table structure
/// (e.g. SELECT, INSERT) can continue to execute.
mutable std::mutex alter_lock;
mutable RWLock alter_intention_lock = RWLockImpl::create();
/// It is taken for share for the entire INSERT query and the entire merge of the parts (for MergeTree).
/// ALTER COLUMN queries acquire an exclusive lock to ensure that no new parts with the old structure
/// are added to the table and thus the set of parts to modify doesn't change.
mutable RWLock new_data_structure_lock = RWLockImpl::create();
/// Lock for the table column structure (names, types, etc.) and data path.
/// It is taken in exclusive mode by queries that modify them (e.g. RENAME, ALTER and DROP)
......
......@@ -517,11 +517,14 @@ void StorageLiveView::drop(TableStructureWriteLockHolder &)
condition.notify_all();
}
void StorageLiveView::refresh()
void StorageLiveView::refresh(const Context & context)
{
std::lock_guard lock(mutex);
if (getNewBlocks())
condition.notify_all();
auto alter_lock = lockAlterIntention(context.getCurrentQueryId());
{
std::lock_guard lock(mutex);
if (getNewBlocks())
condition.notify_all();
}
}
Pipes StorageLiveView::read(
......
......@@ -123,7 +123,7 @@ public:
void startup() override;
void shutdown() override;
void refresh();
void refresh(const Context & context);
Pipes read(
const Names & column_names,
......
......@@ -85,7 +85,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
try
{
auto storage_lock = data.lockStructureForShare(RWLockImpl::NO_QUERY);
auto storage_lock = data.lockStructureForShare(false, RWLockImpl::NO_QUERY);
MergeTreeData::DataPartPtr part = findPart(part_name);
......
......@@ -57,7 +57,7 @@ void ReplicatedMergeTreeCleanupThread::iterate()
{
/// TODO: Implement tryLockStructureForShare.
auto lock = storage.lockStructureForShare("");
auto lock = storage.lockStructureForShare(false, "");
storage.clearOldTemporaryDirectories();
}
......
......@@ -203,7 +203,7 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
else if (part->name == part_name)
{
auto zookeeper = storage.getZooKeeper();
auto table_lock = storage.lockStructureForShare(RWLockImpl::NO_QUERY);
auto table_lock = storage.lockStructureForShare(false, RWLockImpl::NO_QUERY);
auto local_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
part->getColumns(), part->checksums);
......
......@@ -168,7 +168,7 @@ Pipes StorageBuffer::read(
if (destination.get() == this)
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
auto destination_lock = destination->lockStructureForShare(context.getCurrentQueryId());
auto destination_lock = destination->lockStructureForShare(false, context.getCurrentQueryId());
const bool dst_has_same_structure = std::all_of(column_names.begin(), column_names.end(), [this, destination](const String& column_name)
{
......
......@@ -185,7 +185,7 @@ Pipes StorageMaterializedView::read(
const unsigned num_streams)
{
auto storage = getTargetTable();
auto lock = storage->lockStructureForShare(context.getCurrentQueryId());
auto lock = storage->lockStructureForShare(false, context.getCurrentQueryId());
if (query_info.order_by_optimizer)
query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(storage);
......@@ -200,7 +200,7 @@ Pipes StorageMaterializedView::read(
BlockOutputStreamPtr StorageMaterializedView::write(const ASTPtr & query, const Context & context)
{
auto storage = getTargetTable();
auto lock = storage->lockStructureForShare(context.getCurrentQueryId());
auto lock = storage->lockStructureForShare(true, context.getCurrentQueryId());
auto stream = storage->write(query, context);
stream->addTableLock(lock);
return stream;
......
......@@ -364,7 +364,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const String
{
auto & table = iterator->table();
if (table.get() != this)
selected_tables.emplace_back(table, table->lockStructureForShare(query_id), iterator->name());
selected_tables.emplace_back(table, table->lockStructureForShare(false, query_id), iterator->name());
iterator->next();
}
......@@ -389,7 +389,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const ASTPtr
if (storage.get() != this)
{
selected_tables.emplace_back(storage, storage->lockStructureForShare(query_id), iterator->name());
selected_tables.emplace_back(storage, storage->lockStructureForShare(false, query_id), iterator->name());
virtual_column->insert(iterator->name());
}
......
......@@ -241,7 +241,7 @@ void StorageMergeTree::alter(
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(context, table_id.table_name, metadata);
/// We release all locks except alter_lock which allows
/// We release all locks except alter_intention_lock which allows
/// to execute alter queries sequentially
table_lock_holder.releaseAllExceptAlterIntention();
......@@ -537,7 +537,7 @@ bool StorageMergeTree::merge(
bool deduplicate,
String * out_disable_reason)
{
auto table_lock_holder = lockStructureForShare(RWLockImpl::NO_QUERY);
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
FutureMergedMutatedPart future_part;
......@@ -655,7 +655,7 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::movePartsTask()
bool StorageMergeTree::tryMutatePart()
{
auto table_lock_holder = lockStructureForShare(RWLockImpl::NO_QUERY);
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
size_t max_ast_elements = global_context.getSettingsRef().max_expanded_ast_elements;
FutureMergedMutatedPart future_part;
......@@ -780,7 +780,7 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask()
{
{
/// TODO: Implement tryLockStructureForShare.
auto lock_structure = lockStructureForShare("");
auto lock_structure = lockStructureForShare(false, "");
clearOldPartsFromFilesystem();
clearOldTemporaryDirectories();
}
......@@ -973,14 +973,14 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
case PartitionCommand::FREEZE_PARTITION:
{
auto lock = lockStructureForShare(context.getCurrentQueryId());
auto lock = lockStructureForShare(false, context.getCurrentQueryId());
freezePartition(command.partition, command.with_name, context, lock);
}
break;
case PartitionCommand::FREEZE_ALL_PARTITIONS:
{
auto lock = lockStructureForShare(context.getCurrentQueryId());
auto lock = lockStructureForShare(false, context.getCurrentQueryId());
freezeAll(command.with_name, context, lock);
}
break;
......@@ -1045,8 +1045,8 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par
void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context)
{
auto lock1 = lockStructureForShare(context.getCurrentQueryId());
auto lock2 = source_table->lockStructureForShare(context.getCurrentQueryId());
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId());
Stopwatch watch;
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table);
......@@ -1116,8 +1116,8 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context)
{
auto lock1 = lockStructureForShare(context.getCurrentQueryId());
auto lock2 = dest_table->lockStructureForShare(context.getCurrentQueryId());
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
auto lock2 = dest_table->lockStructureForShare(false, context.getCurrentQueryId());
auto dest_table_storage = std::dynamic_pointer_cast<StorageMergeTree>(dest_table);
if (!dest_table_storage)
......
......@@ -1025,7 +1025,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
ReservationPtr reserved_space = reserveSpacePreferringTTLRules(estimated_space_for_merge,
ttl_infos, time(nullptr), max_volume_index);
auto table_lock = lockStructureForShare(RWLockImpl::NO_QUERY);
auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
FutureMergedMutatedPart future_merged_part(parts, entry.new_part_type);
if (future_merged_part.name != entry.new_part_name)
......@@ -1160,7 +1160,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
/// Can throw an exception.
ReservationPtr reserved_space = reserveSpace(estimated_space_for_result, source_part->disk);
auto table_lock = lockStructureForShare(RWLockImpl::NO_QUERY);
auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
MutableDataPartPtr new_part;
Transaction transaction(*this);
......@@ -1514,7 +1514,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
PartDescriptions parts_to_add;
DataPartsVector parts_to_remove;
auto table_lock_holder_dst_table = lockStructureForShare(RWLockImpl::NO_QUERY);
auto table_lock_holder_dst_table = lockStructureForShare(false, RWLockImpl::NO_QUERY);
for (size_t i = 0; i < entry_replace.new_part_names.size(); ++i)
{
......@@ -1576,7 +1576,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
return 0;
}
table_lock_holder_src_table = source_table->lockStructureForShare(RWLockImpl::NO_QUERY);
table_lock_holder_src_table = source_table->lockStructureForShare(false, RWLockImpl::NO_QUERY);
DataPartStates valid_states{MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed,
MergeTreeDataPartState::Outdated};
......@@ -2699,7 +2699,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
TableStructureReadLockHolder table_lock_holder;
if (!to_detached)
table_lock_holder = lockStructureForShare(RWLockImpl::NO_QUERY);
table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
/// Logging
Stopwatch stopwatch;
......@@ -3223,7 +3223,7 @@ void StorageReplicatedMergeTree::alter(
alter_entry.emplace();
mutation_znode.reset();
/// We can safely read structure, because we guarded with alter_lock
/// We can safely read structure, because we guarded with alter_intention_lock
if (is_readonly)
throw Exception("Can't ALTER readonly table", ErrorCodes::TABLE_IS_READ_ONLY);
......@@ -3428,14 +3428,14 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
case PartitionCommand::FREEZE_PARTITION:
{
auto lock = lockStructureForShare(query_context.getCurrentQueryId());
auto lock = lockStructureForShare(false, query_context.getCurrentQueryId());
freezePartition(command.partition, command.with_name, query_context, lock);
}
break;
case PartitionCommand::FREEZE_ALL_PARTITIONS:
{
auto lock = lockStructureForShare(query_context.getCurrentQueryId());
auto lock = lockStructureForShare(false, query_context.getCurrentQueryId());
freezeAll(command.with_name, query_context, lock);
}
break;
......@@ -4443,7 +4443,7 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
{
/// Critical section is not required (since grabOldParts() returns unique part set on each call)
auto table_lock = lockStructureForShare(RWLockImpl::NO_QUERY);
auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
auto zookeeper = getZooKeeper();
DataPartsVector parts = grabOldParts();
......@@ -4738,8 +4738,8 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
const Context & context)
{
/// First argument is true, because we possibly will add new data to current table.
auto lock1 = lockStructureForShare(context.getCurrentQueryId());
auto lock2 = source_table->lockStructureForShare(context.getCurrentQueryId());
auto lock1 = lockStructureForShare(true, context.getCurrentQueryId());
auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId());
Stopwatch watch;
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table);
......@@ -4917,8 +4917,8 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context)
{
auto lock1 = lockStructureForShare(context.getCurrentQueryId());
auto lock2 = dest_table->lockStructureForShare(context.getCurrentQueryId());
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
auto lock2 = dest_table->lockStructureForShare(false, context.getCurrentQueryId());
auto dest_table_storage = std::dynamic_pointer_cast<StorageReplicatedMergeTree>(dest_table);
if (!dest_table_storage)
......
......@@ -103,7 +103,7 @@ protected:
try
{
table_lock = storage->lockStructureForShare(query_id);
table_lock = storage->lockStructureForShare(false, query_id);
}
catch (const Exception & e)
{
......
......@@ -192,7 +192,7 @@ StoragesInfo StoragesInfoStream::next()
try
{
/// For table not to be dropped and set of columns to remain constant.
info.table_lock = info.storage->lockStructureForShare(query_id);
info.table_lock = info.storage->lockStructureForShare(false, query_id);
}
catch (const Exception & e)
{
......
......@@ -244,7 +244,7 @@ protected:
if (need_lock_structure)
{
table = tables_it->table();
lock = table->lockStructureForShare(context.getCurrentQueryId());
lock = table->lockStructureForShare(false, context.getCurrentQueryId());
}
}
catch (const Exception & e)
......
......@@ -12,11 +12,12 @@ struct TableStructureWriteLockHolder
{
void release()
{
*this = {};
*this = TableStructureWriteLockHolder();
}
void releaseAllExceptAlterIntention()
{
new_data_structure_lock.reset();
structure_lock.reset();
}
......@@ -24,7 +25,8 @@ private:
friend class IStorage;
/// Order is important.
std::unique_lock<std::mutex> alter_lock;
RWLockImpl::LockHolder alter_intention_lock;
RWLockImpl::LockHolder new_data_structure_lock;
RWLockImpl::LockHolder structure_lock;
};
......@@ -32,13 +34,14 @@ struct TableStructureReadLockHolder
{
void release()
{
*this = {};
*this = TableStructureReadLockHolder();
}
private:
friend class IStorage;
/// Order is important.
RWLockImpl::LockHolder new_data_structure_lock;
RWLockImpl::LockHolder structure_lock;
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册