DDLWorker.cpp 9.7 KB
Newer Older
1
#include <Interpreters/DDLWorker.h>
2 3 4 5 6 7

#include <Parsers/parseQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTAlterQuery.h>

8 9 10 11 12
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>

13
#include <Interpreters/executeQuery.h>
14 15 16 17 18 19 20 21 22 23 24
#include <Interpreters/Cluster.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Common/getFQDNOrHostName.h>

#include <Core/Block.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeArray.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
25 26 27 28 29 30 31 32 33 34

#include <zkutil/ZooKeeper.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int UNKNOWN_ELEMENT_IN_CONFIG;
    extern const int INVALID_CONFIG_PARAMETER;
    extern const int UNKNOWN_FORMAT_VERSION;
    /// Thrown by DDLWorker::processTasks when ZooKeeper state contradicts the worker's own bookkeeping.
    extern const int LOGICAL_ERROR;
}


39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
struct DDLLogEntry
{
    String query;
    Strings hosts;
    String initiator;

    static constexpr char CURRENT_VERSION = '1';

    String toString()
    {
        String res;
        {
            WriteBufferFromString wb(res);

            writeChar(CURRENT_VERSION, wb);
            wb << "\n";
            wb << "query: " << double_quote << query << "\n";
            wb << "hosts: " << double_quote << hosts << "\n";
            wb << "initiator: " << double_quote << initiator << "\n";
        }

        return res;
    }

    void parse(const String & data)
    {
        ReadBufferFromString rb(data);

        char version;
        readChar(version, rb);
        if (version != CURRENT_VERSION)
            throw Exception("Unknown DDLLogEntry format version: " + version, ErrorCodes::UNKNOWN_FORMAT_VERSION);

        rb >> "\n";
        rb >> "query: " >> double_quote >> query >> "\n";
        rb >> "hosts: " >> double_quote >> hosts >> "\n";
        rb >> "initiator: " >> double_quote >> initiator >> "\n";

        assertEOF(rb);
    }
};
80 81


82 83 84 85 86 87
/// Creates the worker and starts its background thread.
///
/// zk_root_dir is the ZooKeeper path of the DDL queue. Trailing '/' characters are stripped, because
/// child paths are built as root_dir + "/" + name. The host identifier is "<FQDN or hostname>:<TCP port>".
/// The thread running run() is started last, after the members above have been initialized.
DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_)
    : context(context_), stop_flag(false)
{
    root_dir = zk_root_dir;
    /// The emptiness check matters: back() on an empty string is undefined behaviour.
    while (!root_dir.empty() && root_dir.back() == '/')
        root_dir.pop_back();

    hostname = getFQDNOrHostName() + ':' + DB::toString(context.getTCPPort());
    master_dir = getMastersDir() + hostname;

    zookeeper = context.getZooKeeper();
    event_queue_updated = std::make_shared<Poco::Event>();

    thread = std::thread(&DDLWorker::run, this);
}

98

99 100 101 102 103 104 105
/// Stops the background thread: raises stop_flag, wakes run() and joins it.
DDLWorker::~DDLWorker()
{
    stop_flag = true;
    cond_var.notify_one();
    /// run() sleeps on event_queue_updated (not on cond_var), so it must be signalled explicitly;
    /// otherwise join() could block until some unrelated ZooKeeper watch fires.
    event_queue_updated->set();
    thread.join();
}

106

107 108
void DDLWorker::processTasks()
{
109 110 111 112
    Strings queue_nodes;
    int code = zookeeper->tryGetChildren(root_dir, queue_nodes, nullptr, event_queue_updated);
    if (code != ZNONODE)
        throw zkutil::KeeperException(code);
113

114 115
    /// Threre are no tasks
    if (code == ZNONODE || queue_nodes.empty())
116 117
        return;

118
    bool server_startup = last_processed_node_name.empty();
119

120 121 122 123
    std::sort(queue_nodes.begin(), queue_nodes.end());
    auto begin_node = server_startup
        ? queue_nodes.begin()
        : std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_processed_node_name);
124

125
    for (auto it = begin_node; it != queue_nodes.end(); ++it)
126
    {
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
        String node_data, node_name = *it, node_path = root_dir + "/" + node_name;
        code = zookeeper->tryGet(node_path, node_data);

        /// It is Ok that node could be deleted just now. It means that there are no current host in node's host list.
        if (code != ZNONODE)
            throw zkutil::KeeperException(code);

        DDLLogEntry node;
        node.parse(node_data);

        bool host_in_hostlist = std::find(node.hosts.cbegin(), node.hosts.cend(), hostname) != node.hosts.cend();

        bool already_processed = !zookeeper->exists(node_path + "/failed/" + hostname)
                                 && !zookeeper->exists(node_path + "/sucess/" + hostname);

        if (!server_startup && already_processed)
        {
            throw Exception(
                "Server expects that DDL node " + node_name + " should be processed, but it was already processed according to ZK",
                ErrorCodes::LOGICAL_ERROR);
        }

        if (host_in_hostlist && !already_processed)
        {
            try
            {
                processTask(node, node_name);
            }
            catch (...)
            {
                /// It could be network error, but we mark node as processed anyway.
                last_processed_node_name = node_name;

                tryLogCurrentException(log,
                    "An unexpected error occurred during processing DLL query " + node.query + " (" + node_name + ")");
                throw;
            }
        }

        last_processed_node_name = node_name;
167
    }
168 169
}

170

171 172
/// Try to create unexisting "status" dirs for a node
void DDLWorker::createStatusDirs(const std::string & node_path)
173
{
174
    auto acl = zookeeper->getDefaultACL();
175

176 177 178 179
    zkutil::Ops ops;
    ops.emplace_back(std::make_unique<zkutil::Op::Create>(node_path + "/active", "", acl, zkutil::CreateMode::Persistent));
    ops.emplace_back(std::make_unique<zkutil::Op::Create>(node_path + "/sucess", "", acl, zkutil::CreateMode::Persistent));
    ops.emplace_back(std::make_unique<zkutil::Op::Create>(node_path + "/failed", "", acl, zkutil::CreateMode::Persistent));
180

181 182 183 184 185
    int code = zookeeper->tryMulti(ops);

    if (code != ZOK && code != ZNODEEXISTS)
        throw zkutil::KeeperException(code);
}
186 187


188 189 190 191 192 193 194
bool DDLWorker::processTask(const DDLLogEntry & node, const std::string & node_name)
{
    String node_path = root_dir + "/" + node_name;
    createStatusDirs(node_path);

    String active_flag_path = node_path + "/active/" + hostname;
    zookeeper->create(active_flag_path, "", zkutil::CreateMode::Ephemeral);
195

196
    /// At the end we will delete active flag and ...
197
    zkutil::Ops ops;
198 199
    auto acl = zookeeper->getDefaultACL();
    ops.emplace_back(std::make_unique<zkutil::Op::Remove>(active_flag_path, -1));
200

201 202
    try
    {
203
        executeQuery(node.query, context);
204 205 206 207
    }
    catch (...)
    {
        /// ... and create fail flag
208
        String fail_flag_path = node_path + "/failed/" + hostname;
209
        String exception_msg = getCurrentExceptionMessage(false, true);
210 211

        ops.emplace_back(std::make_unique<zkutil::Op::Create>(fail_flag_path, exception_msg, acl, zkutil::CreateMode::Persistent));
212 213
        zookeeper->multi(ops);

214
        tryLogCurrentException(log, "Query " + node.query + " wasn't finished successfully");
215 216 217
        return false;
    }

218 219 220
    /// ... and create sucess flag
    String fail_flag_path = node_path + "/sucess/" + hostname;
    ops.emplace_back(std::make_unique<zkutil::Op::Create>(fail_flag_path, "", acl, zkutil::CreateMode::Persistent));
221 222 223 224 225 226
    zookeeper->multi(ops);

    return true;
}


227
/// Publishes `entry` as a new PersistentSequential "query-" node under the queue root and creates
/// its status children. An entry that addresses no hosts is silently dropped.
void DDLWorker::enqueueQuery(DDLLogEntry & entry)
{
    if (!entry.hosts.empty())
    {
        const String query_path_prefix = getRoot() + "/query-";

        /// Presumably creates the queue root (the parents of the prefix) if missing — see zkutil::createAncestors.
        zookeeper->createAncestors(query_path_prefix);

        const String node_path = zookeeper->create(query_path_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential);
        createStatusDirs(node_path);
    }
}

239

240 241 242 243 244 245 246 247 248 249
void DDLWorker::run()
{
    using namespace std::chrono_literals;

    while (!stop_flag)
    {
        try
        {
            processTasks();
        }
250
        catch (...)
251
        {
252
            tryLogCurrentException(log);
253 254
        }

255 256 257 258
        //std::unique_lock<std::mutex> g(lock);
        //cond_var.wait_for(g, 10s);

        event_queue_updated->wait();
259 260 261
    }
}

262

263
class DDLQueryStatusInputSream : IProfilingBlockInputStream
264 265
{

266
};
267 268 269 270 271 272 273 274 275 276 277


/// Enqueues `query` in the distributed DDL queue, addressed to every replica of every shard of
/// cluster `cluster_name`, and returns a single-row status block.
///
/// NOTE: this function does not wait for the hosts to execute the query, so every addressed host
/// is reported as pending and none as finished.
BlockIO executeDDLQueryOnCluster(const String & query, const String & cluster_name, Context & context)
{
    ClusterPtr cluster = context.getCluster(cluster_name);
    Cluster::AddressesWithFailover shards = cluster->getShardsWithFailoverAddresses();

    DDLWorker & ddl_worker = context.getDDLWorker();

    DDLLogEntry entry;
    entry.query = query;
    entry.initiator = ddl_worker.getHostName();

    for (const auto & shard : shards)
        for (const auto & addr : shard)
            entry.hosts.emplace_back(addr.toString());

    ddl_worker.enqueueQuery(entry);

    /// Nothing has been awaited yet, so all addressed hosts are still pending.
    size_t num_hosts_total = entry.hosts.size();
    size_t num_hosts_finished_successfully = 0;
    size_t num_hosts_finished_unsuccessfully = 0;
    size_t num_hosts_pending = num_hosts_total - num_hosts_finished_successfully - num_hosts_finished_unsuccessfully;

    Array hosts_names_failed, hosts_errors, hosts_names_pending;
    hosts_names_pending.reserve(entry.hosts.size());
    for (const String & host : entry.hosts)
        hosts_names_pending.emplace_back(host);

    auto array_of_strings = std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>());
    Block block{
        {std::make_shared<DataTypeUInt64>(),    "num_hosts_total"},
        {std::make_shared<DataTypeUInt64>(),    "num_hosts_finished_successfully"},
        {std::make_shared<DataTypeUInt64>(),    "num_hosts_finished_unsuccessfully"},
        {std::make_shared<DataTypeUInt64>(),    "num_hosts_pending"},
        {array_of_strings->clone(),             "hosts_finished_unsuccessfully"},
        {array_of_strings->clone(),             "hosts_finished_unsuccessfully_errors"},
        {array_of_strings->clone(),             "hosts_pending"}
    };

    block.getByName("num_hosts_total").column->insert(num_hosts_total);
    block.getByName("num_hosts_finished_successfully").column->insert(num_hosts_finished_successfully);
    block.getByName("num_hosts_finished_unsuccessfully").column->insert(num_hosts_finished_unsuccessfully);
    block.getByName("num_hosts_pending").column->insert(num_hosts_pending);
    block.getByName("hosts_finished_unsuccessfully").column->insert(hosts_names_failed);
    block.getByName("hosts_finished_unsuccessfully_errors").column->insert(hosts_errors);
    block.getByName("hosts_pending").column->insert(hosts_names_pending);

    BlockIO io;
    io.in_sample = block.cloneEmpty();
    io.in = std::make_shared<OneBlockInputStream>(block);
    return io;
}


319
}