// DDLWorker.h
#pragma once
#include <Interpreters/Context.h>
#include <Interpreters/Cluster.h>
#include <DataStreams/BlockIO.h>
#include <common/logger_useful.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

namespace DB
{

/// Forward declarations: this header only uses these types by pointer or reference,
/// so their full definitions are not needed here.
class ASTAlterQuery;
struct DDLLogEntry;
struct DDLTask;


21
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, Context & context);
22 23


24 25 26
class DDLWorker
{
public:
27
    DDLWorker(const std::string & zk_root_dir, Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix);
28 29
    ~DDLWorker();

30
    /// Pushes query into DDL queue, returns path to created node
31
    String enqueueQuery(DDLLogEntry & entry);
32

33
    /// Host ID (name:port) for logging purposes
34
    /// Note that in each task hosts are identified individually by name:port from initiator server cluster config
35
    std::string getCommonHostID() const
36
    {
37
        return host_fqdn_id;
38 39
    }

40 41
private:
    void processTasks();
42

43 44 45
    /// Reads entry and check that the host belongs to host list of the task
    /// Returns true and sets current_task if entry parsed and the check is passed
    bool initAndCheckTask(const String & entry_name);
46 47


48
    void processTask(DDLTask & task);
49 50

    void processTaskAlter(
51
        DDLTask & task,
52
        const ASTAlterQuery * ast_alter,
53 54
        const String & rewritten_query,
        const String & node_path);
55

56 57
    void parseQueryAndResolveHost(DDLTask & task);

58 59
    bool tryExecuteQuery(const String & query, const DDLTask & task, ExecutionStatus & status);

60

61
    /// Checks and cleanups queue's nodes
62
    void cleanupQueue();
63

64

65 66 67
    void createStatusDirs(const std::string & node_name);
    ASTPtr getRewrittenQuery(const DDLLogEntry & node);

68

69 70 71 72 73 74
    void run();

private:
    Context & context;
    Logger * log = &Logger::get("DDLWorker");

75 76
    std::string host_fqdn;      /// current host domain name
    std::string host_fqdn_id;   /// host_name:port
77

78
    std::string queue_dir;      /// dir with queue of queries
79 80
    std::string master_dir;     /// dir with queries was initiated by the server

81 82
    /// Last task that was skipped or sucesfully executed
    std::string last_processed_task_name;
83

84 85 86
    std::shared_ptr<zkutil::ZooKeeper> zookeeper;

    /// Save state of executed task to avoid duplicate execution on ZK error
87 88
    using DDLTaskPtr = std::unique_ptr<DDLTask>;
    DDLTaskPtr current_task;
89

90
    std::shared_ptr<Poco::Event> event_queue_updated;
91
    std::atomic<bool> stop_flag{false};
92
    std::thread thread;
93

94
    size_t last_cleanup_time_seconds = 0;
95 96

    /// Cleaning starts after new node event is received if the last cleaning wasn't made sooner than N seconds ago
97
    size_t cleanup_delay_period = 60; // minute (in seconds)
98 99 100 101
    /// Delete node if its age is greater than that
    size_t task_max_lifetime = 7 * 24 * 60 * 60; // week (in seconds)
    /// How many tasks could be in the queue
    size_t max_tasks_in_queue = 1000;
102

103
    friend class DDLQueryStatusInputSream;
104
    friend class DDLTask;
105 106
};

107

108
}