DDLWorker.cpp 36.2 KB
Newer Older
B
bharatnc 已提交
1 2
#include <filesystem>

3
#include <Interpreters/DDLWorker.h>
A
Alexander Tokmakov 已提交
4
#include <Interpreters/DDLTask.h>
5
#include <Parsers/ASTAlterQuery.h>
6 7
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTOptimizeQuery.h>
8
#include <Parsers/ASTQueryWithOnCluster.h>
9
#include <Parsers/ASTQueryWithTableAndOutput.h>
10 11 12
#include <Parsers/ParserQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
13 14 15
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromString.h>
16
#include <Storages/IStorage.h>
17
#include <Interpreters/executeQuery.h>
18
#include <Interpreters/Cluster.h>
19
#include <Interpreters/Context.h>
20
#include <Common/setThreadName.h>
21
#include <Common/randomSeed.h>
A
Alexey Milovidov 已提交
22 23 24
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/isLocalAddress.h>
25
#include <Storages/StorageReplicatedMergeTree.h>
26
#include <Poco/Timestamp.h>
A
Alexey Milovidov 已提交
27
#include <common/sleep.h>
28
#include <common/getFQDNOrHostName.h>
A
Alexander Tokmakov 已提交
29
#include <common/logger_useful.h>
30 31 32
#include <random>
#include <pcg_random.hpp>

B
bharatnc 已提交
33
namespace fs = std::filesystem;
34

A
Amos Bird 已提交
35 36 37 38
namespace CurrentMetrics
{
    extern const Metric MaxDDLEntryID;
}
39 40 41 42 43 44

namespace DB
{

namespace ErrorCodes
{
A
Alexey Milovidov 已提交
45
    extern const int NOT_IMPLEMENTED;
46
    extern const int LOGICAL_ERROR;
47
    extern const int TIMEOUT_EXCEEDED;
48
    extern const int UNFINISHED;
A
Alexander Tokmakov 已提交
49 50 51 52 53
    extern const int NOT_A_LEADER;
    extern const int KEEPER_EXCEPTION;
    extern const int CANNOT_ASSIGN_ALTER;
    extern const int CANNOT_ALLOCATE_MEMORY;
    extern const int MEMORY_LIMIT_EXCEEDED;
54
    extern const int INCORRECT_QUERY;
55 56 57
}


58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
namespace
{

/** Caveats: usage of locks in ZooKeeper is incorrect in 99% of cases,
  *  and highlights your poor understanding of distributed systems.
  *
  * It's only correct if all the operations that are performed under lock
  *  are atomically checking that the lock still holds
  *  or if we ensure that these operations will be undone if lock is lost
  *  (due to ZooKeeper session loss) that's very difficult to achieve.
  *
  * It's Ok if every operation that we perform under lock is actually operation in ZooKeeper.
  *
  * In 1% of cases when you can correctly use Lock, the logic is complex enough, so you don't need this class.
  *
  * TLDR: Don't use this code.
  * We only have a few cases of its usage and it will be removed.
  */
class ZooKeeperLock
77
{
78 79 80 81 82 83 84 85 86 87
public:
    /// lock_prefix - path where the ephemeral lock node will be created
    /// lock_name - the name of the ephemeral lock node
    ZooKeeperLock(
        const zkutil::ZooKeeperPtr & zookeeper_,
        const std::string & lock_prefix_,
        const std::string & lock_name_,
        const std::string & lock_message_ = "")
    :
        zookeeper(zookeeper_),
B
bharatnc 已提交
88
        lock_path(fs::path(lock_prefix_) / lock_name_),
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
        lock_message(lock_message_),
        log(&Poco::Logger::get("zkutil::Lock"))
    {
        zookeeper->createIfNotExists(lock_prefix_, "");
    }

    ~ZooKeeperLock()
    {
        try
        {
            unlock();
        }
        catch (...)
        {
            DB::tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }

    void unlock()
    {
        Coordination::Stat stat;
        std::string dummy;
        bool result = zookeeper->tryGet(lock_path, dummy, &stat);

        if (result && stat.ephemeralOwner == zookeeper->getClientID())
            zookeeper->remove(lock_path, -1);
        else
            LOG_WARNING(log, "Lock is lost. It is normal if session was expired. Path: {}/{}", lock_path, lock_message);
    }

    bool tryLock()
    {
        std::string dummy;
        Coordination::Error code = zookeeper->tryCreate(lock_path, lock_message, zkutil::CreateMode::Ephemeral, dummy);

        if (code == Coordination::Error::ZNODEEXISTS)
        {
            return false;
        }
        else if (code == Coordination::Error::ZOK)
        {
            return true;
        }
        else
        {
            throw Coordination::Exception(code);
        }
    }

private:
    zkutil::ZooKeeperPtr zookeeper;

    std::string lock_path;
    std::string lock_message;
    Poco::Logger * log;

};

/// Factory helper: allocates a ZooKeeperLock for the node lock_prefix/lock_name carrying lock_message.
/// The returned lock has not been taken yet; the caller decides when to call tryLock().
std::unique_ptr<ZooKeeperLock> createSimpleZooKeeperLock(
    const zkutil::ZooKeeperPtr & zookeeper, const String & lock_prefix, const String & lock_name, const String & lock_message)
{
    auto lock_holder = std::make_unique<ZooKeeperLock>(zookeeper, lock_prefix, lock_name, lock_message);
    return lock_holder;
}

153 154 155
}


A
Alexander Tokmakov 已提交
156
/// Builds a worker that serves the DDL queue located at zk_root_dir in ZooKeeper.
///
/// pool_size_  - number of executor threads; a thread pool is created only when it is greater than 1
/// zk_root_dir - path of the queue; one trailing '/' is stripped
/// context_    - context the worker copies its own context from
/// config      - optional configuration; when given, task_max_lifetime, cleanup_delay_period,
///               max_tasks_in_queue and profile are read from under `prefix`
/// logger_name - name of the logger used by this worker
///
/// No threads are started here, see startup().
DDLWorker::DDLWorker(int pool_size_, const std::string & zk_root_dir, const Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix,
                     const String & logger_name)
    : context(context_)
    , log(&Poco::Logger::get(logger_name))
    , pool_size(pool_size_)
{
    CurrentMetrics::set(CurrentMetrics::MaxDDLEntryID, 0);

    if (1 < pool_size)
    {
        LOG_WARNING(log, "DDLWorker is configured to use multiple threads. "
                         "It's not recommended because queries can be reordered. Also it may cause some unknown issues to appear.");
        worker_pool = std::make_unique<ThreadPool>(pool_size);
    }

    queue_dir = zk_root_dir;
    /// Strip one trailing slash. The emptiness check is required: calling back() on an empty string is undefined behavior.
    if (!queue_dir.empty() && queue_dir.back() == '/')
        queue_dir.resize(queue_dir.size() - 1);

    if (config)
    {
        /// Current member values serve as defaults for keys missing in the config.
        task_max_lifetime = config->getUInt64(prefix + ".task_max_lifetime", static_cast<UInt64>(task_max_lifetime));
        cleanup_delay_period = config->getUInt64(prefix + ".cleanup_delay_period", static_cast<UInt64>(cleanup_delay_period));
        /// The queue limit is clamped to be at least 1.
        max_tasks_in_queue = std::max<UInt64>(1, config->getUInt64(prefix + ".max_tasks_in_queue", max_tasks_in_queue));

        if (config->has(prefix + ".profile"))
            context.setSetting("profile", config->getString(prefix + ".profile"));
    }

    if (context.getSettingsRef().readonly)
    {
        LOG_WARNING(log, "Distributed DDL worker is run with readonly settings, it will not be able to execute DDL queries Set appropriate system_profile or distributed_ddl.profile to fix this.");
    }

    /// Identity of this host as "fqdn:tcp_port"; used when the host id of a task cannot be parsed.
    host_fqdn = getFQDNOrHostName();
    host_fqdn_id = Cluster::Address::toString(host_fqdn, context.getTCPPort());
}
193

A
fixes  
Alexander Tokmakov 已提交
194 195
void DDLWorker::startup()
{
196 197
    main_thread = ThreadFromGlobalPool(&DDLWorker::runMainThread, this);
    cleanup_thread = ThreadFromGlobalPool(&DDLWorker::runCleanupThread, this);
198 199
}

A
Alexander Tokmakov 已提交
200
void DDLWorker::shutdown()
201 202
{
    stop_flag = true;
203 204
    queue_updated_event->set();
    cleanup_event->set();
A
Alexander Tokmakov 已提交
205

A
fixes  
Alexander Tokmakov 已提交
206 207 208 209
    if (main_thread.joinable())
        main_thread.join();
    if (cleanup_thread.joinable())
        cleanup_thread.join();
A
Alexander Tokmakov 已提交
210 211

    worker_pool.reset();
212 213
}

A
fix  
Alexander Tokmakov 已提交
214 215 216 217 218
/// Destroying the worker performs the same steps as an explicit shutdown().
DDLWorker::~DDLWorker()
{
    this->shutdown();
}

219

A
Alexander Tokmakov 已提交
220
/// Returns the cached ZooKeeper session as is, holding zookeeper_mutex while reading it.
/// No attempt is made to create or refresh the session.
/// NOTE(review): the result is presumably null until getAndSetZooKeeper() has been called - confirm with callers.
ZooKeeperPtr DDLWorker::tryGetZooKeeper() const
{
    std::lock_guard guard(zookeeper_mutex);
    return current_zookeeper;
}

A
Alexander Tokmakov 已提交
226
/// Returns the cached ZooKeeper session, first replacing it with a new one from the context
/// if there is no cached session or the cached one has expired. Runs under zookeeper_mutex.
ZooKeeperPtr DDLWorker::getAndSetZooKeeper()
{
    std::lock_guard guard(zookeeper_mutex);

    const bool need_new_session = !current_zookeeper || current_zookeeper->expired();
    if (need_new_session)
        current_zookeeper = context.getZooKeeper();

    return current_zookeeper;
}

236 237

/// Loads queue entry `entry_name` from ZooKeeper and prepares it for execution.
///
/// Returns the ready task, or an empty pointer when the task must not be executed by this host;
/// in the latter case `out_reason` holds a human-readable explanation.
/// When the entry or its query cannot be parsed, an error status is additionally written
/// (best effort) into the "finished" directory of the entry.
DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper)
{
    String entry_data;
    String entry_path = fs::path(queue_dir) / entry_name;

    auto task = std::make_unique<DDLTask>(entry_name, entry_path);

    const bool entry_exists = zookeeper->tryGet(entry_path, entry_data);
    if (!entry_exists)
    {
        /// The node may have been deleted just now, that is fine:
        /// it means that the current host is not in the host list of the node.
        out_reason = "The task was deleted";
        return {};
    }

    /// Logs the failure and tries to publish `error_message` as the execution status of `host_id`.
    /// The result of tryCreate is deliberately ignored.
    auto report_failure = [&](const String & host_id, const String & error_message, const String & reason)
    {
        LOG_ERROR(log, "Cannot parse DDL task {}: {}. Will try to send error status: {}", entry_name, reason, error_message);
        createStatusDirs(entry_path, zookeeper);
        zookeeper->tryCreate(fs::path(entry_path) / "finished" / host_id, error_message, zkutil::CreateMode::Persistent);
    };

    /// Stage 1: parse entry
    try
    {
        task->entry.parse(entry_data);
    }
    catch (...)
    {
        /// We could not even parse the host names, so the status cannot be submitted under a proper host id.
        /// Try to create the fail node using our FQDN: if it equals the host name in the cluster config,
        /// the attempt will be successful. Otherwise the node will be ignored by DDLQueryStatusInputStream.
        out_reason = "Incorrect task format";
        report_failure(host_fqdn_id, ExecutionStatus::fromCurrentException().serializeText(), out_reason);
        return {};
    }

    /// Stage 2: resolve host_id and check if we should execute query or not
    const bool host_is_listed = task->findCurrentHostID(context, log);
    if (!host_is_listed)
    {
        out_reason = "There is no a local address in host list";
        return {};
    }

    try
    {
        /// Stage 3.1: parse query
        task->parseQueryFromEntry(context);
        /// Stage 3.2: check cluster and find the host in cluster
        task->setClusterInfo(context, log);
    }
    catch (...)
    {
        out_reason = "Cannot parse query or obtain cluster info";
        report_failure(task->host_id_str, ExecutionStatus::fromCurrentException().serializeText(), out_reason);
        return {};
    }

    /// A "finished" node for this host means there is nothing left to do.
    const bool already_finished = zookeeper->exists(task->getFinishedNodePath());
    if (already_finished)
    {
        out_reason = "Task has been already processed";
        return {};
    }

    /// Now task is ready for execution
    return task;
}
303 304


305 306 307 308 309 310
/// Keeps only the names that start with "query-" and sorts the remaining names in ascending order, in place.
static void filterAndSortQueueNodes(Strings & all_nodes)
{
    const auto is_not_query_node = [](const String & node_name) { return !startsWith(node_name, "query-"); };

    const auto first_rejected = std::remove_if(all_nodes.begin(), all_nodes.end(), is_not_query_node);
    all_nodes.erase(first_rejected, all_nodes.end());

    std::sort(all_nodes.begin(), all_nodes.end());
}

311
void DDLWorker::scheduleTasks()
312
{
313
    LOG_DEBUG(log, "Scheduling tasks");
314
    auto zookeeper = tryGetZooKeeper();
315

A
Alexander Tokmakov 已提交
316 317 318 319 320 321 322 323 324 325 326 327 328
    for (auto & task : current_tasks)
    {
        /// Main thread of DDLWorker was restarted, probably due to lost connection with ZooKeeper.
        /// We have some unfinished tasks. To avoid duplication of some queries, try to write execution status.
        bool status_written = task->ops.empty();
        bool task_still_exists = zookeeper->exists(task->entry_path);
        if (task->was_executed && !status_written && task_still_exists)
        {
            assert(!zookeeper->exists(task->getFinishedNodePath()));
            processTask(*task);
        }
    }

329
    Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, queue_updated_event);
330
    filterAndSortQueueNodes(queue_nodes);
331
    if (queue_nodes.empty())
A
Alexander Tokmakov 已提交
332 333
    {
        LOG_TRACE(log, "No tasks to schedule");
334
        return;
A
Alexander Tokmakov 已提交
335
    }
336

337
    bool server_startup = current_tasks.empty();
A
Alexander Tokmakov 已提交
338
    auto begin_node = queue_nodes.begin();
339

A
Alexander Tokmakov 已提交
340 341 342 343 344 345 346
    if (!server_startup)
    {
        /// We will recheck status of last executed tasks. It's useful if main thread was just restarted.
        auto & min_task = *std::min_element(current_tasks.begin(), current_tasks.end());
        begin_node = std::upper_bound(queue_nodes.begin(), queue_nodes.end(), min_task->entry_name);
        current_tasks.clear();
    }
347

A
fix  
Alexander Tokmakov 已提交
348 349
    assert(current_tasks.empty());

A
Alexander Tokmakov 已提交
350
    for (auto it = begin_node; it != queue_nodes.end() && !stop_flag; ++it)
351 352
    {
        String entry_name = *it;
A
Alexander Tokmakov 已提交
353
        LOG_TRACE(log, "Checking task {}", entry_name);
354

355 356 357
        String reason;
        auto task = initAndCheckTask(entry_name, reason, zookeeper);
        if (!task)
358
        {
359
            LOG_DEBUG(log, "Will not execute task {}: {}", entry_name, reason);
A
fix  
Alexander Tokmakov 已提交
360
            updateMaxDDLEntryID(entry_name);
361
            continue;
362 363
        }

364
        auto & saved_task = saveTask(std::move(task));
365

366
        if (worker_pool)
367
        {
368
            worker_pool->scheduleOrThrowOnError([this, &saved_task]()
A
Alexander Tokmakov 已提交
369
            {
370 371 372
                setThreadName("DDLWorkerExec");
                processTask(saved_task);
            });
373
        }
374 375
        else
        {
376
            processTask(saved_task);
377
        }
378 379
    }
}
380

381 382
/// Takes ownership of `task` by appending it to current_tasks and returns a reference to the stored task.
/// Tasks that are already completely processed are dropped from current_tasks first.
DDLTaskBase & DDLWorker::saveTask(DDLTaskPtr && task)
{
    const auto is_completely_processed = [](const DDLTaskPtr & saved) { return saved->completely_processed.load(); };
    current_tasks.remove_if(is_completely_processed);

    assert(current_tasks.size() <= pool_size);

    auto & stored = current_tasks.emplace_back(std::move(task));
    return *stored;
}

A
Alexander Tokmakov 已提交
389
bool DDLWorker::tryExecuteQuery(const String & query, DDLTaskBase & task)
390
{
391
    /// Add special comment at the start of query to easily identify DDL-produced queries in query_log
A
alexey-milovidov 已提交
392
    String query_prefix = "/* ddl_entry=" + task.entry_name + " */ ";
393 394 395 396 397
    String query_to_execute = query_prefix + query;

    ReadBufferFromString istr(query_to_execute);
    String dummy_string;
    WriteBufferFromString ostr(dummy_string);
A
Alexander Tokmakov 已提交
398
    std::optional<CurrentThread::QueryScope> query_scope;
399

400 401
    try
    {
402
        auto query_context = task.makeQueryContext(context);
403 404 405
        if (!task.is_initial_query)
            query_scope.emplace(*query_context);
        executeQuery(istr, ostr, !task.is_initial_query, *query_context, {});
A
fix  
Alexander Tokmakov 已提交
406 407 408 409 410 411

        if (auto txn = query_context->getMetadataTransaction())
        {
            if (txn->state == MetadataTransaction::CREATED)
                txn->commit();
        }
412
    }
A
Alexander Tokmakov 已提交
413 414
    catch (const DB::Exception & e)
    {
415 416 417
        if (task.is_initial_query)
            throw;

A
Alexander Tokmakov 已提交
418 419 420 421 422 423 424 425 426 427 428 429 430 431 432
        task.execution_status = ExecutionStatus::fromCurrentException();
        tryLogCurrentException(log, "Query " + query + " wasn't finished successfully");

        /// We use return value of tryExecuteQuery(...) in tryExecuteQueryOnLeaderReplica(...) to determine
        /// if replica has stopped being leader and we should retry query.
        /// However, for the majority of exceptions there is no sense to retry, because most likely we will just
        /// get the same exception again. So we return false only for several special exception codes,
        /// and consider query as executed with status "failed" and return true in other cases.
        bool no_sense_to_retry = e.code() != ErrorCodes::KEEPER_EXCEPTION &&
                                 e.code() != ErrorCodes::NOT_A_LEADER &&
                                 e.code() != ErrorCodes::CANNOT_ASSIGN_ALTER &&
                                 e.code() != ErrorCodes::CANNOT_ALLOCATE_MEMORY &&
                                 e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED;
        return no_sense_to_retry;
    }
433 434
    catch (...)
    {
435 436 437
        if (task.is_initial_query)
            throw;

A
Alexander Tokmakov 已提交
438
        task.execution_status = ExecutionStatus::fromCurrentException();
439
        tryLogCurrentException(log, "Query " + query + " wasn't finished successfully");
440

A
Alexander Tokmakov 已提交
441 442
        /// We don't know what exactly happened, but maybe it's Poco::NetException or std::bad_alloc,
        /// so we consider unknown exception as retryable error.
443 444 445
        return false;
    }

A
Alexander Tokmakov 已提交
446
    task.execution_status = ExecutionStatus(0);
A
Alexey Milovidov 已提交
447
    LOG_DEBUG(log, "Executed query: {}", query);
448 449 450 451

    return true;
}

A
fix  
Alexander Tokmakov 已提交
452
void DDLWorker::updateMaxDDLEntryID(const String & entry_name)
453
{
A
fix  
Alexander Tokmakov 已提交
454
    DB::ReadBufferFromString in(entry_name);
455 456 457 458 459 460 461 462 463 464 465 466 467 468
    DB::assertString("query-", in);
    UInt64 id;
    readText(id, in);
    auto prev_id = max_id.load(std::memory_order_relaxed);
    while (prev_id < id)
    {
        if (max_id.compare_exchange_weak(prev_id, id))
        {
            CurrentMetrics::set(CurrentMetrics::MaxDDLEntryID, id);
            break;
        }
    }
}

469
/// Executes `task` (unless it was already executed on a previous attempt) and writes its status to ZooKeeper.
///
/// While processing, an ephemeral "active" node is held for the task. The status is written with a single
/// multi-request stored in task.ops; if task.ops is empty at that point, the status is treated as already written.
/// ZooKeeper exceptions are propagated to the caller; other exceptions are rethrown only for initial queries,
/// otherwise they are converted into the task's execution status.
void DDLWorker::processTask(DDLTaskBase & task)
{
    auto zookeeper = tryGetZooKeeper();

    LOG_DEBUG(log, "Processing task {} ({})", task.entry_name, task.entry.query);

    String active_node_path = task.getActiveNodePath();
    String finished_node_path = task.getFinishedNodePath();

    zookeeper->createAncestors(active_node_path);
    auto active_node = zkutil::EphemeralNodeHolder::create(active_node_path, *zookeeper, "");

    if (!task.was_executed)
    {
        /// If table and database engine supports it, they will execute task.ops by their own in a single transaction
        /// with other zk operations (such as appending something to ReplicatedMergeTree log, or
        /// updating metadata in Replicated database), so we make create request for finished_node_path with status "0",
        /// which means that query executed successfully.
        task.ops.emplace_back(zkutil::makeRemoveRequest(active_node_path, -1));
        task.ops.emplace_back(zkutil::makeCreateRequest(finished_node_path, "0", zkutil::CreateMode::Persistent));

        try
        {
            String rewritten_query = queryToString(task.query);
            LOG_DEBUG(log, "Executing query: {}", rewritten_query);

            StoragePtr storage;
            if (auto * query_with_table = dynamic_cast<ASTQueryWithTableAndOutput *>(task.query.get()); query_with_table)
            {
                if (!query_with_table->table.empty())
                {
                    /// It's not CREATE DATABASE
                    auto table_id = context.tryResolveStorageID(*query_with_table, Context::ResolveOrdinary);
                    DatabasePtr database;
                    std::tie(database, storage) = DatabaseCatalog::instance().tryGetDatabaseAndTable(table_id, context);
                    if (database && database->getEngineName() == "Replicated" && !typeid_cast<const DatabaseReplicatedTask *>(&task))
                        throw Exception(ErrorCodes::INCORRECT_QUERY, "ON CLUSTER queries are not allowed for Replicated databases");
                }

                task.execute_on_leader = storage && taskShouldBeExecutedOnLeader(task.query, storage) && !task.is_circular_replicated;
            }

            if (task.execute_on_leader)
            {
                tryExecuteQueryOnLeaderReplica(task, storage, rewritten_query, task.entry_path, zookeeper);
            }
            else
            {
                /// Drop our reference to the storage before running the query.
                storage.reset();
                tryExecuteQuery(rewritten_query, task);
            }
        }
        catch (const Coordination::Exception &)
        {
            /// ZooKeeper errors are handled by the caller.
            throw;
        }
        catch (...)
        {
            if (task.is_initial_query)
                throw;
            tryLogCurrentException(log, "An error occurred before execution of DDL task: ");
            task.execution_status = ExecutionStatus::fromCurrentException("An error occurred before execution");
        }

        if (task.execution_status.code != 0)
        {
            bool status_written_by_table_or_db = task.ops.empty();
            if (status_written_by_table_or_db)
            {
                throw Exception(ErrorCodes::UNFINISHED, "Unexpected error: {}", task.execution_status.serializeText());
            }
            else
            {
                /// task.ops were not executed by table or database engine, so DDLWorker is responsible for
                /// writing query execution status into ZooKeeper.
                task.ops.emplace_back(zkutil::makeSetRequest(finished_node_path, task.execution_status.serializeText(), -1));
            }
        }

        /// We need to distinguish ZK errors occurred before and after query executing
        task.was_executed = true;
    }

    updateMaxDDLEntryID(task.entry_name);

    /// FIXME: if server fails right here, the task will be executed twice. We need WAL here.
    /// If ZooKeeper connection is lost here, we will try again to write query status.

    bool status_written = task.ops.empty();
    if (!status_written)
    {
        zookeeper->multi(task.ops);
        /// The multi-request above has already removed the active node, so release the holder.
        active_node->reset();
        task.ops.clear();
    }

    task.completely_processed = true;
}
568 569


A
Alexey Milovidov 已提交
570
bool DDLWorker::taskShouldBeExecutedOnLeader(const ASTPtr ast_ddl, const StoragePtr storage)
571
{
572
    /// Pure DROP queries have to be executed on each node separately
A
Alexey Milovidov 已提交
573
    if (auto * query = ast_ddl->as<ASTDropQuery>(); query && query->kind != ASTDropQuery::Kind::Truncate)
574
        return false;
575

576 577
    if (!ast_ddl->as<ASTAlterQuery>() && !ast_ddl->as<ASTOptimizeQuery>() && !ast_ddl->as<ASTDropQuery>())
        return false;
578

579 580 581 582 583
    if (auto * alter = ast_ddl->as<ASTAlterQuery>())
    {
        // Setting alters should be executed on all replicas
        if (alter->isSettingsAlter())
            return false;
A
fix  
Alexander Tokmakov 已提交
584 585 586

        if (alter->isFreezeAlter())
            return false;
587 588
    }

589 590
    return storage->supportsReplication();
}
591

592
bool DDLWorker::tryExecuteQueryOnLeaderReplica(
593
    DDLTaskBase & task,
594 595
    StoragePtr storage,
    const String & rewritten_query,
596
    const String & /*node_path*/,
597 598 599 600 601 602
    const ZooKeeperPtr & zookeeper)
{
    StorageReplicatedMergeTree * replicated_storage = dynamic_cast<StorageReplicatedMergeTree *>(storage.get());

    /// If we will develop new replicated storage
    if (!replicated_storage)
A
Alexander Tokmakov 已提交
603
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Storage type '{}' is not supported by distributed DDL", storage->getName());
604

605
    String shard_path = task.getShardNodePath();
B
bharatnc 已提交
606 607 608
    String is_executed_path = fs::path(shard_path) / "executed";
    String tries_to_execute_path = fs::path(shard_path) / "tries_to_execute";
    zookeeper->createAncestors(fs::path(shard_path) / ""); /* appends "/" at the end of shard_path */
609

A
Alexander Tokmakov 已提交
610 611 612 613
    /// Leader replica creates is_executed_path node on successful query execution.
    /// We will remove create_shard_flag from zk operations list, if current replica is just waiting for leader to execute the query.
    auto create_shard_flag = zkutil::makeCreateRequest(is_executed_path, task.host_id_str, zkutil::CreateMode::Persistent);

A
alesapin 已提交
614 615
    /// Node exists, or we will create or we will get an exception
    zookeeper->tryCreate(tries_to_execute_path, "0", zkutil::CreateMode::Persistent);
616

A
alesapin 已提交
617
    static constexpr int MAX_TRIES_TO_EXECUTE = 3;
618
    static constexpr int MAX_EXECUTION_TIMEOUT_SEC = 3600;
A
alesapin 已提交
619 620 621 622

    String executed_by;

    zkutil::EventPtr event = std::make_shared<Poco::Event>();
623 624 625
    /// We must use exists request instead of get, because zookeeper will not setup event
    /// for non existing node after get request
    if (zookeeper->exists(is_executed_path, nullptr, event))
A
alesapin 已提交
626
    {
627
        LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, zookeeper->get(is_executed_path));
A
alesapin 已提交
628 629
        return true;
    }
630

631
    pcg64 rng(randomSeed());
632

633
    auto lock = createSimpleZooKeeperLock(zookeeper, shard_path, "lock", task.host_id_str);
A
alesapin 已提交
634

635 636
    Stopwatch stopwatch;

A
Alexander Tokmakov 已提交
637 638 639
    bool executed_by_us = false;
    bool executed_by_other_leader = false;

640 641 642 643
    /// Defensive programming. One hour is more than enough to execute almost all DDL queries.
    /// If it will be very long query like ALTER DELETE for a huge table it's still will be executed,
    /// but DDL worker can continue processing other queries.
    while (stopwatch.elapsedSeconds() <= MAX_EXECUTION_TIMEOUT_SEC)
644 645
    {
        StorageReplicatedMergeTree::Status status;
646 647
        // Has to get with zk fields to get active replicas field
        replicated_storage->getStatus(status, true);
648

649 650 651
        // Should return as soon as possible if the table is dropped.
        bool replica_dropped = replicated_storage->is_dropped;
        bool all_replicas_likely_detached = status.active_replicas == 0 && !DatabaseCatalog::instance().isTableExist(replicated_storage->getStorageID(), context);
Y
yiguolei 已提交
652
        if (replica_dropped || all_replicas_likely_detached)
Y
yiguolei 已提交
653
        {
Y
yiguolei 已提交
654 655
            LOG_WARNING(log, ", task {} will not be executed.", task.entry_name);
            task.execution_status = ExecutionStatus(ErrorCodes::UNFINISHED, "Cannot execute replicated DDL query, table is dropped or detached permanently");
656 657
            return false;
        }
658

659 660 661
        if (task.is_initial_query && !status.is_leader)
            throw Exception(ErrorCodes::NOT_A_LEADER, "Cannot execute initial query on non-leader replica");

A
alesapin 已提交
662
        /// Any replica which is leader tries to take lock
663 664
        if (status.is_leader && lock->tryLock())
        {
A
alesapin 已提交
665
            /// In replicated merge tree we can have multiple leaders. So we can
666 667
            /// be "leader" and took lock, but another "leader" replica may have
            /// already executed this task.
A
alesapin 已提交
668 669 670
            if (zookeeper->tryGet(is_executed_path, executed_by))
            {
                LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, executed_by);
A
Alexander Tokmakov 已提交
671
                executed_by_other_leader = true;
A
alesapin 已提交
672 673 674
                break;
            }

675
            /// Checking and incrementing counter exclusively.
A
alesapin 已提交
676 677
            size_t counter = parse<int>(zookeeper->get(tries_to_execute_path));
            if (counter > MAX_TRIES_TO_EXECUTE)
678
                break;
A
alesapin 已提交
679

A
alesapin 已提交
680
            zookeeper->set(tries_to_execute_path, toString(counter + 1));
681

A
Alexander Tokmakov 已提交
682 683 684
            task.ops.push_back(create_shard_flag);
            SCOPE_EXIT({ if (!executed_by_us && !task.ops.empty()) task.ops.pop_back(); });

685 686
            /// If the leader will unexpectedly changed this method will return false
            /// and on the next iteration new leader will take lock
A
Alexander Tokmakov 已提交
687
            if (tryExecuteQuery(rewritten_query, task))
688
            {
A
Alexander Tokmakov 已提交
689
                executed_by_us = true;
690
                break;
691
            }
A
alesapin 已提交
692 693

            lock->unlock();
694
        }
695

696
        /// Waiting for someone who will execute query and change is_executed_path node
A
alesapin 已提交
697 698
        if (event->tryWait(std::uniform_int_distribution<int>(0, 1000)(rng)))
        {
A
alesapin 已提交
699
            LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, zookeeper->get(is_executed_path));
A
Alexander Tokmakov 已提交
700
            executed_by_other_leader = true;
A
alesapin 已提交
701 702
            break;
        }
703
        else
A
alesapin 已提交
704
        {
705 706 707 708 709 710 711 712 713 714 715 716 717
            String tries_count;
            zookeeper->tryGet(tries_to_execute_path, tries_count);
            if (parse<int>(tries_count) > MAX_TRIES_TO_EXECUTE)
            {
                /// Nobody will try to execute query again
                LOG_WARNING(log, "Maximum retries count for task {} exceeded, cannot execute replicated DDL query", task.entry_name);
                break;
            }
            else
            {
                /// Will try to wait or execute
                LOG_TRACE(log, "Task {} still not executed, will try to wait for it or execute ourselves, tries count {}", task.entry_name, tries_count);
            }
A
alesapin 已提交
718
        }
719
    }
720

A
Alexander Tokmakov 已提交
721 722
    assert(!(executed_by_us && executed_by_other_leader));

723
    /// Not executed by leader so was not executed at all
A
Alexander Tokmakov 已提交
724
    if (!executed_by_us && !executed_by_other_leader)
725
    {
726 727 728 729
        /// If we failed with timeout
        if (stopwatch.elapsedSeconds() >= MAX_EXECUTION_TIMEOUT_SEC)
        {
            LOG_WARNING(log, "Task {} was not executed by anyone, maximum timeout {} seconds exceeded", task.entry_name, MAX_EXECUTION_TIMEOUT_SEC);
A
alesapin 已提交
730
            task.execution_status = ExecutionStatus(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot execute replicated DDL query, timeout exceeded");
731 732 733 734 735 736
        }
        else /// If we exceeded amount of tries
        {
            LOG_WARNING(log, "Task {} was not executed by anyone, maximum number of retries exceeded", task.entry_name);
            task.execution_status = ExecutionStatus(ErrorCodes::UNFINISHED, "Cannot execute replicated DDL query, maximum retires exceeded");
        }
737
        return false;
738
    }
A
alesapin 已提交
739

A
Alexander Tokmakov 已提交
740 741 742 743 744
    if (executed_by_us)
        LOG_DEBUG(log, "Task {} executed by current replica", task.entry_name);
    else // if (executed_by_other_leader)
        LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, zookeeper->get(is_executed_path));

745
    return true;
746 747 748
}


749
/// Removes outdated entries from the distributed DDL queue stored under queue_dir.
///
/// An entry becomes a candidate for removal when at least one of the following holds:
///  - its ZooKeeper creation time (ctime, converted to seconds) plus task_max_lifetime
///    is less than current_time_seconds;
///  - it precedes the last max_tasks_in_queue entries in the order produced by filterAndSortQueueNodes().
/// A candidate is skipped if its "active" node still has children or if its "lock" cannot be acquired.
///
/// Errors that happen while processing a single entry are logged and do not stop the processing
/// of the remaining entries. Returns immediately once stop_flag is set.
void DDLWorker::cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zookeeper)
{
    LOG_DEBUG(log, "Cleaning queue");

    Strings queue_nodes = zookeeper->getChildren(queue_dir);
    filterAndSortQueueNodes(queue_nodes);

    /// Everything before this position does not fit into the window of max_tasks_in_queue entries.
    const size_t num_outdated_nodes = (queue_nodes.size() > max_tasks_in_queue) ? queue_nodes.size() - max_tasks_in_queue : 0;
    const auto first_non_outdated_node = queue_nodes.cbegin() + num_outdated_nodes;

    for (auto it = queue_nodes.cbegin(); it < queue_nodes.cend(); ++it)
    {
        if (stop_flag)
            return;

        /// queue_nodes is not modified inside the loop, so a reference is enough here.
        const String & node_name = *it;
        String node_path = fs::path(queue_dir) / node_name;
        String lock_path = fs::path(node_path) / "lock";

        Coordination::Stat stat;

        try
        {
            /// Already deleted
            if (!zookeeper->exists(node_path, &stat))
                continue;

            /// Delete node if its lifetime is expired (according to task_max_lifetime parameter).
            /// ctime is divided by 1000 to get seconds, comparable with current_time_seconds.
            constexpr UInt64 zookeeper_time_resolution = 1000;
            Int64 zookeeper_time_seconds = stat.ctime / zookeeper_time_resolution;
            bool node_lifetime_is_expired = zookeeper_time_seconds + task_max_lifetime < current_time_seconds;

            /// If there are too many nodes in task queue (> max_tasks_in_queue), delete the ones outside of the window
            bool node_is_outside_max_window = it < first_non_outdated_node;

            if (!node_lifetime_is_expired && !node_is_outside_max_window)
                continue;

            /// Skip if there are active nodes (it is weak guard)
            if (zookeeper->exists(fs::path(node_path) / "active", &stat) && stat.numChildren > 0)
            {
                LOG_INFO(log, "Task {} should be deleted, but there are active workers. Skipping it.", node_name);
                continue;
            }

            /// Usage of the lock is not necessary now (tryRemoveRecursive correctly removes node in a presence of concurrent cleaners)
            /// But the lock will be required to implement system.distributed_ddl_queue table
            auto lock = createSimpleZooKeeperLock(zookeeper, node_path, "lock", host_fqdn_id);
            if (!lock->tryLock())
            {
                LOG_INFO(log, "Task {} should be deleted, but it is locked. Skipping it.", node_name);
                continue;
            }

            if (node_lifetime_is_expired)
                LOG_INFO(log, "Lifetime of task {} is expired, deleting it", node_name);
            else if (node_is_outside_max_window)
                LOG_INFO(log, "Task {} is outdated, deleting it", node_name);

            /// Deleting
            {
                /// The "lock" child is kept for now: it is removed together with the entry itself below.
                Strings children = zookeeper->getChildren(node_path);
                for (const String & child : children)
                {
                    if (child != "lock")
                        zookeeper->tryRemoveRecursive(fs::path(node_path) / child);
                }

                /// Remove the lock node and its parent atomically
                Coordination::Requests ops;
                ops.emplace_back(zkutil::makeRemoveRequest(lock_path, -1));
                ops.emplace_back(zkutil::makeRemoveRequest(node_path, -1));
                zookeeper->multi(ops);
            }
        }
        catch (...)
        {
            /// Best effort: a failure on one entry must not prevent cleaning of the others.
            LOG_INFO(log, "An error occurred while checking and cleaning task {} from queue: {}", node_name, getCurrentExceptionMessage(false));
        }
    }
}

832

A
alexey-milovidov 已提交
833
/// Try to create nonexisting "status" dirs for a node
834
void DDLWorker::createStatusDirs(const std::string & node_path, const ZooKeeperPtr & zookeeper)
835
{
836
    Coordination::Requests ops;
837
    {
838
        Coordination::CreateRequest request;
B
bharatnc 已提交
839
        request.path = fs::path(node_path) / "active";
840
        ops.emplace_back(std::make_shared<Coordination::CreateRequest>(std::move(request)));
841 842
    }
    {
843
        Coordination::CreateRequest request;
B
bharatnc 已提交
844
        request.path = fs::path(node_path) / "finished";
845
        ops.emplace_back(std::make_shared<Coordination::CreateRequest>(std::move(request)));
846
    }
847
    Coordination::Responses responses;
848 849 850
    Coordination::Error code = zookeeper->tryMulti(ops, responses);
    if (code != Coordination::Error::ZOK
        && code != Coordination::Error::ZNODEEXISTS)
851
        throw Coordination::Exception(code);
852
}
853 854


855
/// Puts the entry into the distributed DDL queue as a new persistent sequential node
/// ("query-" prefix under queue_dir) and returns the path of the created node.
/// Throws LOGICAL_ERROR if the entry has an empty host list.
String DDLWorker::enqueueQuery(DDLLogEntry & entry)
{
    if (entry.hosts.empty())
        throw Exception("Empty host list in a distributed DDL task", ErrorCodes::LOGICAL_ERROR);

    auto zookeeper = getAndSetZooKeeper();

    const String sequential_prefix = fs::path(queue_dir) / "query-";
    zookeeper->createAncestors(sequential_prefix);

    String created_path = zookeeper->create(sequential_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential);

    /// Status dirs cannot be created in the same transaction as the entry itself:
    /// the final node path is known only after the create request above has been executed.
    /// So we try to create them here; a failure is only logged, because they will be
    /// created later, when the entry is executed.
    try
    {
        createStatusDirs(created_path, zookeeper);
    }
    catch (...)
    {
        LOG_INFO(log, "An error occurred while creating auxiliary ZooKeeper directories in {} . They will be created later. Error : {}", created_path, getCurrentExceptionMessage(true));
    }

    return created_path;
}

882

883
void DDLWorker::initializeMainThread()
884
{
A
fix  
Alexander Tokmakov 已提交
885 886 887
    assert(!initialized);
    assert(max_id == 0);
    assert(current_tasks.empty());
888
    setThreadName("DDLWorker");
A
Alexey Milovidov 已提交
889
    LOG_DEBUG(log, "Started DDLWorker thread");
890

A
fix  
Alexander Tokmakov 已提交
891
    while (!stop_flag)
892
    {
893 894
        try
        {
A
Alexey Milovidov 已提交
895
            auto zookeeper = getAndSetZooKeeper();
B
bharatnc 已提交
896
            zookeeper->createAncestors(fs::path(queue_dir) / "");
A
Alexey Milovidov 已提交
897
            initialized = true;
A
fix  
Alexander Tokmakov 已提交
898
            return;
A
Alexey Milovidov 已提交
899 900 901 902
        }
        catch (const Coordination::Exception & e)
        {
            if (!Coordination::isHardwareError(e.code))
903 904
            {
                /// A logical error.
A
Alexander Tokmakov 已提交
905 906
                LOG_ERROR(log, "ZooKeeper error: {}. Failed to start DDLWorker.",getCurrentExceptionMessage(true));
                assert(false);  /// Catch such failures in tests with debug build
907
            }
908

A
Alexey Milovidov 已提交
909
            tryLogCurrentException(__PRETTY_FUNCTION__);
910 911 912
        }
        catch (...)
        {
A
Alexander Tokmakov 已提交
913
            tryLogCurrentException(log, "Cannot initialize DDL queue.");
914
        }
A
fix  
Alexander Tokmakov 已提交
915 916 917

        /// Avoid busy loop when ZooKeeper is not available.
        sleepForSeconds(5);
918
    }
919 920 921 922
}

void DDLWorker::runMainThread()
{
A
fix  
Alexander Tokmakov 已提交
923
    auto reset_state = [&]()
924 925 926 927
    {
        initialized = false;
        /// It will wait for all threads in pool to finish and will not rethrow exceptions (if any).
        /// We create new thread pool to forget previous exceptions.
A
fix  
Alexander Tokmakov 已提交
928
        if (1 < pool_size)
929 930 931 932 933 934
            worker_pool = std::make_unique<ThreadPool>(pool_size);
        /// Clear other in-memory state, like server just started.
        current_tasks.clear();
        max_id = 0;
    };

935 936
    setThreadName("DDLWorker");
    LOG_DEBUG(log, "Starting DDLWorker thread");
937

938 939 940 941
    while (!stop_flag)
    {
        try
        {
942 943 944 945 946 947
            /// Reinitialize DDLWorker state (including ZooKeeper connection) if required
            if (!initialized)
            {
                initializeMainThread();
                LOG_DEBUG(log, "Initialized DDLWorker thread");
            }
C
fixes  
CurtizJ 已提交
948

949
            cleanup_event->set();
950
            scheduleTasks();
951

952
            LOG_DEBUG(log, "Waiting for queue updates");
953
            queue_updated_event->wait();
954
        }
A
Alexey Milovidov 已提交
955
        catch (const Coordination::Exception & e)
956
        {
957
            if (Coordination::isHardwareError(e.code))
958
            {
959
                initialized = false;
A
fix  
Alexander Tokmakov 已提交
960
                LOG_INFO(log, "Lost ZooKeeper connection, will try to connect again: {}", getCurrentExceptionMessage(true));
961
            }
962
            else if (e.code == Coordination::Error::ZNONODE)
963
            {
964 965
                // TODO add comment: when it happens and why it's expected?
                // maybe because cleanup thread may remove nodes inside queue entry which are currently processed
A
Alexey Milovidov 已提交
966
                LOG_ERROR(log, "ZooKeeper error: {}", getCurrentExceptionMessage(true));
967
            }
968 969
            else
            {
A
fix  
Alexander Tokmakov 已提交
970
                LOG_ERROR(log, "Unexpected ZooKeeper error, will try to restart main thread: {}", getCurrentExceptionMessage(true));
A
Alexander Tokmakov 已提交
971
                reset_state();
972
            }
A
fix  
Alexander Tokmakov 已提交
973
            sleepForSeconds(1);
974
        }
975 976
        catch (...)
        {
977
            tryLogCurrentException(log, "Unexpected error, will try to restart main thread:");
A
Alexander Tokmakov 已提交
978
            reset_state();
A
fix  
Alexander Tokmakov 已提交
979
            sleepForSeconds(5);
980
        }
981 982 983
    }
}

984

985 986 987
void DDLWorker::runCleanupThread()
{
    setThreadName("DDLWorkerClnr");
A
Alexey Milovidov 已提交
988
    LOG_DEBUG(log, "Started DDLWorker cleanup thread");
989 990 991 992 993 994 995 996 997 998 999 1000 1001

    Int64 last_cleanup_time_seconds = 0;
    while (!stop_flag)
    {
        try
        {
            cleanup_event->wait();
            if (stop_flag)
                break;

            Int64 current_time_seconds = Poco::Timestamp().epochTime();
            if (last_cleanup_time_seconds && current_time_seconds < last_cleanup_time_seconds + cleanup_delay_period)
            {
A
Alexey Milovidov 已提交
1002
                LOG_TRACE(log, "Too early to clean queue, will do it later.");
1003 1004 1005
                continue;
            }

1006
            /// ZooKeeper connection is recovered by main thread. We will wait for it on cleanup_event.
1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020
            auto zookeeper = tryGetZooKeeper();
            if (zookeeper->expired())
                continue;

            cleanupQueue(current_time_seconds, zookeeper);
            last_cleanup_time_seconds = current_time_seconds;
        }
        catch (...)
        {
            tryLogCurrentException(log, __PRETTY_FUNCTION__);
        }
    }
}

1021
}