diff --git a/dbms/src/Interpreters/DDLWorker.cpp b/dbms/src/Interpreters/DDLWorker.cpp index 96e08cb9eb27a1993e82c257a3e62ec92edbd32f..902fa274c91092467cefa4e86aa0899bcd11fa29 100644 --- a/dbms/src/Interpreters/DDLWorker.cpp +++ b/dbms/src/Interpreters/DDLWorker.cpp @@ -5,6 +5,11 @@ #include #include +#include +#include +#include +#include + #include #include #include @@ -27,36 +32,51 @@ namespace ErrorCodes { extern const int UNKNOWN_ELEMENT_IN_CONFIG; extern const int INVALID_CONFIG_PARAMETER; + extern const int UNKNOWN_FORMAT_VERSION; } -namespace { - -/// Helper class which extracts from the ClickHouse configuration file -/// the parameters we need for operating the resharding thread. -// struct Arguments -// { -// public: -// Arguments(const Poco::Util::AbstractConfiguration & config, const std::string & config_name) -// { -// Poco::Util::AbstractConfiguration::Keys keys; -// config.keys(config_name, keys); -// -// for (const auto & key : keys) -// { -// if (key == "distributed_ddl_root") -// ddl_queries_root = config.getString(config_name + "." + key); -// else -// throw Exception{"Unknown parameter in distributed DDL configuration", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG}; -// } -// -// if (ddl_queries_root.empty()) -// throw Exception{"Distributed DDL: missing parameter distributed_ddl_root", ErrorCodes::INVALID_CONFIG_PARAMETER}; -// } -// -// std::string ddl_queries_root; -// }; -} +struct DDLLogEntry +{ + String query; + Strings hosts; + String initiator; + + static constexpr char CURRENT_VERSION = '1'; + + String toString() + { + String res; + { + WriteBufferFromString wb(res); + + writeChar(CURRENT_VERSION, wb); + wb << "\n"; + wb << "query: " << double_quote << query << "\n"; + wb << "hosts: " << double_quote << hosts << "\n"; + wb << "initiator: " << double_quote << initiator << "\n"; + } + + return res; + } + + void parse(const String & data) + { + ReadBufferFromString rb(data); + + char version; + readChar(version, rb); + if (version != CURRENT_VERSION) + throw Exception("Unknown DDLLogEntry format version: " + version, ErrorCodes::UNKNOWN_FORMAT_VERSION); + + rb >> "\n"; + rb >> "query: " >> double_quote >> query >> "\n"; + rb >> "hosts: " >> double_quote >> hosts >> "\n"; + rb >> "initiator: " >> double_quote >> initiator >> "\n"; + + assertEOF(rb); + } +}; DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_) @@ -67,9 +87,11 @@ DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_) root_dir.resize(root_dir.size() - 1); hostname = getFQDNOrHostName() + ':' + DB::toString(context.getTCPPort()); - assign_dir = getAssignsDir() + hostname; master_dir = getMastersDir() + hostname; + zookeeper = context.getZooKeeper(); + event_queue_updated = std::make_shared(); + thread = std::thread(&DDLWorker::run, this); } @@ -84,135 +106,134 @@ DDLWorker::~DDLWorker() void DDLWorker::processTasks() { - auto zookeeper = context.getZooKeeper(); + Strings queue_nodes; + int code = zookeeper->tryGetChildren(root_dir, queue_nodes, nullptr, event_queue_updated); + if (code != ZNONODE) + throw zkutil::KeeperException(code); - if (!zookeeper->exists(assign_dir)) + /// Threre are no tasks + if (code == ZNONODE || queue_nodes.empty()) return; - Strings tasks = zookeeper->getChildren(assign_dir); - if (tasks.empty()) - return; + bool server_startup = last_processed_node_name.empty(); - String current_task = *std::min_element(tasks.cbegin(), tasks.cend()); + std::sort(queue_nodes.begin(), queue_nodes.end()); + auto begin_node = server_startup + ? queue_nodes.begin() + : std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_processed_node_name); - try - { - processTask(current_task); - } - catch (...) + for (auto it = begin_node; it != queue_nodes.end(); ++it) { - tryLogCurrentException(log, "An unexpected error occurred during task " + current_task); - throw; + String node_data, node_name = *it, node_path = root_dir + "/" + node_name; + code = zookeeper->tryGet(node_path, node_data); + + /// It is Ok that node could be deleted just now. It means that there are no current host in node's host list. + if (code != ZNONODE) + throw zkutil::KeeperException(code); + + DDLLogEntry node; + node.parse(node_data); + + bool host_in_hostlist = std::find(node.hosts.cbegin(), node.hosts.cend(), hostname) != node.hosts.cend(); + + bool already_processed = !zookeeper->exists(node_path + "/failed/" + hostname) + && !zookeeper->exists(node_path + "/sucess/" + hostname); + + if (!server_startup && already_processed) + { + throw Exception( + "Server expects that DDL node " + node_name + " should be processed, but it was already processed according to ZK", + ErrorCodes::LOGICAL_ERROR); + } + + if (host_in_hostlist && !already_processed) + { + try + { + processTask(node, node_name); + } + catch (...) + { + /// It could be network error, but we mark node as processed anyway. + last_processed_node_name = node_name; + + tryLogCurrentException(log, + "An unexpected error occurred during processing DLL query " + node.query + " (" + node_name + ")"); + throw; + } + } + + last_processed_node_name = node_name; } } -bool DDLWorker::processTask(const std::string & task) +/// Try to create unexisting "status" dirs for a node +void DDLWorker::createStatusDirs(const std::string & node_path) { - auto zookeeper = context.getZooKeeper(); + auto acl = zookeeper->getDefaultACL(); - String query_dir = root_dir + "/" + task; - String assign_node = assign_dir + "/" + task; - String active_node = query_dir + "/active/" + hostname; - String sucsess_node = query_dir + "/sucess/" + hostname; - String fail_node = query_dir + "/failed/" + hostname; + zkutil::Ops ops; + ops.emplace_back(std::make_unique(node_path + "/active", "", acl, zkutil::CreateMode::Persistent)); + ops.emplace_back(std::make_unique(node_path + "/sucess", "", acl, zkutil::CreateMode::Persistent)); + ops.emplace_back(std::make_unique(node_path + "/failed", "", acl, zkutil::CreateMode::Persistent)); - String query = zookeeper->get(query_dir); + int code = zookeeper->tryMulti(ops); + + if (code != ZOK && code != ZNODEEXISTS) + throw zkutil::KeeperException(code); +} - if (zookeeper->exists(sucsess_node) || zookeeper->exists(fail_node)) - { - throw Exception( - "Task " + task + " (query " + query + ") was already processed by node " + hostname, ErrorCodes::LOGICAL_ERROR); - } - /// Create active flag - zookeeper->create(active_node, "", zkutil::CreateMode::Ephemeral); +bool DDLWorker::processTask(const DDLLogEntry & node, const std::string & node_name) +{ + String node_path = root_dir + "/" + node_name; + createStatusDirs(node_path); + + String active_flag_path = node_path + "/active/" + hostname; + zookeeper->create(active_flag_path, "", zkutil::CreateMode::Ephemeral); - /// Will delete task from host's tasks list, delete active flag and ... + /// At the end we will delete active flag and ... zkutil::Ops ops; - ops.emplace_back(std::make_unique(assign_node, -1)); - ops.emplace_back(std::make_unique(active_node, -1)); + auto acl = zookeeper->getDefaultACL(); + ops.emplace_back(std::make_unique(active_flag_path, -1)); try { - executeQuery(query, context); + executeQuery(node.query, context); } catch (...) { /// ... and create fail flag + String fail_flag_path = node_path + "/failed/" + hostname; String exception_msg = getCurrentExceptionMessage(false, true); - ops.emplace_back(std::make_unique(fail_node, exception_msg, nullptr, zkutil::CreateMode::Persistent)); + + ops.emplace_back(std::make_unique(fail_flag_path, exception_msg, acl, zkutil::CreateMode::Persistent)); zookeeper->multi(ops); - tryLogCurrentException(log, "Query " + query + " wasn't finished successfully"); + tryLogCurrentException(log, "Query " + node.query + " wasn't finished successfully"); return false; } - /// .. and create sucess flag - ops.emplace_back(std::make_unique(sucsess_node, "", nullptr, zkutil::CreateMode::Persistent)); + /// ... and create sucess flag + String fail_flag_path = node_path + "/sucess/" + hostname; + ops.emplace_back(std::make_unique(fail_flag_path, "", acl, zkutil::CreateMode::Persistent)); zookeeper->multi(ops); return true; } -void DDLWorker::enqueueQuery(const String & query, const std::vector & addrs) +void DDLWorker::enqueueQuery(DDLLogEntry & entry) { - auto zookeeper = context.getZooKeeper(); + if (entry.hosts.empty()) + return; - String assigns_dir = getAssignsDir(); - String master_dir = getCurrentMasterDir(); String query_path_prefix = getRoot() + "/query-"; + zookeeper->createAncestors(query_path_prefix); - zookeeper->createAncestors(assigns_dir + "/"); - zookeeper->createAncestors(master_dir + "/"); - String query_path = zookeeper->create(query_path_prefix, query, zkutil::CreateMode::PersistentSequential); - String query_node = query_path.substr(query_path.find_last_of('/') + 1); - - zkutil::Ops ops; - auto acl = zookeeper->getDefaultACL(); - constexpr size_t max_ops_per_call = 100; - - /// Create /root/masters/query_node and /root/query-node/* to monitor status of the tasks initiated by us - { - String num_hosts = toString(addrs.size()); - String master_query_node = master_dir + "/" + query_node; - ops.emplace_back(std::make_unique(master_query_node, "", acl, zkutil::CreateMode::Persistent)); - ops.emplace_back(std::make_unique(query_path + "/active", "", acl, zkutil::CreateMode::Persistent)); - ops.emplace_back(std::make_unique(query_path + "/sucess", num_hosts, acl, zkutil::CreateMode::Persistent)); - ops.emplace_back(std::make_unique(query_path + "/failed", "", acl, zkutil::CreateMode::Persistent)); - zookeeper->multi(ops); - ops.clear(); - } - - /// Create hosts's taks dir /root/assigns/host (if not exists) - for (auto it = addrs.cbegin(); it != addrs.cend(); ++it) - { - String cur_assign_dir = assigns_dir + "/" + it->toString(); - ops.emplace_back(std::make_unique(cur_assign_dir, "", acl, zkutil::CreateMode::Persistent)); - - if (ops.size() > max_ops_per_call || std::next(it) == addrs.cend()) - { - int code = zookeeper->tryMulti(ops); - ops.clear(); - - if (code != ZOK && code != ZNODEEXISTS) - throw zkutil::KeeperException(code); - } - } - - /// Asssign tasks to hosts /root/assigns/host/query_node - for (auto it = addrs.cbegin(); it != addrs.cend(); ++it) - { - String cur_task_path = assigns_dir + "/" + it->toString() + "/" + query_node; - ops.emplace_back(std::make_unique(cur_task_path, "", acl, zkutil::CreateMode::Persistent)); - - if (ops.size() > max_ops_per_call || std::next(it) == addrs.cend()) - { - zookeeper->multi(ops); - ops.clear(); - } - } + String node_path = zookeeper->create(query_path_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential); + createStatusDirs(node_path); } @@ -226,83 +247,45 @@ void DDLWorker::run() { processTasks(); } - catch (const std::exception & ex) + catch (...) { - LOG_ERROR(log, ex.what()); + tryLogCurrentException(log); } - std::unique_lock g(lock); - cond_var.wait_for(g, 10s); + //std::unique_lock g(lock); + //cond_var.wait_for(g, 10s); + + event_queue_updated->wait(); } } -static bool getRemoteQueryExecutionStatus(const Cluster::Address & addr, const std::string & query, Exception & out_exception) +class DDLQueryStatusInputSream : IProfilingBlockInputStream { - Connection conn(addr.host_name, addr.port, "", addr.user, addr.password); - conn.sendQuery(query); - - while (true) - { - Connection::Packet packet = conn.receivePacket(); - - if (packet.type == Protocol::Server::Exception) - { - out_exception = *packet.exception; - return false; - } - else if (packet.type == Protocol::Server::EndOfStream) - break; - } - return true; -} +}; BlockIO executeDDLQueryOnCluster(const String & query, const String & cluster_name, Context & context) { ClusterPtr cluster = context.getCluster(cluster_name); Cluster::AddressesWithFailover shards = cluster->getShardsWithFailoverAddresses(); - std::vector pending_hosts; Array hosts_names_failed, hosts_errors, hosts_names_pending; size_t num_hosts_total = 0; size_t num_hosts_finished_successfully = 0; + DDLWorker & ddl_worker = context.getDDLWorker(); + + DDLLogEntry entry; + entry.query = query; + entry.initiator = ddl_worker.getHostName(); + for (const auto & shard : shards) - { for (const auto & addr : shard) - { - try - { - Exception ex; - if (!getRemoteQueryExecutionStatus(addr, query, ex)) - { - /// Normal error - String exception_msg = getExceptionMessage(ex, false, true); - hosts_names_failed.emplace_back(addr.host_name); - hosts_errors.emplace_back(exception_msg); - - LOG_INFO(&Logger::get("DDLWorker"), "Query " << query << " failed on " << addr.host_name << ": " << exception_msg); - } - else - { - ++num_hosts_finished_successfully; - } - } - catch (...) - { - /// Network error - pending_hosts.emplace_back(addr); - hosts_names_pending.emplace_back(addr.host_name); - } - } - num_hosts_total += shard.size(); - } - + entry.hosts.emplace_back(addr.toString()); - if (!pending_hosts.empty()) - context.getDDLWorker().enqueueQuery(query, pending_hosts); + ddl_worker.enqueueQuery(entry); auto aray_of_strings = std::make_shared(std::make_shared()); @@ -316,12 +299,12 @@ BlockIO executeDDLQueryOnCluster(const String & query, const String & cluster_na {aray_of_strings->clone(), "hosts_pending"} }; - size_t num_hosts_finished = num_hosts_total - pending_hosts.size(); + size_t num_hosts_finished = num_hosts_total; size_t num_hosts_finished_unsuccessfully = num_hosts_finished - num_hosts_finished_successfully; block.getByName("num_hosts_total").column->insert(num_hosts_total); block.getByName("num_hosts_finished_successfully").column->insert(num_hosts_finished_successfully); block.getByName("num_hosts_finished_unsuccessfully").column->insert(num_hosts_finished_unsuccessfully); - block.getByName("num_hosts_pending").column->insert(pending_hosts.size()); + block.getByName("num_hosts_pending").column->insert(0LU); block.getByName("hosts_finished_unsuccessfully").column->insert(hosts_names_failed); block.getByName("hosts_finished_unsuccessfully_errors").column->insert(hosts_errors); block.getByName("hosts_pending").column->insert(hosts_names_pending); diff --git a/dbms/src/Interpreters/DDLWorker.h b/dbms/src/Interpreters/DDLWorker.h index 141419432128c3aa8910de19aeb74feaf8effe6b..cc20e05bb9cd88872afc59f46c8f30fd9a07b12b 100644 --- a/dbms/src/Interpreters/DDLWorker.h +++ b/dbms/src/Interpreters/DDLWorker.h @@ -13,17 +13,19 @@ namespace DB { - BlockIO executeDDLQueryOnCluster(const String & query, const String & cluster_name, Context & context); +struct DDLLogEntry; + + class DDLWorker { public: DDLWorker(const std::string & zk_root_dir, Context & context_); ~DDLWorker(); - void enqueueQuery(const String & query, const std::vector & addrs); + void enqueueQuery(DDLLogEntry & entry); /// Returns root/ path in ZooKeeper std::string getRoot() const @@ -31,11 +33,6 @@ public: return root_dir; } - std::string getAssignsDir() const - { - return root_dir + "/assigns"; - } - std::string getMastersDir() const { return root_dir + "/masters"; @@ -53,7 +50,9 @@ public: private: void processTasks(); - bool processTask(const std::string & task); + bool processTask(const DDLLogEntry & node, const std::string & node_path); + + void createStatusDirs(const std::string & node_name); void processQueries(); bool processQuery(const std::string & task); @@ -67,7 +66,12 @@ private: std::string hostname; std::string root_dir; /// common dir with queue of queries std::string assign_dir; /// dir with tasks assigned to the server - std::string master_dir; /// dir with queries was initiated by the server + std::string master_dir; /// dir with queries was initiated by the server + + std::string last_processed_node_name; + + std::shared_ptr zookeeper; + std::shared_ptr queue_updated; std::atomic stop_flag; std::condition_variable cond_var;