DDLWorker.cpp 12.0 KB
Newer Older
1
#include <Interpreters/DDLWorker.h>
2 3 4 5 6 7

#include <Parsers/parseQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTAlterQuery.h>

8 9 10 11 12
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>

13
#include <Interpreters/executeQuery.h>
14 15 16 17 18 19 20 21 22 23 24
#include <Interpreters/Cluster.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Common/getFQDNOrHostName.h>

#include <Core/Block.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeArray.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
25 26 27 28 29 30 31 32 33 34

#include <zkutil/ZooKeeper.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int UNKNOWN_ELEMENT_IN_CONFIG;
    extern const int INVALID_CONFIG_PARAMETER;
    extern const int UNKNOWN_FORMAT_VERSION;
    /// Used by DDLWorker::processTasks for its internal consistency check.
    extern const int LOGICAL_ERROR;
}


39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
struct DDLLogEntry
{
    String query;
    Strings hosts;
    String initiator;

    static constexpr char CURRENT_VERSION = '1';

    String toString()
    {
        String res;
        {
            WriteBufferFromString wb(res);

            writeChar(CURRENT_VERSION, wb);
            wb << "\n";
55 56 57
            wb << "query: " << query << "\n";
            wb << "hosts: " << hosts << "\n";
            wb << "initiator: " << initiator << "\n";
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
        }

        return res;
    }

    void parse(const String & data)
    {
        ReadBufferFromString rb(data);

        char version;
        readChar(version, rb);
        if (version != CURRENT_VERSION)
            throw Exception("Unknown DDLLogEntry format version: " + version, ErrorCodes::UNKNOWN_FORMAT_VERSION);

        rb >> "\n";
73 74 75
        rb >> "query: " >> query >> "\n";
        rb >> "hosts: " >> hosts >> "\n";
        rb >> "initiator: " >> initiator >> "\n";
76 77 78 79

        assertEOF(rb);
    }
};
80 81


82 83 84 85 86 87
/// Sets up the worker for the queue rooted at zk_root_dir and starts the background thread running run().
DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_)
    : context(context_), stop_flag(false)
{
    /// Drop one trailing slash so child paths can be built as root_dir + "/" + name.
    /// The emptiness check avoids calling back() on an empty string (undefined behaviour).
    root_dir = zk_root_dir;
    if (!root_dir.empty() && root_dir.back() == '/')
        root_dir.pop_back();

    /// This host's identity: it is looked up in each entry's host list and used as the name of
    /// the status nodes this host creates.
    /// NOTE(review): assumed to match the strings the initiator derives from the cluster config — confirm.
    hostname = getFQDNOrHostName() + ':' + DB::toString(context.getTCPPort());

    zookeeper = context.getZooKeeper();
    event_queue_updated = std::make_shared<Poco::Event>();

    /// Start the thread last, once every member that run() touches has been initialized.
    thread = std::thread(&DDLWorker::run, this);
}

97

98 99 100
/// Stops the background thread: raises stop_flag, wakes run() if it is blocked waiting on
/// event_queue_updated, and joins the thread.
DDLWorker::~DDLWorker()
{
    stop_flag = true;
    event_queue_updated->set();
    thread.join();
}

106

107 108
void DDLWorker::processTasks()
{
109 110
    LOG_DEBUG(log, "processTasks");
    zookeeper->createAncestors(root_dir + "/");
111

112 113
    Strings queue_nodes = zookeeper->getChildren(root_dir, nullptr, event_queue_updated);
    if (queue_nodes.empty())
114 115
        return;

116 117
    LOG_DEBUG(log, "fetched " << queue_nodes.size() << " nodes");

118
    bool server_startup = last_processed_node_name.empty();
119

120 121 122 123
    std::sort(queue_nodes.begin(), queue_nodes.end());
    auto begin_node = server_startup
        ? queue_nodes.begin()
        : std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_processed_node_name);
124

125
    for (auto it = begin_node; it != queue_nodes.end(); ++it)
126
    {
127
        String node_data, node_name = *it, node_path = root_dir + "/" + node_name;
128 129 130 131 132 133
        LOG_DEBUG(log, "Fetching node " << node_path);
        if (!zookeeper->tryGet(node_path, node_data))
        {
            /// It is Ok that node could be deleted just now. It means that there are no current host in node's host list.
            continue;
        }
134

135
        LOG_DEBUG(log, "Fetched data for node " << node_name);
136 137 138 139 140 141

        DDLLogEntry node;
        node.parse(node_data);

        bool host_in_hostlist = std::find(node.hosts.cbegin(), node.hosts.cend(), hostname) != node.hosts.cend();

142 143 144 145
        bool already_processed = zookeeper->exists(node_path + "/failed/" + hostname)
                                 || zookeeper->exists(node_path + "/sucess/" + hostname);

        LOG_DEBUG(log, "node " << node_name << ", " << node.query << " status: " << host_in_hostlist << " " << already_processed);
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171

        if (!server_startup && already_processed)
        {
            throw Exception(
                "Server expects that DDL node " + node_name + " should be processed, but it was already processed according to ZK",
                ErrorCodes::LOGICAL_ERROR);
        }

        if (host_in_hostlist && !already_processed)
        {
            try
            {
                processTask(node, node_name);
            }
            catch (...)
            {
                /// It could be network error, but we mark node as processed anyway.
                last_processed_node_name = node_name;

                tryLogCurrentException(log,
                    "An unexpected error occurred during processing DLL query " + node.query + " (" + node_name + ")");
                throw;
            }
        }

        last_processed_node_name = node_name;
172
    }
173 174
}

175

176 177
/// Try to create unexisting "status" dirs for a node
void DDLWorker::createStatusDirs(const std::string & node_path)
178
{
179
    auto acl = zookeeper->getDefaultACL();
180

181 182 183 184
    zkutil::Ops ops;
    ops.emplace_back(std::make_unique<zkutil::Op::Create>(node_path + "/active", "", acl, zkutil::CreateMode::Persistent));
    ops.emplace_back(std::make_unique<zkutil::Op::Create>(node_path + "/sucess", "", acl, zkutil::CreateMode::Persistent));
    ops.emplace_back(std::make_unique<zkutil::Op::Create>(node_path + "/failed", "", acl, zkutil::CreateMode::Persistent));
185

186 187 188 189 190
    int code = zookeeper->tryMulti(ops);

    if (code != ZOK && code != ZNODEEXISTS)
        throw zkutil::KeeperException(code);
}
191 192


193 194
bool DDLWorker::processTask(const DDLLogEntry & node, const std::string & node_name)
{
195 196
    LOG_DEBUG(log, "Process " << node_name << " node, query " << node.query);

197 198 199
    String node_path = root_dir + "/" + node_name;
    createStatusDirs(node_path);

200 201
    LOG_DEBUG(log, "Created status dirs");

202 203
    String active_flag_path = node_path + "/active/" + hostname;
    zookeeper->create(active_flag_path, "", zkutil::CreateMode::Ephemeral);
204

205 206
    LOG_DEBUG(log, "Created active flag");

207
    /// At the end we will delete active flag and ...
208
    zkutil::Ops ops;
209 210
    auto acl = zookeeper->getDefaultACL();
    ops.emplace_back(std::make_unique<zkutil::Op::Remove>(active_flag_path, -1));
211

212 213
    LOG_DEBUG(log, "Executing query: " << node.query);

214 215
    try
    {
216
        executeQuery(node.query, context);
217 218 219 220
    }
    catch (...)
    {
        /// ... and create fail flag
221
        String fail_flag_path = node_path + "/failed/" + hostname;
222
        String exception_msg = getCurrentExceptionMessage(false, true);
223 224

        ops.emplace_back(std::make_unique<zkutil::Op::Create>(fail_flag_path, exception_msg, acl, zkutil::CreateMode::Persistent));
225 226
        zookeeper->multi(ops);

227
        tryLogCurrentException(log, "Query " + node.query + " wasn't finished successfully");
228 229 230
        return false;
    }

231 232
    LOG_DEBUG(log, "Executed query: " << node.query);

233 234 235
    /// ... and create sucess flag
    String fail_flag_path = node_path + "/sucess/" + hostname;
    ops.emplace_back(std::make_unique<zkutil::Op::Create>(fail_flag_path, "", acl, zkutil::CreateMode::Persistent));
236 237
    zookeeper->multi(ops);

238 239
    LOG_DEBUG(log, "Removed flags");

240 241 242 243
    return true;
}


244
/// Publishes entry to the DDL queue.
/// Creates a persistent sequential node under root_dir whose name starts with "query-", holding
/// entry.toString(), and pre-creates its status dirs.
/// Returns the path of the created node. Returns an empty string, without touching ZooKeeper,
/// when entry.hosts is empty.
String DDLWorker::enqueueQuery(DDLLogEntry & entry)
{
    if (entry.hosts.empty())
        return {};

    String query_path_prefix = root_dir + "/query-";
    zookeeper->createAncestors(query_path_prefix);

    String node_path = zookeeper->create(query_path_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential);
    createStatusDirs(node_path);

    return node_path;
}

258

259 260 261 262
void DDLWorker::run()
{
    using namespace std::chrono_literals;

263 264
    LOG_DEBUG(log, "Started DDLWorker thread");

265 266
    while (!stop_flag)
    {
267 268
        LOG_DEBUG(log, "Begin tasks processing");

269 270 271 272
        try
        {
            processTasks();
        }
273
        catch (...)
274
        {
275
            tryLogCurrentException(log);
276 277
        }

278 279 280
        //std::unique_lock<std::mutex> g(lock);
        //cond_var.wait_for(g, 10s);

281
        LOG_DEBUG(log, "Waiting watch");
282
        event_queue_updated->wait();
283 284 285
    }
}

286

287
class DDLQueryStatusInputSream : public IProfilingBlockInputStream
288
{
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356
public:
    DDLQueryStatusInputSream(const String & zk_node_path, Context & context, size_t num_hosts)
    : node_path(zk_node_path), context(context)
    {
        sample = Block{
            {std::make_shared<DataTypeString>(),    "host"},
            {std::make_shared<DataTypeUInt64>(),    "status"},
            {std::make_shared<DataTypeString>(),    "error"},
            {std::make_shared<DataTypeUInt64>(),    "num_hosts_remaining"},
            {std::make_shared<DataTypeUInt64>(),    "num_hosts_active"},
        };

        setTotalRowsApprox(num_hosts);
    }

    String getName() const override
    {
        return "DDLQueryStatusInputSream";
    }

    String getID() const override
    {
        return "DDLQueryStatusInputSream(" + node_path + ")";
    }

    Block readImpl() override
    {
        Block res;
        if (num_hosts_finished >= total_rows_approx)
            return res;

        auto zookeeper = context.getZooKeeper();
        size_t try_number = 0;

        while(res.rows() == 0)
        {
            if (is_cancelled)
                return res;

            if (num_hosts_finished != 0 || try_number != 0)
                std::this_thread::sleep_for(std::chrono::milliseconds(100 * std::min(100LU, try_number + 1)));

            /// The node was deleted while we was sleeping.
            if (!zookeeper->exists(node_path))
                return res;

            Strings active_hosts = zookeeper->getChildren(node_path + "/active");
            Strings new_sucess_hosts = getDiffAndUpdate(sucess_hosts_set, zookeeper->getChildren(node_path + "/sucess"));
            Strings new_failed_hosts = getDiffAndUpdate(failed_hosts_set, zookeeper->getChildren(node_path + "/failed"));

            Strings new_hosts = new_sucess_hosts;
            new_hosts.insert(new_hosts.end(), new_failed_hosts.cbegin(), new_failed_hosts.cend());
            ++try_number;

            if (new_hosts.empty())
                continue;

            res = sample.cloneEmpty();
            for (size_t i = 0; i < new_hosts.size(); ++i)
            {
                bool fail = i >= new_sucess_hosts.size();
                res.getByName("host").column->insert(new_hosts[i]);
                res.getByName("status").column->insert(static_cast<UInt64>(fail));
                res.getByName("error").column->insert(fail ? zookeeper->get(node_path + "/failed/" + new_hosts[i]) : String{});
                res.getByName("num_hosts_remaining").column->insert(total_rows_approx - (++num_hosts_finished));
                res.getByName("num_hosts_active").column->insert(active_hosts.size());
            }
        }
357

358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384
        return res;
    }

    static Strings getDiffAndUpdate(NameSet & prev, const Strings & cur_list)
    {
        Strings diff;
        for (const String & elem : cur_list)
        {
            if (!prev.count(elem))
                diff.emplace_back(elem);
        }

        prev.insert(diff.cbegin(), diff.cend());
        return diff;
    }

    ~DDLQueryStatusInputSream() override = default;

    Block sample;

private:
    String node_path;
    Context & context;

    NameSet sucess_hosts_set;
    NameSet failed_hosts_set;
    size_t num_hosts_finished = 0;
385
};
386 387 388 389 390


/// Enqueues `query` for execution on every distinct host of cluster `cluster_name`.
/// Returns a BlockIO whose `in` stream reports per-host execution status
/// (see DDLQueryStatusInputSream). Returns an empty BlockIO if nothing was enqueued.
BlockIO executeDDLQueryOnCluster(const String & query, const String & cluster_name, Context & context)
{
    ClusterPtr cluster = context.getCluster(cluster_name);
    DDLWorker & ddl_worker = context.getDDLWorker();

    DDLLogEntry entry;
    entry.query = query;
    entry.initiator = ddl_worker.getHostName();

    const auto & shards = cluster->getShardsWithFailoverAddresses();
    for (const auto & shard : shards)
        for (const auto & addr : shard)
            entry.hosts.emplace_back(addr.toString());

    /// The same address can be listed in several shards/replicas. A host executes an entry and
    /// reports its status only once, while the status stream waits for entry.hosts.size() reports.
    /// With duplicates left in, the stream would never finish.
    std::sort(entry.hosts.begin(), entry.hosts.end());
    entry.hosts.erase(std::unique(entry.hosts.begin(), entry.hosts.end()), entry.hosts.end());

    String node_path = ddl_worker.enqueueQuery(entry);

    BlockIO io;
    if (node_path.empty())
        return io;

    auto stream = std::make_shared<DDLQueryStatusInputSream>(node_path, context, entry.hosts.size());
    io.in_sample = stream->sample.cloneEmpty();
    io.in = std::move(stream);
    return io;
}


415
}