DDLTask.h 5.3 KB
Newer Older
A
Alexander Tokmakov 已提交
1 2 3
#pragma once
#include <Core/Types.h>
#include <Interpreters/Cluster.h>
A
Alexander Tokmakov 已提交
4
#include <Common/ZooKeeper/Types.h>
A
Alexander Tokmakov 已提交
5

6 7 8 9
namespace Poco
{
class Logger;
}
A
Alexander Tokmakov 已提交
10

11 12 13 14 15
namespace zkutil
{
class ZooKeeper;
}

A
Alexander Tokmakov 已提交
16 17 18 19
namespace DB
{

class ASTQueryWithOnCluster;
A
Alexander Tokmakov 已提交
20
using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
21
using ClusterPtr = std::shared_ptr<Cluster>;
22
class DatabaseReplicated;
A
Alexander Tokmakov 已提交
23

A
Alexander Tokmakov 已提交
24 25
class ZooKeeperMetadataTransaction;
using ZooKeeperMetadataTransactionPtr = std::shared_ptr<ZooKeeperMetadataTransaction>;
A
Alexander Tokmakov 已提交
26

A
Alexander Tokmakov 已提交
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
struct HostID
{
    String host_name;
    UInt16 port;

    HostID() = default;

    explicit HostID(const Cluster::Address & address)
        : host_name(address.host_name), port(address.port) {}

    static HostID fromString(const String & host_port_str);

    String toString() const
    {
        return Cluster::Address::toString(host_name, port);
    }

    String readableString() const
    {
        return host_name + ":" + DB::toString(port);
    }

    bool isLocalAddress(UInt16 clickhouse_port) const;

    static String applyToString(const HostID & host_id)
    {
        return host_id.toString();
    }
};


struct DDLLogEntry
{
60
    UInt64 version = 1;
A
Alexander Tokmakov 已提交
61 62 63
    String query;
    std::vector<HostID> hosts;
    String initiator; // optional
64
    std::optional<SettingsChanges> settings;
A
Alexander Tokmakov 已提交
65

66
    void setSettingsIfRequired(const Context & context);
A
Alexander Tokmakov 已提交
67 68
    String toString() const;
    void parse(const String & data);
69
    void assertVersion() const;
A
Alexander Tokmakov 已提交
70 71
};

72 73 74 75 76 77 78 79 80 81
struct DDLTaskBase
{
    const String entry_name;
    const String entry_path;

    DDLLogEntry entry;

    String host_id_str;
    ASTPtr query;

82
    bool is_initial_query = false;
83 84
    bool is_circular_replicated = false;
    bool execute_on_leader = false;
A
Alexander Tokmakov 已提交
85

A
Alexander Tokmakov 已提交
86
    Coordination::Requests ops;
87 88 89
    ExecutionStatus execution_status;
    bool was_executed = false;

A
fix  
Alexander Tokmakov 已提交
90 91
    std::atomic_bool completely_processed = false;

A
Alexander Tokmakov 已提交
92
    DDLTaskBase(const String & name, const String & path) : entry_name(name), entry_path(path) {}
93
    DDLTaskBase(const DDLTaskBase &) = delete;
A
Alexander Tokmakov 已提交
94 95 96 97
    virtual ~DDLTaskBase() = default;

    void parseQueryFromEntry(const Context & context);

98 99
    virtual String getShardID() const = 0;

A
Alexander Tokmakov 已提交
100
    virtual std::unique_ptr<Context> makeQueryContext(Context & from_context, const ZooKeeperPtr & zookeeper);
101 102 103 104 105

    inline String getActiveNodePath() const { return entry_path + "/active/" + host_id_str; }
    inline String getFinishedNodePath() const { return entry_path + "/finished/" + host_id_str; }
    inline String getShardNodePath() const { return entry_path + "/shards/" + getShardID(); }

A
Alexander Tokmakov 已提交
106 107
    static String getLogEntryName(UInt32 log_entry_number);
    static UInt32 getLogEntryNumber(const String & log_entry_name);
108 109 110
};

struct DDLTask : public DDLTaskBase
A
Alexander Tokmakov 已提交
111
{
112 113 114 115 116
    DDLTask(const String & name, const String & path) : DDLTaskBase(name, path) {}

    bool findCurrentHostID(const Context & global_context, Poco::Logger * log);

    void setClusterInfo(const Context & context, Poco::Logger * log);
A
Alexander Tokmakov 已提交
117

118 119 120 121 122 123 124
    String getShardID() const override;

private:
    bool tryFindHostInCluster();
    bool tryFindHostInClusterViaResolving(const Context & context);

    HostID host_id;
A
Alexander Tokmakov 已提交
125 126 127 128 129
    String cluster_name;
    ClusterPtr cluster;
    Cluster::Address address_in_cluster;
    size_t host_shard_num;
    size_t host_replica_num;
130
};
A
Alexander Tokmakov 已提交
131

132 133 134
struct DatabaseReplicatedTask : public DDLTaskBase
{
    DatabaseReplicatedTask(const String & name, const String & path, DatabaseReplicated * database_);
A
Alexander Tokmakov 已提交
135

136
    String getShardID() const override;
A
Alexander Tokmakov 已提交
137
    std::unique_ptr<Context> makeQueryContext(Context & from_context, const ZooKeeperPtr & zookeeper) override;
A
Alexander Tokmakov 已提交
138

139
    DatabaseReplicated * database;
A
Alexander Tokmakov 已提交
140 141
};

A
Alexander Tokmakov 已提交
142 143 144 145 146 147 148 149
/// The main purpose of ZooKeeperMetadataTransaction is to execute all zookeeper operation related to query
/// in a single transaction when we performed all required checks and ready to "commit" changes.
/// For example, create ALTER_METADATA entry in ReplicatedMergeTree log,
/// create path/to/entry/finished/host_id node in distributed DDL queue to mark query as executed and
/// update metadata in path/to/replicated_database/metadata/table_name
/// It's used for DatabaseReplicated.
/// TODO we can also use it for ordinary ON CLUSTER queries
class ZooKeeperMetadataTransaction
A
Alexander Tokmakov 已提交
150
{
A
Alexander Tokmakov 已提交
151 152 153
    enum State
    {
        CREATED,
A
Alexander Tokmakov 已提交
154
        COMMITTED,
A
Alexander Tokmakov 已提交
155 156 157 158
        FAILED
    };

    State state = CREATED;
A
Alexander Tokmakov 已提交
159 160
    ZooKeeperPtr current_zookeeper;
    String zookeeper_path;
A
Alexander Tokmakov 已提交
161
    bool is_initial_query;
A
Alexander Tokmakov 已提交
162 163
    Coordination::Requests ops;

A
Alexander Tokmakov 已提交
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
public:
    ZooKeeperMetadataTransaction(const ZooKeeperPtr & current_zookeeper_, const String & zookeeper_path_, bool is_initial_query_)
    : current_zookeeper(current_zookeeper_)
    , zookeeper_path(zookeeper_path_)
    , is_initial_query(is_initial_query_)
    {
    }

    bool isInitialQuery() const { return is_initial_query; }

    bool isExecuted() const { return state != CREATED; }

    String getDatabaseZooKeeperPath() const { return zookeeper_path; }

    void addOp(Coordination::RequestPtr && op)
    {
        assert(!isExecuted());
        ops.emplace_back(op);
    }

A
Alexander Tokmakov 已提交
184
    void moveOpsTo(Coordination::Requests & other_ops)
A
Alexander Tokmakov 已提交
185
    {
A
Alexander Tokmakov 已提交
186
        assert(!isExecuted());
A
Alexander Tokmakov 已提交
187
        std::move(ops.begin(), ops.end(), std::back_inserter(other_ops));
A
fix  
Alexander Tokmakov 已提交
188
        ops.clear();
A
Alexander Tokmakov 已提交
189
        state = COMMITTED;
A
Alexander Tokmakov 已提交
190
    }
A
Alexander Tokmakov 已提交
191 192

    void commit();
A
Alexander Tokmakov 已提交
193

194
    ~ZooKeeperMetadataTransaction() { assert(isExecuted() || std::uncaught_exceptions()); }
A
Alexander Tokmakov 已提交
195 196
};

197 198
ClusterPtr tryGetReplicatedDatabaseCluster(const String & cluster_name);

A
Alexander Tokmakov 已提交
199
}