提交 713923e4 编写于 作者: Z zhang2014

Support detach and drop table for mysql database engine

上级 d894d1df
......@@ -474,6 +474,7 @@ namespace ErrorCodes
extern const int NOT_ENOUGH_PRIVILEGES = 497;
extern const int LIMIT_BY_WITH_TIES_IS_NOT_SUPPORTED = 498;
extern const int S3_ERROR = 499;
extern const int CANNOT_CREATE_DATABASE = 500;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;
......
......@@ -9,6 +9,9 @@
#include <Parsers/ASTFunction.h>
#include <Common/parseAddress.h>
#include "config_core.h"
#include "DatabaseFactory.h"
#include <Poco/File.h>
#if USE_MYSQL
#include <Databases/DatabaseMySQL.h>
......@@ -21,15 +24,32 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_DATABASE_ENGINE;
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_DATABASE_ENGINE;
extern const int CANNOT_CREATE_DATABASE;
}
DatabasePtr DatabaseFactory::get(
const String & database_name,
const String & metadata_path,
const ASTStorage * engine_define,
Context & context)
const String & database_name, const String & metadata_path, const ASTStorage * engine_define, Context & context)
{
try
{
Poco::File(metadata_path).createDirectory();
return getImpl(database_name, metadata_path, engine_define, context);
}
catch (...)
{
Poco::File metadata_dir(metadata_path);
if (metadata_dir.exists())
metadata_dir.remove(true);
throw;
}
}
DatabasePtr DatabaseFactory::getImpl(
const String & database_name, const String & metadata_path, const ASTStorage * engine_define, Context & context)
{
String engine_name = engine_define->engine->name;
......@@ -55,20 +75,31 @@ DatabasePtr DatabaseFactory::get(
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() != 4)
throw Exception(
"MySQL Database require mysql_hostname, mysql_database_name, mysql_username, mysql_password arguments.",
ErrorCodes::BAD_ARGUMENTS);
throw Exception("MySQL Database require mysql_hostname, mysql_database_name, mysql_username, mysql_password arguments.",
ErrorCodes::BAD_ARGUMENTS);
const auto & arguments = engine->arguments->children;
const auto & mysql_host_name = arguments[0]->as<ASTLiteral>()->value.safeGet<String>();
const auto & mysql_database_name = arguments[1]->as<ASTLiteral>()->value.safeGet<String>();
const auto & host_name_and_port = arguments[0]->as<ASTLiteral>()->value.safeGet<String>();
const auto & database_name_in_mysql = arguments[1]->as<ASTLiteral>()->value.safeGet<String>();
const auto & mysql_user_name = arguments[2]->as<ASTLiteral>()->value.safeGet<String>();
const auto & mysql_user_password = arguments[3]->as<ASTLiteral>()->value.safeGet<String>();
auto parsed_host_port = parseAddress(mysql_host_name, 3306);
return std::make_shared<DatabaseMySQL>(context, database_name, parsed_host_port.first, parsed_host_port.second, mysql_database_name,
mysql_user_name, mysql_user_password);
try
{
const auto & [remote_host_name, remote_port] = parseAddress(host_name_and_port, 3306);
auto mysql_pool = mysqlxx::Pool(database_name_in_mysql, remote_host_name, mysql_user_name, mysql_user_password, remote_port);
auto mysql_database = std::make_shared<DatabaseMySQL>(
context, database_name, metadata_path, engine_define, database_name_in_mysql, std::move(mysql_pool));
mysql_database->empty(context); /// test database is works fine.
return mysql_database;
}
catch (...)
{
const auto & exception_message = getCurrentExceptionMessage(true);
throw Exception("Cannot create MySQL database, because " + exception_message, ErrorCodes::CANNOT_CREATE_DATABASE);
}
}
#endif
......
......@@ -11,11 +11,9 @@ class ASTStorage;
class DatabaseFactory
{
public:
static DatabasePtr get(
const String & database_name,
const String & metadata_path,
const ASTStorage * engine_define,
Context & context);
static DatabasePtr get(const String & database_name, const String & metadata_path, const ASTStorage * engine_define, Context & context);
static DatabasePtr getImpl(const String & database_name, const String & metadata_path, const ASTStorage * engine_define, Context & context);
};
}
#include "config_core.h"
#if USE_MYSQL
#include <string>
#include <Databases/DatabaseMySQL.h>
#include <Common/parseAddress.h>
#include <IO/Operators.h>
......@@ -12,12 +14,19 @@
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeFixedString.h>
#include <Storages/StorageMySQL.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Common/setThreadName.h>
#include <Common/escapeForFileName.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTCreateQuery.h>
#include <DataTypes/convertMySQLDataType.h>
#include <Poco/File.h>
#include <Poco/DirectoryIterator.h>
namespace DB
{
......@@ -25,8 +34,13 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_TABLE;
extern const int TABLE_IS_DROPPED;
extern const int TABLE_WAS_NOT_DROPPED;
extern const int TABLE_ALREADY_EXISTS;
extern const int UNEXPECTED_AST_STRUCTURE;
}
constexpr static const auto suffix = "_remove.flag";
static constexpr const std::chrono::seconds cleaner_sleep_time{30};
String toQueryStringWithQuote(const std::vector<String> & quote_list)
......@@ -47,11 +61,10 @@ String toQueryStringWithQuote(const std::vector<String> & quote_list)
}
DatabaseMySQL::DatabaseMySQL(
const Context & context_, const String & database_name_, const String & mysql_host_name_, const UInt16 & mysql_port_,
const String & mysql_database_name_, const String & mysql_user_name_, const String & mysql_user_password_)
: global_context(context_), database_name(database_name_), mysql_host_name(mysql_host_name_), mysql_port(mysql_port_),
mysql_database_name(mysql_database_name_), mysql_user_name(mysql_user_name_), mysql_user_password(mysql_user_password_),
mysql_pool(mysql_database_name, mysql_host_name, mysql_user_name, mysql_user_password, mysql_port)
const Context & global_context_, const String & database_name_, const String & metadata_path_,
const ASTStorage * database_engine_define_, const String & database_name_in_mysql_, mysqlxx::Pool && pool)
: global_context(global_context_), database_name(database_name_), metadata_path(metadata_path_),
database_engine_define(database_engine_define_->clone()), database_name_in_mysql(database_name_in_mysql_), mysql_pool(std::move(pool))
{
}
......@@ -61,7 +74,14 @@ bool DatabaseMySQL::empty(const Context &) const
fetchTablesIntoLocalCache();
return local_tables_cache.empty();
if (local_tables_cache.empty())
return true;
for (const auto & [table_name, storage_info] : local_tables_cache)
if (!remove_or_detach_tables.count(table_name))
return false;
return true;
}
DatabaseTablesIteratorPtr DatabaseMySQL::getTablesIterator(const Context &, const FilterByNameFunction & filter_by_table_name)
......@@ -71,9 +91,9 @@ DatabaseTablesIteratorPtr DatabaseMySQL::getTablesIterator(const Context &, cons
fetchTablesIntoLocalCache();
for (const auto & local_table : local_tables_cache)
if (!filter_by_table_name || filter_by_table_name(local_table.first))
tables[local_table.first] = local_table.second.storage;
for (const auto & [table_name, modify_time_and_storage] : local_tables_cache)
if (!remove_or_detach_tables.count(table_name) && (!filter_by_table_name || filter_by_table_name(table_name)))
tables[table_name] = modify_time_and_storage.second;
return std::make_unique<DatabaseTablesSnapshotIterator>(tables);
}
......@@ -89,12 +109,47 @@ StoragePtr DatabaseMySQL::tryGetTable(const Context &, const String & mysql_tabl
fetchTablesIntoLocalCache();
if (local_tables_cache.find(mysql_table_name) != local_tables_cache.end())
return local_tables_cache[mysql_table_name].storage;
if (!remove_or_detach_tables.count(mysql_table_name) && local_tables_cache.find(mysql_table_name) != local_tables_cache.end())
return local_tables_cache[mysql_table_name].second;
return StoragePtr{};
}
static ASTPtr getCreateQueryFromStorage(const StoragePtr & storage, const ASTPtr & database_engine_define)
{
auto create_table_query = std::make_shared<ASTCreateQuery>();
auto table_storage_define = database_engine_define->clone();
create_table_query->set(create_table_query->storage, table_storage_define);
auto columns_declare_list = std::make_shared<ASTColumns>();
auto columns_expression_list = std::make_shared<ASTExpressionList>();
columns_declare_list->set(columns_declare_list->columns, columns_expression_list);
create_table_query->set(create_table_query->columns_list, columns_declare_list);
{
/// init create query.
create_table_query->table = storage->getTableName();
create_table_query->database = storage->getDatabaseName();
for (const auto & column_type_and_name : storage->getColumns().getOrdinary())
{
const auto & column_declaration = std::make_shared<ASTColumnDeclaration>();
column_declaration->name = column_type_and_name.name;
column_declaration->type = dataTypeConvertToQuery(column_type_and_name.type);
columns_expression_list->children.emplace_back(column_declaration);
}
auto mysql_table_name = std::make_shared<ASTLiteral>(storage->getTableName());
auto storage_engine_arguments = table_storage_define->as<ASTStorage>()->engine->arguments;
storage_engine_arguments->children.insert(storage_engine_arguments->children.begin() + 2, mysql_table_name);
}
return create_table_query;
}
ASTPtr DatabaseMySQL::tryGetCreateTableQuery(const Context &, const String & table_name) const
{
std::lock_guard<std::mutex> lock(mutex);
......@@ -102,9 +157,9 @@ ASTPtr DatabaseMySQL::tryGetCreateTableQuery(const Context &, const String & tab
fetchTablesIntoLocalCache();
if (local_tables_cache.find(table_name) == local_tables_cache.end())
throw Exception("MySQL table " + mysql_database_name + "." + table_name + " doesn't exist..", ErrorCodes::UNKNOWN_TABLE);
throw Exception("MySQL table " + database_name_in_mysql + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
return local_tables_cache[table_name].create_table_query;
return getCreateQueryFromStorage(local_tables_cache[table_name].second, database_engine_define);
}
time_t DatabaseMySQL::getObjectMetadataModificationTime(const Context &, const String & table_name)
......@@ -114,22 +169,16 @@ time_t DatabaseMySQL::getObjectMetadataModificationTime(const Context &, const S
fetchTablesIntoLocalCache();
if (local_tables_cache.find(table_name) == local_tables_cache.end())
throw Exception("MySQL table " + mysql_database_name + "." + table_name + " doesn't exist..", ErrorCodes::UNKNOWN_TABLE);
throw Exception("MySQL table " + database_name_in_mysql + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
return time_t(local_tables_cache[table_name].modification_time);
return time_t(local_tables_cache[table_name].first);
}
ASTPtr DatabaseMySQL::getCreateDatabaseQuery(const Context &) const
{
const auto & create_query = std::make_shared<ASTCreateQuery>();
create_query->database = database_name;
const auto & storage = std::make_shared<ASTStorage>();
storage->set(storage->engine, makeASTFunction("MySQL",
std::make_shared<ASTLiteral>(mysql_host_name + ":" + toString(mysql_port)), std::make_shared<ASTLiteral>(mysql_database_name),
std::make_shared<ASTLiteral>(mysql_user_name), std::make_shared<ASTLiteral>(mysql_user_password)));
create_query->set(create_query->storage, storage);
create_query->set(create_query->storage, database_engine_define);
return create_query;
}
......@@ -149,7 +198,7 @@ void DatabaseMySQL::destroyLocalCacheExtraTables(const std::map<String, UInt64>
++iterator;
else
{
outdated_tables.emplace_back(iterator->second.storage);
outdated_tables.emplace_back(iterator->second.second);
iterator = local_tables_cache.erase(iterator);
}
}
......@@ -163,7 +212,7 @@ void DatabaseMySQL::fetchLatestTablesStructureIntoCache(const std::map<String, U
const auto & it = local_tables_cache.find(table_modification_time.first);
/// Outdated or new table structures
if (it == local_tables_cache.end() || table_modification_time.second > it->second.modification_time)
if (it == local_tables_cache.end() || table_modification_time.second > it->second.first)
wait_update_tables_name.emplace_back(table_modification_time.first);
}
......@@ -178,75 +227,14 @@ void DatabaseMySQL::fetchLatestTablesStructureIntoCache(const std::map<String, U
const auto & iterator = local_tables_cache.find(table_name);
if (iterator != local_tables_cache.end())
{
outdated_tables.emplace_back(iterator->second.storage);
outdated_tables.emplace_back(iterator->second.second);
local_tables_cache.erase(iterator);
}
local_tables_cache[table_name] = createStorageInfo(table_name, columns_name_and_type, table_modification_time);
}
}
static ASTPtr getTableColumnsCreateQuery(const NamesAndTypesList & names_and_types_list)
{
const auto & table_columns_list_ast = std::make_shared<ASTColumns>();
const auto & columns_expression_list = std::make_shared<ASTExpressionList>();
for (const auto & table_column_name_and_type : names_and_types_list)
{
const auto & column_declaration = std::make_shared<ASTColumnDeclaration>();
column_declaration->name = table_column_name_and_type.name;
column_declaration->type = dataTypeConvertToQuery(table_column_name_and_type.type);
columns_expression_list->children.emplace_back(column_declaration);
local_tables_cache[table_name] = std::make_pair(table_modification_time, StorageMySQL::create(
database_name, table_name, std::move(mysql_pool), database_name_in_mysql, table_name,
false, "", ColumnsDescription{columns_name_and_type}, ConstraintsDescription{}, global_context));
}
table_columns_list_ast->set(table_columns_list_ast->columns, columns_expression_list);
return table_columns_list_ast;
}
static ASTPtr getTableStorageCreateQuery(
const String & host_name, const UInt16 & port,
const String & database_name, const String & table_name,
const String & user_name, const String & password)
{
const auto & table_storage = std::make_shared<ASTStorage>();
const auto & storage_engine = std::make_shared<ASTFunction>();
storage_engine->name = "MySQL";
storage_engine->arguments = std::make_shared<ASTExpressionList>();
storage_engine->children.push_back(storage_engine->arguments);
storage_engine->arguments->children = {
std::make_shared<ASTLiteral>(host_name + ":" + toString(port)),
std::make_shared<ASTLiteral>(database_name), std::make_shared<ASTLiteral>(table_name),
std::make_shared<ASTLiteral>(user_name), std::make_shared<ASTLiteral>(password)
};
table_storage->set(table_storage->engine, storage_engine);
return table_storage;
}
DatabaseMySQL::MySQLStorageInfo DatabaseMySQL::createStorageInfo(
const String & table_name, const NamesAndTypesList & columns_name_and_type, const UInt64 & table_modification_time) const
{
const auto & mysql_table = StorageMySQL::create(
database_name, table_name, std::move(mysql_pool), mysql_database_name, table_name,
false, "", ColumnsDescription{columns_name_and_type}, ConstraintsDescription{}, global_context);
const auto & create_table_query = std::make_shared<ASTCreateQuery>();
create_table_query->table = table_name;
create_table_query->database = database_name;
create_table_query->set(create_table_query->columns_list, getTableColumnsCreateQuery(columns_name_and_type));
create_table_query->set(create_table_query->storage, getTableStorageCreateQuery(
mysql_host_name, mysql_port, mysql_database_name, table_name, mysql_user_name, mysql_user_password));
MySQLStorageInfo storage_info;
storage_info.storage = mysql_table;
storage_info.create_table_query = create_table_query;
storage_info.modification_time = table_modification_time;
return storage_info;
}
std::map<String, UInt64> DatabaseMySQL::fetchTablesWithModificationTime() const
......@@ -262,7 +250,7 @@ std::map<String, UInt64> DatabaseMySQL::fetchTablesWithModificationTime() const
" TABLE_NAME AS table_name, "
" CREATE_TIME AS modification_time "
" FROM INFORMATION_SCHEMA.TABLES "
" WHERE TABLE_SCHEMA = " << quote << mysql_database_name;
" WHERE TABLE_SCHEMA = " << quote << database_name_in_mysql;
std::map<String, UInt64> tables_with_modification_time;
MySQLBlockInputStream result(mysql_pool.Get(), query.str(), tables_status_sample_block, DEFAULT_BLOCK_SIZE);
......@@ -306,7 +294,7 @@ std::map<String, NamesAndTypesList> DatabaseMySQL::fetchTablesColumnsList(const
" COLUMN_TYPE LIKE '%unsigned' AS is_unsigned,"
" CHARACTER_MAXIMUM_LENGTH AS length"
" FROM INFORMATION_SCHEMA.COLUMNS"
" WHERE TABLE_SCHEMA = " << quote << mysql_database_name
" WHERE TABLE_SCHEMA = " << quote << database_name_in_mysql
<< " AND TABLE_NAME IN " << toQueryStringWithQuote(tables_name) << " ORDER BY ORDINAL_POSITION";
const auto & external_table_functions_use_nulls = global_context.getSettings().external_table_functions_use_nulls;
......@@ -331,19 +319,24 @@ std::map<String, NamesAndTypesList> DatabaseMySQL::fetchTablesColumnsList(const
void DatabaseMySQL::shutdown()
{
std::map<String, MySQLStorageInfo> tables_snapshot;
std::map<String, ModifyTimeAndStorage> tables_snapshot;
{
std::lock_guard lock(mutex);
tables_snapshot = local_tables_cache;
}
for (const auto & table_snapshot : tables_snapshot)
table_snapshot.second.storage->shutdown();
for (const auto & [table_name, modify_time_and_storage] : tables_snapshot)
modify_time_and_storage.second->shutdown();
std::lock_guard lock(mutex);
local_tables_cache.clear();
}
void DatabaseMySQL::drop()
{
Poco::File(getMetadataPath()).remove(true);
}
void DatabaseMySQL::cleanOutdatedTables()
{
setThreadName("MySQLDBCleaner");
......@@ -370,6 +363,98 @@ void DatabaseMySQL::cleanOutdatedTables()
}
}
void DatabaseMySQL::attachTable(const String & table_name, const StoragePtr & storage)
{
std::lock_guard<std::mutex> lock{mutex};
if (!local_tables_cache.count(table_name))
throw Exception("Cannot attach table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) +
" because it does not exist.", ErrorCodes::UNKNOWN_TABLE);
if (!remove_or_detach_tables.count(table_name))
throw Exception("Cannot attach table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) +
" because it already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
/// We use the new storage to replace the original storage, because the original storage may have been dropped
/// Although we still keep its
local_tables_cache[table_name].second = storage;
remove_or_detach_tables.erase(table_name);
Poco::File remove_flag(getMetadataPath() + '/' + escapeForFileName(table_name) + suffix);
if (remove_flag.exists())
remove_flag.remove();
}
StoragePtr DatabaseMySQL::detachTable(const String & table_name)
{
std::lock_guard<std::mutex> lock{mutex};
if (remove_or_detach_tables.count(table_name))
throw Exception("Table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) + " is dropped",
ErrorCodes::TABLE_IS_DROPPED);
if (!local_tables_cache.count(table_name))
throw Exception("Table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) + " doesn't exist.",
ErrorCodes::UNKNOWN_TABLE);
remove_or_detach_tables.emplace(table_name);
return local_tables_cache[table_name].second;
}
String DatabaseMySQL::getMetadataPath() const
{
return metadata_path;
}
void DatabaseMySQL::loadStoredObjects(Context &, bool)
{
std::lock_guard<std::mutex> lock{mutex};
Poco::DirectoryIterator iterator(getMetadataPath());
for (Poco::DirectoryIterator end; iterator != end; ++iterator)
{
if (iterator->isFile() && endsWith(iterator.name(), suffix))
{
const auto & filename = iterator.name();
const auto & table_name = unescapeForFileName(filename.substr(0, filename.size() - strlen(suffix)));
remove_or_detach_tables.emplace(table_name);
}
}
}
void DatabaseMySQL::removeTable(const Context &, const String & table_name)
{
std::lock_guard<std::mutex> lock{mutex};
Poco::File remove_flag(getMetadataPath() + '/' + escapeForFileName(table_name) + suffix);
if (remove_or_detach_tables.count(table_name))
throw Exception("Table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) + " is dropped",
ErrorCodes::TABLE_IS_DROPPED);
if (remove_flag.exists())
throw Exception("The remove flag file already exists but the " + backQuoteIfNeed(getDatabaseName()) +
"." + backQuoteIfNeed(table_name) + " does not exists remove tables, it is bug.", ErrorCodes::LOGICAL_ERROR);
if (!local_tables_cache.count(table_name))
throw Exception("Table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) + " doesn't exist.",
ErrorCodes::UNKNOWN_TABLE);
remove_or_detach_tables.emplace(table_name);
try
{
remove_flag.createFile();
}
catch (...)
{
remove_or_detach_tables.erase(table_name);
throw;
}
}
DatabaseMySQL::~DatabaseMySQL()
{
try
......@@ -392,6 +477,27 @@ DatabaseMySQL::~DatabaseMySQL()
}
}
void DatabaseMySQL::createTable(const Context & context, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query)
{
const auto & create = create_query->as<ASTCreateQuery>();
if (!create->attach)
throw Exception("MySQL database engine does not support create table. for tables that were detach or dropped before, "
"you can use attach to add them back to the MySQL database", ErrorCodes::NOT_IMPLEMENTED);
/// XXX: hack
/// In order to prevent users from broken the table structure by executing attach table database_name.table_name (...)
/// we should compare the old and new create_query to make them completely consistent
const auto & origin_create_query = getCreateTableQuery(context, table_name);
origin_create_query->as<ASTCreateQuery>()->attach = true;
if (queryToString(origin_create_query) != queryToString(create_query))
throw Exception("The MySQL database engine can only execute attach statements of type attach table database_name.table_name",
ErrorCodes::UNEXPECTED_AST_STRUCTURE);
attachTable(table_name, storage);
}
}
#endif
......@@ -7,6 +7,7 @@
#include <Databases/DatabasesCommon.h>
#include <Interpreters/Context.h>
#include <memory>
#include <Parsers/ASTCreateQuery.h>
namespace DB
......@@ -21,8 +22,9 @@ class DatabaseMySQL : public IDatabase
public:
~DatabaseMySQL() override;
DatabaseMySQL(const Context & context_, const String & database_name_, const String & mysql_host_name_, const UInt16 & mysql_port_,
const String & mysql_database_name_, const String & mysql_user_name_, const String & mysql_user_password_);
DatabaseMySQL(
const Context & global_context, const String & database_name, const String & metadata_path,
const ASTStorage * database_engine_define, const String & database_name_in_mysql, mysqlxx::Pool && pool);
String getEngineName() const override { return "MySQL"; }
......@@ -54,29 +56,27 @@ public:
ASTPtr tryGetCreateDictionaryQuery(const Context &, const String &) const override { return nullptr; }
time_t getObjectMetadataModificationTime(const Context & context, const String & name) override;
void shutdown() override;
StoragePtr detachTable(const String &) override
{
throw Exception("MySQL database engine does not support detach table.", ErrorCodes::NOT_IMPLEMENTED);
}
void drop() override;
void detachDictionary(const String &, const Context &, bool) override
{
throw Exception("MySQL database engine does not support detach dictionary.", ErrorCodes::NOT_IMPLEMENTED);
}
String getMetadataPath() const override;
void loadStoredObjects(Context &, bool) override
{
/// do nothing
}
void createTable(const Context &, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query) override;
void loadStoredObjects(Context &, bool) override;
StoragePtr detachTable(const String & table_name) override;
void removeTable(const Context &, const String & table_name) override;
void attachTable(const String & table_name, const StoragePtr & storage) override;
void removeTable(const Context &, const String &) override
void detachDictionary(const String &, const Context &, bool) override
{
throw Exception("MySQL database engine does not support remove table.", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("MySQL database engine does not support detach dictionary.", ErrorCodes::NOT_IMPLEMENTED);
}
void removeDictionary(const Context &, const String &) override
......@@ -84,52 +84,35 @@ public:
throw Exception("MySQL database engine does not support remove dictionary.", ErrorCodes::NOT_IMPLEMENTED);
}
void attachTable(const String &, const StoragePtr &) override
{
throw Exception("MySQL database engine does not support attach table.", ErrorCodes::NOT_IMPLEMENTED);
}
void attachDictionary(const String &, const Context &, bool) override
{
throw Exception("MySQL database engine does not support attach dictionary.", ErrorCodes::NOT_IMPLEMENTED);
}
void createTable(const Context &, const String &, const StoragePtr &, const ASTPtr &) override
{
throw Exception("MySQL database engine does not support create table.", ErrorCodes::NOT_IMPLEMENTED);
}
void createDictionary(const Context &, const String &, const ASTPtr &) override
{
throw Exception("MySQL database engine does not support create dictionary.", ErrorCodes::NOT_IMPLEMENTED);
}
private:
struct MySQLStorageInfo
{
StoragePtr storage;
UInt64 modification_time;
ASTPtr create_table_query;
};
const Context global_context;
const String database_name;
const String mysql_host_name;
const UInt16 mysql_port;
const String mysql_database_name;
const String mysql_user_name;
const String mysql_user_password;
Context global_context;
String database_name;
String metadata_path;
ASTPtr database_engine_define;
String database_name_in_mysql;
mutable std::mutex mutex;
std::atomic<bool> quit{false};
std::condition_variable cond;
mutable mysqlxx::Pool mysql_pool;
using MySQLPool = mysqlxx::Pool;
using ModifyTimeAndStorage = std::pair<UInt64, StoragePtr>;
mutable MySQLPool mysql_pool;
mutable std::vector<StoragePtr> outdated_tables;
mutable std::map<String, MySQLStorageInfo> local_tables_cache;
mutable std::map<String, ModifyTimeAndStorage> local_tables_cache;
std::unordered_set<String> remove_or_detach_tables;
void cleanOutdatedTables();
......@@ -137,9 +120,6 @@ private:
std::map<String, UInt64> fetchTablesWithModificationTime() const;
DatabaseMySQL::MySQLStorageInfo createStorageInfo(
const String & table_name, const NamesAndTypesList & columns_name_and_type, const UInt64 & table_modification_time) const;
std::map<String, NamesAndTypesList> fetchTablesColumnsList(const std::vector<String> & tables_name) const;
void destroyLocalCacheExtraTables(const std::map<String, UInt64> & tables_with_modification_time) const;
......
......@@ -127,8 +127,6 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
/// Create directories for tables metadata.
String path = context.getPath();
String metadata_path = path + "metadata/" + database_name_escaped + "/";
Poco::File(metadata_path).createDirectory();
DatabasePtr database = DatabaseFactory::get(database_name, metadata_path, create.storage, context);
/// Will write file with database metadata, if needed.
......
from contextlib import contextmanager
import time
import pytest
import contextlib
## sudo -H pip install PyMySQL
import pymysql.cursors
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_mysql = True)
create_table_normal_sql_template = """
CREATE TABLE `clickhouse`.`{}` (
`id` int(11) NOT NULL,
`name` varchar(50) NOT NULL,
`age` int NOT NULL default 0,
`money` int NOT NULL default 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB;
"""
create_table_mysql_style_sql_template = """
CREATE TABLE `clickhouse`.`{}` (
`id` int(11) NOT NULL,
`float` float NOT NULL,
`Float32` float NOT NULL,
`test``name` varchar(50) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB;
"""
drop_table_sql_template = "DROP TABLE `clickhouse`.`{}`"
add_column_sql_template = "ALTER TABLE `clickhouse`.`{}` ADD COLUMN `pid` int(11)"
del_column_sql_template = "ALTER TABLE `clickhouse`.`{}` DROP COLUMN `pid`"
clickhouse_node = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_mysql=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
conn = get_mysql_conn()
## create mysql db and table
create_mysql_db(conn, 'clickhouse')
node1.query("CREATE DATABASE clickhouse_mysql ENGINE = MySQL('mysql1:3306', 'clickhouse', 'root', 'clickhouse')")
yield cluster
finally:
cluster.shutdown()
def test_sync_tables_list_between_clickhouse_and_mysql(started_cluster):
mysql_connection = get_mysql_conn()
assert node1.query('SHOW TABLES FROM clickhouse_mysql FORMAT TSV').rstrip() == ''
create_normal_mysql_table(mysql_connection, 'first_mysql_table')
assert node1.query("SHOW TABLES FROM clickhouse_mysql LIKE 'first_mysql_table' FORMAT TSV").rstrip() == 'first_mysql_table'
create_normal_mysql_table(mysql_connection, 'second_mysql_table')
assert node1.query("SHOW TABLES FROM clickhouse_mysql LIKE 'second_mysql_table' FORMAT TSV").rstrip() == 'second_mysql_table'
drop_mysql_table(mysql_connection, 'second_mysql_table')
assert node1.query("SHOW TABLES FROM clickhouse_mysql LIKE 'second_mysql_table' FORMAT TSV").rstrip() == ''
mysql_connection.close()
class MySQLNodeInstance:
def __init__(self, user='root', password='clickhouse', hostname='127.0.0.1', port=3308):
self.user = user
self.port = port
self.hostname = hostname
self.password = password
self.mysql_connection = None # lazy init
def test_sync_tables_structure_between_clickhouse_and_mysql(started_cluster):
mysql_connection = get_mysql_conn()
def query(self, execution_query):
if self.mysql_connection is None:
self.mysql_connection = pymysql.connect(user=self.user, password=self.password, host=self.hostname, port=self.port)
with self.mysql_connection.cursor() as cursor:
cursor.execute(execution_query)
create_normal_mysql_table(mysql_connection, 'test_sync_column')
def close(self):
if self.mysql_connection is not None:
self.mysql_connection.close()
assert node1.query(
"SELECT name FROM system.columns WHERE table = 'test_sync_column' AND database = 'clickhouse_mysql' AND name = 'pid' ").rstrip() == ''
time.sleep(3)
add_mysql_table_column(mysql_connection, "test_sync_column")
def test_mysql_ddl_for_mysql_database(started_cluster):
with contextlib.closing(MySQLNodeInstance('root', 'clickhouse', '127.0.0.1', port=3308)) as mysql_node:
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
assert node1.query(
"SELECT name FROM system.columns WHERE table = 'test_sync_column' AND database = 'clickhouse_mysql' AND name = 'pid' ").rstrip() == 'pid'
clickhouse_node.query("CREATE DATABASE test_database ENGINE = MySQL('mysql1:3306', 'test_database', 'root', 'clickhouse')")
assert 'test_database' in clickhouse_node.query('SHOW DATABASES')
time.sleep(3)
drop_mysql_table_column(mysql_connection, "test_sync_column")
assert node1.query(
"SELECT name FROM system.columns WHERE table = 'test_sync_column' AND database = 'clickhouse_mysql' AND name = 'pid' ").rstrip() == ''
mysql_node.query('CREATE TABLE `test_database`.`test_table` ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB;')
assert 'test_table' in clickhouse_node.query('SHOW TABLES FROM test_database')
mysql_connection.close()
time.sleep(3) # Because the unit of MySQL modification time is seconds, modifications made in the same second cannot be obtained
mysql_node.query('ALTER TABLE `test_database`.`test_table` ADD COLUMN `add_column` int(11)')
assert 'add_column' in clickhouse_node.query("SELECT name FROM system.columns WHERE table = 'test_table' AND database = 'test_database'")
def test_insert_select(started_cluster):
mysql_connection = get_mysql_conn()
create_normal_mysql_table(mysql_connection, 'test_insert_select')
time.sleep(3) # Because the unit of MySQL modification time is seconds, modifications made in the same second cannot be obtained
mysql_node.query('ALTER TABLE `test_database`.`test_table` DROP COLUMN `add_column`')
assert 'add_column' not in clickhouse_node.query("SELECT name FROM system.columns WHERE table = 'test_table' AND database = 'test_database'")
assert node1.query("SELECT count() FROM `clickhouse_mysql`.{}".format('test_insert_select')).rstrip() == '0'
node1.query("INSERT INTO `clickhouse_mysql`.{}(id, name, money) select number, concat('name_', toString(number)), 3 from numbers(10000) ".format('test_insert_select'))
assert node1.query("SELECT count() FROM `clickhouse_mysql`.{}".format('test_insert_select')).rstrip() == '10000'
assert node1.query("SELECT sum(money) FROM `clickhouse_mysql`.{}".format('test_insert_select')).rstrip() == '30000'
mysql_connection.close()
mysql_node.query('DROP TABLE `test_database`.`test_table`;')
assert 'test_table' not in clickhouse_node.query('SHOW TABLES FROM test_database')
def test_insert_select_with_mysql_style_table(started_cluster):
mysql_connection = get_mysql_conn()
create_mysql_style_mysql_table(mysql_connection, 'test_mysql``_style_table')
clickhouse_node.query("DROP DATABASE test_database")
assert 'test_database' not in clickhouse_node.query('SHOW DATABASES')
assert node1.query("SELECT count() FROM `clickhouse_mysql`.`{}`".format('test_mysql\`_style_table')).rstrip() == '0'
node1.query("INSERT INTO `clickhouse_mysql`.`{}`(id, `float`, `Float32`, `test\`name`) select number, 3, 3, 'name' from numbers(10000) ".format('test_mysql\`_style_table'))
assert node1.query("SELECT count() FROM `clickhouse_mysql`.`{}`".format('test_mysql\`_style_table')).rstrip() == '10000'
assert node1.query("SELECT sum(`float`) FROM `clickhouse_mysql`.`{}`".format('test_mysql\`_style_table')).rstrip() == '30000'
mysql_connection.close()
mysql_node.query("DROP DATABASE test_database")
def test_table_function(started_cluster):
mysql_connection = get_mysql_conn()
create_normal_mysql_table(mysql_connection, 'table_function')
table_function = get_mysql_table_function_expr('table_function')
assert node1.query("SELECT count() FROM {}".format(table_function)).rstrip() == '0'
node1.query("INSERT INTO {} (id, name, money) select number, concat('name_', toString(number)), 3 from numbers(10000)".format('TABLE FUNCTION ' + table_function))
assert node1.query("SELECT count() FROM {}".format(table_function)).rstrip() == '10000'
assert node1.query("SELECT sum(c) FROM ("
"SELECT count() as c FROM {} WHERE id % 3 == 0"
" UNION ALL SELECT count() as c FROM {} WHERE id % 3 == 1"
" UNION ALL SELECT count() as c FROM {} WHERE id % 3 == 2)".format(table_function, table_function, table_function)).rstrip() == '10000'
assert node1.query("SELECT sum(`money`) FROM {}".format(table_function)).rstrip() == '30000'
mysql_connection.close()
def test_clickhouse_ddl_for_mysql_database(started_cluster):
with contextlib.closing(MySQLNodeInstance('root', 'clickhouse', '127.0.0.1', port=3308)) as mysql_node:
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
mysql_node.query('CREATE TABLE `test_database`.`test_table` ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB;')
def get_mysql_conn():
conn = pymysql.connect(user='root', password='clickhouse', host='127.0.0.1', port=3308)
return conn
clickhouse_node.query("CREATE DATABASE test_database ENGINE = MySQL('mysql1:3306', 'test_database', 'root', 'clickhouse')")
def get_mysql_table_function_expr(table_name):
return "mysql('mysql1:3306', 'clickhouse', '{}', 'root', 'clickhouse')".format(table_name)
assert 'test_table' in clickhouse_node.query('SHOW TABLES FROM test_database')
clickhouse_node.query("DROP TABLE test_database.test_table")
assert 'test_table' not in clickhouse_node.query('SHOW TABLES FROM test_database')
clickhouse_node.query("ATTACH TABLE test_database.test_table")
assert 'test_table' in clickhouse_node.query('SHOW TABLES FROM test_database')
clickhouse_node.query("DETACH TABLE test_database.test_table")
assert 'test_table' not in clickhouse_node.query('SHOW TABLES FROM test_database')
clickhouse_node.query("ATTACH TABLE test_database.test_table")
assert 'test_table' in clickhouse_node.query('SHOW TABLES FROM test_database')
def create_mysql_db(conn, name):
with conn.cursor() as cursor:
cursor.execute(
"CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))
clickhouse_node.query("DROP DATABASE test_database")
assert 'test_database' not in clickhouse_node.query('SHOW DATABASES')
def create_normal_mysql_table(conn, table_name):
with conn.cursor() as cursor:
cursor.execute(create_table_normal_sql_template.format(table_name))
mysql_node.query("DROP DATABASE test_database")
def create_mysql_style_mysql_table(conn, table_name):
with conn.cursor() as cursor:
cursor.execute(create_table_mysql_style_sql_template.format(table_name))
def drop_mysql_table(conn, table_name):
with conn.cursor() as cursor:
cursor.execute(drop_table_sql_template.format(table_name))
def test_clickhouse_dml_for_mysql_database(started_cluster):
with contextlib.closing(MySQLNodeInstance('root', 'clickhouse', '127.0.0.1', port=3308)) as mysql_node:
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
mysql_node.query('CREATE TABLE `test_database`.`test_table` ( `i``d` int(11) NOT NULL, PRIMARY KEY (`i``d`)) ENGINE=InnoDB;')
clickhouse_node.query("CREATE DATABASE test_database ENGINE = MySQL('mysql1:3306', 'test_database', 'root', 'clickhouse')")
def add_mysql_table_column(conn, table_name):
with conn.cursor() as cursor:
cursor.execute(add_column_sql_template.format(table_name))
assert clickhouse_node.query("SELECT count() FROM `test_database`.`test_table`").rstrip() == '0'
clickhouse_node.query("INSERT INTO `test_database`.`test_table`(`i\`d`) select number from numbers(10000)")
assert clickhouse_node.query("SELECT count() FROM `test_database`.`test_table`").rstrip() == '10000'
def drop_mysql_table_column(conn, table_name):
with conn.cursor() as cursor:
cursor.execute(del_column_sql_template.format(table_name))
mysql_node.query("DROP DATABASE test_database")
......@@ -9,8 +9,8 @@ from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_mysql = True)
create_table_sql_template = """
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_mysql=True)
create_table_sql_template = """
CREATE TABLE `clickhouse`.`{}` (
`id` int(11) NOT NULL,
`name` varchar(50) NOT NULL,
......@@ -19,6 +19,7 @@ create_table_sql_template = """
PRIMARY KEY (`id`)) ENGINE=InnoDB;
"""
@pytest.fixture(scope="module")
def started_cluster():
try:
......@@ -76,6 +77,7 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL
assert node1.query("SELECT sum(money) FROM {}".format(table_name)).rstrip() == '60000'
conn.close()
def test_where(started_cluster):
table_name = 'test_where'
conn = get_mysql_conn()
......@@ -92,15 +94,35 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL
assert node1.query("SELECT count() FROM {} WHERE name LIKE concat('name_', toString(1))".format(table_name)).rstrip() == '1'
conn.close()
def test_table_function(started_cluster):
conn = get_mysql_conn()
create_mysql_table(conn, 'table_function')
table_function = "mysql('mysql1:3306', 'clickhouse', '{}', 'root', 'clickhouse')".format('table_function')
assert node1.query("SELECT count() FROM {}".format(table_function)).rstrip() == '0'
node1.query("INSERT INTO {} (id, name, money) select number, concat('name_', toString(number)), 3 from numbers(10000)".format(
'TABLE FUNCTION ' + table_function))
assert node1.query("SELECT count() FROM {}".format(table_function)).rstrip() == '10000'
assert node1.query("SELECT sum(c) FROM ("
"SELECT count() as c FROM {} WHERE id % 3 == 0"
" UNION ALL SELECT count() as c FROM {} WHERE id % 3 == 1"
" UNION ALL SELECT count() as c FROM {} WHERE id % 3 == 2)".format(table_function, table_function,
table_function)).rstrip() == '10000'
assert node1.query("SELECT sum(`money`) FROM {}".format(table_function)).rstrip() == '30000'
conn.close()
def get_mysql_conn():
conn = pymysql.connect(user='root', password='clickhouse', host='127.0.0.1', port=3308)
return conn
def create_mysql_db(conn, name):
with conn.cursor() as cursor:
cursor.execute(
"CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))
def create_mysql_table(conn, tableName):
with conn.cursor() as cursor:
cursor.execute(create_table_sql_template.format(tableName))
......
......@@ -6,8 +6,6 @@ The `MySQL` database engine translate queries to the MySQL server so you can per
You cannot perform the following queries:
- `ATTACH`/`DETACH`
- `DROP`
- `RENAME`
- `CREATE TABLE`
- `ALTER`
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册