DDLWorker.h 2.4 KB
Newer Older
1 2
#pragma once
#include <Interpreters/Context.h>
3 4
#include <Interpreters/Cluster.h>
#include <DataStreams/BlockIO.h>
5 6 7 8 9 10 11 12 13 14 15
#include <common/logger_useful.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

namespace DB
{

16 17
struct ASTAlterQuery;
struct DDLLogEntry;
18 19


20
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, Context & context);
21 22


23 24 25
class DDLWorker
{
public:
26
    DDLWorker(const std::string & zk_root_dir, Context & context_);
27 28
    ~DDLWorker();

29
    /// Pushes query into DDL queue, returns path to created node
30
    String enqueueQuery(DDLLogEntry & entry);
31 32 33

    std::string getHostName() const
    {
34
        return host_id;
35 36
    }

37 38
private:
    void processTasks();
39 40 41 42 43 44 45 46 47

    void processTask(const DDLLogEntry & node, const std::string & node_path);

    void processTaskAlter(
        const ASTAlterQuery * query_alter,
        const String & rewritten_query,
        const std::shared_ptr<Cluster> & cluster,
        ssize_t shard_num,
        const String & node_path);
48

49 50
    /// Checks and cleanups queue's nodes
    void cleanupQueue(const Strings * node_names_to_check = nullptr);
51

52 53 54
    void createStatusDirs(const std::string & node_name);
    ASTPtr getRewrittenQuery(const DDLLogEntry & node);

55 56 57 58 59 60
    void run();

private:
    Context & context;
    Logger * log = &Logger::get("DDLWorker");

61 62 63 64
    std::string host_id;        /// host_name:port
    std::string host_name;
    UInt16 port;

65
    std::string queue_dir;      /// dir with queue of queries
66 67
    std::string master_dir;     /// dir with queries was initiated by the server

68
    /// Used to omit already processed nodes. Maybe usage of set is more obvious.
69 70
    std::string last_processed_node_name;

71 72 73 74 75 76
    std::shared_ptr<zkutil::ZooKeeper> zookeeper;

    /// Save state of executed task to avoid duplicate execution on ZK error
    std::string current_node = {};
    bool current_node_was_executed = false;
    ExecutionStatus current_node_execution_status;
77

78
    std::shared_ptr<Poco::Event> event_queue_updated;
79
    std::atomic<bool> stop_flag{false};
80
    std::thread thread;
81

82
    size_t last_cleanup_time_seconds = 0;
83 84 85 86 87

    /// Delete node if its age is greater than that
    static const size_t node_max_lifetime_seconds;
    /// Cleaning starts after new node event is received if the last cleaning wasn't made sooner than N seconds ago
    static const size_t cleanup_min_period_seconds;
88

89
    friend class DDLQueryStatusInputSream;
90 91
};

92

93
}