DDLWorker.cpp 23.4 KB
Newer Older
1
#include <Interpreters/DDLWorker.h>
2 3

#include <Parsers/ASTAlterQuery.h>
4
#include <Parsers/ASTQueryWithOnCluster.h>
5 6 7
#include <Parsers/ParserQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
8

9 10 11 12 13
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>

14
#include <Storages/IStorage.h>
15
#include <DataStreams/OneBlockInputStream.h>
16

17
#include <Interpreters/executeQuery.h>
18
#include <Interpreters/Cluster.h>
19

20
#include <Common/getFQDNOrHostName.h>
21 22
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
23 24 25 26 27 28 29

#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeArray.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
30

31 32
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/Lock.h>
33 34
#include <Poco/Timestamp.h>

35 36
#include <experimental/optional>

37 38 39 40 41 42 43 44

namespace DB
{

namespace ErrorCodes
{
    extern const int UNKNOWN_ELEMENT_IN_CONFIG;
    extern const int INVALID_CONFIG_PARAMETER;
    extern const int UNKNOWN_FORMAT_VERSION;
    extern const int INCONSISTENT_TABLE_ACCROSS_SHARDS;
    extern const int INCONSISTENT_CLUSTER_DEFINITION;
    extern const int TIMEOUT_EXCEEDED;
    extern const int UNFINISHED;
    /// Both are used in this file (processTasks, processTask, processTaskAlter, executeDDLQueryOnCluster);
    /// declare them here instead of relying on some included header to do it.
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
}


/// Queue nodes whose last modification is older than this (one week) are removed by cleanupQueue(),
/// regardless of whether every host has finished them.
const size_t DDLWorker::node_max_lifetime_seconds = 60 * 60 * 24 * 7;

/// cleanupQueue() returns immediately if the previous cleanup ran less than this (one minute) ago.
const size_t DDLWorker::cleanup_min_period_seconds = 60;


struct DDLLogEntry
{
    String query;
    Strings hosts;
61
    String initiator; // optional
62

63
    static constexpr int CURRENT_VERSION = 1;
64 65 66 67 68 69 70

    String toString()
    {
        String res;
        {
            WriteBufferFromString wb(res);

71 72
            auto version = CURRENT_VERSION;
            wb << "version: " << version << "\n";
73
            wb << "query: " << escape << query << "\n";
74 75
            wb << "hosts: " << hosts << "\n";
            wb << "initiator: " << initiator << "\n";
76 77 78 79 80 81 82 83 84
        }

        return res;
    }

    void parse(const String & data)
    {
        ReadBufferFromString rb(data);

85 86 87
        int version;
        rb >> "version: " >> version >> "\n";

88
        if (version != CURRENT_VERSION)
89
            throw Exception("Unknown DDLLogEntry format version: " + DB::toString(version), ErrorCodes::UNKNOWN_FORMAT_VERSION);
90

91
        rb >> "query: " >> escape >> query >> "\n";
92
        rb >> "hosts: " >> hosts >> "\n";
93 94 95 96 97

        if (!rb.eof())
            rb >> "initiator: " >> initiator >> "\n";
        else
            initiator.clear();
98 99 100 101

        assertEOF(rb);
    }
};
102 103


104 105
/// Zero-based (shard index, host index inside the shard) of an address in a cluster definition;
/// empty when the address is not part of the cluster.
using ShardAndHostNum = std::experimental::optional<std::pair<size_t, size_t>>;

/// Search every shard of the cluster for the address with exactly this host name and port.
/// Returns the first match in shard order, or an empty optional if there is none.
static ShardAndHostNum tryGetShardAndHostNum(const Cluster::AddressesWithFailover & cluster, const String & host_name, UInt16 port)
{
    size_t shard_idx = 0;
    for (const auto & shard_addresses : cluster)
    {
        size_t host_idx = 0;
        for (const Cluster::Address & candidate : shard_addresses)
        {
            const bool same_endpoint = candidate.host_name == host_name && candidate.port == port;
            if (same_endpoint)
                return std::make_pair(shard_idx, host_idx);
            ++host_idx;
        }
        ++shard_idx;
    }

    return {};
}


static bool isSupportedAlterType(int type)
{
    static const std::unordered_set<int> supported_alter_types{
        ASTAlterQuery::ADD_COLUMN,
        ASTAlterQuery::DROP_COLUMN,
        ASTAlterQuery::MODIFY_COLUMN,
        ASTAlterQuery::MODIFY_PRIMARY_KEY,
        ASTAlterQuery::DROP_PARTITION
    };

    return supported_alter_types.count(type);
}


135
/// Create the worker for the queue located at zk_root_dir and start its background thread (run()).
DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_)
    : context(context_)
{
    /// Strip trailing slashes so that paths built as queue_dir + "/" + name contain no "//".
    /// The emptiness check also avoids calling back() on an empty string, which is undefined behavior.
    queue_dir = zk_root_dir;
    while (!queue_dir.empty() && queue_dir.back() == '/')
        queue_dir.pop_back();

    host_name = getFQDNOrHostName();
    port = context.getTCPPort();
    host_id = Cluster::Address::toString(host_name, port);

    event_queue_updated = std::make_shared<Poco::Event>();

    /// Started last: run() uses the members initialized above.
    thread = std::thread(&DDLWorker::run, this);
}


/// Stop the background thread: raise stop_flag, wake run() if it is blocked waiting for the queue
/// event, then wait for the thread to exit.
DDLWorker::~DDLWorker()
{
    auto & wake_up_event = *event_queue_updated;

    /// The flag must be raised before the wake-up: run() checks it right after wait() returns.
    stop_flag = true;
    wake_up_event.set();
    thread.join();
}


void DDLWorker::processTasks()
{
162
    LOG_DEBUG(log, "Processing tasks");
163

164
    Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, event_queue_updated);
165
    if (queue_nodes.empty())
166 167
        return;

168
    bool server_startup = last_processed_node_name.empty();
169

170 171 172 173
    std::sort(queue_nodes.begin(), queue_nodes.end());
    auto begin_node = server_startup
        ? queue_nodes.begin()
        : std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_processed_node_name);
174

175
    for (auto it = begin_node; it != queue_nodes.end(); ++it)
176
    {
177 178 179
        const String & node_name = *it;
        String node_path = queue_dir + "/" + node_name;
        String node_data;
180

181 182 183 184 185
        if (!zookeeper->tryGet(node_path, node_data))
        {
            /// It is Ok that node could be deleted just now. It means that there are no current host in node's host list.
            continue;
        }
186 187 188 189

        DDLLogEntry node;
        node.parse(node_data);

190
        bool host_in_hostlist = std::find(node.hosts.cbegin(), node.hosts.cend(), host_id) != node.hosts.cend();
191
        bool already_processed = zookeeper->exists(node_path + "/finished/" + host_id);
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207

        if (!server_startup && already_processed)
        {
            throw Exception(
                "Server expects that DDL node " + node_name + " should be processed, but it was already processed according to ZK",
                ErrorCodes::LOGICAL_ERROR);
        }

        if (host_in_hostlist && !already_processed)
        {
            try
            {
                processTask(node, node_name);
            }
            catch (...)
            {
208
                tryLogCurrentException(log, "An error occurred while processing node " + node_name + " (" + node.query + ")");
209 210 211
                throw;
            }
        }
212 213 214 215
        else
        {
            LOG_DEBUG(log, "Node " << node_name << " (" << node.query << ") will not be processed");
        }
216 217

        last_processed_node_name = node_name;
218
    }
219 220
}

221

222
static bool tryExecuteQuery(const String & query, Context & context, ExecutionStatus & status, Logger * log = nullptr)
223 224 225 226 227 228 229
{
    try
    {
        executeQuery(query, context);
    }
    catch (...)
    {
230 231
        status = ExecutionStatus::fromCurrentException();

232 233 234 235 236 237
        if (log)
            tryLogCurrentException(log, "Query " + query + " wasn't finished successfully");

        return false;
    }

238
    status = ExecutionStatus(0);
239 240 241 242 243 244 245
    if (log)
        LOG_DEBUG(log, "Executed query: " << query);

    return true;
}


246
void DDLWorker::processTask(const DDLLogEntry & node, const std::string & node_name)
247
{
248
    LOG_DEBUG(log, "Processing node " << node_name << " (" << node.query << ")");
249

250
    String node_path = queue_dir + "/" + node_name;
251 252
    createStatusDirs(node_path);

253
    bool should_not_execute = current_node == node_name && current_node_was_executed;
254

255 256 257 258
    if (!should_not_execute)
    {
        current_node = node_name;
        current_node_was_executed = false;
259

260
        zookeeper->create(node_path + "/active/" + host_id, "", zkutil::CreateMode::Ephemeral);
261

262 263 264 265
        try
        {
            ASTPtr query_ast;
            {
266
                const char * begin = &node.query.front();
267 268
                ParserQuery parser_query(begin + node.query.size());
                String description;
269 270
                query_ast = parseQuery(parser_query, begin, begin + node.query.size(), description);
            }
271

272 273 274 275 276 277 278
            const ASTQueryWithOnCluster * query;
            if (!query_ast || !(query = dynamic_cast<const ASTQueryWithOnCluster *>(query_ast.get())))
                throw Exception("Recieved unsupported DDL query", ErrorCodes::NOT_IMPLEMENTED);

            String cluster_name = query->cluster;
            auto cluster = context.getCluster(cluster_name);

279 280
            auto shard_host_num = tryGetShardAndHostNum(cluster->getShardsWithFailoverAddresses(), host_name, port);
            if (!shard_host_num)
281 282 283 284
            {
                throw Exception("Cannot find own address (" + host_id + ") in cluster " + cluster_name + " configuration",
                                ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
            }
285

286 287 288
            size_t shard_num = shard_host_num->first;
            size_t host_num = shard_host_num->second;

289 290 291
            const auto & host_address = cluster->getShardsWithFailoverAddresses().at(shard_num).at(host_num);
            ASTPtr rewritten_ast = query->getRewrittenASTWithoutOnCluster(host_address.default_database);
            String rewritten_query = queryToString(rewritten_ast);
292

293
            LOG_DEBUG(log, "Executing query: " << rewritten_query);
294

295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314
            if (auto query_alter = dynamic_cast<const ASTAlterQuery *>(rewritten_ast.get()))
            {
                processTaskAlter(query_alter, rewritten_query, cluster, shard_num, node_path);
            }
            else
            {
                tryExecuteQuery(rewritten_query, context, current_node_execution_status, log);
            }
        }
        catch (const zkutil::KeeperException & e)
        {
            throw;
        }
        catch (...)
        {
            current_node_execution_status = ExecutionStatus::fromCurrentException("An error occured during query preparation");
        }

        /// We need to distinguish ZK errors occured before and after query executing
        current_node_was_executed = true;
315 316
    }

317 318 319 320 321
    /// Delete active flag and create finish flag
    zkutil::Ops ops;
    ops.emplace_back(std::make_unique<zkutil::Op::Remove>(node_path + "/active/" + host_id, -1));
    ops.emplace_back(std::make_unique<zkutil::Op::Create>(node_path + "/finished/" + host_id,
        current_node_execution_status.serializeText(), zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
322

323 324 325 326
    int code = zookeeper->tryMultiWithRetries(ops);
    if (code != ZOK && code != ZNONODE)
        throw zkutil::KeeperException("Cannot commit executed node " + node_name, code);
}
327 328


329 330 331 332 333 334 335 336 337
void DDLWorker::processTaskAlter(
    const ASTAlterQuery * query_alter,
    const String & rewritten_query,
    const std::shared_ptr<Cluster> & cluster,
    ssize_t shard_num,
    const String & node_path)
{
    String database = query_alter->database.empty() ? context.getCurrentDatabase() : query_alter->database;
    StoragePtr storage = context.getTable(database, query_alter->table);
338

339 340
    bool execute_once_on_replica = storage->supportsReplication();
    bool execute_on_leader_replica = false;
341

342
    for (const auto & param : query_alter->parameters)
343
    {
344 345
        if (!isSupportedAlterType(param.type))
            throw Exception("Unsupported type of ALTER query", ErrorCodes::NOT_IMPLEMENTED);
346

347 348 349
        if (execute_once_on_replica)
            execute_on_leader_replica |= param.type == ASTAlterQuery::DROP_PARTITION;
    }
350

351 352
    const auto & shard_info = cluster->getShardsInfo().at(shard_num);
    bool config_is_replicated_shard = shard_info.hasInternalReplication();
353

354 355 356 357 358 359 360 361 362 363
    if (execute_once_on_replica && !config_is_replicated_shard)
    {
        throw Exception("Table " + query_alter->table + " is replicated, but shard #" + toString(shard_num + 1) +
            " isn't replicated according to its cluster definition", ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
    }
    else if (!execute_once_on_replica && config_is_replicated_shard)
    {
        throw Exception("Table " + query_alter->table + " isn't replicated, but shard #" + toString(shard_num + 1) +
            " replicated according to its cluster definition", ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
    }
364

365 366
    if (execute_once_on_replica)
    {
367 368 369 370
        /// The following code may perform ALTER twice if
        ///  current secver aquires lock, executes replicated alter,
        ///  losts zookeeper connection and doesn't have time to create /executed node, second server executes replicated alter again
        /// To avoid this problem alter() method of replicated tables should be changed and takes into account ddl query id tag.
371 372
        if (!context.getSettingsRef().distributed_ddl_allow_replicated_alter)
            throw Exception("Distributed DDL alters don't work properly yet", ErrorCodes::NOT_IMPLEMENTED);
373

374 375 376 377
        Strings replica_names;
        for (const auto & address : cluster->getShardsWithFailoverAddresses().at(shard_num))
            replica_names.emplace_back(address.toString());
        std::sort(replica_names.begin(), replica_names.end());
378

379 380 381
        String shard_dir_name;
        for (auto it = replica_names.begin(); it != replica_names.end(); ++it)
            shard_dir_name += *it + (std::next(it) != replica_names.end() ? "," : "");
382

383 384 385
        String shard_path = node_path + "/shards/" + shard_dir_name;
        String is_executed_path = shard_path + "/executed";
        zookeeper->createAncestors(shard_path + "/");
386

387 388 389 390
        bool alter_executed_by_replica = false;
        {
            auto zookeeper_holder = std::make_shared<zkutil::ZooKeeperHolder>();
            zookeeper_holder->initFromInstance(zookeeper);
391

392 393 394 395
            zkutil::Lock lock(zookeeper_holder, shard_path, "lock", host_id);
            std::mt19937 rng(std::hash<String>{}(host_id) + reinterpret_cast<intptr_t>(&rng));

            for (int num_tries = 0; num_tries < 10; ++num_tries)
396
            {
397 398 399 400 401
                if (zookeeper->exists(is_executed_path))
                {
                    alter_executed_by_replica = true;
                    break;
                }
402

403
                if (lock.tryLock())
404
                {
405
                    tryExecuteQuery(rewritten_query, context, current_node_execution_status, log);
406

407
                    if (execute_on_leader_replica && current_node_execution_status.code == ErrorCodes::NOT_IMPLEMENTED)
408
                    {
409
                        /// TODO: it is ok to recieve exception "host is not leader"
410 411
                    }

412 413 414 415
                    zookeeper->create(is_executed_path, host_id, zkutil::CreateMode::Persistent);
                    lock.unlock();
                    alter_executed_by_replica = true;
                    break;
416 417
                }

418
                std::this_thread::sleep_for(std::chrono::duration<double>(std::uniform_real_distribution<double>(0, 1)(rng)));
419 420
            }
        }
421 422 423

        if (!alter_executed_by_replica)
            current_node_execution_status = ExecutionStatus(ErrorCodes::NOT_IMPLEMENTED, "Cannot enqueue replicated DDL query");
424 425 426
    }
    else
    {
427
        tryExecuteQuery(rewritten_query, context, current_node_execution_status, log);
428 429 430 431
    }
}


432 433 434 435 436 437 438
void DDLWorker::cleanupQueue(const Strings * node_names_to_check)
{
    /// Both ZK and Poco use Unix epoch
    size_t current_time_seconds = Poco::Timestamp().epochTime();
    constexpr size_t zookeeper_time_resolution = 1000;

    // Too early to check
439
    if (last_cleanup_time_seconds && current_time_seconds < last_cleanup_time_seconds + cleanup_min_period_seconds)
440 441 442 443
        return;

    last_cleanup_time_seconds = current_time_seconds;

444 445
    LOG_DEBUG(log, "Cleaning queue");

446 447 448 449
    String data;
    zkutil::Stat stat;
    DDLLogEntry node;

450
    Strings node_names_fetched = node_names_to_check ? Strings{} : zookeeper->getChildren(queue_dir);
451 452 453 454 455 456
    const Strings & node_names = (node_names_to_check) ? *node_names_to_check : node_names_fetched;

    for (const String & node_name : node_names)
    {
        try
        {
457
            String node_path = queue_dir + "/" + node_name;
458 459 460
            if (!zookeeper->tryGet(node_path, data, &stat))
                continue;

461
            /// TODO: Add shared lock to avoid rare race counditions.
462 463 464 465 466 467 468 469 470 471

            size_t zookeeper_time_seconds = stat.mtime / zookeeper_time_resolution;
            if (zookeeper_time_seconds + node_max_lifetime_seconds < current_time_seconds)
            {
                size_t lifetime_seconds = current_time_seconds - zookeeper_time_seconds;
                LOG_INFO(log, "Lifetime of node " << node_name << " (" << lifetime_seconds << " sec.) is expired, deleting it");
                zookeeper->removeRecursive(node_path);
                continue;
            }

472
            Strings finished_nodes = zookeeper->getChildren(node_path + "/finished");
473 474
            node.parse(data);

475
            if (finished_nodes.size() >= node.hosts.size())
476 477 478 479 480 481 482 483 484 485 486 487
            {
                LOG_INFO(log, "Node " << node_name << " had been executed by each host, deleting it");
                zookeeper->removeRecursive(node_path);
            }
        }
        catch (...)
        {
            tryLogCurrentException(log, "An error occured while checking and cleaning node " + node_name + " from queue");
        }
    }
}

488

489 490
/// Try to create unexisting "status" dirs for a node
void DDLWorker::createStatusDirs(const std::string & node_path)
491
{
492
    zkutil::Ops ops;
493
    auto acl = zookeeper->getDefaultACL();
494
    ops.emplace_back(std::make_unique<zkutil::Op::Create>(node_path + "/active", "", acl, zkutil::CreateMode::Persistent));
495
    ops.emplace_back(std::make_unique<zkutil::Op::Create>(node_path + "/finished", "", acl, zkutil::CreateMode::Persistent));
496

497 498 499 500
    int code = zookeeper->tryMulti(ops);
    if (code != ZOK && code != ZNODEEXISTS)
        throw zkutil::KeeperException(code);
}
501 502


503
/// Append an entry to the distributed DDL queue as a persistent sequential "query-" node under queue_dir,
/// together with its status directories.
/// Returns the full path of the created node, or an empty string (writing nothing) if the entry has no hosts.
String DDLWorker::enqueueQuery(DDLLogEntry & entry)
{
    String node_path;

    if (!entry.hosts.empty())
    {
        String path_prefix = queue_dir + "/query-";
        zookeeper->createAncestors(path_prefix);

        node_path = zookeeper->create(path_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential);
        createStatusDirs(node_path);
    }

    return node_path;
}


void DDLWorker::run()
{
520
    setThreadName("DDLWorker");
521 522
    LOG_DEBUG(log, "Started DDLWorker thread");

523 524 525
    zookeeper = context.getZooKeeper();
    zookeeper->createAncestors(queue_dir + "/");

526 527 528 529
    while (!stop_flag)
    {
        try
        {
530
            processTasks();
531

532 533 534 535 536
            LOG_DEBUG(log, "Waiting watch");
            event_queue_updated->wait();

            if (stop_flag)
                break;
537 538

            cleanupQueue();
539
        }
540 541 542 543 544
        catch (zkutil::KeeperException &)
        {
            LOG_DEBUG(log, "Recovering ZooKeeper session");
            zookeeper = context.getZooKeeper();
        }
545 546 547
        catch (...)
        {
            tryLogCurrentException(log);
548
            throw;
549
        }
550 551 552
    }
}

553

554
class DDLQueryStatusInputSream : public IProfilingBlockInputStream
555
{
556
public:
557

558
    DDLQueryStatusInputSream(const String & zk_node_path, Context & context, size_t num_hosts)
559
    : node_path(zk_node_path), context(context), watch(CLOCK_MONOTONIC_COARSE)
560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581
    {
        sample = Block{
            {std::make_shared<DataTypeString>(),    "host"},
            {std::make_shared<DataTypeUInt64>(),    "status"},
            {std::make_shared<DataTypeString>(),    "error"},
            {std::make_shared<DataTypeUInt64>(),    "num_hosts_remaining"},
            {std::make_shared<DataTypeUInt64>(),    "num_hosts_active"},
        };

        setTotalRowsApprox(num_hosts);
    }

    String getName() const override
    {
        return "DDLQueryStatusInputSream";
    }

    String getID() const override
    {
        return "DDLQueryStatusInputSream(" + node_path + ")";
    }

582 583
    static constexpr size_t timeout_seconds = 120;

584 585 586 587 588 589 590 591 592 593 594 595 596 597
    Block readImpl() override
    {
        Block res;
        if (num_hosts_finished >= total_rows_approx)
            return res;

        auto zookeeper = context.getZooKeeper();
        size_t try_number = 0;

        while(res.rows() == 0)
        {
            if (is_cancelled)
                return res;

598 599
            auto elapsed_seconds = watch.elapsedSeconds();
            if (elapsed_seconds > timeout_seconds)
600
                throw Exception("Watching query is executing too long (" + toString(std::round(elapsed_seconds)) + " sec.)", ErrorCodes::TIMEOUT_EXCEEDED);
601

602
            if (num_hosts_finished != 0 || try_number != 0)
603
                std::this_thread::sleep_for(std::chrono::milliseconds(50 * std::min(20LU, try_number + 1)));
604

605
            /// TODO: add shared lock
606
            if (!zookeeper->exists(node_path))
607 608 609 610 611
            {
                throw Exception("Cannot provide query execution status. The query's node " + node_path
                                + " had been deleted by cleaner since it was finished (or its lifetime is expired)",
                                ErrorCodes::UNFINISHED);
            }
612

613
            Strings new_hosts = getNewAndUpdate(finished_hosts_set, getChildrenAllowNoNode(zookeeper, node_path + "/finished"));
614 615 616 617
            ++try_number;
            if (new_hosts.empty())
                continue;

618 619
            Strings cur_active_hosts = getChildrenAllowNoNode(zookeeper, node_path + "/active");

620
            res = sample.cloneEmpty();
621
            for (const String & host : new_hosts)
622
            {
623 624 625 626 627 628 629 630 631 632
                ExecutionStatus status(1, "Cannot obtain error message");
                {
                    String status_data;
                    if (zookeeper->tryGet(node_path + "/finished/" + host, status_data))
                        status.deserializeText(status_data);
                }

                res.getByName("host").column->insert(host);
                res.getByName("status").column->insert(static_cast<UInt64>(status.code));
                res.getByName("error").column->insert(status.message);
633
                res.getByName("num_hosts_remaining").column->insert(total_rows_approx - (++num_hosts_finished));
634
                res.getByName("num_hosts_active").column->insert(cur_active_hosts.size());
635 636
            }
        }
637

638 639 640
        return res;
    }

641 642 643 644 645 646 647 648 649 650
    static Strings getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path)
    {
        Strings res;
        int code = zookeeper->tryGetChildren(node_path, res);
        if (code != ZOK && code != ZNONODE)
            throw zkutil::KeeperException(code, node_path);
        return res;
    }

    static Strings getNewAndUpdate(NameSet & prev, const Strings & cur_list)
651 652 653 654 655
    {
        Strings diff;
        for (const String & elem : cur_list)
        {
            if (!prev.count(elem))
656
            {
657
                diff.emplace_back(elem);
658 659
                prev.emplace(elem);
            }
660 661 662 663 664 665 666 667 668 669 670 671 672
        }

        return diff;
    }

    ~DDLQueryStatusInputSream() override = default;

    Block sample;

private:
    String node_path;
    Context & context;

673 674 675
    Stopwatch watch;

    NameSet finished_hosts_set;
676
    size_t num_hosts_finished = 0;
677
};
678 679


680
/// Entry point for "... ON CLUSTER" DDL queries.
/// Validates the query, enqueues it for every host of the cluster and returns a BlockIO whose input stream
/// reports per-host execution status. If the cluster has no hosts, nothing is enqueued and an empty
/// BlockIO is returned.
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, Context & context)
{
    const auto * query = dynamic_cast<const ASTQueryWithOnCluster *>(query_ptr.get());
    if (query == nullptr)
        throw Exception("Distributed execution is not supported for such DDL queries", ErrorCodes::NOT_IMPLEMENTED);

    /// Reject unsupported ALTER kinds before anything is written to ZooKeeper
    if (const auto * query_alter = dynamic_cast<const ASTAlterQuery *>(query_ptr.get()))
    {
        for (const auto & param : query_alter->parameters)
        {
            if (!isSupportedAlterType(param.type))
                throw Exception("Unsupported type of ALTER query", ErrorCodes::NOT_IMPLEMENTED);
        }
    }

    ClusterPtr cluster = context.getCluster(query->cluster);
    DDLWorker & ddl_worker = context.getDDLWorker();

    DDLLogEntry entry;
    entry.query = queryToString(query_ptr);
    entry.initiator = ddl_worker.getHostName();

    for (const auto & shard : cluster->getShardsWithFailoverAddresses())
        for (const auto & addr : shard)
            entry.hosts.emplace_back(addr.toString());

    String node_path = ddl_worker.enqueueQuery(entry);
    if (node_path.empty())
        return BlockIO();

    BlockIO io;
    auto status_stream = std::make_shared<DDLQueryStatusInputSream>(node_path, context, entry.hosts.size());
    io.in_sample = status_stream->sample.cloneEmpty();
    io.in = std::move(status_stream);
    return io;
}

}