ClusterCopier.cpp 79.0 KB
Newer Older
1
#include "ClusterCopier.h"
2

N
Nikita Mikhaylov 已提交
3 4
#include "Internals.h"

5
#include <Common/ZooKeeper/ZooKeeper.h>
A
Alexey Milovidov 已提交
6
#include <Common/ZooKeeper/KeeperException.h>
7 8
#include <Common/setThreadName.h>

9 10

namespace DB
A
Alexey Milovidov 已提交
11
{
12

13 14 15 16 17 18 19
namespace ErrorCodes
{
    extern const int NOT_IMPLEMENTED;
    extern const int LOGICAL_ERROR;
    extern const int UNFINISHED;
    extern const int BAD_ARGUMENTS;
}
A
Alexey Milovidov 已提交
20

21

N
Nikita Mikhaylov 已提交
22
void ClusterCopier::init()
23
{
N
Nikita Mikhaylov 已提交
24
    auto zookeeper = context.getZooKeeper();
25

N
Nikita Mikhaylov 已提交
26 27
    task_description_watch_callback = [this] (const Coordination::WatchResponse & response)
    {
28
        if (response.error != Coordination::Error::ZOK)
N
Nikita Mikhaylov 已提交
29
            return;
N
Nikita Mikhaylov 已提交
30
        UInt64 version = ++task_description_version;
A
Alexey Milovidov 已提交
31
        LOG_DEBUG(log, "Task description should be updated, local version {}", version);
N
Nikita Mikhaylov 已提交
32
    };
33

N
Nikita Mikhaylov 已提交
34 35
    task_description_path = task_zookeeper_path + "/description";
    task_cluster = std::make_unique<TaskCluster>(task_zookeeper_path, working_database_name);
36

N
Nikita Mikhaylov 已提交
37 38
    reloadTaskDescription();
    task_cluster_initial_config = task_cluster_current_config;
39

N
Nikita Mikhaylov 已提交
40 41 42 43 44 45
    task_cluster->loadTasks(*task_cluster_initial_config);
    context.setClustersConfig(task_cluster_initial_config, task_cluster->clusters_prefix);

    /// Set up shards and their priority
    task_cluster->random_engine.seed(task_cluster->random_device());
    for (auto & task_table : task_cluster->table_tasks)
46
    {
N
Nikita Mikhaylov 已提交
47 48 49
        task_table.cluster_pull = context.getCluster(task_table.cluster_pull_name);
        task_table.cluster_push = context.getCluster(task_table.cluster_push_name);
        task_table.initShards(task_cluster->random_engine);
50 51
    }

A
Alexey Milovidov 已提交
52
    LOG_DEBUG(log, "Will process {} table tasks", task_cluster->table_tasks.size());
N
Nikita Mikhaylov 已提交
53 54 55 56 57

    /// Do not initialize tables, will make deferred initialization in process()

    zookeeper->createAncestors(getWorkersPathVersion() + "/");
    zookeeper->createAncestors(getWorkersPath() + "/");
58 59
}

N
Nikita Mikhaylov 已提交
60 61
template <typename T>
decltype(auto) ClusterCopier::retry(T && func, UInt64 max_tries)
62
{
N
Nikita Mikhaylov 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
    std::exception_ptr exception;

    for (UInt64 try_number = 1; try_number <= max_tries; ++try_number)
    {
        try
        {
            return func();
        }
        catch (...)
        {
            exception = std::current_exception();
            if (try_number < max_tries)
            {
                tryLogCurrentException(log, "Will retry");
                std::this_thread::sleep_for(default_sleep_time);
            }
        }
    }

    std::rethrow_exception(exception);
83 84 85
}


N
Nikita Mikhaylov 已提交
86
void ClusterCopier::discoverShardPartitions(const ConnectionTimeouts & timeouts, const TaskShardPtr & task_shard)
87
{
N
Nikita Mikhaylov 已提交
88
    TaskTable & task_table = task_shard->task_table;
89

A
Alexey Milovidov 已提交
90
    LOG_INFO(log, "Discover partitions of shard {}", task_shard->getDescription());
91

N
Nikita Mikhaylov 已提交
92 93 94 95
    auto get_partitions = [&] () { return getShardPartitions(timeouts, *task_shard); };
    auto existing_partitions_names = retry(get_partitions, 60);
    Strings filtered_partitions_names;
    Strings missing_partitions;
96

N
Nikita Mikhaylov 已提交
97 98
    /// Check that user specified correct partition names
    auto check_partition_format = [] (const DataTypePtr & type, const String & partition_text_quoted)
99
    {
N
Nikita Mikhaylov 已提交
100 101
        MutableColumnPtr column_dummy = type->createColumn();
        ReadBufferFromString rb(partition_text_quoted);
102

N
Nikita Mikhaylov 已提交
103 104 105 106 107 108 109 110 111
        try
        {
            type->deserializeAsTextQuoted(*column_dummy, rb, FormatSettings());
        }
        catch (Exception & e)
        {
            throw Exception("Partition " + partition_text_quoted + " has incorrect format. " + e.displayText(), ErrorCodes::BAD_ARGUMENTS);
        }
    };
112

N
Nikita Mikhaylov 已提交
113
    if (task_table.has_enabled_partitions)
114
    {
N
Nikita Mikhaylov 已提交
115 116 117 118 119
        /// Process partition in order specified by <enabled_partitions/>
        for (const String & partition_name : task_table.enabled_partitions)
        {
            /// Check that user specified correct partition names
            check_partition_format(task_shard->partition_key_column.type, partition_name);
120

N
Nikita Mikhaylov 已提交
121
            auto it = existing_partitions_names.find(partition_name);
122

N
Nikita Mikhaylov 已提交
123 124 125 126 127 128 129 130 131
            /// Do not process partition if it is not in enabled_partitions list
            if (it == existing_partitions_names.end())
            {
                missing_partitions.emplace_back(partition_name);
                continue;
            }

            filtered_partitions_names.emplace_back(*it);
        }
132

N
Nikita Mikhaylov 已提交
133 134 135 136
        for (const String & partition_name : existing_partitions_names)
        {
            if (!task_table.enabled_partitions_set.count(partition_name))
            {
A
Alexey Milovidov 已提交
137
                LOG_DEBUG(log, "Partition {} will not be processed, since it is not in enabled_partitions of {}", partition_name, task_table.table_id);
N
Nikita Mikhaylov 已提交
138 139 140 141 142 143 144
            }
        }
    }
    else
    {
        for (const String & partition_name : existing_partitions_names)
            filtered_partitions_names.emplace_back(partition_name);
145 146
    }

N
Nikita Mikhaylov 已提交
147 148
    for (const String & partition_name : filtered_partitions_names)
    {
N
Nikita Mikhaylov 已提交
149 150
        const size_t number_of_splits = task_table.number_of_splits;
        task_shard->partition_tasks.emplace(partition_name, ShardPartition(*task_shard, partition_name, number_of_splits));
N
Nikita Mikhaylov 已提交
151
        task_shard->checked_partitions.emplace(partition_name, true);
N
Nikita Mikhaylov 已提交
152 153 154 155 156 157 158 159 160

        auto shard_partition_it = task_shard->partition_tasks.find(partition_name);
        PartitionPieces & shard_partition_pieces = shard_partition_it->second.pieces;

        for (size_t piece_number = 0; piece_number < number_of_splits; ++piece_number)
        {
            bool res = checkPresentPartitionPiecesOnCurrentShard(timeouts, *task_shard, partition_name, piece_number);
            shard_partition_pieces.emplace_back(shard_partition_it->second, piece_number, res);
        }
N
Nikita Mikhaylov 已提交
161
    }
162

N
Nikita Mikhaylov 已提交
163 164
    if (!missing_partitions.empty())
    {
A
Alexander Tokmakov 已提交
165
        WriteBufferFromOwnString ss;
N
Nikita Mikhaylov 已提交
166 167
        for (const String & missing_partition : missing_partitions)
            ss << " " << missing_partition;
168

A
Alexey Milovidov 已提交
169
        LOG_WARNING(log, "There are no {} partitions from enabled_partitions in shard {} :{}", missing_partitions.size(), task_shard->getDescription(), ss.str());
N
Nikita Mikhaylov 已提交
170
    }
171

A
Alexey Milovidov 已提交
172
    LOG_DEBUG(log, "Will copy {} partitions from shard {}", task_shard->partition_tasks.size(), task_shard->getDescription());
N
Nikita Mikhaylov 已提交
173
}
174

N
Nikita Mikhaylov 已提交
175
void ClusterCopier::discoverTablePartitions(const ConnectionTimeouts & timeouts, TaskTable & task_table, UInt64 num_threads)
176
{
N
Nikita Mikhaylov 已提交
177 178 179
    /// Fetch partitions list from a shard
    {
        ThreadPool thread_pool(num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores());
180

N
Nikita Mikhaylov 已提交
181
        for (const TaskShardPtr & task_shard : task_table.all_shards)
182 183 184 185 186
            thread_pool.scheduleOrThrowOnError([this, timeouts, task_shard]()
            {
                setThreadName("DiscoverPartns");
                discoverShardPartitions(timeouts, task_shard);
            });
187

A
Alexey Milovidov 已提交
188
        LOG_DEBUG(log, "Waiting for {} setup jobs", thread_pool.active());
N
Nikita Mikhaylov 已提交
189 190 191
        thread_pool.wait();
    }
}
192

N
Nikita Mikhaylov 已提交
193
void ClusterCopier::uploadTaskDescription(const std::string & task_path, const std::string & task_file, const bool force)
194
{
N
Nikita Mikhaylov 已提交
195
    auto local_task_description_path = task_path + "/description";
196

N
Nikita Mikhaylov 已提交
197
    String task_config_str;
198
    {
N
Nikita Mikhaylov 已提交
199 200
        ReadBufferFromFile in(task_file);
        readStringUntilEOF(task_config_str, in);
201
    }
N
Nikita Mikhaylov 已提交
202 203
    if (task_config_str.empty())
        return;
204

N
Nikita Mikhaylov 已提交
205
    auto zookeeper = context.getZooKeeper();
206

N
Nikita Mikhaylov 已提交
207 208
    zookeeper->createAncestors(local_task_description_path);
    auto code = zookeeper->tryCreate(local_task_description_path, task_config_str, zkutil::CreateMode::Persistent);
209
    if (code != Coordination::Error::ZOK && force)
N
Nikita Mikhaylov 已提交
210
        zookeeper->createOrUpdate(local_task_description_path, task_config_str, zkutil::CreateMode::Persistent);
211

A
Alexey Milovidov 已提交
212
    LOG_DEBUG(log, "Task description {} uploaded to {} with result {} ({})",
213
        ((code != Coordination::Error::ZOK && !force) ? "not " : ""), local_task_description_path, code, Coordination::errorMessage(code));
N
Nikita Mikhaylov 已提交
214
}
215

N
Nikita Mikhaylov 已提交
216 217 218 219
void ClusterCopier::reloadTaskDescription()
{
    auto zookeeper = context.getZooKeeper();
    task_description_watch_zookeeper = zookeeper;
220

N
Nikita Mikhaylov 已提交
221
    String task_config_str;
N
Nikita Mikhaylov 已提交
222
    Coordination::Stat stat{};
223
    Coordination::Error code;
224

N
Nikita Mikhaylov 已提交
225
    zookeeper->tryGetWatch(task_description_path, task_config_str, &stat, task_description_watch_callback, &code);
226
    if (code != Coordination::Error::ZOK)
N
Nikita Mikhaylov 已提交
227
        throw Exception("Can't get description node " + task_description_path, ErrorCodes::BAD_ARGUMENTS);
228

A
Alexey Milovidov 已提交
229
    LOG_DEBUG(log, "Loading description, zxid={}", task_description_current_stat.czxid);
N
Nikita Mikhaylov 已提交
230
    auto config = getConfigurationFromXMLString(task_config_str);
231

N
Nikita Mikhaylov 已提交
232 233
    /// Setup settings
    task_cluster->reloadSettings(*config);
234
    context.setSettings(task_cluster->settings_common);
235

N
Nikita Mikhaylov 已提交
236
    task_cluster_current_config = config;
N
Nikita Mikhaylov 已提交
237
    task_description_current_stat = stat;
N
Nikita Mikhaylov 已提交
238
}
239

N
Nikita Mikhaylov 已提交
240
void ClusterCopier::updateConfigIfNeeded()
241
{
N
Nikita Mikhaylov 已提交
242 243 244
    UInt64 version_to_update = task_description_version;
    bool is_outdated_version = task_description_current_version != version_to_update;
    bool is_expired_session  = !task_description_watch_zookeeper || task_description_watch_zookeeper->expired();
245

N
Nikita Mikhaylov 已提交
246 247
    if (!is_outdated_version && !is_expired_session)
        return;
248

A
Alexey Milovidov 已提交
249
    LOG_DEBUG(log, "Updating task description");
N
Nikita Mikhaylov 已提交
250
    reloadTaskDescription();
251

N
Nikita Mikhaylov 已提交
252
    task_description_current_version = version_to_update;
N
Nikita Mikhaylov 已提交
253
}
254

N
Nikita Mikhaylov 已提交
255 256 257 258
void ClusterCopier::process(const ConnectionTimeouts & timeouts)
{
    for (TaskTable & task_table : task_cluster->table_tasks)
    {
A
Alexey Milovidov 已提交
259
        LOG_INFO(log, "Process table task {} with {} shards, {} of them are local ones", task_table.table_id, task_table.all_shards.size(), task_table.local_shards.size());
260

N
Nikita Mikhaylov 已提交
261 262
        if (task_table.all_shards.empty())
            continue;
263

N
Nikita Mikhaylov 已提交
264 265 266 267 268
        /// Discover partitions of each shard and total set of partitions
        if (!task_table.has_enabled_partitions)
        {
            /// If there are no specified enabled_partitions, we must discover them manually
            discoverTablePartitions(timeouts, task_table);
269

N
Nikita Mikhaylov 已提交
270 271 272 273 274 275 276 277 278
            /// After partitions of each shard are initialized, initialize cluster partitions
            for (const TaskShardPtr & task_shard : task_table.all_shards)
            {
                for (const auto & partition_elem : task_shard->partition_tasks)
                {
                    const String & partition_name = partition_elem.first;
                    task_table.cluster_partitions.emplace(partition_name, ClusterPartition{});
                }
            }
279

N
Nikita Mikhaylov 已提交
280 281 282
            for (auto & partition_elem : task_table.cluster_partitions)
            {
                const String & partition_name = partition_elem.first;
283

N
Nikita Mikhaylov 已提交
284 285
                for (const TaskShardPtr & task_shard : task_table.all_shards)
                    task_shard->checked_partitions.emplace(partition_name);
286

N
Nikita Mikhaylov 已提交
287 288 289 290 291 292 293
                task_table.ordered_partition_names.emplace_back(partition_name);
            }
        }
        else
        {
            /// If enabled_partitions are specified, assume that each shard has all partitions
            /// We will refine partition set of each shard in future
294

N
Nikita Mikhaylov 已提交
295 296 297 298 299 300
            for (const String & partition_name : task_table.enabled_partitions)
            {
                task_table.cluster_partitions.emplace(partition_name, ClusterPartition{});
                task_table.ordered_partition_names.emplace_back(partition_name);
            }
        }
301

N
Nikita Mikhaylov 已提交
302
        task_table.watch.restart();
303

N
Nikita Mikhaylov 已提交
304 305 306 307 308 309 310 311 312 313
        /// Retry table processing
        bool table_is_done = false;
        for (UInt64 num_table_tries = 0; num_table_tries < max_table_tries; ++num_table_tries)
        {
            if (tryProcessTable(timeouts, task_table))
            {
                table_is_done = true;
                break;
            }
        }
314

N
Nikita Mikhaylov 已提交
315 316 317 318 319
        if (!table_is_done)
        {
            throw Exception("Too many tries to process table " + task_table.table_id + ". Abort remaining execution",
                            ErrorCodes::UNFINISHED);
        }
320
    }
N
Nikita Mikhaylov 已提交
321
}
322

N
Nikita Mikhaylov 已提交
323
/// Protected section
324

N
Nikita Mikhaylov 已提交
325 326 327

/*
 * Creates task worker node and checks maximum number of workers not to exceed the limit.
A
Alexey Milovidov 已提交
328
 * To achieve this we have to check version of workers_version_path node and create current_worker_path
N
Nikita Mikhaylov 已提交
329 330 331
 * node atomically.
 * */

N
Nikita Mikhaylov 已提交
332 333 334 335
zkutil::EphemeralNodeHolder::Ptr ClusterCopier::createTaskWorkerNodeAndWaitIfNeed(
    const zkutil::ZooKeeperPtr & zookeeper,
    const String & description,
    bool unprioritized)
336
{
N
Nikita Mikhaylov 已提交
337 338
    std::chrono::milliseconds current_sleep_time = default_sleep_time;
    static constexpr std::chrono::milliseconds max_sleep_time(30000); // 30 sec
339

N
Nikita Mikhaylov 已提交
340 341
    if (unprioritized)
        std::this_thread::sleep_for(current_sleep_time);
342

N
Nikita Mikhaylov 已提交
343
    String workers_version_path = getWorkersPathVersion();
N
Nikita Mikhaylov 已提交
344 345
    String workers_path         = getWorkersPath();
    String current_worker_path  = getCurrentWorkerNodePath();
346

N
Nikita Mikhaylov 已提交
347
    UInt64 num_bad_version_errors = 0;
348

N
Nikita Mikhaylov 已提交
349 350 351
    while (true)
    {
        updateConfigIfNeeded();
352

N
Nikita Mikhaylov 已提交
353 354 355 356
        Coordination::Stat stat;
        zookeeper->get(workers_version_path, &stat);
        auto version = stat.version;
        zookeeper->get(workers_path, &stat);
357

N
Nikita Mikhaylov 已提交
358 359
        if (static_cast<UInt64>(stat.numChildren) >= task_cluster->max_workers)
        {
A
Alexey Milovidov 已提交
360
            LOG_DEBUG(log, "Too many workers ({}, maximum {}). Postpone processing {}", stat.numChildren, task_cluster->max_workers, description);
361

N
Nikita Mikhaylov 已提交
362 363
            if (unprioritized)
                current_sleep_time = std::min(max_sleep_time, current_sleep_time + default_sleep_time);
364

N
Nikita Mikhaylov 已提交
365 366 367 368 369 370 371 372 373 374
            std::this_thread::sleep_for(current_sleep_time);
            num_bad_version_errors = 0;
        }
        else
        {
            Coordination::Requests ops;
            ops.emplace_back(zkutil::makeSetRequest(workers_version_path, description, version));
            ops.emplace_back(zkutil::makeCreateRequest(current_worker_path, description, zkutil::CreateMode::Ephemeral));
            Coordination::Responses responses;
            auto code = zookeeper->tryMulti(ops, responses);
375

376
            if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNODEEXISTS)
N
Nikita Mikhaylov 已提交
377
                return std::make_shared<zkutil::EphemeralNodeHolder>(current_worker_path, *zookeeper, false, false, description);
378

379
            if (code == Coordination::Error::ZBADVERSION)
N
Nikita Mikhaylov 已提交
380 381
            {
                ++num_bad_version_errors;
382

N
Nikita Mikhaylov 已提交
383 384 385
                /// Try to make fast retries
                if (num_bad_version_errors > 3)
                {
A
Alexey Milovidov 已提交
386
                    LOG_DEBUG(log, "A concurrent worker has just been added, will check free worker slots again");
N
Nikita Mikhaylov 已提交
387 388 389 390 391 392 393 394 395
                    std::chrono::milliseconds random_sleep_time(std::uniform_int_distribution<int>(1, 1000)(task_cluster->random_engine));
                    std::this_thread::sleep_for(random_sleep_time);
                    num_bad_version_errors = 0;
                }
            }
            else
                throw Coordination::Exception(code);
        }
    }
396 397
}

N
Nikita Mikhaylov 已提交
398

N
Nikita Mikhaylov 已提交
399 400 401
/// Returns true if clean_state_clock reports a clean state and the task status node
/// either does not exist or was last modified not earlier than the clean state was discovered.
bool ClusterCopier::checkPartitionPieceIsClean(
        const zkutil::ZooKeeperPtr & zookeeper,
        const CleanStateClock & clean_state_clock,
        const String & task_status_path)
{
    /// Stays default-constructed if the status node does not exist.
    LogicalClock status_modified_clock;

    Coordination::Stat status_stat{};
    const bool status_exists = zookeeper->exists(task_status_path, &status_stat);
    if (status_exists)
        status_modified_clock = LogicalClock(status_stat.mzxid);

    if (!clean_state_clock.is_clean())
        return false;

    if (!status_modified_clock.hasHappened())
        return true;

    return clean_state_clock.discovery_zxid <= status_modified_clock;
}

N
Nikita Mikhaylov 已提交
413 414

bool ClusterCopier::checkAllPiecesInPartitionAreDone(const TaskTable & task_table, const String & partition_name, const TasksShard & shards_with_partition)
N
Nikita Mikhaylov 已提交
415 416
{
    bool answer = true;
N
Nikita Mikhaylov 已提交
417 418 419 420
    for (size_t piece_number = 0; piece_number < task_table.number_of_splits; ++piece_number)
    {
        bool piece_is_done = checkPartitionPieceIsDone(task_table, partition_name, piece_number, shards_with_partition);
        if (!piece_is_done)
A
Alexey Milovidov 已提交
421
            LOG_DEBUG(log, "Partition {} piece {} is not already done.", partition_name, piece_number);
N
Nikita Mikhaylov 已提交
422 423 424
        answer &= piece_is_done;
    }

N
Nikita Mikhaylov 已提交
425 426 427 428 429 430 431 432 433 434 435 436
    return answer;
}


/* The same as function above
 * Assume that we don't know on which shards do we have partition certain piece.
 * We'll check them all (I mean shards that contain the whole partition)
 * And shards that don't have certain piece MUST mark that piece is_done true.
 * */
bool ClusterCopier::checkPartitionPieceIsDone(const TaskTable & task_table, const String & partition_name,
                               size_t piece_number, const TasksShard & shards_with_partition)
{
A
Alexey Milovidov 已提交
437
    LOG_DEBUG(log, "Check that all shards processed partition {} piece {} successfully", partition_name, piece_number);
438

N
Nikita Mikhaylov 已提交
439
    auto zookeeper = context.getZooKeeper();
440

N
Nikita Mikhaylov 已提交
441 442
    /// Collect all shards that contain partition piece number piece_number.
    Strings piece_status_paths;
A
Alexey Milovidov 已提交
443
    for (const auto & shard : shards_with_partition)
N
Nikita Mikhaylov 已提交
444 445
    {
        ShardPartition & task_shard_partition = shard->partition_tasks.find(partition_name)->second;
N
Nikita Mikhaylov 已提交
446 447
        ShardPartitionPiece & shard_partition_piece = task_shard_partition.pieces[piece_number];
        piece_status_paths.emplace_back(shard_partition_piece.getShardStatusPath());
N
Nikita Mikhaylov 已提交
448
    }
449

N
Nikita Mikhaylov 已提交
450
    std::vector<int64_t> zxid1, zxid2;
451

N
Nikita Mikhaylov 已提交
452 453 454
    try
    {
        std::vector<zkutil::ZooKeeper::FutureGet> get_futures;
N
Nikita Mikhaylov 已提交
455
        for (const String & path : piece_status_paths)
N
Nikita Mikhaylov 已提交
456
            get_futures.emplace_back(zookeeper->asyncGet(path));
457

N
Nikita Mikhaylov 已提交
458 459 460 461
        // Check that state is Finished and remember zxid
        for (auto & future : get_futures)
        {
            auto res = future.get();
462

N
Nikita Mikhaylov 已提交
463 464 465
            TaskStateWithOwner status = TaskStateWithOwner::fromString(res.data);
            if (status.state != TaskState::Finished)
            {
A
Alexey Milovidov 已提交
466
                LOG_INFO(log, "The task {} is being rewritten by {}. Partition piece will be rechecked", res.data, status.owner);
N
Nikita Mikhaylov 已提交
467 468
                return false;
            }
469

N
Nikita Mikhaylov 已提交
470 471
            zxid1.push_back(res.stat.pzxid);
        }
472

N
Nikita Mikhaylov 已提交
473 474 475 476 477 478 479 480
        const String piece_is_dirty_flag_path = task_table.getCertainPartitionPieceIsDirtyPath(partition_name, piece_number);
        const String piece_is_dirty_cleaned_path = task_table.getCertainPartitionPieceIsCleanedPath(partition_name, piece_number);
        const String piece_task_status_path = task_table.getCertainPartitionPieceTaskStatusPath(partition_name, piece_number);

        CleanStateClock clean_state_clock (zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path);

        const bool is_clean = checkPartitionPieceIsClean(zookeeper, clean_state_clock, piece_task_status_path);

N
Nikita Mikhaylov 已提交
481

N
Nikita Mikhaylov 已提交
482
        if (!is_clean)
N
Nikita Mikhaylov 已提交
483
        {
A
Alexey Milovidov 已提交
484
            LOG_INFO(log, "Partition {} become dirty", partition_name);
N
Nikita Mikhaylov 已提交
485
            return false;
N
Nikita Mikhaylov 已提交
486
        }
487

N
Nikita Mikhaylov 已提交
488
        get_futures.clear();
N
Nikita Mikhaylov 已提交
489
        for (const String & path : piece_status_paths)
N
Nikita Mikhaylov 已提交
490
            get_futures.emplace_back(zookeeper->asyncGet(path));
491

N
Nikita Mikhaylov 已提交
492 493 494 495 496 497 498 499 500
        // Remember zxid of states again
        for (auto & future : get_futures)
        {
            auto res = future.get();
            zxid2.push_back(res.stat.pzxid);
        }
    }
    catch (const Coordination::Exception & e)
    {
A
Alexey Milovidov 已提交
501
        LOG_INFO(log, "A ZooKeeper error occurred while checking partition {} piece number {}. Will recheck the partition. Error: {}", partition_name, toString(piece_number), e.displayText());
N
Nikita Mikhaylov 已提交
502 503
        return false;
    }
504

N
Nikita Mikhaylov 已提交
505
    // If all task is finished and zxid is not changed then partition could not become dirty again
N
Nikita Mikhaylov 已提交
506
    for (UInt64 shard_num = 0; shard_num < piece_status_paths.size(); ++shard_num)
N
Nikita Mikhaylov 已提交
507 508 509
    {
        if (zxid1[shard_num] != zxid2[shard_num])
        {
A
Alexey Milovidov 已提交
510
            LOG_INFO(log, "The task {} is being modified now. Partition piece will be rechecked", piece_status_paths[shard_num]);
N
Nikita Mikhaylov 已提交
511 512 513
            return false;
        }
    }
514

A
Alexey Milovidov 已提交
515
    LOG_INFO(log, "Partition {} piece number {} is copied successfully", partition_name, toString(piece_number));
N
Nikita Mikhaylov 已提交
516
    return true;
517 518
}

N
Nikita Mikhaylov 已提交
519

N
Nikita Mikhaylov 已提交
520
/// Attaches all pieces of the partition from the helper tables "<table>_piece_<N>" to the destination table.
///
/// The ephemeral "attach is active" node ensures that the partition is moved by one worker at a time,
/// the persistent "attach is done" node stores the state of the move and its owner.
///
/// Returns TaskStatus::Active if the "attach is active" node already exists,
/// TaskStatus::Finished if the move was already finished or has been finished by this call.
/// Throws if a query or a ZooKeeper request fails (the exception is logged and rethrown),
/// or an Exception with UNFINISHED code when the fault injection is triggered.
TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & task_table, const String & partition_name)
{
    bool inject_fault = false;
    if (move_fault_probability > 0)
    {
        double value = std::uniform_real_distribution<>(0, 1)(task_table.task_cluster.random_engine);
        inject_fault = value < move_fault_probability;
    }

    LOG_DEBUG(log, "Try to move {} to destination table", partition_name);

    auto zookeeper = context.getZooKeeper();

    const auto current_partition_attach_is_active = task_table.getPartitionAttachIsActivePath(partition_name);
    const auto current_partition_attach_is_done   = task_table.getPartitionAttachIsDonePath(partition_name);

    /// Create ephemeral node to mark that we are active and process the partition
    zookeeper->createAncestors(current_partition_attach_is_active);
    zkutil::EphemeralNodeHolderPtr partition_attach_node_holder;
    try
    {
        partition_attach_node_holder = zkutil::EphemeralNodeHolder::create(current_partition_attach_is_active, *zookeeper, host_id);
    }
    catch (const Coordination::Exception & e)
    {
        if (e.code == Coordination::Error::ZNODEEXISTS)
        {
            LOG_DEBUG(log, "Someone is already moving pieces {}", current_partition_attach_is_active);
            return TaskStatus::Active;
        }

        throw;
    }


    /// Exit if task has been already processed;
    /// create blocking node to signal cleaning up if it is abandoned
    {
        String status_data;
        if (zookeeper->tryGet(current_partition_attach_is_done, status_data))
        {
            TaskStateWithOwner status = TaskStateWithOwner::fromString(status_data);
            if (status.state == TaskState::Finished)
            {
                LOG_DEBUG(log, "All pieces for partition from this task {} has been successfully moved to destination table by {}", current_partition_attach_is_active, status.owner);
                return TaskStatus::Finished;
            }

            /// Task is abandoned, because previously we created ephemeral node, possibly in other copier's process.
            /// Initialize DROP PARTITION
            LOG_DEBUG(log, "Moving piece for partition {} has not been successfully finished by {}. Will try to move by myself.", current_partition_attach_is_active, status.owner);

            /// Remove is_done marker.
            zookeeper->remove(current_partition_attach_is_done);
        }
    }


    /// Try start processing, create node about it
    {
        String start_state = TaskStateWithOwner::getData(TaskState::Started, host_id);
        zookeeper->create(current_partition_attach_is_done, start_state, zkutil::CreateMode::Persistent);
    }

    /// The partition named 'all' is addressed with PARTITION ID, any other one with PARTITION.
    const bool use_partition_id = (partition_name == "'all'");

    /// Move partition to original destination table.
    for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
    {
        LOG_DEBUG(log, "Trying to move partition {} piece {} to original table", partition_name, toString(current_piece_number));

        DatabaseAndTableName original_table = task_table.table_push;
        DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first,
                                                                  original_table.second + "_piece_" +
                                                                  toString(current_piece_number));

        Settings settings_push = task_cluster->settings_push;

        /// It is important, ALTER ATTACH PARTITION must be done synchronously
        /// And we will execute this ALTER query on each replica of a shard.
        /// It is correct, because this query is idempotent.
        settings_push.replication_alter_partitions_sync = 2;

        const String query_alter_ast_string = " ALTER TABLE " + getQuotedTable(original_table) +
                                              (use_partition_id ? " ATTACH PARTITION ID " : " ATTACH PARTITION ") + partition_name +
                                              " FROM " + getQuotedTable(helping_table);

        LOG_DEBUG(log, "Executing ALTER query: {}", query_alter_ast_string);

        try
        {
            size_t num_nodes = executeQueryOnCluster(
                    task_table.cluster_push,
                    query_alter_ast_string,
                    settings_push,
                    PoolMode::GET_MANY,
                    ClusterExecutionMode::ON_EACH_NODE);

            LOG_INFO(log, "Number of nodes that executed ALTER query successfully : {}", toString(num_nodes));
        }
        catch (...)
        {
            LOG_DEBUG(log, "Error while moving partition {} piece {} to original table", partition_name, toString(current_piece_number));
            throw;
        }

        /// The fault is injected after the piece is attached but before it is deduplicated.
        if (inject_fault)
            throw Exception("Copy fault injection is activated", ErrorCodes::UNFINISHED);

        try
        {
            /// Only tables that are not replicated are deduplicated explicitly.
            if (!task_table.isReplicatedTable())
            {
                const String query_deduplicate_ast_string = " OPTIMIZE TABLE " + getQuotedTable(original_table) +
                                                            (use_partition_id ? " PARTITION ID " : " PARTITION ") + partition_name + " DEDUPLICATE;";

                LOG_DEBUG(log, "Executing OPTIMIZE DEDUPLICATE query: {}", query_deduplicate_ast_string);

                UInt64 num_nodes = executeQueryOnCluster(
                        task_table.cluster_push,
                        query_deduplicate_ast_string,
                        task_cluster->settings_push,
                        PoolMode::GET_MANY);

                LOG_INFO(log, "Number of shard that executed OPTIMIZE DEDUPLICATE query successfully : {}", toString(num_nodes));
            }
        }
        catch (...)
        {
            LOG_DEBUG(log, "Error while executing OPTIMIZE DEDUPLICATE partition {} in the original table", partition_name);
            throw;
        }
    }

    /// Create node to signal that we finished moving
    {
        String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
        /// Version 0 is expected: the node has been created above by this call.
        zookeeper->set(current_partition_attach_is_done, state_finished, 0);
    }

    return TaskStatus::Finished;
}

N
Nikita Mikhaylov 已提交
665
/// Removes MATERIALIZED and ALIAS columns from create table query
N
Nikita Mikhaylov 已提交
666
ASTPtr ClusterCopier::removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast)
667
{
N
Nikita Mikhaylov 已提交
668 669
    const ASTs & column_asts = query_ast->as<ASTCreateQuery &>().columns_list->columns->children;
    auto new_columns = std::make_shared<ASTExpressionList>();
670

N
Nikita Mikhaylov 已提交
671 672 673
    for (const ASTPtr & column_ast : column_asts)
    {
        const auto & column = column_ast->as<ASTColumnDeclaration &>();
674

N
Nikita Mikhaylov 已提交
675 676 677 678 679 680
        if (!column.default_specifier.empty())
        {
            ColumnDefaultKind kind = columnDefaultKindFromString(column.default_specifier);
            if (kind == ColumnDefaultKind::Materialized || kind == ColumnDefaultKind::Alias)
                continue;
        }
681

N
Nikita Mikhaylov 已提交
682 683
        new_columns->children.emplace_back(column_ast->clone());
    }
684

N
Nikita Mikhaylov 已提交
685 686
    ASTPtr new_query_ast = query_ast->clone();
    auto & new_query = new_query_ast->as<ASTCreateQuery &>();
687

N
Nikita Mikhaylov 已提交
688 689
    auto new_columns_list = std::make_shared<ASTColumns>();
    new_columns_list->set(new_columns_list->columns, new_columns);
A
Alexey Milovidov 已提交
690
    if (const auto * indices = query_ast->as<ASTCreateQuery>()->columns_list->indices)
N
Nikita Mikhaylov 已提交
691
        new_columns_list->set(new_columns_list->indices, indices->clone());
692

N
Nikita Mikhaylov 已提交
693 694 695
    new_query.replace(new_query.columns_list, new_columns_list);

    return new_query_ast;
696 697
}

A
Alexey Milovidov 已提交
698
/// Replaces ENGINE and table name in a create query
699 700 701
std::shared_ptr<ASTCreateQuery> rewriteCreateQueryStorage(const ASTPtr & create_query_ast,
                                                          const DatabaseAndTableName & new_table,
                                                          const ASTPtr & new_storage_ast)
702
{
N
Nikita Mikhaylov 已提交
703 704
    const auto & create = create_query_ast->as<ASTCreateQuery &>();
    auto res = std::make_shared<ASTCreateQuery>(create);
705

N
Nikita Mikhaylov 已提交
706 707
    if (create.storage == nullptr || new_storage_ast == nullptr)
        throw Exception("Storage is not specified", ErrorCodes::LOGICAL_ERROR);
708

N
Nikita Mikhaylov 已提交
709 710
    res->database = new_table.first;
    res->table = new_table.second;
711

N
Nikita Mikhaylov 已提交
712 713 714
    res->children.clear();
    res->set(res->columns_list, create.columns_list->clone());
    res->set(res->storage, new_storage_ast->clone());
715

N
Nikita Mikhaylov 已提交
716
    return res;
717 718
}

719

N
Nikita Mikhaylov 已提交
720 721 722 723 724
/// Drops one piece of a partition from its helping table on the push cluster and resets
/// the piece's state in ZooKeeper, so that the piece can be filled again from scratch.
/// @param task_partition        partition whose piece has to be dropped.
/// @param current_piece_number  index of the piece inside task_partition.pieces.
/// @param zookeeper             session used for all coordination nodes.
/// @param clean_state_clock     clock observed by the caller; its discovery version is used to lock the dirty flag.
/// @return true if the piece was dropped and marked as cleaned; false if another worker is cleaning
///         or filling the piece, or if the clean state was altered while the piece was being dropped.
/// @throws Exception with NOT_IMPLEMENTED in safe mode; ZooKeeper exceptions other than ZNODEEXISTS are rethrown.
bool ClusterCopier::tryDropPartitionPiece(
        ShardPartition & task_partition,
        const size_t current_piece_number,
        const zkutil::ZooKeeperPtr & zookeeper,
        const CleanStateClock & clean_state_clock)
{
    if (is_safe_mode)
        throw Exception("DROP PARTITION is prohibited in safe mode", ErrorCodes::NOT_IMPLEMENTED);

    TaskTable & task_table = task_partition.task_shard.task_table;
    ShardPartitionPiece & piece = task_partition.pieces[current_piece_number];

    const String shards_path         = piece.getPartitionPieceShardsPath();
    const String active_workers_path = piece.getPartitionPieceActiveWorkersPath();
    const String dirty_flag_path     = piece.getPartitionPieceIsDirtyPath();
    const String cleaner_path        = piece.getPartitionPieceCleanerPath();
    const String cleaned_flag_path   = piece.getPartitionPieceIsCleanedPath();

    /// Only one cleaner at a time: the ephemeral cleaner node is held until the function returns.
    zkutil::EphemeralNodeHolder::Ptr cleaner_holder;
    try
    {
        cleaner_holder = zkutil::EphemeralNodeHolder::create(cleaner_path, *zookeeper, host_id);
    }
    catch (const Coordination::Exception & e)
    {
        if (e.code != Coordination::Error::ZNODEEXISTS)
            throw;

        LOG_DEBUG(log, "Partition {} piece {} is cleaning now by somebody, sleep", task_partition.name, toString(current_piece_number));
        std::this_thread::sleep_for(default_sleep_time);
        return false;
    }

    /// The piece must not be dropped while workers are registered under the active workers directory.
    Coordination::Stat active_workers_stat{};
    if (zookeeper->exists(active_workers_path, &active_workers_stat))
    {
        if (active_workers_stat.numChildren != 0)
        {
            LOG_DEBUG(log, "Partition {} contains {} active workers while trying to drop it. Going to sleep.", task_partition.name, active_workers_stat.numChildren);
            std::this_thread::sleep_for(default_sleep_time);
            return false;
        }

        /// The directory is empty: remove it, it is re-created below as an ephemeral lock.
        zookeeper->remove(active_workers_path);
    }

    {
        /// Holding the active workers path as an ephemeral node prevents anybody from filling the piece meanwhile.
        /// The lock lives until the end of this scope.
        zkutil::EphemeralNodeHolder::Ptr active_workers_lock;
        try
        {
            active_workers_lock = zkutil::EphemeralNodeHolder::create(active_workers_path, *zookeeper, host_id);
        }
        catch (const Coordination::Exception & e)
        {
            if (e.code != Coordination::Error::ZNODEEXISTS)
                throw;

            LOG_DEBUG(log, "Partition {} is being filled now by somebody, sleep", task_partition.name);
            return false;
        }

        // Lock the dirty flag
        zookeeper->set(dirty_flag_path, host_id, clean_state_clock.discovery_version.value());
        zookeeper->tryRemove(piece.getPartitionPieceCleanStartPath());
        CleanStateClock my_clock(zookeeper, dirty_flag_path, cleaned_flag_path);

        /// Remove all status nodes
        {
            Strings shard_nodes;
            if (zookeeper->tryGetChildren(shards_path, shard_nodes) == Coordination::Error::ZOK)
            {
                for (const auto & shard_node : shard_nodes)
                    zookeeper->removeRecursive(shards_path + "/" + shard_node);
            }
        }

        const DatabaseAndTableName original_table = task_table.table_push;
        const DatabaseAndTableName helping_table(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));

        String query = "ALTER TABLE " + getQuotedTable(helping_table);
        query += (task_partition.name == "'all'") ? " DROP PARTITION ID " : " DROP PARTITION ";
        query += task_partition.name;

        /// TODO: use this statement after servers will be updated up to 1.1.54310
        // query += " DROP PARTITION ID '" + task_partition.name + "'";

        Settings settings_push = task_cluster->settings_push;

        /// It is important, DROP PARTITION must be done synchronously
        settings_push.replication_alter_partitions_sync = 2;

        LOG_DEBUG(log, "Execute distributed DROP PARTITION: {}", query);
        /// We have to drop partition_piece on each replica
        size_t num_shards = executeQueryOnCluster(
                task_table.cluster_push, query,
                settings_push,
                PoolMode::GET_MANY,
                ClusterExecutionMode::ON_EACH_NODE);

        LOG_INFO(log, "DROP PARTITION was successfully executed on {} nodes of a cluster.", num_shards);

        /// Somebody touched the clean state while we were dropping: do not publish our result.
        if (my_clock.is_stale())
        {
            LOG_DEBUG(log, "Clean state is altered when dropping the partition, cowardly bailing");
            return false;
        }

        /// Update the locking node
        zookeeper->set(dirty_flag_path, host_id, my_clock.discovery_version.value());
        if (my_clock.clean_state_version)
            zookeeper->set(cleaned_flag_path, host_id, my_clock.clean_state_version.value());
        else
            zookeeper->create(cleaned_flag_path, host_id, zkutil::CreateMode::Persistent);

        LOG_INFO(log, "Partition {} piece {} was dropped on cluster {}", task_partition.name, toString(current_piece_number), task_table.cluster_push_name);

        const auto create_result = zookeeper->tryCreate(shards_path, host_id, zkutil::CreateMode::Persistent);
        if (create_result == Coordination::Error::ZNODEEXISTS)
            zookeeper->set(shards_path, host_id);
    }

    LOG_INFO(log, "Partition {} piece {} is safe for work now.", task_partition.name, toString(current_piece_number));
    return true;
}
852

N
Nikita Mikhaylov 已提交
853
bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table)
854
{
N
Nikita Mikhaylov 已提交
855 856
    /// An heuristic: if previous shard is already done, then check next one without sleeps due to max_workers constraint
    bool previous_shard_is_instantly_finished = false;
857

N
Nikita Mikhaylov 已提交
858 859 860 861 862
    /// Process each partition that is present in cluster
    for (const String & partition_name : task_table.ordered_partition_names)
    {
        if (!task_table.cluster_partitions.count(partition_name))
            throw Exception("There are no expected partition " + partition_name + ". It is a bug", ErrorCodes::LOGICAL_ERROR);
863

N
Nikita Mikhaylov 已提交
864
        ClusterPartition & cluster_partition = task_table.cluster_partitions[partition_name];
865

N
Nikita Mikhaylov 已提交
866
        Stopwatch watch;
N
Nikita Mikhaylov 已提交
867
        /// We will check all the shards of the table and check if they contain current partition.
N
Nikita Mikhaylov 已提交
868 869
        TasksShard expected_shards;
        UInt64 num_failed_shards = 0;
870

N
Nikita Mikhaylov 已提交
871
        ++cluster_partition.total_tries;
872

A
Alexey Milovidov 已提交
873
        LOG_DEBUG(log, "Processing partition {} for the whole cluster", partition_name);
874

N
Nikita Mikhaylov 已提交
875 876 877 878
        /// Process each source shard having current partition and copy current partition
        /// NOTE: shards are sorted by "distance" to current host
        bool has_shard_to_process = false;
        for (const TaskShardPtr & shard : task_table.all_shards)
879
        {
N
Nikita Mikhaylov 已提交
880 881
            /// Does shard have a node with current partition?
            if (shard->partition_tasks.count(partition_name) == 0)
882
            {
N
Nikita Mikhaylov 已提交
883 884
                /// If not, did we check existence of that partition previously?
                if (shard->checked_partitions.count(partition_name) == 0)
885
                {
N
Nikita Mikhaylov 已提交
886 887
                    auto check_shard_has_partition = [&] () { return checkShardHasPartition(timeouts, *shard, partition_name); };
                    bool has_partition = retry(check_shard_has_partition);
888

N
Nikita Mikhaylov 已提交
889
                    shard->checked_partitions.emplace(partition_name);
890

N
Nikita Mikhaylov 已提交
891 892
                    if (has_partition)
                    {
N
Nikita Mikhaylov 已提交
893 894
                        const size_t number_of_splits = task_table.number_of_splits;
                        shard->partition_tasks.emplace(partition_name, ShardPartition(*shard, partition_name, number_of_splits));
A
Alexey Milovidov 已提交
895
                        LOG_DEBUG(log, "Discovered partition {} in shard {}", partition_name, shard->getDescription());
N
Nikita Mikhaylov 已提交
896 897 898 899 900 901 902 903 904
                        /// To save references in the future.
                        auto shard_partition_it = shard->partition_tasks.find(partition_name);
                        PartitionPieces & shard_partition_pieces = shard_partition_it->second.pieces;

                        for (size_t piece_number = 0; piece_number < number_of_splits; ++piece_number)
                        {
                            auto res = checkPresentPartitionPiecesOnCurrentShard(timeouts, *shard, partition_name, piece_number);
                            shard_partition_pieces.emplace_back(shard_partition_it->second, piece_number, res);
                        }
N
Nikita Mikhaylov 已提交
905 906 907
                    }
                    else
                    {
A
Alexey Milovidov 已提交
908
                        LOG_DEBUG(log, "Found that shard {} does not contain current partition {}", shard->getDescription(), partition_name);
N
Nikita Mikhaylov 已提交
909 910 911 912
                        continue;
                    }
                }
                else
913
                {
N
Nikita Mikhaylov 已提交
914 915 916
                    /// We have already checked that partition, but did not discover it
                    previous_shard_is_instantly_finished = true;
                    continue;
917
                }
N
Nikita Mikhaylov 已提交
918
            }
919

N
Nikita Mikhaylov 已提交
920
            auto it_shard_partition = shard->partition_tasks.find(partition_name);
N
Nikita Mikhaylov 已提交
921 922
            /// Previously when we discovered that shard does not contain current partition, we skipped it.
            /// At this moment partition have to be present.
N
Nikita Mikhaylov 已提交
923
            if (it_shard_partition == shard->partition_tasks.end())
N
Nikita Mikhaylov 已提交
924
                throw Exception("There are no such partition in a shard. This is a bug.", ErrorCodes::LOGICAL_ERROR);
N
Nikita Mikhaylov 已提交
925
            auto & partition = it_shard_partition->second;
926

N
Nikita Mikhaylov 已提交
927
            expected_shards.emplace_back(shard);
928

N
Nikita Mikhaylov 已提交
929 930
            /// Do not sleep if there is a sequence of already processed shards to increase startup
            bool is_unprioritized_task = !previous_shard_is_instantly_finished && shard->priority.is_remote;
N
Nikita Mikhaylov 已提交
931
            TaskStatus task_status = TaskStatus::Error;
N
Nikita Mikhaylov 已提交
932 933 934
            bool was_error = false;
            has_shard_to_process = true;
            for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num)
935
            {
N
Nikita Mikhaylov 已提交
936
                task_status = tryProcessPartitionTask(timeouts, partition, is_unprioritized_task);
937

N
Nikita Mikhaylov 已提交
938
                /// Exit if success
N
Nikita Mikhaylov 已提交
939
                if (task_status == TaskStatus::Finished)
N
Nikita Mikhaylov 已提交
940
                    break;
941

N
Nikita Mikhaylov 已提交
942
                was_error = true;
943

N
Nikita Mikhaylov 已提交
944
                /// Skip if the task is being processed by someone
N
Nikita Mikhaylov 已提交
945
                if (task_status == TaskStatus::Active)
N
Nikita Mikhaylov 已提交
946
                    break;
947

N
Nikita Mikhaylov 已提交
948 949
                /// Repeat on errors
                std::this_thread::sleep_for(default_sleep_time);
950
            }
N
Nikita Mikhaylov 已提交
951

N
Nikita Mikhaylov 已提交
952
            if (task_status == TaskStatus::Error)
N
Nikita Mikhaylov 已提交
953 954 955
                ++num_failed_shards;

            previous_shard_is_instantly_finished = !was_error;
956 957
        }

N
Nikita Mikhaylov 已提交
958 959 960 961
        cluster_partition.elapsed_time_seconds += watch.elapsedSeconds();

        /// Check that whole cluster partition is done
        /// Firstly check the number of failed partition tasks, then look into ZooKeeper and ensure that each partition is done
N
Nikita Mikhaylov 已提交
962
        bool partition_copying_is_done = num_failed_shards == 0;
N
Nikita Mikhaylov 已提交
963
        try
964
        {
N
Nikita Mikhaylov 已提交
965
            partition_copying_is_done =
N
Nikita Mikhaylov 已提交
966
                    !has_shard_to_process
N
Nikita Mikhaylov 已提交
967
                    || (partition_copying_is_done && checkAllPiecesInPartitionAreDone(task_table, partition_name, expected_shards));
968
        }
N
Nikita Mikhaylov 已提交
969
        catch (...)
970
        {
N
Nikita Mikhaylov 已提交
971
            tryLogCurrentException(log);
N
Nikita Mikhaylov 已提交
972 973 974 975 976 977 978 979
            partition_copying_is_done = false;
        }


        bool partition_moving_is_done = false;
        /// Try to move only if all pieces were copied.
        if (partition_copying_is_done)
        {
N
Nikita Mikhaylov 已提交
980
            for (UInt64 try_num = 0; try_num < max_shard_partition_piece_tries_for_alter; ++try_num)
N
Nikita Mikhaylov 已提交
981
            {
N
Nikita Mikhaylov 已提交
982 983 984
                try
                {
                    auto res = tryMoveAllPiecesToDestinationTable(task_table, partition_name);
N
Nikita Mikhaylov 已提交
985 986
                    /// Exit and mark current task is done.
                    if (res == TaskStatus::Finished)
N
Nikita Mikhaylov 已提交
987 988 989 990 991
                    {
                        partition_moving_is_done = true;
                        break;
                    }

N
Nikita Mikhaylov 已提交
992 993
                    /// Exit if this task is active.
                    if (res == TaskStatus::Active)
N
better  
Nikita Mikhaylov 已提交
994
                        break;
N
Nikita Mikhaylov 已提交
995

N
Nikita Mikhaylov 已提交
996
                    /// Repeat on errors.
N
better  
Nikita Mikhaylov 已提交
997
                    std::this_thread::sleep_for(default_sleep_time);
N
Nikita Mikhaylov 已提交
998
                }
N
Nikita Mikhaylov 已提交
999 1000
                catch (...)
                {
A
Alexey Milovidov 已提交
1001
                    tryLogCurrentException(log, "Some error occurred while moving pieces to destination table for partition " + partition_name);
N
Nikita Mikhaylov 已提交
1002
                }
N
Nikita Mikhaylov 已提交
1003
            }
1004 1005
        }

N
Nikita Mikhaylov 已提交
1006
        if (partition_copying_is_done && partition_moving_is_done)
1007
        {
N
Nikita Mikhaylov 已提交
1008
            task_table.finished_cluster_partitions.emplace(partition_name);
1009

N
Nikita Mikhaylov 已提交
1010 1011 1012
            task_table.bytes_copied += cluster_partition.bytes_copied;
            task_table.rows_copied += cluster_partition.rows_copied;
            double elapsed = cluster_partition.elapsed_time_seconds;
1013

A
Alexey Milovidov 已提交
1014
            LOG_INFO(log, "It took {} seconds to copy partition {}: {} uncompressed bytes, {} rows and {} source blocks are copied",
A
Alexey Milovidov 已提交
1015 1016 1017 1018
                elapsed, partition_name,
                formatReadableSizeWithDecimalSuffix(cluster_partition.bytes_copied),
                formatReadableQuantity(cluster_partition.rows_copied),
                cluster_partition.blocks_copied);
1019

N
Nikita Mikhaylov 已提交
1020
            if (cluster_partition.rows_copied)
1021
            {
A
Alexey Milovidov 已提交
1022
                LOG_INFO(log, "Average partition speed: {} per second.", formatReadableSizeWithDecimalSuffix(cluster_partition.bytes_copied / elapsed));
1023
            }
1024

N
Nikita Mikhaylov 已提交
1025
            if (task_table.rows_copied)
1026
            {
A
Alexey Milovidov 已提交
1027
                LOG_INFO(log, "Average table {} speed: {} per second.", task_table.table_id, formatReadableSizeWithDecimalSuffix(task_table.bytes_copied / elapsed));
N
Nikita Mikhaylov 已提交
1028 1029 1030
            }
        }
    }
1031

N
Nikita Mikhaylov 已提交
1032 1033 1034
    UInt64 required_partitions = task_table.cluster_partitions.size();
    UInt64 finished_partitions = task_table.finished_cluster_partitions.size();
    bool table_is_done = finished_partitions >= required_partitions;
1035

N
Nikita Mikhaylov 已提交
1036 1037
    if (!table_is_done)
    {
A
Alexey Milovidov 已提交
1038
        LOG_INFO(log, "Table {} is not processed yet.Copied {} of {}, will retry", task_table.table_id, finished_partitions, required_partitions);
N
Nikita Mikhaylov 已提交
1039
    }
1040 1041 1042 1043 1044
    else
    {
        /// Delete helping tables in case that whole table is done
        dropHelpingTables(task_table);
    }
1045

N
Nikita Mikhaylov 已提交
1046 1047
    return table_is_done;
}
1048

N
Nikita Mikhaylov 已提交
1049
/// Job for copying partition from particular shard.
N
Nikita Mikhaylov 已提交
1050
TaskStatus ClusterCopier::tryProcessPartitionTask(const ConnectionTimeouts & timeouts, ShardPartition & task_partition, bool is_unprioritized_task)
N
Nikita Mikhaylov 已提交
1051
{
N
Nikita Mikhaylov 已提交
1052
    TaskStatus res;
1053

N
Nikita Mikhaylov 已提交
1054 1055
    try
    {
N
Nikita Mikhaylov 已提交
1056
        res = iterateThroughAllPiecesInPartition(timeouts, task_partition, is_unprioritized_task);
N
Nikita Mikhaylov 已提交
1057 1058 1059 1060
    }
    catch (...)
    {
        tryLogCurrentException(log, "An error occurred while processing partition " + task_partition.name);
N
Nikita Mikhaylov 已提交
1061
        res = TaskStatus::Error;
N
Nikita Mikhaylov 已提交
1062
    }
1063

N
Nikita Mikhaylov 已提交
1064 1065 1066 1067 1068 1069 1070 1071 1072
    /// At the end of each task check if the config is updated
    try
    {
        updateConfigIfNeeded();
    }
    catch (...)
    {
        tryLogCurrentException(log, "An error occurred while updating the config");
    }
1073

N
Nikita Mikhaylov 已提交
1074 1075
    return res;
}
1076

N
Nikita Mikhaylov 已提交
1077
/// Processes all pieces of the partition one after another, retrying each piece on errors.
/// @param timeouts               connection timeouts forwarded to the piece task.
/// @param task_partition         partition of a particular shard to process.
/// @param is_unprioritized_task  forwarded to the piece task.
/// @return TaskStatus::Error if at least one piece ended with an error;
///         otherwise TaskStatus::Active if at least one piece is being processed by someone else;
///         otherwise TaskStatus::Finished.
TaskStatus ClusterCopier::iterateThroughAllPiecesInPartition(const ConnectionTimeouts & timeouts, ShardPartition & task_partition,
                                                       bool is_unprioritized_task)
{
    const size_t total_number_of_pieces = task_partition.task_shard.task_table.number_of_splits;

    bool was_failed_pieces = false;
    bool was_active_pieces = false;

    for (size_t piece_number = 0; piece_number < total_number_of_pieces; ++piece_number)
    {
        TaskStatus res{TaskStatus::Finished};

        for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num)
        {
            LOG_INFO(log, "Attempt number {} to process partition {} piece number {} on shard number {} with index {}.",
                try_num, task_partition.name, piece_number,
                task_partition.task_shard.numberInCluster(),
                task_partition.task_shard.indexInCluster());

            res = processPartitionPieceTaskImpl(timeouts, task_partition, piece_number, is_unprioritized_task);

            /// Exit if success
            if (res == TaskStatus::Finished)
                break;

            /// Skip if the task is being processed by someone
            if (res == TaskStatus::Active)
                break;

            /// Repeat on errors
            std::this_thread::sleep_for(default_sleep_time);
        }

        /// Accumulate over all pieces: a plain assignment would let the status of the last piece
        /// hide a failed or still active earlier piece.
        was_active_pieces |= (res == TaskStatus::Active);
        was_failed_pieces |= (res == TaskStatus::Error);
    }

    if (was_failed_pieces)
        return TaskStatus::Error;

    if (was_active_pieces)
        return TaskStatus::Active;

    return TaskStatus::Finished;
}

N
Nikita Mikhaylov 已提交
1123

N
Nikita Mikhaylov 已提交
1124
TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
N
Nikita Mikhaylov 已提交
1125 1126
        const ConnectionTimeouts & timeouts, ShardPartition & task_partition,
        const size_t current_piece_number, bool is_unprioritized_task)
N
Nikita Mikhaylov 已提交
1127 1128 1129
{
    TaskShard & task_shard = task_partition.task_shard;
    TaskTable & task_table = task_shard.task_table;
N
Nikita Mikhaylov 已提交
1130
    ClusterPartition & cluster_partition  = task_table.getClusterPartition(task_partition.name);
N
Nikita Mikhaylov 已提交
1131 1132 1133 1134
    ShardPartitionPiece & partition_piece = task_partition.pieces[current_piece_number];

    const size_t number_of_splits = task_table.number_of_splits;
    const String primary_key_comma_separated = task_table.primary_key_comma_separated;
1135

N
Nikita Mikhaylov 已提交
1136
    /// We need to update table definitions for each partition, it could be changed after ALTER
N
Nikita Mikhaylov 已提交
1137 1138 1139
    createShardInternalTables(timeouts, task_shard, true);

    auto split_table_for_current_piece = task_shard.list_of_split_tables_on_shard[current_piece_number];
1140

N
Nikita Mikhaylov 已提交
1141
    auto zookeeper = context.getZooKeeper();
1142

N
Nikita Mikhaylov 已提交
1143 1144
    const String piece_is_dirty_flag_path          = partition_piece.getPartitionPieceIsDirtyPath();
    const String piece_is_dirty_cleaned_path       = partition_piece.getPartitionPieceIsCleanedPath();
N
Nikita Mikhaylov 已提交
1145
    const String current_task_piece_is_active_path = partition_piece.getActiveWorkerPath();
N
Nikita Mikhaylov 已提交
1146
    const String current_task_piece_status_path    = partition_piece.getShardStatusPath();
1147

N
Nikita Mikhaylov 已提交
1148
    /// Auxiliary functions:
1149

N
Nikita Mikhaylov 已提交
1150
    /// Creates is_dirty node to initialize DROP PARTITION
N
Nikita Mikhaylov 已提交
1151
    auto create_is_dirty_node = [&] (const CleanStateClock & clock)
N
Nikita Mikhaylov 已提交
1152 1153
    {
        if (clock.is_stale())
A
Alexey Milovidov 已提交
1154
            LOG_DEBUG(log, "Clean state clock is stale while setting dirty flag, cowardly bailing");
N
Nikita Mikhaylov 已提交
1155
        else if (!clock.is_clean())
A
Alexey Milovidov 已提交
1156
            LOG_DEBUG(log, "Thank you, Captain Obvious");
N
Nikita Mikhaylov 已提交
1157 1158
        else if (clock.discovery_version)
        {
A
Alexey Milovidov 已提交
1159
            LOG_DEBUG(log, "Updating clean state clock");
N
Nikita Mikhaylov 已提交
1160
            zookeeper->set(piece_is_dirty_flag_path, host_id, clock.discovery_version.value());
N
Nikita Mikhaylov 已提交
1161 1162
        }
        else
1163
        {
A
Alexey Milovidov 已提交
1164
            LOG_DEBUG(log, "Creating clean state clock");
N
Nikita Mikhaylov 已提交
1165
            zookeeper->create(piece_is_dirty_flag_path, host_id, zkutil::CreateMode::Persistent);
1166
        }
N
Nikita Mikhaylov 已提交
1167
    };
1168

N
Nikita Mikhaylov 已提交
1169
    /// Returns SELECT query filtering current partition and applying user filter
N
Nikita Mikhaylov 已提交
1170
    auto get_select_query = [&] (const DatabaseAndTableName & from_table, const String & fields, bool enable_splitting, String limit = "")
1171
    {
N
Nikita Mikhaylov 已提交
1172 1173
        String query;
        query += "SELECT " + fields + " FROM " + getQuotedTable(from_table);
N
Nikita Mikhaylov 已提交
1174

A
alexey-milovidov 已提交
1175 1176
        if (enable_splitting && experimental_use_sample_offset)
            query += " SAMPLE 1/" + toString(number_of_splits) + " OFFSET " + toString(current_piece_number) + "/" + toString(number_of_splits);
N
Nikita Mikhaylov 已提交
1177

N
Nikita Mikhaylov 已提交
1178 1179
        /// TODO: Bad, it is better to rewrite with ASTLiteral(partition_key_field)
        query += " WHERE (" + queryToString(task_table.engine_push_partition_key_ast) + " = (" + task_partition.name + " AS partition_key))";
N
Nikita Mikhaylov 已提交
1180

A
alexey-milovidov 已提交
1181 1182
        if (enable_splitting && !experimental_use_sample_offset)
            query += " AND ( cityHash64(" + primary_key_comma_separated + ") %" + toString(number_of_splits) + " = " + toString(current_piece_number) + " )";
N
Nikita Mikhaylov 已提交
1183

N
Nikita Mikhaylov 已提交
1184 1185
        if (!task_table.where_condition_str.empty())
            query += " AND (" + task_table.where_condition_str + ")";
N
Nikita Mikhaylov 已提交
1186

N
Nikita Mikhaylov 已提交
1187 1188
        if (!limit.empty())
            query += " LIMIT " + limit;
1189

N
Nikita Mikhaylov 已提交
1190
        ParserQuery p_query(query.data() + query.size());
1191 1192 1193

        const auto & settings = context.getSettingsRef();
        return parseQuery(p_query, query, settings.max_query_size, settings.max_parser_depth);
N
Nikita Mikhaylov 已提交
1194
    };
1195

N
Nikita Mikhaylov 已提交
1196
    /// Load balancing
N
Nikita Mikhaylov 已提交
1197
    auto worker_node_holder = createTaskWorkerNodeAndWaitIfNeed(zookeeper, current_task_piece_status_path, is_unprioritized_task);
1198

A
Alexey Milovidov 已提交
1199
    LOG_DEBUG(log, "Processing {}", current_task_piece_status_path);
1200

N
Nikita Mikhaylov 已提交
1201
    const String piece_status_path = partition_piece.getPartitionPieceShardsPath();
1202

N
Nikita Mikhaylov 已提交
1203 1204 1205
    CleanStateClock clean_state_clock(zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path);

    const bool is_clean = checkPartitionPieceIsClean(zookeeper, clean_state_clock, piece_status_path);
1206

N
Nikita Mikhaylov 已提交
1207
    /// Do not start if partition piece is dirty, try to clean it
N
Nikita Mikhaylov 已提交
1208
    if (is_clean)
1209
    {
A
Alexey Milovidov 已提交
1210
        LOG_DEBUG(log, "Partition {} piece {} appears to be clean", task_partition.name, current_piece_number);
N
Nikita Mikhaylov 已提交
1211
        zookeeper->createAncestors(current_task_piece_status_path);
1212
    }
N
Nikita Mikhaylov 已提交
1213
    else
1214
    {
A
Alexey Milovidov 已提交
1215
        LOG_DEBUG(log, "Partition {} piece {} is dirty, try to drop it", task_partition.name, current_piece_number);
1216

N
Nikita Mikhaylov 已提交
1217 1218
        try
        {
N
Nikita Mikhaylov 已提交
1219
            tryDropPartitionPiece(task_partition, current_piece_number, zookeeper, clean_state_clock);
N
Nikita Mikhaylov 已提交
1220 1221 1222 1223 1224
        }
        catch (...)
        {
            tryLogCurrentException(log, "An error occurred when clean partition");
        }
1225

N
Nikita Mikhaylov 已提交
1226
        return TaskStatus::Error;
1227 1228
    }

N
Nikita Mikhaylov 已提交
1229
    /// Create ephemeral node to mark that we are active and process the partition
N
Nikita Mikhaylov 已提交
1230
    zookeeper->createAncestors(current_task_piece_is_active_path);
N
Nikita Mikhaylov 已提交
1231 1232
    zkutil::EphemeralNodeHolderPtr partition_task_node_holder;
    try
1233
    {
N
Nikita Mikhaylov 已提交
1234
        partition_task_node_holder = zkutil::EphemeralNodeHolder::create(current_task_piece_is_active_path, *zookeeper, host_id);
N
Nikita Mikhaylov 已提交
1235 1236 1237
    }
    catch (const Coordination::Exception & e)
    {
1238
        if (e.code == Coordination::Error::ZNODEEXISTS)
1239
        {
A
Alexey Milovidov 已提交
1240
            LOG_DEBUG(log, "Someone is already processing {}", current_task_piece_is_active_path);
N
Nikita Mikhaylov 已提交
1241
            return TaskStatus::Active;
1242 1243
        }

N
Nikita Mikhaylov 已提交
1244 1245
        throw;
    }
1246

N
Nikita Mikhaylov 已提交
1247 1248 1249 1250
    /// Exit if task has been already processed;
    /// create blocking node to signal cleaning up if it is abandoned
    {
        String status_data;
N
Nikita Mikhaylov 已提交
1251
        if (zookeeper->tryGet(current_task_piece_status_path, status_data))
1252
        {
N
Nikita Mikhaylov 已提交
1253 1254
            TaskStateWithOwner status = TaskStateWithOwner::fromString(status_data);
            if (status.state == TaskState::Finished)
1255
            {
A
Alexey Milovidov 已提交
1256
                LOG_DEBUG(log, "Task {} has been successfully executed by {}", current_task_piece_status_path, status.owner);
N
Nikita Mikhaylov 已提交
1257
                return TaskStatus::Finished;
1258 1259
            }

N
Nikita Mikhaylov 已提交
1260 1261
            /// Task is abandoned, because previously we created ephemeral node, possibly in other copier's process.
            /// Initialize DROP PARTITION
A
Alexey Milovidov 已提交
1262
            LOG_DEBUG(log, "Task {} has not been successfully finished by {}. Partition will be dropped and refilled.", current_task_piece_status_path, status.owner);
1263

N
Nikita Mikhaylov 已提交
1264
            create_is_dirty_node(clean_state_clock);
N
Nikita Mikhaylov 已提交
1265
            return TaskStatus::Error;
N
Nikita Mikhaylov 已提交
1266
        }
1267 1268
    }

N
Nikita Mikhaylov 已提交
1269 1270 1271 1272 1273 1274 1275

    /// Exit if current piece is absent on this shard. Also mark it as finished, because we will check
    /// whether each shard has processed each partition (and its pieces).
    if (partition_piece.is_absent_piece)
    {
        String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
        auto res = zookeeper->tryCreate(current_task_piece_status_path, state_finished, zkutil::CreateMode::Persistent);
1276
        if (res == Coordination::Error::ZNODEEXISTS)
A
Alexey Milovidov 已提交
1277
            LOG_DEBUG(log, "Partition {} piece {} is absent on current replica of a shard. But other replicas have already marked it as done.", task_partition.name, current_piece_number);
1278
        if (res == Coordination::Error::ZOK)
A
Alexey Milovidov 已提交
1279
            LOG_DEBUG(log, "Partition {} piece {} is absent on current replica of a shard. Will mark it as done. Other replicas will do the same.", task_partition.name, current_piece_number);
N
Nikita Mikhaylov 已提交
1280
        return TaskStatus::Finished;
N
Nikita Mikhaylov 已提交
1281 1282
    }

N
Nikita Mikhaylov 已提交
1283 1284 1285
    /// Check that destination partition is empty if we are first worker
    /// NOTE: this check is incorrect if pull and push tables have different partition key!
    String clean_start_status;
N
Nikita Mikhaylov 已提交
1286
    if (!zookeeper->tryGet(partition_piece.getPartitionPieceCleanStartPath(), clean_start_status) || clean_start_status != "ok")
1287
    {
N
Nikita Mikhaylov 已提交
1288 1289
        zookeeper->createIfNotExists(partition_piece.getPartitionPieceCleanStartPath(), "");
        auto checker = zkutil::EphemeralNodeHolder::create(partition_piece.getPartitionPieceCleanStartPath() + "/checker",
N
Nikita Mikhaylov 已提交
1290
                                                           *zookeeper, host_id);
N
Nikita Mikhaylov 已提交
1291
        // Maybe we are the first worker
N
Nikita Mikhaylov 已提交
1292 1293

        ASTPtr query_select_ast = get_select_query(split_table_for_current_piece, "count()", /*enable_splitting*/ true);
N
Nikita Mikhaylov 已提交
1294 1295 1296 1297
        UInt64 count;
        {
            Context local_context = context;
            // Use pull (i.e. readonly) settings, but fetch data from destination servers
1298 1299
            local_context.setSettings(task_cluster->settings_pull);
            local_context.setSetting("skip_unavailable_shards", true);
N
Nikita Mikhaylov 已提交
1300

N
Nikolai Kochetov 已提交
1301
            Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_select_ast, local_context)->execute().getInputStream());
N
Nikita Mikhaylov 已提交
1302 1303
            count = (block) ? block.safeGetByPosition(0).column->getUInt(0) : 0;
        }
1304

N
Nikita Mikhaylov 已提交
1305 1306
        if (count != 0)
        {
A
Alexey Milovidov 已提交
1307
            LOG_INFO(log, "Partition {} piece {}is not empty. In contains {} rows.", task_partition.name, current_piece_number, count);
N
Nikita Mikhaylov 已提交
1308
            Coordination::Stat stat_shards{};
N
Nikita Mikhaylov 已提交
1309
            zookeeper->get(partition_piece.getPartitionPieceShardsPath(), &stat_shards);
1310

N
Nikita Mikhaylov 已提交
1311 1312 1313
            /// NOTE: partition is still fresh if dirt discovery happens before cleaning
            if (stat_shards.numChildren == 0)
            {
A
Alexey Milovidov 已提交
1314
                LOG_WARNING(log, "There are no workers for partition {} piece {}, but destination table contains {} rows. Partition will be dropped and refilled.", task_partition.name, toString(current_piece_number), count);
1315

N
Nikita Mikhaylov 已提交
1316
                create_is_dirty_node(clean_state_clock);
N
Nikita Mikhaylov 已提交
1317
                return TaskStatus::Error;
N
Nikita Mikhaylov 已提交
1318 1319
            }
        }
N
Nikita Mikhaylov 已提交
1320
        zookeeper->set(partition_piece.getPartitionPieceCleanStartPath(), "ok");
N
Nikita Mikhaylov 已提交
1321 1322 1323
    }
    /// At this point, we need to sync that the destination table is clean
    /// before any actual work
1324

N
Nikita Mikhaylov 已提交
1325 1326 1327
    /// Try start processing, create node about it
    {
        String start_state = TaskStateWithOwner::getData(TaskState::Started, host_id);
A
Alexey Milovidov 已提交
1328
        CleanStateClock new_clean_state_clock(zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path);
N
Nikita Mikhaylov 已提交
1329 1330
        if (clean_state_clock != new_clean_state_clock)
        {
A
Alexey Milovidov 已提交
1331
            LOG_INFO(log, "Partition {} piece {} clean state changed, cowardly bailing", task_partition.name, toString(current_piece_number));
N
Nikita Mikhaylov 已提交
1332
            return TaskStatus::Error;
N
Nikita Mikhaylov 已提交
1333 1334 1335
        }
        else if (!new_clean_state_clock.is_clean())
        {
A
Alexey Milovidov 已提交
1336
            LOG_INFO(log, "Partition {} piece {} is dirty and will be dropped and refilled", task_partition.name, toString(current_piece_number));
N
Nikita Mikhaylov 已提交
1337
            create_is_dirty_node(new_clean_state_clock);
N
Nikita Mikhaylov 已提交
1338
            return TaskStatus::Error;
N
Nikita Mikhaylov 已提交
1339
        }
N
Nikita Mikhaylov 已提交
1340
        zookeeper->create(current_task_piece_status_path, start_state, zkutil::CreateMode::Persistent);
1341 1342
    }

N
Nikita Mikhaylov 已提交
1343
    /// Try create table (if not exists) on each shard
1344
    {
N
Nikita Mikhaylov 已提交
1345 1346 1347
        /// Define push table for current partition piece
        auto database_and_table_for_current_piece= std::pair<String, String>(
                task_table.table_push.first,
N
Nikita Mikhaylov 已提交
1348
                task_table.table_push.second + "_piece_" + toString(current_piece_number));
N
Nikita Mikhaylov 已提交
1349

N
Nikita Mikhaylov 已提交
1350 1351 1352
        auto new_engine_push_ast = task_table.engine_push_ast;
        if (task_table.isReplicatedTable())
        {
1353
            new_engine_push_ast = task_table.rewriteReplicatedCreateQueryToPlain();
N
Nikita Mikhaylov 已提交
1354 1355 1356 1357 1358 1359
        }

        auto create_query_push_ast = rewriteCreateQueryStorage(
                task_shard.current_pull_table_create_query,
                database_and_table_for_current_piece, new_engine_push_ast);

N
Nikita Mikhaylov 已提交
1360 1361 1362
        create_query_push_ast->as<ASTCreateQuery &>().if_not_exists = true;
        String query = queryToString(create_query_push_ast);

A
Alexey Milovidov 已提交
1363
        LOG_DEBUG(log, "Create destination tables. Query: {}", query);
A
Alexey Milovidov 已提交
1364
        UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
A
Alexey Milovidov 已提交
1365 1366
        LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}",
            getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount());
N
Nikita Mikhaylov 已提交
1367
    }
1368

N
Nikita Mikhaylov 已提交
1369 1370 1371 1372
    /// Do the copying
    {
        bool inject_fault = false;
        if (copy_fault_probability > 0)
1373
        {
N
Nikita Mikhaylov 已提交
1374 1375
            double value = std::uniform_real_distribution<>(0, 1)(task_table.task_cluster.random_engine);
            inject_fault = value < copy_fault_probability;
1376 1377
        }

N
Nikita Mikhaylov 已提交
1378
        // Select all fields
N
Nikita Mikhaylov 已提交
1379
        ASTPtr query_select_ast = get_select_query(task_shard.table_read_shard, "*", /*enable_splitting*/ true, inject_fault ? "1" : "");
1380

A
Alexey Milovidov 已提交
1381
        LOG_DEBUG(log, "Executing SELECT query and pull from {} : {}", task_shard.getDescription(), queryToString(query_select_ast));
N
Nikita Mikhaylov 已提交
1382 1383

        ASTPtr query_insert_ast;
1384
        {
N
Nikita Mikhaylov 已提交
1385
            String query;
N
Nikita Mikhaylov 已提交
1386
            query += "INSERT INTO " + getQuotedTable(split_table_for_current_piece) + " VALUES ";
1387

N
Nikita Mikhaylov 已提交
1388
            ParserQuery p_query(query.data() + query.size());
1389 1390
            const auto & settings = context.getSettingsRef();
            query_insert_ast = parseQuery(p_query, query, settings.max_query_size, settings.max_parser_depth);
N
Nikita Mikhaylov 已提交
1391

A
Alexey Milovidov 已提交
1392
            LOG_DEBUG(log, "Executing INSERT query: {}", query);
N
Nikita Mikhaylov 已提交
1393 1394 1395 1396
        }

        try
        {
A
alexey-milovidov 已提交
1397 1398 1399 1400 1401 1402
            std::unique_ptr<Context> context_select = std::make_unique<Context>(context);
            context_select->setSettings(task_cluster->settings_pull);

            std::unique_ptr<Context> context_insert = std::make_unique<Context>(context);
            context_insert->setSettings(task_cluster->settings_push);

N
Nikita Mikhaylov 已提交
1403 1404 1405
            /// Custom INSERT SELECT implementation
            BlockInputStreamPtr input;
            BlockOutputStreamPtr output;
1406
            {
A
Alexey Milovidov 已提交
1407 1408
                BlockIO io_select = InterpreterFactory::get(query_select_ast, *context_select)->execute();
                BlockIO io_insert = InterpreterFactory::get(query_insert_ast, *context_insert)->execute();
1409

N
Nikolai Kochetov 已提交
1410
                input = io_select.getInputStream();
N
Nikita Mikhaylov 已提交
1411
                output = io_insert.out;
1412 1413
            }

N
Nikita Mikhaylov 已提交
1414 1415 1416 1417 1418 1419 1420 1421
            /// Fail-fast optimization to abort copying when the current clean state expires
            std::future<Coordination::ExistsResponse> future_is_dirty_checker;

            Stopwatch watch(CLOCK_MONOTONIC_COARSE);
            constexpr UInt64 check_period_milliseconds = 500;

            /// Will asynchronously check that ZooKeeper connection and is_dirty flag appearing while copying data
            auto cancel_check = [&] ()
1422
            {
N
Nikita Mikhaylov 已提交
1423 1424
                if (zookeeper->expired())
                    throw Exception("ZooKeeper session is expired, cancel INSERT SELECT", ErrorCodes::UNFINISHED);
1425

N
Nikita Mikhaylov 已提交
1426
                if (!future_is_dirty_checker.valid())
N
Nikita Mikhaylov 已提交
1427
                    future_is_dirty_checker = zookeeper->asyncExists(piece_is_dirty_flag_path);
1428

N
Nikita Mikhaylov 已提交
1429 1430 1431
                /// check_period_milliseconds should be less than the average insert time of a single block,
                /// otherwise the insertion will slow down a little bit
                if (watch.elapsedMilliseconds() >= check_period_milliseconds)
1432
                {
N
Nikita Mikhaylov 已提交
1433
                    Coordination::ExistsResponse status = future_is_dirty_checker.get();
1434

1435
                    if (status.error != Coordination::Error::ZNONODE)
1436
                    {
N
Nikita Mikhaylov 已提交
1437 1438 1439 1440
                        LogicalClock dirt_discovery_epoch (status.stat.mzxid);
                        if (dirt_discovery_epoch == clean_state_clock.discovery_zxid)
                            return false;
                        throw Exception("Partition is dirty, cancel INSERT SELECT", ErrorCodes::UNFINISHED);
1441 1442 1443
                    }
                }

N
Nikita Mikhaylov 已提交
1444 1445
                return false;
            };
1446

N
Nikita Mikhaylov 已提交
1447 1448 1449 1450 1451 1452 1453 1454
            /// Update statistics
            /// It is quite rough: bytes_copied don't take into account DROP PARTITION.
            auto update_stats = [&cluster_partition] (const Block & block)
            {
                cluster_partition.bytes_copied += block.bytes();
                cluster_partition.rows_copied += block.rows();
                cluster_partition.blocks_copied += 1;
            };
1455

N
Nikita Mikhaylov 已提交
1456 1457
            /// Main work is here
            copyData(*input, *output, cancel_check, update_stats);
1458

N
Nikita Mikhaylov 已提交
1459 1460 1461
            // Just in case
            if (future_is_dirty_checker.valid())
                future_is_dirty_checker.get();
1462

N
Nikita Mikhaylov 已提交
1463 1464 1465 1466 1467 1468
            if (inject_fault)
                throw Exception("Copy fault injection is activated", ErrorCodes::UNFINISHED);
        }
        catch (...)
        {
            tryLogCurrentException(log, "An error occurred during copying, partition will be marked as dirty");
N
better  
Nikita Mikhaylov 已提交
1469
            create_is_dirty_node(clean_state_clock);
N
Nikita Mikhaylov 已提交
1470
            return TaskStatus::Error;
N
Nikita Mikhaylov 已提交
1471
        }
1472 1473
    }

A
Alexey Milovidov 已提交
1474
    LOG_INFO(log, "Partition {} piece {} copied. But not moved to original destination table.", task_partition.name, toString(current_piece_number));
N
Nikita Mikhaylov 已提交
1475 1476 1477


    /// Try create original table (if not exists) on each shard
N
Nikita Mikhaylov 已提交
1478
    try
N
Nikita Mikhaylov 已提交
1479 1480 1481
    {
        auto create_query_push_ast = rewriteCreateQueryStorage(task_shard.current_pull_table_create_query,
                                                               task_table.table_push, task_table.engine_push_ast);
1482 1483 1484
        auto & create = create_query_push_ast->as<ASTCreateQuery &>();
        create.if_not_exists = true;
        InterpreterCreateQuery::prepareOnClusterQuery(create, context, task_table.cluster_push_name);
N
Nikita Mikhaylov 已提交
1485 1486
        String query = queryToString(create_query_push_ast);

A
Alexey Milovidov 已提交
1487
        LOG_DEBUG(log, "Create destination tables. Query: {}", query);
A
Alexey Milovidov 已提交
1488
        UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
A
Alexey Milovidov 已提交
1489
        LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}", getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount());
N
Nikita Mikhaylov 已提交
1490
    }
N
Nikita Mikhaylov 已提交
1491 1492 1493 1494
    catch (...)
    {
        tryLogCurrentException(log, "Error while creating original table. Maybe we are not first.");
    }
N
Nikita Mikhaylov 已提交
1495

N
Nikita Mikhaylov 已提交
1496 1497 1498
    /// Finalize the processing, change state of current partition task (and also check is_dirty flag)
    {
        String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
N
Nikita Mikhaylov 已提交
1499
        CleanStateClock new_clean_state_clock (zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path);
N
Nikita Mikhaylov 已提交
1500 1501
        if (clean_state_clock != new_clean_state_clock)
        {
A
Alexey Milovidov 已提交
1502
            LOG_INFO(log, "Partition {} piece {} clean state changed, cowardly bailing", task_partition.name, toString(current_piece_number));
N
Nikita Mikhaylov 已提交
1503
            return TaskStatus::Error;
N
Nikita Mikhaylov 已提交
1504 1505 1506
        }
        else if (!new_clean_state_clock.is_clean())
        {
A
Alexey Milovidov 已提交
1507
            LOG_INFO(log, "Partition {} piece {} became dirty and will be dropped and refilled", task_partition.name, toString(current_piece_number));
N
Nikita Mikhaylov 已提交
1508
            create_is_dirty_node(new_clean_state_clock);
N
Nikita Mikhaylov 已提交
1509
            return TaskStatus::Error;
N
Nikita Mikhaylov 已提交
1510
        }
N
Nikita Mikhaylov 已提交
1511
        zookeeper->set(current_task_piece_status_path, state_finished, 0);
N
Nikita Mikhaylov 已提交
1512
    }
1513

N
Nikita Mikhaylov 已提交
1514
    return TaskStatus::Finished;
N
Nikita Mikhaylov 已提交
1515
}
1516

N
Nikita Mikhaylov 已提交
1517 1518 1519 1520
void ClusterCopier::dropAndCreateLocalTable(const ASTPtr & create_ast)
{
    const auto & create = create_ast->as<ASTCreateQuery &>();
    dropLocalTableIfExists({create.database, create.table});
1521

N
Nikita Mikhaylov 已提交
1522 1523 1524
    InterpreterCreateQuery interpreter(create_ast, context);
    interpreter.execute();
}
1525

N
Nikita Mikhaylov 已提交
1526 1527 1528 1529 1530 1531
void ClusterCopier::dropLocalTableIfExists(const DatabaseAndTableName & table_name) const
{
    auto drop_ast = std::make_shared<ASTDropQuery>();
    drop_ast->if_exists = true;
    drop_ast->database = table_name.first;
    drop_ast->table = table_name.second;
1532

N
Nikita Mikhaylov 已提交
1533 1534 1535
    InterpreterDropQuery interpreter(drop_ast, context);
    interpreter.execute();
}
1536

N
Nikita Mikhaylov 已提交
1537 1538 1539

void ClusterCopier::dropHelpingTables(const TaskTable & task_table)
{
A
Alexey Milovidov 已提交
1540
    LOG_DEBUG(log, "Removing helping tables");
N
Nikita Mikhaylov 已提交
1541 1542 1543 1544 1545 1546 1547 1548 1549 1550
    for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
    {
        DatabaseAndTableName original_table = task_table.table_push;
        DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));

        String query = "DROP TABLE IF EXISTS " + getQuotedTable(helping_table);

        const ClusterPtr & cluster_push = task_table.cluster_push;
        Settings settings_push = task_cluster->settings_push;

A
Alexey Milovidov 已提交
1551
        LOG_DEBUG(log, "Execute distributed DROP TABLE: {}", query);
N
Nikita Mikhaylov 已提交
1552 1553 1554
        /// We have to drop partition_piece on each replica
        UInt64 num_nodes = executeQueryOnCluster(
                cluster_push, query,
1555
                settings_push,
N
Nikita Mikhaylov 已提交
1556 1557 1558
                PoolMode::GET_MANY,
                ClusterExecutionMode::ON_EACH_NODE);

A
Alexey Milovidov 已提交
1559
        LOG_DEBUG(log, "DROP TABLE query was successfully executed on {} nodes.", toString(num_nodes));
N
Nikita Mikhaylov 已提交
1560 1561 1562
    }
}

N
Nikita Mikhaylov 已提交
1563 1564 1565

void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskTable & task_table, const String & partition_name)
{
A
Alexey Milovidov 已提交
1566
    LOG_DEBUG(log, "Try drop partition partition from all helping tables.");
N
Nikita Mikhaylov 已提交
1567 1568 1569 1570 1571
    for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
    {
        DatabaseAndTableName original_table = task_table.table_push;
        DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));

1572
        String query = "ALTER TABLE " + getQuotedTable(helping_table) + ((partition_name == "'all'") ? " DROP PARTITION ID " : " DROP PARTITION ") + partition_name;
N
Nikita Mikhaylov 已提交
1573 1574 1575 1576

        const ClusterPtr & cluster_push = task_table.cluster_push;
        Settings settings_push = task_cluster->settings_push;

A
Alexey Milovidov 已提交
1577
        LOG_DEBUG(log, "Execute distributed DROP PARTITION: {}", query);
N
Nikita Mikhaylov 已提交
1578 1579 1580
        /// We have to drop partition_piece on each replica
        UInt64 num_nodes = executeQueryOnCluster(
                cluster_push, query,
1581
                settings_push,
N
Nikita Mikhaylov 已提交
1582 1583 1584
                PoolMode::GET_MANY,
                ClusterExecutionMode::ON_EACH_NODE);

A
Alexey Milovidov 已提交
1585
        LOG_DEBUG(log, "DROP PARTITION query was successfully executed on {} nodes.", toString(num_nodes));
N
Nikita Mikhaylov 已提交
1586
    }
A
Alexey Milovidov 已提交
1587
    LOG_DEBUG(log, "All helping tables dropped partition {}", partition_name);
N
Nikita Mikhaylov 已提交
1588 1589
}

N
Nikita Mikhaylov 已提交
1590 1591 1592 1593
/// Runs `SHOW CREATE TABLE <table>` over the given connection and returns the value of the first row
/// of the first result column as a string.
String ClusterCopier::getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings * settings)
{
    String show_create_query = "SHOW CREATE TABLE " + getQuotedTable(table);

    /// The remote stream is created in place so that it lives only for the duration of the read.
    Block result = getBlockWithAllStreamData(std::make_shared<RemoteBlockInputStream>(
            connection, show_create_query, InterpreterShowCreateQuery::getSampleBlock(), context, settings));

    const auto & statement_column = typeid_cast<const ColumnString &>(*result.safeGetByPosition(0).column);
    return statement_column.getDataAt(0).toString();
}
1598

N
Nikita Mikhaylov 已提交
1599 1600 1601
/// Fetches the CREATE statement of the pull table from the given shard (using the pull settings)
/// and returns it parsed into an AST.
ASTPtr ClusterCopier::getCreateTableForPullShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
{
    auto * pull_settings = &task_cluster->settings_pull;

    /// Fetch (possibly) new definition from the shard
    auto connection_entry = task_shard.info.pool->get(timeouts, pull_settings, true);
    String create_query_pull_str = getRemoteCreateTable(task_shard.task_table.table_pull, *connection_entry, pull_settings);

    /// ... and parse it with the limits taken from the copier's own settings
    const auto & settings = context.getSettingsRef();
    ParserCreateQuery parser_create_query;
    return parseQuery(parser_create_query, create_query_pull_str, settings.max_query_size, settings.max_parser_depth);
}
1612

N
Nikita Mikhaylov 已提交
1613
/// If it is implicitly asked to create split Distributed table for certain piece on current shard, we will do it.
N
Nikita Mikhaylov 已提交
1614
void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeouts,
N
Nikita Mikhaylov 已提交
1615
        TaskShard & task_shard, bool create_split)
1616
{
N
Nikita Mikhaylov 已提交
1617
    TaskTable & task_table = task_shard.task_table;
1618

N
Nikita Mikhaylov 已提交
1619 1620
    /// We need to update table definitions for each part, it could be changed after ALTER
    task_shard.current_pull_table_create_query = getCreateTableForPullShard(timeouts, task_shard);
1621

N
Nikita Mikhaylov 已提交
1622 1623 1624 1625 1626
    /// Create local Distributed tables:
    ///  a table fetching data from current shard and a table inserting data to the whole destination cluster
    String read_shard_prefix = ".read_shard_" + toString(task_shard.indexInCluster()) + ".";
    String split_shard_prefix = ".split.";
    task_shard.table_read_shard = DatabaseAndTableName(working_database_name, read_shard_prefix + task_table.table_id);
N
Nikita Mikhaylov 已提交
1627 1628 1629 1630 1631 1632 1633
    task_shard.main_table_split_shard = DatabaseAndTableName(working_database_name, split_shard_prefix + task_table.table_id);

    for (const auto & piece_number : ext::range(0, task_table.number_of_splits))
    {
        task_shard.list_of_split_tables_on_shard[piece_number] =
                DatabaseAndTableName(working_database_name, split_shard_prefix + task_table.table_id + "_piece_" + toString(piece_number));
    }
1634

N
Nikita Mikhaylov 已提交
1635 1636 1637 1638
    /// Create special cluster with single shard
    String shard_read_cluster_name = read_shard_prefix + task_table.cluster_pull_name;
    ClusterPtr cluster_pull_current_shard = task_table.cluster_pull->getClusterWithSingleShard(task_shard.indexInCluster());
    context.setCluster(shard_read_cluster_name, cluster_pull_current_shard);
1639

N
Nikita Mikhaylov 已提交
1640
    auto storage_shard_ast = createASTStorageDistributed(shard_read_cluster_name, task_table.table_pull.first, task_table.table_pull.second);
1641

N
Nikita Mikhaylov 已提交
1642
    auto create_query_ast = removeAliasColumnsFromCreateQuery(task_shard.current_pull_table_create_query);
1643

N
Nikita Mikhaylov 已提交
1644
    auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_ast, task_shard.table_read_shard, storage_shard_ast);
N
Nikita Mikhaylov 已提交
1645
    dropAndCreateLocalTable(create_table_pull_ast);
1646

N
Nikita Mikhaylov 已提交
1647
    if (create_split)
N
Nikita Mikhaylov 已提交
1648 1649 1650 1651 1652 1653
    {
        auto create_table_split_piece_ast = rewriteCreateQueryStorage(
                create_query_ast,
                task_shard.main_table_split_shard,
                task_table.main_engine_split_ast);

N
Nikita Mikhaylov 已提交
1654
        dropAndCreateLocalTable(create_table_split_piece_ast);
N
Nikita Mikhaylov 已提交
1655

A
Alexey Milovidov 已提交
1656
        /// Create auxiliary split tables for each piece
N
Nikita Mikhaylov 已提交
1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669
        for (const auto & piece_number : ext::range(0, task_table.number_of_splits))
        {
            const auto & storage_piece_split_ast = task_table.auxiliary_engine_split_asts[piece_number];

            create_table_split_piece_ast = rewriteCreateQueryStorage(
                    create_query_ast,
                    task_shard.list_of_split_tables_on_shard[piece_number],
                    storage_piece_split_ast);

            dropAndCreateLocalTable(create_table_split_piece_ast);
        }
    }

1670
}
1671 1672


N
Nikita Mikhaylov 已提交
1673
std::set<String> ClusterCopier::getShardPartitions(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
1674
{
1675 1676
    std::set<String> res;

N
Nikita Mikhaylov 已提交
1677
    createShardInternalTables(timeouts, task_shard, false);
1678

N
Nikita Mikhaylov 已提交
1679
    TaskTable & task_table = task_shard.task_table;
1680

1681 1682 1683 1684 1685 1686 1687 1688
    const String & partition_name = queryToString(task_table.engine_push_partition_key_ast);

    if (partition_name == "'all'")
    {
        res.emplace("'all'");
        return res;
    }

N
Nikita Mikhaylov 已提交
1689 1690 1691
    String query;
    {
        WriteBufferFromOwnString wb;
1692
        wb << "SELECT DISTINCT " << partition_name << " AS partition FROM"
N
Nikita Mikhaylov 已提交
1693 1694 1695
           << " " << getQuotedTable(task_shard.table_read_shard) << " ORDER BY partition DESC";
        query = wb.str();
    }
1696

N
Nikita Mikhaylov 已提交
1697
    ParserQuery parser_query(query.data() + query.size());
1698 1699
    const auto & settings = context.getSettingsRef();
    ASTPtr query_ast = parseQuery(parser_query, query, settings.max_query_size, settings.max_parser_depth);
N
Nikita Mikhaylov 已提交
1700

A
Alexey Milovidov 已提交
1701
    LOG_DEBUG(log, "Computing destination partition set, executing query: {}", query);
N
Nikita Mikhaylov 已提交
1702 1703 1704

    Context local_context = context;
    local_context.setSettings(task_cluster->settings_pull);
N
Nikolai Kochetov 已提交
1705
    Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_ast, local_context)->execute().getInputStream());
N
Nikita Mikhaylov 已提交
1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719

    if (block)
    {
        ColumnWithTypeAndName & column = block.getByPosition(0);
        task_shard.partition_key_column = column;

        for (size_t i = 0; i < column.column->size(); ++i)
        {
            WriteBufferFromOwnString wb;
            column.type->serializeAsTextQuoted(*column.column, i, wb, FormatSettings());
            res.emplace(wb.str());
        }
    }

A
Alexey Milovidov 已提交
1720
    LOG_DEBUG(log, "There are {} destination partitions in shard {}", res.size(), task_shard.getDescription());
1721

N
Nikita Mikhaylov 已提交
1722 1723
    return res;
}
1724

N
Nikita Mikhaylov 已提交
1725 1726
bool ClusterCopier::checkShardHasPartition(const ConnectionTimeouts & timeouts,
        TaskShard & task_shard, const String & partition_quoted_name)
1727
{
N
Nikita Mikhaylov 已提交
1728 1729 1730 1731 1732
    createShardInternalTables(timeouts, task_shard, false);

    TaskTable & task_table = task_shard.task_table;

    std::string query = "SELECT 1 FROM " + getQuotedTable(task_shard.table_read_shard)
N
Nikita Mikhaylov 已提交
1733 1734
                        + " WHERE (" + queryToString(task_table.engine_push_partition_key_ast) +
                        " = (" + partition_quoted_name + " AS partition_key))";
N
Nikita Mikhaylov 已提交
1735 1736 1737 1738 1739 1740

    if (!task_table.where_condition_str.empty())
        query += " AND (" + task_table.where_condition_str + ")";

    query += " LIMIT 1";

A
Alexey Milovidov 已提交
1741
    LOG_DEBUG(log, "Checking shard {} for partition {} existence, executing query: {}", task_shard.getDescription(), partition_quoted_name, query);
N
Nikita Mikhaylov 已提交
1742 1743

    ParserQuery parser_query(query.data() + query.size());
1744 1745
const auto & settings = context.getSettingsRef();
    ASTPtr query_ast = parseQuery(parser_query, query, settings.max_query_size, settings.max_parser_depth);
N
Nikita Mikhaylov 已提交
1746 1747 1748

    Context local_context = context;
    local_context.setSettings(task_cluster->settings_pull);
N
Nikolai Kochetov 已提交
1749
    return InterpreterFactory::get(query_ast, local_context)->execute().getInputStream()->read().rows() != 0;
1750
}
1751

N
Nikita Mikhaylov 已提交
1752
bool ClusterCopier::checkPresentPartitionPiecesOnCurrentShard(const ConnectionTimeouts & timeouts,
N
Nikita Mikhaylov 已提交
1753
                           TaskShard & task_shard, const String & partition_quoted_name, size_t current_piece_number)
N
Nikita Mikhaylov 已提交
1754 1755 1756 1757 1758 1759 1760
{
    createShardInternalTables(timeouts, task_shard, false);

    TaskTable & task_table = task_shard.task_table;
    const size_t number_of_splits = task_table.number_of_splits;
    const String & primary_key_comma_separated = task_table.primary_key_comma_separated;

N
Nikita Mikhaylov 已提交
1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773
    UNUSED(primary_key_comma_separated);

    std::string query = "SELECT 1 FROM " + getQuotedTable(task_shard.table_read_shard);

    if (experimental_use_sample_offset)
        query += " SAMPLE 1/" + toString(number_of_splits) + " OFFSET " + toString(current_piece_number) + "/" + toString(number_of_splits);

    query += " WHERE (" + queryToString(task_table.engine_push_partition_key_ast)
                        + " = (" + partition_quoted_name + " AS partition_key))";

    if (!experimental_use_sample_offset)
        query += " AND (cityHash64(" + primary_key_comma_separated + ") % "
                 + std::to_string(number_of_splits) + " = " + std::to_string(current_piece_number) + " )";
N
Nikita Mikhaylov 已提交
1774 1775 1776 1777 1778 1779

    if (!task_table.where_condition_str.empty())
        query += " AND (" + task_table.where_condition_str + ")";

    query += " LIMIT 1";

A
Alexey Milovidov 已提交
1780
    LOG_DEBUG(log, "Checking shard {} for partition {} piece {} existence, executing query: {}", task_shard.getDescription(), partition_quoted_name, std::to_string(current_piece_number), query);
N
Nikita Mikhaylov 已提交
1781 1782

    ParserQuery parser_query(query.data() + query.size());
1783 1784
    const auto & settings = context.getSettingsRef();
    ASTPtr query_ast = parseQuery(parser_query, query, settings.max_query_size, settings.max_parser_depth);
N
Nikita Mikhaylov 已提交
1785 1786 1787

    Context local_context = context;
    local_context.setSettings(task_cluster->settings_pull);
N
Nikolai Kochetov 已提交
1788
    auto result = InterpreterFactory::get(query_ast, local_context)->execute().getInputStream()->read().rows();
N
Nikita Mikhaylov 已提交
1789
    if (result != 0)
A
Alexey Milovidov 已提交
1790
        LOG_DEBUG(log, "Partition {} piece number {} is PRESENT on shard {}", partition_quoted_name, std::to_string(current_piece_number), task_shard.getDescription());
N
Nikita Mikhaylov 已提交
1791
    else
A
Alexey Milovidov 已提交
1792
        LOG_DEBUG(log, "Partition {} piece number {} is ABSENT on shard {}", partition_quoted_name, std::to_string(current_piece_number), task_shard.getDescription());
N
Nikita Mikhaylov 已提交
1793 1794
    return result != 0;
}
1795

N
Nikita Mikhaylov 已提交
1796 1797 1798
/** Executes simple query (without output streams, for example DDL queries) on each shard of the cluster
  * Returns number of shards for which at least one replica executed query successfully
  */
N
Nikita Mikhaylov 已提交
1799
UInt64 ClusterCopier::executeQueryOnCluster(
N
Nikita Mikhaylov 已提交
1800 1801
        const ClusterPtr & cluster,
        const String & query,
1802
        const Settings & current_settings,
N
Nikita Mikhaylov 已提交
1803
        PoolMode pool_mode,
N
Nikita Mikhaylov 已提交
1804
        ClusterExecutionMode execution_mode,
N
Nikita Mikhaylov 已提交
1805
        UInt64 max_successful_executions_per_shard) const
1806
{
N
Nikita Mikhaylov 已提交
1807 1808
    auto num_shards = cluster->getShardsInfo().size();
    std::vector<UInt64> per_shard_num_successful_replicas(num_shards, 0);
1809

A
Alexey Milovidov 已提交
1810 1811
    ParserQuery p_query(query.data() + query.size());
    ASTPtr query_ast = parseQuery(p_query, query, current_settings.max_query_size, current_settings.max_parser_depth);
N
Nikita Mikhaylov 已提交
1812

N
Nikita Mikhaylov 已提交
1813 1814 1815 1816 1817
    /// We will have to execute query on each replica of a shard.
    if (execution_mode == ClusterExecutionMode::ON_EACH_NODE)
        max_successful_executions_per_shard = 0;

    std::atomic<size_t> origin_replicas_number;
N
Nikita Mikhaylov 已提交
1818 1819

    /// We need to execute query on one replica at least
1820
    auto do_for_shard = [&] (UInt64 shard_index, Settings shard_settings)
1821
    {
1822 1823
        setThreadName("QueryForShard");

N
Nikita Mikhaylov 已提交
1824 1825 1826
        const Cluster::ShardInfo & shard = cluster->getShardsInfo().at(shard_index);
        UInt64 & num_successful_executions = per_shard_num_successful_replicas.at(shard_index);
        num_successful_executions = 0;
1827

N
Nikita Mikhaylov 已提交
1828
        auto increment_and_check_exit = [&] () -> bool
N
Nikita Mikhaylov 已提交
1829 1830 1831 1832
        {
            ++num_successful_executions;
            return max_successful_executions_per_shard && num_successful_executions >= max_successful_executions_per_shard;
        };
1833

N
Nikita Mikhaylov 已提交
1834
        UInt64 num_replicas = cluster->getShardsAddresses().at(shard_index).size();
N
Nikita Mikhaylov 已提交
1835 1836

        origin_replicas_number += num_replicas;
N
Nikita Mikhaylov 已提交
1837 1838
        UInt64 num_local_replicas = shard.getLocalNodeCount();
        UInt64 num_remote_replicas = num_replicas - num_local_replicas;
1839

N
Nikita Mikhaylov 已提交
1840 1841 1842 1843 1844
        /// In that case we don't have local replicas, but do it just in case
        for (UInt64 i = 0; i < num_local_replicas; ++i)
        {
            auto interpreter = InterpreterFactory::get(query_ast, context);
            interpreter->execute();
1845

N
Nikita Mikhaylov 已提交
1846 1847 1848
            if (increment_and_check_exit())
                return;
        }
1849

N
Nikita Mikhaylov 已提交
1850 1851 1852
        /// Will try to make as many as possible queries
        if (shard.hasRemoteConnections())
        {
1853
            shard_settings.max_parallel_replicas = num_remote_replicas ? num_remote_replicas : 1;
N
Nikita Mikhaylov 已提交
1854

1855 1856
            auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(shard_settings).getSaturated(shard_settings.max_execution_time);
            auto connections = shard.pool->getMany(timeouts, &shard_settings, pool_mode);
N
Nikita Mikhaylov 已提交
1857 1858 1859 1860 1861 1862 1863 1864 1865

            for (auto & connection : connections)
            {
                if (connection.isNull())
                    continue;

                try
                {
                    /// CREATE TABLE and DROP PARTITION queries return empty block
1866
                    RemoteBlockInputStream stream{*connection, query, Block{}, context, &shard_settings};
N
Nikita Mikhaylov 已提交
1867 1868 1869 1870 1871 1872 1873 1874
                    NullBlockOutputStream output{Block{}};
                    copyData(stream, output);

                    if (increment_and_check_exit())
                        return;
                }
                catch (const Exception &)
                {
A
Alexey Milovidov 已提交
1875
                    LOG_INFO(log, getCurrentExceptionMessage(false, true));
N
Nikita Mikhaylov 已提交
1876 1877 1878 1879
                }
            }
        }
    };
1880 1881

    {
N
Nikita Mikhaylov 已提交
1882 1883 1884
        ThreadPool thread_pool(std::min<UInt64>(num_shards, getNumberOfPhysicalCPUCores()));

        for (UInt64 shard_index = 0; shard_index < num_shards; ++shard_index)
1885
            thread_pool.scheduleOrThrowOnError([=, shard_settings = current_settings] { do_for_shard(shard_index, std::move(shard_settings)); });
1886

N
Nikita Mikhaylov 已提交
1887
        thread_pool.wait();
1888
    }
N
Nikita Mikhaylov 已提交
1889

N
Nikita Mikhaylov 已提交
1890
    UInt64 successful_nodes = 0;
N
Nikita Mikhaylov 已提交
1891
    for (UInt64 num_replicas : per_shard_num_successful_replicas)
N
Nikita Mikhaylov 已提交
1892 1893 1894 1895 1896 1897 1898
    {
        if (execution_mode == ClusterExecutionMode::ON_EACH_NODE)
            successful_nodes += num_replicas;
        else
            /// Count only successful shards
            successful_nodes += (num_replicas > 0);
    }
N
Nikita Mikhaylov 已提交
1899

N
Nikita Mikhaylov 已提交
1900
    if (execution_mode == ClusterExecutionMode::ON_EACH_NODE && successful_nodes != origin_replicas_number)
N
Nikita Mikhaylov 已提交
1901
    {
A
Alexey Milovidov 已提交
1902
        LOG_INFO(log, "There was an error while executing ALTER on each node. Query was executed on {} nodes. But had to be executed on {}", toString(successful_nodes), toString(origin_replicas_number.load()));
N
Nikita Mikhaylov 已提交
1903
    }
N
Nikita Mikhaylov 已提交
1904

N
Nikita Mikhaylov 已提交
1905
    return successful_nodes;
N
Nikita Mikhaylov 已提交
1906
}
1907

1908
}