ob_freeze_info_snapshot_mgr.h 13.0 KB
Newer Older
O
oceanbase-admin 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#ifndef OCEANBASE_STORAGE_FREEZE_INFO_SNAPSHOT_MGR_
#define OCEANBASE_STORAGE_FREEZE_INFO_SNAPSHOT_MGR_

#include <stdint.h>

#include "lib/allocator/ob_slice_alloc.h"
#include "lib/hash/ob_hashset.h"
#include "lib/lock/ob_tc_rwlock.h"
#include "lib/task/ob_timer.h"
#include "share/ob_freeze_info_proxy.h"
#include "share/ob_snapshot_table_proxy.h"
#include "share/ob_zone_info.h"

namespace oceanbase {

namespace common {
class ObISQLClient;
}

namespace storage {

struct ObFrozenStatus;

class ObFreezeInfoSnapshotMgr {
G
gm 已提交
37
public:
O
oceanbase-admin 已提交
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
  struct FreezeInfoLite {
    int64_t freeze_version;
    int64_t freeze_ts;
    int64_t cluster_version;

    FreezeInfoLite() : freeze_version(-1), freeze_ts(-1), cluster_version(0)
    {}
    FreezeInfoLite(const int64_t version, const int64_t ts, const int64_t cluster_ver)
        : freeze_version(version), freeze_ts(ts), cluster_version(cluster_ver)
    {}
    void reset()
    {
      freeze_version = -1;
      freeze_ts = -1;
      cluster_version = 0;
    }
    TO_STRING_KV(K(freeze_version), K(freeze_ts), K(cluster_version));
  };

  struct FreezeInfo {
    int64_t freeze_version;
    int64_t freeze_ts;
    int64_t schema_version;
    int64_t cluster_version;

    FreezeInfo() : freeze_version(-1), freeze_ts(-1), schema_version(-1), cluster_version(0)
    {}
M
mw0 已提交
65
    FreezeInfo &operator=(const FreezeInfoLite &o)
O
oceanbase-admin 已提交
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
    {
      freeze_version = o.freeze_version;
      freeze_ts = o.freeze_ts;
      cluster_version = o.cluster_version;
      return *this;
    }
    void reset()
    {
      freeze_version = -1;
      freeze_ts = -1;
      schema_version = -1;
      cluster_version = 0;
    }
    TO_STRING_KV(K(freeze_version), K(freeze_ts), K(schema_version), K(cluster_version));
  };

  struct GCSnapshotInfo {
    int64_t snapshot_ts;
    int64_t schema_version;

    GCSnapshotInfo() : snapshot_ts(-1), schema_version(-1)
    {}
    GCSnapshotInfo(int64_t ts, int64_t version) : snapshot_ts(ts), schema_version(version)
    {}
    TO_STRING_KV(K(snapshot_ts), K(schema_version));
  };

  struct NeighbourFreezeInfoLite {
    FreezeInfoLite next;
    FreezeInfoLite prev;

    void reset()
    {
      next.reset();
      prev.reset();
    }
    TO_STRING_KV(K(next), K(prev));
  };

  struct NeighbourFreezeInfo {
    FreezeInfo next;
    FreezeInfoLite prev;

M
mw0 已提交
109
    NeighbourFreezeInfo &operator=(const NeighbourFreezeInfoLite &o)
O
oceanbase-admin 已提交
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
    {
      next = o.next;
      prev = o.prev;
      return *this;
    }
    void reset()
    {
      next.reset();
      prev.reset();
    }
    TO_STRING_KV(K(next), K(prev));
  };

  struct SchemaPair {
    uint64_t tenant_id;
    int64_t schema_version;

    SchemaPair() : tenant_id(common::OB_INVALID_ID), schema_version(common::OB_INVALID_VERSION)
    {}
    SchemaPair(uint64_t tenant, int64_t schema) : tenant_id(tenant), schema_version(schema)
    {}

    TO_STRING_KV(K(tenant_id), K(schema_version));
  };

M
mw0 已提交
135
  int init(common::ObISQLClient &sql_proxy, bool is_remote);
O
oceanbase-admin 已提交
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
  void init_for_test()
  {
    inited_ = true;
  }
  bool is_inited() const
  {
    return inited_;
  }
  int start();
  void wait();
  void stop();

  int64_t get_latest_frozen_timestamp();

  // no schema version is returned if you do not give the tenant id
M
mw0 已提交
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
  int get_freeze_info_by_major_version(const int64_t major_version, FreezeInfoLite &freeze_info);
  int get_freeze_info_by_major_version(const uint64_t table_id, const int64_t major_version, FreezeInfo &freeze_info);
  int get_freeze_info_by_major_version(
      const int64_t major_version, FreezeInfoLite &freeze_info, bool &is_first_major_version);
  int get_freeze_info_behind_major_version(const int64_t major_version, common::ObIArray<FreezeInfoLite> &freeze_infos);

  int get_tenant_freeze_info_by_major_version(
      const uint64_t tenant_id, const int64_t major_version, FreezeInfo &freeze_info);

  int get_freeze_info_by_snapshot_version(const int64_t snapshot_version, FreezeInfoLite &freeze_info);
  int get_freeze_info_by_snapshot_version(
      const uint64_t table_id, const int64_t snapshot_version, FreezeInfo &freeze_info);

  int get_neighbour_major_freeze(const int64_t snapshot_version, NeighbourFreezeInfoLite &info);
  int get_neighbour_major_freeze(const uint64_t table_id, const int64_t snapshot_version, NeighbourFreezeInfo &info);

  int get_min_reserved_snapshot(const common::ObPartitionKey &pkey, const int64_t merged_version,
      const int64_t schema_version, int64_t &snapshot_version);
  int get_reserve_points(const int64_t tenant_id, const share::ObSnapShotType snapshot_type,
      common::ObIArray<share::ObSnapshotInfo> &restore_points, int64_t &snapshot_gc_ts);
171
  int get_restore_point_min_schema_version(const int64_t tenant_id, int64_t &schema_version);
M
mw0 已提交
172 173 174
  int update_info(const int64_t snapshot_gc_ts, const common::ObIArray<SchemaPair> &gc_schema_version,
      const common::ObIArray<FreezeInfoLite> &info_list, const common::ObIArray<share::ObSnapshotInfo> &snapshots,
      const int64_t min_major_version, bool &changed);
O
oceanbase-admin 已提交
175 176 177

  int64_t get_snapshot_gc_ts();

M
mw0 已提交
178 179
  ObFreezeInfoSnapshotMgr(const ObFreezeInfoSnapshotMgr &) = delete;
  ObFreezeInfoSnapshotMgr &operator=(const ObFreezeInfoSnapshotMgr &) = delete;
O
oceanbase-admin 已提交
180 181 182 183

  ObFreezeInfoSnapshotMgr();
  virtual ~ObFreezeInfoSnapshotMgr();

G
gm 已提交
184
private:
O
oceanbase-admin 已提交
185 186 187 188 189 190 191
  typedef common::RWLock::RLockGuard RLockGuard;
  typedef common::RWLock::WLockGuard WLockGuard;

  static const int64_t RELOAD_INTERVAL = 1L * 1000L * 1000L;
  static const int64_t MAX_GC_SNAPSHOT_TS_REFRESH_TS = 10L * 60L * 1000L * 1000L;
  static const int64_t FLUSH_GC_SNAPSHOT_TS_REFRESH_TS = common::MODIFY_GC_SNAPSHOT_INTERVAL + 10L * 1000L * 1000L;

M
mw0 已提交
192
  int get_latest_freeze_version(int64_t &freeze_version);
O
oceanbase-admin 已提交
193 194 195 196 197 198 199 200 201
  int64_t get_next_idx()
  {
    return 1L - cur_idx_;
  }
  void switch_info()
  {
    cur_idx_ = get_next_idx();
  }
  int prepare_new_info_list(const int64_t min_major_version);
M
mw0 已提交
202 203
  virtual int get_multi_version_duration(const uint64_t tenant_id, int64_t &duration) const;
  int inner_get_neighbour_major_freeze(const int64_t snapshot_version, NeighbourFreezeInfoLite &info);
O
oceanbase-admin 已提交
204
  int update_next_gc_schema_version(
M
mw0 已提交
205 206 207
      const common::ObIArray<SchemaPair> &gc_schema_version, const int64_t snapshot_gc_ts);
  int update_next_info_list(const common::ObIArray<FreezeInfoLite> &info_list);
  int update_next_snapshots(const common::ObIArray<share::ObSnapshotInfo> &snapshots);
O
oceanbase-admin 已提交
208
  int get_freeze_info_by_major_version_(
M
mw0 已提交
209
      const int64_t major_version, FreezeInfoLite &freeze_info, bool &is_first_major_version);
O
oceanbase-admin 已提交
210
  int get_tenant_freeze_info_by_major_version_(
M
mw0 已提交
211
      const uint64_t tenant_id, const int64_t major_version, const bool async, FreezeInfo &freeze_info);
O
oceanbase-admin 已提交
212 213 214

  class SchemaCache;
  class SchemaQuerySet {
G
gm 已提交
215
  public:
M
mw0 已提交
216
    explicit SchemaQuerySet(SchemaCache &schema_cache);
O
oceanbase-admin 已提交
217 218 219 220 221 222 223
    ~SchemaQuerySet()
    {}
    int init();

    int submit_async_schema_query(const uint64_t tenant_id, const int64_t freeze_version);
    int update_schema_cache();

G
gm 已提交
224
  private:
O
oceanbase-admin 已提交
225 226 227 228 229 230 231 232
    struct SchemaQuery {
      SchemaQuery() : tenant_id_(0), freeze_version_(0)
      {}

      SchemaQuery(const uint64_t tenant_id, const int64_t freeze_version)
          : tenant_id_(tenant_id), freeze_version_(freeze_version)
      {}

M
mw0 已提交
233
      bool operator==(const SchemaQuery &o) const
O
oceanbase-admin 已提交
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
      {
        return (tenant_id_ == o.tenant_id_) && (freeze_version_ == o.freeze_version_);
      }

      inline uint64_t hash() const
      {
        uint64_t hash_ret = 0;
        hash_ret = common::murmurhash(&tenant_id_, sizeof(tenant_id_), 0);
        hash_ret = common::murmurhash(&freeze_version_, sizeof(freeze_version_), hash_ret);
        return hash_ret;
      }

      uint64_t tenant_id_;
      int64_t freeze_version_;
    };

M
mw0 已提交
250
    int pop_schema_query(SchemaQuery &query);
O
oceanbase-admin 已提交
251 252

    common::hash::ObHashSet<SchemaQuery> schema_querys_;
M
mw0 已提交
253
    SchemaCache &schema_cache_;
O
oceanbase-admin 已提交
254 255 256 257 258 259
    common::RWLock lock_;

    bool inited_;
  };

  class ReloadTask : public common::ObTimerTask {
G
gm 已提交
260
  public:
M
mw0 已提交
261
    ReloadTask(ObFreezeInfoSnapshotMgr &mgr, SchemaQuerySet &schema_query_set);
O
oceanbase-admin 已提交
262 263
    virtual ~ReloadTask()
    {}
M
mw0 已提交
264
    int init(common::ObISQLClient &sql_proxy, bool is_remote);
O
oceanbase-admin 已提交
265 266 267
    virtual void runTimerTask();
    int try_update_info();

G
gm 已提交
268
  private:
M
mw0 已提交
269 270 271
    int get_global_info_compat_below_220(int64_t &snapshot_gc_ts, common::ObIArray<SchemaPair> &gc_schema_version);
    int get_global_info(int64_t &snapshot_gc_ts, common::ObIArray<SchemaPair> &gc_schema_version);
    int get_freeze_info(int64_t &min_major_version, common::ObIArray<FreezeInfoLite> &freeze_info);
O
oceanbase-admin 已提交
272 273 274

    bool inited_;
    bool is_remote_;
M
mw0 已提交
275 276
    ObFreezeInfoSnapshotMgr &mgr_;
    common::ObISQLClient *sql_proxy_;
O
oceanbase-admin 已提交
277 278 279
    share::ObFreezeInfoProxy freeze_info_proxy_;
    share::ObSnapshotTableProxy snapshot_proxy_;
    int64_t last_change_ts_;
M
mw0 已提交
280
    SchemaQuerySet &schema_query_set_;
O
oceanbase-admin 已提交
281 282 283 284
  };

  // LRU cache of schema information for different tenant major version
  class SchemaCache {
G
gm 已提交
285
  public:
O
oceanbase-admin 已提交
286 287
    typedef common::ObSpinLockGuard SpinLockGuard;

M
mw0 已提交
288
    explicit SchemaCache(SchemaQuerySet &schema_query_set);
O
oceanbase-admin 已提交
289 290
    virtual ~SchemaCache();

M
mw0 已提交
291
    int init(common::ObISQLClient &sql_proxy);
O
oceanbase-admin 已提交
292 293 294 295 296
    void reset();

    // return the schema version on success or OB_EAGAIN if major is not ready.
    // the LRU cache will update itself if the cache is not hit.
    int get_freeze_schema_version(
M
mw0 已提交
297
        const uint64_t tenant_id, const int64_t freeze_version, const bool async, int64_t &schema_version);
O
oceanbase-admin 已提交
298 299 300

    int update_schema_version(const uint64_t tenant_id, const int64_t freeze_version);

G
gm 已提交
301
  private:
O
oceanbase-admin 已提交
302 303 304 305 306 307 308 309
    // dlink list
    struct schema_node {
      // key
      uint64_t tenant_id;
      int64_t freeze_version;
      // value
      int64_t schema_version;
      // pointers
M
mw0 已提交
310 311
      schema_node *prev;
      schema_node *next;
O
oceanbase-admin 已提交
312

M
mw0 已提交
313 314 315 316
      schema_node()
      {
        reset();
      }
O
oceanbase-admin 已提交
317 318 319 320 321 322 323 324

      void set(const uint64_t tenant, const int64_t freeze, const int64_t schema)
      {
        tenant_id = tenant;
        freeze_version = freeze;
        schema_version = schema;
      }

O
obdev 已提交
325 326 327 328 329 330 331 332 333
      void reset()
      {
        tenant_id = common::OB_INVALID_ID;
        freeze_version = common::OB_INVALID_VERSION;
        schema_version = common::OB_INVALID_VERSION;
        prev = NULL;
        next = NULL;
      }

O
oceanbase-admin 已提交
334 335 336
      TO_STRING_KV(K(tenant_id), K(freeze_version), K(schema_version));
    };

M
mw0 已提交
337 338 339
    void insert(schema_node *p);
    void move_forward(schema_node *p);
    int find(const uint64_t tenant_id, const int64_t freeze_version, int64_t &schema_version);
O
oceanbase-admin 已提交
340 341 342
    bool freeze_info_exist(const int64_t freeze_version);
    bool schema_exist(const uint64_t tenant_id, const int64_t freeze_version);
    bool inner_schema_exist(const uint64_t tenant_id, const int64_t freeze_version);
M
mw0 已提交
343
    schema_node *inner_find(const uint64_t tenant_id, const int64_t freeze_version);
O
oceanbase-admin 已提交
344

M
mw0 已提交
345 346 347
    virtual int fetch_freeze_schema(const uint64_t tenant_id, const int64_t freeze_version, int64_t &schema_version);
    virtual int fetch_freeze_schema(const uint64_t tenant_id, const int64_t freeze_version, int64_t &schema_version,
        common::ObIArray<SchemaPair> &freeze_schema);
O
oceanbase-admin 已提交
348

G
gm 已提交
349
  public:  // for ut only
O
oceanbase-admin 已提交
350 351
    int update_freeze_schema(const uint64_t tenant_id, const int64_t freeze_version, const int64_t schema_version);

G
gm 已提交
352
  private:
M
mw0 已提交
353
    int update_freeze_schema(const int64_t freeze_version, common::ObIArray<SchemaPair> &freeze_schema);
O
oceanbase-admin 已提交
354 355 356 357 358

    common::ObSpinLock lock_;

    static const int64_t MAX_SCHEMA_ENTRY = 10000L;

M
mw0 已提交
359 360
    schema_node *head_;
    schema_node *tail_;
O
oceanbase-admin 已提交
361 362 363
    int64_t cnt_;

    bool inited_;
M
mw0 已提交
364
    common::ObISQLClient *sql_proxy_;
O
oceanbase-admin 已提交
365 366 367 368 369
    share::ObFreezeInfoProxy freeze_info_proxy_;

    lib::ObMemAttr mem_attr_;
    common::ObSliceAlloc allocator_;

M
mw0 已提交
370
    SchemaQuerySet &schema_query_set_;
O
oceanbase-admin 已提交
371 372 373 374 375 376 377 378 379 380
  };

  bool inited_;
  int tg_id_;
  ReloadTask reload_task_;
  SchemaQuerySet schema_query_set_;

  int64_t snapshot_gc_ts_;
  common::hash::ObHashMap<uint64_t, GCSnapshotInfo> gc_snapshot_info_[2];

G
gm 已提交
381
protected:
O
oceanbase-admin 已提交
382
  SchemaCache schema_cache_;  // query schema version based on tenant id and major freeze version
G
gm 已提交
383
private:
O
oceanbase-admin 已提交
384 385 386 387 388 389 390 391 392
  common::ObSEArray<FreezeInfoLite, 32> info_list_[2];         // lite one doesnot contain schema_version
  common::ObSEArray<share::ObSnapshotInfo, 32> snapshots_[2];  // snapshots_ matains multi_version_start for index and
                                                               // others
  int64_t cur_idx_;

  common::RWLock lock_;
};

/**
 * Static facade over two ObFreezeInfoSnapshotMgr instances, local_mgr_ and
 * remote_mgr_. init() receives one SQL client for each of them.
 *
 * NOTE(review): get_instance() presumably picks local or remote based on
 * table_id (default common::OB_INVALID_ID); the selection rule lives in the
 * .cpp and is not visible here.
 */
class ObFreezeInfoMgrWrapper {
public:
  static ObFreezeInfoSnapshotMgr &get_instance(const uint64_t table_id = common::OB_INVALID_ID);
  static int init(common::ObISQLClient &local_proxy, common::ObISQLClient &remote_proxy);
  static int start();
  static void wait();
  static void stop();

private:
  static ObFreezeInfoSnapshotMgr local_mgr_;
  static ObFreezeInfoSnapshotMgr remote_mgr_;
};
}  // namespace storage
}  // namespace oceanbase

#endif /* OCEANBASE_STORAGE_FREEZE_INFO_SNAPSHOT_MGR_ */