// ob_trans_part_ctx.h
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#ifndef OCEANBASE_TRANSACTION_OB_TRANS_PART_CTX_
#define OCEANBASE_TRANSACTION_OB_TRANS_PART_CTX_

#include "ob_trans_ctx.h"
#include "ob_trans_dependency.h"
#include "ob_ts_mgr.h"
#include "lib/container/ob_mask_set2.h"
#include "ob_dup_table.h"
#include "ob_trans_listener_handler.h"

namespace oceanbase {

// Forward declarations of types that this header only uses by pointer or reference.
namespace common {
class ObAddr;
class ObMaskSet;
class ObPartitionKey;
}  // namespace common

namespace storage {
struct ObStoreRowLockState;
}  // namespace storage

namespace transaction {
class ObITransCtxMgr;
class ObRedoLogSyncResponseMsg;
class ObTransMsg;
class ObTransResultInfo;
class ObTransSplitInfo;
class ObTransStatusMgr;
class ObTransTaskWorker;
// class ObTransListenerHandler;
}  // namespace transaction

namespace memtable {
class ObIMemtableCtx;
class ObIMemtableCtxFactory;
}  // namespace memtable

namespace clog {
class ObLogMeta;
}  // namespace clog

// Integer codes for a user request kind (meaning taken from the enumerator names; the code
// that consumes them is not part of this header). The numeric values are -1, 0 and 1.
enum {
  USER_REQUEST_UNKNOWN = -1,  // unknown request
  USER_COMMIT,                // == 0
  USER_ABORT,                 // == 1
};

namespace transaction {
class TransResultInfo {
G
gm 已提交
63
public:
O
oceanbase-admin 已提交
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
  TransResultInfo() : result_info_(NULL), registered_(false)
  {}
  ~TransResultInfo()
  {
    destroy();
  }
  void destroy()
  {
    reset();
  }
  void reset();
  int alloc_trans_result_info();
  ObTransResultInfo* get_trans_result_info()
  {
    return result_info_;
  }
  void set_registered(bool registered)
  {
    registered_ = registered;
  }

G
gm 已提交
85
private:
O
oceanbase-admin 已提交
86 87 88 89 90 91
  ObTransResultInfo* result_info_;
  bool registered_;
};

// participant transaction context
class ObPartTransCtx : public ObDistTransCtx, public ObTsCbTask {
92
  friend class IterateTransStatForKeyFunctor;
O
oceanbase-admin 已提交
93

G
gm 已提交
94
public:
O
oceanbase-admin 已提交
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
  // Default constructor. Passes "participant" and ObTransCtxType::PARTICIPANT to the
  // ObDistTransCtx base, default-constructs the ObTsCbTask base, initializes the members
  // listed below, and then calls reset() (whose body is not in this header).
  ObPartTransCtx()
      : ObDistTransCtx("participant", ObTransCtxType::PARTICIPANT),
        ObTsCbTask(),
        is_inited_(false),
        mt_ctx_(),
        prev_redo_log_ids_(ObModIds::OB_TRANS_REDO_LOG_ID_ARRAY, OB_MALLOC_NORMAL_BLOCK_SIZE),
        partition_log_info_arr_(ObModIds::OB_TRANS_PARTITION_LOG_INFO_ARRAY, OB_MALLOC_NORMAL_BLOCK_SIZE),
        listener_handler_(NULL)
  {
    reset();
  }
  // Destructor: calls destroy(). Because this runs inside the destructor, the call
  // resolves to this class's destroy() even though destroy() is virtual.
  ~ObPartTransCtx()
  {
    destroy();
  }
  // clog_adapter. the adapter belong transaction engine and clog engine
  int init(const uint64_t tenant_id, const ObTransID& trans_id, const int64_t trans_expired_time,
      const common::ObPartitionKey& self, ObITransCtxMgr* ctx_mgr, const ObStartTransParam& trans_param,
      const uint64_t cluster_version, ObTransService* trans_service, const uint64_t cluster_id,
      const int64_t leader_epoch, const bool can_elr);
115
  virtual void destroy() override;
O
oceanbase-admin 已提交
116 117 118
  void reset();
  int construct_context(const ObTransMsg& msg);

G
gm 已提交
119
public:
O
oceanbase-admin 已提交
120 121 122 123 124 125 126 127
  int start_trans();
  int start_task(const ObTransDesc& trans_desc, const int64_t snapshot_version, const bool need_update_gts,
      storage::ObIPartitionGroup* ob_partition);
  int end_task(
      const bool is_rollback, const ObTransDesc& trans_desc, const int64_t sql_no, const int64_t stmt_min_sql_no);
  int end_task_(
      const bool is_rollback, const ObTransDesc& trans_desc, const int64_t sql_no, const int64_t stmt_min_sql_no);
  int handle_message(const ObTransMsg& msg);
128 129
  bool is_inited() const override;
  int handle_timeout(const int64_t delay) override;
O
oceanbase-admin 已提交
130 131 132 133
  int get_end_trans_callback_item(ObEndTransCallbackArray& cb_array);
  /*
   * graceful kill: wait trx finish logging
   */
134
  int kill(const KillTransArg& arg, ObEndTransCallbackArray& cb_array) override;
O
oceanbase-admin 已提交
135 136 137 138 139 140 141
  int wait_1pc_trx_end_in_spliting(bool& trx_end);
  int check_cur_partition_split_(bool& is_split_partition);
  // Returns a pointer to the mt_ctx_ member object of this context.
  memtable::ObMemtableCtx* get_memtable_ctx()
  {
    return &mt_ctx_;
  }
  int set_memtable_ctx(memtable::ObIMemtableCtx* mt_ctx);
142 143 144 145
  int leader_revoke(const bool first_check, bool& need_release, ObEndTransCallbackArray& cb_array) override;
  int leader_takeover(const int64_t checkpoint) override;
  int leader_active(const storage::LeaderActiveArg& arg) override;
  bool can_be_freezed() const override;
O
oceanbase-admin 已提交
146
  int kill_trans(bool& need_convert_to_dist_trans);
X
xs0 已提交
147
  // Commit entry point overriding the base-class declaration; implemented outside this header.
  // need_convert_to_dist_trans is an output flag.
  int commit(const bool is_rollback, sql::ObIEndTransCallback* cb, const bool is_readonly, const MonotonicTs commit_time,
      const int64_t stmt_expired_time, const ObStmtRollbackInfo& stmt_rollback_info,
      const common::ObString& app_trace_info, bool& need_convert_to_dist_trans) override;
O
oceanbase-admin 已提交
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
  int set_stmt_info(const ObTransStmtInfo& stmt_info);
  // Returns a const reference to the stmt_info_ member.
  const ObTransStmtInfo& get_stmt_info() const
  {
    return stmt_info_;
  }
  int recover_dist_trans(const ObAddr& scheduler);
  bool is_prepared() const;
  // Returns the current value of the is_changing_leader_ flag.
  bool is_changing_leader() const
  {
    return is_changing_leader_;
  }
  int submit_log(const int64_t log_type);
  int callback_big_trans(
      const ObPartitionKey& pkey, const int64_t log_type, const int64_t log_id, const int64_t timestamp);
  int generate_redo_prepare_log_info(char* buf, const int64_t size, int64_t& pos, const int64_t request_id,
      const PartitionLogInfoArray& partition_log_info_arr, const int64_t commit_version, const bool have_prev_trans,
      clog::ObLogInfo& log_info, clog::ObISubmitLogCb*& cb);
  int get_prepare_version_if_prepared(bool& is_prepared, int64_t& prepare_version);
O
obdev 已提交
168
  int get_prepare_version_before_logts(const int64_t freeze_ts, bool& has_prepared, int64_t& prepare_version);
O
oceanbase-admin 已提交
169 170 171 172 173
  int64_t get_snapshot_version() const;
  // Public accessor for the commit version, i.e. the stored global transaction version.
  int64_t get_commit_version() const
  {
    return global_trans_version_;
  }
174
  // Hash of this context: delegates to trans_id_.hash().
  uint64_t hash() const override
  {
    return trans_id_.hash();
  }
178 179 180
  int get_gts_callback(const MonotonicTs srr, const int64_t gts, const MonotonicTs receive_gts_ts) override;
  int gts_elapse_callback(const MonotonicTs srr, const int64_t gts) override;
  // Returns the stc_ member.
  MonotonicTs get_stc() const override
  {
    return stc_;
  }
184
  // Returns the gts_request_ts_ member.
  int64_t get_request_ts() const override
  {
    return gts_request_ts_;
  }
188
  // Returns the tenant_id_ member.
  uint64_t get_tenant_id() const override
  {
    return tenant_id_;
  }
  int get_min_log(uint64_t& log_id, int64_t& log_ts) const;
  int elr_next_trans_callback(const int64_t task_type, const ObTransID& trans_id, int state);
  void check_memtable_ctx_ref();
  // Returns the redo_log_no_ member.
  int64_t get_redo_log_no() const
  {
    return redo_log_no_;
  }
  // Returns the mutator_log_no_ member.
  int64_t get_mutator_log_no() const
  {
    return mutator_log_no_;
  }

G
gm 已提交
204
public:
O
oceanbase-admin 已提交
205 206 207 208 209 210
  int replay_sp_redo_log(
      const ObSpTransRedoLog& log, const int64_t timestamp, const uint64_t log_id, int64_t& log_table_version);
  int replay_sp_commit_log(const ObSpTransCommitLog& log, const int64_t timestamp, const uint64_t log_id);
  int replay_sp_abort_log(const ObSpTransAbortLog& log, const int64_t timestamp, const uint64_t log_id);
  int replay_redo_log(const ObTransRedoLog& log, const int64_t trans_version, const uint64_t log_id,
      const bool with_prepare, int64_t& log_table_version);
O
obdev 已提交
211
  int replay_record_log(const ObTransRecordLog &log, const int64_t trans_version, const uint64_t log_id);
O
oceanbase-admin 已提交
212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280
  int replay_prepare_log(const ObTransPrepareLog& log, const int64_t trans_version, const uint64_t log_id,
      const bool batch_committed, const int64_t checkpoint);
  int replay_commit_log(const ObTransCommitLog& log, const int64_t trans_version, const uint64_t log_id);
  int replay_abort_log(const ObTransAbortLog& log, const int64_t trans_version, const uint64_t log_id);
  int replay_clear_log(const ObTransClearLog& log, const int64_t trans_version, const uint64_t log_id);
  int replay_trans_state_log(const ObTransStateLog& log, const int64_t timestamp, const uint64_t log_id);
  int replay_trans_mutator_log(
      const ObTransMutatorLog& log, const int64_t timestamp, const uint64_t log_id, int64_t& log_table_version);
  int replay_mutator_abort_log(const ObTransMutatorAbortLog& log, const int64_t timestamp, const uint64_t log_id);
  int replay_start_working_log(const int64_t timestamp, const uint64_t log_id);
  int replay_listener_commit_log(const ObTransCommitLog& log, const int64_t timestamp, const uint64_t log_id);
  int replay_listener_abort_log(const ObTransAbortLog& log, const int64_t timestamp, const uint64_t log_id);
  // the log has been persisted to the abstract log device (logical paxos log device)
  int on_sync_log_success(
      const int64_t log_type, const int64_t log_id, const int64_t version, const bool batch_committed);
  // a log-writing job was submitted successfully to the logging layer
  int on_submit_log_success(
      const bool with_need_update_version, const uint64_t cur_log_id, const int64_t cur_log_timestamp);
  // submitting the log failed; the log will not be written
  int on_submit_log_fail(const int retcode);
  int set_commit_task_count(const int64_t commit_task_count);
  int set_trans_app_trace_id_str(const ObString& app_trace_id_str);
  // Returns the application trace id held by trace_info_ (trace_info_.get_app_trace_id()).
  const ObString& get_trans_app_trace_id_str() const
  {
    return trace_info_.get_app_trace_id();
  }
  int check_schema_version_elapsed(const int64_t schema_version, const int64_t refreshed_schema_ts);
  int check_ctx_create_timestamp_elapsed(const int64_t ts);
  int batch_submit_log_over(const bool submit_log_succ, const int64_t commit_version);
  int checkpoint(const int64_t checkpoint, const int64_t safe_slave_read_timestamp, bool& checkpoint_succ);
  int relocate_data(memtable::ObIMemtable* memtable);
  int get_memtable_key_arr(ObMemtableKeyArray& memtable_key_arr);
  // Forwards to mt_ctx_.get_lock_for_read_retry_count().
  uint64_t get_lock_for_read_retry_count() const
  {
    return mt_ctx_.get_lock_for_read_retry_count();
  }
  int prepare_changing_leader(const common::ObAddr& proposal_leader);
  int handle_2pc_pre_commit_request(const int64_t commit_version);
  int handle_2pc_commit_clear_request(const int64_t commit_version);
  int handle_2pc_pre_prepare_request(const int64_t prepare_version, const int64_t request_id, const ObAddr& scheduler,
      const ObPartitionKey& coordinator, const ObPartitionArray& participants, const common::ObString& app_trace_info,
      int& status, int64_t& prepare_log_id, int64_t& prepare_log_ts, bool& have_prev_trans);
  int handle_2pc_local_prepare_request(const int64_t request_id, const ObAddr& scheduler,
      const ObPartitionKey& coordinator, const ObPartitionArray& participants, const common::ObString& app_trace_info,
      const MonotonicTs stc, const int status, const bool is_xa_prepare);
  int handle_2pc_local_commit_request(
      const int64_t msg_type, const int64_t trans_version, const PartitionLogInfoArray& partition_log_info_arr);
  int handle_2pc_local_clear_request();
  int handle_2pc_local_msg_response(const ObPartitionKey& partition, const ObTransID& trans_id, const int64_t log_type);
  int before_prepare();
  int check_scheduler_status();
  // Returns the cur_query_start_time_ member.
  int64_t get_cur_query_start_time() const
  {
    return cur_query_start_time_;
  }
  int set_cur_query_start_time(const int64_t cur_query_start_time);
  // The context id is the descriptor of the memtable context: mt_ctx_.get_ctx_descriptor().
  uint32_t get_ctx_id() const
  {
    return mt_ctx_.get_ctx_descriptor();
  }
  // register a callback task called after current transaction committed
  int submit_elr_callback_task(const int64_t callback_type, const ObTransID& trans_id, int state);
  // Returns a mutable reference to the ctx_dependency_wrap_ member.
  ObPartTransCtxDependencyWrap& get_ctx_dependency_wrap()
  {
    return ctx_dependency_wrap_;
  }
  // check early lock release is prepared
  int check_elr_prepared(bool& elr_prepared, int64_t& elr_commit_version);
  int insert_prev_trans(const uint32_t ctx_id, ObTransCtx* prev_trans_ctx);
Z
zh0 已提交
281
  // void audit_partition(const bool is_rollback, const sql::stmt::StmtType stmt_type);
O
oceanbase-admin 已提交
282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314
  int handle_redo_log_sync_response(const ObRedoLogSyncResponseMsg& msg);
  bool is_redo_log_sync_finish() const;
  bool is_prepare_leader_revoke() const;
  int retry_redo_sync_task(
      const uint64_t log_id, const int64_t log_type, const int64_t timestamp, const bool first_gen);
  bool need_to_post_redo_sync_task(const int64_t log_type) const;
  bool is_redo_log_replayed(const uint64_t log_id);
  // used by the deadlock detector; the value may be inaccurate
  // Overwrites the is_local_trans_ flag with the given value.
  void set_local_trans(bool is_local_trans)
  {
    is_local_trans_ = is_local_trans;
  }
  int calculate_trans_ctx_cost(uint64_t& cost) const;
  int remove_callback_for_uncommited_txn(memtable::ObMemtable* mt);
  int remove_mem_ctx_for_trans_ctx(memtable::ObMemtable* mt);
  int handle_savepoint_rollback_request(const int64_t sql_no, const int64_t cur_sql_no, const bool need_response);
  int half_stmt_commit();
  int set_trans_table_status_info(const ObTransTableStatusInfo& trans_table_status_info);
  int get_trans_mem_total_size(int64_t& size) const;

  // A dirty transaction must keep its transaction state in memory to help decide
  // the accessibility of its modify-set.
  // This covers two cases:
  // 1. the transaction was persisted before commit/abort
  // 2. the transaction crosses freeze_log_id, so its modify-set is located in more than one memtable
  int mark_frozen_data(const memtable::ObMemtable* const frozen_memtable,
      const memtable::ObMemtable* const active_memtable, int64_t& cb_cnt);
  int submit_log_for_split(bool& log_finished);
  bool mark_dirty_trans();
  // Returns the is_dirty_ flag.
  bool is_dirty_trans() const
  {
    return is_dirty_;
  }
H
handora 已提交
315 316 317 318
  // True once max_durable_log_ts_ holds a non-zero value.
  bool has_synced_log() const
  {
    return max_durable_log_ts_ != 0;
  }
O
oceanbase-admin 已提交
319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382
  // Reads forbidden_sql_no_ through the ATOMIC_LOAD macro and returns it.
  int64_t get_forbidden_sql_no() const
  {
    return ATOMIC_LOAD(&forbidden_sql_no_);
  }
  int set_forbidden_sql_no(const int64_t sql_no, bool& forbid_succ);
  // Passes sql_no to inc_update() on max_durable_sql_no_.
  // NOTE(review): presumably inc_update only ever raises the stored value — confirm.
  void update_max_durable_sql_no(const int32_t sql_no)
  {
    inc_update(&max_durable_sql_no_, sql_no);
  }
  int lock_for_read(
      const ObLockForReadArg& lock_for_read_arg, bool& can_read, int64_t& trans_version, bool& is_determined_state);
  int get_transaction_status_with_log_ts(const int64_t log_ts, ObTransTableStatusType& status, int64_t& trans_version);
  int is_running(bool& is_running);
  int check_sql_sequence_can_read(const int64_t sql_sequence, bool& can_read);
  int check_row_locked(const ObStoreRowkey& key, memtable::ObIMvccCtx& ctx, const transaction::ObTransID& read_trans_id,
      const transaction::ObTransID& data_trans_id, const int64_t sql_sequence,
      storage::ObStoreRowLockState& lock_state);
  int cleanout_transnode(memtable::ObMvccTransNode& tnode, memtable::ObMvccRow& value, bool& cleanout_finish);
  int replay_rollback_to(const int64_t sql_no, const int64_t log_ts);
  int check_log_ts_and_get_trans_version(
      const int64_t log_ts, int64_t& trans_version, bool& is_related_trans, bool& is_rollback_trans);
  int get_trans_table_status_info(ObTransTableStatusInfo& trans_table_status_info);
  int check_if_terminated_in_given_log_range(const int64_t start_log_ts, const int64_t end_log_ts, bool& is_terminated);
  int get_trans_sstable_durable_ctx_info(const int64_t log_ts, ObTransSSTableDurableCtxInfo& info);
  int update_and_get_trans_table_with_minor_freeze(const int64_t log_ts, uint64_t& checksum);

  int recover_from_trans_sstable_durable_ctx_info(ObTransSSTableDurableCtxInfo& ctx_info);
  int64_t get_applying_log_ts() const;
  // Forwards to mt_ctx_.get_pending_log_size().
  // NOTE(review): not const-qualified; could be if the forwarded call is const — confirm.
  int64_t get_pending_log_size()
  {
    return mt_ctx_.get_pending_log_size();
  }
  // Forwards to mt_ctx_.get_flushed_log_size().
  // NOTE(review): not const-qualified; could be if the forwarded call is const — confirm.
  int64_t get_flushed_log_size()
  {
    return mt_ctx_.get_flushed_log_size();
  }
  int submit_log_if_neccessary();
  // get transaction state, commit version and undo information
  int get_trans_state_and_version_without_lock(ObTransStatusInfo& trans_info);
  int handle_2pc_request(const ObTrxMsgBase& msg, const int64_t msg_type);
  int handle_listener_message(const ObTrxMsgBase& msg, const int64_t msg_type);
  int construct_listener_context(const ObTrxMsgBase& msg);
  int get_trans_state(int64_t& state);
  void get_audit_info(int64_t& lock_for_read_elapse) const;
  // Returns the has_write_or_replay_mutator_redo_log_ flag.
  bool has_write_or_replay_mutator_redo_log() const
  {
    return has_write_or_replay_mutator_redo_log_;
  }
  // Returns the max_durable_log_ts_ member.
  int64_t get_max_durable_log_ts() const
  {
    return max_durable_log_ts_;
  }
  // Public wrapper around the private is_logging_() helper.
  bool is_logging() const
  {
    return is_logging_();
  }
  // Returns the is_listener_ flag.
  bool is_listener() const
  {
    return is_listener_;
  }
  // Returns the enable_new_1pc_ flag.
  bool is_enable_new_1pc() const
  {
    return enable_new_1pc_;
  }
H
handora 已提交
383 384 385 386
  // Forwards to stmt_info_.is_task_match().
  bool is_task_match()
  {
    return stmt_info_.is_task_match();
  }
O
oceanbase-admin 已提交
387
  void remove_trans_table();
388 389
  int clear_trans_after_restore(const int64_t restore_version, const uint64_t last_restore_log_id,
      const int64_t last_restore_log_ts, const int64_t fake_terminate_log_ts);
O
oceanbase-admin 已提交
390 391 392
  bool is_in_trans_table_state();
  virtual int64_t get_part_trans_action() const override;
  int rollback_stmt(const int64_t from_sql_no, const int64_t to_sql_no);
393
  bool need_update_schema_version(const uint64_t log_id, const int64_t log_ts);
394
  int update_max_majority_log(const uint64_t log_id, const int64_t log_ts);
O
obdev 已提交
395
  void reset_prev_redo_log_ids();
O
oceanbase-admin 已提交
396

G
gm 已提交
397
public:
O
oceanbase-admin 已提交
398 399 400 401 402 403 404 405
  // Debug-string fields for this context, layered on top of the ObDistTransCtx fields.
  // Each member is listed exactly once.
  INHERIT_TO_STRING_KV("ObDistTransCtx", ObDistTransCtx, K_(snapshot_version), K_(local_trans_version),
      K_(submit_log_pending_count), K_(submit_log_count), K_(stmt_info), K_(global_trans_version), K_(redo_log_no),
      K_(mutator_log_no), K_(session_id), K_(is_gts_waiting), K_(part_trans_action), K_(timeout_task),
      K_(batch_commit_trans), K_(batch_commit_state), K_(is_dup_table_trans), K_(last_ask_scheduler_status_ts),
      K_(last_ask_scheduler_status_response_ts), K_(ctx_dependency_wrap), K_(preassigned_log_meta),
      K_(is_dup_table_prepare), K_(dup_table_syncing_log_id), K_(is_prepare_leader_revoke), K_(is_local_trans),
      K_(forbidden_sql_no), K(is_dirty_), K_(undo_status), K_(max_durable_sql_no), K_(max_durable_log_ts),
      K(mt_ctx_.get_checksum_log_ts()), K_(is_changing_leader), K_(has_trans_state_log),
      K_(is_trans_state_sync_finished), K_(status), K_(same_leader_batch_partitions_count), K_(is_hazardous_ctx),
      K(mt_ctx_.get_callback_count()), K_(in_xa_prepare_state), K_(is_listener), K_(last_replayed_redo_log_id),
      K_(is_xa_trans_prepared));
O
oceanbase-admin 已提交
409

G
gm 已提交
410
public:
O
oceanbase-admin 已提交
411 412
  static const int64_t OP_LOCAL_NUM = 16;

G
gm 已提交
413
private:
O
oceanbase-admin 已提交
414 415 416 417 418
  int init_memtable_ctx_(ObTransService* txs, const uint64_t tenant_id);
  bool is_in_2pc_() const;
  bool is_logging_() const;
  bool has_logged_() const;
  bool is_pre_preparing_() const;
O
obdev 已提交
419
  bool need_record_log() const;
O
oceanbase-admin 已提交
420 421 422 423 424 425 426 427
  int reserve_log_header_(char* buf, const int64_t size, int64_t& pos);
  int fill_sp_redo_log_(ObSpTransRedoLog& sp_redo_log, const int64_t available_capacity, int64_t& mutator_size);
  int fill_sp_commit_log_(const int real_log_type, char* buf, const int64_t size, int64_t& pos, int64_t& log_type,
      bool& has_redo_log, bool& need_submit_log, int64_t& mutator_size);
  int fill_sp_abort_log_(char* buf, const int64_t size, int64_t& pos, int64_t& log_type);
  int fill_redo_log_(char* buf, const int64_t size, int64_t& pos, int64_t& mutator_size);
  int fill_prepare_log_(char* buf, const int64_t size, int64_t& pos);
  int fill_commit_log_(char* buf, const int64_t size, int64_t& pos);
O
obdev 已提交
428
  int fill_record_log_(char *buf, const int64_t size, int64_t &pos);
O
oceanbase-admin 已提交
429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455
  int fill_abort_log_(char* buf, const int64_t size, int64_t& pos);
  int fill_clear_log_(char* buf, const int64_t size, int64_t& pos);
  int fill_log_header_(char* buf, const int64_t size, int64_t& pos, const int64_t log_type, const int64_t idx);
  int fill_redo_prepare_log_(
      char* buf, const int64_t size, int64_t& pos, int64_t& redo_log_pos, int64_t& log_type, int64_t& mutator_size);
  int fill_redo_prepare_commit_log_(
      char* buf, const int64_t size, int64_t& pos, int64_t& redo_log_pos, int64_t& log_type, int64_t& mutator_size);
  int fill_redo_prepare_commit_clear_log_(
      char* buf, const int64_t size, int64_t& pos, int64_t& log_type, int64_t& mutator_size);

  int fill_trans_state_log_(char* buf, const int64_t size, int64_t& pos);
  int fill_mutator_log_(char* buf, const int64_t size, int64_t& pos, int64_t& mutator_size);
  int fill_mutator_state_log_(char* buf, const int64_t size, int64_t& pos, int64_t& log_type);
  int fill_mutator_abort_log_(char* buf, const int64_t size, int64_t& pos);
  int fill_pre_commit_log_(char* buf, const int64_t size, int64_t& pos);

  int submit_log_impl_(const int64_t log_type, const bool pending, const bool sync, bool& has_redo_log);
  // Submits a log of the given type through submit_log_impl_ with both the pending and
  // the sync flags turned off.
  int submit_log_async_(const int64_t log_type, bool& has_redo_log)
  {
    const bool pending = false;
    const bool sync = false;
    return submit_log_impl_(log_type, pending, sync, has_redo_log);
  }
  // Submits a log of the given type through submit_log_impl_ with the pending flag on
  // and the sync flag off.
  int submit_log_task_(const int64_t log_type, bool& has_redo_log)
  {
    const bool pending = true;
    const bool sync = false;
    return submit_log_impl_(log_type, pending, sync, has_redo_log);
  }
  int submit_log_sync_(const int64_t log_type, bool& has_redo_log);

G
gm 已提交
456
private:
O
oceanbase-admin 已提交
457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474
  int handle_start_stmt_request_(const ObTransMsg& msg);
  int handle_stmt_rollback_request_(const ObTransMsg& msg);
  int handle_2pc_prepare_request_(const ObTransMsg& msg);
  int handle_2pc_log_id_request_(const ObTransMsg& msg);
  int handle_2pc_commit_request_(const ObTransMsg& msg);
  int handle_2pc_abort_request_(const ObTransMsg& msg);
  int handle_2pc_clear_request_(const ObTransMsg& msg);
  int handle_2pc_commit_clear_request_(const int64_t commit_version, const bool need_response);
  int handle_2pc_pre_commit_request_(const int64_t commit_version, const bool need_response);
  int handle_trans_ask_scheduler_status_response_(const ObTransMsg& msg);
  int handle_savepoint_rollback_request_(const int64_t sql_no, const int64_t cur_sql_no, const bool need_response);
  int handle_2pc_commit_request_raw_(
      const int64_t msg_type, const int64_t trans_version, const PartitionLogInfoArray& partition_log_info_arr);
  int handle_2pc_abort_request_raw_(const int64_t msg_type, const int64_t msg_status);
  int handle_2pc_clear_request_raw_(const int64_t msg_type);
  int handle_2pc_prepare_request_raw_(int status);
  int handle_2pc_prepare_redo_request_raw_(int status);

G
gm 已提交
475
private:
O
oceanbase-admin 已提交
476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502
  int drive_();
  void check_prev_trans_state_();
  int start_trans_();
  int trans_end_(const bool commit, const int64_t commit_version);
  int trans_clear_();
  int trans_kill_();
  int start_stmt_(const int64_t sql_no);
  int end_stmt_(const bool is_rollback, const int64_t sql_no, const int64_t stmt_min_sql_no, bool& need_response);
  int generate_snapshot_(storage::ObIPartitionGroup* partition);
  int compensate_prepare_no_log_();
  int compensate_mutator_abort_log_();
  int clear_ctx_(const bool commit);
  int post_2pc_response_(const common::ObPartitionKey& pid, const int64_t msg_type);
  int update_global_trans_version_(const int64_t trans_version);
  int post_stmt_response_(
      const int64_t msg_type, const int64_t sql_no, const int status, const int64_t msg_timeout = INT64_MAX / 2);
  int post_response_(
      const common::ObPartitionKey& receiver, const int64_t msg_type, const int64_t sql_no, bool& partition_exist);
  int post_trans_response_(const int64_t msg_type);
  int handle_trans_clear_request_(const ObTransMsg& msg);
  int handle_trans_discard_request_(const ObTransMsg& msg);
  int do_prepare_(const int status);  // before write redo/prepare log
  int do_dist_commit_(const int64_t trans_version, const PartitionLogInfoArray* partition_log_info_arr);
  int do_abort_();
  int do_clear_();
  int on_prepare_redo_();  // after redo/prepare log written
  int on_prepare_(const bool batch_committed, const int64_t timestamp);
O
obdev 已提交
503
  int on_sp_commit_(const bool commit, const int64_t timestamp = OB_INVALID_TIMESTAMP);
O
oceanbase-admin 已提交
504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588
  int on_dist_commit_();
  int on_dist_abort_();
  int on_clear_(const bool need_response);
  int trans_sp_end_(const bool commit);
  void inc_submit_log_pending_count_();
  void dec_submit_log_pending_count_();
  void inc_submit_log_count_();
  void dec_submit_log_count_();
  // Returns the global_trans_version_ member.
  int64_t get_global_trans_version_() const
  {
    return global_trans_version_;
  }
  // Unconditionally overwrites global_trans_version_ with trans_version.
  void set_global_trans_version_(const int64_t trans_version)
  {
    global_trans_version_ = trans_version;
  }
  int alloc_local_trans_version_(const int64_t log_type);
  int prepare_sp_redolog_id_ts_(const int64_t log_type, bool& need_retry_alloc_id_ts);
  int retry_submit_log_(const int64_t log_type);
  int generate_local_trans_version_(const int64_t log_type);
  int generate_log_id_timestamp_(const int64_t log_type);
  int trans_replay_commit_(const int64_t commit_version, const int64_t checksum);
  int update_publish_version_(const int64_t publish_version, const bool for_replay);
  int calc_batch_commit_version_(int64_t& commit_version);
  int commit_by_checkpoint_(const int64_t commit_version);
  void update_last_checkpoint_(const int64_t checkpoint);
  int get_checkpoint_(int64_t& checkpoint);
  int checkpoint_(const int64_t checkpoint, const int64_t safe_slave_read_timestamp, bool& checkpoint_succ);
  // Reports whether the given timestamp is contained in the supplied window object.
  bool in_changing_leader_windows_(const int64_t ts, common::ObTsWindows& changing_leader_windows)
  {
    const bool is_contained = changing_leader_windows.contain(ts);
    return is_contained;
  }
  bool is_idential_tenant_();
  int prepare_changing_leader_(const common::ObAddr& proposal_leader);
  bool can_be_recycled_();
  bool need_to_ask_scheduler_status_();
  int check_rs_scheduler_is_alive_(bool& is_alive);
  int force_kill_();
  int post_ask_scheduler_status_msg_();
  // early lock release
  int drive_by_prev_trans_(int state);
  int check_and_early_release_lock_();
  int generate_sp_commit_log_type_(storage::ObStorageLogType& log_type);
  int pre_check_sp_trans_log_(const int log_type, ObSpTransCommitLog& commit_log, bool& need_wait_prev_trans);
  int register_trans_result_info_(const int state);
  int insert_all_prev_trans_(const bool for_replaying);
  // True when batch_commit_state_ is at or beyond ObBatchCommitState::ALLOC_LOG_ID_TS.
  bool bc_has_alloc_log_id_ts_()
  {
    return batch_commit_state_ >= ObBatchCommitState::ALLOC_LOG_ID_TS;
  }
  // True only when batch_commit_state_ equals ObBatchCommitState::GENERATE_REDO_PREPARE_LOG.
  // NOTE(review): the sibling bc_has_* checks use >=, this one uses == — confirm that is intended.
  bool bc_has_generate_redo_log_()
  {
    return ObBatchCommitState::GENERATE_REDO_PREPARE_LOG == batch_commit_state_;
  }
  // True when batch_commit_state_ is at or beyond ObBatchCommitState::GENERATE_PREPARE_LOG.
  bool bc_has_generate_prepare_log_()
  {
    return batch_commit_state_ >= ObBatchCommitState::GENERATE_PREPARE_LOG;
  }
  int revert_self_();
  int get_compat_mode_(oceanbase::share::ObWorker::CompatMode& mode);
  int check_and_update_compact_mode_();
  bool in_pending_state_();
  bool has_write_data_();
  bool has_trans_version_();
  // duplicated table partition sync redo log
  int post_redo_log_sync_request_(
      const ObAddrLogIdArray& addr_logid_array, const uint64_t log_id, const int64_t log_ts, const int64_t log_type);
  int post_redo_log_sync_to_not_mask_addr_(const uint64_t log_id, const int64_t log_ts, const int64_t log_type);
  int post_redo_log_sync_response_(const uint64_t log_id);
  void update_max_submitted_log_timestamp_(const int64_t cur_log_timestamp);
  bool not_need_write_next_log_(const int64_t log_type);
  virtual void set_exiting_() override;
  // Base-class override; returns the same is_dirty_ flag as the public is_dirty_trans().
  virtual bool is_dirty() const override
  {
    return is_dirty_;
  }
  int submit_log_incrementally_(const bool need_state_log);
  int submit_log_incrementally_(const bool need_state_log, bool& has_redo_log);
  int submit_log_when_preparing_changing_leader_(const int64_t sql_no);
  int rollback_to_(const int32_t sql_no);
  int do_sp_trans_rollback_();
  int do_sp_trans_rollback_(const bool is_rollback, const int ret_code, sql::ObIEndTransCallback* cb);
  void debug_slow_on_sync_log_success_(const int64_t log_type);
  void DEBUG_SYNC_slow_txn_before_handle_message_(const int64_t msg_type);
  void DEBUG_SYNC_slow_txn_during_2pc_prepare_phase_for_physical_backup_1055_(const int64_t msg_type);
O
obdev 已提交
589 590
  int decide_log_type_for_mutator_and_record_(storage::ObStorageLogType& log_type);
  int decide_and_submit_next_log_(storage::ObStorageLogType& log_type, bool &has_redo_log, bool has_pending_cb);
O
oceanbase-admin 已提交
591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606
  int get_trans_table_status_info_(ObTransTableStatusInfo& trans_table_status_info);
  int get_trans_table_status_info_(const int64_t log_ts, ObTransTableStatusInfo& trans_table_status_info);
  int get_callback_type_(const int64_t sql_sequence, memtable::TransCallbackType& cb_type);
  bool can_submit_log_(const int64_t log_type);
  void update_durable_log_id_ts_(const int64_t log_type, const uint64_t log_id, const int64_t log_ts);
  int set_trans_table_status_info_(const ObTransTableStatusInfo& trans_table_status_info);
  int submit_big_trans_callback_task_(const int64_t log_type, const int64_t log_id, const int64_t timestamp);
  void update_clear_log_base_ts_(const int64_t log_ts);
  bool need_rollback_when_restore_(const int64_t commit_version);
  int64_t decide_sstable_trans_state_();
  bool is_trans_valid_for_replay_(const storage::ObStorageLogType log_type, const int64_t log_ts);
  int check_need_notify_listener_(bool& need_notify_listener);
  int init_listener_handler_();
  int try_respond_coordinator_(const ObTransMsgType msg_type, const ListenerAction action);
  int get_prepare_ack_arg_(int& status, int64_t& state, int64_t& prepare_version, uint64_t& prepare_log_id,
      int64_t& prepare_log_ts, int64_t& request_id, int64_t& remain_wait_interval_us, bool& is_xa_prepare);
Y
yy0 已提交
607
  int check_row_locked_(const ObTransStatusInfo& trans_info,
O
oceanbase-admin 已提交
608 609 610 611 612 613 614 615 616
      const ObTransID& data_trans_id, const int64_t sql_sequence, storage::ObStoreRowLockState& lock_state);
  // Read-visibility helpers. can_read, trans_version and is_determined_state are
  // non-const references (presumably outputs -- confirm in the implementation).
  int lock_for_read_(const ObTransStatusInfo& trans_info, const ObLockForReadArg& lock_for_read_arg, bool& can_read,
      int64_t& trans_version, bool& is_determined_state);
  int check_sql_sequence_can_read_(const ObTransStatusInfo& trans_info, const int64_t sql_sequence, bool& can_read);
  // Presumably stores the address into tmp_scheduler_ -- TODO confirm.
  int set_tmp_scheduler_(const common::ObAddr& scheduler);
  // NOTE(review): unlike the neighbouring helpers this name has no trailing
  // underscore -- confirm whether that is intentional.
  void try_restore_read_snapshot();
  // const member function: does not modify non-mutable members.
  bool is_xa_last_empty_redo_log_() const;
  // Two kill variants taking the same single argument, a terminate log timestamp.
  int fake_kill_(const int64_t terminate_log_ts);
  int kill_v2_(const int64_t terminate_log_ts);
G
gl0 已提交
617
  int calc_serialize_size_and_set_participants_(const ObPartitionArray &participants);
O
obdev 已提交
618
  int do_calc_and_set_participants_(const ObPartitionArray &participants);
G
gl0 已提交
619
  int calc_serialize_size_and_set_undo_(const int64_t undo_to, const int64_t undo_from);
O
oceanbase-admin 已提交
620

G
gm 已提交
621
private:
O
oceanbase-admin 已提交
622 623
  DISALLOW_COPY_AND_ASSIGN(ObPartTransCtx);

G
gm 已提交
624
private:
O
oceanbase-admin 已提交
625 626 627 628 629 630
  // Magic number for a part ctx. Read from the least-significant byte upwards,
  // the bytes of 0x0078746374726170 are the ASCII string "partctx" followed by a
  // zero byte.
  // NOTE(review): the previous note said the value "means reset partctx" --
  // confirm where it is set and checked.
  static const int64_t PART_CTX_MAGIC_NUM = 0x0078746374726170;
  // The three values below are expressed in microseconds (10 * 1000 us = 10 ms).
  static const int64_t REPLAY_PRINT_TRACE_THRESHOLD = 10 * 1000;      // 10 ms
  static const int64_t REDO_SYNC_TASK_RETRY_INTERVAL_US = 10 * 1000;  // 10 ms
  static const int64_t END_STMT_SLEEP_US = 10 * 1000;                 // 10 ms
  // Retry cap. NOTE(review): if each retry sleeps END_STMT_SLEEP_US, this bounds
  // the total wait at about 1 second -- confirm against the retry loop.
  static const int64_t MAX_END_STMT_RETRY_TIMES = 100;
O
obdev 已提交
631
  static const uint64_t MAX_PREV_LOG_IDS_COUNT = 80000;
G
gm 已提交
632
private:
O
oceanbase-admin 已提交
633 634
  bool is_inited_;
  ObIClogAdapter* clog_adapter_;
G
gl0 已提交
635
ObTransSubmitLogCb submit_log_cb_;
O
oceanbase-admin 已提交
636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707
  // Memtable context held by value, followed by non-const pointers to
  // collaborating objects (ownership is not visible from here).
  memtable::ObMemtableCtx mt_ctx_;
  memtable::ObIMemtableCtxFactory* mt_ctx_factory_;
  ObTransTaskWorker* big_trans_worker_;
  int64_t redo_log_no_;
  int64_t mutator_log_no_;
  ObRedoLogIdArray prev_redo_log_ids_;
  // maps each partition to its prepare log id
  PartitionLogInfoArray partition_log_info_arr_;
  storage::ObPartitionService* partition_service_;
  ObTransStatusMgr* trans_status_mgr_;
  int64_t snapshot_version_;
  // 2pc participant's prepare version.
  // It is acquired from gts or the gts cache,
  // and is usually not the final prepare version.
  int64_t local_trans_version_;
  // number of successfully executed tasks;
  // if it equals zero, part_ctx can be released directly
  int64_t commit_task_count_;
  // number of submit-log tasks pending in the thread pool
  int64_t submit_log_pending_count_;
  // number of log-persisting tasks currently in progress
  int64_t submit_log_count_;
  uint64_t cluster_id_;
  // NOTE(review): signed, while the other log ids in this class
  // (e.g. min_log_id_, dup_table_syncing_log_id_) are uint64_t -- confirm.
  int64_t prepare_log_id_;
  int64_t prepare_log_timestamp_;
  int64_t global_trans_version_;
  ObTransStmtInfo stmt_info_;
  int64_t leader_epoch_;
  clog::ObLogMeta preassigned_log_meta_;
  // minimum log_id this partition has written
  uint64_t min_log_id_;
  int64_t min_log_ts_;
  int64_t end_log_ts_;
  int64_t max_submitted_log_timestamp_;
  // type of the log that currently needs to be written
  int64_t log_type_;
  int64_t gts_request_ts_;
  int64_t stmt_expired_time_;
  int64_t quick_checkpoint_;
  int64_t commit_log_checksum_;
  CHANGING_LEADER_STATE prepare_changing_leader_state_;

  common::ObAddr proposal_leader_;

  int64_t last_ask_scheduler_status_ts_;
  int64_t last_ask_scheduler_status_response_ts_;
  int64_t cur_query_start_time_;
  // tasks belonging to this sql_no are forbidden
  int64_t forbidden_sql_no_;
  int batch_commit_state_;
  // ELR transaction dependency relation
  ObPartTransCtxDependencyWrap ctx_dependency_wrap_;
  common::ObMaskSet2<ObAddrLogId> dup_table_msg_mask_set_;
  ObAddrLogIdArray dup_table_lease_addrs_;
  ObDupTableRedoSyncTask* redo_sync_task_;
  bool is_dup_table_prepare_;
  uint64_t dup_table_syncing_log_id_;
  int64_t dup_table_syncing_log_ts_;
  int64_t async_applying_log_ts_;
  ObTransUndoStatus undo_status_;
  int32_t max_durable_sql_no_;
  // uint64_t max_durable_log_id_;
  int64_t max_durable_log_ts_;
  // the transaction has been persisted while still uncommitted
  bool is_dirty_;
  // mutable: may be modified from const member functions
  mutable TransTableSeqLock trans_table_seqlock_;
  // presumably one of USER_REQUEST_UNKNOWN / USER_COMMIT / USER_ABORT from the
  // enum near the top of this header -- TODO confirm
  int sp_user_request_;
  int32_t same_leader_batch_partitions_count_;
  bool need_checksum_;
  bool is_prepared_;
  bool is_gts_waiting_;
  bool batch_commit_trans_;
H
handora 已提交
708 709 710 711 712 713 714 715 716 717 718 719
  // Whether there exists a trans state log for the current leader transfer
  //
  // It is implemented as follows:
  // - For the new leader:
  //   - we set the value to true when we replay the trans state log
  //     if the new leader is me
  //   - we reset the value when the leader is active
  // - For the original leader:
  //   - we reset the value before each leader transfer
  //   - we set the value to true when we have synced the trans state log
  //   - we reset the value when the leader is revoked if no on-the-fly log
  //     exists
O
oceanbase-admin 已提交
720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737
  bool is_trans_state_sync_finished_;
  bool is_changing_leader_;
  bool can_rollback_stmt_;
  // this participant has switched leader out and written the transaction state log
  bool has_trans_state_log_;
  bool waiting_next_trans_rollback_;
  bool is_prepare_leader_revoke_;
  bool is_local_trans_;
  // ctx's state can't be recovered; the server must be rebooted
  bool is_hazardous_ctx_;
  // xa prepare state: indicates that the xa commit has not been received yet;
  // it is used to distinguish the requirement of prepare trx_prepare_version
  bool in_xa_prepare_state_;
  // only used by XA transactions;
  // used to distinguish whether the redo log has been persisted
  bool is_redo_prepared_;
  bool has_gen_last_redo_log_;
  // this keeps clear_log's timestamp always
D
dshe 已提交
738
  // greater than commit timestamp of all participants
O
oceanbase-admin 已提交
739 740 741 742 743 744 745 746 747 748 749 750 751 752 753
  int64_t clear_log_base_ts_;
  TransResultInfo result_info_;
  int64_t end_log_ts_for_batch_commit_;
  int64_t end_log_ts_for_elr_;
  bool is_listener_;
  ObTransListenerHandler* listener_handler_;
  storage::ObIPartitionGroup* pg_;
  bool enable_new_1pc_;
  ObTransCtx* coord_ctx_;
  uint64_t last_replayed_redo_log_id_;
  // used by XA response message to sender
  common::ObAddr tmp_scheduler_;
  int64_t last_redo_log_mutator_size_;
  bool is_xa_trans_prepared_;
  bool has_write_or_replay_mutator_redo_log_;
754
  bool is_in_redo_with_prepare_;
755 756 757
  int64_t redo_log_id_serialize_size_;
  int64_t participants_serialize_size_;
  int64_t undo_serialize_size_;
O
obdev 已提交
758 759
  // the log id of prev checkpoint log
  uint64_t prev_checkpoint_id_;
O
oceanbase-admin 已提交
760 761 762 763 764 765 766 767 768 769
};

// Compile-time size guard, active on x86_64 builds only: the build fails with
// "ObPartTransCtx is too big" unless sizeof(ObPartTransCtx) is below 8000 bytes.
#ifdef __x86_64__
STATIC_ASSERT(sizeof(ObPartTransCtx) < 8000, "ObPartTransCtx is too big");
#endif

}  // namespace transaction
}  // namespace oceanbase

#endif  // OCEANBASE_TRANSACTION_OB_TRANS_PART_CTX_