提交 7a5d532c 编写于 作者: A Alexander Tokmakov

implement rename database for atomic

上级 d1be5ec6
......@@ -34,32 +34,13 @@ public:
DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, Context & context_)
: DatabaseOrdinary(name_, std::move(metadata_path_), "store/", "DatabaseAtomic (" + name_ + ")", context_)
, path_to_table_symlinks(context_.getPath() + "data/" + escapeForFileName(name_) + "/")
, path_to_table_symlinks(global_context.getPath() + "data/" + escapeForFileName(name_) + "/")
, path_to_metadata_symlink(global_context.getPath() + "metadata/" + escapeForFileName(name_))
, db_uuid(uuid)
{
assert(db_uuid != UUIDHelpers::Nil);
/// Symlinks in data/db_name/ directory and metadata/db_name/ are not used by ClickHouse,
/// it's needed only for convenient introspection.
Poco::File(path_to_table_symlinks).createDirectories();
assert(path_to_metadata_symlink != metadata_path);
Poco::File metadata_symlink(path_to_metadata_symlink);
if (metadata_symlink.exists())
{
if (!metadata_symlink.isLink())
throw Exception(ErrorCodes::FILE_ALREADY_EXISTS, "Directory {} exists", path_to_metadata_symlink);
}
else
{
try
{
Poco::File{metadata_path}.linkTo(path_to_metadata_symlink, Poco::File::LINK_SYMBOLIC);
}
catch (...)
{
tryLogCurrentException(log);
}
}
tryCreateMetadataSymlink();
}
String DatabaseAtomic::getTableDataPath(const String & table_name) const
......@@ -67,7 +48,7 @@ String DatabaseAtomic::getTableDataPath(const String & table_name) const
std::lock_guard lock(mutex);
auto it = table_name_to_path.find(table_name);
if (it == table_name_to_path.end())
throw Exception("Table " + table_name + " not found in database " + getDatabaseName(), ErrorCodes::UNKNOWN_TABLE);
throw Exception("Table " + table_name + " not found in database " + database_name, ErrorCodes::UNKNOWN_TABLE);
assert(it->second != data_path && !it->second.empty());
return it->second;
}
......@@ -88,7 +69,7 @@ void DatabaseAtomic::drop(const Context &)
}
catch (...)
{
tryLogCurrentException(log);
LOG_WARNING(log, getCurrentExceptionMessage(true));
}
Poco::File(getMetadataPath()).remove(true);
}
......@@ -99,7 +80,14 @@ void DatabaseAtomic::attachTable(const String & name, const StoragePtr & table,
DetachedTables not_in_use;
std::unique_lock lock(mutex);
not_in_use = cleenupDetachedTables();
assertDetachedTableNotInUse(table->getStorageID().uuid);
auto table_id = table->getStorageID();
assertDetachedTableNotInUse(table_id.uuid);
if (table_id.database_name != database_name)
{
/// Update name if RENAME DATABASE happend during attach
table_id.database_name = database_name;
table->renameInMemory(table_id);
}
DatabaseWithDictionaries::attachTableUnlocked(name, table, lock);
table_name_to_path.emplace(std::make_pair(name, relative_table_path));
}
......@@ -217,9 +205,9 @@ void DatabaseAtomic::renameTable(const Context & context, const String & table_n
if (exchange)
other_table_data_path = detach(other_db, to_table_name);
table->renameInMemory({other_db.getDatabaseName(), to_table_name, table->getStorageID().uuid});
table->renameInMemory({other_db.database_name, to_table_name, table->getStorageID().uuid});
if (exchange)
other_table->renameInMemory({getDatabaseName(), table_name, other_table->getStorageID().uuid});
other_table->renameInMemory({database_name, table_name, other_table->getStorageID().uuid});
if (!inside_database)
{
......@@ -375,5 +363,60 @@ void DatabaseAtomic::tryRemoveSymlink(const String & table_name)
}
}
void DatabaseAtomic::tryCreateMetadataSymlink()
{
/// Symlinks in data/db_name/ directory and metadata/db_name/ are not used by ClickHouse,
/// it's needed only for convenient introspection.
assert(path_to_metadata_symlink != metadata_path);
Poco::File metadata_symlink(path_to_metadata_symlink);
if (metadata_symlink.exists())
{
if (!metadata_symlink.isLink())
throw Exception(ErrorCodes::FILE_ALREADY_EXISTS, "Directory {} exists", path_to_metadata_symlink);
}
else
{
try
{
Poco::File{metadata_path}.linkTo(path_to_metadata_symlink, Poco::File::LINK_SYMBOLIC);
}
catch (...)
{
tryLogCurrentException(log);
}
}
}
void DatabaseAtomic::renameDatabase(const String & new_name)
{
try
{
Poco::File(path_to_metadata_symlink).remove();
}
catch (...)
{
LOG_WARNING(log, getCurrentExceptionMessage(true));
}
{
std::lock_guard lock(mutex);
database_name = new_name;
for (auto & table : tables)
{
auto table_id = table.second->getStorageID();
table_id.database_name = database_name;
table.second->renameInMemory(table_id);
}
path_to_metadata_symlink = global_context.getPath() + "metadata/" + escapeForFileName(database_name);
String old_path_to_table_symlinks = path_to_table_symlinks;
path_to_table_symlinks = global_context.getPath() + "data/" + escapeForFileName(database_name) + "/";
Poco::File(old_path_to_table_symlinks).renameTo(path_to_table_symlinks);
}
tryCreateMetadataSymlink();
}
}
......@@ -26,6 +26,8 @@ public:
String getEngineName() const override { return "Atomic"; }
UUID getUUID() const override { return db_uuid; }
void renameDatabase(const String & new_name) override;
void renameTable(
const Context & context,
const String & table_name,
......@@ -64,13 +66,15 @@ private:
typedef std::unordered_map<UUID, StoragePtr> DetachedTables;
[[nodiscard]] DetachedTables cleenupDetachedTables();
void tryCreateMetadataSymlink();
//TODO store path in DatabaseWithOwnTables::tables
typedef std::unordered_map<String, String> NameToPathMap;
NameToPathMap table_name_to_path;
DetachedTables detached_tables;
const String path_to_table_symlinks;
const String path_to_metadata_symlink;
String path_to_table_symlinks;
String path_to_metadata_symlink;
const UUID db_uuid;
};
......
......@@ -51,9 +51,10 @@ Tables DatabaseDictionary::listTables(const FilterByNameFunction & filter_by_nam
{
Tables tables;
auto load_results = global_context.getExternalDictionariesLoader().getLoadResults(filter_by_name);
String db_name = getDatabaseName();
for (auto & load_result : load_results)
{
auto storage = createStorageDictionary(getDatabaseName(), load_result);
auto storage = createStorageDictionary(db_name, load_result);
if (storage)
tables.emplace(storage->getStorageID().table_name, storage);
}
......@@ -73,7 +74,7 @@ StoragePtr DatabaseDictionary::tryGetTable(const String & table_name, const Cont
DatabaseTablesIteratorPtr DatabaseDictionary::getTablesIterator(const Context &, const FilterByNameFunction & filter_by_table_name)
{
return std::make_unique<DatabaseTablesSnapshotIterator>(listTables(filter_by_table_name));
return std::make_unique<DatabaseTablesSnapshotIterator>(listTables(filter_by_table_name), getDatabaseName());
}
bool DatabaseDictionary::empty() const
......@@ -96,7 +97,7 @@ ASTPtr DatabaseDictionary::getCreateTableQueryImpl(const String & table_name, co
}
auto names_and_types = StorageDictionary::getNamesAndTypes(ExternalDictionariesLoader::getDictionaryStructure(*load_result.config));
buffer << "CREATE TABLE " << backQuoteIfNeed(database_name) << '.' << backQuoteIfNeed(table_name) << " (";
buffer << "CREATE TABLE " << backQuoteIfNeed(getDatabaseName()) << '.' << backQuoteIfNeed(table_name) << " (";
buffer << StorageDictionary::generateNamesAndTypesDescription(names_and_types);
buffer << ") Engine = Dictionary(" << backQuoteIfNeed(table_name) << ")";
}
......@@ -119,7 +120,7 @@ ASTPtr DatabaseDictionary::getCreateDatabaseQuery() const
String query;
{
WriteBufferFromString buffer(query);
buffer << "CREATE DATABASE " << backQuoteIfNeed(database_name) << " ENGINE = Dictionary";
buffer << "CREATE DATABASE " << backQuoteIfNeed(getDatabaseName()) << " ENGINE = Dictionary";
}
auto settings = global_context.getSettingsRef();
ParserCreateQuery parser;
......
......@@ -47,8 +47,6 @@ protected:
ASTPtr getCreateTableQueryImpl(const String & table_name, const Context & context, bool throw_on_error) const override;
private:
mutable std::mutex mutex;
Poco::Logger * log;
const Context & global_context;
......
......@@ -91,7 +91,7 @@ time_t DatabaseLazy::getObjectMetadataModificationTime(const String & table_name
auto it = tables_cache.find(table_name);
if (it != tables_cache.end())
return it->second.metadata_modification_time;
throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
throw Exception("Table " + backQuote(database_name) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
}
void DatabaseLazy::alterTable(
......@@ -160,7 +160,7 @@ void DatabaseLazy::attachTable(const String & table_name, const StoragePtr & tab
std::forward_as_tuple(table_name),
std::forward_as_tuple(table, current_time, DatabaseOnDisk::getObjectMetadataModificationTime(table_name)));
if (!inserted)
throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
throw Exception("Table " + backQuote(database_name) + "." + backQuote(table_name) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
it->second.expiration_iterator = cache_expiration_queue.emplace(cache_expiration_queue.end(), current_time, table_name);
}
......@@ -173,7 +173,7 @@ StoragePtr DatabaseLazy::detachTable(const String & table_name)
std::lock_guard lock(mutex);
auto it = tables_cache.find(table_name);
if (it == tables_cache.end())
throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
throw Exception("Table " + backQuote(database_name) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
res = it->second.table;
if (it->second.expiration_iterator != cache_expiration_queue.end())
cache_expiration_queue.erase(it->second.expiration_iterator);
......@@ -230,7 +230,7 @@ StoragePtr DatabaseLazy::loadTable(const String & table_name) const
{
const auto & ast_create = ast->as<const ASTCreateQuery &>();
String table_data_path_relative = getTableDataPath(ast_create);
table = createTableFromAST(ast_create, database_name, table_data_path_relative, context_copy, false).second;
table = createTableFromAST(ast_create, getDatabaseName(), table_data_path_relative, context_copy, false).second;
}
if (!ast || !endsWith(table->getName(), "Log"))
......@@ -239,7 +239,7 @@ StoragePtr DatabaseLazy::loadTable(const String & table_name) const
std::lock_guard lock(mutex);
auto it = tables_cache.find(table_name);
if (it == tables_cache.end())
throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
throw Exception("Table " + backQuote(database_name) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
if (it->second.expiration_iterator != cache_expiration_queue.end())
cache_expiration_queue.erase(it->second.expiration_iterator);
......@@ -299,6 +299,7 @@ DatabaseLazyIterator::DatabaseLazyIterator(DatabaseLazy & database_, Strings &&
, iterator(table_names.begin())
, current_storage(nullptr)
{
database_name = database.database_name;
}
void DatabaseLazyIterator::next()
......
......@@ -58,7 +58,7 @@ void DatabaseMemory::dropTable(
ASTPtr DatabaseMemory::getCreateDatabaseQuery() const
{
auto create_query = std::make_shared<ASTCreateQuery>();
create_query->database = database_name;
create_query->database = getDatabaseName();
create_query->set(create_query->storage, std::make_shared<ASTStorage>());
create_query->storage->set(create_query->storage->engine, makeASTFunction(getEngineName()));
return create_query;
......
......@@ -100,7 +100,7 @@ DatabaseTablesIteratorPtr DatabaseMySQL::getTablesIterator(const Context &, cons
if (!remove_or_detach_tables.count(table_name) && (!filter_by_table_name || filter_by_table_name(table_name)))
tables[table_name] = modify_time_and_storage.second;
return std::make_unique<DatabaseTablesSnapshotIterator>(tables);
return std::make_unique<DatabaseTablesSnapshotIterator>(tables, getDatabaseName());
}
bool DatabaseMySQL::isTableExist(const String & name, const Context &) const
......@@ -188,7 +188,7 @@ time_t DatabaseMySQL::getObjectMetadataModificationTime(const String & table_nam
ASTPtr DatabaseMySQL::getCreateDatabaseQuery() const
{
const auto & create_query = std::make_shared<ASTCreateQuery>();
create_query->database = database_name;
create_query->database = getDatabaseName();
create_query->set(create_query->storage, database_engine_define);
return create_query;
}
......@@ -379,11 +379,11 @@ void DatabaseMySQL::attachTable(const String & table_name, const StoragePtr & st
std::lock_guard<std::mutex> lock{mutex};
if (!local_tables_cache.count(table_name))
throw Exception("Cannot attach table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) +
throw Exception("Cannot attach table " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name) +
" because it does not exist.", ErrorCodes::UNKNOWN_TABLE);
if (!remove_or_detach_tables.count(table_name))
throw Exception("Cannot attach table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) +
throw Exception("Cannot attach table " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name) +
" because it already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
/// We use the new storage to replace the original storage, because the original storage may have been dropped
......@@ -402,11 +402,11 @@ StoragePtr DatabaseMySQL::detachTable(const String & table_name)
std::lock_guard<std::mutex> lock{mutex};
if (remove_or_detach_tables.count(table_name))
throw Exception("Table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) + " is dropped",
throw Exception("Table " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name) + " is dropped",
ErrorCodes::TABLE_IS_DROPPED);
if (!local_tables_cache.count(table_name))
throw Exception("Table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) + " doesn't exist.",
throw Exception("Table " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name) + " doesn't exist.",
ErrorCodes::UNKNOWN_TABLE);
remove_or_detach_tables.emplace(table_name);
......@@ -442,16 +442,16 @@ void DatabaseMySQL::dropTable(const Context &, const String & table_name, bool /
Poco::File remove_flag(getMetadataPath() + '/' + escapeForFileName(table_name) + suffix);
if (remove_or_detach_tables.count(table_name))
throw Exception("Table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) + " is dropped",
throw Exception("Table " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name) + " is dropped",
ErrorCodes::TABLE_IS_DROPPED);
if (remove_flag.exists())
throw Exception("The remove flag file already exists but the " + backQuoteIfNeed(getDatabaseName()) +
throw Exception("The remove flag file already exists but the " + backQuoteIfNeed(database_name) +
"." + backQuoteIfNeed(table_name) + " does not exists remove tables, it is bug.", ErrorCodes::LOGICAL_ERROR);
auto table_iter = local_tables_cache.find(table_name);
if (table_iter == local_tables_cache.end())
throw Exception("Table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) + " doesn't exist.",
throw Exception("Table " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name) + " doesn't exist.",
ErrorCodes::UNKNOWN_TABLE);
remove_or_detach_tables.emplace(table_name);
......
......@@ -67,7 +67,6 @@ private:
ASTPtr database_engine_define;
String database_name_in_mysql;
mutable std::mutex mutex;
std::atomic<bool> quit{false};
std::condition_variable cond;
......
......@@ -146,7 +146,7 @@ void DatabaseOnDisk::createTable(
{
const auto & settings = context.getSettingsRef();
const auto & create = query->as<ASTCreateQuery &>();
assert(getDatabaseName() == create.database && table_name == create.table);
assert(table_name == create.table);
/// Create a file with metadata if necessary - if the query is not ATTACH.
/// Write the query of `ATTACH table` to it.
......@@ -341,8 +341,14 @@ ASTPtr DatabaseOnDisk::getCreateDatabaseQuery() const
ASTPtr ast;
auto settings = global_context.getSettingsRef();
auto database_metadata_path = global_context.getPath() + "metadata/" + escapeForFileName(database_name) + ".sql";
ast = getCreateQueryFromMetadata(database_metadata_path, true);
{
std::lock_guard lock(mutex);
auto database_metadata_path = global_context.getPath() + "metadata/" + escapeForFileName(database_name) + ".sql";
ast = parseQueryFromMetadata(log, global_context, database_metadata_path, true);
auto & ast_create_query = ast->as<ASTCreateQuery &>();
ast_create_query.attach = false;
ast_create_query.database = database_name;
}
if (!ast)
{
/// Handle databases (such as default) for which there are no database.sql files.
......@@ -493,7 +499,7 @@ ASTPtr DatabaseOnDisk::getCreateQueryFromMetadata(const String & database_metada
{
auto & ast_create_query = ast->as<ASTCreateQuery &>();
ast_create_query.attach = false;
ast_create_query.database = database_name;
ast_create_query.database = getDatabaseName();
}
return ast;
......
......@@ -163,7 +163,7 @@ void DatabaseOrdinary::loadStoredObjects(Context & context, bool has_force_resto
context,
create_query,
*this,
getDatabaseName(),
database_name,
getMetadataPath() + name_with_query.first,
has_force_restore_data_flag);
......
......@@ -47,7 +47,7 @@ void DatabaseWithDictionaries::attachDictionary(const String & dictionary_name,
attachTableUnlocked(
dictionary_name,
StorageDictionary::create(
StorageID(getDatabaseName(), dictionary_name),
StorageID(database_name, dictionary_name),
full_name,
ExternalDictionariesLoader::getDictionaryStructure(*attach_info.config)),
lock);
......@@ -74,7 +74,7 @@ void DatabaseWithDictionaries::detachDictionary(const String & dictionary_name)
void DatabaseWithDictionaries::detachDictionaryImpl(const String & dictionary_name, DictionaryAttachInfo & attach_info)
{
String full_name = getDatabaseName() + "." + dictionary_name;
String full_name = getDatabaseName() + "." + dictionary_name; //FIXME
{
std::unique_lock lock(mutex);
......@@ -192,7 +192,7 @@ void DatabaseWithDictionaries::removeDictionary(const Context &, const String &
{
String dictionary_metadata_path = getObjectMetadataPath(dictionary_name);
Poco::File(dictionary_metadata_path).remove();
CurrentStatusInfo::unset(CurrentStatusInfo::DictionaryStatus, getDatabaseName() + "." + dictionary_name);
CurrentStatusInfo::unset(CurrentStatusInfo::DictionaryStatus, getDatabaseName() + "." + dictionary_name); //FIXME
}
catch (...)
{
......@@ -234,7 +234,7 @@ ASTPtr DatabaseWithDictionaries::getCreateDictionaryQueryImpl(
ASTPtr ast = it->second.create_query->clone();
auto & create_query = ast->as<ASTCreateQuery &>();
create_query.attach = false;
create_query.database = getDatabaseName();
create_query.database = database_name;
return ast;
}
}
......
......@@ -43,14 +43,14 @@ DatabaseTablesIteratorPtr DatabaseWithOwnTablesBase::getTablesIterator(const Con
{
std::lock_guard lock(mutex);
if (!filter_by_table_name)
return std::make_unique<DatabaseTablesSnapshotIterator>(tables);
return std::make_unique<DatabaseTablesSnapshotIterator>(tables, database_name);
Tables filtered_tables;
for (const auto & [table_name, storage] : tables)
if (filter_by_table_name(table_name))
filtered_tables.emplace(table_name, storage);
return std::make_unique<DatabaseTablesSnapshotIterator>(std::move(filtered_tables));
return std::make_unique<DatabaseTablesSnapshotIterator>(std::move(filtered_tables), database_name);
}
bool DatabaseWithOwnTablesBase::empty() const
......@@ -78,7 +78,7 @@ StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_n
auto table_id = res->getStorageID();
if (table_id.hasUUID())
{
assert(getDatabaseName() == DatabaseCatalog::TEMPORARY_DATABASE || getEngineName() == "Atomic");
assert(database_name == DatabaseCatalog::TEMPORARY_DATABASE || getEngineName() == "Atomic");
DatabaseCatalog::instance().removeUUIDMapping(table_id.uuid);
}
......@@ -98,7 +98,7 @@ void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, c
auto table_id = table->getStorageID();
if (table_id.hasUUID())
{
assert(getDatabaseName() == DatabaseCatalog::TEMPORARY_DATABASE || getEngineName() == "Atomic");
assert(database_name == DatabaseCatalog::TEMPORARY_DATABASE || getEngineName() == "Atomic");
DatabaseCatalog::instance().addUUIDMapping(table_id.uuid, shared_from_this(), table);
}
}
......
......@@ -36,7 +36,6 @@ public:
~DatabaseWithOwnTablesBase() override;
protected:
mutable std::mutex mutex;
Tables tables;
Poco::Logger * log;
const Context & global_context;
......
......@@ -48,6 +48,11 @@ public:
virtual ~IDatabaseTablesIterator() = default;
virtual UUID uuid() const { return UUIDHelpers::Nil; }
const String & databaseName() const { assert(!database_name.empty()); return database_name; }
protected:
String database_name;
};
/// Copies list of tables and iterates through such snapshot.
......@@ -65,12 +70,21 @@ protected:
other.it = other.tables.end();
it = tables.begin();
std::advance(it, idx);
database_name = std::move(other.database_name);
}
public:
DatabaseTablesSnapshotIterator(Tables & tables_) : tables(tables_), it(tables.begin()) {}
DatabaseTablesSnapshotIterator(const Tables & tables_, const String & database_name_)
: tables(tables_), it(tables.begin())
{
database_name = database_name_;
}
DatabaseTablesSnapshotIterator(Tables && tables_) : tables(tables_), it(tables.begin()) {}
DatabaseTablesSnapshotIterator(Tables && tables_, String && database_name_)
: tables(std::move(tables_)), it(tables.begin())
{
database_name = std::move(database_name_);
}
void next() override { ++it; }
......@@ -282,10 +296,19 @@ public:
virtual ASTPtr getCreateDatabaseQuery() const = 0;
/// Get name of database.
String getDatabaseName() const { return database_name; }
String getDatabaseName() const
{
std::lock_guard lock{mutex};
return database_name;
}
/// Get UUID of database.
virtual UUID getUUID() const { return UUIDHelpers::Nil; }
virtual void renameDatabase(const String & /*new_name*/)
{
throw Exception(getEngineName() + ": RENAME DATABASE is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
/// Returns path for persistent data storage if the database supports it, empty string otherwise
virtual String getDataPath() const { return {}; }
......@@ -324,6 +347,7 @@ protected:
return nullptr;
}
mutable std::mutex mutex;
String database_name;
};
......
......@@ -12,6 +12,7 @@
#include <Parsers/formatAST.h>
#include <IO/ReadHelpers.h>
#include <Poco/DirectoryIterator.h>
#include <Common/renameat2.h>
#include <filesystem>
......@@ -299,6 +300,38 @@ DatabasePtr DatabaseCatalog::detachDatabase(const String & database_name, bool d
return db;
}
void DatabaseCatalog::renameDatabase(const String & old_name, const String & new_name)
{
std::lock_guard lock{databases_mutex};
assertDatabaseExistsUnlocked(old_name);
assertDatabaseDoesntExistUnlocked(new_name);
auto it = databases.find(old_name);
auto db = it->second;
db->renameDatabase(new_name);
databases.erase(it);
databases.emplace(new_name, db);
auto depend_it = view_dependencies.begin();
while (depend_it != view_dependencies.end())
{
if (depend_it->first.database_name == old_name)
{
auto table_id = depend_it->first;
auto dependencies = std::move(depend_it->second);
depend_it = view_dependencies.erase(depend_it);
table_id.database_name = new_name;
view_dependencies.emplace(std::move(table_id), std::move(dependencies));
}
else
++depend_it;
}
auto old_database_metadata_path = global_context->getPath() + "metadata/" + escapeForFileName(old_name) + ".sql";
auto new_database_metadata_path = global_context->getPath() + "metadata/" + escapeForFileName(new_name) + ".sql";
renameNoReplace(old_database_metadata_path, new_database_metadata_path);
}
DatabasePtr DatabaseCatalog::getDatabase(const String & database_name) const
{
std::lock_guard lock{databases_mutex};
......
......@@ -123,6 +123,7 @@ public:
void attachDatabase(const String & database_name, const UUID & uuid, const DatabasePtr & database);
DatabasePtr detachDatabase(const String & database_name, bool drop = false, bool check_empty = true);
void renameDatabase(const String & old_name, const String & new_name);
/// database_name must be not empty
DatabasePtr getDatabase(const String & database_name) const;
......
......@@ -18,23 +18,6 @@ InterpreterRenameQuery::InterpreterRenameQuery(const ASTPtr & query_ptr_, Contex
}
struct RenameDescription
{
RenameDescription(const ASTRenameQuery::Element & elem, const String & current_database) :
from_database_name(elem.from.database.empty() ? current_database : elem.from.database),
from_table_name(elem.from.table),
to_database_name(elem.to.database.empty() ? current_database : elem.to.database),
to_table_name(elem.to.table)
{}
String from_database_name;
String from_table_name;
String to_database_name;
String to_table_name;
};
BlockIO InterpreterRenameQuery::execute()
{
const auto & rename = query_ptr->as<const ASTRenameQuery &>();
......@@ -51,7 +34,7 @@ BlockIO InterpreterRenameQuery::execute()
* or we will be in inconsistent state. (It is worth to be fixed.)
*/
std::vector<RenameDescription> descriptions;
RenameDescriptions descriptions;
descriptions.reserve(rename.elements.size());
/// Don't allow to drop tables (that we are renaming); don't allow to create tables in places where tables will be renamed.
......@@ -75,22 +58,43 @@ BlockIO InterpreterRenameQuery::execute()
for (auto & table_guard : table_guards)
table_guard.second = database_catalog.getDDLGuard(table_guard.first.database_name, table_guard.first.table_name);
if (rename.database)
return executeToDatabase(rename, descriptions);
else
return executeToTables(rename, descriptions);
}
BlockIO InterpreterRenameQuery::executeToTables(const ASTRenameQuery & rename, const RenameDescriptions & descriptions)
{
auto & database_catalog = DatabaseCatalog::instance();
for (auto & elem : descriptions)
{
if (!rename.exchange)
database_catalog.assertTableDoesntExist(StorageID(elem.to_database_name, elem.to_table_name), context);
database_catalog.getDatabase(elem.from_database_name)->renameTable(
context,
elem.from_table_name,
*database_catalog.getDatabase(elem.to_database_name),
elem.to_table_name,
rename.exchange);
context,
elem.from_table_name,
*database_catalog.getDatabase(elem.to_database_name),
elem.to_table_name,
rename.exchange);
}
return {};
}
BlockIO InterpreterRenameQuery::executeToDatabase(const ASTRenameQuery &, const RenameDescriptions & descriptions)
{
assert(descriptions.size() == 1);
assert(descriptions.front().from_table_name.empty());
assert(descriptions.front().to_table_name.empty());
const auto & old_name = descriptions.front().from_database_name;
const auto & new_name = descriptions.back().to_database_name;
DatabaseCatalog::instance().renameDatabase(old_name, new_name);
return {};
}
AccessRightsElements InterpreterRenameQuery::getRequiredAccess() const
{
AccessRightsElements required_access;
......@@ -99,6 +103,12 @@ AccessRightsElements InterpreterRenameQuery::getRequiredAccess() const
{
required_access.emplace_back(AccessType::SELECT | AccessType::DROP_TABLE, elem.from.database, elem.from.table);
required_access.emplace_back(AccessType::CREATE_TABLE | AccessType::INSERT, elem.to.database, elem.to.table);
if (rename.exchange)
{
required_access.emplace_back(AccessType::CREATE_TABLE | AccessType::INSERT, elem.from.database, elem.from.table);
required_access.emplace_back(AccessType::SELECT | AccessType::DROP_TABLE, elem.to.database, elem.to.table);
}
}
return required_access;
}
......
#pragma once
#include <Interpreters/IInterpreter.h>
#include <Parsers/IAST_fwd.h>
#include <Parsers/ASTRenameQuery.h>
namespace DB
......@@ -25,6 +25,24 @@ struct UniqueTableName
}
};
struct RenameDescription
{
RenameDescription(const ASTRenameQuery::Element & elem, const String & current_database) :
from_database_name(elem.from.database.empty() ? current_database : elem.from.database),
from_table_name(elem.from.table),
to_database_name(elem.to.database.empty() ? current_database : elem.to.database),
to_table_name(elem.to.table)
{}
String from_database_name;
String from_table_name;
String to_database_name;
String to_table_name;
};
using RenameDescriptions = std::vector<RenameDescription>;
using TableGuards = std::map<UniqueTableName, std::unique_ptr<DDLGuard>>;
/** Rename one table
......@@ -37,6 +55,9 @@ public:
BlockIO execute() override;
private:
BlockIO executeToTables(const ASTRenameQuery & rename, const RenameDescriptions & descriptions);
BlockIO executeToDatabase(const ASTRenameQuery & rename, const RenameDescriptions & descriptions);
AccessRightsElements getRequiredAccess() const;
ASTPtr query_ptr;
......
......@@ -384,7 +384,7 @@ void InterpreterSystemQuery::restartReplicas(Context & system_context)
if (auto table = iterator->table())
{
if (dynamic_cast<const StorageReplicatedMergeTree *>(table.get()))
replica_names.emplace_back(StorageID{database->getDatabaseName(), iterator->name()});
replica_names.emplace_back(StorageID{iterator->databaseName(), iterator->name()});
}
}
}
......
......@@ -30,6 +30,7 @@ public:
Elements elements;
bool exchange{false}; /// For EXCHANGE TABLES
bool database{false}; /// For RENAME DATABASE
/** Get the text that identifies this element. */
String getID(char) const override { return "Rename"; }
......@@ -61,6 +62,16 @@ public:
protected:
void formatQueryImpl(const FormatSettings & settings, FormatState &, FormatStateStacked) const override
{
if (database)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "RENAME DATABASE " << (settings.hilite ? hilite_none : "");
settings.ostr << backQuoteIfNeed(elements.at(0).from.database);
settings.ostr << (settings.hilite ? hilite_keyword : "") << " TO " << (settings.hilite ? hilite_none : "");
settings.ostr << backQuoteIfNeed(elements.at(0).to.database);
formatOnCluster(settings);
return;
}
settings.ostr << (settings.hilite ? hilite_keyword : "")
<< (exchange ? "EXCHANGE TABLES " : "RENAME TABLE ") << (settings.hilite ? hilite_none : "");
......
......@@ -40,8 +40,9 @@ static bool parseDatabaseAndTable(
bool ParserRenameQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword s_rename_table("RENAME TABLE");
ParserKeyword s_to("TO");
ParserKeyword s_exchange_tables("EXCHANGE TABLES");
ParserKeyword s_rename_database("RENAME DATABASE");
ParserKeyword s_to("TO");
ParserKeyword s_and("AND");
ParserToken s_comma(TokenType::Comma);
......@@ -51,6 +52,34 @@ bool ParserRenameQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
if (s_exchange_tables.ignore(pos, expected))
exchange = true;
else if (s_rename_database.ignore(pos, expected))
{
ASTPtr from_db;
ASTPtr to_db;
ParserIdentifier db_name_p;
if (!db_name_p.parse(pos, from_db, expected))
return false;
if (!s_to.ignore(pos, expected))
return false;
if (!db_name_p.parse(pos, to_db, expected))
return false;
String cluster_str;
if (ParserKeyword{"ON"}.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;
}
auto query = std::make_shared<ASTRenameQuery>();
query->database = true;
query->elements.emplace({});
tryGetIdentifierNameInto(from_db, query->elements.front().from.database);
tryGetIdentifierNameInto(to_db, query->elements.front().to.database);
query->cluster = cluster_str;
node = query;
return true;
}
else
return false;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册