// DDLWorker.h
#pragma once
#include <Interpreters/Context.h>
#include <Interpreters/Cluster.h>
#include <DataStreams/BlockIO.h>
#include <common/logger_useful.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

namespace DB
{


/// Free function declared here and defined elsewhere.
/// Takes the text of a query, the name of a cluster and a mutable Context, and returns a BlockIO.
/// NOTE(review): judging by the name, this presumably submits the DDL query for execution
/// on the hosts of the named cluster — confirm against the definition.
BlockIO executeDDLQueryOnCluster(const String & query, const String & cluster_name, Context & context);


class DDLWorker
{
public:
23
    DDLWorker(const std::string & zk_root_dir, Context & context_);
24 25
    ~DDLWorker();

26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
    void enqueueQuery(const String & query, const std::vector<Cluster::Address> & addrs);

    /// Returns root/ path in ZooKeeper
    std::string getRoot() const
    {
        return root_dir;
    }

    std::string getAssignsDir() const
    {
        return root_dir + "/assigns";
    }

    std::string getMastersDir() const
    {
        return root_dir + "/masters";
    }

    std::string getCurrentMasterDir() const
    {
        return getMastersDir() + "/" + getHostName();
    }

    std::string getHostName() const
    {
        return hostname;
    }

54 55
private:
    void processTasks();
56 57 58 59
    bool processTask(const std::string & task);

    void processQueries();
    bool processQuery(const std::string & task);
60 61 62 63 64 65 66

    void run();

private:
    Context & context;
    Logger * log = &Logger::get("DDLWorker");

67 68 69 70
    std::string hostname;
    std::string root_dir;       /// common dir with queue of queries
    std::string assign_dir;     /// dir with tasks assigned to the server
    std::string master_dir;    /// dir with queries was initiated by the server
71 72 73 74 75 76 77 78

    std::atomic<bool> stop_flag;
    std::condition_variable cond_var;
    std::mutex lock;
    std::thread thread;
};

}