ob_partition_service.h 71.7 KB
Newer Older
O
oceanbase-admin 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#ifndef OCEANBASE_STORAGE_OB_PARTITION_SERVICE
#define OCEANBASE_STORAGE_OB_PARTITION_SERVICE

#include "lib/hash/ob_linear_hash_map.h"
#include "lib/mysqlclient/ob_mysql_result.h"
#include "lib/queue/ob_dedup_queue.h"
#include "share/stat/ob_stat_manager.h"
#include "share/ob_locality_info.h"
#include "share/ob_locality_info.h"
#include "share/ob_build_index_struct.h"
#include "share/ob_partition_modify.h"
#include "share/rpc/ob_batch_rpc.h"
#include "common/ob_i_rs_cb.h"
#include "lib/mysqlclient/ob_mysql_proxy.h"
#include "lib/mysqlclient/ob_mysql_transaction.h"
#include "storage/ob_i_partition_storage.h"
#include "storage/transaction/ob_trans_define.h"
#include "storage/transaction/ob_trans_service.h"
#include "storage/ob_partition_service_rpc.h"
#include "storage/ob_migrate_retry_queue_thread.h"
#include "storage/ob_callback_queue_thread.h"
#include "storage/ob_slog_writer_queue_thread.h"
#include "storage/ob_saved_storage_info.h"
#include "storage/ob_locality_manager.h"
#include "storage/ob_storage_log_type.h"
#include "clog/ob_clog_mgr.h"
#include "clog/ob_log_rpc_proxy.h"
#include "clog/ob_i_submit_log_cb.h"
#include "clog/ob_log_replay_engine_wrapper.h"
#include "clog/ob_clog_aggre_runnable.h"
#include "election/ob_election_mgr.h"
#include "election/ob_election_rpc.h"
#include "sql/ob_end_trans_callback.h"
#include "ob_warm_up.h"
#include "ob_partition_component_factory.h"
#include "ob_partition_split_worker.h"
#include "ob_partition_worker.h"
#include "ob_trans_checkpoint_worker.h"
#include "storage/ob_garbage_collector.h"
#include "storage/ob_pg_index.h"
#include "storage/ob_pg_mgr.h"
#include "storage/gts/ob_ha_gts_mgr.h"
#include "storage/gts/ob_ha_gts_source.h"
#include "storage/ob_rebuild_scheduler.h"
#include "storage/ob_dup_replica_checker.h"
#include "storage/ob_partition_meta_redo_module.h"
#include "storage/ob_freeze_async_task.h"
#include "storage/ob_sstable_garbage_collector.h"
#include "storage/ob_server_checkpoint_log_reader.h"
#include "storage/ob_auto_part_scheduler.h"
#include "storage/ob_clog_cb_async_worker.h"
#include "lib/container/ob_array_array.h"
#include "storage/ob_partition_group_create_checker.h"
66
#include "storage/ob_backup_archive_log.h"
O
oceanbase-admin 已提交
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127

namespace oceanbase {
namespace blocksstable {
class ObStorageEnv;
}

namespace share {
class ObSplitPartition;
class ObSplitPartitionPair;
}  // namespace share

namespace election {
class ObIElectionMgr;
}

namespace transaction {
enum ObPartitionAuditOperator;
}

namespace storage {
class ObPartitionService;
class ObAddPartitionToPGLog;

// tenant-related information in the storage layer
struct ObTenantStorageInfo {
  ObTenantStorageInfo() : part_cnt_(0), pg_cnt_(0)
  {}
  int64_t part_cnt_;
  int64_t pg_cnt_;
};

struct ObPartitionMigrationDataStatics {
  ObPartitionMigrationDataStatics();
  virtual ~ObPartitionMigrationDataStatics() = default;
  void reset();
  int64_t total_macro_block_;
  int64_t ready_macro_block_;
  int64_t major_count_;
  int64_t mini_minor_count_;
  int64_t normal_minor_count_;
  int64_t buf_minor_count_;
  int64_t reuse_count_;
  int64_t partition_count_;
  int64_t finish_partition_count_;
  // no need reset, cause has inner retry
  int64_t input_bytes_;
  int64_t output_bytes_;

  TO_STRING_KV(K_(total_macro_block), K_(ready_macro_block), K_(major_count), K_(mini_minor_count),
      K_(normal_minor_count), K_(buf_minor_count), K_(reuse_count), K_(partition_count), K_(finish_partition_count),
      K_(input_bytes), K_(output_bytes));
};

struct ObPartMigrationRes {
  common::ObPartitionKey key_;
  common::ObReplicaMember src_;
  common::ObReplicaMember dst_;
  common::ObReplicaMember data_src_;
  share::ObPhysicalBackupArg backup_arg_;
  share::ObPhysicalValidateArg validate_arg_;
  ObPartitionMigrationDataStatics data_statics_;
128 129
  share::ObBackupBackupsetArg backup_backupset_arg_;
  share::ObBackupArchiveLogArg backup_archivelog_arg_;
O
oceanbase-admin 已提交
130 131 132 133 134 135 136 137 138 139
  int64_t quorum_;
  int32_t result_;

  ObPartMigrationRes()
      : key_(),
        src_(),
        dst_(),
        data_src_(),
        backup_arg_(),
        validate_arg_(),
140 141
        backup_backupset_arg_(),
        backup_archivelog_arg_(),
O
oceanbase-admin 已提交
142 143 144
        quorum_(-1),
        result_(OB_ERROR)
  {}
145 146
  TO_STRING_KV(K_(key), K_(src), K_(dst), K_(data_src), K_(backup_arg), K_(validate_arg), K_(backup_backupset_arg),
      K_(backup_archivelog_arg), K_(quorum), K_(result));
O
oceanbase-admin 已提交
147 148 149
};

class ObRestoreInfo {
G
gm 已提交
150
public:
O
oceanbase-admin 已提交
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
  typedef common::hash::ObHashMap<uint64_t, common::ObArray<blocksstable::ObSSTablePair>*,
      common::hash::NoPthreadDefendMode>
      SSTableInfoMap;
  ObRestoreInfo();
  virtual ~ObRestoreInfo();
  int init(const share::ObRestoreArgs& restore_args);
  bool is_inited() const
  {
    return is_inited_;
  }
  int add_sstable_info(const uint64_t index_id, common::ObIArray<blocksstable::ObSSTablePair>& block_list);
  int get_backup_block_info(const uint64_t index_id, const int64_t macro_idx, uint64_t& backup_index_id,
      blocksstable::ObSSTablePair& backup_block_pair) const;
  const share::ObRestoreArgs& get_restore_args() const
  {
    return arg_;
  }
  TO_STRING_KV(K_(arg));

G
gm 已提交
170
private:
O
oceanbase-admin 已提交
171 172 173 174 175 176 177 178 179 180 181 182 183
  bool is_inited_;
  share::ObRestoreArgs arg_;
  SSTableInfoMap backup_sstable_info_map_;
  common::ObArenaAllocator allocator_;
  DISALLOW_COPY_AND_ASSIGN(ObRestoreInfo);
};

enum ObChangeMemberOption : int8_t {
  NORMAL_CHANGE_MEMBER_LIST = 0,
  SKIP_CHANGE_MEMBER_LIST,
};

struct ObReplicaOpArg {
184 185 186
  const static int64_t RESTORE_VERSION_0 = 0;  // old storage logical backup file format before 2.2.30, not supported
  const static int64_t RESTORE_VERSION_1 = 1;  // new storage physical backup file format
  common::ObPartitionKey key_;                 // TODO(muwei.ym) change it to pg key
O
oceanbase-admin 已提交
187 188 189 190 191 192 193 194 195 196
  common::ObReplicaMember src_;
  common::ObReplicaMember dst_;
  common::ObReplicaMember data_src_;
  ObReplicaOpType type_;
  int64_t quorum_;
  common::ObVersion base_version_;
  share::ObRestoreArgs restore_arg_;
  share::ObPhysicalBackupArg backup_arg_;
  share::ObPhysicalRestoreArg phy_restore_arg_;
  share::ObPhysicalValidateArg validate_arg_;
197 198
  share::ObBackupBackupsetArg backup_backupset_arg_;
  share::ObBackupArchiveLogArg backup_archive_log_arg_;
O
oceanbase-admin 已提交
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
  uint64_t index_id_;
  ObReplicaOpPriority priority_;
  int64_t cluster_id_;
  int64_t restore_version_;  // ObRestoreArgs:0, ObPhysicalRestoreArg:1
  ObChangeMemberOption change_member_option_;
  int64_t switch_epoch_;

  ObReplicaOpArg()
      : key_(),
        src_(),
        dst_(),
        data_src_(),
        type_(UNKNOWN_REPLICA_OP),
        quorum_(0),
        base_version_(),
        restore_arg_(),
        backup_arg_(),
        phy_restore_arg_(),
        validate_arg_(),
218 219
        backup_backupset_arg_(),
        backup_archive_log_arg_(),
O
oceanbase-admin 已提交
220 221 222 223 224 225 226 227 228 229
        index_id_(OB_INVALID_ID),
        priority_(ObReplicaOpPriority::PRIO_INVALID),
        cluster_id_(OB_INVALID_CLUSTER_ID),
        restore_version_(RESTORE_VERSION_0),
        change_member_option_(NORMAL_CHANGE_MEMBER_LIST),
        switch_epoch_(OB_INVALID_VERSION)
  {}

  bool is_valid() const
  {
230 231 232
    return (is_replica_op_valid(type_) && key_.is_valid() &&
               (VALIDATE_BACKUP_OP == type_ || BACKUP_BACKUPSET_OP == type_ || BACKUP_ARCHIVELOG_OP == type_ ||
                   dst_.is_valid()) &&
O
oceanbase-admin 已提交
233
               ((REMOVE_REPLICA_OP == type_ || RESTORE_REPLICA_OP == type_ || BACKUP_REPLICA_OP == type_ ||
234 235 236
                    BACKUP_BACKUPSET_OP == type_ || RESTORE_STANDBY_OP == type_) ||
                   VALIDATE_BACKUP_OP == type_ || BACKUP_ARCHIVELOG_OP == type_ ||
                   (src_.is_valid() && data_src_.is_valid())) &&
O
oceanbase-admin 已提交
237 238 239 240 241 242 243 244 245 246
               (COPY_LOCAL_INDEX_OP != type_ || OB_INVALID_ID != index_id_)) &&
           is_replica_op_priority_valid(priority_) &&
           (NORMAL_CHANGE_MEMBER_LIST == change_member_option_ || ADD_REPLICA_OP == type_ ||
               MIGRATE_REPLICA_OP == type_ || REBUILD_REPLICA_OP == type_ || FAST_MIGRATE_REPLICA_OP == type_ ||
               CHANGE_REPLICA_OP == type_ || RESTORE_STANDBY_OP == type_);
  }
  void reset();
  bool is_physical_restore() const;
  bool is_physical_restore_leader() const;
  bool is_physical_restore_follower() const;
O
obdev 已提交
247
  bool is_FtoL() const;
O
oceanbase-admin 已提交
248 249 250
  bool is_standby_restore() const;
  const char* get_replica_op_type_str() const;
  TO_STRING_KV(K_(key), K_(dst), K_(src), K_(data_src), K_(quorum), "type", get_replica_op_type_str(), K_(base_version),
251 252
      K_(restore_arg), K_(validate_arg), K_(backup_arg), K_(phy_restore_arg), K_(backup_backupset_arg), K_(index_id),
      K_(priority), K_(cluster_id), K_(restore_version), K_(switch_epoch));
O
oceanbase-admin 已提交
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292
};

struct ObMigrateSrcInfo {
  ObMigrateSrcInfo() : src_addr_(), cluster_id_(-1)
  {}
  virtual ~ObMigrateSrcInfo() = default;
  bool is_valid() const
  {
    return src_addr_.is_valid() && -1 != cluster_id_;
  }

  void reset()
  {
    src_addr_.reset();
    cluster_id_ = -1;
  }

  uint64_t hash() const
  {
    uint64_t hash_value = 0;
    hash_value = common::murmurhash(&cluster_id_, sizeof(cluster_id_), hash_value);
    hash_value += src_addr_.hash();
    return hash_value;
  }
  bool operator==(const ObMigrateSrcInfo& src_info) const
  {
    return src_addr_ == src_info.src_addr_ && cluster_id_ == src_info.cluster_id_;
  }

  TO_STRING_KV(K_(src_addr), K_(cluster_id));
  common::ObAddr src_addr_;
  int64_t cluster_id_;
};

#define VIRTUAL_FOR_UNITTEST virtual

class ObPartitionService : public share::ObIPSCb,
                           public common::ObIDataAccessService,
                           public ObPartitionMetaRedoModule,
                           public common::ObTableStatDataService {
G
gm 已提交
293
public:
O
oceanbase-admin 已提交
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318
  static const int64_t MC_WAIT_INTERVAL =
      200 * 1000;  // 200ms the interval of checking the completion of member change log syncing
  static const int64_t MC_SLEEP_TIME = 100000;                        // 100ms
  static const int64_t MC_WAIT_TIMEOUT = 6000000;                     // 6s the timeout for changing leader
  static const int64_t MC_TASK_TIMEOUT = 10000000;                    // 10s the timeout for writing member change log
  static const int64_t FETCH_LOG_TIMEOUT = 1000L * 1000L * 60L;       // 1min the timeout for catch up log
  static const int64_t GARBAGE_CLEAN_INTERVAL = 1000L * 1000L * 10L;  // 10s
  static const int64_t PURGE_RETIRE_MEMSTORE_INTERVAL = 1000L * 1000L * 5L;
  static const int64_t CLOG_REQUIRED_MINOR_FREEZE_INTERVAL = 1000L * 1000L * 20L;  // 20s
  static const int64_t RELOAD_LOCALITY_INTERVAL = 10 * 1000 * 1000L;               // 10S
  static const int64_t CREATE_TABLE_INTERVAL = 1000L * 1000L * 15L;                // 15s
  static const int64_t REFRESH_SCHEMA_INTERVAL = 600L * 1000L * 1000L;             // 10min
  static const int64_t OFFLINE_PARTITION_WAIT_INTERVAL = 2L * 1000L;               // 2ms
  static const int64_t MAX_RETRY_TIMES = 3;                                  // retry post result of migrate task to rs
  static const int64_t MC_INTERVAL_BEFORE_LEASE_EXPIRE = 2 * 1000 * 1000;    // 2s
  static const int64_t MIGRATE_RETRY_TIME_INTERVAL = 1 * 60 * 1000 * 1000L;  // retry interval for rebuild
  // estimated memory limit for single partition
  // 100kb(static) + 400kb(dynamic)/10 (assuming that one-tenth of the partition is active)
  static const int64_t SINGLE_PART_STATIC_MEM_COST = 100 * 1024L;
  static const int64_t SINGLE_PART_DYNAMIC_MEM_COST = 400 * 1024L;
  // estimated memory limit for single partition group
  static const int64_t SINGLE_PG_DYNAMIC_MEM_COST = SINGLE_PART_DYNAMIC_MEM_COST + 1 * 1024 * 1024L;
  static const int64_t TENANT_PART_NUM_THRESHOLD = 1000;
  static constexpr const char RPScanIteratorLabel[] = "RPScanIterator";

G
gm 已提交
319
public:
O
oceanbase-admin 已提交
320 321 322 323
  ObPartitionService();
  virtual ~ObPartitionService();
  static OB_INLINE ObPartitionService& get_instance();

G
gm 已提交
324
public:
O
oceanbase-admin 已提交
325 326 327 328 329 330 331 332 333
  virtual int init(const blocksstable::ObStorageEnv& env, const common::ObAddr& self_addr,
      ObPartitionComponentFactory* cp_fty, share::schema::ObMultiVersionSchemaService* schema_service,
      share::ObIPartitionLocationCache* location_cache, share::ObRsMgr* rs_mgr, storage::ObIPartitionReport* rs_cb,
      rpc::frame::ObReqTransport* req_transport, obrpc::ObBatchRpc* batch_rpc, obrpc::ObCommonRpcProxy& rs_rpc_proxy,
      obrpc::ObSrvRpcProxy& srv_rpc_proxy, common::ObMySQLProxy& sql_proxy, share::ObRemoteSqlProxy& remote_sql_proxy,
      share::ObAliveServerTracer& server_tracer, common::ObInOutBandwidthThrottle& bandwidth_throttle);
  VIRTUAL_FOR_UNITTEST int start();
  VIRTUAL_FOR_UNITTEST int stop();
  VIRTUAL_FOR_UNITTEST int wait();
L
LINxiansheng 已提交
334
  VIRTUAL_FOR_UNITTEST int destroy() override;
O
oceanbase-admin 已提交
335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353

  VIRTUAL_FOR_UNITTEST int wait_start_finish();

  VIRTUAL_FOR_UNITTEST int reload_config();

  VIRTUAL_FOR_UNITTEST int batch_check_leader_active(
      const obrpc::ObBatchCheckLeaderArg& arg, obrpc::ObBatchCheckRes& result);
  VIRTUAL_FOR_UNITTEST int batch_get_protection_level(
      const obrpc::ObBatchCheckLeaderArg& arg, obrpc::ObBatchCheckRes& result);
  VIRTUAL_FOR_UNITTEST int primary_process_protect_mode_switch();
  VIRTUAL_FOR_UNITTEST int standby_update_replica_protection_level();
  VIRTUAL_FOR_UNITTEST int create_batch_partition_groups(
      const common::ObIArray<obrpc::ObCreatePartitionArg>& batch_arg, common::ObIArray<int>& batch_res);
  VIRTUAL_FOR_UNITTEST int create_batch_pg_partitions(
      const common::ObIArray<obrpc::ObCreatePartitionArg>& batch_arg, common::ObIArray<int>& batch_res);
  VIRTUAL_FOR_UNITTEST int create_new_partition(const common::ObPartitionKey& key, ObIPartitionGroup*& partition);
  VIRTUAL_FOR_UNITTEST int add_new_partition(ObIPartitionGroupGuard& partition_guard);
  VIRTUAL_FOR_UNITTEST int remove_partition(const common::ObPartitionKey& key, const bool write_slog = true);
  VIRTUAL_FOR_UNITTEST int remove_partition_from_pg(
354 355 356
      const bool for_replay, const ObPartitionKey& pg_key, const ObPartitionKey& pkey, const uint64_t log_id);
  VIRTUAL_FOR_UNITTEST int online_partition(const common::ObPartitionKey& pkey, const int64_t publish_version,
      const int64_t restore_snapshot_version, const uint64_t last_restore_log_id, const int64_t last_restore_log_ts);
O
oceanbase-admin 已提交
357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404
  // before building the index, wait for all transactions with lower schema version to finish
  // max_commit_version is the max commit version of those transactions
  VIRTUAL_FOR_UNITTEST int check_schema_version_elapsed(const ObPartitionKey& partition, const int64_t schema_version,
      const uint64_t index_id, int64_t& max_commit_version);

  VIRTUAL_FOR_UNITTEST int check_ctx_create_timestamp_elapsed(const ObPartitionKey& partition, const int64_t ts);

  // transaction service interface
  VIRTUAL_FOR_UNITTEST int start_trans(const uint64_t tenant_id, const uint64_t cluster_id,
      const transaction::ObStartTransParam& req, const int64_t expired_time, const uint32_t session_id,
      const uint64_t proxy_session_id, transaction::ObTransDesc& trans_desc);
  VIRTUAL_FOR_UNITTEST int end_trans(
      bool is_rollback, transaction::ObTransDesc& trans_desc, sql::ObIEndTransCallback& cb, const int64_t expired_time);

  VIRTUAL_FOR_UNITTEST int start_stmt(const transaction::ObStmtParam& stmt_param, transaction::ObTransDesc& trans_desc,
      const common::ObPartitionLeaderArray& pla, common::ObPartitionArray& participants);
  virtual int half_stmt_commit(const transaction::ObTransDesc& trans_desc, const common::ObPartitionKey& partition);
  VIRTUAL_FOR_UNITTEST int end_stmt(bool is_rollback, bool is_incomplete,
      const ObPartitionArray& cur_stmt_all_participants, const transaction::ObPartitionEpochArray& epoch_arr,
      const ObPartitionArray& discard_participant, const ObPartitionLeaderArray& pla,
      transaction::ObTransDesc& trans_desc);
  VIRTUAL_FOR_UNITTEST int start_nested_stmt(transaction::ObTransDesc& trans_desc);
  VIRTUAL_FOR_UNITTEST int end_nested_stmt(
      transaction::ObTransDesc& trans_desc, const ObPartitionArray& participants, const bool is_rollback);
  VIRTUAL_FOR_UNITTEST int start_participant(transaction::ObTransDesc& trans_desc,
      const common::ObPartitionArray& participants, transaction::ObPartitionEpochArray& partition_epoch_arr);
  VIRTUAL_FOR_UNITTEST int end_participant(
      bool is_rollback, transaction::ObTransDesc& trans_desc, const common::ObPartitionArray& participants);
  VIRTUAL_FOR_UNITTEST int kill_query_session(const transaction::ObTransDesc& trans_desc, const int status);
  VIRTUAL_FOR_UNITTEST int internal_kill_trans(transaction::ObTransDesc& trans_desc);
  VIRTUAL_FOR_UNITTEST int savepoint(transaction::ObTransDesc& trans_desc, const common::ObString& sp_name);
  VIRTUAL_FOR_UNITTEST int rollback_savepoint(transaction::ObTransDesc& trans_desc, const common::ObString& sp_name,
      const transaction::ObStmtParam& stmt_param);
  VIRTUAL_FOR_UNITTEST int release_savepoint(transaction::ObTransDesc& trans_desc, const common::ObString& sp_name);
  VIRTUAL_FOR_UNITTEST int mark_trans_forbidden_sql_no(const transaction::ObTransID& trans_id,
      const common::ObPartitionArray& partitions, const int64_t sql_no, bool& forbid_succ);
  VIRTUAL_FOR_UNITTEST int is_trans_forbidden_sql_no(const transaction::ObTransID& trans_id,
      const common::ObPartitionArray& partitions, const int64_t sql_no, bool& is_forbidden);
  VIRTUAL_FOR_UNITTEST int xa_rollback_all_changes(
      transaction::ObTransDesc& trans_desc, const transaction::ObStmtParam& stmt_param);
  // xa transactions interfaces
  VIRTUAL_FOR_UNITTEST int xa_start(const transaction::ObXATransID& xid, const int64_t flags,
      const int64_t xa_end_timeout_seconds, transaction::ObTransDesc& trans_desc);
  VIRTUAL_FOR_UNITTEST int xa_end(
      const transaction::ObXATransID& xid, const int64_t flags, transaction::ObTransDesc& trans_desc);
  VIRTUAL_FOR_UNITTEST int xa_prepare(
      const transaction::ObXATransID& xid, const uint64_t tenant_id, const int64_t stmt_expired_time);
  VIRTUAL_FOR_UNITTEST int xa_end_trans(const transaction::ObXATransID& xid, const bool is_rollback,
G
gjw2284740 已提交
405
      const int64_t flags, transaction::ObTransDesc& trans_desc, bool& access_temp_table);
O
oceanbase-admin 已提交
406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546
  VIRTUAL_FOR_UNITTEST int get_xa_trans_state(int32_t& state, transaction::ObTransDesc& trans_desc);
  // partition storage interfaces
  virtual int table_scan(ObVTableScanParam& vparam, common::ObNewRowIterator*& result) override;
  virtual int table_scan(ObVTableScanParam& vparam, common::ObNewIterIterator*& result) override;
  virtual int revert_scan_iter(common::ObNewRowIterator* iter) override;
  virtual int revert_scan_iter(common::ObNewIterIterator* iter) override;
  VIRTUAL_FOR_UNITTEST int delete_rows(const transaction::ObTransDesc& trans_desc, const ObDMLBaseParam& dml_param,
      const common::ObPartitionKey& pkey, const common::ObIArray<uint64_t>& column_ids,
      common::ObNewRowIterator* row_iter, int64_t& affected_rows);
  VIRTUAL_FOR_UNITTEST int delete_row(const transaction::ObTransDesc& trans_desc, const ObDMLBaseParam& dml_param,
      const common::ObPartitionKey& pkey, const common::ObIArray<uint64_t>& column_ids, const common::ObNewRow& row);
  VIRTUAL_FOR_UNITTEST int put_rows(const transaction::ObTransDesc& trans_desc, const ObDMLBaseParam& dml_param,
      const common::ObPartitionKey& pkey, const common::ObIArray<uint64_t>& column_ids,
      common::ObNewRowIterator* row_iter, int64_t& affected_rows);
  VIRTUAL_FOR_UNITTEST int insert_rows(const transaction::ObTransDesc& trans_desc, const ObDMLBaseParam& dml_param,
      const common::ObPartitionKey& pkey, const common::ObIArray<uint64_t>& column_ids,
      common::ObNewRowIterator* row_iter, int64_t& affected_rows);
  VIRTUAL_FOR_UNITTEST int insert_row(const transaction::ObTransDesc& trans_desc, const ObDMLBaseParam& dml_param,
      const common::ObPartitionKey& pkey, const common::ObIArray<uint64_t>& column_ids, const common::ObNewRow& row);
  VIRTUAL_FOR_UNITTEST int insert_row(const transaction::ObTransDesc& trans_desc, const ObDMLBaseParam& dml_param,
      const common::ObPartitionKey& pkey, const common::ObIArray<uint64_t>& column_ids,
      const common::ObIArray<uint64_t>& duplicated_column_ids, const common::ObNewRow& row, const ObInsertFlag flag,
      int64_t& affected_rows, common::ObNewRowIterator*& duplicated_rows);
  VIRTUAL_FOR_UNITTEST int fetch_conflict_rows(const transaction::ObTransDesc& trans_desc,
      const ObDMLBaseParam& dml_param, const common::ObPartitionKey& pkey,
      const common::ObIArray<uint64_t>& in_column_ids, const common::ObIArray<uint64_t>& out_column_ids,
      common::ObNewRowIterator& check_row_iter, common::ObIArray<common::ObNewRowIterator*>& dup_row_iters);
  VIRTUAL_FOR_UNITTEST int revert_insert_iter(const common::ObPartitionKey& pkey, common::ObNewRowIterator* iter);
  VIRTUAL_FOR_UNITTEST int update_rows(const transaction::ObTransDesc& trans_desc, const ObDMLBaseParam& dml_param,
      const common::ObPartitionKey& pkey, const common::ObIArray<uint64_t>& column_ids,
      const common::ObIArray<uint64_t>& updated_column_ids, common::ObNewRowIterator* row_iter, int64_t& affected_rows);
  VIRTUAL_FOR_UNITTEST int update_row(const transaction::ObTransDesc& trans_desc, const ObDMLBaseParam& dml_param,
      const common::ObPartitionKey& pkey, const common::ObIArray<uint64_t>& column_ids,
      const common::ObIArray<uint64_t>& updated_column_ids, const common::ObNewRow& old_row,
      const common::ObNewRow& new_row);
  VIRTUAL_FOR_UNITTEST int lock_rows(const transaction::ObTransDesc& trans_desc, const ObDMLBaseParam& dml_param,
      const int64_t abs_lock_timeout, const common::ObPartitionKey& pkey, common::ObNewRowIterator* row_iter,
      const ObLockFlag lock_flag, int64_t& affected_rows);
  VIRTUAL_FOR_UNITTEST int lock_rows(const transaction::ObTransDesc& trans_desc, const ObDMLBaseParam& dml_param,
      const int64_t abs_lock_timeout, const common::ObPartitionKey& pkey, const common::ObNewRow& row,
      const ObLockFlag lock_flag);

  virtual int join_mv_scan(
      ObTableScanParam& left_param, ObTableScanParam& right_param, common::ObNewRowIterator*& result) override;

  // partition manager interface
  int get_table_store_cnt(int64_t& table_cnt);
  VIRTUAL_FOR_UNITTEST void revert_replay_status(ObReplayStatus* replay_status) const;
  VIRTUAL_FOR_UNITTEST int get_gts(const uint64_t tenant_id, const transaction::MonotonicTs stc, int64_t& gts) const;
  VIRTUAL_FOR_UNITTEST int trigger_gts();

  VIRTUAL_FOR_UNITTEST ObPGPartitionIterator* alloc_pg_partition_iter();
  VIRTUAL_FOR_UNITTEST ObSinglePGPartitionIterator* alloc_single_pg_partition_iter();
  VIRTUAL_FOR_UNITTEST void revert_pg_partition_iter(ObIPGPartitionIterator* iter);
  VIRTUAL_FOR_UNITTEST bool is_empty() const;
  VIRTUAL_FOR_UNITTEST int get_partition_count(int64_t& partition_count) const;
  VIRTUAL_FOR_UNITTEST int halt_all_prewarming(uint64_t tenant_id = OB_INVALID_TENANT_ID);
  VIRTUAL_FOR_UNITTEST int get_all_partition_status(int64_t& inactive_num, int64_t& total_num) const;

  // replay interfaces
  VIRTUAL_FOR_UNITTEST int replay_redo_log(const common::ObPartitionKey& pkey, const ObStoreCtx& ctx,
      const int64_t log_timestamp, const int64_t log_id, const char* buf, const int64_t size, bool& replayed);

  // partition service callback function
  VIRTUAL_FOR_UNITTEST int push_callback_task(const ObCbTask& task);
  VIRTUAL_FOR_UNITTEST int push_into_migrate_retry_queue(const common::ObPartitionKey& pkey, const int32_t task_type);
  virtual int on_leader_revoke(const common::ObPartitionKey& pkey, const common::ObRole& role) override;
  virtual int on_leader_takeover(const common::ObPartitionKey& pkey, const common::ObRole& role,
      const bool is_elected_by_changing_leader) override;
  virtual int on_leader_active(const common::ObPartitionKey& pkey, const common::ObRole& role,
      const bool is_elected_by_changing_leader) override;
  VIRTUAL_FOR_UNITTEST int internal_leader_revoke(const ObCbTask& revoke_task);
  VIRTUAL_FOR_UNITTEST int internal_leader_takeover(const ObCbTask& takeover_task);
  VIRTUAL_FOR_UNITTEST int internal_leader_takeover_bottom_half(const ObCbTask& takeover_task);
  VIRTUAL_FOR_UNITTEST int internal_leader_active(const ObCbTask& active_task);
  virtual int async_leader_revoke(const common::ObPartitionKey& pkey, const uint32_t revoke_type);
  virtual bool is_take_over_done(const common::ObPartitionKey& pkey) const override;
  virtual bool is_revoke_done(const common::ObPartitionKey& pkey) const override;
  virtual int submit_ms_info_task(const common::ObPartitionKey& pkey, const common::ObAddr& server,
      const int64_t cluster_id, const clog::ObLogType log_type, const uint64_t ms_log_id, const int64_t mc_timestamp,
      const int64_t replica_num, const common::ObMemberList& prev_member_list,
      const common::ObMemberList& curr_member_list, const common::ObProposalID& ms_proposal_id);
  virtual int process_ms_info_task(ObMsInfoTask& task);
  virtual int on_member_change_success(const common::ObPartitionKey& pkey, const clog::ObLogType log_type,
      const uint64_t ms_log_id, const int64_t mc_timestamp, const int64_t replica_num,
      const common::ObMemberList& prev_member_list, const common::ObMemberList& curr_member_list,
      const common::ObProposalID& ms_proposal_id) override;
  // used only by clog
  // set is_need_renew to refresh location cache
  virtual int nonblock_get_strong_leader_from_loc_cache(const common::ObPartitionKey& pkey, ObAddr& leader,
      int64_t& cluster_id, const bool is_need_renew = false) override;

  virtual int nonblock_get_leader_by_election_from_loc_cache(const common::ObPartitionKey& pkey, int64_t cluster_id,
      ObAddr& leader, const bool is_need_renew = false) override;
  virtual int get_restore_leader_from_loc_cache(
      const common::ObPartitionKey& pkey, ObAddr& restore_leader, const bool is_need_renew);
  virtual int nonblock_get_leader_by_election_from_loc_cache(
      const common::ObPartitionKey& pkey, ObAddr& leader, const bool is_need_renew = false) override;
  virtual int nonblock_get_strong_leader_from_loc_cache(
      const common::ObPartitionKey& pkey, ObAddr& leader, const bool is_need_renew = false) override;
  virtual int handle_log_missing(const common::ObPartitionKey& pkey, const common::ObAddr& server) override;
  virtual int restore_standby_replica(
      const common::ObPartitionKey& pkey, const common::ObAddr& server, const int64_t cluster_id);
  virtual bool is_service_started() const override;
  virtual int get_global_max_decided_trans_version(int64_t& max_decided_trans_version) const override;
  VIRTUAL_FOR_UNITTEST int handle_split_dest_partition_request(
      const obrpc::ObSplitDestPartitionRequestArg& arg, obrpc::ObSplitDestPartitionResult& result);
  VIRTUAL_FOR_UNITTEST int handle_split_dest_partition_result(const obrpc::ObSplitDestPartitionResult& result);

  VIRTUAL_FOR_UNITTEST int handle_replica_split_progress_request(
      const obrpc::ObReplicaSplitProgressRequest& arg, obrpc::ObReplicaSplitProgressResult& result);
  VIRTUAL_FOR_UNITTEST int handle_replica_split_progress_result(const obrpc::ObReplicaSplitProgressResult& result);

  VIRTUAL_FOR_UNITTEST int set_global_max_decided_trans_version(const int64_t trans_version);
  VIRTUAL_FOR_UNITTEST const common::ObAddr& get_self_addr() const
  {
    return self_addr_;
  }

  // expose component
  VIRTUAL_FOR_UNITTEST transaction::ObTransService* get_trans_service();
  VIRTUAL_FOR_UNITTEST memtable::ObIMemtableCtxFactory* get_mem_ctx_factory();
  VIRTUAL_FOR_UNITTEST clog::ObICLogMgr* get_clog_mgr();
  VIRTUAL_FOR_UNITTEST election::ObIElectionMgr* get_election_mgr();
  ObRebuildReplicaService& get_rebuild_replica_service()
  {
    return rebuild_replica_service_;
  }
  VIRTUAL_FOR_UNITTEST ObPartitionSplitWorker* get_split_worker();
  VIRTUAL_FOR_UNITTEST clog::ObClogAggreRunnable* get_clog_aggre_runnable();
  VIRTUAL_FOR_UNITTEST int get_locality_info(share::ObLocalityInfo& locality_info);
  VIRTUAL_FOR_UNITTEST const ObLocalityManager* get_locality_manager() const
  {
    return &locality_manager_;
  }
  VIRTUAL_FOR_UNITTEST ObLocalityManager* get_locality_manager()
  {
    return &locality_manager_;
  }

  VIRTUAL_FOR_UNITTEST int replay(const ObPartitionKey& partition, const char* log, const int64_t size,
547
      const uint64_t log_id, const int64_t log_ts, int64_t& schema_version);
O
oceanbase-admin 已提交
548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618
  VIRTUAL_FOR_UNITTEST int minor_freeze(const uint64_t tenant_id);
  VIRTUAL_FOR_UNITTEST int minor_freeze(
      const common::ObPartitionKey& pkey, const bool emergency = false, const bool force = false);
  VIRTUAL_FOR_UNITTEST int freeze_partition(
      const ObPartitionKey& pkey, const bool emergency, const bool force, int64_t& freeze_snapshot);
  VIRTUAL_FOR_UNITTEST int check_dirty_txn(const ObPartitionKey& pkey, const int64_t min_log_ts,
      const int64_t max_log_ts, int64_t& freeze_ts, bool& is_dirty);
  // query accum_checksum and submit_timestamp with pkey and log id
  // could cost a few seconds
  VIRTUAL_FOR_UNITTEST int query_log_info_with_log_id(const ObPartitionKey& pkey, const uint64_t log_id,
      const int64_t timeout, int64_t& accum_checksum, int64_t& submit_timestamp, int64_t& epoch_id);

  // misc functions
  VIRTUAL_FOR_UNITTEST int get_dup_replica_type(
      const common::ObPartitionKey& pkey, const common::ObAddr& server, DupReplicaType& dup_replica_type);
  VIRTUAL_FOR_UNITTEST int get_dup_replica_type(
      const uint64_t table_id, const common::ObAddr& server, DupReplicaType& dup_replica_type);
  VIRTUAL_FOR_UNITTEST int get_replica_status(const common::ObPartitionKey& pkey, share::ObReplicaStatus& status) const;
  VIRTUAL_FOR_UNITTEST int get_role(const common::ObPartitionKey& pkey, common::ObRole& role) const;
  VIRTUAL_FOR_UNITTEST int get_role_unsafe(const common::ObPartitionKey& pkey, common::ObRole& role) const;
  VIRTUAL_FOR_UNITTEST int get_leader_curr_member_list(
      const common::ObPartitionKey& pkey, common::ObMemberList& member_list) const;
  VIRTUAL_FOR_UNITTEST int get_curr_member_list(
      const common::ObPartitionKey& pkey, common::ObMemberList& member_list) const;
  VIRTUAL_FOR_UNITTEST int get_curr_leader_and_memberlist(const common::ObPartitionKey& pkey, common::ObAddr& leader,
      common::ObRole& role, common::ObMemberList& member_list, common::ObChildReplicaList& children_list,
      common::ObReplicaType& replica_type, common::ObReplicaProperty& property) const;
  VIRTUAL_FOR_UNITTEST int get_dst_leader_candidate(
      const common::ObPartitionKey& pkey, common::ObMemberList& member_list) const;
  VIRTUAL_FOR_UNITTEST int get_dst_candidates_array(const common::ObPartitionIArray& pkey_array,
      const common::ObAddrIArray& dst_server_list, common::ObSArray<common::ObAddrSArray>& candidate_list_array,
      common::ObSArray<obrpc::CandidateStatusList>& candidate_status_array) const;
  VIRTUAL_FOR_UNITTEST int get_leader(const common::ObPartitionKey& pkey, common::ObAddr& leader) const;
  VIRTUAL_FOR_UNITTEST int change_leader(const common::ObPartitionKey& pkey, const common::ObAddr& leader);
  VIRTUAL_FOR_UNITTEST int batch_change_rs_leader(const common::ObAddr& leader);
  VIRTUAL_FOR_UNITTEST int auto_batch_change_rs_leader();
  VIRTUAL_FOR_UNITTEST bool is_partition_exist(const common::ObPartitionKey& pkey) const;
  VIRTUAL_FOR_UNITTEST bool is_scan_disk_finished();
  //  VIRTUAL_FOR_UNITTEST void garbage_clean();
  VIRTUAL_FOR_UNITTEST int activate_tenant(const uint64_t tenant_id);
  VIRTUAL_FOR_UNITTEST int inactivate_tenant(const uint64_t tenant_id);
  virtual bool is_tenant_active(const common::ObPartitionKey& pkey) const override;
  virtual bool is_tenant_active(const uint64_t tenant_id) const override;
  VIRTUAL_FOR_UNITTEST ObIPartitionComponentFactory* get_cp_fty();

  VIRTUAL_FOR_UNITTEST int get_server_locality_array(
      common::ObIArray<share::ObServerLocality>& server_locality_array, bool& has_readonly_zone) const;
  virtual int get_server_region_across_cluster(const common::ObAddr& server, common::ObRegion& region) const;

  virtual int get_server_region(const common::ObAddr& server, common::ObRegion& region) const override;
  virtual int record_server_region(const common::ObAddr& server, const common::ObRegion& region) override;
  virtual int get_server_idc(const common::ObAddr& server, common::ObIDC& idc) const override;
  virtual int record_server_idc(const common::ObAddr& server, const common::ObIDC& idc) override;
  virtual int get_server_cluster_id(const common::ObAddr& server, int64_t& cluster_id) const override;
  virtual int record_server_cluster_id(const common::ObAddr& server, const int64_t cluster_id) override;
  VIRTUAL_FOR_UNITTEST int force_refresh_locality_info();
  int add_refresh_locality_task();
  ObCLogCallbackAsyncWorker& get_callback_async_worker()
  {
    return cb_async_worker_;
  }

  // batch migration
  VIRTUAL_FOR_UNITTEST int migrate_replica_batch(const obrpc::ObMigrateReplicaBatchArg& arg);

  VIRTUAL_FOR_UNITTEST int restore_replica(const obrpc::ObRestoreReplicaArg& arg, const share::ObTaskId& task_id);
  VIRTUAL_FOR_UNITTEST int physical_restore_replica(
      const obrpc::ObPhyRestoreReplicaArg& arg, const share::ObTaskId& task_id);
  VIRTUAL_FOR_UNITTEST int restore_follower_replica(const obrpc::ObCopySSTableBatchArg& rpc_arg);
  VIRTUAL_FOR_UNITTEST int backup_replica_batch(const obrpc::ObBackupBatchArg& arg);
  VIRTUAL_FOR_UNITTEST int validate_backup_batch(const obrpc::ObValidateBatchArg& arg);
619 620
  VIRTUAL_FOR_UNITTEST int backup_backupset_batch(const obrpc::ObBackupBackupsetBatchArg& arg);
  VIRTUAL_FOR_UNITTEST int backup_archive_log(const obrpc::ObBackupArchiveLogBatchArg& arg);
O
oceanbase-admin 已提交
621 622 623

  virtual int get_tenant_log_archive_status(
      const share::ObGetTenantLogArchiveStatusArg& arg, share::ObTenantLogArchiveStatusWrapper& result);
624 625
  virtual int get_tenant_log_archive_status_v2(
      const share::ObGetTenantLogArchiveStatusArg& arg, share::ObServerTenantLogArchiveStatusWrapper& result);
O
oceanbase-admin 已提交
626 627 628 629 630 631 632 633 634
  virtual int get_archive_pg_map(archive::PGArchiveMap*& map);
  int push_replica_task(
      const ObReplicaOpType& type, const obrpc::ObMigrateReplicaArg& migrate_arg, ObIArray<ObReplicaOpArg>& task_list);

  int check_add_or_migrate_replica_arg(const common::ObPartitionKey& pkey,
      const common::ObReplicaMember& dst, /* new replica */
      const common::ObReplicaMember& src,
      const common::ObReplicaMember& data_src, /* data source, if invalid, use leader instead */
      const int64_t quorum);
M
mw0 已提交
635
  VIRTUAL_FOR_UNITTEST int standby_cut_data_batch(const obrpc::ObStandbyCutDataBatchTaskArg& arg);
O
oceanbase-admin 已提交
636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707

  // interface for online/offline
  VIRTUAL_FOR_UNITTEST int add_replica(const obrpc::ObAddReplicaArg& rpc_arg, const share::ObTaskId& task_id);
  VIRTUAL_FOR_UNITTEST int batch_add_replica(
      const obrpc::ObAddReplicaBatchArg& rpc_arg, const share::ObTaskId& task_id);
  VIRTUAL_FOR_UNITTEST int migrate_replica(const obrpc::ObMigrateReplicaArg& rpc_arg, const share::ObTaskId& task_id);
  VIRTUAL_FOR_UNITTEST int rebuild_replica(const obrpc::ObRebuildReplicaArg& rpc_arg, const share::ObTaskId& task_id);
  VIRTUAL_FOR_UNITTEST int schedule_standby_restore_task(
      const obrpc::ObRebuildReplicaArg& rpc_arg, const int64_t cluster_id, const share::ObTaskId& task_id);
  VIRTUAL_FOR_UNITTEST int copy_global_index(const obrpc::ObCopySSTableArg& rpc_arg, const share::ObTaskId& task_id);
  VIRTUAL_FOR_UNITTEST int copy_local_index(const obrpc::ObCopySSTableArg& rpc_arg, const share::ObTaskId& task_id);
  VIRTUAL_FOR_UNITTEST int change_replica(const obrpc::ObChangeReplicaArg& rpc_arg, const share::ObTaskId& task_id);
  VIRTUAL_FOR_UNITTEST int batch_change_replica(const obrpc::ObChangeReplicaBatchArg& arg);
  VIRTUAL_FOR_UNITTEST int remove_replica(const common::ObPartitionKey& key, const common::ObReplicaMember& dst);
  VIRTUAL_FOR_UNITTEST int batch_remove_replica(const obrpc::ObRemoveReplicaArgs& args);
  VIRTUAL_FOR_UNITTEST int batch_remove_non_paxos_replica(
      const obrpc::ObRemoveNonPaxosReplicaBatchArg& args, obrpc::ObRemoveNonPaxosReplicaBatchResult& results);

  VIRTUAL_FOR_UNITTEST int is_member_change_done(
      const common::ObPartitionKey& key, const uint64_t log_id, const int64_t timestamp);
  VIRTUAL_FOR_UNITTEST int handle_member_change_callback(
      const ObReplicaOpArg& arg, const int result, bool& could_retry);
  VIRTUAL_FOR_UNITTEST int handle_report_meta_table_callback(
      const ObPartitionKey& pkey, const int result, const bool need_report_checksum);
  VIRTUAL_FOR_UNITTEST int report_migrate_in_indexes(const ObPartitionKey& pkey);
  VIRTUAL_FOR_UNITTEST int add_replica_mc(const obrpc::ObMemberChangeArg& arg, obrpc::ObMCLogRpcInfo& mc_log_info);
  VIRTUAL_FOR_UNITTEST int remove_replica_mc(const obrpc::ObMemberChangeArg& arg, obrpc::ObMCLogRpcInfo& mc_log_info);
  VIRTUAL_FOR_UNITTEST int batch_remove_replica_mc(
      const obrpc::ObMemberChangeBatchArg& arg, obrpc::ObMemberChangeBatchResult& result);
  VIRTUAL_FOR_UNITTEST int change_quorum_mc(const obrpc::ObModifyQuorumArg& arg, obrpc::ObMCLogRpcInfo& mc_log_info);
  int handle_ha_gts_ping_request(const obrpc::ObHaGtsPingRequest& request, obrpc::ObHaGtsPingResponse& response);
  int handle_ha_gts_get_request(const obrpc::ObHaGtsGetRequest& request);
  int handle_ha_gts_get_response(const obrpc::ObHaGtsGetResponse& response);
  int handle_ha_gts_heartbeat(const obrpc::ObHaGtsHeartbeat& heartbeat);
  int handle_ha_gts_update_meta(
      const obrpc::ObHaGtsUpdateMetaRequest& request, obrpc::ObHaGtsUpdateMetaResponse& response);
  int handle_ha_gts_change_member(
      const obrpc::ObHaGtsChangeMemberRequest& request, obrpc::ObHaGtsChangeMemberResponse& response);
  int send_ha_gts_get_request(const common::ObAddr& server, const obrpc::ObHaGtsGetRequest& request);

  VIRTUAL_FOR_UNITTEST int check_self_in_member_list(const common::ObPartitionKey& key, bool& in_member_list);

  // interface on ObTableDataService
  virtual int get_table_stat(const common::ObPartitionKey& key, common::ObTableStat& tstat) override;
  VIRTUAL_FOR_UNITTEST int schema_drop_partition(const ObCLogCallbackAsyncTask& offline_task);

  int replay_offline_partition_log(const ObPartitionKey& pkey, const uint64_t log_id, const bool is_physical_drop);

  int do_partition_loop_work(ObIPartitionGroup& partition);
  VIRTUAL_FOR_UNITTEST int do_warm_up_request(const obrpc::ObWarmUpRequestArg& arg, const int64_t recieve_ts);
  VIRTUAL_FOR_UNITTEST int generate_partition_weak_read_snapshot_version(ObIPartitionGroup& partition, bool& need_skip,
      bool& is_user_partition, int64_t& wrs_version, const int64_t max_stale_time);
  VIRTUAL_FOR_UNITTEST int check_can_start_service(
      bool& can_start_service, int64_t& safe_weak_read_snaspthot, ObPartitionKey& min_slave_read_ts_pkey);
  VIRTUAL_FOR_UNITTEST int admin_wash_ilog_cache(const clog::file_id_t file_id)
  {
    // Deprecated
    // return get_cursor_cache() == NULL ? OB_ERR_UNEXPECTED : get_cursor_cache()->admin_wash_ilog_cache(file_id);
    UNUSED(file_id);
    return common::OB_SUCCESS;
  }
  VIRTUAL_FOR_UNITTEST int admin_wash_ilog_cache()
  {
    // Deprecated
    // return get_cursor_cache() == NULL ? OB_ERR_UNEXPECTED : get_cursor_cache()->admin_wash_ilog_cache();
    return common::OB_SUCCESS;
  }
  virtual int query_range_to_macros(common::ObIAllocator& allocator, const common::ObPartitionKey& pkey,
      const common::ObIArray<common::ObStoreRange>& ranges, const int64_t type, uint64_t* macros_count,
      const int64_t* total_task_count, common::ObIArray<common::ObStoreRange>* splitted_ranges,
      common::ObIArray<int64_t>* split_index) override;

M
mw0 已提交
708 709
  virtual int get_multi_ranges_cost(const common::ObPartitionKey& pkey,
      const common::ObIArray<common::ObStoreRange>& ranges, int64_t& total_size) override;
O
oceanbase-admin 已提交
710 711
  virtual int split_multi_ranges(const common::ObPartitionKey& pkey,
      const common::ObIArray<common::ObStoreRange>& ranges, const int64_t expected_task_count,
L
LINxiansheng 已提交
712
      common::ObIAllocator& allocator, common::ObArrayArray<common::ObStoreRange>& multi_range_split_array) override;
O
oceanbase-admin 已提交
713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775
  VIRTUAL_FOR_UNITTEST int is_log_sync(
      const common::ObPartitionKey& key, bool& is_sync, uint64_t& max_confirmed_log_id);
  VIRTUAL_FOR_UNITTEST int set_region(const ObPartitionKey& key, clog::ObIPartitionLogService* pls);
  VIRTUAL_FOR_UNITTEST bool is_running() const
  {
    return is_running_;
  }

  int retry_post_operate_replica_res(const ObReplicaOpArg& arg, const int result);
  int retry_post_batch_migrate_replica_res(
      const ObReplicaOpType& type, const ObArray<ObPartMigrationRes>& report_res_list);
  bool is_working_partition(const common::ObPartitionKey& pkey);
  VIRTUAL_FOR_UNITTEST int is_local_zone_read_only(bool& is_read_only);
  share::ObIPartitionLocationCache* get_location_cache()
  {
    return location_cache_;
  }
  ObIPartitionServiceRpc& get_pts_rpc()
  {
    return pts_rpc_;
  }
  VIRTUAL_FOR_UNITTEST int append_local_sort_data(const common::ObPartitionKey& pkey,
      const share::ObBuildIndexAppendLocalDataParam& param, common::ObNewRowIterator& iter);
  VIRTUAL_FOR_UNITTEST int append_sstable(const common::ObPartitionKey& pkey,
      const share::ObBuildIndexAppendSSTableParam& param, common::ObNewRowIterator& iter);

  VIRTUAL_FOR_UNITTEST bool is_election_candidate(const common::ObPartitionKey& pkey);
  // for splitting partition
  VIRTUAL_FOR_UNITTEST int split_partition(
      const share::ObSplitPartition& split_info, common::ObIArray<share::ObPartitionSplitProgress>& result);
  VIRTUAL_FOR_UNITTEST int sync_split_source_log_success(
      const common::ObPartitionKey& pkey, const int64_t source_log_id, const int64_t source_log_ts);
  VIRTUAL_FOR_UNITTEST int sync_split_dest_log_success(const common::ObPartitionKey& pkey);
  VIRTUAL_FOR_UNITTEST int split_dest_partition(const common::ObPartitionKey& pkey,
      const ObPartitionSplitInfo& split_info, enum share::ObSplitProgress& progress);

  VIRTUAL_FOR_UNITTEST int calc_column_checksum(const common::ObPartitionKey& pkey, const uint64_t index_table_id,
      const int64_t schema_version, const uint64_t execution_id, const int64_t snapshot_version);
  VIRTUAL_FOR_UNITTEST int check_single_replica_major_sstable_exist(
      const common::ObPartitionKey& pkey, const uint64_t index_table_id);
  VIRTUAL_FOR_UNITTEST int check_single_replica_major_sstable_exist(
      const common::ObPartitionKey& pkey, const uint64_t index_table_id, int64_t& timestamp);
  VIRTUAL_FOR_UNITTEST int check_all_replica_major_sstable_exist(
      const common::ObPartitionKey& pkey, const uint64_t index_table_id);
  VIRTUAL_FOR_UNITTEST int check_all_replica_major_sstable_exist(
      const common::ObPartitionKey& pkey, const uint64_t index_table_id, int64_t& max_timestamp);
  VIRTUAL_FOR_UNITTEST int check_member_pg_major_sstable_enough(
      const common::ObPGKey& pg_key, const common::ObIArray<uint64_t>& table_ids);

  VIRTUAL_FOR_UNITTEST int check_all_partition_sync_state(const int64_t switchover_epoch);
  VIRTUAL_FOR_UNITTEST int send_leader_max_log_info();
  VIRTUAL_FOR_UNITTEST int get_role_and_leader_epoch(
      const common::ObPartitionKey& pkey, common::ObRole& role, int64_t& leader_epoch, int64_t& takeover_time) const;

  int turn_off_rebuild_flag(const ObReplicaOpArg& arg);

  int try_remove_from_member_list(const obrpc::ObMemberChangeArg& arg);
  // the destination partition of a split should not be removed before the source partition
  int check_split_dest_partition_can_remove(
      const common::ObPartitionKey& key, const common::ObPartitionKey& pg_key, bool& can_remove);
  int process_migrate_retry_task(const ObMigrateRetryTask& task);
  bool reach_tenant_partition_limit(const int64_t batch_cnt, const uint64_t tenant_id, const bool is_pg_arg);
  int retry_rebuild_loop();
O
obdev 已提交
776
  VIRTUAL_FOR_UNITTEST int get_pg_key(const ObPartitionKey& pkey, ObPGKey& pg_key) const override;
O
oceanbase-admin 已提交
777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798
  static int mtl_init(ObTenantStorageInfo*& tenant_store_info)
  {
    int ret = common::OB_SUCCESS;
    tenant_store_info = OB_NEW(ObTenantStorageInfo, ObModIds::OB_PARTITION_SERVICE);
    if (OB_ISNULL(tenant_store_info)) {
      ret = common::OB_ALLOCATE_MEMORY_FAILED;
    }
    return ret;
  }
  static void mtl_destroy(ObTenantStorageInfo*& tenant_store_info)
  {
    common::ob_delete(tenant_store_info);
    tenant_store_info = nullptr;
  }

  VIRTUAL_FOR_UNITTEST ObPartitionGroupIndex& get_pg_index()
  {
    return pg_index_;
  }

  int set_restore_flag(const ObPartitionKey& pkey, const int16_t flag);
  int set_restore_snapshot_version_for_trans(const ObPartitionKey& pkey, const int64_t restore_snapshot_version);
799 800
  int get_restore_replay_info(const ObPartitionKey& pkey, uint64_t& restore_last_replay_log_id,
      int64_t& restore_last_replay_log_ts, int64_t& restore_snapshot_version);
O
oceanbase-admin 已提交
801 802 803 804 805 806 807 808 809 810 811 812 813
  int wait_all_trans_clear(const ObPartitionKey& pkey);
  int check_all_trans_in_trans_table_state(const ObPartitionKey& pkey);

  int get_offline_log_id(const ObPartitionKey& pkey, uint64_t offline_log_id) const;
  int set_member_list(const obrpc::ObSetMemberListBatchArg& arg, obrpc::ObCreatePartitionBatchRes& result);
  int check_pg_partition_exist(const ObPGKey& pg_key, const ObPartitionKey& pkey);
  int check_split_source_partition_exist(const ObPGKey& pg_key, const int64_t table_id);
  int check_tenant_pg_exist(const uint64_t tenant_id, bool& is_exist);
  int acquire_sstable(const ObITable::TableKey& table_key, ObTableHandle& table_handle);
  int create_trans_table_if_needed();
  int check_physical_flashback_succ(
      const obrpc::ObCheckPhysicalFlashbackArg& arg, obrpc::ObPhysicalFlashbackResultArg& result);
  int nonblock_renew_loc_cache(const common::ObPartitionKey& pkey);
L
LINxiansheng 已提交
814 815
  int submit_pt_update_task(const ObPartitionKey& pkey) override;
  int submit_pt_update_role_task(const ObPartitionKey& pkey) override;
O
oceanbase-admin 已提交
816 817 818 819 820 821
  int start_physical_flashback();
  int check_can_physical_flashback();
  int try_freeze_aggre_buffer(const common::ObPartitionKey& pkey);
  VIRTUAL_FOR_UNITTEST int check_has_need_offline_replica(
      const obrpc::ObTenantSchemaVersions& arg, obrpc::ObGetPartitionCountResult& result);
  int wait_schema_version(const int64_t tenant_id, int64_t schema_version, int64_t query_end_time);
822 823 824 825 826
  // For example, z1, z2, z3, total 3 zones, the primary zone is z1,
  // but we only allow z1 and z2 can do backup by "alter system set backup_zone='z1,z2'".
  // However, sometimes, for example, z3 will be choosed as partition leader and it will do archive.
  // In this case, we need let z3 refuse backup read/write io.
  int enable_backup_white_list();
O
oceanbase-admin 已提交
827 828 829 830 831 832 833 834 835 836 837
  // for log_archive
  int mark_log_archive_encount_fatal_error(
      const common::ObPartitionKey& pkey, const int64_t incarnation, const int64_t archive_round);
  // for migrate src
  int get_migrate_leader_and_parent(const common::ObPartitionKey& pkey, common::ObIArray<ObMigrateSrcInfo>& addr_array);
  int get_rebuild_src(const common::ObPartitionKey& pkey, const ObReplicaType& replica_type,
      common::ObIArray<ObMigrateSrcInfo>& addr_array);
  int get_migrate_member_list_src(const common::ObPartitionKey& pkey, common::ObIArray<ObMigrateSrcInfo>& addr_array);
  int mark_pg_creating(const ObPartitionKey& pkey);
  int mark_pg_created(const ObPartitionKey& pkey);
  int check_partition_exist(const common::ObPartitionKey& pkey, bool& exist) override;
838
  int report_pg_backup_task(const ObIArray<ObPartMigrationRes>& report_res_list);
O
oceanbase-admin 已提交
839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854

  obrpc::ObCommonRpcProxy& get_rs_rpc_proxy();
  int inc_pending_batch_commit_count(const ObPartitionKey& pkey, memtable::ObMemtableCtx& mt_ctx, const int64_t log_ts);
  int inc_pending_elr_count(const ObPartitionKey& pkey, memtable::ObMemtableCtx& mt_ctx, const int64_t log_ts);
  int check_restore_point_complete(const ObPartitionKey& pkey, const int64_t snapshot_version, bool& is_complete);
  int try_advance_restoring_clog();
  int64_t get_total_partition_cnt()
  {
    return ATOMIC_LOAD(&total_partition_cnt_);
  }
  int get_partition_saved_last_log_info(
      const ObPartitionKey& pkey, uint64_t& last_replay_log_id, int64_t& last_replay_log_ts);
  int prepare_persist_log_archive_meta(ObIPartitionGroup& partition_group);

  // @brief: used for revoke all partition
  int try_revoke_all_leader(const election::ObElection::RevokeType& revoke_type);
855
  int check_standby_cluster_schema_condition(const ObPartitionKey& pkey, const int64_t schema_version);
O
oceanbase-admin 已提交
856

G
gm 已提交
857
private:
O
oceanbase-admin 已提交
858
  class ObStoreCtxGuard {
G
gm 已提交
859
  public:
O
oceanbase-admin 已提交
860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888
    ObStoreCtxGuard() : is_inited_(false), trans_desc_(NULL), pkey_(), ctx_(), txs_(NULL), init_ts_(0)
    {}
    virtual ~ObStoreCtxGuard()
    {
      int err = common::OB_SUCCESS;
      static const int64_t WARN_TIME_US = 5 * 1000 * 1000;
      if (is_inited_) {
        if (OB_SUCCESS != (err = txs_->revert_store_ctx(*trans_desc_, pkey_, ctx_))) {
          STORAGE_LOG(ERROR,
              "fail to revert transaction context",
              "trans_desc",
              *trans_desc_,
              K_(pkey),
              "memtable context",
              ctx_.mem_ctx_,
              K(err));
        } else {
          is_inited_ = false;
        }
        const int64_t guard_ts = ObClockGenerator::getClock() - init_ts_;
        if (guard_ts >= WARN_TIME_US) {
          STORAGE_LOG(WARN, "guard too much time", K_(trans_desc), K_(pkey), K(guard_ts), "lbt", lbt());
        }
      }
    }
    int init(const transaction::ObTransDesc& trans_desc, const common::ObPartitionKey& pkey,
        transaction::ObTransService& txs);
    ObStoreCtx& get_store_ctx();

G
gm 已提交
889
  private:
O
oceanbase-admin 已提交
890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906
    bool is_inited_;
    transaction::ObTransDesc* trans_desc_;
    common::ObPartitionKey pkey_;
    ObStoreCtx ctx_;
    transaction::ObTransService* txs_;
    int64_t init_ts_;
  };

  struct ParitionRegisterStatus {
    bool in_tran_service_;
    bool in_election_;
    bool in_replay_engine_;
    ParitionRegisterStatus() : in_tran_service_(false), in_election_(false), in_replay_engine_(false)
    {}
    TO_STRING_KV(K_(in_tran_service), K_(in_election), K_(in_replay_engine));
  };

G
gm 已提交
907
protected:
O
oceanbase-admin 已提交
908 909 910
  virtual int init_partition_group(ObIPartitionGroup& pg, const common::ObPartitionKey& pkey) override;
  virtual int post_replay_remove_pg_partition(const ObChangePartitionLogEntry& log_entry) override;

G
gm 已提交
911
private:
O
oceanbase-admin 已提交
912 913 914 915 916 917 918 919 920
  int check_can_physical_flashback_();
  bool is_tenant_active_(const uint64_t tenant_id) const;
  int check_init(void* cp, const char* cp_name) const;
  int get_trans_ctx_for_dml(
      const common::ObPartitionKey& pkey, const transaction::ObTransDesc& trans_desc, ObStoreCtxGuard& ctx_guard);
  int check_query_allowed(const ObPartitionKey& pkey, const transaction::ObTransDesc& trans_desc,
      ObStoreCtxGuard& ctx_guard, ObIPartitionGroupGuard& guard);
  int replay_add_store(const int64_t log_seq_num, const char* buf, const int64_t buf_len);
  int replay_remove_store(const int64_t log_seq_num, const char* buf, const int64_t buf_len);
921 922
  int replay_add_partition_to_pg_clog(
      const ObCreatePartitionParam& arg, const uint64_t log_id, const int64_t log_ts, int64_t& schema_version);
O
oceanbase-admin 已提交
923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977
  int replace_restore_info_(const uint64_t cur_tenant_id, const share::ObReplicaRestoreStatus is_restore,
      const int64_t create_frozen_version, ObCreatePartitionParam& create_param);
  int prepare_all_partitions();
  int remove_duplicate_partitions(const common::ObIArray<obrpc::ObCreatePartitionArg>& batch_arg);
  int add_partitions_to_mgr(common::ObIArray<ObIPartitionGroup*>& partitions);
  int add_partitions_to_replay_engine(const common::ObIArray<ObIPartitionGroup*>& partitions);
  int batch_register_trans_service(const common::ObIArray<obrpc::ObCreatePartitionArg>& batch_arg);
  int batch_register_election_mgr(const bool is_pg, const common::ObIArray<obrpc::ObCreatePartitionArg>& batch_arg,
      common::ObIArray<ObIPartitionGroup*>& partitions,
      common::ObIArray<blocksstable::ObStorageFileHandle>& files_handle);
  int batch_start_partition_election(
      const common::ObIArray<obrpc::ObCreatePartitionArg>& batch_arg, common::ObIArray<ObIPartitionGroup*>& partitions);
  int try_remove_from_member_list(ObIPartitionGroup& partition);
  int batch_prepare_splitting(const common::ObIArray<obrpc::ObCreatePartitionArg>& batch_arg);
  int try_add_to_member_list(const obrpc::ObMemberChangeArg& arg);
  int check_active_member(common::ObAddr& leader, const common::ObMember& member, const common::ObPartitionKey& key);
  int handle_add_replica_callback(const ObReplicaOpArg& arg, const int result);
  int handle_migrate_replica_callback(const ObReplicaOpArg& arg, const int result, bool& could_retry);
  int handle_rebuild_replica_callback(const ObReplicaOpArg& arg, const int result);
  int handle_change_replica_callback(const ObReplicaOpArg& arg, const int result);
  template <typename ResultT>
  int get_operate_replica_res(const ObReplicaOpArg& arg, const int result, ResultT& res);
  int retry_get_is_member_change_done(common::ObAddr& leader, obrpc::ObMCLogRpcInfo& mc_log_info);
  int retry_send_add_replica_mc_msg(common::ObAddr& server, const obrpc::ObMemberChangeArg& arg);
  int retry_send_remove_replica_mc_msg(common::ObAddr& server, const obrpc::ObMemberChangeArg& arg);
  int retry_post_add_replica_mc_msg(
      common::ObAddr& server, const obrpc::ObMemberChangeArg& arg, obrpc::ObMCLogRpcInfo& mc_log_info);
  int retry_post_remove_replica_mc_msg(
      common::ObAddr& server, const obrpc::ObMemberChangeArg& arg, obrpc::ObMCLogRpcInfo& mc_log_info);
  int retry_get_active_member(common::ObAddr& server, const common::ObPartitionKey& key, common::ObMemberList& mlist);

  int wait_clog_replay_over();
  int wait_fetch_log(const common::ObPartitionKey& key);

  bool dispatch_task(const ObCbTask& cb_task, ObIPartitionGroup* partition);
  bool is_role_change_done(const common::ObPartitionKey& pkey, const ObPartitionState& state) const;
  int check_tenant_out_of_memstore_limit(const uint64_t tenant_id, bool& is_out_of_mem);
  int save_base_schema_version(ObIPartitionGroupGuard& guard);
  int check_schema_version(const ObIPartitionArrayGuard& pkey_guard_arr);
  void init_tenant_bit_set();

  void rollback_partition_register(const common::ObPartitionKey& pkey, bool rb_txs, bool rb_rp_eg);
  int report_rs_to_rebuild_replica(const common::ObPartitionKey& pkey, const common::ObAddr& server);
  int check_mc_allowed_by_server_lease(bool& is_mc_allowed);
  int set_partition_region_priority(const ObPartitionKey& pkey, clog::ObIPartitionLogService* pls);
  static int build_migrate_replica_batch_res(
      const ObArray<ObPartMigrationRes>& report_res_list, obrpc::ObMigrateReplicaBatchRes& res);
  static int build_change_replica_batch_res(
      const ObArray<ObPartMigrationRes>& report_res_list, obrpc::ObChangeReplicaBatchRes& res);
  static int build_add_replica_batch_res(
      const ObArray<ObPartMigrationRes>& report_res_list, obrpc::ObAddReplicaBatchRes& res);
  static int build_backup_replica_batch_res(
      const ObArray<ObPartMigrationRes>& report_res_list, obrpc::ObBackupBatchRes& res);
  static int build_validate_backup_batch_res(
      const ObArray<ObPartMigrationRes>& report_res_list, obrpc::ObValidateBatchRes& res);
978 979 980 981
  static int build_backup_backupset_batch_res(
      const ObArray<ObPartMigrationRes>& report_res_list, obrpc::ObBackupBackupsetBatchRes& res);
  static int build_backup_archivelog_batch_res(
      const ObArray<ObPartMigrationRes>& report_res_list, obrpc::ObBackupArchiveLogBatchRes& res);
O
oceanbase-admin 已提交
982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009

  int decode_log_type(const char* log, const int64_t size, int64_t& pos, ObStorageLogType& log_type);

  int split_source_partition_(const common::ObPartitionKey& pkey, const int64_t schema_version,
      const share::ObSplitPartitionPair& spp, enum share::ObSplitProgress& progress);
  int split_dest_partition_(const common::ObPartitionKey& pkey, const ObPartitionSplitInfo& split_info,
      enum share::ObSplitProgress& progress);

  int remove_partition(ObIPartitionGroup* partition, const bool write_slog);
  int remove_pg_from_mgr(const ObIPartitionGroup* partition, const bool write_slog);
  int inner_add_partition(ObIPartitionGroup& partition, const bool need_check_tenant, const bool is_replay,
      const bool allow_multi_value) override;
  int inner_del_partition(const common::ObPartitionKey& pkey) override;
  int inner_del_partition_for_replay(const common::ObPartitionKey& pkey, const int64_t file_id) override;
  int inner_del_partition_impl(const common::ObPartitionKey& pkey, const int64_t* file_id);
  int create_sstables(const obrpc::ObCreatePartitionArg& arg, const bool in_slog_trans,
      ObIPartitionGroup& partition_group, ObTablesHandle& handle);
  int create_sstables(const ObCreatePartitionParam& arg, const bool in_slog_trans, ObIPartitionGroup& partition_group,
      ObTablesHandle& handle);
  int generate_task_pg_info_(const ObPartitionArray& pgs, ObIPartitionArrayGuard& out_pg_guard_arr);
  int do_remove_replica(const common::ObPartitionKey& pkey, const common::ObReplicaMember& dst);
  int gen_rebuild_arg_(
      const common::ObPartitionKey& pkey, const common::ObReplicaType replica_type, obrpc::ObRebuildReplicaArg& arg);
  int gen_standby_restore_arg_(const common::ObPartitionKey& pkey, const common::ObReplicaType replica_type,
      const common::ObAddr& src_server, obrpc::ObRebuildReplicaArg& arg);
  int handle_rebuild_result_(
      const common::ObPartitionKey pkey, const common::ObReplicaType replica_type, const int ret_val);
  bool reach_tenant_partition_limit_(const int64_t batch_cnt, const uint64_t tenant_id, const bool is_pg_arg);
O
obdev 已提交
1010 1011
  int get_pg_key_(const ObPartitionKey& pkey, ObPGKey& pg_key) const;
  int get_pg_key_from_index_schema_(const ObPartitionKey& pkey, ObPGKey& pg_key) const;
O
oceanbase-admin 已提交
1012 1013 1014 1015 1016 1017 1018 1019
  int submit_add_partition_to_pg_clog_(const common::ObIArray<obrpc::ObCreatePartitionArg>& batch_arg,
      const int64_t timeout, common::ObIArray<uint64_t>& log_id_arr);
  int write_partition_schema_version_change_clog_(const common::ObPGKey& pg_key, const common::ObPartitionKey& pkey,
      const int64_t schema_version, const uint64_t index_id, const int64_t timeout, uint64_t& log_id, int64_t& log_ts);
  int check_partition_state_(const common::ObIArray<obrpc::ObCreatePartitionArg>& batch_arg,
      common::ObIArray<obrpc::ObCreatePartitionArg>& target_batch_arg, common::ObIArray<int>& batch_res);
  void free_partition_list(ObArray<ObIPartitionGroup*>& partition_list);
  void submit_pt_update_task_(const ObPartitionKey& pkey, const bool need_report_checksum = true);
1020
  int submit_pg_pt_update_task_(const ObPartitionKey& pkey);
O
oceanbase-admin 已提交
1021 1022 1023
  int try_inc_total_partition_cnt(const int64_t new_partition_cnt, const bool need_check);
  int physical_flashback();
  int clean_all_clog_files_();
1024 1025
  int report_pg_backup_backupset_task(const ObIArray<ObPartMigrationRes>& report_res_list);
  int report_pg_backup_archivelog_task(const ObIArray<ObPartMigrationRes>& report_res_list);
O
oceanbase-admin 已提交
1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065
  int get_create_pg_param(const obrpc::ObCreatePartitionArg& arg, const ObSavedStorageInfoV2& info,
      const int64_t data_version, const bool write_pg_slog, const ObPartitionSplitInfo& split_info,
      const int64_t split_state, blocksstable::ObStorageFileHandle* file_handle, ObBaseFileMgr* file_mgr,
      ObCreatePGParam& param);
  int check_flashback_need_remove_pg(const int64_t flashback_scn, ObIPartitionGroup* partition, bool& need_remve);
  int remove_flashback_unneed_pg(ObIPartitionGroup* partition);
  int check_condition_before_set_restore_flag_(
      const common::ObPartitionKey& pkey, const int16_t flag, const int64_t restore_snapshot_version);
  int get_primary_cluster_migrate_src(const common::ObPartitionKey& pkey, hash::ObHashSet<ObMigrateSrcInfo>& src_set,
      common::ObIArray<ObMigrateSrcInfo>& src_array);
  int get_standby_cluster_migrate_src(const common::ObPartitionKey& pkey, hash::ObHashSet<ObMigrateSrcInfo>& src_set,
      common::ObIArray<ObMigrateSrcInfo>& src_array);
  int get_primary_cluster_migrate_member_src(const common::ObPartitionKey& pkey, const ObReplicaType& replica_type,
      hash::ObHashSet<ObMigrateSrcInfo>& src_set, common::ObIArray<ObMigrateSrcInfo>& src_array);
  int get_standby_cluster_migrate_member_src(const common::ObPartitionKey& pkey, const ObReplicaType& replica_type,
      hash::ObHashSet<ObMigrateSrcInfo>& src_set, common::ObIArray<ObMigrateSrcInfo>& src_array);

  int add_migrate_member_list(const common::ObPartitionKey& pkey, const common::ObRole& role,
      const bool is_standby_cluster, hash::ObHashSet<ObMigrateSrcInfo>& src_set,
      common::ObIArray<ObMigrateSrcInfo>& src_array);
  int add_current_migrate_member_list(const common::ObPartitionKey& pkey, hash::ObHashSet<ObMigrateSrcInfo>& src_set,
      common::ObIArray<ObMigrateSrcInfo>& src_array);
  int add_meta_table_migrate_src(const common::ObPartitionKey& pkey, hash::ObHashSet<ObMigrateSrcInfo>& src_set,
      common::ObIArray<ObMigrateSrcInfo>& src_array);
  int add_migrate_src(const ObMigrateSrcInfo& src_info, hash::ObHashSet<ObMigrateSrcInfo>& src_set,
      common::ObIArray<ObMigrateSrcInfo>& src_array);
  int add_primary_meta_table_migrate_src(const common::ObPartitionKey& pkey, hash::ObHashSet<ObMigrateSrcInfo>& src_set,
      common::ObIArray<ObMigrateSrcInfo>& src_array);
  int inner_add_meta_table_migrate_src(const common::ObIArray<share::ObPartitionReplica>& replicas,
      const int64_t cluster_id, hash::ObHashSet<ObMigrateSrcInfo>& src_set,
      common::ObIArray<ObMigrateSrcInfo>& src_array);
  int extract_pkeys(
      const common::ObIArray<obrpc::ObCreatePartitionArg>& batch_arg, common::ObIArray<ObPartitionKey>& pkey_array);
  int add_location_cache_migrate_src(const common::ObPartitionKey& pkey, const common::ObRole& role,
      const bool is_standby_cluster, hash::ObHashSet<ObMigrateSrcInfo>& src_set,
      common::ObIArray<ObMigrateSrcInfo>& src_array);
  int inner_add_location_migrate_src(const common::ObIArray<share::ObReplicaLocation>& locations,
      const int64_t cluster_id, hash::ObHashSet<ObMigrateSrcInfo>& src_set,
      common::ObIArray<ObMigrateSrcInfo>& src_array);

G
gm 已提交
1066
private:
O
oceanbase-admin 已提交
1067 1068 1069
  // disallow copy
  DISALLOW_COPY_AND_ASSIGN(ObPartitionService);

G
gm 已提交
1070
private:
O
oceanbase-admin 已提交
1071
  class ReloadLocalityTask : public common::ObTimerTask {
G
gm 已提交
1072
  public:
O
oceanbase-admin 已提交
1073 1074 1075 1076 1077 1078 1079
    ReloadLocalityTask();
    virtual ~ReloadLocalityTask()
    {}
    int init(ObPartitionService* ptt_svr);
    virtual void runTimerTask();
    void destroy();

G
gm 已提交
1080
  private:
O
oceanbase-admin 已提交
1081 1082 1083 1084 1085
    bool is_inited_;
    ObPartitionService* ptt_svr_;
  };

  class PurgeRetireMemstoreTask : public common::ObTimerTask {
G
gm 已提交
1086
  public:
O
oceanbase-admin 已提交
1087 1088 1089 1090
    PurgeRetireMemstoreTask();
    virtual ~PurgeRetireMemstoreTask()
    {}

G
gm 已提交
1091
  public:
O
oceanbase-admin 已提交
1092 1093 1094 1095
    int init();
    void destroy();
    virtual void runTimerTask();

G
gm 已提交
1096
  private:
O
oceanbase-admin 已提交
1097 1098 1099 1100
    bool is_inited_;
  };

  class ClogRequiredMinorFreezeTask : public common::ObTimerTask {
G
gm 已提交
1101
  public:
O
oceanbase-admin 已提交
1102 1103 1104 1105
    ClogRequiredMinorFreezeTask();
    virtual ~ClogRequiredMinorFreezeTask()
    {}

G
gm 已提交
1106
  public:
O
oceanbase-admin 已提交
1107 1108 1109 1110
    int init(clog::ObICLogMgr* clog_mgr);
    void destroy();
    virtual void runTimerTask();

G
gm 已提交
1111
  private:
O
oceanbase-admin 已提交
1112 1113 1114 1115 1116
    bool is_inited_;
    clog::ObICLogMgr* clog_mgr_;
  };

  class ObRefreshLocalityTask : public common::IObDedupTask {
G
gm 已提交
1117
  public:
O
oceanbase-admin 已提交
1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129
    explicit ObRefreshLocalityTask(ObPartitionService* partition_service);
    virtual ~ObRefreshLocalityTask();
    virtual int64_t hash() const;
    virtual bool operator==(const IObDedupTask& other) const;
    virtual int64_t get_deep_copy_size() const;
    virtual IObDedupTask* deep_copy(char* buffer, const int64_t buf_size) const;
    virtual int64_t get_abs_expired_time() const
    {
      return 0;
    }
    virtual int process();

G
gm 已提交
1130
  private:
O
oceanbase-admin 已提交
1131 1132 1133 1134 1135 1136 1137 1138
    ObPartitionService* partition_service_;
  };

  struct PartitionSizePair {
    ObPartitionKey pkey_;
    int64_t size_;
  };
  class PrintHashedFreezeFunctor {
G
gm 已提交
1139
  public:
O
oceanbase-admin 已提交
1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150
    PrintHashedFreezeFunctor()
    {}
    bool operator()(const common::ObPartitionKey& pkey, ObIPSFreezeCb* cb)
    {
      UNUSED(cb);
      bool bool_ret = true;
      STORAGE_LOG(WARN, "existing freeze partition", K(pkey));
      return bool_ret;
    }
  };

G
gm 已提交
1151
private:
O
oceanbase-admin 已提交
1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387
  friend class ObPartitionIterator;
  static const int64_t BITS_PER_BITSETWORD = 32;
  static const int64_t BITSETWORD_SHIFT_NUM = 5;
  static const int64_t BITSETWORD_OFFSET_MASK = BITS_PER_BITSETWORD - 1;
  static const int64_t BITSET_BITS_NUM = (OB_DEFAULT_TENANT_COUNT + BITSETWORD_OFFSET_MASK) & (~BITSETWORD_OFFSET_MASK);
  static const int64_t BITSET_WORDS_NUM = BITSET_BITS_NUM / BITS_PER_BITSETWORD;
  typedef uint32_t BITSET_TYPE;
  static const int64_t WAIT_FREEZE_INTERVAL = 100000;  // 100ms
  static const int64_t ASSEMBLE_THREAD_NUM = 8;
  static const int64_t REFRESH_LOCALITY_TASK_NUM = 5;
  bool is_running_;
  obrpc::ObBatchRpc batch_rpc_;
  obrpc::ObTransRpcProxy tx_rpc_proxy_;
  obrpc::ObDupTableRpcProxy dup_table_rpc_proxy_;
  obrpc::ObLogRpcProxy clog_rpc_proxy_;
  obrpc::ObPartitionServiceRpcProxy pts_rpc_proxy_;
  obrpc::ObXARpcProxy xa_proxy_;
  obrpc::ObCommonRpcProxy* rs_rpc_proxy_;
  obrpc::ObSrvRpcProxy* srv_rpc_proxy_;
  ObPartitionServiceRpc pts_rpc_;
  clog::ObICLogMgr* clog_mgr_;
  election::ObIElectionMgr* election_mgr_;
  clog::ObLogReplayEngineWrapper* rp_eg_wrapper_;
  ObIPartitionReport* rs_cb_;

  share::ObIPartitionLocationCache* location_cache_;
  share::ObRsMgr* rs_mgr_;
  ObCLogCallbackAsyncWorker cb_async_worker_;
  ObFreezeAsyncWorker freeze_async_worker_;
  ReloadLocalityTask reload_locality_task_;
  PurgeRetireMemstoreTask purge_retire_memstore_task_;
  ClogRequiredMinorFreezeTask clog_required_minor_freeze_task_;
  logservice::ObExtLogService::StreamTimerTask ext_log_service_stream_timer_task_;
  logservice::ObExtLogService::LineCacheTimerTask line_cache_timer_task_;
  common::ObAddr self_addr_;
  BITSET_TYPE tenant_bit_set_[BITSET_WORDS_NUM];

  ObMigrateRetryQueueThread migrate_retry_queue_thread_;
  ObCallbackQueueThread cb_queue_thread_;
  ObCallbackQueueThread large_cb_queue_thread_;
  ObSlogWriterQueueThread slog_writer_thread_pool_;
  common::ObDedupQueue refresh_locality_task_queue_;
  ObWarmUpService* warm_up_service_;
  common::ObInOutBandwidthThrottle* bandwidth_throttle_;
  ObLocalityManager locality_manager_;
  ObPartitionSplitWorker split_worker_;
  mutable common::ObSpinLock trans_version_lock_;
  int64_t global_max_decided_trans_version_;
  ObGarbageCollector garbage_collector_;
  ObPartitionWorker partition_worker_;
  ObTransCheckpointWorker trans_checkpoint_worker_;
  clog::ObClogAggreRunnable clog_aggre_runnable_;
  ObDupReplicaChecker dup_replica_checker_;
  gts::ObHaGtsManager gts_mgr_;
  gts::ObHaGtsSource gts_source_;
  ObRebuildReplicaService rebuild_replica_service_;
  ObServerPGMetaSLogFilter slog_filter_;
  int64_t total_partition_cnt_;
  ObSSTableGarbageCollector sstable_garbage_collector_;
  ObAutoPartScheduler auto_part_scheduler_;
  ObPartitionGroupCreateChecker create_pg_checker_;
};

inline int ObPartitionService::ObStoreCtxGuard::init(
    const transaction::ObTransDesc& trans_desc, const common::ObPartitionKey& pkey, transaction::ObTransService& txs)
{
  int ret = common::OB_SUCCESS;
  if (is_inited_) {
    ret = common::OB_INIT_TWICE;
    STORAGE_LOG(WARN, "init twice", K(ret));
  } else if (!trans_desc.is_valid() || !pkey.is_valid() || !ctx_.is_valid()) {
    ret = common::OB_INVALID_ARGUMENT;
    STORAGE_LOG(WARN, "invalid argument(s)", K(trans_desc), K(pkey), "memtable context", ctx_.mem_ctx_, K(ret));
  } else {
    trans_desc_ = &const_cast<transaction::ObTransDesc&>(trans_desc);
    pkey_ = pkey;
    txs_ = &txs;
    is_inited_ = true;
    init_ts_ = ObClockGenerator::getClock();
  }
  return ret;
}

inline ObStoreCtx& ObPartitionService::ObStoreCtxGuard::get_store_ctx()
{
  return ctx_;
}

inline int ObPartitionService::check_init(void* cp, const char* cp_name) const
{
  int ret = common::OB_SUCCESS;
  if (!is_inited_ || NULL == cp || NULL == cp_name) {
    ret = common::OB_NOT_INIT;
    STORAGE_LOG(WARN, "component does not exist", "component name", cp_name, K(ret));
  }
  return ret;
}

inline int ObPartitionService::internal_kill_trans(transaction::ObTransDesc& trans_desc)
{
  int ret = common::OB_SUCCESS;
  if (OB_FAIL(check_init(txs_, "transaction service"))) {
    STORAGE_LOG(WARN, "ObTransService check init error");
  } else {
    ret = txs_->internal_kill_trans(trans_desc);
  }
  return ret;
}

inline int ObPartitionService::kill_query_session(const transaction::ObTransDesc& trans_desc, const int status)
{
  int ret = common::OB_SUCCESS;
  if (OB_FAIL(check_init(txs_, "transaction service"))) {
    STORAGE_LOG(WARN, "ObTransService check init error");
  } else {
    ret = txs_->kill_query_session(trans_desc, status);
  }

  return ret;
}

inline int ObPartitionService::get_trans_ctx_for_dml(
    const common::ObPartitionKey& pkey, const transaction::ObTransDesc& trans_desc, ObStoreCtxGuard& ctx_guard)
{
  int ret = common::OB_SUCCESS;
  // DML statement will always use invalid snapshot
  int64_t user_specified_snapshot = transaction::ObTransVersion::INVALID_TRANS_VERSION;
  if (OB_FAIL(check_init(txs_, "transaction service"))) {
    STORAGE_LOG(WARN, "transaction service does not exist", K(ret));
  } else if (OB_FAIL(txs_->get_store_ctx(trans_desc, pkey, ctx_guard.get_store_ctx()))) {
    STORAGE_LOG(WARN, "can not get transaction context", K(pkey), K(trans_desc), K(ret), K(user_specified_snapshot));
  } else {
    if (OB_FAIL(ctx_guard.init(trans_desc, pkey, *txs_))) {
      STORAGE_LOG(WARN, "fail to init transaction context guard", K(ret));
      ret = common::OB_SUCCESS;
      if (OB_FAIL(txs_->revert_store_ctx(trans_desc, pkey, ctx_guard.get_store_ctx()))) {
        STORAGE_LOG(WARN, "fail to revert transaction context", K(ret));
      }
    }
  }
  return ret;
}

inline bool ObPartitionService::is_empty() const
{
  return pg_mgr_.is_empty();
}

inline transaction::ObTransService* ObPartitionService::get_trans_service()
{
  return txs_;
}

inline memtable::ObIMemtableCtxFactory* ObPartitionService::get_mem_ctx_factory()
{
  memtable::ObIMemtableCtxFactory* factory = NULL;

  if (NULL != txs_) {
    factory = txs_->get_mem_ctx_factory();
  }
  return factory;
}

inline clog::ObICLogMgr* ObPartitionService::get_clog_mgr()
{
  return clog_mgr_;
}

inline election::ObIElectionMgr* ObPartitionService::get_election_mgr()
{
  return election_mgr_;
}

inline ObPartitionSplitWorker* ObPartitionService::get_split_worker()
{
  return &split_worker_;
}

inline clog::ObClogAggreRunnable* ObPartitionService::get_clog_aggre_runnable()
{
  return &clog_aggre_runnable_;
}

inline bool ObPartitionService::is_scan_disk_finished()
{
  return (is_inited_ && NULL != clog_mgr_ && clog_mgr_->is_scan_finished());
}

inline ObIPartitionComponentFactory* ObPartitionService::get_cp_fty()
{
  return cp_fty_;
}

OB_INLINE ObPartitionService& ObPartitionService::get_instance()
{
  static ObPartitionService instance;
  return instance;
}

OB_INLINE int ObPartitionService::xa_start(const transaction::ObXATransID& xid, const int64_t flags,
    const int64_t xa_end_timeout_seconds, transaction::ObTransDesc& trans_desc)
{
  int ret = common::OB_SUCCESS;
  if (OB_FAIL(check_init(txs_, "transaction service"))) {
    STORAGE_LOG(WARN, "ObTransService check init error");
  } else {
    ret = txs_->xa_start_v2(xid, flags, xa_end_timeout_seconds, trans_desc);
  }
  return ret;
}

OB_INLINE int ObPartitionService::xa_end(
    const transaction::ObXATransID& xid, const int64_t flags, transaction::ObTransDesc& trans_desc)
{
  int ret = common::OB_SUCCESS;
  if (OB_FAIL(check_init(txs_, "transaction service"))) {
    STORAGE_LOG(WARN, "ObTransService check init error");
  } else {
    ret = txs_->xa_end_v2(xid, flags, trans_desc);
  }
  return ret;
}

OB_INLINE int ObPartitionService::xa_prepare(
    const transaction::ObXATransID& xid, const uint64_t tenant_id, const int64_t stmt_expired_time)
{
  int ret = common::OB_SUCCESS;
  if (OB_FAIL(check_init(txs_, "transaction service"))) {
    STORAGE_LOG(WARN, "ObTransService check init error");
  } else {
    ret = txs_->xa_prepare_v2(xid, tenant_id, stmt_expired_time);
  }
  return ret;
}

OB_INLINE int ObPartitionService::xa_end_trans(const transaction::ObXATransID& xid, const bool is_rollback,
G
gjw2284740 已提交
1388
    const int64_t flags, transaction::ObTransDesc& trans_desc, bool& access_temp_table)
O
oceanbase-admin 已提交
1389 1390 1391 1392 1393
{
  int ret = common::OB_SUCCESS;
  if (OB_FAIL(check_init(txs_, "transaction service"))) {
    STORAGE_LOG(WARN, "ObTransService check init error");
  } else {
G
gjw2284740 已提交
1394
    ret = txs_->xa_end_trans_v2(xid, is_rollback, flags, trans_desc, access_temp_table);
O
oceanbase-admin 已提交
1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413
  }
  return ret;
}

OB_INLINE int ObPartitionService::get_xa_trans_state(int32_t& state, transaction::ObTransDesc& trans_desc)
{
  int ret = common::OB_SUCCESS;
  if (OB_FAIL(check_init(txs_, "transaction service"))) {
    STORAGE_LOG(WARN, "ObTransService check init error");
  } else {
    ret = txs_->get_xa_trans_state(state, trans_desc);
  }
  return ret;
}

}  // namespace storage
}  // namespace oceanbase

#endif  // OCEANBASE_STORAGE_OB_PARTITION_SERVICE