// DDLWorker.h
#pragma once
#include <Interpreters/Context.h>
#include <Interpreters/Cluster.h>
#include <DataStreams/BlockIO.h>
#include <common/logger_useful.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

namespace DB
{

class ASTQueryWithOnCluster;

/// Entry point for a DDL query given as an ASTQueryWithOnCluster; declared here, defined elsewhere.
/// Takes the query and a mutable Context and returns a BlockIO.
/// NOTE(review): presumably enqueues the query for the cluster's hosts and returns a stream
/// with per-host status — the implementation is not visible from this header; confirm.
BlockIO executeDDLQueryOnCluster(const ASTQueryWithOnCluster & query, Context & context);


/// Entry of the DDL queue; defined elsewhere.
struct DDLLogEntry;


23 24 25
class DDLWorker
{
public:
26
    DDLWorker(const std::string & zk_root_dir, Context & context_);
27 28
    ~DDLWorker();

29
    /// Pushes query into DDL queue, returns path to created node
30
    String enqueueQuery(DDLLogEntry & entry);
31 32 33 34 35 36

    std::string getHostName() const
    {
        return hostname;
    }

37 38
private:
    void processTasks();
39 40 41
    bool processTask(const DDLLogEntry & node, const std::string & node_path);

    void createStatusDirs(const std::string & node_name);
42

43 44
    /// Checks and cleanups queue's nodes
    void cleanupQueue(const Strings * node_names_to_check = nullptr);
45 46 47 48 49 50 51

    void run();

private:
    Context & context;
    Logger * log = &Logger::get("DDLWorker");

52 53
    std::string hostname;
    std::string root_dir;       /// common dir with queue of queries
54 55 56 57 58
    std::string master_dir;     /// dir with queries was initiated by the server

    std::string last_processed_node_name;

    std::shared_ptr<zkutil::ZooKeeper> zookeeper;
59
    std::shared_ptr<Poco::Event> event_queue_updated;
60 61 62 63 64

    std::atomic<bool> stop_flag;
    std::condition_variable cond_var;
    std::mutex lock;
    std::thread thread;
65

66 67 68 69
    size_t last_cleanup_time_seconds = 0;
    static constexpr size_t node_max_lifetime_seconds = 10; // 7 * 24 * 60 * 60;
    static constexpr size_t cleanup_after_seconds = 10;

70
    friend class DDLQueryStatusInputSream;
71 72 73
};

}