DDLWorker.cpp 22.4 KB
Newer Older
1
#include <Interpreters/DDLWorker.h>
2 3

#include <Parsers/ASTAlterQuery.h>
4
#include <Parsers/ASTQueryWithOnCluster.h>
5 6 7
#include <Parsers/ParserQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
8

9 10 11 12 13
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>

14 15
#include <Storages/IStorage.h>

16
#include <Interpreters/executeQuery.h>
17 18 19 20 21 22 23 24 25 26 27
#include <Interpreters/Cluster.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Common/getFQDNOrHostName.h>

#include <Core/Block.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeArray.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
28 29

#include <zkutil/ZooKeeper.h>
30
#include <zkutil/Lock.h>
31 32
#include <Poco/Timestamp.h>

33 34 35 36 37 38 39 40

namespace DB
{

namespace ErrorCodes
{
    extern const int UNKNOWN_ELEMENT_IN_CONFIG;
    extern const int INVALID_CONFIG_PARAMETER;
41
    extern const int UNKNOWN_FORMAT_VERSION;
42 43
    extern const int INCONSISTENT_TABLE_ACCROSS_SHARDS;
    extern const int INCONSISTENT_CLUSTER_DEFINITION;
44
    extern const int UNFINISHED;
45 46 47
}


48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
struct DDLLogEntry
{
    String query;
    Strings hosts;
    String initiator;

    static constexpr char CURRENT_VERSION = '1';

    String toString()
    {
        String res;
        {
            WriteBufferFromString wb(res);

            writeChar(CURRENT_VERSION, wb);
            wb << "\n";
64 65 66
            wb << "query: " << query << "\n";
            wb << "hosts: " << hosts << "\n";
            wb << "initiator: " << initiator << "\n";
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
        }

        return res;
    }

    void parse(const String & data)
    {
        ReadBufferFromString rb(data);

        char version;
        readChar(version, rb);
        if (version != CURRENT_VERSION)
            throw Exception("Unknown DDLLogEntry format version: " + version, ErrorCodes::UNKNOWN_FORMAT_VERSION);

        rb >> "\n";
82 83 84
        rb >> "query: " >> query >> "\n";
        rb >> "hosts: " >> hosts >> "\n";
        rb >> "initiator: " >> initiator >> "\n";
85 86 87 88

        assertEOF(rb);
    }
};
89 90


91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
// static String serializeReturnStatus(int return_code, const String & return_msg)
// {
//     String res;
//     {
//         WriteBufferFromString wb(res);
//         wb << return_code << "\n" << return_msg;
//     }
//     return res;
// }
//
//
// static void parseReturnStatus(const String & data, int & return_code, String & return_msg)
// {
//     ReadBufferFromString rb(data);
//     rb >> return_code >> "\n" >> return_msg;
// }


static const std::pair<ssize_t, ssize_t> tryGetShardAndHostNum(const Cluster::AddressesWithFailover & cluster, const String & host_name, UInt16 port)
{
    for (size_t shard_num = 0; shard_num < cluster.size(); ++shard_num)
    {
        for (size_t host_num = 0; host_num < cluster[shard_num].size(); ++host_num)
        {
            const Cluster::Address & address = cluster[shard_num][host_num];
            if (address.host_name == host_name && address.port == port)
                return {shard_num, host_num};
        }
    }

    return {-1, -1};
}

// static const Cluster::Address * tryGetAddressOfHost(const Cluster::AddressesWithFailover & cluster, const String & host_name, UInt16 port)
// {
//     std::pair<ssize_t, ssize_t> host_pos = tryGetShardAndHostNum(cluster, host_name, port);
//
//     if (host_pos.first < 0 || host_pos.second < 0)
//         return nullptr;
//
//     return &cluster.at(host_pos.first).at(host_pos.second);
// }

static bool isSupportedAlterType(int type)
{
    static const std::unordered_set<int> supported_alter_types{
        ASTAlterQuery::ADD_COLUMN,
        ASTAlterQuery::DROP_COLUMN,
        ASTAlterQuery::MODIFY_COLUMN,
        ASTAlterQuery::MODIFY_PRIMARY_KEY,
        ASTAlterQuery::DROP_PARTITION
    };

    return supported_alter_types.count(type);
}


148 149 150 151 152 153
DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_)
    : context(context_), stop_flag(false)
{
    root_dir = zk_root_dir;
    if (root_dir.back() == '/')
        root_dir.resize(root_dir.size() - 1);
154

155 156 157
    host_name = getFQDNOrHostName();
    port = context.getTCPPort();
    host_id = host_name + ':' + DB::toString(port);
158

159 160
    event_queue_updated = std::make_shared<Poco::Event>();

161 162 163
    thread = std::thread(&DDLWorker::run, this);
}

164

165 166 167
DDLWorker::~DDLWorker()
{
    stop_flag = true;
168 169
    //cond_var.notify_one();
    event_queue_updated->set();
170 171 172
    thread.join();
}

173

174 175
void DDLWorker::processTasks()
{
176
    auto zookeeper = context.getZooKeeper();
177
    LOG_DEBUG(log, "processTasks");
178

179 180
    Strings queue_nodes = zookeeper->getChildren(root_dir, nullptr, event_queue_updated);
    if (queue_nodes.empty())
181 182
        return;

183
    bool server_startup = last_processed_node_name.empty();
184

185 186 187 188
    std::sort(queue_nodes.begin(), queue_nodes.end());
    auto begin_node = server_startup
        ? queue_nodes.begin()
        : std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_processed_node_name);
189

190
    for (auto it = begin_node; it != queue_nodes.end(); ++it)
191
    {
192
        String node_data, node_name = *it, node_path = root_dir + "/" + node_name;
193 194 195 196 197 198
        LOG_DEBUG(log, "Fetching node " << node_path);
        if (!zookeeper->tryGet(node_path, node_data))
        {
            /// It is Ok that node could be deleted just now. It means that there are no current host in node's host list.
            continue;
        }
199 200 201 202

        DDLLogEntry node;
        node.parse(node_data);

203
        bool host_in_hostlist = std::find(node.hosts.cbegin(), node.hosts.cend(), host_id) != node.hosts.cend();
204

205 206
        bool already_processed = zookeeper->exists(node_path + "/failed/" + host_id)
                                 || zookeeper->exists(node_path + "/sucess/" + host_id);
207

208
        LOG_DEBUG(log, "Checking node " << node_name << ", " << node.query << " status: " << host_in_hostlist << " " << already_processed);
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234

        if (!server_startup && already_processed)
        {
            throw Exception(
                "Server expects that DDL node " + node_name + " should be processed, but it was already processed according to ZK",
                ErrorCodes::LOGICAL_ERROR);
        }

        if (host_in_hostlist && !already_processed)
        {
            try
            {
                processTask(node, node_name);
            }
            catch (...)
            {
                /// It could be network error, but we mark node as processed anyway.
                last_processed_node_name = node_name;

                tryLogCurrentException(log,
                    "An unexpected error occurred during processing DLL query " + node.query + " (" + node_name + ")");
                throw;
            }
        }

        last_processed_node_name = node_name;
235
    }
236 237
}

238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413

static bool tryExecuteQuery(const String & query, Context & context, int & return_code, String & exception, Logger * log)
{
    try
    {
        executeQuery(query, context);
    }
    catch (...)
    {
        exception = getCurrentExceptionMessage(false, true);
        return_code = getCurrentExceptionCode();
        if (log)
            tryLogCurrentException(log, "Query " + query + " wasn't finished successfully");

        return false;
    }

    return_code = 0;
    exception = "";
    if (log)
        LOG_DEBUG(log, "Executed query: " << query);

    return true;
}


bool DDLWorker::processTask(const DDLLogEntry & node, const std::string & node_name)
{
    auto zookeeper = context.getZooKeeper();
    LOG_DEBUG(log, "Process " << node_name << " node, query " << node.query);

    String node_path = root_dir + "/" + node_name;
    createStatusDirs(node_path);

    String active_flag_path = node_path + "/active/" + host_id;
    zookeeper->create(active_flag_path, "", zkutil::CreateMode::Ephemeral);


    LOG_DEBUG(log, "Process query: " << node.query);


    ASTPtr query_ast;
    {
        ParserQuery parser_query;
        String description;
        IParser::Pos begin = &node.query.front();
        query_ast = parseQuery(parser_query, begin, begin + node.query.size(), description);
    }

    const ASTQueryWithOnCluster * query;
    if (!query_ast || !(query = dynamic_cast<const ASTQueryWithOnCluster *>(query_ast.get())))
        throw Exception("Recieved unsupported DDL query", ErrorCodes::NOT_IMPLEMENTED);

    String cluster_name = query->cluster;
    auto cluster = context.getCluster(cluster_name);

    ssize_t shard_num, host_num;
    std::tie(shard_num, host_num) = tryGetShardAndHostNum(cluster->getShardsWithFailoverAddresses(), host_name, port);
    if (shard_num < 0 || host_num < 0)
    {
        throw Exception("Cannot find own address (" + host_id + ") in cluster " + cluster_name + " configuration",
                        ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
    }

    const auto & shard = cluster->getShardsWithFailoverAddresses().at(shard_num);
    const auto & host_address = shard.at(host_num);
    ASTPtr rewritten_ast = query->getRewrittenASTWithoutOnCluster(host_address.default_database);
    String rewritten_query = queryToString(rewritten_ast);


    LOG_DEBUG(log, "Executing query: " << rewritten_query);

    int result_code = 0;
    String result_message;


    if (auto query_alter = dynamic_cast<const ASTAlterQuery *>(rewritten_ast.get()))
    {
        String database = query_alter->database.empty() ? context.getCurrentDatabase() : query_alter->database;
        StoragePtr storage = context.getTable(database, query_alter->table);

        bool execute_once_on_replica = storage->supportsReplication();
        bool execute_on_leader_replica = false;

        for (const auto & param : query_alter->parameters)
        {
            if (!isSupportedAlterType(param.type))
                throw Exception("Unsupported type of ALTER query", ErrorCodes::NOT_IMPLEMENTED);

            if (execute_once_on_replica)
                execute_on_leader_replica |= param.type == ASTAlterQuery::DROP_PARTITION;
        }

        if (execute_once_on_replica)
        {
            const auto & shard_info = cluster->getShardsInfo().at(shard_num);
            if (!shard_info.hasInternalReplication())
            {
                throw Exception("Table " + query_alter->table + " is replicated, but shard #" + toString(shard_num + 1) +
                    " isn't replicated according to its cluster definition", ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
            }

            Strings replica_names;
            for (const auto & address : shard)
                replica_names.emplace_back(address.toString());
            std::sort(replica_names.begin(), replica_names.end());

            String shard_dir_name;
            for (auto it = replica_names.begin(); it != replica_names.end(); ++it)
                shard_dir_name += *it + (std::next(it) != replica_names.end() ? "," : "");

            String shard_path = node_path + "/shards/" + shard_dir_name;
            String is_executed_path = shard_path + "/executed";
            zookeeper->createAncestors(shard_path + "/");

            bool executed = false;
            {
                auto zookeeper_holder = std::make_shared<zkutil::ZooKeeperHolder>();
                zookeeper_holder->initFromInstance(zookeeper);

                zkutil::Lock lock(zookeeper_holder, shard_path, "lock", host_id);
                std::mt19937 rng(std::hash<String>{}(host_id) + reinterpret_cast<intptr_t>(&rng));
                for (int num_tries = 0; num_tries < 10; ++num_tries)
                {
                    if (zookeeper->exists(is_executed_path))
                    {
                        executed = true;
                        break;
                    }

                    if (lock.tryLock())
                    {
                        tryExecuteQuery(rewritten_query, context, result_code, result_message, log);

                        if (execute_on_leader_replica && result_code == ErrorCodes::NOT_IMPLEMENTED)
                        {
                            /// TODO: it is ok to recieve exception "host is not leader"
                        }

                        zookeeper->create(is_executed_path, host_id, zkutil::CreateMode::Persistent);
                        lock.unlock();
                        executed = true;
                        break;
                    }

                    std::this_thread::sleep_for(std::chrono::duration<double>(std::uniform_real_distribution<double>(0, 1)(rng)));
                }
            }

            if (!executed)
            {
                result_code = ErrorCodes::NOT_IMPLEMENTED;
                result_message = "Cannot enqueue replicated DDL query";
            }
        }
        else
        {
            tryExecuteQuery(rewritten_query, context, result_code, result_message, log);
        }
    }
    else
    {
        tryExecuteQuery(rewritten_query, context, result_code, result_message, log);
    }

    /// Delete active flag and create sucess (or fail) flag
    zkutil::Ops ops;
    String result_path = node_path + (result_code ? "/failed/" : "/sucess/") + host_id;
    ops.emplace_back(std::make_unique<zkutil::Op::Remove>(active_flag_path, -1));
    ops.emplace_back(std::make_unique<zkutil::Op::Create>(result_path, result_message, zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
    zookeeper->multi(ops);

    return !result_code;
}


414 415
void DDLWorker::cleanupQueue(const Strings * node_names_to_check)
{
416 417
    auto zookeeper = context.getZooKeeper();

418 419 420 421 422
    /// Both ZK and Poco use Unix epoch
    size_t current_time_seconds = Poco::Timestamp().epochTime();
    constexpr size_t zookeeper_time_resolution = 1000;

    // Too early to check
423
    if (last_cleanup_time_seconds && current_time_seconds < last_cleanup_time_seconds + cleanup_after_seconds)
424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443
        return;

    last_cleanup_time_seconds = current_time_seconds;

    String data;
    zkutil::Stat stat;
    DDLLogEntry node;
    Strings failed_hosts, sucess_hosts;

    Strings node_names_fetched = node_names_to_check ? Strings{} : zookeeper->getChildren(root_dir);
    const Strings & node_names = (node_names_to_check) ? *node_names_to_check : node_names_fetched;

    for (const String & node_name : node_names)
    {
        try
        {
            String node_path = root_dir + "/" + node_name;
            if (!zookeeper->tryGet(node_path, data, &stat))
                continue;

444
            /// TODO: Add shared lock to avoid rare race counditions.
445 446 447 448 449 450 451 452 453 454 455 456 457

            size_t zookeeper_time_seconds = stat.mtime / zookeeper_time_resolution;
            if (zookeeper_time_seconds + node_max_lifetime_seconds < current_time_seconds)
            {
                size_t lifetime_seconds = current_time_seconds - zookeeper_time_seconds;
                LOG_INFO(log, "Lifetime of node " << node_name << " (" << lifetime_seconds << " sec.) is expired, deleting it");
                zookeeper->removeRecursive(node_path);
                continue;
            }

            Strings sucess_nodes = zookeeper->getChildren(node_path + "/sucess");
            Strings failed_nodes = zookeeper->getChildren(node_path + "/failed");

458 459
            node.parse(data);

460 461 462 463 464 465 466 467 468 469 470 471 472
            if (sucess_nodes.size() + failed_nodes.size() >= node.hosts.size())
            {
                LOG_INFO(log, "Node " << node_name << " had been executed by each host, deleting it");
                zookeeper->removeRecursive(node_path);
            }
        }
        catch (...)
        {
            tryLogCurrentException(log, "An error occured while checking and cleaning node " + node_name + " from queue");
        }
    }
}

473

474 475
/// Try to create unexisting "status" dirs for a node
void DDLWorker::createStatusDirs(const std::string & node_path)
476
{
477
    auto zookeeper = context.getZooKeeper();
478
    auto acl = zookeeper->getDefaultACL();
479

480 481 482 483
    zkutil::Ops ops;
    ops.emplace_back(std::make_unique<zkutil::Op::Create>(node_path + "/active", "", acl, zkutil::CreateMode::Persistent));
    ops.emplace_back(std::make_unique<zkutil::Op::Create>(node_path + "/sucess", "", acl, zkutil::CreateMode::Persistent));
    ops.emplace_back(std::make_unique<zkutil::Op::Create>(node_path + "/failed", "", acl, zkutil::CreateMode::Persistent));
484

485 486 487 488 489
    int code = zookeeper->tryMulti(ops);

    if (code != ZOK && code != ZNODEEXISTS)
        throw zkutil::KeeperException(code);
}
490 491


492
String DDLWorker::enqueueQuery(DDLLogEntry & entry)
493
{
494
    if (entry.hosts.empty())
495
        return {};
496

497 498
    auto zookeeper = context.getZooKeeper();

499
    String query_path_prefix = root_dir + "/query-";
500
    zookeeper->createAncestors(query_path_prefix);
501

502 503
    String node_path = zookeeper->create(query_path_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential);
    createStatusDirs(node_path);
504 505

    return node_path;
506 507
}

508

509 510 511
void DDLWorker::run()
{
    using namespace std::chrono_literals;
512
    auto zookeeper = context.getZooKeeper();
513

514
    zookeeper->createAncestors(root_dir + "/");
515 516
    LOG_DEBUG(log, "Started DDLWorker thread");

517 518
    while (!stop_flag)
    {
519 520
        LOG_DEBUG(log, "Begin tasks processing");

521 522
        try
        {
523
            processTasks();
524
        }
525
        catch (...)
526
        {
527
            tryLogCurrentException(log);
528 529
        }

530 531 532 533 534
        //std::unique_lock<std::mutex> g(lock);
        //cond_var.wait_for(g, 10s);
        LOG_DEBUG(log, "Waiting watch");
        event_queue_updated->wait();

535 536
        try
        {
537
            cleanupQueue();
538 539 540 541 542
        }
        catch (...)
        {
            tryLogCurrentException(log);
        }
543 544 545
    }
}

546

547
class DDLQueryStatusInputSream : public IProfilingBlockInputStream
548
{
549
public:
550

551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589
    DDLQueryStatusInputSream(const String & zk_node_path, Context & context, size_t num_hosts)
    : node_path(zk_node_path), context(context)
    {
        sample = Block{
            {std::make_shared<DataTypeString>(),    "host"},
            {std::make_shared<DataTypeUInt64>(),    "status"},
            {std::make_shared<DataTypeString>(),    "error"},
            {std::make_shared<DataTypeUInt64>(),    "num_hosts_remaining"},
            {std::make_shared<DataTypeUInt64>(),    "num_hosts_active"},
        };

        setTotalRowsApprox(num_hosts);
    }

    String getName() const override
    {
        return "DDLQueryStatusInputSream";
    }

    String getID() const override
    {
        return "DDLQueryStatusInputSream(" + node_path + ")";
    }

    Block readImpl() override
    {
        Block res;
        if (num_hosts_finished >= total_rows_approx)
            return res;

        auto zookeeper = context.getZooKeeper();
        size_t try_number = 0;

        while(res.rows() == 0)
        {
            if (is_cancelled)
                return res;

            if (num_hosts_finished != 0 || try_number != 0)
590
                std::this_thread::sleep_for(std::chrono::milliseconds(50 * std::min(20LU, try_number + 1)));
591

592
            /// TODO: add /lock node somewhere
593
            if (!zookeeper->exists(node_path))
594 595 596 597 598
            {
                throw Exception("Cannot provide query execution status. The query's node " + node_path
                                + " had been deleted by cleaner since it was finished (or its lifetime is expired)",
                                ErrorCodes::UNFINISHED);
            }
599

600 601
            Strings new_sucess_hosts = getNewAndUpdate(sucess_hosts_set, getChildrenAllowNoNode(zookeeper, node_path + "/sucess"));
            Strings new_failed_hosts = getNewAndUpdate(failed_hosts_set, getChildrenAllowNoNode(zookeeper, node_path + "/failed"));
602 603 604 605 606 607 608 609

            Strings new_hosts = new_sucess_hosts;
            new_hosts.insert(new_hosts.end(), new_failed_hosts.cbegin(), new_failed_hosts.cend());
            ++try_number;

            if (new_hosts.empty())
                continue;

610 611
            Strings cur_active_hosts = getChildrenAllowNoNode(zookeeper, node_path + "/active");

612 613 614 615 616 617 618 619
            res = sample.cloneEmpty();
            for (size_t i = 0; i < new_hosts.size(); ++i)
            {
                bool fail = i >= new_sucess_hosts.size();
                res.getByName("host").column->insert(new_hosts[i]);
                res.getByName("status").column->insert(static_cast<UInt64>(fail));
                res.getByName("error").column->insert(fail ? zookeeper->get(node_path + "/failed/" + new_hosts[i]) : String{});
                res.getByName("num_hosts_remaining").column->insert(total_rows_approx - (++num_hosts_finished));
620
                res.getByName("num_hosts_active").column->insert(cur_active_hosts.size());
621 622
            }
        }
623

624 625 626
        return res;
    }

627 628 629 630 631 632 633 634 635 636
    static Strings getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path)
    {
        Strings res;
        int code = zookeeper->tryGetChildren(node_path, res);
        if (code != ZOK && code != ZNONODE)
            throw zkutil::KeeperException(code, node_path);
        return res;
    }

    static Strings getNewAndUpdate(NameSet & prev, const Strings & cur_list)
637 638 639 640 641
    {
        Strings diff;
        for (const String & elem : cur_list)
        {
            if (!prev.count(elem))
642
            {
643
                diff.emplace_back(elem);
644 645
                prev.emplace(elem);
            }
646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661
        }

        return diff;
    }

    ~DDLQueryStatusInputSream() override = default;

    Block sample;

private:
    String node_path;
    Context & context;

    NameSet sucess_hosts_set;
    NameSet failed_hosts_set;
    size_t num_hosts_finished = 0;
662
};
663 664


665
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, Context & context)
666
{
667 668 669 670 671 672
    const auto query = dynamic_cast<const ASTQueryWithOnCluster *>(query_ptr.get());
    if (!query)
    {
        throw Exception("Distributed execution is not supported for such DDL queries",
                        ErrorCodes::NOT_IMPLEMENTED);
    }
673

674 675 676 677 678 679 680 681 682 683 684 685
    auto query_alter = dynamic_cast<const ASTAlterQuery *>(query_ptr.get());
    if (query_alter)
    {
        for (const auto & param : query_alter->parameters)
        {
            if (!isSupportedAlterType(param.type))
                throw Exception("Unsupported type of ALTER query", ErrorCodes::NOT_IMPLEMENTED);
        }
    }

    ClusterPtr cluster = context.getCluster(query->cluster);
    DDLWorker & ddl_worker = context.getDDLWorker();
686

687
    DDLLogEntry entry;
688
    entry.query = queryToString(query_ptr);
689 690
    entry.initiator = ddl_worker.getHostName();

691
    Cluster::AddressesWithFailover shards = cluster->getShardsWithFailoverAddresses();
692
    for (const auto & shard : shards)
693
    {
694
        for (const auto & addr : shard)
695
            entry.hosts.emplace_back(addr.toString());
696
    }
697

698
    String node_path = ddl_worker.enqueueQuery(entry);
699 700

    BlockIO io;
701 702 703 704 705 706
    if (node_path.empty())
        return io;

    auto stream = std::make_shared<DDLQueryStatusInputSream>(node_path, context, entry.hosts.size());
    io.in_sample = stream->sample.cloneEmpty();
    io.in = std::move(stream);
707 708 709 710
    return io;
}


711
}