ob_partition_storage.h 35.6 KB
Newer Older
O
oceanbase-admin 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#ifndef OCEANBASE_STORAGE_OB_PARTITION_STORAGE
#define OCEANBASE_STORAGE_OB_PARTITION_STORAGE

#include "ob_partition_store.h"
#include "ob_relative_table.h"

namespace oceanbase {
namespace common {
class ObRowStore;
}
namespace share {
class ObPartitionReplica;
}
namespace blocksstable {
class ObBaseStorageLogger;
struct ObPartitionMeta;
struct ObStorageCacheSuite;
struct MacroBlockId;
class ObBuildIndexMacroBlockReader;
class ObBuildIndexMacroBlockWriter;
}  // namespace blocksstable
namespace memtable {
class ObMemtable;
}
namespace compaction {
class ObIStoreRowProcessor;
class ObSStableMergeEstimator;
class ObBuildIndexParam;
class ObBuildIndexContext;
}  // namespace compaction
namespace storageperf {
template <typename T>
class ObMultiBlockBench;
}
namespace storage {
class ObIPartitionGroupGuard;
class ObSSStore;
class ObBatch;
class ObPartitionMergeDag;
class ObMultipleMerge;
class ObSSTableCtx;
class ObSSTableSplitCtx;
class ObSSTableMergeCtx;
class ObTableScanIterIterator;
class ObPGMemtableMgr;
class ObPGStorage;
class ObTableScanIterator;

class ObStoreRowkeyHashFunc {
G
gm 已提交
62
public:
O
oceanbase-admin 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
  uint64_t operator()(const common::ObStoreRowkey& rowkey, const uint64_t hash)
  {
    return rowkey.murmurhash(hash);
  }
};

// Plain value pair holding a checksum and a table id.
struct ObColumnChecksumEntry {
  // Default entry: both fields are zero.
  ObColumnChecksumEntry() : ObColumnChecksumEntry(0, 0)
  {}

  // Builds an entry from an explicit checksum / table id pair.
  ObColumnChecksumEntry(int64_t checksum, int64_t table_id)
      : checksum_(checksum),
        table_id_(table_id)
  {}

  int64_t checksum_;  // stored checksum value
  int64_t table_id_;  // stored table id
};

class ObStorageWriterGuard {
G
gm 已提交
79
public:
O
oceanbase-admin 已提交
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
  int refresh_and_protect_table(ObRelativeTable& relative_table);
  int refresh_and_protect_pg_memtable(ObPGStorage& pg_storage, ObTablesHandle& tables_handle);

  // called when writing to a specific partition
  ObStorageWriterGuard(
      ObPartitionStore& store, const ObStoreCtx& store_ctx, const bool need_control_mem, const bool is_replay = false)
      : need_control_mem_(need_control_mem),
        is_replay_(is_replay),
        store_(&store),
        store_ctx_(store_ctx),
        memtable_(NULL),
        pg_memtable_mgr_(NULL),
        retry_count_(0),
        last_ts_(0)
  {
    get_writing_throttling_sleep_interval() = 0;
  }
  // called when replay clog of pg
  ObStorageWriterGuard(const ObStoreCtx& store_ctx, ObPGMemtableMgr* pg_memtable_mgr, const bool need_control_mem,
      const bool is_replay = false)
      : need_control_mem_(need_control_mem),
        is_replay_(is_replay),
        store_(NULL),
        store_ctx_(store_ctx),
        memtable_(NULL),
        pg_memtable_mgr_(pg_memtable_mgr),
        retry_count_(0),
        last_ts_(0)
  {
    get_writing_throttling_sleep_interval() = 0;
  }
  ~ObStorageWriterGuard();
  ObStorageWriterGuard(const ObStorageWriterGuard&) = delete;
  ObStorageWriterGuard& operator=(const ObStorageWriterGuard&) = delete;

G
gm 已提交
115
private:
O
oceanbase-admin 已提交
116 117 118 119
  bool need_to_refresh_table(ObTablesHandle& tables_handle);
  bool check_if_need_log();
  void reset();

G
gm 已提交
120
private:
O
oceanbase-admin 已提交
121 122 123
  static const int64_t LOG_INTERVAL_US = 30 * 1000 * 1000;
  static const int64_t GET_TS_INTERVAL = 10 * 1000;

G
gm 已提交
124
private:
O
oceanbase-admin 已提交
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
  const bool need_control_mem_;
  const bool is_replay_;
  ObPartitionStore* store_;
  const ObStoreCtx& store_ctx_;
  memtable::ObMemtable* memtable_;
  ObPGMemtableMgr* pg_memtable_mgr_;
  int64_t retry_count_;
  int64_t last_ts_;
};

// Access counters kept per rowkey prefix.
// rowkey_prefix_ holds MAX_ROWKEY_PREFIX_NUM + 1 AccessStat slots; all counters start at zero.
struct ObPartitionPrefixAccessStat {
  // One slot of counters.
  // NOTE(review): "bf" presumably stands for bloom filter -- confirm against the implementation.
  struct AccessStat {
    AccessStat()
    {
      reset();
    }
    ~AccessStat() = default;
    // Zeroes every counter.
    // Done member-wise rather than with memset(this, ...): AccessStat has a user-provided
    // constructor and copy assignment, so raw memory writes over the object are not well-defined.
    void reset()
    {
      bf_filter_cnt_ = 0;
      bf_access_cnt_ = 0;
      empty_read_cnt_ = 0;
    }
    bool is_valid() const;
    // Member-wise copy of the three counters; self-assignment is a no-op.
    AccessStat& operator=(const AccessStat& other)
    {
      if (this != &other) {
        bf_filter_cnt_ = other.bf_filter_cnt_;
        bf_access_cnt_ = other.bf_access_cnt_;
        empty_read_cnt_ = other.empty_read_cnt_;
      }
      return *this;
    }
    TO_STRING_KV(K_(bf_filter_cnt), K_(bf_access_cnt), K_(empty_read_cnt));
    int64_t bf_filter_cnt_;
    int64_t bf_access_cnt_;
    int64_t empty_read_cnt_;
  };
  ObPartitionPrefixAccessStat()
  {
    reset();
  }
  ~ObPartitionPrefixAccessStat() = default;
  ObPartitionPrefixAccessStat& operator=(const ObPartitionPrefixAccessStat& other);
  // Resets every slot of rowkey_prefix_ (the only data member) to zero counters.
  void reset()
  {
    for (AccessStat& stat : rowkey_prefix_) {
      stat.reset();
    }
  }
  int add_stat(const ObTableAccessStat& stat);
  int add_stat(const ObTableScanStatistic& stat);
  int get_optimal_prefix(int64_t& prefix);
  int64_t to_string(char* buf, const int64_t buf_len) const;
  const static int64_t MAX_ROWKEY_PREFIX_NUM = 7;  // avoid consuming too much space for statistics, only count first 7
  AccessStat rowkey_prefix_[MAX_ROWKEY_PREFIX_NUM + 1];
};

L
leslieyuchen 已提交
177
class ObSingleRowGetter;
O
oceanbase-admin 已提交
178 179 180 181
class ObPartitionStorage : public ObIPartitionStorage {
  template <typename T>
  friend class oceanbase::storageperf::ObMultiBlockBench;
  friend class ObPGStorage;
L
leslieyuchen 已提交
182
  friend class ObSingleRowGetter;
O
oceanbase-admin 已提交
183

G
gm 已提交
184
public:
O
oceanbase-admin 已提交
185 186 187
  ObPartitionStorage();
  virtual ~ObPartitionStorage();

G
gm 已提交
188
  // Returns the schema service pointer held in schema_service_ (no null check is done here).
  inline virtual const share::schema::ObMultiVersionSchemaService* get_schema_service() const override
  {
    return schema_service_;
  }
G
gm 已提交
192 193 194
  virtual int init(const common::ObPartitionKey& pkey, ObIPartitionComponentFactory* cp_fty,
      share::schema::ObMultiVersionSchemaService* schema_service, transaction::ObTransService* txs,
      ObPGMemtableMgr& pg_memtable_mgr) override;
O
oceanbase-admin 已提交
195

G
gm 已提交
196
  virtual void destroy() override;
G
gm 已提交
197 198 199 200
  // Accessor for this storage's partition key; hands back a const reference to pkey_.
  virtual const common::ObPartitionKey& get_partition_key() const override
  {
    return this->pkey_;
  }
O
oceanbase-admin 已提交
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
  bool is_inited() const;

  TO_STRING_KV(K_(pkey), K_(store));
  //
  // scan table partition
  //
  // @param ctx [in] transaction context
  // @param param [in] query param
  // @param result [out] iterator to get the result set
  //
  // @return result iterator
  //
  virtual int table_scan(
      ObTableScanParam& param, const int64_t data_max_schema_version, common::ObNewRowIterator*& result) override;
  virtual int table_scan(
      ObTableScanParam& param, const int64_t data_max_schema_version, common::ObNewIterIterator*& result) override;
  virtual int join_mv_scan(ObTableScanParam& left_param, ObTableScanParam& right_param,
      const int64_t left_data_max_schema_version, const int64_t right_data_max_schema_version,
      ObIPartitionStorage& right_storage, common::ObNewRowIterator*& result) override;
  virtual int revert_scan_iter(common::ObNewRowIterator* iter) override;
  //
  // delete rows
  //     delete table rows and index rows
  //
  // @param trans_desc [in] transaction handle
  // @param timeout [in] process timeout
  // @param table_id [in] table
  // @param index_included [in] need to delete index too
  // @param column_ids [in] all column referenced, rowkey first
  // @param row_iter [in] primary keys to be deleted
  // @param affected_rows [out]
  //
  // @retval OB_TRANSACTION_SET_VIOLATION
  // @retval OB_SNAPSHOT_DISCARDED
  // @retval OB_TRANS_NOT_FOUND
  // @retval OB_TRANS_ROLLBACKED
  // @retval OB_TRANS_IS_READONLY
  // @retval OB_ERR_EXCLUSIVE_LOCK_CONFLICT
  //
  virtual int delete_rows(const ObStoreCtx& ctx, const ObDMLBaseParam& dml_param,
      const common::ObIArray<uint64_t>& column_ids, common::ObNewRowIterator* row_iter,
      int64_t& affected_rows) override;

  virtual int delete_row(const ObStoreCtx& ctx, const ObDMLBaseParam& dml_param,
      const common::ObIArray<uint64_t>& column_ids, const common::ObNewRow& row) override;

  virtual int put_rows(const ObStoreCtx& ctx, const ObDMLBaseParam& dml_param,
      const common::ObIArray<uint64_t>& column_ids, common::ObNewRowIterator* row_iter,
      int64_t& affected_rows) override;

  //
  // insert rows
  //     insert table rows and index rows
  //
  // @param trans_desc [in] transaction handle
  // @param timeout [in] process timeout
  // @param table_id [in] table
  // @param column_ids [in] insert columns
  // @param row_iter [in] insert values
  // @param affected_rows [out]
  //
  // @retval OB_TRANS_NOT_FOUND
  // @retval OB_TRANS_ROLLBACKED
  // @retval OB_TRANS_IS_READONLY
  // @retval OB_ERR_EXCLUSIVE_LOCK_CONFLICT
  // @retval OB_ERR_PRIMARY_KEY_DUPLICATE
  //
  virtual int insert_rows(const ObStoreCtx& ctx, const ObDMLBaseParam& dml_param,
      const common::ObIArray<uint64_t>& column_ids, common::ObNewRowIterator* row_iter,
      int64_t& affected_rows) override;
  //
  // insert row
  //     insert table row or return conflict row(s)
  //
  // @param trans_desc [in] transaction handle
  // @param timeout [in] process timeout
  // @param column_ids [in] insert columns
  // @param duplicated_column_ids [in] output columns when conflict met
  // @param row [in] row to be inserted
  // @param flag [in] return all conflict rows or the first one
  // @param affected_rows [out] successfully insert row number
  // @param duplicated_rows [out] the iterator of the rowkey(s) of conflict row(s)
  //
G
gm 已提交
284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
  virtual int insert_row(const ObStoreCtx& ctx, const ObDMLBaseParam& dml_param,
      const common::ObIArray<uint64_t>& column_ids, const common::ObNewRow& row);

  virtual int insert_row(const ObStoreCtx& ctx, const ObDMLBaseParam& dml_param,
      const common::ObIArray<uint64_t>& column_ids, const common::ObIArray<uint64_t>& duplicated_column_ids,
      const common::ObNewRow& row, const ObInsertFlag flag, int64_t& affected_rows,
      common::ObNewRowIterator*& duplicated_rows) override;
  // check whether row has conflict in storage
  // in_column_ids describe columns of the row, begin with rowkey, must include local unique index
  // out_column_ids describe column of conflict row
  // check_row_iter is the iterator of rows that will be checked
  // dup_row_iters are iterators of conflict rows, the number of iterators is same with number of checked rows
  virtual int fetch_conflict_rows(const ObStoreCtx& ctx, const ObDMLBaseParam& dml_param,
      const common::ObIArray<uint64_t>& in_column_ids, const common::ObIArray<uint64_t>& out_column_ids,
      common::ObNewRowIterator& check_row_iter, common::ObIArray<common::ObNewRowIterator*>& dup_row_iters) override;
  virtual int revert_insert_iter(common::ObNewRowIterator* iter) override;
O
oceanbase-admin 已提交
300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345
  //
  // update rows
  //     update table rows and index rows
  //
  // @param trans_desc [in] transaction handle
  // @param timeout [in] process timeout
  // @param table_id [in] table
  // @param index_included [in] if index need to be updated
  // @param column_ids [in] all columns related
  // @param updated_column_ids [in] updated columns
  // @param row_iter [in] odd rows are old and even rows are new ones
  // @param affected_rows [out]
  //
  // @retval OB_TRANSACTION_SET_VIOLATION
  // @retval OB_SNAPSHOT_DISCARDED
  // @retval OB_TRANS_NOT_FOUND
  // @retval OB_TRANS_ROLLBACKED
  // @retval OB_TRANS_IS_READONLY
  // @retval OB_ERR_EXCLUSIVE_LOCK_CONFLICT
  // @retval OB_ERR_PRIMARY_KEY_DUPLICATE
  //
  virtual int update_rows(const ObStoreCtx& ctx, const ObDMLBaseParam& dml_param,
      const common::ObIArray<uint64_t>& column_ids, const common::ObIArray<uint64_t>& updated_column_ids,
      common::ObNewRowIterator* row_iter, int64_t& affected_rows) override;
  virtual int update_row(const ObStoreCtx& ctx, const ObDMLBaseParam& dml_param,
      const common::ObIArray<uint64_t>& column_ids, const common::ObIArray<uint64_t>& updated_column_ids,
      const common::ObNewRow& old_row, const common::ObNewRow& new_row) override;

  //
  // lock rows
  //     lock table rows
  //
  // @param trans_desc [in] transaction handle
  // @param timeout [in] process timeout
  // @param table_id [in] table id
  // @param row_iter [in] rowkey iterator
  // @param lock_flag [in] lock flags: LF_WRITE or LF_NONE
  // @param affected_rows [out]
  //
  // @retval OB_TRANSACTION_SET_VIOLATION
  // @retval OB_SNAPSHOT_DISCARDED
  // @retval OB_TRANS_NOT_FOUND
  // @retval OB_TRANS_ROLLBACKED
  // @retval OB_TRANS_IS_READONLY
  // @retval OB_ERR_EXCLUSIVE_LOCK_CONFLICT
  //
G
gm 已提交
346 347 348 349 350 351 352 353 354 355 356 357 358
  virtual int lock_rows(const ObStoreCtx& ctx, const ObDMLBaseParam& dml_param, const int64_t abs_lock_timeout,
      common::ObNewRowIterator* row_iter, ObLockFlag lock_flag, int64_t& affected_rows) override;
  virtual int lock_rows(const ObStoreCtx& ctx, const ObDMLBaseParam& dml_param, const int64_t abs_lock_timeout,
      const common::ObNewRow& row, ObLockFlag lock_flag) override;

  virtual int get_concurrent_cnt(uint64_t table_id, int64_t schema_version, int64_t& concurrent_cnt) override;

  virtual int get_batch_rows(const ObTableScanParam& param, const storage::ObBatch& batch, int64_t& logical_row_count,
      int64_t& physical_row_count, common::ObIArray<common::ObEstRowCountRecord>& est_records) override;

  virtual int get_table_stat(const uint64_t table_id, common::ObTableStat& stat) override;

  virtual int lock(const ObStoreCtx& ctx) override;
O
oceanbase-admin 已提交
359

L
LINxiansheng 已提交
360 361
  virtual void set_merge_status(bool merge_success) override;
  virtual bool can_schedule_merge() override;
O
oceanbase-admin 已提交
362 363 364 365 366 367 368

  // write ssstore objects @version tree to data file, used by write_check_point
  virtual int serialize(ObArenaAllocator& allocator, char*& new_buf, int64_t& serialize_size);
  // read ssstore objects from data file to construct partition storage's version tree.
  virtual int deserialize(const ObReplicaType replica_type, const char* buf, const int64_t buf_len,
      ObIPartitionGroup* pg, int64_t& pos, bool& is_old_meta, ObPartitionStoreMeta& old_meta);

L
LINxiansheng 已提交
369
  virtual bool has_memstore() override;
O
oceanbase-admin 已提交
370 371

  // freeze actions
L
LINxiansheng 已提交
372 373 374 375 376
  virtual int get_replayed_table_version(int64_t& table_version) override;
  virtual int get_partition_ss_store_info(common::ObArray<PartitionSSStoreInfo>& partition_ss_store_info_list) override;
  virtual int get_all_tables(ObTablesHandle& tables_handle) override;
  virtual int retire_warmup_store(const bool is_disk_full) override;
  virtual int halt_prewarm() override;
O
oceanbase-admin 已提交
377

L
LINxiansheng 已提交
378
  // Returns the replica type reported by the partition store (store_.get_replica_type()).
  virtual common::ObReplicaType get_replica_type() override
  {
    return store_.get_replica_type();
  }
  virtual int query_range_to_macros(common::ObIAllocator& allocator,
      const common::ObIArray<common::ObStoreRange>& ranges, const int64_t type, uint64_t* macros_count,
      const int64_t* total_task_count, common::ObIArray<common::ObStoreRange>* splitted_ranges,
L
LINxiansheng 已提交
385 386
      common::ObIArray<int64_t>* split_index) override;
  virtual int get_multi_ranges_cost(const common::ObIArray<common::ObStoreRange>& ranges, int64_t& total_size) override;
O
oceanbase-admin 已提交
387 388
  virtual int split_multi_ranges(const common::ObIArray<common::ObStoreRange>& ranges,
      const int64_t expected_task_count, common::ObIAllocator& allocator,
L
LINxiansheng 已提交
389
      common::ObArrayArray<common::ObStoreRange>& multi_range_split_array) override;
O
oceanbase-admin 已提交
390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422
  virtual int append_local_sort_data(const share::ObBuildIndexAppendLocalDataParam& param,
      const common::ObPGKey& pg_key, const blocksstable::ObStorageFileHandle& file_handle,
      common::ObNewRowIterator& iter) override;

  int get_schemas_to_split(storage::ObSSTableSplitCtx& ctx);
  int get_schemas_to_merge(storage::ObSSTableMergeCtx& ctx);
  int check_is_schema_changed(const int64_t column_checksum_method,
      const share::schema::ObTableSchema& base_table_schema, const share::schema::ObTableSchema& main_table_schema,
      bool& is_schema_changed, bool& is_column_changed, bool& is_progressive_merge_num_changed);
  int cal_major_merge_param(ObSSTableMergeCtx& ctx);
  int cal_minor_merge_param(ObSSTableMergeCtx& ctx);
  int build_merge_ctx(storage::ObSSTableMergeCtx& ctx);
  int get_concurrent_cnt(ObTablesHandle& tables_handle, const share::schema::ObTableSchema& table_schema,
      const ObMergeType& merge_type, int64_t& concurrent_cnt);

  int init_split_context(storage::ObSSTableSplitCtx& ctx);
  int init_merge_context(storage::ObSSTableMergeCtx& ctx);

  // TODO(): following methods need to be moved to merge_task.cpp later
  static void check_data_checksum(const common::ObReplicaType& replica_type, const ObPartitionKey& pkey,
      const share::schema::ObTableSchema& schema, const ObSSTable& data_sstable,
      common::ObIArray<storage::ObITable*>& base_tables, ObIPartitionReport& report);
  static void dump2text(const share::schema::ObTableSchema& schema, common::ObIArray<storage::ObITable*>& base_tables,
      const ObPartitionKey& pkey);
  static int update_estimator(const share::schema::ObTableSchema* base_schema, const bool is_full,
      const ObIArray<ObColumnStat*>& column_stats, ObSSTable* sstable, const common::ObPartitionKey& pkey);
  int create_partition_store(const common::ObReplicaType& replica_type, const int64_t multi_version_start,
      const uint64_t data_table_id, const int64_t create_schema_version, const int64_t create_timestamp,
      ObIPartitionGroup* pg, ObTablesHandle& sstables_handle);
  // Gives callers mutable access to the partition store member (store_).
  ObPartitionStore& get_partition_store()
  {
    return this->store_;
  }
L
LINxiansheng 已提交
423 424 425
  int do_warm_up_request(const ObIWarmUpRequest* request) override;
  int check_index_need_build(const share::schema::ObTableSchema& index_schema, bool& need_build) override;
  int get_build_index_context(const compaction::ObBuildIndexParam& param, compaction::ObBuildIndexContext& context) override;
O
oceanbase-admin 已提交
426 427

  int local_sort_index_by_range(
L
LINxiansheng 已提交
428
      const int64_t idx, const compaction::ObBuildIndexParam& param, const compaction::ObBuildIndexContext& context) override;
O
oceanbase-admin 已提交
429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454

  int get_build_index_param(const uint64_t index_id, const int64_t schema_version, ObIPartitionReport* report,
      compaction::ObBuildIndexParam& param) override;
  int save_split_state(const int64_t state, const bool write_slog);
  int get_split_state(int64_t& state) const;
  int save_split_info(const ObPartitionSplitInfo& split_info, const bool write_slog);
  int check_table_continuity(const uint64_t table_id, const int64_t version, bool& is_continual);
  template <typename T>
  static int get_merge_opt(const int64_t merge_version, const int64_t storage_version, const int64_t work_version,
      const int64_t born_version, const T& old_val, const T& new_val, T& opt);
  int update_multi_version_start(const int64_t multi_version_start) override;
  int validate_sstables(const ObIArray<share::schema::ObIndexTableStat>& index_stats, bool& is_all_checked);
  bool need_check_index_(const uint64_t index_id, const ObIArray<share::schema::ObIndexTableStat>& index_stats);
  static int generate_index_output_param(const share::schema::ObTableSchema& data_table_schema,
      const share::schema::ObTableSchema& index_schema, common::ObArray<share::schema::ObColDesc>& col_ids,
      common::ObArray<share::schema::ObColDesc>& org_col_ids, common::ObArray<int32_t>& output_projector,
      int64_t& unique_key_cnt);
  int fill_checksum(const uint64_t index_id, const int sstable_type, share::ObSSTableChecksumItem& checksum_item);
  // Gives callers mutable access to the prefix access statistics member (prefix_access_stat_).
  ObPartitionPrefixAccessStat& get_prefix_access_stat()
  {
    return this->prefix_access_stat_;
  }
  int feedback_scan_access_stat(const ObTableScanParam& param);

  int erase_stat_cache();

G
gm 已提交
455
public:
O
oceanbase-admin 已提交
456 457
  typedef common::hash::ObHashMap<int64_t, ObColumnChecksumEntry, common::hash::NoPthreadDefendMode> ColumnChecksumMap;

G
gm 已提交
458
private:
O
oceanbase-admin 已提交
459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482
  // Scratch state used by the row reshape helpers of this class.
  // A freshly constructed instance has null pointers, zero length, char_only_ == false
  // and an empty binary_len_array_.
  struct RowReshape {
    RowReshape()
        : row_reshape_cells_(nullptr),
          char_only_(false),
          binary_buffer_len_(0),
          binary_buffer_ptr_(nullptr),
          binary_len_array_()
    {}

    common::ObObj* row_reshape_cells_;  // cell buffer; null until assigned
    bool char_only_;
    int64_t binary_buffer_len_;         // length paired with binary_buffer_ptr_
    char* binary_buffer_ptr_;           // null until assigned
    // each entry is (index of a binary column within the row, length of that binary column)
    common::ObSEArray<std::pair<int32_t, int32_t>, common::OB_ROW_MAX_COLUMNS_COUNT> binary_len_array_;
  };

  // Classification of an update, as reported by get_change_type() / check_rowkey_change().
  // Values are spelled out explicitly; they match the implicit numbering (0..3).
  enum ChangeType {
    NO_CHANGE = 0,
    ROWKEY_CHANGE = 1,
    ND_ROWKEY_CHANGE = 2,  // null dependent rowkey change
    OTHER_CHANGE = 3,
  };

  struct ObDMLRunningCtx {
G
gm 已提交
483
  public:
O
oceanbase-admin 已提交
484 485 486 487 488 489 490 491 492
    ObDMLRunningCtx(const ObStoreCtx& store_ctx, const ObDMLBaseParam& dml_param, common::ObIAllocator& allocator,
        const ObRowDml dml_type)
        : store_ctx_(store_ctx),
          dml_param_(dml_param),
          allocator_(allocator),
          dml_type_(dml_type),
          relative_tables_(allocator),
          col_map_(nullptr),
          col_descs_(nullptr),
L
leslieyuchen 已提交
493
          column_ids_(nullptr),
O
oceanbase-admin 已提交
494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509
          idx_col_descs_(),
          tbl_row_(),
          idx_row_(NULL),
          is_inited_(false)
    {}
    ~ObDMLRunningCtx()
    {
      free_work_members();
    }

    int init(const common::ObIArray<uint64_t>* column_ids, const common::ObIArray<uint64_t>* upd_col_ids,
        const bool use_table_param, share::schema::ObMultiVersionSchemaService* schema_service,
        ObPartitionStore& store);
    static int prepare_column_desc(
        const common::ObIArray<uint64_t>& column_ids, const ObRelativeTable& table, ObColDescIArray& col_descs);

G
gm 已提交
510
  private:
O
oceanbase-admin 已提交
511 512 513 514
    void free_work_members();
    int prepare_index_row();
    int prepare_column_info(const common::ObIArray<uint64_t>& column_ids);

G
gm 已提交
515
  public:
O
oceanbase-admin 已提交
516 517 518 519 520
    const ObStoreCtx& store_ctx_;
    const ObDMLBaseParam& dml_param_;
    common::ObIAllocator& allocator_;
    const ObRowDml dml_type_;
    ObRelativeTables relative_tables_;
L
leslieyuchen 已提交
521 522 523
    const share::schema::ColumnMap *col_map_;
    const ObColDescIArray *col_descs_;
    const common::ObIArray<uint64_t> *column_ids_;
O
oceanbase-admin 已提交
524 525 526 527
    ObColDescArray idx_col_descs_;
    ObStoreRow tbl_row_;
    ObStoreRow* idx_row_;  // not a must, allocate dynamically

G
gm 已提交
528
  private:
O
oceanbase-admin 已提交
529 530 531 532 533
    bool is_inited_;
  };

  static const int32_t LOCK_WAIT_INTERVAL = 5;  // 5us

G
gm 已提交
534
private:
O
oceanbase-admin 已提交
535 536 537 538 539 540 541 542 543 544 545 546
  int write_index_row(ObRelativeTable& relative_table, const ObStoreCtx& ctx, const ObColDescIArray& idx_columns,
      ObStoreRow& index_row);
  int check_other_columns_in_column_ids(const ObRelativeTables& rel_schema, const share::schema::ColumnMap* col_map,
      const common::ObIArray<uint64_t>& column_ids, const ObRowDml dml_type, const ChangeType change_type,
      const bool is_total_quantity_log);
  int construct_column_ids_map(const common::ObIArray<uint64_t>& column_ids, const int64_t total_column_count,
      common::ObIAllocator& work_allocator, share::schema::ColumnMap*& col_map);
  void deconstruct_column_ids_map(common::ObIAllocator& work_allocator, share::schema::ColumnMap*& col_map);
  int check_column_ids_valid(common::ObIAllocator& work_allocator, const ObRelativeTables& relative_tables,
      const share::schema::ColumnMap* col_map, const common::ObIArray<uint64_t>& column_ids, const ObRowDml dml_type,
      const common::ObIArray<uint64_t>& upd_column_ids, const ChangeType change_type, const bool is_total_quantity_log);
  int insert_table_row(
L
leslieyuchen 已提交
547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564
      ObDMLRunningCtx &run_ctx, ObRelativeTable &relative_table, const ObColDescIArray &col_descs, ObStoreRow &row);
  int insert_table_rows(ObDMLRunningCtx &run_ctx, ObRelativeTable &relative_table, const ObColDescIArray &col_descs,
      ObRowsInfo &rows_info);
  int insert_index_rows(ObDMLRunningCtx &run_ctx, ObStoreRow *rows, int64_t row_count);
  int direct_insert_row_and_index(ObDMLRunningCtx &run_ctx, const ObStoreRow &tbl_row);
  int convert_row_to_rowkey(ObSingleRowGetter &index_row_getter, ObStoreRowkey &rowkey);
  int get_conflict_row(ObDMLRunningCtx &run_ctx, const common::ObIArray<uint64_t> &out_col_ids,
      ObRelativeTable &relative_table, const ObStoreRowkey &rowkey, common::ObNewRowIterator *&duplicated_rows);
  int get_index_conflict_row(ObDMLRunningCtx &run_ctx, const common::ObIArray<uint64_t> &out_col_ids,
      ObRelativeTable &relative_table, bool need_index_back, const common::ObNewRow &row,
      common::ObNewRowIterator *&duplicated_rows);
  int single_get_row(ObSingleRowGetter &row_getter,
                     const ObStoreRowkey &rowkey,
                     common::ObNewRowIterator *&duplicated_rows,
                     int64_t data_table_rowkey_cnt);
  int get_conflict_rows(ObDMLRunningCtx &run_ctx, const ObInsertFlag flag,
      const common::ObIArray<uint64_t> &dup_col_ids, const common::ObNewRow &row,
      common::ObNewRowIterator *&duplicated_rows);
O
oceanbase-admin 已提交
565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629
  int get_change_type(
      const common::ObIArray<uint64_t>& update_ids, const ObRelativeTable& table, ChangeType& change_type);
  int check_rowkey_change(const common::ObIArray<uint64_t>& update_ids, const ObRelativeTables& relative_tables,
      common::ObIArray<ChangeType>& changes, bool& delay_new);
  int check_rowkey_value_change(const common::ObNewRow& old_row, const common::ObNewRow& new_row,
      const int64_t rowkey_len, bool& rowkey_change) const;
  int process_old_row(ObDMLRunningCtx& ctx, const bool data_tbl_rowkey_change,
      const common::ObIArray<ChangeType>& change_flags, const ObStoreRow& tbl_row);
  int process_row_of_data_table(ObDMLRunningCtx& run_ctx, const common::ObIArray<int64_t>& update_idx,
      const ObStoreRow& old_tbl_row, const ObStoreRow& new_tbl_row, const bool rowkey_change);
  int process_row_of_index_tables(
      ObDMLRunningCtx& run_ctx, const common::ObIArray<ChangeType>& change_flags, const ObStoreRow& new_tbl_row);
  int process_new_row(ObDMLRunningCtx& run_ctx, const common::ObIArray<ChangeType>& change_flags,
      const common::ObIArray<int64_t>& update_idx, const ObStoreRow& old_tbl_row, const ObStoreRow& new_tbl_row,
      const bool rowkey_change);
  int reshape_delete_row(
      ObDMLRunningCtx& run_ctx, RowReshape*& row_reshape, ObStoreRow& tbl_row, ObStoreRow& new_tbl_row);
  int delete_row(ObDMLRunningCtx& run_ctx, RowReshape*& row_reshape, const common::ObNewRow& row);
  int update_row(ObDMLRunningCtx& run_ctx, const common::ObIArray<ChangeType>& changes,
      const common::ObIArray<int64_t>& update_idx, const bool delay_new, RowReshape*& old_row_reshape_ins,
      RowReshape*& row_reshape_ins, ObStoreRow& old_tbl_row, ObStoreRow& new_tbl_row, ObRowStore* row_store,
      bool& duplicate);
  bool illegal_filter(const ObTableScanParam& param) const;

  // $lta_param && $lta_ctx are inited
  int join_mv_init_merge_param(const share::schema::ObTableSchema& ltable, const share::schema::ObTableSchema& rtable,
      const ObTableAccessParam& lta_param, const ObTableAccessContext& lta_ctx, share::schema::ObTableParam& rt_param,
      ObTableAccessParam& rta_param, ObTableAccessContext& rta_ctx, ObStoreCtx& rctx);

  int prepare_merge_mv_depend_sstable(const common::ObVersion& frozen_version,
      const share::schema::ObTableSchema* schema, const share::schema::ObTableSchema* dep_schema,
      ObTablesHandle& mv_dep_tables_handle);

  int estimate_row_count(const storage::ObTableScanParam& param, const storage::ObBatch& batch,
      const common::ObIArray<ObITable*>& stores, ObPartitionEst& cost_estimate,
      common::ObIArray<common::ObEstRowCountRecord>& est_records);
  // In sql static engine cell is projected to ObExpr directly in ObMultipleMerge.
  // We need to project back to ObNewRow when for row locking (lock_row() need ObNewRow).
  //
  // Why not project to ObNewRow directly in static engine for lock? Because the filter
  // need the ObExpr interface.
  int prepare_lock_row(const ObTableScanParam& scan_param, const common::ObNewRow& row);
  int lock_scan_rows(const ObStoreCtx& ctx, const ObTableScanParam& scan_param, ObTableScanIterator& iter);
  int lock_scan_rows(const ObStoreCtx& ctx, const ObTableScanParam& scan_param, ObTableScanIterIterator& iter);
  int64_t get_lock_wait_timeout(const int64_t abs_lock_timeout, const int64_t stmt_timeout) const;
  int get_next_row_from_iter(common::ObNewRowIterator* iter, ObStoreRow& store_row, const bool need_copy_cells);
  int construct_update_idx(const share::schema::ColumnMap* col_map, const common::ObIArray<uint64_t>& upd_col_ids,
      common::ObIArray<int64_t>& update_idx);
  int malloc_rows_reshape_if_need(common::ObIAllocator& work_allocator, const ObColDescIArray& col_descs,
      const int64_t row_count, const ObRelativeTable& table, const ObSQLMode sql_mode, RowReshape*& row_reshape_ins);
  int malloc_rows_reshape(common::ObIAllocator& work_allocator, const ObColDescIArray& col_descs,
      const int64_t row_count, const ObRelativeTable& table, RowReshape*& row_reshape_ins);
  void free_row_reshape(common::ObIAllocator& work_allocator, RowReshape*& row_reshape_ins, int64_t row_count);
  // Row reshaping helpers (all const member functions).
  //
  // The two need_reshape_table_row overloads report their decision through `need_reshape`.
  // The first takes a RowReshape buffer plus its cell count, the second only a column count.
  // NOTE(review): the criteria for "needs reshape" (they depend on sql_mode) are not visible here.
  int need_reshape_table_row(const common::ObNewRow& row, RowReshape* row_reshape_ins, int64_t row_reshape_cells_count,
      ObSQLMode sql_mode, bool& need_reshape) const;
  int need_reshape_table_row(
      const common::ObNewRow& row, const int64_t column_cnt, ObSQLMode sql_mode, bool& need_reshape) const;
  // Single-row form: `tbl_row` is a non-const reference, presumably the reshaped result.
  // The caller passes in the `need_reshape` decision.
  int reshape_row(const common::ObNewRow& row, const int64_t column_cnt, RowReshape* row_reshape_ins,
      ObStoreRow& tbl_row, bool need_reshape, ObSQLMode sql_mode) const;
  // Batch form over `row_count` rows; `tbl_rows` is a non-const pointer, presumably the output array.
  int reshape_table_rows(const common::ObNewRow* rows, RowReshape* row_reshape_ins, int64_t row_reshape_cells_count,
      ObStoreRow* tbl_rows, int64_t row_count, ObSQLMode sql_mode) const;

  // Virtual. Takes a rowkey of `table` together with a caller-supplied `buffer` of `buffer_len` bytes.
  // `tz_info` is optional and defaults to NULL.
  // NOTE(review): presumably writes a textual form of the rowkey into `buffer`; the exact
  // format and truncation behaviour are not visible from this declaration.
  virtual int extract_rowkey(const ObRelativeTable& table, const common::ObStoreRowkey& rowkey, char* buffer,
      const int64_t buffer_len, const common::ObTimeZoneInfo* tz_info = NULL);

G
gm 已提交
630
private:
O
oceanbase-admin 已提交
631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692
  // Looks up, through `schema_guard`, a schema related to `table_schema`; the result is handed
  // back in `dep_table_schema` (reference to a pointer-to-const).
  // NOTE(review): the kind of dependency being resolved is not visible here; confirm in the definition.
  int get_depend_table_schema(const share::schema::ObTableSchema* table_schema,
      share::schema::ObSchemaGetterGuard& schema_guard, const share::schema::ObTableSchema*& dep_table_schema);
  // Checksum reporting for `index_schema`, identified by an execution id and a task id.
  // NOTE(review): `column_checkum` (sic) is a raw int64_t pointer, presumably an array of
  // per-column checksums; its length and the reporting target are not visible here.
  int report_checksum(const uint64_t execution_id, const uint64_t task_id,
      const share::schema::ObTableSchema& index_schema, const int64_t checksum_method, int64_t* column_checkum);

  // NOTE(review): presumably collects the stores needed for an index build into `param`
  // (the only non-const argument); confirm in the definition.
  int get_build_index_stores(const share::schema::ObTenantSchema& tenant_schema, compaction::ObBuildIndexParam& param);
  // `stat_sampling_ratio` is a non-const reference, so presumably an output.
  // NOTE(review): how the ratio relates to `data_version` is not visible here.
  int check_need_update_estimator(
      const share::schema::ObTableSchema& table_schema, int64_t data_version, int64_t& stat_sampling_ratio);
  // Rowkey existence checks. Each reports its answer through a bool reference
  // (`exists`, or `may_exist` for the prefix variant).
  // NOTE(review): presumably the do_* functions work on an explicit table list while the
  // others start from an ObRelativeTable; confirm in the definitions.
  //
  // Single rowkey, checked against `read_tables`.
  int do_rowkey_exists(const ObStoreCtx& store_ctx, const common::ObIArray<ObITable*>& read_tables,
      const int64_t table_id, const common::ObStoreRowkey& rowkey, const common::ObQueryFlag& query_flag,
      const ObColDescIArray& col_descs, bool& exists);
  // Single row, checked through `relative_table`.
  int rowkey_exists(ObRelativeTable& relative_table, const ObStoreCtx& store_ctx, const ObColDescIArray& col_descs,
      const common::ObNewRow& row, bool& exists);
  // Batch variants driven by `rows_info`.
  // NOTE(review): `may_exist` suggests the prefix check can return false positives; confirm.
  int do_rowkeys_prefix_exist(const common::ObIArray<ObITable*>& read_stores, ObRowsInfo& rows_info, bool& may_exist);
  int do_rowkeys_exists(const common::ObIArray<ObITable*>& read_stores, ObRowsInfo& rows_info, bool& exists);
  int rowkeys_exists(const ObStoreCtx& store_ctx, ObRelativeTable& relative_table, ObRowsInfo& rows_info, bool& exists);
  // Two write_row overloads on `relative_table`.
  // The first takes a single row.
  int write_row(ObRelativeTable& relative_table, const ObStoreCtx& store_ctx, const int64_t rowkey_len,
      const common::ObIArray<share::schema::ObColDesc>& col_descs, const storage::ObStoreRow& row);
  // The second takes an old row, a new row and `update_idx`.
  // NOTE(review): presumably the update path, with update_idx naming the changed columns; confirm.
  int write_row(ObRelativeTable& relative_table, const storage::ObStoreCtx& ctx, const int64_t rowkey_len,
      const common::ObIArray<share::schema::ObColDesc>& col_descs, const common::ObIArray<int64_t>& update_idx,
      const storage::ObStoreRow& old_row, const storage::ObStoreRow& new_row);
  // lock_row overload taking a full ObNewRow. It also receives a SQL mode, an allocator and a
  // RowReshape pointer by reference.
  // NOTE(review): presumably the row may be reshaped before locking; confirm in the definition.
  int lock_row(ObRelativeTable& relative_table, const storage::ObStoreCtx& store_ctx,
      const common::ObIArray<share::schema::ObColDesc>& col_descs, const common::ObNewRow& row,
      const ObSQLMode sql_mode, ObIAllocator& allocator, RowReshape*& row_reshape_ins);
  // lock_row overload taking the rowkey directly.
  int lock_row(ObRelativeTable& relative_table, const storage::ObStoreCtx& store_ctx,
      const common::ObIArray<share::schema::ObColDesc>& col_descs, const common::ObStoreRowkey& rowkey);
  // Reports through `locked` (non-const reference).
  // NOTE(review): by its name, whether the row at `rowkey` is already locked by the caller's
  // own context; confirm.
  int check_row_locked_by_myself(ObRelativeTable& relative_table, const storage::ObStoreCtx& store_ctx,
      const common::ObIArray<share::schema::ObColDesc>& col_descs, const common::ObStoreRowkey& rowkey, bool& locked);

  // Internal batch-insert helper over `row_count` rows from `rows`.
  // `afct_num` and `dup_num` are non-const references, so presumably outputs.
  // NOTE(review): presumably they count affected and duplicate rows respectively; confirm.
  int insert_rows_(ObDMLRunningCtx& run_ctx, const common::ObNewRow* const rows, const int64_t row_count,
      ObRowsInfo& rows_info, storage::ObStoreRow* tbl_rows, RowReshape* row_reshape_ins, int64_t& afct_num,
      int64_t& dup_num);

  // NOTE(review): presumably tells whether `version` is too old relative to `base_version`
  // given that at most `max_kept_number` versions are kept; the exact rule is not visible here.
  bool is_expired_version(
      const common::ObVersion& base_version, const common::ObVersion& version, const int64_t max_kept_number);

  // `merge_level` is a non-const reference, presumably the output for this merge.
  int get_merge_level(const int64_t merge_version, const ObSSTableMergeCtx& ctx, ObMergeLevel& merge_level);

  // `store_column_checksum` is a non-const reference, presumably the output.
  // NOTE(review): by its name, whether column checksums are stored per micro block; confirm.
  int get_store_column_checksum_in_micro(
      const int64_t merge_version, const ObSSTableMergeCtx& ctx, bool& store_column_checksum);

  // Takes the data table schema and an index schema as inputs. The three arrays
  // (`org_col_ids`, `col_ids`, `output_projector`) are non-const, so presumably outputs.
  // NOTE(review): the relationship between org_col_ids and col_ids is not visible from this declaration.
  int get_local_index_column_ids_and_projector(const share::schema::ObTableSchema& table_schema,
      const share::schema::ObTableSchema& index_schema, ObArray<share::schema::ObColDesc>& org_col_ids,
      ObArray<share::schema::ObColDesc>& col_ids, ObArray<int32_t>& output_projector);

  // `partition_meta` is the only non-const argument, so the other arguments are presumably
  // used to populate it; confirm in the definition.
  int init_partition_meta(const common::ObReplicaType& replica_type, int64_t data_version,
      const int64_t multi_version_start, const uint64_t data_table_id, const int64_t create_schema_version,
      const int64_t create_timestamp, const int64_t snapshot_version, ObPGPartitionStoreMeta& partition_meta);
  // NOTE(review): presumably a replay path for the older ObSSStore format; `last_publish_version`
  // and `add_tables` are non-const references, so presumably outputs. Confirm.
  int replay_old_ssstore(ObSSStore& store, int64_t& last_publish_version, ObArray<ObITable::TableKey>& add_tables);
  // NOTE(review): by its name, deep-copies the three schemas into `param` (the only non-const
  // argument); whether dep_table_schema may be null is not visible here.
  int deep_copy_build_index_schemas(const share::schema::ObTableSchema* data_table_schema,
      const share::schema::ObTableSchema* index_schema, const share::schema::ObTableSchema* dep_table_schema,
      compaction::ObBuildIndexParam& param);
  // NOTE(review): presumably checks `sstable` against `row_count` and the checksums in `cc_map`;
  // cc_map is non-const, so it may also be updated. Confirm.
  int validate_sstable(const int64_t row_count, const ObSSTable* sstable, ColumnChecksumMap& cc_map);
  // Returns void; `result` is an int passed by non-const reference, so it is presumably
  // read and/or rewritten here.
  void handle_error_index_table(
      const ObSSTable& index_table, const ObIArray<share::schema::ObIndexTableStat>& index_stats, int& result);
  // `finished` and `need_report` are non-const references, presumably outputs for `version`.
  int try_update_report_status(const common::ObVersion& version, bool& finished, bool& need_report);
  // NOTE(review): by its name, a schema-version check for bounded-staleness reads; the
  // comparison performed between the two versions is not visible here.
  int check_schema_version_for_bounded_staleness_read_(
      const int64_t table_version_for_read, const int64_t data_max_schema_version, const uint64_t table_id);
  // Internal single-row lock helpers. Both take the row to lock and a RowReshape pointer by reference.
  // The first is driven by a DML parameter, an absolute lock timeout and a lock flag.
  int lock_rows_(const ObStoreCtx& ctx, const ObDMLBaseParam& dml_param, const int64_t abs_lock_timeout,
      const common::ObNewRow& row, ObLockFlag lock_flag, RowReshape*& row_reshape);
  // The second is driven by a table scan parameter.
  // NOTE(review): presumably timeout and lock flag come from scan_param in this form; confirm.
  int lock_rows_(
      const ObStoreCtx& ctx, const ObTableScanParam& scan_param, const common::ObNewRow& row, RowReshape*& row_reshape);
693
  // NOTE(review): by its name, a check on the merge described by `ctx` for a mini merge of an
  // index that is not worth doing; how the outcome is reported (return code only) should be
  // confirmed in the definition.
  int check_useless_index_mini_merge(const storage::ObSSTableMergeCtx &ctx);
694
  // Returns void; `ret` is an int passed by non-const reference, so the caller's return code
  // is presumably inspected and/or rewritten here.
  // NOTE(review): the exact rewrite rule on leader change is not visible from this declaration.
  void check_leader_changed_for_sql_recheck_(ObDMLRunningCtx &run_ctx, int &ret);
L
leslieyuchen 已提交
695 696 697 698 699 700 701 702 703 704 705 706 707 708

  // NOTE(review): presumably dumps diagnostic information about a main-table sstable and an
  // index sstable; what is dumped and where is not visible from this declaration.
  int dump_error_info(ObSSTable &main_sstable, ObSSTable &index_sstable);
  //////////////////
  /// Strict checks performed when writing rows.
  // NOTE(review): the conditions each check enforces are not visible from these declarations;
  // the notes below are derived from names and parameter types only.
  //
  // Check on the old row of a DML operation.
  int check_old_row_legitimacy(ObDMLRunningCtx &run_ctx, const common::ObNewRow &row);
  // Check on `row` against a specific index table.
  int check_delete_index_legitimacy(
      ObDMLRunningCtx &run_ctx, ObRelativeTable &index_table, const common::ObNewRow &row);
  // Check on the new row of a DML operation.
  int check_new_row_legitimacy(ObDMLRunningCtx &run_ctx, const common::ObNewRow &row);
  // Two overloads: columns are given either as column ids or as column descriptors.
  int check_new_row_nullable_value(
      const common::ObIArray<uint64_t> &column_ids, ObRelativeTable &data_table, const common::ObNewRow &new_row);
  int check_new_row_nullable_value(const common::ObIArray<share::schema::ObColDesc> &col_descs,
      ObRelativeTable &relative_table, const common::ObNewRow &new_row);
  // Presumably validates the shadow primary-key columns of `new_row`; confirm.
  int check_new_row_shadow_pk(
      const common::ObIArray<uint64_t> &column_ids, ObRelativeTable &data_table, const common::ObNewRow &new_row);
O
oceanbase-admin 已提交
709 710 711
  // Disallow copying: the macro is expected to disable the copy constructor and copy
  // assignment of ObPartitionStorage (macro definition not visible in this file).
  DISALLOW_COPY_AND_ASSIGN(ObPartitionStorage);

G
gm 已提交
712
protected:
O
oceanbase-admin 已提交
713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741
  // data members
  // One second; the value 1000 * 1000 implies the unit is microseconds.
  static const int64_t DELAY_SCHEDULE_TIME_UNIT = 1000 * 1000 * 1;  // 1s
  // NOTE(review): presumably format markers distinguishing data written before version 1.4.61
  // from data written by 1.4.61; confirm where they are read.
  static const int64_t MAGIC_NUM_BEFORE_1461 = -0xABCD;
  static const int64_t MAGIC_NUM_1_4_61 = -0xABCE;
  uint64_t cluster_version_;
  // Key of the partition this object belongs to.
  common::ObPartitionKey pkey_;
  // Raw pointers to collaborating services/components.
  // NOTE(review): ownership and lifetime are not visible here; presumably non-owning. Confirm in init/destroy.
  share::schema::ObMultiVersionSchemaService* schema_service_;
  ObIPartitionComponentFactory* cp_fty_;
  transaction::ObTransService* txs_;
  ObPGMemtableMgr* pg_memtable_mgr_;
  // Initialization flag.
  bool is_inited_;
  // Merge bookkeeping.
  // NOTE(review): presumably outcome, time and failure count of recent merges; confirm where they are updated.
  bool merge_successed_;
  int64_t merge_timestamp_;
  int64_t merge_failed_cnt_;
  ObPartitionStore store_;
  common::ObReplicaType replay_replica_type_;  // only for compatibility with 1.4; used when replaying slog
  ObPartitionPrefixAccessStat prefix_access_stat_;
};
};

// Chooses the effective lock-wait timeout.
//
// @param abs_lock_timeout  requested lock timeout; a negative value falls back to stmt_timeout
// @param stmt_timeout      statement timeout, which also acts as the upper bound
// @return abs_lock_timeout when it lies in [0, stmt_timeout], otherwise stmt_timeout
inline int64_t ObPartitionStorage::get_lock_wait_timeout(
    const int64_t abs_lock_timeout, const int64_t stmt_timeout) const
{
  // Start from the statement timeout; it is the answer both when no usable lock timeout
  // was supplied (negative) and when the supplied one would exceed the statement timeout.
  int64_t wait_timeout = stmt_timeout;
  if (abs_lock_timeout >= 0 && abs_lock_timeout <= stmt_timeout) {
    wait_timeout = abs_lock_timeout;
  }
  return wait_timeout;
}

}  // namespace storage
}  // end namespace oceanbase

#endif  // OCEANBASE_STORAGE_OB_PARTITION_STORAGE