ob_log_part_trans_task.h 51.2 KB
Newer Older
S
SanmuWangZJU 已提交
1 2 3 4 5 6 7 8 9 10
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
W
wangzelin.wzl 已提交
11 12
 *
 * Transaction Structures
S
SanmuWangZJU 已提交
13 14
 */

W
wangzelin.wzl 已提交
15 16
#ifndef OCEANBASE_LIBOBCDC_TRANS_TASK_H__
#define OCEANBASE_LIBOBCDC_TRANS_TASK_H__
S
SanmuWangZJU 已提交
17 18 19 20 21 22

#include "lib/queue/ob_link.h"                      // ObLink
#include "lib/atomic/ob_atomic.h"                   // ATOMIC_LOAD
#include "lib/lock/ob_small_spin_lock.h"            // ObByteLock
#include "common/object/ob_object.h"                // ObObj
#include "common/ob_queue_thread.h"                 // ObCond
W
wangzelin.wzl 已提交
23 24
#include "ob_cdc_tablet_to_table_info.h"            // ObCDCTabletChangeInfo
#include "storage/tx/ob_trans_define.h"             // ObTransID, ObLSLogInfoArray
25
#include "storage/tx/ob_tx_big_segment_buf.h"       // ObTxBigSegmentBuf
S
SanmuWangZJU 已提交
26
#include "storage/memtable/ob_memtable_mutator.h"   // ObMemtableMutatorRow, ObMemtableMutatorMeta
W
wangzelin.wzl 已提交
27
#include "storage/blocksstable/ob_datum_row.h"      // ObRowDml
O
obdev 已提交
28
#include "logservice/data_dictionary/ob_data_dict_storager.h"  // ObDataDictStorage
S
SanmuWangZJU 已提交
29 30

#include "ob_log_trans_log.h"                       // SortedRedoLogList
W
wangzelin.wzl 已提交
31 32
#include "ob_cdc_multi_data_source_info.h"          // MultiDataSourceNode, MultiDataSourceInfo
#include "ob_log_rollback_section.h"                // RollbackList
S
SanmuWangZJU 已提交
33
#include "ob_log_lighty_list.h"                     // LightyList
W
wangzelin.wzl 已提交
34
#include "ob_log_all_ddl_operation_schema_info.h"   // ObLogAllDdlOperationSchemaInfo
S
SanmuWangZJU 已提交
35 36 37 38
#include "ob_small_arena.h"                         // ObSmallArena
#include "ob_log_task_pool.h"                       // TransTaskBase
#include "ob_log_utils.h"                           // is_ddl_table
#include "ob_log_resource_recycle_task.h"           // ObLogResourceRecycleTask
W
wangzelin.wzl 已提交
39 40 41
#include "ob_log_callback.h"                        // ObILogCallback
#include "ob_cdc_lob_ctx.h"                         // ObLobDataOutRowCtxList
#include "ob_cdc_lob_aux_table_schema_info.h"       // ObCDCLobAuxTableSchemaInfo
42
#include "lib/allocator/ob_lf_fifo_allocator.h"     // ObConcurrentFIFOAllocator
O
obdev 已提交
43
#include "ob_log_safe_arena.h"
44
#include "ob_log_tic_update_info.h"                 // TICUpdateInfo
S
SanmuWangZJU 已提交
45 46 47 48 49 50 51 52 53 54 55 56

namespace oceanbase
{
namespace share
{
namespace schema
{
class ObTableSchema;
class ObColumnSchemaV2;
}
}

W
wangzelin.wzl 已提交
57
namespace libobcdc
S
SanmuWangZJU 已提交
58 59 60 61
{
class PartTransTask;
class ObLogBR;
class ObLogEntryTask;
O
obdev 已提交
62
class TableSchemaInfo;
O
obdev 已提交
63
class ObCDCUdtValueMap;
S
SanmuWangZJU 已提交
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79

class IStmtTask : public ObLink   // Inheritance of ObLink is only used for Sequencer
{
public:
  // Kind of statement a task represents.
  // print_stmt_type() maps these values to "UNKNOWN" / "DML" / "DDL".
  enum StmtType
  {
    STMT_TYPE_UNKNOWN = 0,
    STMT_TYPE_DML = 1,              // DML statement
    STMT_TYPE_DDL = 2,              // DDL statement
  };

  IStmtTask(const StmtType type, PartTransTask &host) :
    type_(type),
    host_(host),
    hash_value_(common::OB_INVALID_ID),
    row_index_(OB_INVALID_ID),
W
wangzelin.wzl 已提交
80
    br_(NULL),
S
SanmuWangZJU 已提交
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
    next_(NULL)
  {}

  virtual ~IStmtTask() { reset(); }

  bool is_unknown_stmt() const { return STMT_TYPE_UNKNOWN == type_; }
  bool is_dml_stmt() const { return STMT_TYPE_DML == type_; }
  bool is_ddl_stmt() const { return STMT_TYPE_DDL == type_; }

  PartTransTask &get_host() { return host_; }
  const PartTransTask &get_host() const { return host_; }
  uint64_t get_tenant_id() const;

  StmtType get_type() const { return type_; }
  void set_type(const int type) { type_ = static_cast<StmtType>(type); }

  void reset();

  IStmtTask *get_next() { return next_; }
  void set_next(IStmtTask *next) { next_ = next; }

  // Hash value attached to this statement; the constructor initialises it to OB_INVALID_ID.
  uint64_t hash() const { return hash_value_; }
  // NOTE(review): the parameter is signed while hash_value_ and hash() are uint64_t,
  // so the value is converted implicitly on assignment -- confirm this is intended.
  void set_hash_value(const int64_t hash_value) { hash_value_ = hash_value; }

  uint64_t get_row_index() const { return row_index_; }
  void set_row_index(const uint64_t row_index) { row_index_ = row_index; }

W
wangzelin.wzl 已提交
108 109 110 111 112
  ObLogBR *get_binlog_record() { return br_; }
  const ObLogBR *get_binlog_record() const { return br_; }
  void set_binlog_record(ObLogBR *br) { br_ = br; }

  static const char *print_stmt_type(const int type)
S
SanmuWangZJU 已提交
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
  {
    const char *type_str = "UNKNOWN";
    switch (type) {
    case STMT_TYPE_UNKNOWN:
      type_str = "UNKNOWN";
      break;
    case STMT_TYPE_DML:
      type_str = "DML";
      break;
    case STMT_TYPE_DDL:
      type_str = "DDL";
      break;
    default:
      type_str = "INVALID";
      break;
    }
    return type_str;
  }

W
wangzelin.wzl 已提交
132 133
  TO_STRING_KV(
      K_(type),
S
SanmuWangZJU 已提交
134 135 136
      "type_str", print_stmt_type(type_),
      K_(hash_value),
      K_(row_index),
W
wangzelin.wzl 已提交
137
      KP_(br),
S
SanmuWangZJU 已提交
138 139 140 141
      K_(host),
      KP_(next));

protected:
W
wangzelin.wzl 已提交
142 143 144 145 146 147
  StmtType           type_;
  PartTransTask      &host_;        // part trans task the stmt belongs to
  uint64_t           hash_value_;   // HASH value
  uint64_t           row_index_;    // row index for the stmt in the trans it belongs
  ObLogBR            *br_;
  IStmtTask          *next_;
S
SanmuWangZJU 已提交
148 149 150 151 152 153 154 155 156 157 158 159 160 161

private:
  DISALLOW_COPY_AND_ASSIGN(IStmtTask);
};

////////////////////////////////////////////////////////////////////////////////////

// node of column value
struct ColValue
{
  common::ObObj value_;
  uint64_t      column_id_;
  ObString      string_value_;    // The value after converting Obj to a string
  ColValue      *next_;
162 163 164 165 166 167 168 169
  // Per-column flags. column_flags_ overlays the bit-fields below,
  // which lets reset() clear all of them with a single assignment.
  union {
    uint8_t column_flags_;
    struct {
      uint8_t   is_out_row_       : 1;  // Column data is stored out row
      uint8_t   is_col_nop_       : 1;  // Column data is nop
      uint8_t   reserve_fields_   : 6;  // reserved fields
    };
  };
S
SanmuWangZJU 已提交
170

O
obdev 已提交
171 172 173 174
  // if this ColValue is group value
  // then children_ store group hidden ColValue
  LightyList<ColValue> children_;

S
SanmuWangZJU 已提交
175 176 177 178 179 180
  void reset()
  {
    value_.reset();
    column_id_ = common::OB_INVALID_ID;
    string_value_.reset();
    next_ = NULL;
181
    column_flags_ = 0;
O
obdev 已提交
182
    children_.reset();
S
SanmuWangZJU 已提交
183 184 185 186 187 188 189 190 191 192
  }

  bool is_valid()
  {
    return value_.is_valid_type() && common::OB_INVALID_ID != column_id_;
  }

  ColValue *get_next() { return next_; }
  void set_next(ColValue *next) { next_ = next; }

O
obdev 已提交
193 194 195 196
  bool is_json() const { return value_.is_json(); }
  bool is_geometry() const { return value_.is_geometry(); }
  common::ObObjType get_obj_type() const { return value_.get_type(); }

O
obdev 已提交
197 198
  int add_child(ColValue *child) {return children_.add(child);}

W
wangzelin.wzl 已提交
199
  TO_STRING_KV(
O
obdev 已提交
200
      "type", common::ob_obj_type_str(get_obj_type()),
W
wangzelin.wzl 已提交
201 202 203
      K_(value),
      K_(column_id),
      K_(string_value),
204 205
      K_(is_out_row),
      K_(is_col_nop));
S
SanmuWangZJU 已提交
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
};

///////////////////////////////////////////////////////////////////////////////////

typedef LightyList<ColValue> ColValueList;
class ObObj2strHelper;

// row value
class MutatorRow : public memtable::ObMemtableMutatorRow
{
public:
  explicit MutatorRow(common::ObIAllocator &allocator);
  virtual ~MutatorRow();

public:
O
obdev 已提交
221 222
  common::ObIAllocator &get_allocator() { return allocator_; }

S
SanmuWangZJU 已提交
223
  // Deserialize a row
W
wangzelin.wzl 已提交
224
  virtual int deserialize(const char *buf, const int64_t data_len, int64_t &pos);
S
SanmuWangZJU 已提交
225 226 227

  // Support for filtering table data within PG
  // Deserialize some fields: first step to get row_size, table_id
W
wangzelin.wzl 已提交
228 229
  int deserialize_first(
      const char *buf,
S
SanmuWangZJU 已提交
230 231
      const int64_t data_len,
      int64_t &pos,
W
wangzelin.wzl 已提交
232
      int32_t &row_size);
S
SanmuWangZJU 已提交
233 234

  // Deserialize some fields: Step 2 continues the parsing to get the table_version
W
wangzelin.wzl 已提交
235 236
  int deserialize_second(
      const char *buf,
S
SanmuWangZJU 已提交
237 238 239 240 241 242 243 244
      const int64_t data_len,
      int64_t &pos,
      int64_t &table_version);

  void reset();

  // Parse the column data
  // If obj2str_helper is empty, do not convert obj to string
W
wangzelin.wzl 已提交
245 246
  int parse_cols(
      ObObj2strHelper *obj2str_helper = NULL,
O
obdev 已提交
247 248
      const uint64_t tenant_id = OB_INVALID_TENANT_ID,
      const uint64_t table_id = OB_INVALID_ID,
S
SanmuWangZJU 已提交
249
      const TableSchemaInfo *tb_schema_info = NULL,
O
obdev 已提交
250
      const ObTimeZoneInfoWrap *tz_info_wrap = nullptr,
W
wangzelin.wzl 已提交
251 252
      const bool enable_output_hidden_primary_key = false,
      const ObLogAllDdlOperationSchemaInfo *all_ddl_operation_table_schema_info = NULL);
S
SanmuWangZJU 已提交
253

W
wangzelin.wzl 已提交
254
  // Parse the column data based on ObTableSchema
O
obdev 已提交
255
  template<class CDC_INNER_TABLE_SCHEMA>
W
wangzelin.wzl 已提交
256
  int parse_cols(
O
obdev 已提交
257
      const CDC_INNER_TABLE_SCHEMA &lob_aux_table_schema_info);
S
SanmuWangZJU 已提交
258

W
wangzelin.wzl 已提交
259 260 261 262 263 264 265
  int get_cols(
      ColValueList **rowkey_cols,
      ColValueList **new_cols,
      ColValueList **old_cols,
      ObLobDataOutRowCtxList **new_lob_ctx_cols);

  ObLobDataOutRowCtxList &get_new_lob_ctx_cols() { return new_lob_ctx_cols_; }
S
SanmuWangZJU 已提交
266

267 268
  int parse_ext_info_log(ObString &ext_info_log);

S
SanmuWangZJU 已提交
269
public:
W
wangzelin.wzl 已提交
270 271
  TO_STRING_KV(
      "Row", static_cast<const memtable::ObMemtableMutatorRow &>(*this),
S
SanmuWangZJU 已提交
272 273 274 275 276 277 278
      K_(deserialized),
      K_(cols_parsed),
      K_(new_cols),
      K_(old_cols),
      K_(rowkey_cols));

private:
W
wangzelin.wzl 已提交
279 280
  int parse_columns_(
      const bool is_parse_new_col,
S
SanmuWangZJU 已提交
281 282 283
      const char *col_data,
      const int64_t col_data_size,
      ObObj2strHelper *obj2str_helper,
O
obdev 已提交
284 285
      const uint64_t tenant_id,
      const uint64_t table_id,
S
SanmuWangZJU 已提交
286
      const TableSchemaInfo *tb_schema_info,
O
obdev 已提交
287
      const ObTimeZoneInfoWrap *tz_info_wrap,
W
wangzelin.wzl 已提交
288 289 290 291 292
      const bool enable_output_hidden_primary_key,
      const ObLogAllDdlOperationSchemaInfo *all_ddl_operation_table_schema_info,
      ColValueList &cols);
  int parse_rowkey_(
      ColValueList &rowkey_cols,
S
SanmuWangZJU 已提交
293 294
      const common::ObStoreRowkey &rowkey,
      ObObj2strHelper *obj2str_helper,
O
obdev 已提交
295 296
      const uint64_t tenant_id,
      const uint64_t table_id,
S
SanmuWangZJU 已提交
297
      const TableSchemaInfo *tb_schema_info,
O
obdev 已提交
298
      const ObTimeZoneInfoWrap *tz_info_wrap,
S
SanmuWangZJU 已提交
299
      const bool enable_output_hidden_primary_key);
300
  int deep_copy_encoded_column_value_(blocksstable::ObStorageDatum &datum);
W
wangzelin.wzl 已提交
301 302 303 304 305 306 307 308 309 310
  // 1. get column_id and column_schema_info for user table;
  // 2. get column_id for all_ddl_operation_table
  int get_column_info_(
      const TableSchemaInfo *tb_schema_info,
      const ObLogAllDdlOperationSchemaInfo *all_ddl_operation_table_schema_info,
      const int64_t column_idx,
      const bool is_rowkey_column_idx,
      uint64_t &column_id,
      ColumnSchemaInfo *&column_schema_info) const;
  int add_column_(
S
SanmuWangZJU 已提交
311 312
      const uint64_t column_id,
      const ObObj *value,
W
wangzelin.wzl 已提交
313
      const bool is_out_row,
O
obdev 已提交
314 315
      const uint64_t tenant_id,
      const uint64_t table_id,
W
wangzelin.wzl 已提交
316 317
      const ColumnSchemaInfo *column_schema,
      const ObObj2strHelper *obj2str_helper,
O
obdev 已提交
318
      const ObTimeZoneInfoWrap *tz_info_wrap,
O
obdev 已提交
319 320
      ColValueList &cols,
      ObCDCUdtValueMap *udt_value_map);
W
wangzelin.wzl 已提交
321 322 323 324 325 326 327 328
  int set_obj_propertie_(
      const uint64_t column_id,
      const int64_t column_idx_for_datum_row,
      const ColumnSchemaInfo *column_schema_info,
      const ObLogAllDdlOperationSchemaInfo *all_ddl_operation_table_schema_info,
      ObObjMeta &obj_meta,
      ObObj &obj);

O
obdev 已提交
329 330
  // InnerTable parse
  template<class CDC_INNER_TABLE_SCHEMA>
W
wangzelin.wzl 已提交
331 332 333 334
  int parse_columns_(
      const bool is_parse_new_col,
      const char *col_data,
      const int64_t col_data_size,
O
obdev 已提交
335
      const CDC_INNER_TABLE_SCHEMA &inner_table_schema,
W
wangzelin.wzl 已提交
336
      ColValueList &cols);
O
obdev 已提交
337
  template<class TABLE_SCHEMA>
W
wangzelin.wzl 已提交
338
  int parse_rowkey_(
O
obdev 已提交
339
      const TABLE_SCHEMA &table_schema,
W
wangzelin.wzl 已提交
340 341 342 343 344 345
      const common::ObStoreRowkey &rowkey,
      ColValueList &rowkey_cols);
  int add_column_(
      ColValueList &cols,
      const uint64_t column_id,
      const ObObj *value);
O
obdev 已提交
346
  template<class TABLE_SCHEMA>
W
wangzelin.wzl 已提交
347 348 349
  int set_obj_propertie_(
      const uint64_t column_id,
      const int64_t column_idx_for_datum_row,
O
obdev 已提交
350
      const TABLE_SCHEMA &table_schema,
W
wangzelin.wzl 已提交
351 352
      ObObjMeta &obj_meta,
      ObObj &obj);
S
SanmuWangZJU 已提交
353 354 355 356 357 358 359 360 361 362

private:
  common::ObIAllocator  &allocator_;

  bool                  deserialized_;
  bool                  cols_parsed_;
  ColValueList          new_cols_;     // A list of new values for the columns, currently no primary key values are stored, only normal columns
  ColValueList          old_cols_;     // A list of old values for the columns, currently no primary key values are stored, only normal columns
  ColValueList          rowkey_cols_;  // rowkey column

W
wangzelin.wzl 已提交
363 364
  ObLobDataOutRowCtxList new_lob_ctx_cols_;

S
SanmuWangZJU 已提交
365 366 367 368 369
private:
  DISALLOW_COPY_AND_ASSIGN(MutatorRow);
};

///////////////////////////////////////////////////////////////////////////////////
W
wangzelin.wzl 已提交
370
#define DELIMITER_STR "_"
S
SanmuWangZJU 已提交
371

W
wangzelin.wzl 已提交
372 373 374 375
// The DML unique ID is part_trans_id (tenant_id + ls_id + trans_id) + redo_log_lsn + row_index, joined by the separator `_`.
// tenant_id + ls_id + trans_id + redo_log_lsn locates a unique log_entry of a trans,
// a redo_log_lsn locates a unique redo_log within a trans,
// and row_index locates a specific row recorded in a redo_log.
S
SanmuWangZJU 已提交
376 377 378
class DmlStmtUniqueID
{
public:
W
wangzelin.wzl 已提交
379 380 381 382 383 384 385 386
  DmlStmtUniqueID(
      const ObString &part_trans_info_str,
      const palf::LSN &redo_log_lsn,
      const uint64_t row_index) :
    part_trans_info_str_(part_trans_info_str),
    redo_log_lsn_(redo_log_lsn),
    row_index_(row_index) {}

S
SanmuWangZJU 已提交
387 388 389 390
  ~DmlStmtUniqueID() { reset(); }
public:
  void reset()
  {
W
wangzelin.wzl 已提交
391 392
    // part_trans_info_str_.reset(); part_trans_info_str_ is a reference, should not reset it.
    redo_log_lsn_.reset();
S
SanmuWangZJU 已提交
393 394 395 396
    row_index_ = OB_INVALID_ID;
  }

  bool is_valid() const
W
wangzelin.wzl 已提交
397 398
  { return !part_trans_info_str_.empty() && redo_log_lsn_.is_valid() && OB_INVALID_ID != row_index_; }
  const palf::LSN &get_redo_lsn() const { return redo_log_lsn_; }
S
SanmuWangZJU 已提交
399
  uint64_t get_row_index() const { return row_index_; }
W
wangzelin.wzl 已提交
400
  // get length of serialized DmlStmtUniqueID(with '\0')
S
SanmuWangZJU 已提交
401 402 403 404 405
  int64_t get_dml_unique_id_length() const;

public:
  // row_index (uint64_t): length of its to_cstring form is 20
  static const int64_t MAX_ROW_INDEX_LENGTH = 20;
W
wangzelin.wzl 已提交
406
  template<typename NUM> static int64_t compute_str_length_base_num(const NUM num_to_compute);
S
SanmuWangZJU 已提交
407
  // Optimising customisation to_string
W
wangzelin.wzl 已提交
408 409
  int customized_to_string(char *buf, const int64_t buf_len, int64_t &pos) const;
  TO_STRING_KV(K_(part_trans_info_str), K_(redo_log_lsn), K_(row_index));
S
SanmuWangZJU 已提交
410 411

private:
W
wangzelin.wzl 已提交
412 413
  const ObString &part_trans_info_str_; // str of tenant_id + ls_id + trans_id
  palf::LSN redo_log_lsn_; // redo_log_lsn
S
SanmuWangZJU 已提交
414 415 416 417 418 419 420 421 422 423 424
  uint64_t row_index_;

private:
  DISALLOW_COPY_AND_ASSIGN(DmlStmtUniqueID);
};

// DML statement task
class DmlStmtTask : public IStmtTask
{
public:
  DmlStmtTask(PartTransTask &host,
W
wangzelin.wzl 已提交
425
      ObLogEntryTask &log_entry_task,
S
SanmuWangZJU 已提交
426 427 428 429 430
      MutatorRow &row);
  virtual ~DmlStmtTask();

  void reset();

W
wangzelin.wzl 已提交
431
  int64_t get_global_schema_version() const;
S
SanmuWangZJU 已提交
432
  int64_t get_table_version() const { return row_.table_version_; }
W
wangzelin.wzl 已提交
433 434
  uint64_t get_table_id() const { return table_id_; }
  void set_table_id(const uint64_t table_id) { table_id_ = table_id; }
O
obdev 已提交
435
  const logservice::TenantLSID &get_tls_id() const;
S
SanmuWangZJU 已提交
436
  const common::ObStoreRowkey &get_rowkey() const { return row_.rowkey_; }
W
wangzelin.wzl 已提交
437 438 439 440
  blocksstable::ObDmlFlag get_dml_flag() const { return row_.dml_flag_; }
  bool is_insert() const { return blocksstable::ObDmlFlag::DF_INSERT == row_.dml_flag_; }
  bool is_update() const { return blocksstable::ObDmlFlag::DF_UPDATE == row_.dml_flag_; }
  bool is_delete() const { return blocksstable::ObDmlFlag::DF_DELETE == row_.dml_flag_; }
S
SanmuWangZJU 已提交
441 442 443 444

  // Parse the column data
  // If obj2str_helper is empty, then no conversion of obj to string
  // NOTE: you can get_cols() only if you succeed in parse_cols()
W
wangzelin.wzl 已提交
445 446
  int parse_cols(
      ObObj2strHelper *obj2str_helper = NULL,
S
SanmuWangZJU 已提交
447
      const TableSchemaInfo *tb_schema_info = NULL,
O
obdev 已提交
448
      const ObTimeZoneInfoWrap *tz_info_wrap = nullptr,
O
obdev 已提交
449
      const bool enable_output_hidden_primary_key = false);
S
SanmuWangZJU 已提交
450

C
chaser-ch 已提交
451
  int parse_aux_meta_table_cols(const ObCDCLobAuxTableSchemaInfo &lob_aux_table_schema_info);
W
wangzelin.wzl 已提交
452

O
obdev 已提交
453 454 455 456 457 458 459 460 461 462 463
  // For the JSON or GIS(outrow storage)
  // The JSON/GIS data column size is over 4K and is outrow storage, reusing the basic capabilities of LOB.
  // So we need to call the obj2str API to get the final message format when the complete data is retrieved.
  int parse_col(
      const uint64_t tenant_id,
      const uint64_t column_id,
      const ColumnSchemaInfo &column_schema_info,
      const ObTimeZoneInfoWrap *tz_info_wrap,
      ObObj2strHelper &obj2str_helper,
      ColValue &cv_node);

W
wangzelin.wzl 已提交
464 465 466 467 468
  int get_cols(
      ColValueList **rowkey_cols,
      ColValueList **new_cols,
      ColValueList **old_cols,
      ObLobDataOutRowCtxList **new_lob_ctx_cols)
S
SanmuWangZJU 已提交
469
  {
W
wangzelin.wzl 已提交
470
    return row_.get_cols(rowkey_cols, new_cols, old_cols, new_lob_ctx_cols);
S
SanmuWangZJU 已提交
471 472
  }

W
wangzelin.wzl 已提交
473 474 475 476
  ObLobDataOutRowCtxList &get_new_lob_ctx_cols() { return row_.get_new_lob_ctx_cols(); }

  ObLogEntryTask &get_redo_log_entry_task() { return log_entry_task_; }

C
chinaxing 已提交
477
  // Sequence number stored on the underlying row.
  // Returned by const reference (no copy, no const-qualified temporary),
  // matching DdlStmtTask::get_row_seq_no().
  const transaction::ObTxSEQ &get_row_seq_no() const { return row_.seq_no_; }
S
SanmuWangZJU 已提交
478

W
wangzelin.wzl 已提交
479 480
  bool is_callback() const { return 1 == is_callback_; }
  void mark_callback() { is_callback_ = 1; }
S
SanmuWangZJU 已提交
481 482 483

public:
  TO_STRING_KV("IStmtTask", static_cast<const IStmtTask &>(*this),
W
wangzelin.wzl 已提交
484 485
      "is_cb", is_callback_,
      K_(table_id),
S
SanmuWangZJU 已提交
486
      K_(row),
W
wangzelin.wzl 已提交
487
      K_(log_entry_task));
S
SanmuWangZJU 已提交
488 489

private:
W
wangzelin.wzl 已提交
490 491 492
  uint8_t is_callback_ : 1;
  ObLogEntryTask          &log_entry_task_;
  uint64_t                table_id_;
S
SanmuWangZJU 已提交
493 494 495 496 497 498 499
  MutatorRow              &row_;
private:
  DISALLOW_COPY_AND_ASSIGN(DmlStmtTask);
};

///////////////////////////////////////////////////////////////////////////////

O
obdev 已提交
500
// DDL unique ID using tenant_id + schema_version
S
SanmuWangZJU 已提交
501 502 503
class DdlStmtUniqueID
{
public:
O
obdev 已提交
504 505
  DdlStmtUniqueID(const uint64_t tenant_id, const uint64_t schema_version) :
    tenant_id_(tenant_id), schema_version_(schema_version) {}
S
SanmuWangZJU 已提交
506 507 508 509
  ~DdlStmtUniqueID() { reset(); }
public:
  void reset()
  {
W
wangzelin.wzl 已提交
510
    tenant_id_ = OB_INVALID_TENANT_ID;
S
SanmuWangZJU 已提交
511 512 513 514
    schema_version_ = OB_INVALID_TIMESTAMP;
  }

  bool is_valid() const
O
obdev 已提交
515
  { return OB_INVALID_TENANT_ID != tenant_id_
S
SanmuWangZJU 已提交
516 517 518
    && OB_INVALID_TIMESTAMP != schema_version_; }

  uint64_t get_schema_version() const { return schema_version_; }
W
wangzelin.wzl 已提交
519
  uint64_t get_tenant_id() const { return tenant_id_; }
S
SanmuWangZJU 已提交
520 521

public:
W
wangzelin.wzl 已提交
522
  int64_t to_string(char *buf, const int64_t buf_len) const;
S
SanmuWangZJU 已提交
523 524

private:
W
wangzelin.wzl 已提交
525 526
  uint64_t    tenant_id_;        // TenantId
  uint64_t    schema_version_;   // schema version
S
SanmuWangZJU 已提交
527 528 529 530 531 532 533 534

private:
  DISALLOW_COPY_AND_ASSIGN(DdlStmtUniqueID);
};

class DdlStmtTask : public IStmtTask
{
public:
O
obdev 已提交
535
  DdlStmtTask(PartTransTask &host, MutatorRow &row);
S
SanmuWangZJU 已提交
536 537 538 539 540 541 542
  virtual ~DdlStmtTask();

public:
  void reset();

  // parse DDL data
  // init DDL Binlog Record
W
wangzelin.wzl 已提交
543 544
  int parse_ddl_info(
      ObLogBR *br,
S
SanmuWangZJU 已提交
545
      const uint64_t row_index,
W
wangzelin.wzl 已提交
546
      const ObLogAllDdlOperationSchemaInfo &all_ddl_operation_table_schema_info,
547
      const bool is_build_baseline,
S
SanmuWangZJU 已提交
548 549
      bool &is_valid_ddl,
      int64_t &update_schema_version,
W
wangzelin.wzl 已提交
550 551
      uint64_t &exec_tennat_id,
      volatile bool &stop_flag);
S
SanmuWangZJU 已提交
552 553 554 555 556 557 558 559 560 561

  // get ddl str
  const ObString &get_ddl_stmt_str() const { return ddl_stmt_str_; }
  int64_t get_operation_type() const { return ddl_operation_type_; }
  uint64_t get_op_table_id() const { return ddl_op_table_id_; }
  uint64_t get_op_tenant_id() const { return ddl_op_tenant_id_; }
  uint64_t get_op_database_id() const { return ddl_op_database_id_; }
  uint64_t get_op_tablegroup_id() const { return ddl_op_tablegroup_id_; }
  int64_t get_op_schema_version() const { return ddl_op_schema_version_; }
  uint64_t get_exec_tenant_id() const { return ddl_exec_tenant_id_; }
C
chinaxing 已提交
562
  const transaction::ObTxSEQ &get_row_seq_no() const { return row_.seq_no_; }
S
SanmuWangZJU 已提交
563 564 565 566 567 568 569 570 571 572

public:
  // tenant_id(UINT64_MAX: 20) + schema_version(INT64_MAX:19)
  static const int64_t MAX_DDL_UNIQUE_ID_LENGTH = 50;
  // schema_version(INT64_MAX:19); also the size of ddl_op_schema_version_str_
  static const int64_t MAX_DDL_SCHEMA_VERSION_STR_LENGTH = 20;
  // log id (INT64_MAX:19)
  static const int64_t MAX_PREPRAR_LOG_ID_LENGTH = 20;

public:
W
wangzelin.wzl 已提交
573 574
  TO_STRING_KV(
      "IStmtTask", static_cast<const IStmtTask &>(*this),
S
SanmuWangZJU 已提交
575 576 577 578 579 580 581 582
      K_(row),
      K_(ddl_exec_tenant_id),
      K_(ddl_stmt_str),
      K_(ddl_op_schema_version),
      K_(ddl_operation_type),
      K_(ddl_op_table_id),
      K_(ddl_op_tenant_id),
      K_(ddl_op_database_id),
O
obdev 已提交
583
      K_(ddl_op_tablegroup_id));
S
SanmuWangZJU 已提交
584 585

private:
W
wangzelin.wzl 已提交
586
  int parse_ddl_info_(
587
      const bool is_build_baseline,
W
wangzelin.wzl 已提交
588 589 590
      bool &contain_ddl_stmt,
      int64_t &update_schema_version,
      volatile bool &stop_flag);
S
SanmuWangZJU 已提交
591
  int parse_schema_version_(ObObj &col_value, int64_t &schema_version);
W
wangzelin.wzl 已提交
592
  int parse_ddl_info_from_normal_columns_(
593
      const bool is_build_baseline,
W
wangzelin.wzl 已提交
594 595
      ColValueList &col_value_list,
      ObLobDataOutRowCtxList &new_lob_ctx_cols);
S
SanmuWangZJU 已提交
596 597 598 599 600 601 602 603 604 605 606
  // 1. schema non-split mode returns the pure_id itself
  // 2. schema split mode returns the calculated result if the pure_id is valid; otherwise returns the pure_id itself
  // When in schema split mode, the common tenant table_id, database_id, user_id and tablegroup_id are
  // removed from the tenant information and need to be recalculated to ensure the schema is refreshed correctly
  uint64_t combine_id_(const bool is_schema_split_mode,
      const uint64_t tenant_id,
      const uint64_t pure_id);
  int build_ddl_binlog_record_(ObLogBR *br,
      const ObString &ddl_stmt,
      const uint64_t row_index);
  bool is_recyclebin_database_id(const uint64_t tenant_id, const uint64_t database_id);
W
wangzelin.wzl 已提交
607 608
  // Offline DDL: create hidden table
  bool is_create_table_ddl_(const int64_t ddl_operation_type);
S
SanmuWangZJU 已提交
609 610 611 612 613 614 615 616 617 618 619
  bool is_drop_table_ddl_(const int64_t ddl_operation_type);
  bool is_drop_tablegroup_ddl_(const int64_t ddl_operation_type);
  bool is_drop_tenant_ddl_(const int64_t ddl_operation_type);
  bool is_global_index_ddl_(const int64_t ddl_operation_type);
  // OB_DDL_CREATE_INDEX
  // OB_DDL_DROP_INDEX
  bool is_normal_index_ddl_(const int64_t ddl_operation_type);
  bool is_create_tenant_end_ddl_(const int64_t ddl_operation_type);
  bool is_finish_schema_split_ddl_(const int64_t ddl_operation_type);
  // OB_DDL_ADD_SUB_PARTITION
  // OB_DDL_DROP_SUB_PARTITION
W
wangzelin.wzl 已提交
620
  bool is_sub_tls_id_alter_ddl_(const int64_t ddl_operation_type);
S
SanmuWangZJU 已提交
621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648
  int init_ddl_unique_id_(common::ObString &ddl_unique_id);

private:
  MutatorRow  &row_;
  ObString    ddl_stmt_str_;
  int64_t     ddl_operation_type_;
  int64_t     ddl_op_schema_version_;
  char        ddl_op_schema_version_str_[MAX_DDL_SCHEMA_VERSION_STR_LENGTH];

  uint64_t    ddl_op_table_id_;
  uint64_t    ddl_op_tenant_id_;
  uint64_t    ddl_op_database_id_;
  uint64_t    ddl_op_tablegroup_id_;

  // Record Executor Tenant ID
  uint64_t    ddl_exec_tenant_id_;

private:
  DISALLOW_COPY_AND_ASSIGN(DdlStmtTask);
};

/////////////////////////////////////////////////////////////////////////////////

typedef LightyList<IStmtTask> StmtList;

class ObLogEntryTask
{
public:
649
  ObLogEntryTask(PartTransTask &host);
S
SanmuWangZJU 已提交
650 651 652 653 654
  virtual ~ObLogEntryTask();
  void reset();
  bool is_valid() const;

public:
W
wangzelin.wzl 已提交
655
  int init(
O
obdev 已提交
656
      const logservice::TenantLSID &tls_id,
W
wangzelin.wzl 已提交
657
      const char *participant,
S
SanmuWangZJU 已提交
658
      const transaction::ObTransID &trans_id,
W
wangzelin.wzl 已提交
659
      DmlRedoLogNode *redo_node);
S
SanmuWangZJU 已提交
660 661 662

  // Hash of this task; it is taken solely from the TenantLSID.
  uint64_t hash() const
  {
    return get_tls_id().hash();
  }

public:
W
wangzelin.wzl 已提交
669 670 671
  int get_status(bool &is_stored);
  int get_storage_key(std::string &key);

S
SanmuWangZJU 已提交
672 673 674
  inline void *get_host() { return host_; }
  inline void set_host(void *host) { host_ = host; }

O
obdev 已提交
675
  const logservice::TenantLSID &get_tls_id() const { return tls_id_; }
W
wangzelin.wzl 已提交
676 677 678
  uint64_t get_tenant_id() const { return tls_id_.get_tenant_id(); }
  const share::ObLSID &get_ls_id() const { return tls_id_.get_ls_id(); }
  bool is_sys_ls_task() const { return tls_id_.is_sys_log_stream(); }
S
SanmuWangZJU 已提交
679 680 681 682 683 684 685 686 687

  const transaction::ObTransID &get_trans_id() const { return trans_id_; }

  int get_valid_row_num(int64_t &valid_row_num);

  common::ObIAllocator &get_allocator() { return arena_allocator_; }
  void *alloc(const int64_t size);
  void free(void *ptr);

W
wangzelin.wzl 已提交
688 689 690 691 692 693
  const DmlRedoLogNode *get_redo_log_node() const { return redo_node_; }
  int get_log_lsn(palf::LSN &log_lsn); // get redo log lsn: may not unique cause multi redo in one log_entry
  int get_data_len(int64_t &data_len);

  int set_data_and_readed_status(bool is_big_row, char *data, int64_t data_len);
  int rc_callback();
S
SanmuWangZJU 已提交
694 695 696 697 698 699 700 701 702

  const StmtList &get_stmt_list() const { return stmt_list_; }
  StmtList &get_stmt_list() { return stmt_list_; }
  int64_t get_stmt_num() const { return stmt_list_.num_; }
  int add_stmt(const uint64_t row_index, IStmtTask *stmt_task);

  // Increases the number of statements that complete the formatting and returns the result after the increase
  int64_t inc_formatted_stmt_num();

W
wangzelin.wzl 已提交
703
  // 1. iterate through the formatted DmlStmt, concatenating all DmlStmtTask
S
SanmuWangZJU 已提交
704
  // 2. Recycle directly for invalid binlog records
W
wangzelin.wzl 已提交
705 706 707 708 709
  // return row ref cnt
  int link_row_list(int64_t &row_ref_cnt);

  int set_redo_log_parsed();
  int set_redo_log_formatted();
S
SanmuWangZJU 已提交
710 711 712 713 714

  int64_t dec_row_ref_cnt();
  void set_row_ref_cnt(const int64_t ref_cnt);
  int64_t get_row_ref_cnt() const { return ATOMIC_LOAD(&row_ref_cnt_); }

W
wangzelin.wzl 已提交
715 716
  TO_STRING_KV(
      K_(tls_id),
S
SanmuWangZJU 已提交
717
      K_(trans_id),
W
wangzelin.wzl 已提交
718
      KPC_(redo_node),
S
SanmuWangZJU 已提交
719 720 721 722 723 724 725 726 727 728
      K_(stmt_list),
      K_(formatted_stmt_num),
      K_(row_ref_cnt));

private:
  int revert_binlog_record_(ObLogBR *br);

private:
  void                   *host_;            // PartTransTask host

W
wangzelin.wzl 已提交
729 730
  const char             *participant_;

O
obdev 已提交
731
  logservice::TenantLSID tls_id_;           // logservice::TenantLSID
S
SanmuWangZJU 已提交
732 733
  transaction::ObTransID trans_id_;         // Transaction ID

W
wangzelin.wzl 已提交
734 735
  DmlRedoLogNode     *redo_node_;           // dml redo log node
  bool               is_big_row_;
S
SanmuWangZJU 已提交
736 737 738 739 740

  StmtList           stmt_list_;            // statement list
  int64_t            formatted_stmt_num_;   // Number of statements that formatted
  int64_t            row_ref_cnt_;          // reference count

O
obdev 已提交
741 742 743
  // thread safe allocator
  // used for Parser/Formatter/LobDataMerger
  ObCdcSafeArena arena_allocator_;          // allocator
S
SanmuWangZJU 已提交
744 745 746 747 748 749 750 751 752 753 754 755

private:
  DISALLOW_COPY_AND_ASSIGN(ObLogEntryTask);
};

/////////////////////////////////////////////////////////////////////////////////

class PartTransDispatcher;
struct TransCommitInfo;

// Partitioned transaction tasks
// Distinguish between DDL transactions, DML transactions and heartbeats to facilitate differentiation of transaction types when parsing
W
wangzelin.wzl 已提交
756
class PartTransTask : public TransTaskBase<PartTransTask>, public ObLogResourceRecycleTask, public ObILogCallback
S
SanmuWangZJU 已提交
757 758 759 760 761 762 763
{
public:
  enum TaskType
  {
    TASK_TYPE_UNKNOWN = 0,
    TASK_TYPE_DML_TRANS,          // DML trans
    TASK_TYPE_DDL_TRANS,          // DDL trans
764
    TASK_TYPE_LS_OP_TRANS,        // Log stream operations trans
W
wangzelin.wzl 已提交
765
    TASK_TYPE_LS_HEARTBEAT,       // heartbeat of LS level, used to push ls dispatch progress
S
SanmuWangZJU 已提交
766
    TASK_TYPE_GLOBAL_HEARTBEAT,   // heartbeat of global level, used to pass checkpoint info for downstream
W
wangzelin.wzl 已提交
767
    TASK_TYPE_OFFLINE_LS,         // LS offline task
S
SanmuWangZJU 已提交
768 769 770 771 772
    TASK_TYPE_NOT_SERVED_TRANS,   // not served trans, convert from other trans type
    TASK_TYPE_MAX
  };
  enum ServedState
  {
W
wangzelin.wzl 已提交
773
    UNSERVED = 0,  // LS transaction is abort or LS transaction is not served
S
SanmuWangZJU 已提交
774 775 776 777 778 779 780 781 782 783 784 785 786
    SERVED = 1
  };

public:
  PartTransTask();
  virtual ~PartTransTask();

public:
  static const char *print_task_type(const TaskType type);

public:
  void reset();

787 788
  int init_log_entry_task_allocator();

S
SanmuWangZJU 已提交
789 790 791 792 793 794 795 796 797 798 799 800
  /// The initialisation process of a transaction task is divided into four stages.
  /// where: the DML transaction task processing process, where the maintenance of the completion status is completed, and the disassembly, maintenance and distribution of the task.
  ///
  /// 1. Upon receipt of the Redo log: push_redo_log();
  /// For DML: When the detection of single redo/multiple redo logs [LOB] is complete, split the sub-task and send it to the redo ObLogEntryTask, followed by the ObLogEntryTask callback processing
  /// 2. Upon receipt of the prepare log: prepare()
  /// If the transaction commits in bulk, call commit() to enter the commit or pre-commit state
  /// 3. If the Redo log is missing, continue to push_redo_log(); after the missing log is filled, prepare() again
  /// 4. If the Commit log is received: commit()
  ///
  /// @retval OB_ENTRY_EXIST redo log already exists
  /// @retval OB_LOG_MISSING redo log missing, current log push failed: LOB intermediate log scenario, missing LOB start log
W
wangzelin.wzl 已提交
801
  int push_redo_log(
S
SanmuWangZJU 已提交
802
      const transaction::ObTransID &trans_id,
W
wangzelin.wzl 已提交
803
      const palf::LSN &log_lsn,
S
SanmuWangZJU 已提交
804 805
      const int64_t tstamp,
      const char *buf,
W
wangzelin.wzl 已提交
806
      const int64_t buf_len);
S
SanmuWangZJU 已提交
807

W
wangzelin.wzl 已提交
808 809 810 811 812
  /**
   * @brief: push rollback_to info:
   * 1. lsn of rollback_to should be pushed into all_recorded_lsns (ObTxRollbackToLog should have an independent LogEntry)
   * 2. construct RollbackNode and push into rollback_list_
   * @param {LSN} &lsn lsn of ObTxRollbackToLog
C
chinaxing 已提交
813 814
   * @param {ObTxSEQ} rollback_from
   * @param {ObTxSEQ} rollback_to
W
wangzelin.wzl 已提交
815 816 817 818
   * @retval OB_ENTRY_EXIST: the rollback_to node is already recorded
   * @retval OB_SUCCESS: op succ
   * @retval other error code: op fail by other reason
   */
C
chinaxing 已提交
819
  int push_rollback_to_info(const palf::LSN &lsn, const transaction::ObTxSEQ &rollback_from, const transaction::ObTxSEQ &rollback_to);
W
wangzelin.wzl 已提交
820 821

  /// set PartTrans Commit Info
S
SanmuWangZJU 已提交
822 823 824 825
  /// @param [in] trace_id              app trace id
  /// @param [in] trace_info            app trace info
  /// @retval OB_SUCCESS          Success
  /// @retval Other error codes   Fail
W
wangzelin.wzl 已提交
826
  int set_commit_info(
S
SanmuWangZJU 已提交
827 828
      const ObString &trace_id,
      const ObString &trace_info,
W
wangzelin.wzl 已提交
829 830 831 832 833 834 835 836 837 838
      const bool is_dup_ts,
      const transaction::ObXATransID &xid);
  /// prepare task.
  /// invoker:
  ///     PartTransResolver::handle_prepare_ for DIST_TRANS
  ///     PartTransResolver::handle_commit_ for SINGLE_LS_TRANS
  int prepare(
      const palf::LSN &prepare_log_lsn,
      const int64_t prepare_ts,
      PartTransDispatcher &part_trans_dispatcher);
S
SanmuWangZJU 已提交
839 840 841
  /// Submit a normal transaction task
  /// Requires that the prepares have been successful and that the redo log is complete
  ///
W
wangzelin.wzl 已提交
842 843 844 845 846 847
  /// @param [in] cluster_id              the cluster that trans belongs to
  /// @param [in] tx_id                   Transaction ID
  /// @param [in] trans_commit_version    Transaction commit version
  /// @param [in] ls_info_array           <LSID, PrepareLogLSN> array
  /// @param [in] commit_log_lsn          lsn of commit_log.
  /// @param [in] commit_log_submit_ts    submit_ts of commit_log_entry
S
SanmuWangZJU 已提交
848 849 850
  ///
  /// @retval OB_SUCCESS            Success
  /// @retval Other error codes     Fail
W
wangzelin.wzl 已提交
851 852 853 854
  int commit(
      const uint64_t cluster_id,
      const transaction::ObTransID &tx_id,
      const int64_t trans_commit_version,
855
      const transaction::TransType &trans_type,
W
wangzelin.wzl 已提交
856 857
      const transaction::ObLSLogInfoArray &ls_info_array,
      const palf::LSN &commit_log_lsn,
858 859
      const int64_t commit_log_submit_ts,
      const bool is_data_dict_mode);
S
SanmuWangZJU 已提交
860 861 862 863 864 865 866 867

  /// try to set PartTransTask in DataReady
  /// PartTransDispatcher dispatch commit part trans
  ///
  /// @retval OB_SUCCESS            Success
  /// @retval Other error codes     Fail
  int try_to_set_data_ready_status();

W
wangzelin.wzl 已提交
868
  /// Storager handle log callback
S
SanmuWangZJU 已提交
869 870 871 872
  ///
  ///
  /// @retval OB_SUCCESS  succ
  /// @retval other       fail
W
wangzelin.wzl 已提交
873
  int handle_log_callback();
S
SanmuWangZJU 已提交
874

W
wangzelin.wzl 已提交
875
  /// PartTransDispatcher::remove_task() call
S
SanmuWangZJU 已提交
876 877
  ///
  ///
W
wangzelin.wzl 已提交
878 879 880
  /// @retval OB_SUCCESS  Success
  /// @retval other       Fail
  int handle_unserved_trans();
S
SanmuWangZJU 已提交
881

W
wangzelin.wzl 已提交
882 883
  // Initialize tls_id heartbeat task information
  // Set the type to: TASK_TYPE_LS_HEARTBEAT
S
SanmuWangZJU 已提交
884
  //
W
wangzelin.wzl 已提交
885 886
  /// @param [in] tls_id        tls_id
  /// @param [in] timestamp     Heartbeat timestamp
S
SanmuWangZJU 已提交
887 888 889
  //
  /// @retval OB_SUCCESS            Success
  /// @retval Other error codes     Fail
O
obdev 已提交
890
  int init_ls_heartbeat_info(const logservice::TenantLSID &tls_id, const int64_t timestamp);
S
SanmuWangZJU 已提交
891

W
wangzelin.wzl 已提交
892
  // Initialize global heartbeat task information, global heartbeat task is independent of tls_id
S
SanmuWangZJU 已提交
893 894 895 896 897 898 899 900
  // Set the type to: TASK_TYPE_GLOBAL_HEARTBEAT
  //
  /// @param [in] timestamp          heartbeat timestamp
  //
  /// @retval OB_SUCCESS            Success
  /// @retval Other error codes     Fail
  int init_global_heartbeat_info(const int64_t timestamp);

W
wangzelin.wzl 已提交
901 902
  // Initialize offline tls_id task
  // Set the type to: TASK_TYPE_OFFLINE_LS
S
SanmuWangZJU 已提交
903
  //
W
wangzelin.wzl 已提交
904
  // @param [in] tls_id          PartitionKey
S
SanmuWangZJU 已提交
905 906 907
  //
  /// @retval OB_SUCCESS            Success
  /// @retval Other error codes     Fail
O
obdev 已提交
908
  int init_offline_ls_task(const logservice::TenantLSID &tls_id);
S
SanmuWangZJU 已提交
909 910 911 912

  // is task info valid or not
  bool is_task_info_valid() const;

W
wangzelin.wzl 已提交
913
  // Convert to a non-serviceable tls_ided transaction type
S
SanmuWangZJU 已提交
914 915 916 917 918 919 920 921 922 923 924 925 926 927 928
  // Note: Only conversions from DML/DDL type transactions are supported, direct initialisation is not supported
  int convert_to_not_served_trans();

  // Mutable access to the ordered redo list of this task
  SortedRedoLogList &get_sorted_redo_list() { return sorted_redo_list_; }
  // True when sorted_redo_list_ reports zero nodes
  bool is_contain_empty_redo_log() const { return 0 == sorted_redo_list_.get_node_number(); }

  // Checkpoint seq number of this task (see the comment on checkpoint_seq_)
  void set_checkpoint_seq(const int64_t seq) { checkpoint_seq_ = seq; }
  int64_t get_checkpoint_seq() const { return checkpoint_seq_; }

  // Task type (one of TaskType)
  void set_type(const TaskType type) { type_ = type; }
  TaskType get_type() const { return type_; }

  // Tenant id recorded for DDL tasks
  void set_exec_tenant_id(const uint64_t exec_tenant_id) { exec_tenant_id_ = exec_tenant_id; }
  uint64_t get_exec_tenant_id() const { return exec_tenant_id_; }

W
wangzelin.wzl 已提交
929
  bool is_trans_type_unknown() const { return TASK_TYPE_UNKNOWN == type_; }
S
SanmuWangZJU 已提交
930
  bool is_global_heartbeat() const { return TASK_TYPE_GLOBAL_HEARTBEAT == type_; }
W
wangzelin.wzl 已提交
931 932
  bool is_ls_heartbeat() const { return TASK_TYPE_LS_HEARTBEAT == type_; }
  bool is_sys_ls_heartbeat() const
S
SanmuWangZJU 已提交
933
  {
W
wangzelin.wzl 已提交
934
    return is_ls_heartbeat() && is_sys_ls_part_trans();
S
SanmuWangZJU 已提交
935
  }
W
wangzelin.wzl 已提交
936
  bool is_sys_ls_part_trans() const { return tls_id_.is_sys_log_stream(); }
S
SanmuWangZJU 已提交
937 938
  bool is_dml_trans() const { return TASK_TYPE_DML_TRANS == type_; }
  bool is_ddl_trans() const { return TASK_TYPE_DDL_TRANS == type_; }
939
  bool is_ls_op_trans() const { return TASK_TYPE_LS_OP_TRANS == type_; }
W
wangzelin.wzl 已提交
940
  bool is_offline_ls_task() const { return TASK_TYPE_OFFLINE_LS == type_; }
S
SanmuWangZJU 已提交
941
  // Is it a DDL OFFLINE task
W
wangzelin.wzl 已提交
942
  bool is_sys_ls_offline_task() const
S
SanmuWangZJU 已提交
943
  {
W
wangzelin.wzl 已提交
944
    return is_offline_ls_task() && is_sys_ls_part_trans();
S
SanmuWangZJU 已提交
945 946
  }
  bool is_not_served_trans() const { return TASK_TYPE_NOT_SERVED_TRANS == type_; }
947
  bool is_sys_ls_dml_trans() const { return is_dml_trans() && is_sys_ls_part_trans(); }
S
SanmuWangZJU 已提交
948

O
obdev 已提交
949
  void set_task_info(const logservice::TenantLSID &tls_id, const char *info);
W
wangzelin.wzl 已提交
950

S
SanmuWangZJU 已提交
951 952 953 954
  void set_trans_id(const transaction::ObTransID &trans_id) { trans_id_ = trans_id; }
  const transaction::ObTransID &get_trans_id() const { return trans_id_; }

  uint64_t get_cluster_id() const { return cluster_id_; }
955

O
obdev 已提交
956 957
  void set_tls_id(const logservice::TenantLSID &tls_id) { tls_id_ = tls_id; }
  const logservice::TenantLSID &get_tls_id() const { return tls_id_; }
W
wangzelin.wzl 已提交
958 959
  uint64_t get_tenant_id() const { return tls_id_.get_tenant_id(); }
  const share::ObLSID &get_ls_id() const { return tls_id_.get_ls_id(); }
S
SanmuWangZJU 已提交
960

W
wangzelin.wzl 已提交
961
  const char *get_tenant_ls_str() const { return tls_str_; }
S
SanmuWangZJU 已提交
962

W
wangzelin.wzl 已提交
963 964 965 966 967
  int64_t get_prepare_ts() const { return prepare_ts_; }
  const palf::LSN &get_prepare_log_lsn() const { return prepare_log_lsn_; }
  void set_commit_ts(const int64_t commit_ts) { commit_ts_ = commit_ts; }
  int64_t get_commit_ts() const { return commit_ts_; }
  const palf::LSN &get_commit_log_lsn() const { return commit_log_lsn_; }
S
SanmuWangZJU 已提交
968 969 970 971 972 973 974 975 976 977 978 979 980 981 982

  bool is_trans_committed() const { return ATOMIC_LOAD(&is_trans_committed_); }

  // Raise local_schema_version_ to sm_version.
  // Non-positive versions are ignored, and the stored version never decreases.
  void update_local_schema_version(const int64_t sm_version)
  {
    const bool is_valid_version = (sm_version > 0);
    if (is_valid_version && sm_version > local_schema_version_) {
      local_schema_version_ = sm_version;
    }
  }
  int64_t get_local_schema_version() const { return local_schema_version_; }

  void *alloc(const int64_t size);
  void free(void *ptr);

  int add_stmt(const uint64_t row_index, IStmtTask *stmt_task);
W
wangzelin.wzl 已提交
983
  int add_ddl_lob_aux_stmt(const uint64_t row_index, DmlStmtTask *stmt_task);
S
SanmuWangZJU 已提交
984 985 986 987 988 989 990 991 992 993 994 995 996 997
  int add_ddl_stmt(const uint64_t row_index, DdlStmtTask *ddl_stmt);
  const StmtList &get_stmt_list() const { return stmt_list_; }
  StmtList &get_stmt_list() { return stmt_list_; }
  int64_t get_stmt_num() const { return stmt_list_.num_; }

  // Free stmt_list memory and clear the statements.
  // Only the list itself is reset; the individual statements are not destructed (see FIXME below).
  void free_stmt_list()
  {
    // FIXME: Every IStmtTask in the list of statements should be destructed here,
    // but currently all the memory in the IStmtTask is allocated by the PartTransTask allocator,
    // so it is sufficient to reuse the allocator memory directly, which is also the highest performance
    stmt_list_.reset();
  }

W
wangzelin.wzl 已提交
998 999 1000 1001 1002
  // Reset ddl_lob_aux_stmt_list_ (the DDL trans LOB aux meta stmt list).
  // Only the list is reset here, mirroring free_stmt_list().
  void free_ddl_lob_aux_stmt_list()
  {
    ddl_lob_aux_stmt_list_.reset();
  }

S
SanmuWangZJU 已提交
1003 1004 1005
  void set_formatted();
  int wait_formatted(const int64_t timeout, common::ObCond &cond);

W
wangzelin.wzl 已提交
1006 1007 1008
  // data_ready means:
  // (1) all redo of current PartTransTask are collected(already in storage if needed)
  // (2) then iterator of sorted_redo_list is inited(will be used in redo_dispatcher and sorter)
S
SanmuWangZJU 已提交
1009 1010 1011 1012 1013 1014 1015 1016 1017 1018
  void set_data_ready();
  int wait_data_ready(const int64_t timeout);

  // Accessors for ref_cnt_, the reference count of this task
  int64_t dec_ref_cnt();
  void set_ref_cnt(const int64_t ref_cnt);
  // NOTE(review): plain (non-atomic) read, whereas ObLogEntryTask::get_row_ref_cnt() reads its
  // counter with ATOMIC_LOAD — confirm whether ref_cnt_ can be read concurrently with dec_ref_cnt().
  int64_t get_ref_cnt() const { return ref_cnt_; }

  void set_global_trans_seq(const int64_t seq) { global_trans_seq_ = seq; }
  int64_t get_global_trans_seq() const { return global_trans_seq_; }

W
wangzelin.wzl 已提交
1019 1020 1021
  void set_global_schema_version(const int64_t global_schema_version) { global_schema_version_ = global_schema_version; }
  int64_t get_global_schema_version() const { return global_schema_version_; }

S
SanmuWangZJU 已提交
1022 1023 1024
  void set_next_task(PartTransTask *next) { next_task_ = next; }
  PartTransTask *next_task() { return next_task_; }

W
wangzelin.wzl 已提交
1025
  int64_t get_trans_commit_version() const { return trans_commit_version_; }
S
SanmuWangZJU 已提交
1026 1027 1028

  common::ObIAllocator &get_allocator() { return allocator_; }

O
obdev 已提交
1029
  const transaction::ObLSLogInfoArray &get_participants() const
S
SanmuWangZJU 已提交
1030 1031 1032 1033 1034 1035
  {
    return participants_;
  }

  int64_t get_participant_count() const
  {
O
obdev 已提交
1036
    return participants_.count();
S
SanmuWangZJU 已提交
1037 1038 1039
  }

  // for unittest start
W
wangzelin.wzl 已提交
1040 1041 1042 1043
  int set_commit_log_lsn(const palf::LSN &commit_log_lsn);
  int set_participants(
      const transaction::ObLSLogInfoArray &participants,
      const palf::LSN &commit_log_lsn);
S
SanmuWangZJU 已提交
1044 1045 1046 1047 1048 1049 1050 1051 1052 1053
  // for unittest end

  void set_allocator(const int64_t page_size,
      common::ObIAllocator &large_allocator);

  void set_prealloc_page(void *page);
  void revert_prealloc_page(void *&page);

  const ObString &get_trace_id() const { return trace_id_; }
  const ObString &get_trace_info() const { return trace_info_; }
W
wangzelin.wzl 已提交
1054
  const ObString &get_part_trans_info() const { return part_trans_info_str_; }
S
SanmuWangZJU 已提交
1055 1056

  bool is_served() const { return SERVED == serve_state_; }
1057 1058
  bool is_single_ls_trans() const { return transaction::TransType::SP_TRANS == trans_type_; }
  bool is_dist_trans() const { return transaction::TransType::DIST_TRANS == trans_type_; }
W
wangzelin.wzl 已提交
1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103
  bool is_part_dispatch_finish() const { return sorted_redo_list_.is_dispatch_finish(); }
  void inc_sorted_br() { ATOMIC_INC(&output_br_count_by_turn_); }
  // get and reset sorted br count
  int64_t get_and_reset_sorted_br() { return ATOMIC_TAS(&output_br_count_by_turn_, 0); }
  /// get next dml_redo_node to dispatch(to reader or parser)
  /// note: if is_last_redo is true, dispatcher can't access PartTransTask because trans may be recycled any time
  /// @param dml_redo_node    [out] dml_redo_node to dispatch
  /// @param is_last_redo     [out] is last redo node in current PartTransTask
  /// @retval OB_SUCCESS      succ get redo_node
  /// @retval OB_EMPTY_RESULT no more redo to dispatch:
  ///                           (1) no redo in this part_trans_task or
  ///                           (2) all redo are dispatched and this function is called again (should not happen)
  /// @retval other_code      unexpected error
  int next_redo_to_dispatch(DmlRedoLogNode *&dml_redo_node, bool &is_last_redo);
  /// get next dml stmt for sorter, the br in dml_stmt_task will append to br_commit_queue in trans_ctx.
  /// Theoretically this function faces the same problem as next_redo_to_dispatch; however, it can be safely accessed by the sorter after the last stmt is output.
  /// The safety is guaranteed by:
  ///   (1) sorter will set Trans state to TRANS_SORTED after getting OB_ITER_END from all participants
  ///   (2) resource_collector will just decrement ref-count of TransCtx if all br of PartTransTask are consumed
  ///   (3) committer will recycle all participants(PartTransTask) after Trans state is TRANS_SORTED
  /// note: consider whether this function needs to change whenever the recycle logic of PartTransTask is changed
  ///
  /// @param dml_stmt_task    [out] dml_stmt_task get from next formatted redo(if contains valid br)
  /// @retval OB_SUCCESS      succ get dml_stmt_task
  /// @retval OB_ITER_END     all stmt are output:
  ///                           (1) part_trans_task has no valid br at all or
  ///                           (2) all br are output to sorter(the caller)
  /// @retval OB_NEED_RETRY   the redo node to find stmt is not formatted
  int next_dml_stmt(DmlStmtTask *&dml_stmt_task);
  const RollbackList &get_rollback_list() const { return rollback_list_; }
  // part_tx_fetch_state_ bit 0: the first record log has been read
  void mark_read_first_record()
  {
    part_tx_fetch_state_ |= 0x01;
  }
  bool has_find_first_record() const
  {
    return 0 != (part_tx_fetch_state_ & 0x01);
  }
  // part_tx_fetch_state_ bit 1: the commit info log has been read
  void mark_read_commit_info()
  {
    part_tx_fetch_state_ |= (0x01 << 1);
  }
  bool has_read_commit_info() const
  {
    return 0 != (part_tx_fetch_state_ & (0x01 << 1));
  }
  // part_tx_fetch_state_ bit 2: the prepare log has been read
  void mark_read_prepare()
  {
    part_tx_fetch_state_ |= (0x01 << 2);
  }
  bool has_read_prepare() const
  {
    return 0 != (part_tx_fetch_state_ & (0x01 << 2));
  }

  int push_fetched_log_entry(const palf::LSN &lsn);
  int push_back_recored_redo_lsn_arr(
      const transaction::ObRedoLSNArray &prev_redo_lsns,
      const palf::LSN &commit_info_lsn,
      const bool has_redo_in_cur_entry);
  SortedLogEntryInfo &get_sorted_log_entry_info() { return sorted_log_entry_info_; }
  // Ask sorted_log_entry_info_ whether all log entries have been fetched.
  // @param [out] is_all_redo_fetched   filled in by sorted_log_entry_info_
  // @retval return code of SortedLogEntryInfo::is_all_log_entry_fetched()
  int is_all_redo_log_entry_fetched(bool &is_all_redo_fetched)
  {
    const int check_ret = sorted_log_entry_info_.is_all_log_entry_fetched(is_all_redo_fetched);
    return check_ret;
  }
1104 1105 1106 1107 1108
  // Whether every dispatched redo has also been sorted, i.e. sorted_redo_list_
  // reports no redo that is dispatched but still unsorted.
  OB_INLINE bool is_dispatched_redo_be_sorted() const
  {
    const bool has_unsorted_redo = sorted_redo_list_.has_dispatched_but_unsorted_redo();
    return ! has_unsorted_redo;
  }
1109
  transaction::ObTxBigSegmentBuf *get_segment_buf() { return &segment_buf_; }
W
wangzelin.wzl 已提交
1110 1111 1112 1113 1114
  int push_multi_data_source_data(
      const palf::LSN &lsn,
      const transaction::ObTxBufferNodeArray &mds_data_arr,
      const bool is_commit_log);
  // parse multi_data_source in multi_data_source_node_arr to multi_data_source_info
O
obdev 已提交
1115 1116 1117
  //
  // 1. Support LS_MEMBER_TABLE MDS node.
  // 2. Support Tablet
W
wangzelin.wzl 已提交
1118
  int parse_multi_data_source_data();
O
obdev 已提交
1119 1120 1121 1122 1123 1124

  // TODO Support single registration over 1.5M
  // 1. parse multi_data_source to get the collection of changed schemas which DDL transactions affect
  int parse_multi_data_source_data_for_ddl(
      const char *caller);

W
wangzelin.wzl 已提交
1125
  // sys_ls trans which not ddl_trans/ls_table_trans won't serve
O
obdev 已提交
1126
  // TODO: Could this trans is a dist_trans? Should CDC filter this trans if is a dict_trans.
W
wangzelin.wzl 已提交
1127 1128 1129 1130 1131
  // True only for a sys log stream task whose multi_data_source_info_ is not valid.
  // multi_data_source_info_ is consulted only when the task belongs to the sys log stream.
  bool is_sys_ls_not_serve_trans() const
  {
    bool is_not_served = false;
    if (tls_id_.is_sys_log_stream()) {
      is_not_served = ! multi_data_source_info_.is_valid();
    }
    return is_not_served;
  }
  const MultiDataSourceInfo &get_multi_data_source_info() const { return multi_data_source_info_; }
X
xuhuleon 已提交
1132
  const share::ObLSAttrArray &get_ls_attr_arr() const { return multi_data_source_info_.get_ls_attr_arr(); }
O
obdev 已提交
1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147
  DictTenantArray &get_dict_tenant_array() { return multi_data_source_info_.get_dict_tenant_array(); }
  DictDatabaseArray &get_dict_database_array() { return multi_data_source_info_.get_dict_database_array(); }
  DictTableArray &get_dict_table_array() { return multi_data_source_info_.get_dict_table_array(); }
  // get tenant_schema_info with MultiDataSourceInfo in DDL. get from baseline data_dict if ddl
  // doesn't contain tenant_meta for the specified tenant, otherwise use tenant_meta in inc_data_dict.
  // NOTICE: ONLY AVAILABLE FOR DDL_TRANS.
  int get_tenant_schema_info_with_inc_dict(const uint64_t tenant_id, TenantSchemaInfo &tenant_schema_info);
  // get database_schema_info with MultiDataSourceInfo in DDL. get from baseline data_dict if ddl
  // doesn't contain database_meta for the specified database, otherwise use database_meta in inc_data_dict
  // NOTICE: ONLY AVAILABLE FOR DDL_TRANS.
  int get_database_schema_info_with_inc_dict(const uint64_t tenant_id, const uint64_t db_id, DBSchemaInfo &db_schema_info);
  // get dict_table_meta with MultiDataSourceInfo in DDL. get from baseline data_dict if ddl
  // doesn't contain table_meta for the specified table, otherwise use table_meta in inc_data_dict.
  // NOTICE: ONLY AVAILABLE FOR DDL_TRANS.
  int get_table_meta_with_inc_dict(const uint64_t tenant_id, const uint64_t table_id, const datadict::ObDictTableMeta *&tb_meta);
S
SanmuWangZJU 已提交
1148

1149 1150 1151 1152 1153 1154 1155
  // Check if the DDL transaction needs to be treated as a barrier.
  //
  // @param [out] is_not_barrier is not a barrier
  // @param [out] op_type Schema operation type
  int check_for_ddl_trans(
      bool &is_not_barrier,
      ObSchemaOperationType &op_type) const;
1156
  ObIAllocator &get_log_entry_task_base_allocator() { return log_entry_task_base_allocator_; };
1157 1158 1159 1160 1161 1162 1163 1164 1165 1166
  int push_tic_update_info(const TICUpdateInfo &tic_update_info);
  // Copy the recorded table id cache update infos (tic_update_infos_) into the caller's array.
  // NOTE(review): the outcome of the ObArray assignment is not checked here; if the copy can
  // fail (e.g. on allocation), the caller is not informed — confirm.
  void get_tic_update_info(ObArray<TICUpdateInfo> &tic_update_infos) const
  {
    tic_update_infos = tic_update_infos_;
  }

  // Whether at least one table id cache update info has been recorded in tic_update_infos_
  bool need_update_table_id_cache() const
  {
    const bool no_update_recorded = tic_update_infos_.empty();
    return !no_update_recorded;
  }
1167
  void set_unserved() { set_unserved_(); }
1168

W
wangzelin.wzl 已提交
1169 1170
  TO_STRING_KV(
      "state", serve_state_,
S
SanmuWangZJU 已提交
1171 1172
      "type", print_task_type(type_),
      K_(type),
W
wangzelin.wzl 已提交
1173
      K_(cluster_id),
S
SanmuWangZJU 已提交
1174
      K_(exec_tenant_id),
W
wangzelin.wzl 已提交
1175
      K_(tls_id),
S
SanmuWangZJU 已提交
1176
      K_(trans_id),
1177
      "trans_type", transaction::trans_type_to_cstr(trans_type_),
W
wangzelin.wzl 已提交
1178
      K_(is_xa_or_dup),
S
SanmuWangZJU 已提交
1179
      K_(is_trans_committed),
W
wangzelin.wzl 已提交
1180 1181 1182 1183 1184
      K_(trans_commit_version),
      K_(prepare_ts),
      K_(prepare_log_lsn),
      K_(commit_ts),
      K_(commit_log_lsn),
O
obdev 已提交
1185 1186
      "participant_count", participants_.count(),
      K_(participants),
W
wangzelin.wzl 已提交
1187 1188 1189 1190 1191 1192 1193 1194 1195
      K_(trace_id),
      K_(trace_info),
      K_(sorted_log_entry_info),
      K_(sorted_redo_list),
      K_(rollback_list),
      K_(part_tx_fetch_state),
      K_(ref_cnt),
      K_(multi_data_source_node_arr),
      K_(multi_data_source_info),
S
SanmuWangZJU 已提交
1196 1197
      K_(checkpoint_seq),
      K_(global_trans_seq),
W
wangzelin.wzl 已提交
1198
      K_(global_schema_version),
S
SanmuWangZJU 已提交
1199 1200 1201 1202
      K_(local_schema_version),
      K_(stmt_list),
      K_(is_data_ready),
      KP_(wait_formatted_cond),
W
wangzelin.wzl 已提交
1203 1204
      K_(output_br_count_by_turn),
      KP_(next_task));
S
SanmuWangZJU 已提交
1205 1206 1207 1208

private:
  int init_trace_id_(const ObString &trace_id);
  int init_trace_info_(const ObString &trace_info);
W
wangzelin.wzl 已提交
1209 1210 1211 1212
  int to_string_part_trans_info_();
  int init_participant_array_(
      const transaction::ObLSLogInfoArray &participants,
      const palf::LSN &commit_log_lsn);
S
SanmuWangZJU 已提交
1213 1214
  void destroy_participant_array_();
  bool is_base_trans_info_valid_() const;
W
wangzelin.wzl 已提交
1215 1216 1217 1218 1219 1220
  // alloc LogEntryNode with the specified LSN; freed when PartTransTask is destroyed.
  int alloc_log_entry_node_(const palf::LSN &lsn, LogEntryNode *&log_entry_node);
  // 1. memory mode: all data is in memory,
  // 2. storage mode: all data need be stored
  // 3. auto mode:
  bool need_store_data_() const;
S
SanmuWangZJU 已提交
1221
  // Handling of row start
W
wangzelin.wzl 已提交
1222 1223 1224
  int push_redo_on_row_start_(
      const bool need_store_data,
      const transaction::ObTransID &trans_id,
S
SanmuWangZJU 已提交
1225
      const memtable::ObMemtableMutatorMeta &meta,
W
wangzelin.wzl 已提交
1226
      const palf::LSN &log_lsn,
S
SanmuWangZJU 已提交
1227 1228
      const char *redo_data,
      const int64_t redo_data_size);
W
wangzelin.wzl 已提交
1229 1230 1231
  int push_ddl_redo_on_row_start_(
      const memtable::ObMemtableMutatorMeta &meta,
      const palf::LSN &log_lsn,
S
SanmuWangZJU 已提交
1232 1233 1234
      const char *redo_data,
      const int64_t redo_data_size,
      const int64_t mutator_row_size);
W
wangzelin.wzl 已提交
1235 1236
  int push_dml_redo_on_row_start_(
      const bool need_store_data,
S
SanmuWangZJU 已提交
1237
      const memtable::ObMemtableMutatorMeta &meta,
W
wangzelin.wzl 已提交
1238
      const palf::LSN &log_lsn,
S
SanmuWangZJU 已提交
1239 1240 1241
      const char *redo_data,
      const int64_t redo_data_size,
      const int64_t mutator_row_size);
W
wangzelin.wzl 已提交
1242 1243 1244 1245 1246 1247 1248 1249 1250
  int get_and_submit_store_task_(
      const uint64_t tenant_id,
      const uint8_t row_flags,
      const palf::LSN &log_lsn,
      const char *data_buf,
      const int64_t data_len);

  int check_dml_redo_node_ready_and_handle_();
  int handle_unserved_trans_();
S
SanmuWangZJU 已提交
1251
  void set_unserved_() { serve_state_ = UNSERVED; }
W
wangzelin.wzl 已提交
1252
  bool is_data_ready() const { return ATOMIC_LOAD(&is_data_ready_); }
O
obdev 已提交
1253 1254 1255 1256 1257 1258

  // MultiDataSource Transaction
  int alloc_and_save_multi_data_source_node_(
      const palf::LSN &lsn,
      const transaction::ObTxBufferNode &mds_buffer_node);

W
wangzelin.wzl 已提交
1259 1260 1261
  int parse_tablet_change_mds_(
      const MultiDataSourceNode &multi_data_source_node,
      ObCDCTabletChangeInfo &tablet_change_info);
S
SanmuWangZJU 已提交
1262 1263 1264

private:
  ServedState             serve_state_;
W
wangzelin.wzl 已提交
1265 1266
  // trans basic info
  uint64_t                cluster_id_;            // cluster ID
S
SanmuWangZJU 已提交
1267 1268 1269
  TaskType                type_;                  // task type
  uint64_t                exec_tenant_id_;        // record tenant_id for DDL task

O
obdev 已提交
1270
  logservice::TenantLSID  tls_id_;                // logservice::TenantLSID
W
wangzelin.wzl 已提交
1271
  const char              *tls_str_;
S
SanmuWangZJU 已提交
1272
  transaction::ObTransID  trans_id_;              // trans ID
W
wangzelin.wzl 已提交
1273
  ObString                part_trans_info_str_;   // tls_str + tx_id
S
SanmuWangZJU 已提交
1274 1275 1276
  // whether the transaction has been committed, i.e. whether the commit log has arrived and the whole transaction is complete
  // This variable is only relevant for DML transactions and DDL transactions
  bool                    is_trans_committed_;
W
wangzelin.wzl 已提交
1277 1278 1279 1280 1281
  int64_t                 trans_commit_version_;  // Global transaction version, transaction commit version
  int64_t                 prepare_ts_;            // Transaction PrepareLog timestamp, same with commit_ts_ if SINGLE_LS_TRANS
  palf::LSN               prepare_log_lsn_;       // PrepareLog LSN(same with commit_log_lsn_ if SINGLE_LS_TRANS)
  int64_t                 commit_ts_;             // Transaction timestamp, usually set to the Commit log timestamp
  palf::LSN               commit_log_lsn_;        // CommitLog LSN
1282
  transaction::TransType  trans_type_;
W
wangzelin.wzl 已提交
1283 1284 1285
  bool                    is_xa_or_dup_;          // true if xa dist trans or duplicate table trans.

  // participants info, used to determine the sequence of trans at the sequencer module.
O
obdev 已提交
1286
  transaction::ObLSLogInfoArray participants_;
W
wangzelin.wzl 已提交
1287 1288 1289 1290
  // App Trace ID (get from commit_info log)
  ObString                trace_id_;
  // App Trace Info (get from commit_info log)
  ObString                trace_info_;
S
SanmuWangZJU 已提交
1291

W
wangzelin.wzl 已提交
1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306
  // Trans fetched log info
  SortedLogEntryInfo      sorted_log_entry_info_; // sorted log_entry list
  SortedRedoLogList       sorted_redo_list_;      // ordered redo list
  // PartTrans fetch_log state
  // part_tx_fetch_state_ & 0x01:      read first record log
  // part_tx_fetch_state_ & 0x01 << 1: read commit info log
  // part_tx_fetch_state_ & 0x01 << 2: read prepare log
  // part_tx_fetch_state_ & 0x01 << 3: read commit log
  int8_t                  part_tx_fetch_state_;
  RollbackList            rollback_list_;         // Rollback list
  int64_t                 ref_cnt_;               // ref count(redo_node count + 1)

  // For MultiDataSource
  MultiDataSourceNodeArray  multi_data_source_node_arr_;    // array record MultiDataSourceNode
  MultiDataSourceInfo       multi_data_source_info_;        // MultiDataSourceInfo
1307
  transaction::ObTxBigSegmentBuf segment_buf_;              // ObTxBigSegmentBuf for Big Tx Log
S
SanmuWangZJU 已提交
1308 1309 1310 1311 1312 1313 1314

  // checkpoint seq number
  //
  // The Fetcher assigns a seq number to all tasks that are sent down and periodically calculates the seq information to be sent down via heartbeat tasks
  // Committer sorts tasks arriving out of order based on the seq number and maintains the overall data seq by processing the tasks sequentially
  int64_t                 checkpoint_seq_;

W
wangzelin.wzl 已提交
1315
  // Trans info generate at sequencer
S
SanmuWangZJU 已提交
1316
  // Transaction serial number assigned by sequencer globally
W
wangzelin.wzl 已提交
1317
  // Distributed transaction level, tls_ided transactions within a distributed transaction have the same number
S
SanmuWangZJU 已提交
1318
  int64_t                 global_trans_seq_;
W
wangzelin.wzl 已提交
1319
  int64_t                 global_schema_version_;
S
SanmuWangZJU 已提交
1320 1321 1322

  // PartTransTask linked list structure
  // list of participants in Sequencer
W
wangzelin.wzl 已提交
1323
  // Fetcher for linking all tls_id transactions
S
SanmuWangZJU 已提交
1324 1325
  PartTransTask           *next_task_;

W
wangzelin.wzl 已提交
1326 1327 1328 1329
  // Data parsed from Redo
  int64_t                 local_schema_version_;  // Schema versions for tls_ided transactions
  StmtList                stmt_list_;             // statement list
  StmtList                ddl_lob_aux_stmt_list_; // DDL Trans Lob Aux meta stmt list
S
SanmuWangZJU 已提交
1330 1331

  common::ObByteLock      data_ready_lock_;
W
wangzelin.wzl 已提交
1332 1333 1334
  // For sys LS: whether the formatting is complete
  // For user LS: whether the all logs which need store have stored
  // Note: DML tls_id: empty redo scene, is_data_ready_ = true
S
SanmuWangZJU 已提交
1335
  bool                    is_data_ready_;
W
wangzelin.wzl 已提交
1336
  common::ObCond          wait_data_ready_cond_;
S
SanmuWangZJU 已提交
1337 1338 1339 1340

  // To optimise memory usage, the condition variable is passed in externally
  common::ObCond          *wait_formatted_cond_;

W
wangzelin.wzl 已提交
1341
  int64_t                 output_br_count_by_turn_; // sorted br count in each statistic round
S
SanmuWangZJU 已提交
1342

1343 1344
  ObArray<TICUpdateInfo>  tic_update_infos_; // table id cache update info

W
wangzelin.wzl 已提交
1345 1346 1347 1348 1349 1350
  // allocator used to alloc:
  // LogEntryNode/RollbackNode
  // DdlRedoLogNode/DmlRedoLogNode/mutator_row_data
  // trace_id/trace_info/part_trans_info_str_/participant_
  // MutatorRow(DDL)/DdlStmtTask
  ObSmallArena            allocator_;
1351
  ObLfFIFOAllocator       log_entry_task_base_allocator_;
S
SanmuWangZJU 已提交
1352 1353 1354 1355 1356 1357 1358 1359

private:
  DISALLOW_COPY_AND_ASSIGN(PartTransTask);
};

}
}
#endif