// ob_complement_data_task.h
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#ifndef OCEANBASE_STORAGE_OB_COMPLEMENT_DATA_TASK_H
#define OCEANBASE_STORAGE_OB_COMPLEMENT_DATA_TASK_H

#include "storage/access/ob_table_access_context.h"
#include "share/scheduler/ob_dag_scheduler.h"
#include "storage/blocksstable/ob_block_sstable_struct.h"
#include "storage/blocksstable/ob_index_block_builder.h"
#include "storage/compaction/ob_column_checksum_calculator.h"
#include "storage/ddl/ob_ddl_redo_log_writer.h"
#include "storage/ob_store_row_comparer.h"
#include "sql/engine/expr/ob_expr_frame_info.h"

namespace oceanbase
{
namespace sql
{
// Forward declaration: this header only refers to ObTempExpr through pointers
// (see ObExtendedGCParam::dependent_exprs_), so the full definition is not needed here.
struct ObTempExpr;
}
namespace storage
{
// Forward declarations. ObLocalScan is defined further down in this header but is
// already referenced by ObComplementWriteTask::append_row(); ObMultipleScanMerge is
// only used through a pointer member of ObLocalScan.
class ObLocalScan;
class ObMultipleScanMerge;
// Parameters of one complement-data job: the tenant/log-stream/tablets involved,
// private copies of the data table and hidden table schemas, versions, and the
// key ranges the work is split into.
struct ObComplementDataParam final
{
public:
  static const int64_t DEFAULT_COMPLEMENT_DATA_MEMORY_LIMIT = 128L * 1024L * 1024L;
  ObComplementDataParam():
    is_inited_(false), tenant_id_(common::OB_INVALID_TENANT_ID), ls_id_(share::ObLSID::INVALID_LS_ID),
    source_tablet_id_(ObTabletID::INVALID_TABLET_ID), dest_tablet_id_(ObTabletID::INVALID_TABLET_ID),
    data_table_schema_(nullptr), hidden_table_schema_(nullptr), allocator_("ComplementData"),
    row_store_type_(common::ENCODING_ROW_STORE), schema_version_(0), snapshot_version_(0),
    concurrent_cnt_(0), task_id_(0), execution_id_(0), compat_mode_(lib::Worker::CompatMode::INVALID)
  {}
  ~ObComplementDataParam() { destroy(); }
  // Fill this param from the build-single-replica request (implemented out of line).
  int init(const ObDDLBuildSingleReplicaRequestArg &arg);
  // Split the work of the given tablet into ranges_ (implemented out of line).
  int split_task_ranges(const share::ObLSID &ls_id, const common::ObTabletID &tablet_id, const int64_t hint_parallelism);
  // Store private copies of both schemas in data_table_schema_ / hidden_table_schema_.
  int deep_copy_table_schemas(const share::schema::ObTableSchema *data_table_schema, const share::schema::ObTableSchema *hidden_table_schema);
  // True when every identifier is set, both schemas are present, and
  // concurrent_cnt_ / snapshot_version_ / execution_id_ / compat_mode_ carry usable values.
  // Note that is_inited_ is not part of this check.
  bool is_valid() const
  {
    return common::OB_INVALID_TENANT_ID != tenant_id_ && ls_id_.is_valid() && source_tablet_id_.is_valid()
           && dest_tablet_id_.is_valid() && OB_NOT_NULL(data_table_schema_) && OB_NOT_NULL(hidden_table_schema_)
           && 0 != concurrent_cnt_ && snapshot_version_ > 0 && compat_mode_ != lib::Worker::CompatMode::INVALID
           && execution_id_ > 0;
  }
  int get_hidden_table_key(ObITable::TableKey &table_key) const;
  // Return the object to its default-constructed state so it can be destroyed or reused.
  void destroy()
  {
    is_inited_ = false;
    tenant_id_ = common::OB_INVALID_TENANT_ID;
    ls_id_.reset();
    source_tablet_id_.reset();
    dest_tablet_id_.reset();
    // The schema copies are destructed explicitly; their storage is not freed here
    // (presumably it lives in allocator_, which is reset below -- confirm in
    // deep_copy_table_schemas).
    if (nullptr != data_table_schema_) {
      data_table_schema_->~ObTableSchema();
    }
    if (nullptr != hidden_table_schema_) {
      hidden_table_schema_->~ObTableSchema();
    }
    data_table_schema_ = nullptr;
    hidden_table_schema_ = nullptr;
    // Drop the split ranges as well, so a destroyed/reused param never exposes stale
    // ranges. Done before allocator_.reset() in case the range keys reference
    // allocator_ memory (NOTE(review): confirm against split_task_ranges).
    ranges_.reset();
    allocator_.reset();
    row_store_type_ = common::ENCODING_ROW_STORE;
    schema_version_ = 0;
    snapshot_version_ = 0;
    concurrent_cnt_ = 0;
    task_id_ = 0;
    execution_id_ = 0;
    compat_mode_ = lib::Worker::CompatMode::INVALID;
  }
  TO_STRING_KV(K_(is_inited), K_(tenant_id), K_(ls_id), K_(source_tablet_id), K_(dest_tablet_id),
      KPC_(data_table_schema), KPC_(hidden_table_schema), K_(schema_version),
      K_(snapshot_version), K_(concurrent_cnt), K_(task_id), K_(execution_id), K_(compat_mode));
public:
  bool is_inited_;
  uint64_t tenant_id_;
  share::ObLSID ls_id_;
  ObTabletID source_tablet_id_;
  ObTabletID dest_tablet_id_;
  const share::schema::ObTableSchema *data_table_schema_;    // private copy, destructed in destroy()
  const share::schema::ObTableSchema *hidden_table_schema_;  // private copy, destructed in destroy()
  common::ObArenaAllocator allocator_;
  common::ObRowStoreType row_store_type_;
  int64_t schema_version_;
  int64_t snapshot_version_;
  int64_t concurrent_cnt_;
  int64_t task_id_;
  int64_t execution_id_;
  lib::Worker::CompatMode compat_mode_;
  ObSEArray<common::ObStoreRange, 32> ranges_;  // filled by split_task_ranges(), cleared by destroy()
};

struct ObComplementDataContext final
{
public:
  ObComplementDataContext():
    is_inited_(false), complement_data_ret_(common::OB_SUCCESS),
    allocator_("ComplementData"), lock_(), concurrent_cnt_(0), data_sstable_redo_writer_(), index_builder_(nullptr)
  {}
  ~ObComplementDataContext() { destroy(); }
  int init(const ObComplementDataParam &param, const ObDataStoreDesc &desc);
  void destroy();
  int write_start_log(const ObComplementDataParam &param);
  TO_STRING_KV(K_(is_inited), K_(complement_data_ret), K_(concurrent_cnt), KP_(index_builder));
public:
  bool is_inited_;
  int complement_data_ret_;
  common::ObArenaAllocator allocator_;
  ObSpinLock lock_;
  int64_t concurrent_cnt_;
  ObDDLSSTableRedoWriter data_sstable_redo_writer_;
  blocksstable::ObSSTableIndexBuilder *index_builder_;
124
  ObDDLKvMgrHandle ddl_kv_mgr_handle_; // for keeping ddl kv mgr alive
W
wangzelin.wzl 已提交
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
};

// Forward declarations of the three task types of the complement-data DAG;
// each is defined later in this header.
class ObComplementPrepareTask;
class ObComplementWriteTask;
class ObComplementMergeTask;
class ObComplementDataDag final: public share::ObIDag
{
public:
  ObComplementDataDag();
  ~ObComplementDataDag();
  int init(const ObDDLBuildSingleReplicaRequestArg &arg);
  int prepare_context();
  int64_t hash() const;
  bool operator ==(const share::ObIDag &other) const;
  bool is_inited() const { return is_inited_; }
  ObComplementDataParam &get_param() { return param_; }
  ObComplementDataContext &get_context() { return context_; }
  void handle_init_failed_ret_code(int ret) { context_.complement_data_ret_ = ret; }
  int fill_comment(char *buf, const int64_t buf_len) const override;
  int fill_dag_key(char *buf, const int64_t buf_len) const override;
  virtual lib::Worker::CompatMode get_compat_mode() const override
  { return param_.compat_mode_; }
147
  virtual int create_first_task() override;
W
wangzelin.wzl 已提交
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
  // report replica build status to RS.
  int report_replica_build_status();
private:
  bool is_inited_;
  ObComplementDataParam param_;
  ObComplementDataContext context_;
  DISALLOW_COPY_AND_ASSIGN(ObComplementDataDag);
};

// Task of the complement-data DAG that works on the shared param/context pair.
// Only the interface is declared here; the behavior of init()/process() is
// implemented out of line.
class ObComplementPrepareTask final : public share::ObITask
{
public:
  ObComplementPrepareTask();
  ~ObComplementPrepareTask();
  // Bind the task to the job's param and context (presumably stored in
  // param_/context_ without taking ownership -- confirm in the implementation).
  int init(ObComplementDataParam &param, ObComplementDataContext &context);
  // Task entry point, overriding share::ObITask::process().
  int process() override;
private:
  bool is_inited_;
  ObComplementDataParam *param_;
  ObComplementDataContext *context_;
  DISALLOW_COPY_AND_ASSIGN(ObComplementPrepareTask);
};

// Task of the complement-data DAG identified by a task_id. Judging by its private
// helpers it scans locally (ObLocalScan) and appends the scanned rows; the
// implementation lives out of line.
class ObComplementWriteTask final : public share::ObITask
{
public:
  ObComplementWriteTask();
  ~ObComplementWriteTask();
  // Bind the task to its id and to the job's param and context.
  int init(const int64_t task_id, ObComplementDataParam &param, ObComplementDataContext &context);
  // Task entry point, overriding share::ObITask::process().
  int process() override;
private:
  // Produce the follow-up task in next_task.
  int generate_next_task(share::ObITask *&next_task);
  // Build column information from the two schemas (presumably fills col_ids_,
  // org_col_ids_ and output_projector_ -- confirm in the implementation).
  int generate_col_param(const share::schema::ObTableSchema *data_table_schema,
                         const share::schema::ObTableSchema *hidden_table_schema);
  int local_scan_by_range();
  int do_local_scan();
  int append_row(ObLocalScan &local_scan);
  int add_extra_rowkey(const int64_t extra_rowkey_cnt, const blocksstable::ObDatumRow &row);

private:
  static const int64_t RETRY_INTERVAL = 100 * 1000; // 100ms
  bool is_inited_;
  int64_t task_id_;
  ObComplementDataParam *param_;
  ObComplementDataContext *context_;
  blocksstable::ObDatumRow write_row_;
  ObArray<ObColDesc> col_ids_;
  ObArray<ObColDesc> org_col_ids_;
  ObArray<int32_t> output_projector_;
  DISALLOW_COPY_AND_ASSIGN(ObComplementWriteTask);
};

// Task of the complement-data DAG that works on the shared param/context pair;
// its private helper suggests it adds the built hidden-table sstable. The
// implementation lives out of line.
class ObComplementMergeTask final : public share::ObITask
{
public:
  ObComplementMergeTask();
  ~ObComplementMergeTask();
  // Bind the task to the job's param and context.
  int init(ObComplementDataParam &param, ObComplementDataContext &context);
  // Task entry point, overriding share::ObITask::process().
  int process() override;
private:
  int add_build_hidden_table_sstable();
private:
  bool is_inited_;
  ObComplementDataParam *param_;
  ObComplementDataContext *context_;
  DISALLOW_COPY_AND_ASSIGN(ObComplementMergeTask);
};

// Plain bundle of the column descriptor lists, dependent expressions and output
// projector used by ObLocalScan (held there as extended_gc_).
// NOTE(review): "GC" presumably stands for "generated column" -- confirm.
struct ObExtendedGCParam final
{
public:
  // All containers start out empty.
  ObExtendedGCParam()
    : col_ids_(),
      org_col_ids_(),
      extended_col_ids_(),
      org_extended_col_ids_(),
      dependent_exprs_(),
      output_projector_()
  {
  }
  // Nothing to release explicitly; the members clean up after themselves.
  ~ObExtendedGCParam()
  {
  }
  TO_STRING_KV(K_(col_ids),
               K_(org_col_ids),
               K_(extended_col_ids),
               K_(org_extended_col_ids),
               K_(dependent_exprs),
               K_(output_projector));

  common::ObArray<share::schema::ObColDesc> col_ids_;
  common::ObArray<share::schema::ObColDesc> org_col_ids_;
  common::ObArray<share::schema::ObColDesc> extended_col_ids_;
  common::ObArray<share::schema::ObColDesc> org_extended_col_ids_;
  ObArray<sql::ObTempExpr *> dependent_exprs_;
  common::ObArray<int32_t> output_projector_;
};

// Row iterator (ObIStoreRowIterator) used by the complement-data tasks to scan a
// tablet and hand out rows via get_next_row(). It also owns a column checksum
// calculator that callers can reach through get_checksum_calculator().
// Only the interface is declared here; the implementation lives out of line.
class ObLocalScan : public ObIStoreRowIterator
{
public:
  ObLocalScan();
  virtual ~ObLocalScan();
  // Configure the scan with the column descriptors, projector, both table schemas,
  // the snapshot version and the transaction service to use.
  int init(const common::ObIArray<share::schema::ObColDesc> &col_ids,
           const common::ObIArray<share::schema::ObColDesc> &org_col_ids,
           const common::ObIArray<int32_t> &projector,
            const share::schema::ObTableSchema *data_table_schema,
           const int64_t snapshot_version,
           transaction::ObTransService *txs,
           const share::schema::ObTableSchema *hidden_table_schema,
           const bool output_org_cols_only);
  // Start scanning the given range of the tablet.
  int table_scan(const share::ObLSID &ls_id,
                 const ObTabletID &tablet_id,
                 ObTabletTableIterator &table_iter,
                 common::ObQueryFlag &query_flag,
                 blocksstable::ObDatumRange &range,
                 transaction::ObTxDesc *tx_desc);
  // ObIStoreRowIterator interface: fetch the next row into tmp_row.
  virtual int get_next_row(const blocksstable::ObDatumRow *&tmp_row) override;
  int get_origin_table_checksum(ObArray<int64_t> &report_col_checksums, ObArray<int64_t> &report_col_ids);
  // Non-owning access to the embedded checksum calculator; never returns null.
  compaction::ObColumnChecksumCalculator *get_checksum_calculator() {return &checksum_calculator_;}
private:
  int get_output_columns(common::ObIArray<ObColDesc> &col_ids);
  int get_exist_column_mapping(); // to record data table columns position in hidden tables.
  int check_generated_column_exist(const common::ObIArray<share::schema::ObColDesc> &org_col_ids);
  int construct_column_schema();
  int construct_access_param(const ObTabletID &tablet_id, const ObTableReadInfo &full_read_info);
  int construct_range_ctx(common::ObQueryFlag &query_flag, const share::ObLSID &ls_id, transaction::ObTxDesc *tx_desc);
  // Two overloads: one builds the scan merge from a tablet, the other from a
  // tablet table iterator.
  int construct_multiple_scan_merge(ObTablet &tablet, blocksstable::ObDatumRange &range);
  int construct_multiple_scan_merge(
      ObTabletTableIterator &table_iter,
      blocksstable::ObDatumRange &range);
private:
  bool is_inited_;
  ObExtendedGCParam extended_gc_;
  const share::schema::ObTableSchema *data_table_schema_;
  const share::schema::ObTableSchema *hidden_table_schema_;
  int64_t snapshot_version_;
  transaction::ObTransService *txs_;
  blocksstable::ObDatumRow default_row_;
  blocksstable::ObDatumRow tmp_row_;
  ObIStoreRowIterator *row_iter_;
  ObMultipleScanMerge *scan_merge_;
  ObStoreCtx ctx_;
  ObTableAccessParam access_param_;
  ObTableAccessContext access_ctx_;
  ObGetTableParam get_table_param_;
  common::ObArenaAllocator allocator_;
  common::ObArenaAllocator calc_buf_;
  ObExprCtx expr_ctx_;
  common::ObArray<share::schema::ObColumnParam *> col_params_;
  ObTableReadInfo read_info_;
  common::ObBitmap exist_column_mapping_;
  compaction::ObColumnChecksumCalculator checksum_calculator_;
  bool output_org_cols_only_;
};

}  // end namespace storage
}  // end namespace oceanbase
#endif  // OCEANBASE_STORAGE_OB_COMPLEMENT_DATA_TASK_H