ClusterCopier.cpp 80.8 KB
Newer Older
1
#include "ClusterCopier.h"
2

N
Nikita Mikhaylov 已提交
3 4
#include "Internals.h"

5
#include <Common/ZooKeeper/ZooKeeper.h>
A
Alexey Milovidov 已提交
6
#include <Common/ZooKeeper/KeeperException.h>
7 8

namespace DB
A
Alexey Milovidov 已提交
9
{
10

11 12 13 14 15 16 17
namespace ErrorCodes
{
    extern const int NOT_IMPLEMENTED;
    extern const int LOGICAL_ERROR;
    extern const int UNFINISHED;
    extern const int BAD_ARGUMENTS;
}
A
Alexey Milovidov 已提交
18

19

N
Nikita Mikhaylov 已提交
20
void ClusterCopier::init()
21
{
N
Nikita Mikhaylov 已提交
22
    auto zookeeper = context.getZooKeeper();
23

N
Nikita Mikhaylov 已提交
24 25 26 27
    task_description_watch_callback = [this] (const Coordination::WatchResponse & response)
    {
        if (response.error != Coordination::ZOK)
            return;
N
Nikita Mikhaylov 已提交
28
        UInt64 version = ++task_description_version;
N
Nikita Mikhaylov 已提交
29 30
        LOG_DEBUG(log, "Task description should be updated, local version " << version);
    };
31

N
Nikita Mikhaylov 已提交
32 33
    task_description_path = task_zookeeper_path + "/description";
    task_cluster = std::make_unique<TaskCluster>(task_zookeeper_path, working_database_name);
34

N
Nikita Mikhaylov 已提交
35 36
    reloadTaskDescription();
    task_cluster_initial_config = task_cluster_current_config;
37

N
Nikita Mikhaylov 已提交
38 39 40 41 42 43
    task_cluster->loadTasks(*task_cluster_initial_config);
    context.setClustersConfig(task_cluster_initial_config, task_cluster->clusters_prefix);

    /// Set up shards and their priority
    task_cluster->random_engine.seed(task_cluster->random_device());
    for (auto & task_table : task_cluster->table_tasks)
44
    {
N
Nikita Mikhaylov 已提交
45 46 47
        task_table.cluster_pull = context.getCluster(task_table.cluster_pull_name);
        task_table.cluster_push = context.getCluster(task_table.cluster_push_name);
        task_table.initShards(task_cluster->random_engine);
48 49
    }

N
Nikita Mikhaylov 已提交
50 51 52 53 54 55
    LOG_DEBUG(log, "Will process " << task_cluster->table_tasks.size() << " table tasks");

    /// Do not initialize tables, will make deferred initialization in process()

    zookeeper->createAncestors(getWorkersPathVersion() + "/");
    zookeeper->createAncestors(getWorkersPath() + "/");
56 57
}

N
Nikita Mikhaylov 已提交
58 59
template <typename T>
decltype(auto) ClusterCopier::retry(T && func, UInt64 max_tries)
60
{
N
Nikita Mikhaylov 已提交
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
    std::exception_ptr exception;

    for (UInt64 try_number = 1; try_number <= max_tries; ++try_number)
    {
        try
        {
            return func();
        }
        catch (...)
        {
            exception = std::current_exception();
            if (try_number < max_tries)
            {
                tryLogCurrentException(log, "Will retry");
                std::this_thread::sleep_for(default_sleep_time);
            }
        }
    }

    std::rethrow_exception(exception);
81 82 83
}


N
Nikita Mikhaylov 已提交
84
void ClusterCopier::discoverShardPartitions(const ConnectionTimeouts & timeouts, const TaskShardPtr & task_shard)
85
{
N
Nikita Mikhaylov 已提交
86
    TaskTable & task_table = task_shard->task_table;
87

N
Nikita Mikhaylov 已提交
88
    LOG_INFO(log, "Discover partitions of shard " << task_shard->getDescription());
89

N
Nikita Mikhaylov 已提交
90 91 92 93
    auto get_partitions = [&] () { return getShardPartitions(timeouts, *task_shard); };
    auto existing_partitions_names = retry(get_partitions, 60);
    Strings filtered_partitions_names;
    Strings missing_partitions;
94

N
Nikita Mikhaylov 已提交
95 96
    /// Check that user specified correct partition names
    auto check_partition_format = [] (const DataTypePtr & type, const String & partition_text_quoted)
97
    {
N
Nikita Mikhaylov 已提交
98 99
        MutableColumnPtr column_dummy = type->createColumn();
        ReadBufferFromString rb(partition_text_quoted);
100

N
Nikita Mikhaylov 已提交
101 102 103 104 105 106 107 108 109
        try
        {
            type->deserializeAsTextQuoted(*column_dummy, rb, FormatSettings());
        }
        catch (Exception & e)
        {
            throw Exception("Partition " + partition_text_quoted + " has incorrect format. " + e.displayText(), ErrorCodes::BAD_ARGUMENTS);
        }
    };
110

N
Nikita Mikhaylov 已提交
111
    if (task_table.has_enabled_partitions)
112
    {
N
Nikita Mikhaylov 已提交
113 114 115 116 117
        /// Process partition in order specified by <enabled_partitions/>
        for (const String & partition_name : task_table.enabled_partitions)
        {
            /// Check that user specified correct partition names
            check_partition_format(task_shard->partition_key_column.type, partition_name);
118

N
Nikita Mikhaylov 已提交
119
            auto it = existing_partitions_names.find(partition_name);
120

N
Nikita Mikhaylov 已提交
121 122 123 124 125 126 127 128 129
            /// Do not process partition if it is not in enabled_partitions list
            if (it == existing_partitions_names.end())
            {
                missing_partitions.emplace_back(partition_name);
                continue;
            }

            filtered_partitions_names.emplace_back(*it);
        }
130

N
Nikita Mikhaylov 已提交
131 132 133 134 135 136 137 138 139 140 141 142 143
        for (const String & partition_name : existing_partitions_names)
        {
            if (!task_table.enabled_partitions_set.count(partition_name))
            {
                LOG_DEBUG(log, "Partition " << partition_name << " will not be processed, since it is not in "
                                            << "enabled_partitions of " << task_table.table_id);
            }
        }
    }
    else
    {
        for (const String & partition_name : existing_partitions_names)
            filtered_partitions_names.emplace_back(partition_name);
144 145
    }

N
Nikita Mikhaylov 已提交
146 147
    for (const String & partition_name : filtered_partitions_names)
    {
N
Nikita Mikhaylov 已提交
148 149
        const size_t number_of_splits = task_table.number_of_splits;
        task_shard->partition_tasks.emplace(partition_name, ShardPartition(*task_shard, partition_name, number_of_splits));
N
Nikita Mikhaylov 已提交
150
        task_shard->checked_partitions.emplace(partition_name, true);
N
Nikita Mikhaylov 已提交
151 152 153 154 155 156 157 158 159

        auto shard_partition_it = task_shard->partition_tasks.find(partition_name);
        PartitionPieces & shard_partition_pieces = shard_partition_it->second.pieces;

        for (size_t piece_number = 0; piece_number < number_of_splits; ++piece_number)
        {
            bool res = checkPresentPartitionPiecesOnCurrentShard(timeouts, *task_shard, partition_name, piece_number);
            shard_partition_pieces.emplace_back(shard_partition_it->second, piece_number, res);
        }
N
Nikita Mikhaylov 已提交
160
    }
161

N
Nikita Mikhaylov 已提交
162 163 164 165 166
    if (!missing_partitions.empty())
    {
        std::stringstream ss;
        for (const String & missing_partition : missing_partitions)
            ss << " " << missing_partition;
167

N
Nikita Mikhaylov 已提交
168 169 170
        LOG_WARNING(log, "There are no " << missing_partitions.size() << " partitions from enabled_partitions in shard "
                         << task_shard->getDescription() << " :" << ss.str());
    }
171

N
Nikita Mikhaylov 已提交
172 173
    LOG_DEBUG(log, "Will copy " << task_shard->partition_tasks.size() << " partitions from shard " << task_shard->getDescription());
}
174

N
Nikita Mikhaylov 已提交
175
void ClusterCopier::discoverTablePartitions(const ConnectionTimeouts & timeouts, TaskTable & task_table, UInt64 num_threads)
176
{
N
Nikita Mikhaylov 已提交
177 178 179
    /// Fetch partitions list from a shard
    {
        ThreadPool thread_pool(num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores());
180

N
Nikita Mikhaylov 已提交
181 182
        for (const TaskShardPtr & task_shard : task_table.all_shards)
            thread_pool.scheduleOrThrowOnError([this, timeouts, task_shard]() { discoverShardPartitions(timeouts, task_shard); });
183

N
Nikita Mikhaylov 已提交
184 185 186 187
        LOG_DEBUG(log, "Waiting for " << thread_pool.active() << " setup jobs");
        thread_pool.wait();
    }
}
188

N
Nikita Mikhaylov 已提交
189
void ClusterCopier::uploadTaskDescription(const std::string & task_path, const std::string & task_file, const bool force)
190
{
N
Nikita Mikhaylov 已提交
191
    auto local_task_description_path = task_path + "/description";
192

N
Nikita Mikhaylov 已提交
193
    String task_config_str;
194
    {
N
Nikita Mikhaylov 已提交
195 196
        ReadBufferFromFile in(task_file);
        readStringUntilEOF(task_config_str, in);
197
    }
N
Nikita Mikhaylov 已提交
198 199
    if (task_config_str.empty())
        return;
200

N
Nikita Mikhaylov 已提交
201
    auto zookeeper = context.getZooKeeper();
202

N
Nikita Mikhaylov 已提交
203 204 205 206
    zookeeper->createAncestors(local_task_description_path);
    auto code = zookeeper->tryCreate(local_task_description_path, task_config_str, zkutil::CreateMode::Persistent);
    if (code && force)
        zookeeper->createOrUpdate(local_task_description_path, task_config_str, zkutil::CreateMode::Persistent);
207

N
Nikita Mikhaylov 已提交
208 209
    LOG_DEBUG(log, "Task description " << ((code && !force) ? "not " : "") << "uploaded to " << local_task_description_path << " with result " << code << " ("<< zookeeper->error2string(code) << ")");
}
210

N
Nikita Mikhaylov 已提交
211 212 213 214
void ClusterCopier::reloadTaskDescription()
{
    auto zookeeper = context.getZooKeeper();
    task_description_watch_zookeeper = zookeeper;
215

N
Nikita Mikhaylov 已提交
216
    String task_config_str;
N
Nikita Mikhaylov 已提交
217
    Coordination::Stat stat{};
N
Nikita Mikhaylov 已提交
218
    int code;
219

N
Nikita Mikhaylov 已提交
220 221 222
    zookeeper->tryGetWatch(task_description_path, task_config_str, &stat, task_description_watch_callback, &code);
    if (code)
        throw Exception("Can't get description node " + task_description_path, ErrorCodes::BAD_ARGUMENTS);
223

N
Nikita Mikhaylov 已提交
224
    LOG_DEBUG(log, "Loading description, zxid=" << task_description_current_stat.czxid);
N
Nikita Mikhaylov 已提交
225
    auto config = getConfigurationFromXMLString(task_config_str);
226

N
Nikita Mikhaylov 已提交
227 228
    /// Setup settings
    task_cluster->reloadSettings(*config);
229
    context.setSettings(task_cluster->settings_common);
230

N
Nikita Mikhaylov 已提交
231
    task_cluster_current_config = config;
N
Nikita Mikhaylov 已提交
232
    task_description_current_stat = stat;
N
Nikita Mikhaylov 已提交
233
}
234

N
Nikita Mikhaylov 已提交
235
void ClusterCopier::updateConfigIfNeeded()
236
{
N
Nikita Mikhaylov 已提交
237 238 239
    UInt64 version_to_update = task_description_version;
    bool is_outdated_version = task_description_current_version != version_to_update;
    bool is_expired_session  = !task_description_watch_zookeeper || task_description_watch_zookeeper->expired();
240

N
Nikita Mikhaylov 已提交
241 242
    if (!is_outdated_version && !is_expired_session)
        return;
243

N
Nikita Mikhaylov 已提交
244 245
    LOG_DEBUG(log, "Updating task description");
    reloadTaskDescription();
246

N
Nikita Mikhaylov 已提交
247
    task_description_current_version = version_to_update;
N
Nikita Mikhaylov 已提交
248
}
249

N
Nikita Mikhaylov 已提交
250 251 252 253 254
void ClusterCopier::process(const ConnectionTimeouts & timeouts)
{
    for (TaskTable & task_table : task_cluster->table_tasks)
    {
        LOG_INFO(log, "Process table task " << task_table.table_id << " with "
N
Nikita Mikhaylov 已提交
255 256
                                            << task_table.all_shards.size() << " shards, "
                                            << task_table.local_shards.size() << " of them are local ones");
257

N
Nikita Mikhaylov 已提交
258 259
        if (task_table.all_shards.empty())
            continue;
260

N
Nikita Mikhaylov 已提交
261 262 263 264 265
        /// Discover partitions of each shard and total set of partitions
        if (!task_table.has_enabled_partitions)
        {
            /// If there are no specified enabled_partitions, we must discover them manually
            discoverTablePartitions(timeouts, task_table);
266

N
Nikita Mikhaylov 已提交
267 268 269 270 271 272 273 274 275
            /// After partitions of each shard are initialized, initialize cluster partitions
            for (const TaskShardPtr & task_shard : task_table.all_shards)
            {
                for (const auto & partition_elem : task_shard->partition_tasks)
                {
                    const String & partition_name = partition_elem.first;
                    task_table.cluster_partitions.emplace(partition_name, ClusterPartition{});
                }
            }
276

N
Nikita Mikhaylov 已提交
277 278 279
            for (auto & partition_elem : task_table.cluster_partitions)
            {
                const String & partition_name = partition_elem.first;
280

N
Nikita Mikhaylov 已提交
281 282
                for (const TaskShardPtr & task_shard : task_table.all_shards)
                    task_shard->checked_partitions.emplace(partition_name);
283

N
Nikita Mikhaylov 已提交
284 285 286 287 288 289 290
                task_table.ordered_partition_names.emplace_back(partition_name);
            }
        }
        else
        {
            /// If enabled_partitions are specified, assume that each shard has all partitions
            /// We will refine partition set of each shard in future
291

N
Nikita Mikhaylov 已提交
292 293 294 295 296 297
            for (const String & partition_name : task_table.enabled_partitions)
            {
                task_table.cluster_partitions.emplace(partition_name, ClusterPartition{});
                task_table.ordered_partition_names.emplace_back(partition_name);
            }
        }
298

N
Nikita Mikhaylov 已提交
299
        task_table.watch.restart();
300

N
Nikita Mikhaylov 已提交
301 302 303 304 305 306 307 308 309 310
        /// Retry table processing
        bool table_is_done = false;
        for (UInt64 num_table_tries = 0; num_table_tries < max_table_tries; ++num_table_tries)
        {
            if (tryProcessTable(timeouts, task_table))
            {
                table_is_done = true;
                break;
            }
        }
311

N
Nikita Mikhaylov 已提交
312 313 314
        /// Delete helping tables in both cases (whole table is done or not)
        dropHelpingTables(task_table);

N
Nikita Mikhaylov 已提交
315 316 317 318 319
        if (!table_is_done)
        {
            throw Exception("Too many tries to process table " + task_table.table_id + ". Abort remaining execution",
                            ErrorCodes::UNFINISHED);
        }
320
    }
N
Nikita Mikhaylov 已提交
321
}
322

N
Nikita Mikhaylov 已提交
323
/// Protected section
324

N
Nikita Mikhaylov 已提交
325 326 327 328 329 330 331

/*
 * Creates task worker node and checks maximum number of workers not to exceed the limit.
 * To achieve this we have to check version of workers_version_path node and create current_worker_path
 * node atomically.
 * */

N
Nikita Mikhaylov 已提交
332 333 334 335
/// Registers the current worker in ZooKeeper, waiting while the number of registered
/// workers is not below task_cluster->max_workers.
/// Returns a holder for the ephemeral node of this worker.
/// `description` is written both to the workers version node and to the worker node.
/// `unprioritized` workers sleep once before the first attempt and increase their sleep time
/// (up to 30 seconds) every time they find no free slot.
/// Throws Coordination::Exception on any ZooKeeper error other than a version conflict.
zkutil::EphemeralNodeHolder::Ptr ClusterCopier::createTaskWorkerNodeAndWaitIfNeed(
    const zkutil::ZooKeeperPtr & zookeeper,
    const String & description,
    bool unprioritized)
{
    static constexpr std::chrono::milliseconds max_sleep_time(30000); // 30 sec
    std::chrono::milliseconds current_sleep_time = default_sleep_time;

    if (unprioritized)
        std::this_thread::sleep_for(current_sleep_time);

    const String workers_version_path = getWorkersPathVersion();
    const String workers_path         = getWorkersPath();
    const String current_worker_path  = getCurrentWorkerNodePath();

    UInt64 num_bad_version_errors = 0;

    for (;;)
    {
        updateConfigIfNeeded();

        /// Read the version first and the number of workers second: the multi-request below
        /// succeeds only if the version node was not changed in between.
        Coordination::Stat stat;
        zookeeper->get(workers_version_path, &stat);
        const auto version = stat.version;
        zookeeper->get(workers_path, &stat);

        const bool no_free_slot = static_cast<UInt64>(stat.numChildren) >= task_cluster->max_workers;
        if (no_free_slot)
        {
            LOG_DEBUG(log, "Too many workers (" << stat.numChildren << ", maximum " << task_cluster->max_workers << ")"
                << ". Postpone processing " << description);

            if (unprioritized)
                current_sleep_time = std::min(max_sleep_time, current_sleep_time + default_sleep_time);

            std::this_thread::sleep_for(current_sleep_time);
            num_bad_version_errors = 0;
            continue;
        }

        /// Bump the version node and create our worker node in one atomic multi-request.
        Coordination::Requests ops;
        ops.emplace_back(zkutil::makeSetRequest(workers_version_path, description, version));
        ops.emplace_back(zkutil::makeCreateRequest(current_worker_path, description, zkutil::CreateMode::Ephemeral));
        Coordination::Responses responses;
        const auto code = zookeeper->tryMulti(ops, responses);

        if (code == Coordination::ZOK || code == Coordination::ZNODEEXISTS)
            return std::make_shared<zkutil::EphemeralNodeHolder>(current_worker_path, *zookeeper, false, false, description);

        if (code != Coordination::ZBADVERSION)
            throw Coordination::Exception(code);

        /// Try to make fast retries: only after more than three version conflicts
        /// in a row do we back off for a random time.
        ++num_bad_version_errors;
        if (num_bad_version_errors > 3)
        {
            LOG_DEBUG(log, "A concurrent worker has just been added, will check free worker slots again");
            std::chrono::milliseconds random_sleep_time(std::uniform_int_distribution<int>(1, 1000)(task_cluster->random_engine));
            std::this_thread::sleep_for(random_sleep_time);
            num_bad_version_errors = 0;
        }
    }
}

N
Nikita Mikhaylov 已提交
399

N
Nikita Mikhaylov 已提交
400 401 402
/// Tells whether a partition piece can be considered clean.
/// The piece is clean when `clean_state_clock` reports a clean state and, if the task status
/// node exists, the clock's discovery_zxid is not greater than the mzxid of that node.
bool ClusterCopier::checkPartitionPieceIsClean(
        const zkutil::ZooKeeperPtr & zookeeper,
        const CleanStateClock & clean_state_clock,
        const String & task_status_path)
{
    /// Stays default-constructed when the status node does not exist.
    LogicalClock task_start_clock;

    Coordination::Stat status_stat{};
    if (zookeeper->exists(task_status_path, &status_stat))
        task_start_clock = LogicalClock(status_stat.mzxid);

    if (!clean_state_clock.is_clean())
        return false;

    if (!task_start_clock.hasHappened())
        return true;

    return clean_state_clock.discovery_zxid <= task_start_clock;
}

N
Nikita Mikhaylov 已提交
414 415

bool ClusterCopier::checkAllPiecesInPartitionAreDone(const TaskTable & task_table, const String & partition_name, const TasksShard & shards_with_partition)
N
Nikita Mikhaylov 已提交
416 417
{
    bool answer = true;
N
Nikita Mikhaylov 已提交
418 419 420 421 422 423 424 425
    for (size_t piece_number = 0; piece_number < task_table.number_of_splits; ++piece_number)
    {
        bool piece_is_done = checkPartitionPieceIsDone(task_table, partition_name, piece_number, shards_with_partition);
        if (!piece_is_done)
            LOG_DEBUG(log, "Partition " << partition_name << " piece " + toString(piece_number) + " is not already done.");
        answer &= piece_is_done;
    }

N
Nikita Mikhaylov 已提交
426 427 428 429 430 431 432 433 434 435 436 437 438
    return answer;
}


/* The same as the function above, but for a single piece of the partition.
 * We do not know in advance which shards hold this particular piece,
 * so we check all of them (that is, every shard that contains the partition).
 * Shards that do not have the piece MUST still mark it as done (is_done == true).
 * */
bool ClusterCopier::checkPartitionPieceIsDone(const TaskTable & task_table, const String & partition_name,
                               size_t piece_number, const TasksShard & shards_with_partition)
{
    LOG_DEBUG(log, "Check that all shards processed partition " << partition_name
N
Nikita Mikhaylov 已提交
439
                   << " piece " + toString(piece_number) + " successfully");
440

N
Nikita Mikhaylov 已提交
441
    auto zookeeper = context.getZooKeeper();
442

N
Nikita Mikhaylov 已提交
443 444
    /// Collect all shards that contain partition piece number piece_number.
    Strings piece_status_paths;
N
Nikita Mikhaylov 已提交
445 446 447
    for (auto & shard : shards_with_partition)
    {
        ShardPartition & task_shard_partition = shard->partition_tasks.find(partition_name)->second;
N
Nikita Mikhaylov 已提交
448 449
        ShardPartitionPiece & shard_partition_piece = task_shard_partition.pieces[piece_number];
        piece_status_paths.emplace_back(shard_partition_piece.getShardStatusPath());
N
Nikita Mikhaylov 已提交
450
    }
451

N
Nikita Mikhaylov 已提交
452
    std::vector<int64_t> zxid1, zxid2;
453

N
Nikita Mikhaylov 已提交
454 455 456
    try
    {
        std::vector<zkutil::ZooKeeper::FutureGet> get_futures;
N
Nikita Mikhaylov 已提交
457
        for (const String & path : piece_status_paths)
N
Nikita Mikhaylov 已提交
458
            get_futures.emplace_back(zookeeper->asyncGet(path));
459

N
Nikita Mikhaylov 已提交
460 461 462 463
        // Check that state is Finished and remember zxid
        for (auto & future : get_futures)
        {
            auto res = future.get();
464

N
Nikita Mikhaylov 已提交
465 466 467
            TaskStateWithOwner status = TaskStateWithOwner::fromString(res.data);
            if (status.state != TaskState::Finished)
            {
N
Nikita Mikhaylov 已提交
468 469
                LOG_INFO(log, "The task " << res.data << " is being rewritten by "
                              << status.owner << ". Partition piece will be rechecked");
N
Nikita Mikhaylov 已提交
470 471
                return false;
            }
472

N
Nikita Mikhaylov 已提交
473 474
            zxid1.push_back(res.stat.pzxid);
        }
475

N
Nikita Mikhaylov 已提交
476 477 478 479 480 481 482 483
        const String piece_is_dirty_flag_path = task_table.getCertainPartitionPieceIsDirtyPath(partition_name, piece_number);
        const String piece_is_dirty_cleaned_path = task_table.getCertainPartitionPieceIsCleanedPath(partition_name, piece_number);
        const String piece_task_status_path = task_table.getCertainPartitionPieceTaskStatusPath(partition_name, piece_number);

        CleanStateClock clean_state_clock (zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path);

        const bool is_clean = checkPartitionPieceIsClean(zookeeper, clean_state_clock, piece_task_status_path);

N
Nikita Mikhaylov 已提交
484

N
Nikita Mikhaylov 已提交
485
        if (!is_clean)
N
Nikita Mikhaylov 已提交
486
        {
N
Nikita Mikhaylov 已提交
487 488
            LOG_INFO(log, "Partition " << partition_name << " become dirty");
            return false;
N
Nikita Mikhaylov 已提交
489
        }
490

N
Nikita Mikhaylov 已提交
491
        get_futures.clear();
N
Nikita Mikhaylov 已提交
492
        for (const String & path : piece_status_paths)
N
Nikita Mikhaylov 已提交
493
            get_futures.emplace_back(zookeeper->asyncGet(path));
494

N
Nikita Mikhaylov 已提交
495 496 497 498 499 500 501 502 503
        // Remember zxid of states again
        for (auto & future : get_futures)
        {
            auto res = future.get();
            zxid2.push_back(res.stat.pzxid);
        }
    }
    catch (const Coordination::Exception & e)
    {
N
Nikita Mikhaylov 已提交
504
        LOG_INFO(log, "A ZooKeeper error occurred while checking partition " << partition_name << " piece number "
N
Nikita Mikhaylov 已提交
505
                       << toString(piece_number) << ". Will recheck the partition. Error: " << e.displayText());
N
Nikita Mikhaylov 已提交
506 507
        return false;
    }
508

N
Nikita Mikhaylov 已提交
509
    // If all task is finished and zxid is not changed then partition could not become dirty again
N
Nikita Mikhaylov 已提交
510
    for (UInt64 shard_num = 0; shard_num < piece_status_paths.size(); ++shard_num)
N
Nikita Mikhaylov 已提交
511 512 513
    {
        if (zxid1[shard_num] != zxid2[shard_num])
        {
N
Nikita Mikhaylov 已提交
514
            LOG_INFO(log, "The task " << piece_status_paths[shard_num] << " is being modified now. Partition piece will be rechecked");
N
Nikita Mikhaylov 已提交
515 516 517
            return false;
        }
    }
518

N
Nikita Mikhaylov 已提交
519
    LOG_INFO(log, "Partition " << partition_name << " piece number " << toString(piece_number) << " is copied successfully");
N
Nikita Mikhaylov 已提交
520
    return true;
521 522
}

N
Nikita Mikhaylov 已提交
523

N
Nikita Mikhaylov 已提交
524
/// Attaches every piece of partition `partition_name` from the helping tables
/// (`<push table>_piece_<N>`) to the destination (push) table.
///
/// Only one worker moves a partition at a time: an ephemeral "attach is active" node is created
/// first, and if it already exists the function returns TaskStatus::Active without doing anything.
/// The persistent "attach is done" node records progress: if it already says Finished, the function
/// returns TaskStatus::Finished immediately; otherwise it is removed and re-created in state Started,
/// and set to Finished once all pieces are attached.
///
/// For tables that are not replicated, OPTIMIZE ... DEDUPLICATE is run on the partition after each piece.
///
/// Returns TaskStatus::Finished on success. ZooKeeper and query errors propagate to the caller;
/// an UNFINISHED exception is thrown after the first attached piece when fault injection fires.
TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & task_table, const String & partition_name)
{
    /// Decide up front whether this call should simulate a failure (used for testing).
    bool inject_fault = false;
    if (move_fault_probability > 0)
    {
        double value = std::uniform_real_distribution<>(0, 1)(task_table.task_cluster.random_engine);
        inject_fault = value < move_fault_probability;
    }

    LOG_DEBUG(log, "Try to move  " << partition_name << " to destionation table");

    auto zookeeper = context.getZooKeeper();

    const auto current_partition_attach_is_active = task_table.getPartitionAttachIsActivePath(partition_name);
    const auto current_partition_attach_is_done   = task_table.getPartitionAttachIsDonePath(partition_name);

    /// Create ephemeral node to mark that we are active and process the partition
    zookeeper->createAncestors(current_partition_attach_is_active);
    zkutil::EphemeralNodeHolderPtr partition_attach_node_holder;
    try
    {
        partition_attach_node_holder = zkutil::EphemeralNodeHolder::create(current_partition_attach_is_active, *zookeeper, host_id);
    }
    catch (const Coordination::Exception & e)
    {
        if (e.code == Coordination::ZNODEEXISTS)
        {
            LOG_DEBUG(log, "Someone is already moving pieces " << current_partition_attach_is_active);
            return TaskStatus::Active;
        }

        throw;
    }

    /// Exit if task has been already processed;
    /// otherwise drop the stale marker so that the move can be started again.
    {
        String status_data;
        if (zookeeper->tryGet(current_partition_attach_is_done, status_data))
        {
            TaskStateWithOwner status = TaskStateWithOwner::fromString(status_data);
            if (status.state == TaskState::Finished)
            {
                LOG_DEBUG(log, "All pieces for partition from this task " << current_partition_attach_is_active
                                       << " has been successfully moved to destination table by " << status.owner);
                return TaskStatus::Finished;
            }

            /// The marker exists but is not Finished, and we hold the "active" ephemeral node,
            /// so a previous attempt (possibly in another copier process) was abandoned.
            LOG_DEBUG(log, "Moving piece for partition " << current_partition_attach_is_active
                                   << " has not been successfully finished by " << status.owner
                                   << ". Will try to move by myself.");

            /// Remove is_done marker.
            zookeeper->remove(current_partition_attach_is_done);
        }
    }

    /// Try start processing, create node about it
    {
        String start_state = TaskStateWithOwner::getData(TaskState::Started, host_id);
        zookeeper->create(current_partition_attach_is_done, start_state, zkutil::CreateMode::Persistent);
    }

    /// The same settings are used for every piece, so prepare them once.
    /// It is important, ALTER ATTACH PARTITION must be done synchronously
    /// And we will execute this ALTER query on each replica of a shard.
    /// It is correct, because this query is idempotent.
    Settings settings_push = task_cluster->settings_push;
    settings_push.replication_alter_partitions_sync = 2;

    /// Move partition to original destination table.
    for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
    {
        LOG_DEBUG(log, "Trying to move partition " << partition_name
                                                   << " piece " << toString(current_piece_number)
                                                   << " to original table");

        DatabaseAndTableName original_table = task_table.table_push;
        DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first,
                                                                  original_table.second + "_piece_" +
                                                                  toString(current_piece_number));

        const String query_alter_ast_string = " ALTER TABLE " + getQuotedTable(original_table) +
                                              " ATTACH PARTITION " + partition_name +
                                              " FROM " + getQuotedTable(helping_table);

        LOG_DEBUG(log, "Executing ALTER query: " << query_alter_ast_string);

        try
        {
            size_t num_nodes = executeQueryOnCluster(
                    task_table.cluster_push,
                    query_alter_ast_string,
                    nullptr,
                    &settings_push,
                    PoolMode::GET_MANY,
                    ClusterExecutionMode::ON_EACH_NODE);

            LOG_INFO(log, "Number of nodes that executed ALTER query successfully : " << toString(num_nodes));
        }
        catch (...)
        {
            LOG_DEBUG(log, "Error while moving partition " << partition_name
                                                           << " piece " << toString(current_piece_number)
                                                           << " to original table");
            throw;
        }

        /// Fires after the first piece has been attached, leaving the move half-done on purpose.
        if (inject_fault)
            throw Exception("Copy fault injection is activated", ErrorCodes::UNFINISHED);

        try
        {
            if (!task_table.isReplicatedTable())
            {
                const String query_deduplicate_ast_string = " OPTIMIZE TABLE " + getQuotedTable(original_table) +
                                                            " PARTITION " + partition_name + " DEDUPLICATE;";

                LOG_DEBUG(log, "Executing OPTIMIZE DEDUPLICATE query: " << query_deduplicate_ast_string);

                UInt64 num_nodes = executeQueryOnCluster(
                        task_table.cluster_push,
                        query_deduplicate_ast_string,
                        nullptr,
                        &task_cluster->settings_push,
                        PoolMode::GET_MANY);

                LOG_INFO(log, "Number of shard that executed OPTIMIZE DEDUPLICATE query successfully : "
                        << toString(num_nodes));
            }
        }
        catch (...)
        {
            LOG_DEBUG(log, "Error while executing OPTIMIZE DEDUPLICATE partition " << partition_name
                                                                                   << " in the original table");
            throw;
        }
    }

    /// Signal that we finished moving. The node was created by this call and has not been
    /// written since, hence the expected version 0.
    {
        String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
        zookeeper->set(current_partition_attach_is_done, state_finished, 0);
    }

    return TaskStatus::Finished;
}

N
Nikita Mikhaylov 已提交
680
/// Removes MATERIALIZED and ALIAS columns from create table query
N
Nikita Mikhaylov 已提交
681
ASTPtr ClusterCopier::removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast)
682
{
N
Nikita Mikhaylov 已提交
683 684
    const ASTs & column_asts = query_ast->as<ASTCreateQuery &>().columns_list->columns->children;
    auto new_columns = std::make_shared<ASTExpressionList>();
685

N
Nikita Mikhaylov 已提交
686 687 688
    for (const ASTPtr & column_ast : column_asts)
    {
        const auto & column = column_ast->as<ASTColumnDeclaration &>();
689

N
Nikita Mikhaylov 已提交
690 691 692 693 694 695
        if (!column.default_specifier.empty())
        {
            ColumnDefaultKind kind = columnDefaultKindFromString(column.default_specifier);
            if (kind == ColumnDefaultKind::Materialized || kind == ColumnDefaultKind::Alias)
                continue;
        }
696

N
Nikita Mikhaylov 已提交
697 698
        new_columns->children.emplace_back(column_ast->clone());
    }
699

N
Nikita Mikhaylov 已提交
700 701
    ASTPtr new_query_ast = query_ast->clone();
    auto & new_query = new_query_ast->as<ASTCreateQuery &>();
702

N
Nikita Mikhaylov 已提交
703 704 705 706
    auto new_columns_list = std::make_shared<ASTColumns>();
    new_columns_list->set(new_columns_list->columns, new_columns);
    if (auto indices = query_ast->as<ASTCreateQuery>()->columns_list->indices)
        new_columns_list->set(new_columns_list->indices, indices->clone());
707

N
Nikita Mikhaylov 已提交
708 709 710
    new_query.replace(new_query.columns_list, new_columns_list);

    return new_query_ast;
711 712
}

A
Alexey Milovidov 已提交
713
/// Replaces ENGINE and table name in a create query
714 715 716
std::shared_ptr<ASTCreateQuery> rewriteCreateQueryStorage(const ASTPtr & create_query_ast,
                                                          const DatabaseAndTableName & new_table,
                                                          const ASTPtr & new_storage_ast)
717
{
N
Nikita Mikhaylov 已提交
718 719
    const auto & create = create_query_ast->as<ASTCreateQuery &>();
    auto res = std::make_shared<ASTCreateQuery>(create);
720

N
Nikita Mikhaylov 已提交
721 722
    if (create.storage == nullptr || new_storage_ast == nullptr)
        throw Exception("Storage is not specified", ErrorCodes::LOGICAL_ERROR);
723

N
Nikita Mikhaylov 已提交
724 725
    res->database = new_table.first;
    res->table = new_table.second;
726

N
Nikita Mikhaylov 已提交
727 728 729
    res->children.clear();
    res->set(res->columns_list, create.columns_list->clone());
    res->set(res->storage, new_storage_ast->clone());
730

N
Nikita Mikhaylov 已提交
731
    return res;
732 733
}

734

N
Nikita Mikhaylov 已提交
735 736 737 738 739
/// Tries to drop one piece of a partition on the destination cluster and to reset
/// its bookkeeping nodes in ZooKeeper.
///  - task_partition: partition whose piece is dropped;
///  - current_piece_number: index into task_partition.pieces;
///  - zookeeper: session used for all coordination calls;
///  - clean_state_clock: clock whose discovery_version is used to lock the dirty flag.
/// Returns true when the piece was dropped and is safe for work again, false when somebody
/// else is cleaning / filling the piece, when it still has active workers, or when the clean
/// state changed while the partition was being dropped.
/// Throws NOT_IMPLEMENTED in safe mode; ZooKeeper errors other than ZNODEEXISTS on the two
/// ephemeral nodes are rethrown.
bool ClusterCopier::tryDropPartitionPiece(
        ShardPartition & task_partition,
        const size_t current_piece_number,
        const zkutil::ZooKeeperPtr & zookeeper,
        const CleanStateClock & clean_state_clock)
{
    if (is_safe_mode)
        throw Exception("DROP PARTITION is prohibited in safe mode", ErrorCodes::NOT_IMPLEMENTED);

    TaskTable & task_table = task_partition.task_shard.task_table;
    ShardPartitionPiece & piece = task_partition.pieces[current_piece_number];

    const String shards_path         = piece.getPartitionPieceShardsPath();
    const String active_workers_path = piece.getPartitionPieceActiveWorkersPath();
    const String dirty_flag_path     = piece.getPartitionPieceIsDirtyPath();
    const String cleaner_path        = piece.getPartitionPieceCleanerPath();
    const String cleaned_flag_path   = piece.getPartitionPieceIsCleanedPath();

    /// Become the cleaner of this piece; the holder is kept until the function returns.
    zkutil::EphemeralNodeHolder::Ptr cleaner_holder;
    try
    {
        cleaner_holder = zkutil::EphemeralNodeHolder::create(cleaner_path, *zookeeper, host_id);
    }
    catch (const Coordination::Exception & e)
    {
        if (e.code != Coordination::ZNODEEXISTS)
            throw;

        LOG_DEBUG(log, "Partition " << task_partition.name << " piece "
                        << toString(current_piece_number) << " is cleaning now by somebody, sleep");
        std::this_thread::sleep_for(default_sleep_time);
        return false;
    }

    /// The active workers node must have no children before it can be replaced by our lock.
    Coordination::Stat workers_stat{};
    if (zookeeper->exists(active_workers_path, &workers_stat))
    {
        if (workers_stat.numChildren != 0)
        {
            LOG_DEBUG(log, "Partition " << task_partition.name << " contains " << workers_stat.numChildren
                            << " active workers while trying to drop it. Going to sleep.");
            std::this_thread::sleep_for(default_sleep_time);
            return false;
        }

        zookeeper->remove(active_workers_path);
    }

    {
        /// Occupy the active workers path ourselves for the duration of this scope.
        zkutil::EphemeralNodeHolder::Ptr workers_lock_holder;
        try
        {
            workers_lock_holder = zkutil::EphemeralNodeHolder::create(active_workers_path, *zookeeper, host_id);
        }
        catch (const Coordination::Exception & e)
        {
            if (e.code != Coordination::ZNODEEXISTS)
                throw;

            LOG_DEBUG(log, "Partition " << task_partition.name << " is being filled now by somebody, sleep");
            return false;
        }

        // Lock the dirty flag
        zookeeper->set(dirty_flag_path, host_id, clean_state_clock.discovery_version.value());
        zookeeper->tryRemove(piece.getPartitionPieceCleanStartPath());
        /// Fresh clock taken after the flag was locked; checked for staleness once the drop is done.
        CleanStateClock own_clock(zookeeper, dirty_flag_path, cleaned_flag_path);

        /// Remove all status nodes
        {
            Strings status_nodes;
            const bool listed = zookeeper->tryGetChildren(shards_path, status_nodes) == Coordination::ZOK;
            if (listed)
            {
                for (const auto & status_node : status_nodes)
                    zookeeper->removeRecursive(shards_path + "/" + status_node);
            }
        }

        /// The piece lives in a helping table named "<destination table>_piece_<N>".
        const DatabaseAndTableName & destination_table = task_table.table_push;
        const DatabaseAndTableName piece_table(
                destination_table.first,
                destination_table.second + "_piece_" + toString(current_piece_number));

        const String query = "ALTER TABLE " + getQuotedTable(piece_table) + " DROP PARTITION " + task_partition.name;

        /// TODO: use this statement after servers will be updated up to 1.1.54310
        // query += " DROP PARTITION ID '" + task_partition.name + "'";

        Settings settings_push = task_cluster->settings_push;

        /// It is important, DROP PARTITION must be done synchronously
        settings_push.replication_alter_partitions_sync = 2;

        LOG_DEBUG(log, "Execute distributed DROP PARTITION: " << query);
        /// We have to drop partition_piece on each replica
        size_t num_nodes = executeQueryOnCluster(
                task_table.cluster_push, query,
                nullptr,
                &settings_push,
                PoolMode::GET_MANY,
                ClusterExecutionMode::ON_EACH_NODE);

        LOG_INFO(log, "DROP PARTITION was successfully executed on " << num_nodes << " nodes of a cluster.");

        /// Update the locking node, unless the clean state moved under us
        if (own_clock.is_stale())
        {
            LOG_DEBUG(log, "Clean state is altered when dropping the partition, cowardly bailing");
            /// clean state is stale
            return false;
        }

        zookeeper->set(dirty_flag_path, host_id, own_clock.discovery_version.value());
        if (own_clock.clean_state_version)
            zookeeper->set(cleaned_flag_path, host_id, own_clock.clean_state_version.value());
        else
            zookeeper->create(cleaned_flag_path, host_id, zkutil::CreateMode::Persistent);

        LOG_INFO(log, "Partition " << task_partition.name <<  " piece " << toString(current_piece_number)
                       << " was dropped on cluster " << task_table.cluster_push_name);

        /// Re-create the shards node, or overwrite its data when it already exists.
        const auto create_code = zookeeper->tryCreate(shards_path, host_id, zkutil::CreateMode::Persistent);
        if (create_code == Coordination::ZNODEEXISTS)
            zookeeper->set(shards_path, host_id);
    }

    LOG_INFO(log, "Partition " << task_partition.name <<  " piece " << toString(current_piece_number) << " is safe for work now.");
    return true;
}
871

N
Nikita Mikhaylov 已提交
872
bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table)
873
{
N
Nikita Mikhaylov 已提交
874 875
    /// An heuristic: if previous shard is already done, then check next one without sleeps due to max_workers constraint
    bool previous_shard_is_instantly_finished = false;
876

N
Nikita Mikhaylov 已提交
877 878 879 880 881
    /// Process each partition that is present in cluster
    for (const String & partition_name : task_table.ordered_partition_names)
    {
        if (!task_table.cluster_partitions.count(partition_name))
            throw Exception("There are no expected partition " + partition_name + ". It is a bug", ErrorCodes::LOGICAL_ERROR);
882

N
Nikita Mikhaylov 已提交
883
        ClusterPartition & cluster_partition = task_table.cluster_partitions[partition_name];
884

N
Nikita Mikhaylov 已提交
885
        Stopwatch watch;
N
Nikita Mikhaylov 已提交
886
        /// We will check all the shards of the table and check if they contain current partition.
N
Nikita Mikhaylov 已提交
887 888
        TasksShard expected_shards;
        UInt64 num_failed_shards = 0;
889

N
Nikita Mikhaylov 已提交
890
        ++cluster_partition.total_tries;
891

N
Nikita Mikhaylov 已提交
892
        LOG_DEBUG(log, "Processing partition " << partition_name << " for the whole cluster");
893

N
Nikita Mikhaylov 已提交
894 895 896 897
        /// Process each source shard having current partition and copy current partition
        /// NOTE: shards are sorted by "distance" to current host
        bool has_shard_to_process = false;
        for (const TaskShardPtr & shard : task_table.all_shards)
898
        {
N
Nikita Mikhaylov 已提交
899 900
            /// Does shard have a node with current partition?
            if (shard->partition_tasks.count(partition_name) == 0)
901
            {
N
Nikita Mikhaylov 已提交
902 903
                /// If not, did we check existence of that partition previously?
                if (shard->checked_partitions.count(partition_name) == 0)
904
                {
N
Nikita Mikhaylov 已提交
905 906
                    auto check_shard_has_partition = [&] () { return checkShardHasPartition(timeouts, *shard, partition_name); };
                    bool has_partition = retry(check_shard_has_partition);
907

N
Nikita Mikhaylov 已提交
908
                    shard->checked_partitions.emplace(partition_name);
909

N
Nikita Mikhaylov 已提交
910 911
                    if (has_partition)
                    {
N
Nikita Mikhaylov 已提交
912 913
                        const size_t number_of_splits = task_table.number_of_splits;
                        shard->partition_tasks.emplace(partition_name, ShardPartition(*shard, partition_name, number_of_splits));
N
Nikita Mikhaylov 已提交
914
                        LOG_DEBUG(log, "Discovered partition " << partition_name << " in shard " << shard->getDescription());
N
Nikita Mikhaylov 已提交
915 916 917 918 919 920 921 922 923
                        /// To save references in the future.
                        auto shard_partition_it = shard->partition_tasks.find(partition_name);
                        PartitionPieces & shard_partition_pieces = shard_partition_it->second.pieces;

                        for (size_t piece_number = 0; piece_number < number_of_splits; ++piece_number)
                        {
                            auto res = checkPresentPartitionPiecesOnCurrentShard(timeouts, *shard, partition_name, piece_number);
                            shard_partition_pieces.emplace_back(shard_partition_it->second, piece_number, res);
                        }
N
Nikita Mikhaylov 已提交
924 925 926 927 928 929 930 931
                    }
                    else
                    {
                        LOG_DEBUG(log, "Found that shard " << shard->getDescription() << " does not contain current partition " << partition_name);
                        continue;
                    }
                }
                else
932
                {
N
Nikita Mikhaylov 已提交
933 934 935
                    /// We have already checked that partition, but did not discover it
                    previous_shard_is_instantly_finished = true;
                    continue;
936
                }
N
Nikita Mikhaylov 已提交
937
            }
938

N
Nikita Mikhaylov 已提交
939
            auto it_shard_partition = shard->partition_tasks.find(partition_name);
N
Nikita Mikhaylov 已提交
940 941
            /// Previously when we discovered that shard does not contain current partition, we skipped it.
            /// At this moment partition have to be present.
N
Nikita Mikhaylov 已提交
942
            if (it_shard_partition == shard->partition_tasks.end())
N
Nikita Mikhaylov 已提交
943
                throw Exception("There are no such partition in a shard. This is a bug.", ErrorCodes::LOGICAL_ERROR);
N
Nikita Mikhaylov 已提交
944
            auto & partition = it_shard_partition->second;
945

N
Nikita Mikhaylov 已提交
946
            expected_shards.emplace_back(shard);
947

N
Nikita Mikhaylov 已提交
948 949
            /// Do not sleep if there is a sequence of already processed shards to increase startup
            bool is_unprioritized_task = !previous_shard_is_instantly_finished && shard->priority.is_remote;
N
Nikita Mikhaylov 已提交
950
            TaskStatus task_status = TaskStatus::Error;
N
Nikita Mikhaylov 已提交
951 952 953
            bool was_error = false;
            has_shard_to_process = true;
            for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num)
954
            {
N
Nikita Mikhaylov 已提交
955
                task_status = tryProcessPartitionTask(timeouts, partition, is_unprioritized_task);
956

N
Nikita Mikhaylov 已提交
957
                /// Exit if success
N
Nikita Mikhaylov 已提交
958
                if (task_status == TaskStatus::Finished)
N
Nikita Mikhaylov 已提交
959
                    break;
960

N
Nikita Mikhaylov 已提交
961
                was_error = true;
962

N
Nikita Mikhaylov 已提交
963
                /// Skip if the task is being processed by someone
N
Nikita Mikhaylov 已提交
964
                if (task_status == TaskStatus::Active)
N
Nikita Mikhaylov 已提交
965
                    break;
966

N
Nikita Mikhaylov 已提交
967 968
                /// Repeat on errors
                std::this_thread::sleep_for(default_sleep_time);
969
            }
N
Nikita Mikhaylov 已提交
970

N
Nikita Mikhaylov 已提交
971
            if (task_status == TaskStatus::Error)
N
Nikita Mikhaylov 已提交
972 973 974
                ++num_failed_shards;

            previous_shard_is_instantly_finished = !was_error;
975 976
        }

N
Nikita Mikhaylov 已提交
977 978 979 980
        cluster_partition.elapsed_time_seconds += watch.elapsedSeconds();

        /// Check that whole cluster partition is done
        /// Firstly check the number of failed partition tasks, then look into ZooKeeper and ensure that each partition is done
N
Nikita Mikhaylov 已提交
981
        bool partition_copying_is_done = num_failed_shards == 0;
N
Nikita Mikhaylov 已提交
982
        try
983
        {
N
Nikita Mikhaylov 已提交
984
            partition_copying_is_done =
N
Nikita Mikhaylov 已提交
985
                    !has_shard_to_process
N
Nikita Mikhaylov 已提交
986
                    || (partition_copying_is_done && checkAllPiecesInPartitionAreDone(task_table, partition_name, expected_shards));
987
        }
N
Nikita Mikhaylov 已提交
988
        catch (...)
989
        {
N
Nikita Mikhaylov 已提交
990
            tryLogCurrentException(log);
N
Nikita Mikhaylov 已提交
991 992 993 994 995 996 997 998
            partition_copying_is_done = false;
        }


        bool partition_moving_is_done = false;
        /// Try to move only if all pieces were copied.
        if (partition_copying_is_done)
        {
N
Nikita Mikhaylov 已提交
999
            for (UInt64 try_num = 0; try_num < max_shard_partition_piece_tries_for_alter; ++try_num)
N
Nikita Mikhaylov 已提交
1000
            {
N
Nikita Mikhaylov 已提交
1001 1002 1003
                try
                {
                    auto res = tryMoveAllPiecesToDestinationTable(task_table, partition_name);
N
Nikita Mikhaylov 已提交
1004 1005
                    /// Exit and mark current task is done.
                    if (res == TaskStatus::Finished)
N
Nikita Mikhaylov 已提交
1006 1007 1008 1009 1010
                    {
                        partition_moving_is_done = true;
                        break;
                    }

N
Nikita Mikhaylov 已提交
1011 1012
                    /// Exit if this task is active.
                    if (res == TaskStatus::Active)
N
better  
Nikita Mikhaylov 已提交
1013
                        break;
N
Nikita Mikhaylov 已提交
1014

N
Nikita Mikhaylov 已提交
1015
                    /// Repeat on errors.
N
better  
Nikita Mikhaylov 已提交
1016
                    std::this_thread::sleep_for(default_sleep_time);
N
Nikita Mikhaylov 已提交
1017
                }
N
Nikita Mikhaylov 已提交
1018 1019 1020
                catch (...)
                {
                    tryLogCurrentException(log, "Some error occured while moving pieces to destination table for partition " + partition_name);
N
Nikita Mikhaylov 已提交
1021
                }
N
Nikita Mikhaylov 已提交
1022
            }
1023 1024
        }

N
Nikita Mikhaylov 已提交
1025
        if (partition_copying_is_done && partition_moving_is_done)
1026
        {
N
Nikita Mikhaylov 已提交
1027
            task_table.finished_cluster_partitions.emplace(partition_name);
1028

N
Nikita Mikhaylov 已提交
1029 1030 1031
            task_table.bytes_copied += cluster_partition.bytes_copied;
            task_table.rows_copied += cluster_partition.rows_copied;
            double elapsed = cluster_partition.elapsed_time_seconds;
1032

N
Nikita Mikhaylov 已提交
1033
            LOG_INFO(log, "It took " << std::fixed << std::setprecision(2) << elapsed << " seconds to copy partition " << partition_name
N
Nikita Mikhaylov 已提交
1034 1035 1036
                                     << ": " << formatReadableSizeWithDecimalSuffix(cluster_partition.bytes_copied) << " uncompressed bytes"
                                     << ", " << formatReadableQuantity(cluster_partition.rows_copied) << " rows"
                                     << " and " << cluster_partition.blocks_copied << " source blocks are copied");
1037

N
Nikita Mikhaylov 已提交
1038
            if (cluster_partition.rows_copied)
1039
            {
N
Nikita Mikhaylov 已提交
1040
                LOG_INFO(log, "Average partition speed: "
N
Nikita Mikhaylov 已提交
1041
                        << formatReadableSizeWithDecimalSuffix(cluster_partition.bytes_copied / elapsed) << " per second.");
1042
            }
1043

N
Nikita Mikhaylov 已提交
1044
            if (task_table.rows_copied)
1045
            {
N
Nikita Mikhaylov 已提交
1046
                LOG_INFO(log, "Average table " << task_table.table_id << " speed: "
N
Nikita Mikhaylov 已提交
1047
                                               << formatReadableSizeWithDecimalSuffix(task_table.bytes_copied / elapsed) << " per second.");
N
Nikita Mikhaylov 已提交
1048 1049 1050
            }
        }
    }
1051

N
Nikita Mikhaylov 已提交
1052 1053 1054
    UInt64 required_partitions = task_table.cluster_partitions.size();
    UInt64 finished_partitions = task_table.finished_cluster_partitions.size();
    bool table_is_done = finished_partitions >= required_partitions;
1055

N
Nikita Mikhaylov 已提交
1056 1057 1058
    if (!table_is_done)
    {
        LOG_INFO(log, "Table " + task_table.table_id + " is not processed yet."
N
Nikita Mikhaylov 已提交
1059
                << "Copied " << finished_partitions << " of " << required_partitions << ", will retry");
N
Nikita Mikhaylov 已提交
1060
    }
1061

N
Nikita Mikhaylov 已提交
1062 1063
    return table_is_done;
}
1064

N
Nikita Mikhaylov 已提交
1065
/// Job for copying partition from particular shard.
N
Nikita Mikhaylov 已提交
1066
TaskStatus ClusterCopier::tryProcessPartitionTask(const ConnectionTimeouts & timeouts, ShardPartition & task_partition, bool is_unprioritized_task)
N
Nikita Mikhaylov 已提交
1067
{
N
Nikita Mikhaylov 已提交
1068
    TaskStatus res;
1069

N
Nikita Mikhaylov 已提交
1070 1071
    try
    {
N
Nikita Mikhaylov 已提交
1072
        res = iterateThroughAllPiecesInPartition(timeouts, task_partition, is_unprioritized_task);
N
Nikita Mikhaylov 已提交
1073 1074 1075 1076
    }
    catch (...)
    {
        tryLogCurrentException(log, "An error occurred while processing partition " + task_partition.name);
N
Nikita Mikhaylov 已提交
1077
        res = TaskStatus::Error;
N
Nikita Mikhaylov 已提交
1078
    }
1079

N
Nikita Mikhaylov 已提交
1080 1081 1082 1083 1084 1085 1086 1087 1088
    /// At the end of each task check if the config is updated
    try
    {
        updateConfigIfNeeded();
    }
    catch (...)
    {
        tryLogCurrentException(log, "An error occurred while updating the config");
    }
1089

N
Nikita Mikhaylov 已提交
1090 1091
    return res;
}
1092

N
Nikita Mikhaylov 已提交
1093
/// Processes every piece of a shard partition, retrying each piece up to
/// max_shard_partition_tries times, and folds the per-piece results into one status.
///  - timeouts: connection timeouts forwarded to the piece task;
///  - task_partition: the shard partition whose pieces are processed;
///  - is_unprioritized_task: forwarded to the piece task unchanged.
/// Returns TaskStatus::Error if any piece ended with an error, otherwise TaskStatus::Active
/// if any piece is being processed by someone else, otherwise TaskStatus::Finished.
TaskStatus ClusterCopier::iterateThroughAllPiecesInPartition(const ConnectionTimeouts & timeouts, ShardPartition & task_partition,
                                                       bool is_unprioritized_task)
{
    const size_t total_number_of_pieces = task_partition.task_shard.task_table.number_of_splits;

    bool was_failed_pieces = false;
    bool was_active_pieces = false;

    for (size_t piece_number = 0; piece_number < total_number_of_pieces; ++piece_number)
    {
        /// Status of the last attempt for this piece.
        TaskStatus piece_status{TaskStatus::Finished};

        for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num)
        {
            LOG_INFO(log, "Attempt number " << try_num << " to process partition " << task_partition.name
                          << " piece number " << piece_number << " on shard number " << task_partition.task_shard.numberInCluster()
                          << " with index " << task_partition.task_shard.indexInCluster());
            piece_status = processPartitionPieceTaskImpl(timeouts, task_partition, piece_number, is_unprioritized_task);

            /// Exit if success
            if (piece_status == TaskStatus::Finished)
                break;

            /// Skip if the task is being processed by someone
            if (piece_status == TaskStatus::Active)
                break;

            /// Repeat on errors
            std::this_thread::sleep_for(default_sleep_time);
        }

        /// Accumulate over all pieces: plain assignment would let the last piece overwrite
        /// the outcome of the earlier ones and hide their failures.
        was_active_pieces |= (piece_status == TaskStatus::Active);
        was_failed_pieces |= (piece_status == TaskStatus::Error);
    }

    if (was_failed_pieces)
        return TaskStatus::Error;

    if (was_active_pieces)
        return TaskStatus::Active;

    return TaskStatus::Finished;
}

N
Nikita Mikhaylov 已提交
1137

N
Nikita Mikhaylov 已提交
1138
TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
N
Nikita Mikhaylov 已提交
1139 1140
        const ConnectionTimeouts & timeouts, ShardPartition & task_partition,
        const size_t current_piece_number, bool is_unprioritized_task)
N
Nikita Mikhaylov 已提交
1141 1142 1143
{
    TaskShard & task_shard = task_partition.task_shard;
    TaskTable & task_table = task_shard.task_table;
N
Nikita Mikhaylov 已提交
1144
    ClusterPartition & cluster_partition  = task_table.getClusterPartition(task_partition.name);
N
Nikita Mikhaylov 已提交
1145 1146 1147 1148
    ShardPartitionPiece & partition_piece = task_partition.pieces[current_piece_number];

    const size_t number_of_splits = task_table.number_of_splits;
    const String primary_key_comma_separated = task_table.primary_key_comma_separated;
1149

N
Nikita Mikhaylov 已提交
1150
    /// We need to update table definitions for each partition, it could be changed after ALTER
N
Nikita Mikhaylov 已提交
1151 1152 1153
    createShardInternalTables(timeouts, task_shard, true);

    auto split_table_for_current_piece = task_shard.list_of_split_tables_on_shard[current_piece_number];
1154

N
Nikita Mikhaylov 已提交
1155
    auto zookeeper = context.getZooKeeper();
1156

N
Nikita Mikhaylov 已提交
1157 1158
    const String piece_is_dirty_flag_path          = partition_piece.getPartitionPieceIsDirtyPath();
    const String piece_is_dirty_cleaned_path       = partition_piece.getPartitionPieceIsCleanedPath();
N
Nikita Mikhaylov 已提交
1159
    const String current_task_piece_is_active_path = partition_piece.getActiveWorkerPath();
N
Nikita Mikhaylov 已提交
1160
    const String current_task_piece_status_path    = partition_piece.getShardStatusPath();
1161

N
Nikita Mikhaylov 已提交
1162
    /// Auxiliary functions:
1163

N
Nikita Mikhaylov 已提交
1164
    /// Creates is_dirty node to initialize DROP PARTITION
N
Nikita Mikhaylov 已提交
1165
    auto create_is_dirty_node = [&] (const CleanStateClock & clock)
N
Nikita Mikhaylov 已提交
1166 1167 1168 1169 1170 1171 1172 1173
    {
        if (clock.is_stale())
            LOG_DEBUG(log, "Clean state clock is stale while setting dirty flag, cowardly bailing");
        else if (!clock.is_clean())
            LOG_DEBUG(log, "Thank you, Captain Obvious");
        else if (clock.discovery_version)
        {
            LOG_DEBUG(log, "Updating clean state clock");
N
Nikita Mikhaylov 已提交
1174
            zookeeper->set(piece_is_dirty_flag_path, host_id, clock.discovery_version.value());
N
Nikita Mikhaylov 已提交
1175 1176
        }
        else
1177
        {
N
Nikita Mikhaylov 已提交
1178
            LOG_DEBUG(log, "Creating clean state clock");
N
Nikita Mikhaylov 已提交
1179
            zookeeper->create(piece_is_dirty_flag_path, host_id, zkutil::CreateMode::Persistent);
1180
        }
N
Nikita Mikhaylov 已提交
1181
    };
1182

N
Nikita Mikhaylov 已提交
1183
    /// Returns SELECT query filtering current partition and applying user filter
N
Nikita Mikhaylov 已提交
1184
    auto get_select_query = [&] (const DatabaseAndTableName & from_table, const String & fields, bool enable_splitting, String limit = "")
1185
    {
N
Nikita Mikhaylov 已提交
1186 1187
        String query;
        query += "SELECT " + fields + " FROM " + getQuotedTable(from_table);
N
Nikita Mikhaylov 已提交
1188

A
alexey-milovidov 已提交
1189 1190
        if (enable_splitting && experimental_use_sample_offset)
            query += " SAMPLE 1/" + toString(number_of_splits) + " OFFSET " + toString(current_piece_number) + "/" + toString(number_of_splits);
N
Nikita Mikhaylov 已提交
1191

N
Nikita Mikhaylov 已提交
1192 1193
        /// TODO: Bad, it is better to rewrite with ASTLiteral(partition_key_field)
        query += " WHERE (" + queryToString(task_table.engine_push_partition_key_ast) + " = (" + task_partition.name + " AS partition_key))";
N
Nikita Mikhaylov 已提交
1194

A
alexey-milovidov 已提交
1195 1196
        if (enable_splitting && !experimental_use_sample_offset)
            query += " AND ( cityHash64(" + primary_key_comma_separated + ") %" + toString(number_of_splits) + " = " + toString(current_piece_number) + " )";
N
Nikita Mikhaylov 已提交
1197

N
Nikita Mikhaylov 已提交
1198 1199
        if (!task_table.where_condition_str.empty())
            query += " AND (" + task_table.where_condition_str + ")";
N
Nikita Mikhaylov 已提交
1200

N
Nikita Mikhaylov 已提交
1201 1202
        if (!limit.empty())
            query += " LIMIT " + limit;
1203

N
Nikita Mikhaylov 已提交
1204
        ParserQuery p_query(query.data() + query.size());
1205 1206 1207

        const auto & settings = context.getSettingsRef();
        return parseQuery(p_query, query, settings.max_query_size, settings.max_parser_depth);
N
Nikita Mikhaylov 已提交
1208
    };
1209

N
Nikita Mikhaylov 已提交
1210
    /// Load balancing
N
Nikita Mikhaylov 已提交
1211
    auto worker_node_holder = createTaskWorkerNodeAndWaitIfNeed(zookeeper, current_task_piece_status_path, is_unprioritized_task);
1212

N
Nikita Mikhaylov 已提交
1213
    LOG_DEBUG(log, "Processing " << current_task_piece_status_path);
1214

N
Nikita Mikhaylov 已提交
1215
    const String piece_status_path = partition_piece.getPartitionPieceShardsPath();
1216

N
Nikita Mikhaylov 已提交
1217 1218 1219
    CleanStateClock clean_state_clock(zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path);

    const bool is_clean = checkPartitionPieceIsClean(zookeeper, clean_state_clock, piece_status_path);
1220

N
Nikita Mikhaylov 已提交
1221
    /// Do not start if partition piece is dirty, try to clean it
N
Nikita Mikhaylov 已提交
1222
    if (is_clean)
1223
    {
N
Nikita Mikhaylov 已提交
1224 1225
        LOG_DEBUG(log, "Partition " << task_partition.name
                        << " piece " + toString(current_piece_number) + " appears to be clean");
N
Nikita Mikhaylov 已提交
1226
        zookeeper->createAncestors(current_task_piece_status_path);
1227
    }
N
Nikita Mikhaylov 已提交
1228
    else
1229
    {
N
Nikita Mikhaylov 已提交
1230 1231
        LOG_DEBUG(log, "Partition " << task_partition.name
                        << " piece " + toString(current_piece_number) + " is dirty, try to drop it");
1232

N
Nikita Mikhaylov 已提交
1233 1234
        try
        {
N
Nikita Mikhaylov 已提交
1235
            tryDropPartitionPiece(task_partition, current_piece_number, zookeeper, clean_state_clock);
N
Nikita Mikhaylov 已提交
1236 1237 1238 1239 1240
        }
        catch (...)
        {
            tryLogCurrentException(log, "An error occurred when clean partition");
        }
1241

N
Nikita Mikhaylov 已提交
1242
        return TaskStatus::Error;
1243 1244
    }

N
Nikita Mikhaylov 已提交
1245
    /// Create ephemeral node to mark that we are active and process the partition
N
Nikita Mikhaylov 已提交
1246
    zookeeper->createAncestors(current_task_piece_is_active_path);
N
Nikita Mikhaylov 已提交
1247 1248
    zkutil::EphemeralNodeHolderPtr partition_task_node_holder;
    try
1249
    {
N
Nikita Mikhaylov 已提交
1250
        partition_task_node_holder = zkutil::EphemeralNodeHolder::create(current_task_piece_is_active_path, *zookeeper, host_id);
N
Nikita Mikhaylov 已提交
1251 1252 1253 1254
    }
    catch (const Coordination::Exception & e)
    {
        if (e.code == Coordination::ZNODEEXISTS)
1255
        {
N
Nikita Mikhaylov 已提交
1256
            LOG_DEBUG(log, "Someone is already processing " << current_task_piece_is_active_path);
N
Nikita Mikhaylov 已提交
1257
            return TaskStatus::Active;
1258 1259
        }

N
Nikita Mikhaylov 已提交
1260 1261
        throw;
    }
1262

N
Nikita Mikhaylov 已提交
1263 1264 1265 1266
    /// Exit if task has been already processed;
    /// create blocking node to signal cleaning up if it is abandoned
    {
        String status_data;
N
Nikita Mikhaylov 已提交
1267
        if (zookeeper->tryGet(current_task_piece_status_path, status_data))
1268
        {
N
Nikita Mikhaylov 已提交
1269 1270
            TaskStateWithOwner status = TaskStateWithOwner::fromString(status_data);
            if (status.state == TaskState::Finished)
1271
            {
N
Nikita Mikhaylov 已提交
1272 1273
                LOG_DEBUG(log, "Task " << current_task_piece_status_path
                                << " has been successfully executed by " << status.owner);
N
Nikita Mikhaylov 已提交
1274
                return TaskStatus::Finished;
1275 1276
            }

N
Nikita Mikhaylov 已提交
1277 1278 1279 1280 1281
            /// Task is abandoned, because previously we created ephemeral node, possibly in other copier's process.
            /// Initialize DROP PARTITION
            LOG_DEBUG(log, "Task " << current_task_piece_status_path
                            << " has not been successfully finished by " << status.owner
                            << ". Partition will be dropped and refilled.");
1282

N
Nikita Mikhaylov 已提交
1283
            create_is_dirty_node(clean_state_clock);
N
Nikita Mikhaylov 已提交
1284
            return TaskStatus::Error;
N
Nikita Mikhaylov 已提交
1285
        }
1286 1287
    }

N
Nikita Mikhaylov 已提交
1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300

    /// Exit if current piece is absent on this shard. Also mark it as finished, because we will check
    /// whether each shard have processed each partitition (and its pieces).
    if (partition_piece.is_absent_piece)
    {
        String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
        auto res = zookeeper->tryCreate(current_task_piece_status_path, state_finished, zkutil::CreateMode::Persistent);
        if (res == Coordination::ZNODEEXISTS)
            LOG_DEBUG(log, "Partition " << task_partition.name << " piece "
            + toString(current_piece_number) + " is absent on current replica of a shard. But other replicas have already marked it as done.");
        if (res == Coordination::ZOK)
            LOG_DEBUG(log, "Partition " << task_partition.name << " piece "
            + toString(current_piece_number) + " is absent on current replica of a shard. Will mark it as done. Other replicas will do the same.");
N
Nikita Mikhaylov 已提交
1301
        return TaskStatus::Finished;
N
Nikita Mikhaylov 已提交
1302 1303
    }

N
Nikita Mikhaylov 已提交
1304 1305 1306
    /// Check that destination partition is empty if we are first worker
    /// NOTE: this check is incorrect if pull and push tables have different partition key!
    String clean_start_status;
N
Nikita Mikhaylov 已提交
1307
    if (!zookeeper->tryGet(partition_piece.getPartitionPieceCleanStartPath(), clean_start_status) || clean_start_status != "ok")
1308
    {
N
Nikita Mikhaylov 已提交
1309 1310
        zookeeper->createIfNotExists(partition_piece.getPartitionPieceCleanStartPath(), "");
        auto checker = zkutil::EphemeralNodeHolder::create(partition_piece.getPartitionPieceCleanStartPath() + "/checker",
N
Nikita Mikhaylov 已提交
1311
                                                           *zookeeper, host_id);
N
Nikita Mikhaylov 已提交
1312
        // Maybe we are the first worker
N
Nikita Mikhaylov 已提交
1313 1314

        ASTPtr query_select_ast = get_select_query(split_table_for_current_piece, "count()", /*enable_splitting*/ true);
N
Nikita Mikhaylov 已提交
1315 1316 1317 1318
        UInt64 count;
        {
            Context local_context = context;
            // Use pull (i.e. readonly) settings, but fetch data from destination servers
1319 1320
            local_context.setSettings(task_cluster->settings_pull);
            local_context.setSetting("skip_unavailable_shards", true);
N
Nikita Mikhaylov 已提交
1321 1322 1323 1324

            Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_select_ast, local_context)->execute().in);
            count = (block) ? block.safeGetByPosition(0).column->getUInt(0) : 0;
        }
1325

N
Nikita Mikhaylov 已提交
1326 1327
        if (count != 0)
        {
N
better  
Nikita Mikhaylov 已提交
1328 1329
            LOG_INFO(log, "Partition " << task_partition.name << " piece "
                          << current_piece_number << "is not empty. In contains " << count << " rows.");
N
Nikita Mikhaylov 已提交
1330
            Coordination::Stat stat_shards{};
N
Nikita Mikhaylov 已提交
1331
            zookeeper->get(partition_piece.getPartitionPieceShardsPath(), &stat_shards);
1332

N
Nikita Mikhaylov 已提交
1333 1334 1335 1336
            /// NOTE: partition is still fresh if dirt discovery happens before cleaning
            if (stat_shards.numChildren == 0)
            {
                LOG_WARNING(log, "There are no workers for partition " << task_partition.name
N
Nikita Mikhaylov 已提交
1337 1338 1339
                                  << " piece " << toString(current_piece_number)
                                  << ", but destination table contains " << count << " rows"
                                  << ". Partition will be dropped and refilled.");
1340

N
Nikita Mikhaylov 已提交
1341
                create_is_dirty_node(clean_state_clock);
N
Nikita Mikhaylov 已提交
1342
                return TaskStatus::Error;
N
Nikita Mikhaylov 已提交
1343 1344
            }
        }
N
Nikita Mikhaylov 已提交
1345
        zookeeper->set(partition_piece.getPartitionPieceCleanStartPath(), "ok");
N
Nikita Mikhaylov 已提交
1346 1347 1348
    }
    /// At this point, we need to sync that the destination table is clean
    /// before any actual work
1349

N
Nikita Mikhaylov 已提交
1350 1351 1352
    /// Try start processing, create node about it
    {
        String start_state = TaskStateWithOwner::getData(TaskState::Started, host_id);
N
Nikita Mikhaylov 已提交
1353
        CleanStateClock new_clean_state_clock (zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path);
N
Nikita Mikhaylov 已提交
1354 1355
        if (clean_state_clock != new_clean_state_clock)
        {
N
Nikita Mikhaylov 已提交
1356 1357
            LOG_INFO(log, "Partition " << task_partition.name << " piece "
                          << toString(current_piece_number) << " clean state changed, cowardly bailing");
N
Nikita Mikhaylov 已提交
1358
            return TaskStatus::Error;
N
Nikita Mikhaylov 已提交
1359 1360 1361
        }
        else if (!new_clean_state_clock.is_clean())
        {
N
Nikita Mikhaylov 已提交
1362 1363
            LOG_INFO(log, "Partition " << task_partition.name << " piece "
                          << toString(current_piece_number) << " is dirty and will be dropped and refilled");
N
Nikita Mikhaylov 已提交
1364
            create_is_dirty_node(new_clean_state_clock);
N
Nikita Mikhaylov 已提交
1365
            return TaskStatus::Error;
N
Nikita Mikhaylov 已提交
1366
        }
N
Nikita Mikhaylov 已提交
1367
        zookeeper->create(current_task_piece_status_path, start_state, zkutil::CreateMode::Persistent);
1368 1369
    }

N
Nikita Mikhaylov 已提交
1370
    /// Try create table (if not exists) on each shard
1371
    {
N
Nikita Mikhaylov 已提交
1372 1373 1374
        /// Define push table for current partition piece
        auto database_and_table_for_current_piece= std::pair<String, String>(
                task_table.table_push.first,
N
Nikita Mikhaylov 已提交
1375
                task_table.table_push.second + "_piece_" + toString(current_piece_number));
N
Nikita Mikhaylov 已提交
1376

N
Nikita Mikhaylov 已提交
1377 1378 1379
        auto new_engine_push_ast = task_table.engine_push_ast;
        if (task_table.isReplicatedTable())
        {
1380
            new_engine_push_ast = task_table.rewriteReplicatedCreateQueryToPlain();
N
Nikita Mikhaylov 已提交
1381 1382 1383 1384 1385 1386
        }

        auto create_query_push_ast = rewriteCreateQueryStorage(
                task_shard.current_pull_table_create_query,
                database_and_table_for_current_piece, new_engine_push_ast);

N
Nikita Mikhaylov 已提交
1387 1388 1389 1390
        create_query_push_ast->as<ASTCreateQuery &>().if_not_exists = true;
        String query = queryToString(create_query_push_ast);

        LOG_DEBUG(log, "Create destination tables. Query: " << query);
N
Nikita Mikhaylov 已提交
1391 1392 1393
        UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query,
                                              create_query_push_ast, &task_cluster->settings_push,
                                              PoolMode::GET_MANY);
N
Nikita Mikhaylov 已提交
1394 1395
        LOG_DEBUG(log, "Destination tables " << getQuotedTable(task_table.table_push)
                        << " have been created on " << shards << " shards of " << task_table.cluster_push->getShardCount());
N
Nikita Mikhaylov 已提交
1396
    }
1397

N
Nikita Mikhaylov 已提交
1398 1399 1400 1401
    /// Do the copying
    {
        bool inject_fault = false;
        if (copy_fault_probability > 0)
1402
        {
N
Nikita Mikhaylov 已提交
1403 1404
            double value = std::uniform_real_distribution<>(0, 1)(task_table.task_cluster.random_engine);
            inject_fault = value < copy_fault_probability;
1405 1406
        }

N
Nikita Mikhaylov 已提交
1407
        // Select all fields
N
Nikita Mikhaylov 已提交
1408
        ASTPtr query_select_ast = get_select_query(task_shard.table_read_shard, "*", /*enable_splitting*/ true, inject_fault ? "1" : "");
1409

N
Nikita Mikhaylov 已提交
1410
        LOG_DEBUG(log, "Executing SELECT query and pull from " << task_shard.getDescription()
N
Nikita Mikhaylov 已提交
1411
                                                               << " : " << queryToString(query_select_ast));
N
Nikita Mikhaylov 已提交
1412 1413

        ASTPtr query_insert_ast;
1414
        {
N
Nikita Mikhaylov 已提交
1415
            String query;
N
Nikita Mikhaylov 已提交
1416
            query += "INSERT INTO " + getQuotedTable(split_table_for_current_piece) + " VALUES ";
1417

N
Nikita Mikhaylov 已提交
1418
            ParserQuery p_query(query.data() + query.size());
1419 1420
            const auto & settings = context.getSettingsRef();
            query_insert_ast = parseQuery(p_query, query, settings.max_query_size, settings.max_parser_depth);
N
Nikita Mikhaylov 已提交
1421 1422 1423 1424 1425 1426 1427 1428

            LOG_DEBUG(log, "Executing INSERT query: " << query);
        }

        try
        {
            /// Custom INSERT SELECT implementation
            Context context_select = context;
1429
            context_select.setSettings(task_cluster->settings_pull);
1430

N
Nikita Mikhaylov 已提交
1431
            Context context_insert = context;
1432
            context_insert.setSettings(task_cluster->settings_push);
1433

N
Nikita Mikhaylov 已提交
1434 1435
            BlockInputStreamPtr input;
            BlockOutputStreamPtr output;
1436
            {
N
Nikita Mikhaylov 已提交
1437 1438
                BlockIO io_select = InterpreterFactory::get(query_select_ast, context_select)->execute();
                BlockIO io_insert = InterpreterFactory::get(query_insert_ast, context_insert)->execute();
1439

N
Nikita Mikhaylov 已提交
1440 1441
                input = io_select.in;
                output = io_insert.out;
1442 1443
            }

N
Nikita Mikhaylov 已提交
1444 1445 1446 1447 1448 1449 1450 1451
            /// Fail-fast optimization to abort copying when the current clean state expires
            std::future<Coordination::ExistsResponse> future_is_dirty_checker;

            Stopwatch watch(CLOCK_MONOTONIC_COARSE);
            constexpr UInt64 check_period_milliseconds = 500;

            /// Will asynchronously check that ZooKeeper connection and is_dirty flag appearing while copying data
            auto cancel_check = [&] ()
1452
            {
N
Nikita Mikhaylov 已提交
1453 1454
                if (zookeeper->expired())
                    throw Exception("ZooKeeper session is expired, cancel INSERT SELECT", ErrorCodes::UNFINISHED);
1455

N
Nikita Mikhaylov 已提交
1456
                if (!future_is_dirty_checker.valid())
N
Nikita Mikhaylov 已提交
1457
                    future_is_dirty_checker = zookeeper->asyncExists(piece_is_dirty_flag_path);
1458

N
Nikita Mikhaylov 已提交
1459 1460 1461
                /// check_period_milliseconds should less than average insert time of single block
                /// Otherwise, the insertion will slow a little bit
                if (watch.elapsedMilliseconds() >= check_period_milliseconds)
1462
                {
N
Nikita Mikhaylov 已提交
1463
                    Coordination::ExistsResponse status = future_is_dirty_checker.get();
1464

N
Nikita Mikhaylov 已提交
1465
                    if (status.error != Coordination::ZNONODE)
1466
                    {
N
Nikita Mikhaylov 已提交
1467 1468 1469 1470
                        LogicalClock dirt_discovery_epoch (status.stat.mzxid);
                        if (dirt_discovery_epoch == clean_state_clock.discovery_zxid)
                            return false;
                        throw Exception("Partition is dirty, cancel INSERT SELECT", ErrorCodes::UNFINISHED);
1471 1472 1473
                    }
                }

N
Nikita Mikhaylov 已提交
1474 1475
                return false;
            };
1476

N
Nikita Mikhaylov 已提交
1477 1478 1479 1480 1481 1482 1483 1484
            /// Update statistics
            /// It is quite rough: bytes_copied don't take into account DROP PARTITION.
            auto update_stats = [&cluster_partition] (const Block & block)
            {
                cluster_partition.bytes_copied += block.bytes();
                cluster_partition.rows_copied += block.rows();
                cluster_partition.blocks_copied += 1;
            };
1485

N
Nikita Mikhaylov 已提交
1486 1487
            /// Main work is here
            copyData(*input, *output, cancel_check, update_stats);
1488

N
Nikita Mikhaylov 已提交
1489 1490 1491
            // Just in case
            if (future_is_dirty_checker.valid())
                future_is_dirty_checker.get();
1492

N
Nikita Mikhaylov 已提交
1493 1494 1495 1496 1497 1498
            if (inject_fault)
                throw Exception("Copy fault injection is activated", ErrorCodes::UNFINISHED);
        }
        catch (...)
        {
            tryLogCurrentException(log, "An error occurred during copying, partition will be marked as dirty");
N
better  
Nikita Mikhaylov 已提交
1499
            create_is_dirty_node(clean_state_clock);
N
Nikita Mikhaylov 已提交
1500
            return TaskStatus::Error;
N
Nikita Mikhaylov 已提交
1501
        }
1502 1503
    }

N
Nikita Mikhaylov 已提交
1504 1505 1506 1507 1508
    LOG_INFO(log, "Partition " << task_partition.name << " piece "
                               << toString(current_piece_number) << " copied. But not moved to original destination table.");


    /// Try create original table (if not exists) on each shard
N
Nikita Mikhaylov 已提交
1509
    try
N
Nikita Mikhaylov 已提交
1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522
    {
        auto create_query_push_ast = rewriteCreateQueryStorage(task_shard.current_pull_table_create_query,
                                                               task_table.table_push, task_table.engine_push_ast);
        create_query_push_ast->as<ASTCreateQuery &>().if_not_exists = true;
        String query = queryToString(create_query_push_ast);

        LOG_DEBUG(log, "Create destination tables. Query: " << query);
        UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query,
                                              create_query_push_ast, &task_cluster->settings_push,
                                              PoolMode::GET_MANY);
        LOG_DEBUG(log, "Destination tables " << getQuotedTable(task_table.table_push)
                                             << " have been created on " << shards << " shards of " << task_table.cluster_push->getShardCount());
    }
N
Nikita Mikhaylov 已提交
1523 1524 1525 1526
    catch (...)
    {
        tryLogCurrentException(log, "Error while creating original table. Maybe we are not first.");
    }
N
Nikita Mikhaylov 已提交
1527

N
Nikita Mikhaylov 已提交
1528 1529 1530
    /// Finalize the processing, change state of current partition task (and also check is_dirty flag)
    {
        String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
N
Nikita Mikhaylov 已提交
1531
        CleanStateClock new_clean_state_clock (zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path);
N
Nikita Mikhaylov 已提交
1532 1533
        if (clean_state_clock != new_clean_state_clock)
        {
N
Nikita Mikhaylov 已提交
1534 1535
            LOG_INFO(log, "Partition " << task_partition.name << " piece "
                           << toString(current_piece_number) <<  " clean state changed, cowardly bailing");
N
Nikita Mikhaylov 已提交
1536
            return TaskStatus::Error;
N
Nikita Mikhaylov 已提交
1537 1538 1539
        }
        else if (!new_clean_state_clock.is_clean())
        {
N
Nikita Mikhaylov 已提交
1540 1541
            LOG_INFO(log, "Partition " << task_partition.name << " piece "
                           << toString(current_piece_number) << " became dirty and will be dropped and refilled");
N
Nikita Mikhaylov 已提交
1542
            create_is_dirty_node(new_clean_state_clock);
N
Nikita Mikhaylov 已提交
1543
            return TaskStatus::Error;
N
Nikita Mikhaylov 已提交
1544
        }
N
Nikita Mikhaylov 已提交
1545
        zookeeper->set(current_task_piece_status_path, state_finished, 0);
N
Nikita Mikhaylov 已提交
1546
    }
1547

N
Nikita Mikhaylov 已提交
1548
    return TaskStatus::Finished;
N
Nikita Mikhaylov 已提交
1549
}
1550

N
Nikita Mikhaylov 已提交
1551 1552 1553 1554
void ClusterCopier::dropAndCreateLocalTable(const ASTPtr & create_ast)
{
    const auto & create = create_ast->as<ASTCreateQuery &>();
    dropLocalTableIfExists({create.database, create.table});
1555

N
Nikita Mikhaylov 已提交
1556 1557 1558
    InterpreterCreateQuery interpreter(create_ast, context);
    interpreter.execute();
}
1559

N
Nikita Mikhaylov 已提交
1560 1561 1562 1563 1564 1565
void ClusterCopier::dropLocalTableIfExists(const DatabaseAndTableName & table_name) const
{
    auto drop_ast = std::make_shared<ASTDropQuery>();
    drop_ast->if_exists = true;
    drop_ast->database = table_name.first;
    drop_ast->table = table_name.second;
1566

N
Nikita Mikhaylov 已提交
1567 1568 1569
    InterpreterDropQuery interpreter(drop_ast, context);
    interpreter.execute();
}
1570

N
Nikita Mikhaylov 已提交
1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597

void ClusterCopier::dropHelpingTables(const TaskTable & task_table)
{
    LOG_DEBUG(log, "Removing helping tables");
    for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
    {
        DatabaseAndTableName original_table = task_table.table_push;
        DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));

        String query = "DROP TABLE IF EXISTS " + getQuotedTable(helping_table);

        const ClusterPtr & cluster_push = task_table.cluster_push;
        Settings settings_push = task_cluster->settings_push;

        LOG_DEBUG(log, "Execute distributed DROP TABLE: " << query);
        /// We have to drop partition_piece on each replica
        UInt64 num_nodes = executeQueryOnCluster(
                cluster_push, query,
                nullptr,
                &settings_push,
                PoolMode::GET_MANY,
                ClusterExecutionMode::ON_EACH_NODE);

        LOG_DEBUG(log, "DROP TABLE query was successfully executed on " << toString(num_nodes) << " nodes.");
    }
}

N
Nikita Mikhaylov 已提交
1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625

void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskTable & task_table, const String & partition_name)
{
    LOG_DEBUG(log, "Try drop partition partition from all helping tables.");
    for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
    {
        DatabaseAndTableName original_table = task_table.table_push;
        DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));

        String query = "ALTER TABLE " + getQuotedTable(helping_table) + " DROP PARTITION " + partition_name;

        const ClusterPtr & cluster_push = task_table.cluster_push;
        Settings settings_push = task_cluster->settings_push;

        LOG_DEBUG(log, "Execute distributed DROP PARTITION: " << query);
        /// We have to drop partition_piece on each replica
        UInt64 num_nodes = executeQueryOnCluster(
                cluster_push, query,
                nullptr,
                &settings_push,
                PoolMode::GET_MANY,
                ClusterExecutionMode::ON_EACH_NODE);

        LOG_DEBUG(log, "DROP PARTITION query was successfully executed on " << toString(num_nodes) << " nodes.");
    }
    LOG_DEBUG(log, "All helping tables dropped partition " << partition_name);
}

N
Nikita Mikhaylov 已提交
1626 1627 1628 1629
/// Runs `SHOW CREATE TABLE <table>` over `connection` and returns the text found in
/// row 0 of the first column of the result.
///
/// `settings` is forwarded to the remote stream unchanged.
String ClusterCopier::getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings * settings)
{
    const String show_create_query = "SHOW CREATE TABLE " + getQuotedTable(table);

    /// The whole remote answer is collected into a single block.
    Block result = getBlockWithAllStreamData(std::make_shared<RemoteBlockInputStream>(
            connection, show_create_query, InterpreterShowCreateQuery::getSampleBlock(), context, settings));

    const auto & statement_column = typeid_cast<const ColumnString &>(*result.safeGetByPosition(0).column);
    return statement_column.getDataAt(0).toString();
}
1634

N
Nikita Mikhaylov 已提交
1635 1636 1637
/// Fetches the current CREATE statement of the pull table from the given shard
/// and returns it parsed into an AST.
ASTPtr ClusterCopier::getCreateTableForPullShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
{
    /// The same pull settings are used for taking a connection and for the remote query.
    auto * pull_settings = &task_cluster->settings_pull;

    /// Fetch (possibly) new definition
    auto shard_connection = task_shard.info.pool->get(timeouts, pull_settings, true);
    String remote_create_statement = getRemoteCreateTable(
            task_shard.task_table.table_pull,
            *shard_connection,
            pull_settings);

    /// ... and parse it, respecting the parser limits of the local context
    ParserCreateQuery create_parser;
    const auto & local_settings = context.getSettingsRef();
    return parseQuery(create_parser, remote_create_statement,
                      local_settings.max_query_size, local_settings.max_parser_depth);
}
1648

N
Nikita Mikhaylov 已提交
1649
/// If it is implicitly asked to create split Distributed table for certain piece on current shard, we will do it.
N
Nikita Mikhaylov 已提交
1650
void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeouts,
N
Nikita Mikhaylov 已提交
1651
        TaskShard & task_shard, bool create_split)
1652
{
N
Nikita Mikhaylov 已提交
1653
    TaskTable & task_table = task_shard.task_table;
1654

N
Nikita Mikhaylov 已提交
1655 1656
    /// We need to update table definitions for each part, it could be changed after ALTER
    task_shard.current_pull_table_create_query = getCreateTableForPullShard(timeouts, task_shard);
1657

N
Nikita Mikhaylov 已提交
1658 1659 1660 1661 1662
    /// Create local Distributed tables:
    ///  a table fetching data from current shard and a table inserting data to the whole destination cluster
    String read_shard_prefix = ".read_shard_" + toString(task_shard.indexInCluster()) + ".";
    String split_shard_prefix = ".split.";
    task_shard.table_read_shard = DatabaseAndTableName(working_database_name, read_shard_prefix + task_table.table_id);
N
Nikita Mikhaylov 已提交
1663 1664 1665 1666 1667 1668 1669
    task_shard.main_table_split_shard = DatabaseAndTableName(working_database_name, split_shard_prefix + task_table.table_id);

    for (const auto & piece_number : ext::range(0, task_table.number_of_splits))
    {
        task_shard.list_of_split_tables_on_shard[piece_number] =
                DatabaseAndTableName(working_database_name, split_shard_prefix + task_table.table_id + "_piece_" + toString(piece_number));
    }
1670

N
Nikita Mikhaylov 已提交
1671 1672 1673 1674
    /// Create special cluster with single shard
    String shard_read_cluster_name = read_shard_prefix + task_table.cluster_pull_name;
    ClusterPtr cluster_pull_current_shard = task_table.cluster_pull->getClusterWithSingleShard(task_shard.indexInCluster());
    context.setCluster(shard_read_cluster_name, cluster_pull_current_shard);
1675

N
Nikita Mikhaylov 已提交
1676
    auto storage_shard_ast = createASTStorageDistributed(shard_read_cluster_name, task_table.table_pull.first, task_table.table_pull.second);
1677

N
Nikita Mikhaylov 已提交
1678
    auto create_query_ast = removeAliasColumnsFromCreateQuery(task_shard.current_pull_table_create_query);
1679

N
Nikita Mikhaylov 已提交
1680
    auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_ast, task_shard.table_read_shard, storage_shard_ast);
N
Nikita Mikhaylov 已提交
1681
    dropAndCreateLocalTable(create_table_pull_ast);
1682

N
Nikita Mikhaylov 已提交
1683
    if (create_split)
N
Nikita Mikhaylov 已提交
1684 1685 1686 1687 1688 1689
    {
        auto create_table_split_piece_ast = rewriteCreateQueryStorage(
                create_query_ast,
                task_shard.main_table_split_shard,
                task_table.main_engine_split_ast);

N
Nikita Mikhaylov 已提交
1690
        dropAndCreateLocalTable(create_table_split_piece_ast);
N
Nikita Mikhaylov 已提交
1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705

        /// Create auxilary split tables for each piece
        for (const auto & piece_number : ext::range(0, task_table.number_of_splits))
        {
            const auto & storage_piece_split_ast = task_table.auxiliary_engine_split_asts[piece_number];

            create_table_split_piece_ast = rewriteCreateQueryStorage(
                    create_query_ast,
                    task_shard.list_of_split_tables_on_shard[piece_number],
                    storage_piece_split_ast);

            dropAndCreateLocalTable(create_table_split_piece_ast);
        }
    }

1706
}
1707 1708


N
Nikita Mikhaylov 已提交
1709
std::set<String> ClusterCopier::getShardPartitions(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
1710
{
N
Nikita Mikhaylov 已提交
1711
    createShardInternalTables(timeouts, task_shard, false);
1712

N
Nikita Mikhaylov 已提交
1713
    TaskTable & task_table = task_shard.task_table;
1714

N
Nikita Mikhaylov 已提交
1715 1716 1717 1718 1719 1720 1721
    String query;
    {
        WriteBufferFromOwnString wb;
        wb << "SELECT DISTINCT " << queryToString(task_table.engine_push_partition_key_ast) << " AS partition FROM"
           << " " << getQuotedTable(task_shard.table_read_shard) << " ORDER BY partition DESC";
        query = wb.str();
    }
1722

N
Nikita Mikhaylov 已提交
1723
    ParserQuery parser_query(query.data() + query.size());
1724 1725
    const auto & settings = context.getSettingsRef();
    ASTPtr query_ast = parseQuery(parser_query, query, settings.max_query_size, settings.max_parser_depth);
N
Nikita Mikhaylov 已提交
1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747

    LOG_DEBUG(log, "Computing destination partition set, executing query: " << query);

    Context local_context = context;
    local_context.setSettings(task_cluster->settings_pull);
    Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_ast, local_context)->execute().in);

    std::set<String> res;
    if (block)
    {
        ColumnWithTypeAndName & column = block.getByPosition(0);
        task_shard.partition_key_column = column;

        for (size_t i = 0; i < column.column->size(); ++i)
        {
            WriteBufferFromOwnString wb;
            column.type->serializeAsTextQuoted(*column.column, i, wb, FormatSettings());
            res.emplace(wb.str());
        }
    }

    LOG_DEBUG(log, "There are " << res.size() << " destination partitions in shard " << task_shard.getDescription());
1748

N
Nikita Mikhaylov 已提交
1749 1750
    return res;
}
1751

N
Nikita Mikhaylov 已提交
1752 1753
bool ClusterCopier::checkShardHasPartition(const ConnectionTimeouts & timeouts,
        TaskShard & task_shard, const String & partition_quoted_name)
1754
{
N
Nikita Mikhaylov 已提交
1755 1756 1757 1758 1759
    createShardInternalTables(timeouts, task_shard, false);

    TaskTable & task_table = task_shard.task_table;

    std::string query = "SELECT 1 FROM " + getQuotedTable(task_shard.table_read_shard)
N
Nikita Mikhaylov 已提交
1760 1761
                        + " WHERE (" + queryToString(task_table.engine_push_partition_key_ast) +
                        " = (" + partition_quoted_name + " AS partition_key))";
N
Nikita Mikhaylov 已提交
1762 1763 1764 1765 1766 1767 1768

    if (!task_table.where_condition_str.empty())
        query += " AND (" + task_table.where_condition_str + ")";

    query += " LIMIT 1";

    LOG_DEBUG(log, "Checking shard " << task_shard.getDescription() << " for partition "
N
Nikita Mikhaylov 已提交
1769
                                     << partition_quoted_name << " existence, executing query: " << query);
N
Nikita Mikhaylov 已提交
1770 1771

    ParserQuery parser_query(query.data() + query.size());
1772 1773
const auto & settings = context.getSettingsRef();
    ASTPtr query_ast = parseQuery(parser_query, query, settings.max_query_size, settings.max_parser_depth);
N
Nikita Mikhaylov 已提交
1774 1775 1776 1777

    Context local_context = context;
    local_context.setSettings(task_cluster->settings_pull);
    return InterpreterFactory::get(query_ast, local_context)->execute().in->read().rows() != 0;
1778
}
1779

N
Nikita Mikhaylov 已提交
1780
bool ClusterCopier::checkPresentPartitionPiecesOnCurrentShard(const ConnectionTimeouts & timeouts,
N
Nikita Mikhaylov 已提交
1781
                           TaskShard & task_shard, const String & partition_quoted_name, size_t current_piece_number)
N
Nikita Mikhaylov 已提交
1782 1783 1784 1785 1786 1787 1788
{
    createShardInternalTables(timeouts, task_shard, false);

    TaskTable & task_table = task_shard.task_table;
    const size_t number_of_splits = task_table.number_of_splits;
    const String & primary_key_comma_separated = task_table.primary_key_comma_separated;

N
Nikita Mikhaylov 已提交
1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801
    UNUSED(primary_key_comma_separated);

    std::string query = "SELECT 1 FROM " + getQuotedTable(task_shard.table_read_shard);

    if (experimental_use_sample_offset)
        query += " SAMPLE 1/" + toString(number_of_splits) + " OFFSET " + toString(current_piece_number) + "/" + toString(number_of_splits);

    query += " WHERE (" + queryToString(task_table.engine_push_partition_key_ast)
                        + " = (" + partition_quoted_name + " AS partition_key))";

    if (!experimental_use_sample_offset)
        query += " AND (cityHash64(" + primary_key_comma_separated + ") % "
                 + std::to_string(number_of_splits) + " = " + std::to_string(current_piece_number) + " )";
N
Nikita Mikhaylov 已提交
1802 1803 1804 1805 1806 1807 1808

    if (!task_table.where_condition_str.empty())
        query += " AND (" + task_table.where_condition_str + ")";

    query += " LIMIT 1";

    LOG_DEBUG(log, "Checking shard " << task_shard.getDescription() << " for partition "
N
Nikita Mikhaylov 已提交
1809 1810
                   << partition_quoted_name << " piece " << std::to_string(current_piece_number)
                   << "existence, executing query: " << query);
N
Nikita Mikhaylov 已提交
1811 1812

    ParserQuery parser_query(query.data() + query.size());
1813 1814
    const auto & settings = context.getSettingsRef();
    ASTPtr query_ast = parseQuery(parser_query, query, settings.max_query_size, settings.max_parser_depth);
N
Nikita Mikhaylov 已提交
1815 1816 1817 1818 1819 1820

    Context local_context = context;
    local_context.setSettings(task_cluster->settings_pull);
    auto result = InterpreterFactory::get(query_ast, local_context)->execute().in->read().rows();
    if (result != 0)
        LOG_DEBUG(log, "Partition " << partition_quoted_name << " piece number "
N
Nikita Mikhaylov 已提交
1821
                       << std::to_string(current_piece_number) << " is PRESENT on shard " << task_shard.getDescription());
N
Nikita Mikhaylov 已提交
1822 1823
    else
        LOG_DEBUG(log, "Partition " << partition_quoted_name << " piece number "
N
Nikita Mikhaylov 已提交
1824
                       << std::to_string(current_piece_number) << " is ABSENT on shard " << task_shard.getDescription());
N
Nikita Mikhaylov 已提交
1825 1826
    return result != 0;
}
1827

N
Nikita Mikhaylov 已提交
1828 1829 1830
/** Executes simple query (without output streams, for example DDL queries) on each shard of the cluster
  * Returns number of shards for which at least one replica executed query successfully
  */
N
Nikita Mikhaylov 已提交
1831
UInt64 ClusterCopier::executeQueryOnCluster(
N
Nikita Mikhaylov 已提交
1832 1833
        const ClusterPtr & cluster,
        const String & query,
N
Nikita Mikhaylov 已提交
1834 1835 1836
        const ASTPtr & query_ast_,
        const Settings * settings,
        PoolMode pool_mode,
N
Nikita Mikhaylov 已提交
1837
        ClusterExecutionMode execution_mode,
N
Nikita Mikhaylov 已提交
1838
        UInt64 max_successful_executions_per_shard) const
1839
{
N
Nikita Mikhaylov 已提交
1840 1841
    auto num_shards = cluster->getShardsInfo().size();
    std::vector<UInt64> per_shard_num_successful_replicas(num_shards, 0);
1842

N
Nikita Mikhaylov 已提交
1843 1844
    ASTPtr query_ast;
    if (query_ast_ == nullptr)
1845
    {
N
Nikita Mikhaylov 已提交
1846
        ParserQuery p_query(query.data() + query.size());
1847 1848
        const auto & settings = context.getSettingsRef();
        query_ast = parseQuery(p_query, query, settings.max_query_size, settings.max_parser_depth);
1849
    }
N
Nikita Mikhaylov 已提交
1850 1851 1852
    else
        query_ast = query_ast_;

N
Nikita Mikhaylov 已提交
1853 1854 1855 1856 1857
    /// We will have to execute query on each replica of a shard.
    if (execution_mode == ClusterExecutionMode::ON_EACH_NODE)
        max_successful_executions_per_shard = 0;

    std::atomic<size_t> origin_replicas_number;
N
Nikita Mikhaylov 已提交
1858 1859 1860

    /// We need to execute query on one replica at least
    auto do_for_shard = [&] (UInt64 shard_index)
1861
    {
N
Nikita Mikhaylov 已提交
1862 1863 1864
        const Cluster::ShardInfo & shard = cluster->getShardsInfo().at(shard_index);
        UInt64 & num_successful_executions = per_shard_num_successful_replicas.at(shard_index);
        num_successful_executions = 0;
1865

N
Nikita Mikhaylov 已提交
1866
        auto increment_and_check_exit = [&] () -> bool
N
Nikita Mikhaylov 已提交
1867 1868 1869 1870
        {
            ++num_successful_executions;
            return max_successful_executions_per_shard && num_successful_executions >= max_successful_executions_per_shard;
        };
1871

N
Nikita Mikhaylov 已提交
1872
        UInt64 num_replicas = cluster->getShardsAddresses().at(shard_index).size();
N
Nikita Mikhaylov 已提交
1873 1874

        origin_replicas_number += num_replicas;
N
Nikita Mikhaylov 已提交
1875 1876
        UInt64 num_local_replicas = shard.getLocalNodeCount();
        UInt64 num_remote_replicas = num_replicas - num_local_replicas;
1877

N
Nikita Mikhaylov 已提交
1878 1879 1880 1881 1882
        /// In that case we don't have local replicas, but do it just in case
        for (UInt64 i = 0; i < num_local_replicas; ++i)
        {
            auto interpreter = InterpreterFactory::get(query_ast, context);
            interpreter->execute();
1883

N
Nikita Mikhaylov 已提交
1884 1885 1886
            if (increment_and_check_exit())
                return;
        }
1887

N
Nikita Mikhaylov 已提交
1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918
        /// Will try to make as many as possible queries
        if (shard.hasRemoteConnections())
        {
            Settings current_settings = settings ? *settings : task_cluster->settings_common;
            current_settings.max_parallel_replicas = num_remote_replicas ? num_remote_replicas : 1;

            auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings).getSaturated(current_settings.max_execution_time);
            auto connections = shard.pool->getMany(timeouts, &current_settings, pool_mode);

            for (auto & connection : connections)
            {
                if (connection.isNull())
                    continue;

                try
                {
                    /// CREATE TABLE and DROP PARTITION queries return empty block
                    RemoteBlockInputStream stream{*connection, query, Block{}, context, &current_settings};
                    NullBlockOutputStream output{Block{}};
                    copyData(stream, output);

                    if (increment_and_check_exit())
                        return;
                }
                catch (const Exception &)
                {
                    LOG_INFO(log, getCurrentExceptionMessage(false, true));
                }
            }
        }
    };
1919 1920

    {
N
Nikita Mikhaylov 已提交
1921 1922 1923 1924
        ThreadPool thread_pool(std::min<UInt64>(num_shards, getNumberOfPhysicalCPUCores()));

        for (UInt64 shard_index = 0; shard_index < num_shards; ++shard_index)
            thread_pool.scheduleOrThrowOnError([=] { do_for_shard(shard_index); });
1925

N
Nikita Mikhaylov 已提交
1926
        thread_pool.wait();
1927
    }
N
Nikita Mikhaylov 已提交
1928

N
Nikita Mikhaylov 已提交
1929
    UInt64 successful_nodes = 0;
N
Nikita Mikhaylov 已提交
1930
    for (UInt64 num_replicas : per_shard_num_successful_replicas)
N
Nikita Mikhaylov 已提交
1931 1932 1933 1934 1935 1936 1937
    {
        if (execution_mode == ClusterExecutionMode::ON_EACH_NODE)
            successful_nodes += num_replicas;
        else
            /// Count only successful shards
            successful_nodes += (num_replicas > 0);
    }
N
Nikita Mikhaylov 已提交
1938

N
Nikita Mikhaylov 已提交
1939
    if (execution_mode == ClusterExecutionMode::ON_EACH_NODE && successful_nodes != origin_replicas_number)
N
Nikita Mikhaylov 已提交
1940 1941 1942 1943
    {
        LOG_INFO(log, "There was an error while executing ALTER on each node. Query was executed on "
                << toString(successful_nodes) << " nodes. But had to be executed on " << toString(origin_replicas_number.load()));
    }
N
Nikita Mikhaylov 已提交
1944

N
Nikita Mikhaylov 已提交
1945

N
Nikita Mikhaylov 已提交
1946
    return successful_nodes;
N
Nikita Mikhaylov 已提交
1947
}
1948
}