From 209015574f6a752eb08aa351b14963c96828fe49 Mon Sep 17 00:00:00 2001 From: Vitaliy Lyudvichenko Date: Thu, 13 Apr 2017 19:12:56 +0300 Subject: [PATCH] Workable version for CREATE DROP w/o ZK tests. [#CLICKHOUSE-5] --- dbms/src/Common/Exception.cpp | 53 +-- dbms/src/Common/Exception.h | 1 + dbms/src/Core/ColumnWithTypeAndName.h | 4 + dbms/src/DataStreams/BlockIO.h | 26 +- dbms/src/Interpreters/Cluster.cpp | 6 + dbms/src/Interpreters/Cluster.h | 3 + dbms/src/Interpreters/DDLWorker.cpp | 326 ++++++++++++++---- dbms/src/Interpreters/DDLWorker.h | 48 ++- .../Interpreters/InterpreterCreateQuery.cpp | 76 +--- .../src/Interpreters/InterpreterCreateQuery.h | 5 +- .../src/Interpreters/InterpreterDropQuery.cpp | 20 +- dbms/src/Interpreters/InterpreterDropQuery.h | 5 +- dbms/src/Parsers/ASTCreateQuery.h | 26 +- dbms/src/Parsers/ASTDDLQueryWithOnCluster.cpp | 12 + dbms/src/Parsers/ASTDDLQueryWithOnCluster.h | 22 ++ dbms/src/Parsers/ASTDropQuery.h | 19 +- dbms/src/Parsers/ParserCreateQuery.cpp | 6 +- dbms/src/Parsers/ParserDropQuery.cpp | 18 + dbms/src/Parsers/formatAST.cpp | 6 - dbms/src/Parsers/formatAST.h | 2 - dbms/src/Server/Server.cpp | 6 +- .../Storages/MergeTree/ReshardingWorker.cpp | 8 +- 22 files changed, 490 insertions(+), 208 deletions(-) create mode 100644 dbms/src/Parsers/ASTDDLQueryWithOnCluster.cpp create mode 100644 dbms/src/Parsers/ASTDDLQueryWithOnCluster.h diff --git a/dbms/src/Common/Exception.cpp b/dbms/src/Common/Exception.cpp index 2a94cb0b3b..8aacf5e32e 100644 --- a/dbms/src/Common/Exception.cpp +++ b/dbms/src/Common/Exception.cpp @@ -83,28 +83,7 @@ std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded } catch (const Exception & e) { - try - { - std::string text = e.displayText(); - - bool has_embedded_stack_trace = false; - if (check_embedded_stacktrace) - { - auto embedded_stack_trace_pos = text.find("Stack trace"); - has_embedded_stack_trace = embedded_stack_trace_pos != std::string::npos; - if (!with_stacktrace && has_embedded_stack_trace) - { - text.resize(embedded_stack_trace_pos); - Poco::trimRightInPlace(text); - } - } - - stream << "Code: " << e.code() << ", e.displayText() = " << text << ", e.what() = " << e.what(); - - if (with_stacktrace && !has_embedded_stack_trace) - stream << ", Stack trace:\n\n" << e.getStackTrace().toString(); - } - catch (...) {} + stream << getExceptionMessage(e, with_stacktrace, check_embedded_stacktrace); } catch (const Poco::Exception & e) { @@ -230,6 +209,36 @@ void tryLogException(std::exception_ptr e, Poco::Logger * logger, const std::str } } +std::string getExceptionMessage(const Exception & e, bool with_stacktrace, bool check_embedded_stacktrace) +{ + std::stringstream stream; + + try + { + std::string text = e.displayText(); + + bool has_embedded_stack_trace = false; + if (check_embedded_stacktrace) + { + auto embedded_stack_trace_pos = text.find("Stack trace"); + has_embedded_stack_trace = embedded_stack_trace_pos != std::string::npos; + if (!with_stacktrace && has_embedded_stack_trace) + { + text.resize(embedded_stack_trace_pos); + Poco::trimRightInPlace(text); + } + } + + stream << "Code: " << e.code() << ", e.displayText() = " << text << ", e.what() = " << e.what(); + + if (with_stacktrace && !has_embedded_stack_trace) + stream << ", Stack trace:\n\n" << e.getStackTrace().toString(); + } + catch (...) {} + + return stream.str(); +} + std::string getExceptionMessage(std::exception_ptr e, bool with_stacktrace) { try diff --git a/dbms/src/Common/Exception.h b/dbms/src/Common/Exception.h index 05a4047930..8e67745338 100644 --- a/dbms/src/Common/Exception.h +++ b/dbms/src/Common/Exception.h @@ -92,6 +92,7 @@ int getCurrentExceptionCode(); void tryLogException(std::exception_ptr e, const char * log_name, const std::string & start_of_message = ""); void tryLogException(std::exception_ptr e, Poco::Logger * logger, const std::string & start_of_message = ""); +std::string getExceptionMessage(const Exception & e, bool with_stacktrace, bool check_embedded_stacktrace = false); std::string getExceptionMessage(std::exception_ptr e, bool with_stacktrace); diff --git a/dbms/src/Core/ColumnWithTypeAndName.h b/dbms/src/Core/ColumnWithTypeAndName.h index 88c6fffadb..09da42ccf3 100644 --- a/dbms/src/Core/ColumnWithTypeAndName.h +++ b/dbms/src/Core/ColumnWithTypeAndName.h @@ -22,6 +22,10 @@ struct ColumnWithTypeAndName ColumnWithTypeAndName(const ColumnPtr & column_, const DataTypePtr & type_, const String name_) : column(column_), type(type_), name(name_) {} + /// Uses type->createColumn() to create column + ColumnWithTypeAndName(const DataTypePtr & type_, const String name_) + : column(type_->createColumn()), type(type_), name(name_) {} + ColumnWithTypeAndName cloneEmpty() const; bool operator==(const ColumnWithTypeAndName & other) const; String prettyPrint() const; diff --git a/dbms/src/DataStreams/BlockIO.h b/dbms/src/DataStreams/BlockIO.h index 0d704993ea..5ac609f8a6 100644 --- a/dbms/src/DataStreams/BlockIO.h +++ b/dbms/src/DataStreams/BlockIO.h @@ -25,7 +25,7 @@ struct BlockIO Block out_sample; /// Example of a block to be written to `out`. /// Callbacks for query logging could be set here. - std::function finish_callback; + std::function finish_callback; std::function exception_callback; /// Call these functions if you want to log the request. @@ -44,18 +44,18 @@ struct BlockIO BlockIO & operator= (const BlockIO & rhs) { /// We provide the correct order of destruction. - out = nullptr; - in = nullptr; - process_list_entry = nullptr; - - process_list_entry = rhs.process_list_entry; - in = rhs.in; - out = rhs.out; - in_sample = rhs.in_sample; - out_sample = rhs.out_sample; - - finish_callback = rhs.finish_callback; - exception_callback = rhs.exception_callback; + out = nullptr; + in = nullptr; + process_list_entry = nullptr; + + process_list_entry = rhs.process_list_entry; + in = rhs.in; + out = rhs.out; + in_sample = rhs.in_sample; + out_sample = rhs.out_sample; + + finish_callback = rhs.finish_callback; + exception_callback = rhs.exception_callback; return *this; } diff --git a/dbms/src/Interpreters/Cluster.cpp b/dbms/src/Interpreters/Cluster.cpp index cfc4895251..c68bafc743 100644 --- a/dbms/src/Interpreters/Cluster.cpp +++ b/dbms/src/Interpreters/Cluster.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -108,6 +109,11 @@ Cluster::Address::Address(const String & host_port_, const String & user_, const } } +String Cluster::Address::toString() const +{ + return host_name + ':' + DB::toString(port); +} + /// Implementation of Clusters class Clusters::Clusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name) diff --git a/dbms/src/Interpreters/Cluster.h b/dbms/src/Interpreters/Cluster.h index b29dac9a4b..2b60c1b4d7 100644 --- a/dbms/src/Interpreters/Cluster.h +++ b/dbms/src/Interpreters/Cluster.h @@ -58,6 +58,9 @@ public: Address(Poco::Util::AbstractConfiguration & config, const String & config_prefix); Address(const String & host_port_, const String & user_, const String & password_); + + /// Returns 'host_name:port' + String toString() const; }; using Addresses = std::vector
; diff --git a/dbms/src/Interpreters/DDLWorker.cpp b/dbms/src/Interpreters/DDLWorker.cpp index 323e79374b..96e08cb9eb 100644 --- a/dbms/src/Interpreters/DDLWorker.cpp +++ b/dbms/src/Interpreters/DDLWorker.cpp @@ -1,8 +1,22 @@ -#include - #include -#include + +#include +#include +#include +#include + #include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include #include @@ -19,58 +33,47 @@ namespace { /// Helper class which extracts from the ClickHouse configuration file /// the parameters we need for operating the resharding thread. -class Arguments final -{ -public: - Arguments(const Poco::Util::AbstractConfiguration & config, const std::string & config_name) - { - Poco::Util::AbstractConfiguration::Keys keys; - config.keys(config_name, keys); - - for (const auto & key : keys) - { - if (key == "task_queue_path") - task_queue_path = config.getString(config_name + "." + key); - else - throw Exception{"Unknown parameter in resharding configuration", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG}; - } - - if (task_queue_path.empty()) - throw Exception{"Resharding: missing parameter task_queue_path", ErrorCodes::INVALID_CONFIG_PARAMETER}; - } - - Arguments(const Arguments &) = delete; - Arguments & operator=(const Arguments &) = delete; - - std::string getTaskQueuePath() const - { - return task_queue_path; - } - -private: - std::string task_queue_path; -}; +// struct Arguments +// { +// public: +// Arguments(const Poco::Util::AbstractConfiguration & config, const std::string & config_name) +// { +// Poco::Util::AbstractConfiguration::Keys keys; +// config.keys(config_name, keys); +// +// for (const auto & key : keys) +// { +// if (key == "distributed_ddl_root") +// ddl_queries_root = config.getString(config_name + "." + key); +// else +// throw Exception{"Unknown parameter in distributed DDL configuration", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG}; +// } +// +// if (ddl_queries_root.empty()) +// throw Exception{"Distributed DDL: missing parameter distributed_ddl_root", ErrorCodes::INVALID_CONFIG_PARAMETER}; +// } +// +// std::string ddl_queries_root; +// }; } -DDLWorker::DDLWorker(const Poco::Util::AbstractConfiguration & config, - const std::string & config_name, Context & context_) - : context(context_) - , stop_flag(false) -{ - Arguments arguments(config, config_name); - auto zookeeper = context.getZooKeeper(); - std::string root = arguments.getTaskQueuePath(); - if (root.back() != '/') - root += "/"; +DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_) + : context(context_), stop_flag(false) +{ + root_dir = zk_root_dir; + if (root_dir.back() == '/') + root_dir.resize(root_dir.size() - 1); - auto current_host = getFQDNOrHostName(); - host_task_queue_path = "/clickhouse/task_queue/ddl/" + current_host; + hostname = getFQDNOrHostName() + ':' + DB::toString(context.getTCPPort()); + assign_dir = getAssignsDir() + hostname; + master_dir = getMastersDir() + hostname; thread = std::thread(&DDLWorker::run, this); } + DDLWorker::~DDLWorker() { stop_flag = true; @@ -78,43 +81,141 @@ DDLWorker::~DDLWorker() thread.join(); } + void DDLWorker::processTasks() { - processCreate(host_task_queue_path + "/create"); + auto zookeeper = context.getZooKeeper(); + + if (!zookeeper->exists(assign_dir)) + return; + + Strings tasks = zookeeper->getChildren(assign_dir); + if (tasks.empty()) + return; + + String current_task = *std::min_element(tasks.cbegin(), tasks.cend()); + + try + { + processTask(current_task); + } + catch (...) + { + tryLogCurrentException(log, "An unexpected error occurred during task " + current_task); + throw; + } } -void DDLWorker::processCreate(const std::string & path) + +bool DDLWorker::processTask(const std::string & task) { auto zookeeper = context.getZooKeeper(); - if (!zookeeper->exists(path)) - return; + String query_dir = root_dir + "/" + task; + String assign_node = assign_dir + "/" + task; + String active_node = query_dir + "/active/" + hostname; + String sucsess_node = query_dir + "/sucess/" + hostname; + String fail_node = query_dir + "/failed/" + hostname; - const Strings & children = zookeeper->getChildren(path); + String query = zookeeper->get(query_dir); - for (const auto & name : children) + if (zookeeper->exists(sucsess_node) || zookeeper->exists(fail_node)) { - const std::string & query_path = path + "/" + name; + throw Exception( + "Task " + task + " (query " + query + ") was already processed by node " + hostname, ErrorCodes::LOGICAL_ERROR); + } - try - { - std::string value; + /// Create active flag + zookeeper->create(active_node, "", zkutil::CreateMode::Ephemeral); - if (zookeeper->tryGet(query_path, value)) - { - if (!value.empty()) - executeQuery(value, context); + /// Will delete task from host's tasks list, delete active flag and ... + zkutil::Ops ops; + ops.emplace_back(std::make_unique(assign_node, -1)); + ops.emplace_back(std::make_unique(active_node, -1)); - zookeeper->remove(query_path); - } + try + { + executeQuery(query, context); + } + catch (...) + { + /// ... and create fail flag + String exception_msg = getCurrentExceptionMessage(false, true); + ops.emplace_back(std::make_unique(fail_node, exception_msg, nullptr, zkutil::CreateMode::Persistent)); + zookeeper->multi(ops); + + tryLogCurrentException(log, "Query " + query + " wasn't finished successfully"); + return false; + } + + /// .. and create sucess flag + ops.emplace_back(std::make_unique(sucsess_node, "", nullptr, zkutil::CreateMode::Persistent)); + zookeeper->multi(ops); + + return true; +} + + +void DDLWorker::enqueueQuery(const String & query, const std::vector & addrs) +{ + auto zookeeper = context.getZooKeeper(); + + String assigns_dir = getAssignsDir(); + String master_dir = getCurrentMasterDir(); + String query_path_prefix = getRoot() + "/query-"; + + zookeeper->createAncestors(assigns_dir + "/"); + zookeeper->createAncestors(master_dir + "/"); + String query_path = zookeeper->create(query_path_prefix, query, zkutil::CreateMode::PersistentSequential); + String query_node = query_path.substr(query_path.find_last_of('/') + 1); + + zkutil::Ops ops; + auto acl = zookeeper->getDefaultACL(); + constexpr size_t max_ops_per_call = 100; + + /// Create /root/masters/query_node and /root/query-node/* to monitor status of the tasks initiated by us + { + String num_hosts = toString(addrs.size()); + String master_query_node = master_dir + "/" + query_node; + ops.emplace_back(std::make_unique(master_query_node, "", acl, zkutil::CreateMode::Persistent)); + ops.emplace_back(std::make_unique(query_path + "/active", "", acl, zkutil::CreateMode::Persistent)); + ops.emplace_back(std::make_unique(query_path + "/sucess", num_hosts, acl, zkutil::CreateMode::Persistent)); + ops.emplace_back(std::make_unique(query_path + "/failed", "", acl, zkutil::CreateMode::Persistent)); + zookeeper->multi(ops); + ops.clear(); + } + + /// Create hosts's taks dir /root/assigns/host (if not exists) + for (auto it = addrs.cbegin(); it != addrs.cend(); ++it) + { + String cur_assign_dir = assigns_dir + "/" + it->toString(); + ops.emplace_back(std::make_unique(cur_assign_dir, "", acl, zkutil::CreateMode::Persistent)); + + if (ops.size() > max_ops_per_call || std::next(it) == addrs.cend()) + { + int code = zookeeper->tryMulti(ops); + ops.clear(); + + if (code != ZOK && code != ZNODEEXISTS) + throw zkutil::KeeperException(code); } - catch (const std::exception & ex) + } + + /// Asssign tasks to hosts /root/assigns/host/query_node + for (auto it = addrs.cbegin(); it != addrs.cend(); ++it) + { + String cur_task_path = assigns_dir + "/" + it->toString() + "/" + query_node; + ops.emplace_back(std::make_unique(cur_task_path, "", acl, zkutil::CreateMode::Persistent)); + + if (ops.size() > max_ops_per_call || std::next(it) == addrs.cend()) { - LOG_ERROR(log, ex.what() + std::string(" on ") + query_path); + zookeeper->multi(ops); + ops.clear(); } } } + void DDLWorker::run() { using namespace std::chrono_literals; @@ -135,4 +236,101 @@ void DDLWorker::run() } } + +static bool getRemoteQueryExecutionStatus(const Cluster::Address & addr, const std::string & query, Exception & out_exception) +{ + Connection conn(addr.host_name, addr.port, "", addr.user, addr.password); + conn.sendQuery(query); + + while (true) + { + Connection::Packet packet = conn.receivePacket(); + + if (packet.type == Protocol::Server::Exception) + { + out_exception = *packet.exception; + return false; + } + else if (packet.type == Protocol::Server::EndOfStream) + break; + } + + return true; +} + + +BlockIO executeDDLQueryOnCluster(const String & query, const String & cluster_name, Context & context) +{ + ClusterPtr cluster = context.getCluster(cluster_name); + Cluster::AddressesWithFailover shards = cluster->getShardsWithFailoverAddresses(); + std::vector pending_hosts; + + Array hosts_names_failed, hosts_errors, hosts_names_pending; + size_t num_hosts_total = 0; + size_t num_hosts_finished_successfully = 0; + + for (const auto & shard : shards) + { + for (const auto & addr : shard) + { + try + { + Exception ex; + if (!getRemoteQueryExecutionStatus(addr, query, ex)) + { + /// Normal error + String exception_msg = getExceptionMessage(ex, false, true); + hosts_names_failed.emplace_back(addr.host_name); + hosts_errors.emplace_back(exception_msg); + + LOG_INFO(&Logger::get("DDLWorker"), "Query " << query << " failed on " << addr.host_name << ": " << exception_msg); + } + else + { + ++num_hosts_finished_successfully; + } + } + catch (...) + { + /// Network error + pending_hosts.emplace_back(addr); + hosts_names_pending.emplace_back(addr.host_name); + } + } + num_hosts_total += shard.size(); + } + + + if (!pending_hosts.empty()) + context.getDDLWorker().enqueueQuery(query, pending_hosts); + + + auto aray_of_strings = std::make_shared(std::make_shared()); + Block block{ + {std::make_shared(), "num_hosts_total"}, + {std::make_shared(), "num_hosts_finished_successfully"}, + {std::make_shared(), "num_hosts_finished_unsuccessfully"}, + {std::make_shared(), "num_hosts_pending"}, + {aray_of_strings->clone(), "hosts_finished_unsuccessfully"}, + {aray_of_strings->clone(), "hosts_finished_unsuccessfully_errors"}, + {aray_of_strings->clone(), "hosts_pending"} + }; + + size_t num_hosts_finished = num_hosts_total - pending_hosts.size(); + size_t num_hosts_finished_unsuccessfully = num_hosts_finished - num_hosts_finished_successfully; + block.getByName("num_hosts_total").column->insert(num_hosts_total); + block.getByName("num_hosts_finished_successfully").column->insert(num_hosts_finished_successfully); + block.getByName("num_hosts_finished_unsuccessfully").column->insert(num_hosts_finished_unsuccessfully); + block.getByName("num_hosts_pending").column->insert(pending_hosts.size()); + block.getByName("hosts_finished_unsuccessfully").column->insert(hosts_names_failed); + block.getByName("hosts_finished_unsuccessfully_errors").column->insert(hosts_errors); + block.getByName("hosts_pending").column->insert(hosts_names_pending); + + BlockIO io; + io.in_sample = block.cloneEmpty(); + io.in = std::make_shared(block); + return io; +} + + } diff --git a/dbms/src/Interpreters/DDLWorker.h b/dbms/src/Interpreters/DDLWorker.h index f35303873f..1414194321 100644 --- a/dbms/src/Interpreters/DDLWorker.h +++ b/dbms/src/Interpreters/DDLWorker.h @@ -1,6 +1,7 @@ #pragma once - #include +#include +#include #include #include @@ -12,16 +13,50 @@ namespace DB { + +BlockIO executeDDLQueryOnCluster(const String & query, const String & cluster_name, Context & context); + + class DDLWorker { public: - DDLWorker(const Poco::Util::AbstractConfiguration & config, - const std::string & config_name, Context & context_); + DDLWorker(const std::string & zk_root_dir, Context & context_); ~DDLWorker(); + void enqueueQuery(const String & query, const std::vector & addrs); + + /// Returns root/ path in ZooKeeper + std::string getRoot() const + { + return root_dir; + } + + std::string getAssignsDir() const + { + return root_dir + "/assigns"; + } + + std::string getMastersDir() const + { + return root_dir + "/masters"; + } + + std::string getCurrentMasterDir() const + { + return getMastersDir() + "/" + getHostName(); + } + + std::string getHostName() const + { + return hostname; + } + private: void processTasks(); - void processCreate(const std::string & path); + bool processTask(const std::string & task); + + void processQueries(); + bool processQuery(const std::string & task); void run(); @@ -29,7 +64,10 @@ private: Context & context; Logger * log = &Logger::get("DDLWorker"); - std::string host_task_queue_path; + std::string hostname; + std::string root_dir; /// common dir with queue of queries + std::string assign_dir; /// dir with tasks assigned to the server + std::string master_dir; /// dir with queries was initiated by the server std::atomic stop_flag; std::condition_variable cond_var; diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.cpp b/dbms/src/Interpreters/InterpreterCreateQuery.cpp index 4d0216a7d0..bc45517848 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.cpp +++ b/dbms/src/Interpreters/InterpreterCreateQuery.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -31,6 +32,7 @@ #include #include #include +#include #include #include @@ -57,21 +59,6 @@ namespace ErrorCodes extern const int DUPLICATE_COLUMN; } -static void ExecuteQuery(const Cluster::Address & addr, const std::string & query) -{ - Connection conn(addr.host_name, addr.port, "", addr.user, addr.password); - conn.sendQuery(query); - - while (true) - { - Connection::Packet packet = conn.receivePacket(); - - if (packet.type == Protocol::Server::Exception) - throw Exception(*packet.exception.get()); - else if (packet.type == Protocol::Server::EndOfStream) - break; - } -} InterpreterCreateQuery::InterpreterCreateQuery(const ASTPtr & query_ptr_, Context & context_) : query_ptr(query_ptr_), context(context_) @@ -474,7 +461,7 @@ String InterpreterCreateQuery::setEngine( } -BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) +BlockIO InterpreterCreateQuery::createTableOnServer(ASTCreateQuery & create) { String path = context.getPath(); String current_database = context.getCurrentDatabase(); @@ -578,63 +565,16 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) return {}; } -ASTPtr InterpreterCreateQuery::createQueryWithoutCluster(ASTCreateQuery & create) const -{ - ASTPtr cloned = create.clone(); - ASTCreateQuery & tmp = typeid_cast(*cloned); - tmp.cluster.clear(); - if (tmp.database.empty()) - tmp.database = context.getCurrentDatabase(); - return cloned; -} - -void InterpreterCreateQuery::writeToZookeeper(ASTPtr query, const std::vector & addrs) -{ - auto zookeeper = context.getZooKeeper(); - ASTCreateQuery & create = typeid_cast(*query); - std::string table_name = create.database + "." + create.table; - - for (const auto & addr : addrs) - { - const std::string & path = - "/clickhouse/task_queue/ddl/" + - addr.host_name + - "/create/" + table_name; - - // TODO catch exceptions - zookeeper->createAncestors(path); - zookeeper->create(path, formatASTToString(*query), 0); - } -} BlockIO InterpreterCreateQuery::createTableOnCluster(ASTCreateQuery & create) { - ClusterPtr cluster = context.getCluster(create.cluster); - Cluster::AddressesWithFailover shards = cluster->getShardsWithFailoverAddresses(); - ASTPtr query_ptr = createQueryWithoutCluster(create); - std::string query = formatASTToString(*query_ptr); - std::vector failed; + /// Do we really should use that database for each server? + String query = create.getRewrittenQueryWithoutOnCluster(context.getCurrentDatabase()); - for (const auto & shard : shards) - { - for (const auto & addr : shard) - { - try - { - ExecuteQuery(addr, query); - } - catch (...) - { - failed.push_back(addr); - } - } - } - - writeToZookeeper(query_ptr, failed); - - return {}; + return executeDDLQueryOnCluster(query, create.cluster, context); } + BlockIO InterpreterCreateQuery::execute() { ASTCreateQuery & create = typeid_cast(*query_ptr); @@ -648,7 +588,7 @@ BlockIO InterpreterCreateQuery::execute() else if (!create.cluster.empty()) return createTableOnCluster(create); else - return createTable(create); + return createTableOnServer(create); } diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.h b/dbms/src/Interpreters/InterpreterCreateQuery.h index e667540ad5..58d694768a 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.h +++ b/dbms/src/Interpreters/InterpreterCreateQuery.h @@ -56,12 +56,9 @@ public: private: void createDatabase(ASTCreateQuery & create); - BlockIO createTable(ASTCreateQuery & create); + BlockIO createTableOnServer(ASTCreateQuery & create); BlockIO createTableOnCluster(ASTCreateQuery & create); - ASTPtr createQueryWithoutCluster(ASTCreateQuery & create) const; - void writeToZookeeper(ASTPtr query, const std::vector& addrs); - /// Calculate list of columns of table and return it. ColumnsInfo setColumns(ASTCreateQuery & create, const Block & as_select_sample, const StoragePtr & as_storage) const; String setEngine(ASTCreateQuery & create, const StoragePtr & as_storage) const; diff --git a/dbms/src/Interpreters/InterpreterDropQuery.cpp b/dbms/src/Interpreters/InterpreterDropQuery.cpp index fb4eaf1c58..abb5886ba4 100644 --- a/dbms/src/Interpreters/InterpreterDropQuery.cpp +++ b/dbms/src/Interpreters/InterpreterDropQuery.cpp @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB @@ -26,12 +27,20 @@ InterpreterDropQuery::InterpreterDropQuery(const ASTPtr & query_ptr_, Context & BlockIO InterpreterDropQuery::execute() +{ + ASTDropQuery & drop = typeid_cast(*query_ptr); + + if (drop.cluster.empty()) + return executeOnServer(drop); + else + return executeOnCluster(drop); +} + +BlockIO InterpreterDropQuery::executeOnServer(ASTDropQuery & drop) { String path = context.getPath(); String current_database = context.getCurrentDatabase(); - ASTDropQuery & drop = typeid_cast(*query_ptr); - bool drop_database = drop.table.empty() && !drop.database.empty(); if (drop_database && drop.detach) @@ -144,4 +153,11 @@ BlockIO InterpreterDropQuery::execute() } +BlockIO InterpreterDropQuery::executeOnCluster(ASTDropQuery & drop) +{ + String query = drop.getRewrittenQueryWithoutOnCluster(context.getCurrentDatabase()); + return executeDDLQueryOnCluster(query, drop.cluster, context); +} + + } diff --git a/dbms/src/Interpreters/InterpreterDropQuery.h b/dbms/src/Interpreters/InterpreterDropQuery.h index da61ab7ba1..27dd00ee13 100644 --- a/dbms/src/Interpreters/InterpreterDropQuery.h +++ b/dbms/src/Interpreters/InterpreterDropQuery.h @@ -18,12 +18,15 @@ class InterpreterDropQuery : public IInterpreter public: InterpreterDropQuery(const ASTPtr & query_ptr_, Context & context_); - /// Drop table. + /// Drop table or database. BlockIO execute() override; private: ASTPtr query_ptr; Context & context; + + BlockIO executeOnServer(ASTDropQuery & drop); + BlockIO executeOnCluster(ASTDropQuery & drop); }; diff --git a/dbms/src/Parsers/ASTCreateQuery.h b/dbms/src/Parsers/ASTCreateQuery.h index b19f442a71..ec4b74083e 100644 --- a/dbms/src/Parsers/ASTCreateQuery.h +++ b/dbms/src/Parsers/ASTCreateQuery.h @@ -2,15 +2,15 @@ #include #include +#include namespace DB { -/** CREATE TABLE or ATTACH TABLE query - */ -class ASTCreateQuery : public IAST +/// CREATE TABLE or ATTACH TABLE query +class ASTCreateQuery : public IAST, public ASTDDLQueryWithOnCluster { public: bool attach{false}; /// Query ATTACH TABLE, not CREATE TABLE. @@ -21,7 +21,6 @@ public: bool is_temporary{false}; String database; String table; - String cluster; ASTPtr columns; ASTPtr storage; ASTPtr inner_storage; /// Internal engine for the CREATE MATERIALIZED VIEW query @@ -48,6 +47,18 @@ public: return res; } + ASTPtr getRewrittenASTWithoutOnCluster(const std::string & new_database) const override + { + auto query_ptr = clone(); + ASTCreateQuery & query = static_cast(*query_ptr); + + query.cluster.clear(); + if (query.database.empty()) + query.database = new_database; + + return query_ptr; + } + protected: void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override { @@ -81,10 +92,11 @@ protected: << (settings.hilite ? hilite_keyword : "") << (attach ? "ATTACH " : "CREATE ") << (is_temporary ? "TEMPORARY " : "") - << what - << " " << (if_not_exists ? "IF NOT EXISTS " : "") + << what << " " + << (if_not_exists ? "IF NOT EXISTS " : "") << (settings.hilite ? hilite_none : "") - << (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table); + << (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table) << " " + << (!cluster.empty() ? "ON CLUSTER " + backQuoteIfNeed(cluster) + " " : ""); } if (!as_table.empty()) diff --git a/dbms/src/Parsers/ASTDDLQueryWithOnCluster.cpp b/dbms/src/Parsers/ASTDDLQueryWithOnCluster.cpp new file mode 100644 index 0000000000..9ddd7a47dc --- /dev/null +++ b/dbms/src/Parsers/ASTDDLQueryWithOnCluster.cpp @@ -0,0 +1,12 @@ +#include +#include + +namespace DB +{ + +std::string ASTDDLQueryWithOnCluster::getRewrittenQueryWithoutOnCluster(const std::string & new_database) const +{ + return queryToString(getRewrittenASTWithoutOnCluster(new_database)); +} + +} diff --git a/dbms/src/Parsers/ASTDDLQueryWithOnCluster.h b/dbms/src/Parsers/ASTDDLQueryWithOnCluster.h new file mode 100644 index 0000000000..c75ba51a87 --- /dev/null +++ b/dbms/src/Parsers/ASTDDLQueryWithOnCluster.h @@ -0,0 +1,22 @@ +#pragma once + +#include + + +namespace DB +{ + +class ASTDDLQueryWithOnCluster +{ +public: + + String cluster; + + virtual ASTPtr getRewrittenASTWithoutOnCluster(const std::string & new_database = {}) const = 0; + + std::string getRewrittenQueryWithoutOnCluster(const std::string & new_database = {}) const; + + virtual ~ASTDDLQueryWithOnCluster() = default; +}; + +} diff --git a/dbms/src/Parsers/ASTDropQuery.h b/dbms/src/Parsers/ASTDropQuery.h index 836d87b145..3e9e535167 100644 --- a/dbms/src/Parsers/ASTDropQuery.h +++ b/dbms/src/Parsers/ASTDropQuery.h @@ -1,7 +1,7 @@ #pragma once #include - +#include namespace DB { @@ -9,7 +9,7 @@ namespace DB /** DROP query */ -class ASTDropQuery : public IAST +class ASTDropQuery : public IAST, public ASTDDLQueryWithOnCluster { public: bool detach{false}; /// DETACH query, not DROP. @@ -25,6 +25,18 @@ public: ASTPtr clone() const override { return std::make_shared(*this); } + ASTPtr getRewrittenASTWithoutOnCluster(const std::string & new_database) const override + { + auto query_ptr = clone(); + ASTDropQuery & query = static_cast(*query_ptr); + + query.cluster.clear(); + if (query.database.empty()) + query.database = new_database; + + return query_ptr; + } + protected: void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override { @@ -41,7 +53,8 @@ protected: settings.ostr << (settings.hilite ? hilite_keyword : "") << (detach ? "DETACH TABLE " : "DROP TABLE ") << (if_exists ? "IF EXISTS " : "") << (settings.hilite ? hilite_none : "") - << (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table); + << (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table) + << (!cluster.empty() ? " ON CLUSTER " + backQuoteIfNeed(cluster) + " " : ""); } }; diff --git a/dbms/src/Parsers/ParserCreateQuery.cpp b/dbms/src/Parsers/ParserCreateQuery.cpp index 64c1b9102c..aa2523a980 100644 --- a/dbms/src/Parsers/ParserCreateQuery.cpp +++ b/dbms/src/Parsers/ParserCreateQuery.cpp @@ -151,8 +151,6 @@ bool ParserCreateQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p ParserString s_attach("ATTACH", true, true); ParserString s_table("TABLE", true, true); ParserString s_database("DATABASE", true, true); - ParserString s_on("ON", true, true); - ParserString s_cluster("CLUSTER", true, true); ParserString s_dot("."); ParserString s_lparen("("); ParserString s_rparen(")"); @@ -247,11 +245,11 @@ bool ParserCreateQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p ws.ignore(pos, end); } - if (s_on.ignore(pos, end, max_parsed_pos, expected)) + if (ParserString{"ON", true, true}.ignore(pos, end, max_parsed_pos, expected)) { ws.ignore(pos, end); - if (!s_cluster.ignore(pos, end, max_parsed_pos, expected)) + if (!ParserString{"CLUSTER", true, true}.ignore(pos, end, max_parsed_pos, expected)) return false; ws.ignore(pos, end); diff --git a/dbms/src/Parsers/ParserDropQuery.cpp b/dbms/src/Parsers/ParserDropQuery.cpp index 9325e82aae..5f91a767d5 100644 --- a/dbms/src/Parsers/ParserDropQuery.cpp +++ b/dbms/src/Parsers/ParserDropQuery.cpp @@ -27,6 +27,7 @@ bool ParserDropQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_par ASTPtr database; ASTPtr table; + ASTPtr cluster; bool detach = false; bool if_exists = false; @@ -81,6 +82,21 @@ bool ParserDropQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_par ws.ignore(pos, end); } + + if (ParserString{"ON", true, true}.ignore(pos, end, max_parsed_pos, expected)) + { + ws.ignore(pos, end); + + if (!ParserString{"CLUSTER", true, true}.ignore(pos, end, max_parsed_pos, expected)) + return false; + + ws.ignore(pos, end); + + if (!name_p.parse(pos, end, cluster, max_parsed_pos, expected)) + return false; + + ws.ignore(pos, end); + } } ws.ignore(pos, end); @@ -94,6 +110,8 @@ bool ParserDropQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_par query->database = typeid_cast(*database).name; if (table) query->table = typeid_cast(*table).name; + if (cluster) + query->cluster = typeid_cast(*cluster).name; return true; } diff --git a/dbms/src/Parsers/formatAST.cpp b/dbms/src/Parsers/formatAST.cpp index bdb50dc471..c8672093ce 100644 --- a/dbms/src/Parsers/formatAST.cpp +++ b/dbms/src/Parsers/formatAST.cpp @@ -11,12 +11,6 @@ void formatAST(const IAST & ast, std::ostream & s, size_t indent, bool hilite, b ast.format(settings); } -std::string formatASTToString(const IAST & ast) -{ - std::stringstream s; - formatAST(ast, s, 0, false, true); - return s.str(); -} String formatColumnsForCreateQuery(NamesAndTypesList & columns) { diff --git a/dbms/src/Parsers/formatAST.h b/dbms/src/Parsers/formatAST.h index 7f227bb0dc..057e2a5149 100644 --- a/dbms/src/Parsers/formatAST.h +++ b/dbms/src/Parsers/formatAST.h @@ -14,8 +14,6 @@ namespace DB */ void formatAST(const IAST & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false); -std::string formatASTToString(const IAST & ast); - String formatColumnsForCreateQuery(NamesAndTypesList & columns); inline std::ostream & operator<<(std::ostream & os, const IAST & ast) { return formatAST(ast, os, 0, false, true), os; } diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index e6e18e0cf1..1fe0c40079 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -416,11 +416,11 @@ int Server::main(const std::vector & args) if (has_zookeeper && config().has("distributed_ddl")) { - auto ddl_worker = std::make_shared(config(), "distributed_ddl", *global_context); - global_context->setDDLWorker(ddl_worker); + String ddl_zookeeper_path = config().getString("distributed_ddl.path", "/clickhouse/task_queue/ddl/"); + global_context->setDDLWorker(std::make_shared(ddl_zookeeper_path, *global_context)); } - SCOPE_EXIT( + SCOPE_EXIT({ /** Ask to cancel background jobs all table engines, * and also query_log. * It is important to do early, not in destructor of Context, because diff --git a/dbms/src/Storages/MergeTree/ReshardingWorker.cpp b/dbms/src/Storages/MergeTree/ReshardingWorker.cpp index a3f35b3bf8..f3d76ef8ce 100644 --- a/dbms/src/Storages/MergeTree/ReshardingWorker.cpp +++ b/dbms/src/Storages/MergeTree/ReshardingWorker.cpp @@ -82,12 +82,12 @@ public: for (const auto & key : keys) { if (key == "task_queue_path") - task_queue_path = config.getString(config_name + "." + key); + ddl_queries_root = config.getString(config_name + "." + key); else throw Exception{"Unknown parameter in resharding configuration", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG}; } - if (task_queue_path.empty()) + if (ddl_queries_root.empty()) throw Exception{"Resharding: missing parameter task_queue_path", ErrorCodes::INVALID_CONFIG_PARAMETER}; } @@ -96,11 +96,11 @@ public: std::string getTaskQueuePath() const { - return task_queue_path; + return ddl_queries_root; } private: - std::string task_queue_path; + std::string ddl_queries_root; }; /// Helper class we use to read and write the status of a coordinator -- GitLab