/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #ifndef OCEANBASE_STORAGE_OB_COMPLEMENT_DATA_TASK_H #define OCEANBASE_STORAGE_OB_COMPLEMENT_DATA_TASK_H #include "storage/access/ob_table_access_context.h" #include "share/scheduler/ob_dag_scheduler.h" #include "storage/blocksstable/ob_block_sstable_struct.h" #include "storage/blocksstable/ob_index_block_builder.h" #include "storage/compaction/ob_column_checksum_calculator.h" #include "storage/ddl/ob_ddl_redo_log_writer.h" #include "storage/ob_store_row_comparer.h" #include "sql/engine/expr/ob_expr_frame_info.h" namespace oceanbase { namespace sql { struct ObTempExpr; } namespace storage { class ObLocalScan; class ObMultipleScanMerge; struct ObComplementDataParam final { public: static const int64_t DEFAULT_COMPLEMENT_DATA_MEMORY_LIMIT = 128L * 1024L * 1024L; ObComplementDataParam(): is_inited_(false), tenant_id_(common::OB_INVALID_TENANT_ID), ls_id_(share::ObLSID::INVALID_LS_ID), source_tablet_id_(ObTabletID::INVALID_TABLET_ID), dest_tablet_id_(ObTabletID::INVALID_TABLET_ID), data_table_schema_(nullptr), hidden_table_schema_(nullptr), allocator_("ComplementData"), row_store_type_(common::ENCODING_ROW_STORE), schema_version_(0), snapshot_version_(0), concurrent_cnt_(0), task_id_(0), execution_id_(0), compat_mode_(lib::Worker::CompatMode::INVALID) {} ~ObComplementDataParam() { destroy(); } int init(const ObDDLBuildSingleReplicaRequestArg &arg); int split_task_ranges(const share::ObLSID &ls_id, const common::ObTabletID &tablet_id, const int64_t hint_parallelism); int deep_copy_table_schemas(const share::schema::ObTableSchema *data_table_schema, const share::schema::ObTableSchema *hidden_table_schema); bool is_valid() const { return common::OB_INVALID_TENANT_ID != tenant_id_ && ls_id_.is_valid() && source_tablet_id_.is_valid() && dest_tablet_id_.is_valid() && OB_NOT_NULL(data_table_schema_) && OB_NOT_NULL(hidden_table_schema_) && 0 != concurrent_cnt_ && snapshot_version_ > 0 && compat_mode_ != lib::Worker::CompatMode::INVALID && execution_id_ > 0; } int get_hidden_table_key(ObITable::TableKey &table_key) const; void destroy() { is_inited_ = false; tenant_id_ = common::OB_INVALID_TENANT_ID; ls_id_.reset(); source_tablet_id_.reset(); dest_tablet_id_.reset(); if (nullptr != data_table_schema_) { data_table_schema_->~ObTableSchema(); } if (nullptr != hidden_table_schema_) { hidden_table_schema_->~ObTableSchema(); } data_table_schema_ = nullptr; hidden_table_schema_ = nullptr; allocator_.reset(); row_store_type_ = common::ENCODING_ROW_STORE; schema_version_ = 0; snapshot_version_ = 0; concurrent_cnt_ = 0; task_id_ = 0; execution_id_ = 0; compat_mode_ = lib::Worker::CompatMode::INVALID; } TO_STRING_KV(K_(is_inited), K_(tenant_id), K_(ls_id), K_(source_tablet_id), K_(dest_tablet_id), KPC_(data_table_schema), KPC_(hidden_table_schema), K_(schema_version), K_(snapshot_version), K_(concurrent_cnt), K_(task_id), K_(execution_id), K_(compat_mode)); public: bool is_inited_; uint64_t tenant_id_; share::ObLSID ls_id_; ObTabletID source_tablet_id_; ObTabletID dest_tablet_id_; const share::schema::ObTableSchema *data_table_schema_; const share::schema::ObTableSchema *hidden_table_schema_; common::ObArenaAllocator allocator_; common::ObRowStoreType row_store_type_; int64_t schema_version_; int64_t snapshot_version_; int64_t concurrent_cnt_; int64_t task_id_; int64_t execution_id_; lib::Worker::CompatMode compat_mode_; ObSEArray ranges_; }; struct ObComplementDataContext final { public: ObComplementDataContext(): is_inited_(false), complement_data_ret_(common::OB_SUCCESS), allocator_("ComplementData"), lock_(), concurrent_cnt_(0), data_sstable_redo_writer_(), index_builder_(nullptr) {} ~ObComplementDataContext() { destroy(); } int init(const ObComplementDataParam ¶m, const ObDataStoreDesc &desc); void destroy(); int write_start_log(const ObComplementDataParam ¶m); TO_STRING_KV(K_(is_inited), K_(complement_data_ret), K_(concurrent_cnt), KP_(index_builder)); public: bool is_inited_; int complement_data_ret_; common::ObArenaAllocator allocator_; ObSpinLock lock_; int64_t concurrent_cnt_; ObDDLSSTableRedoWriter data_sstable_redo_writer_; blocksstable::ObSSTableIndexBuilder *index_builder_; ObDDLKvMgrHandle ddl_kv_mgr_handle_; // for keeping ddl kv mgr alive }; class ObComplementPrepareTask; class ObComplementWriteTask; class ObComplementMergeTask; class ObComplementDataDag final: public share::ObIDag { public: ObComplementDataDag(); ~ObComplementDataDag(); int init(const ObDDLBuildSingleReplicaRequestArg &arg); int prepare_context(); int64_t hash() const; bool operator ==(const share::ObIDag &other) const; bool is_inited() const { return is_inited_; } ObComplementDataParam &get_param() { return param_; } ObComplementDataContext &get_context() { return context_; } void handle_init_failed_ret_code(int ret) { context_.complement_data_ret_ = ret; } int fill_comment(char *buf, const int64_t buf_len) const override; int fill_dag_key(char *buf, const int64_t buf_len) const override; virtual lib::Worker::CompatMode get_compat_mode() const override { return param_.compat_mode_; } virtual int create_first_task() override; // report replica build status to RS. int report_replica_build_status(); private: bool is_inited_; ObComplementDataParam param_; ObComplementDataContext context_; DISALLOW_COPY_AND_ASSIGN(ObComplementDataDag); }; class ObComplementPrepareTask final : public share::ObITask { public: ObComplementPrepareTask(); ~ObComplementPrepareTask(); int init(ObComplementDataParam ¶m, ObComplementDataContext &context); int process() override; private: bool is_inited_; ObComplementDataParam *param_; ObComplementDataContext *context_; DISALLOW_COPY_AND_ASSIGN(ObComplementPrepareTask); }; class ObComplementWriteTask final : public share::ObITask { public: ObComplementWriteTask(); ~ObComplementWriteTask(); int init(const int64_t task_id, ObComplementDataParam ¶m, ObComplementDataContext &context); int process() override; private: int generate_next_task(share::ObITask *&next_task); int generate_col_param(const share::schema::ObTableSchema *data_table_schema, const share::schema::ObTableSchema *hidden_table_schema); int local_scan_by_range(); int do_local_scan(); int append_row(ObLocalScan &local_scan); int add_extra_rowkey(const int64_t extra_rowkey_cnt, const blocksstable::ObDatumRow &row); private: static const int64_t RETRY_INTERVAL = 100 * 1000; // 100ms bool is_inited_; int64_t task_id_; ObComplementDataParam *param_; ObComplementDataContext *context_; blocksstable::ObDatumRow write_row_; ObArray col_ids_; ObArray org_col_ids_; ObArray output_projector_; DISALLOW_COPY_AND_ASSIGN(ObComplementWriteTask); }; class ObComplementMergeTask final : public share::ObITask { public: ObComplementMergeTask(); ~ObComplementMergeTask(); int init(ObComplementDataParam ¶m, ObComplementDataContext &context); int process() override; private: int add_build_hidden_table_sstable(); private: bool is_inited_; ObComplementDataParam *param_; ObComplementDataContext *context_; DISALLOW_COPY_AND_ASSIGN(ObComplementMergeTask); }; struct ObExtendedGCParam final { public: ObExtendedGCParam(): col_ids_(), org_col_ids_(), extended_col_ids_(), org_extended_col_ids_(), dependent_exprs_(), output_projector_() {} ~ObExtendedGCParam() {} common::ObArray col_ids_; common::ObArray org_col_ids_; common::ObArray extended_col_ids_; common::ObArray org_extended_col_ids_; ObArray dependent_exprs_; common::ObArray output_projector_; TO_STRING_KV(K_(col_ids), K_(org_col_ids), K_(extended_col_ids), K_(org_extended_col_ids), K_(dependent_exprs), K_(output_projector)); }; class ObLocalScan : public ObIStoreRowIterator { public: ObLocalScan(); virtual ~ObLocalScan(); int init(const common::ObIArray &col_ids, const common::ObIArray &org_col_ids, const common::ObIArray &projector, const share::schema::ObTableSchema *data_table_schema, const int64_t snapshot_version, transaction::ObTransService *txs, const share::schema::ObTableSchema *hidden_table_schema, const bool output_org_cols_only); int table_scan(const share::ObLSID &ls_id, const ObTabletID &tablet_id, ObTabletTableIterator &table_iter, common::ObQueryFlag &query_flag, blocksstable::ObDatumRange &range, transaction::ObTxDesc *tx_desc); virtual int get_next_row(const blocksstable::ObDatumRow *&tmp_row) override; int get_origin_table_checksum(ObArray &report_col_checksums, ObArray &report_col_ids); compaction::ObColumnChecksumCalculator *get_checksum_calculator() {return &checksum_calculator_;} private: int get_output_columns(common::ObIArray &col_ids); int get_exist_column_mapping(); // to record data table columns position in hidden tables. int check_generated_column_exist(const common::ObIArray &org_col_ids); int construct_column_schema(); int construct_access_param(const ObTabletID &tablet_id, const ObTableReadInfo &full_read_info); int construct_range_ctx(common::ObQueryFlag &query_flag, const share::ObLSID &ls_id, transaction::ObTxDesc *tx_desc); int construct_multiple_scan_merge(ObTablet &tablet, blocksstable::ObDatumRange &range); int construct_multiple_scan_merge( ObTabletTableIterator &table_iter, blocksstable::ObDatumRange &range); private: bool is_inited_; ObExtendedGCParam extended_gc_; const share::schema::ObTableSchema *data_table_schema_; const share::schema::ObTableSchema *hidden_table_schema_; int64_t snapshot_version_; transaction::ObTransService *txs_; blocksstable::ObDatumRow default_row_; blocksstable::ObDatumRow tmp_row_; ObIStoreRowIterator *row_iter_; ObMultipleScanMerge *scan_merge_; ObStoreCtx ctx_; ObTableAccessParam access_param_; ObTableAccessContext access_ctx_; ObGetTableParam get_table_param_; common::ObArenaAllocator allocator_; common::ObArenaAllocator calc_buf_; ObExprCtx expr_ctx_; common::ObArray col_params_; ObTableReadInfo read_info_; common::ObBitmap exist_column_mapping_; compaction::ObColumnChecksumCalculator checksum_calculator_; bool output_org_cols_only_; }; } // end namespace storage } // end namespace oceanbase #endif // OCEANBASE_STORAGE_OB_COMPLEMENT_DATA_TASK_H