/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #define USING_LOG_PREFIX SQL_ENG #include "sql/engine/dml/ob_table_replace_op.h" #include "share/ob_autoincrement_service.h" #include "sql/engine/ob_physical_plan_ctx.h" #include "sql/engine/ob_physical_plan.h" #include "sql/engine/ob_exec_context.h" #include "lib/utility/ob_tracepoint.h" #include "sql/engine/dml/ob_dml_service.h" #include "sql/engine/expr/ob_expr_calc_partition_id.h" #include "sql/das/ob_das_insert_op.h" #include "sql/das/ob_data_access_service.h" #include "sql/engine/dml/ob_trigger_handler.h" namespace oceanbase { using namespace common; using namespace share; using namespace storage; namespace sql { OB_SERIALIZE_MEMBER((ObTableReplaceOpInput, ObTableModifyOpInput)); OB_DEF_SERIALIZE(ObTableReplaceSpec) { int ret = OB_SUCCESS; int64_t index_cnt = replace_ctdefs_.count(); BASE_SER((ObTableReplaceSpec, ObTableModifySpec)); OB_UNIS_ENCODE(index_cnt); for (int64_t i = 0; OB_SUCC(ret) && i < index_cnt; ++i) { ObReplaceCtDef *replace_ctdef = replace_ctdefs_.at(i); if (OB_ISNULL(replace_ctdef)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("replace_ctdef is nullptr", K(ret)); } OB_UNIS_ENCODE(*replace_ctdef); } OB_UNIS_ENCODE(only_one_unique_key_); OB_UNIS_ENCODE(conflict_checker_ctdef_); return ret; } OB_DEF_DESERIALIZE(ObTableReplaceSpec) { int ret = OB_SUCCESS; int64_t index_cnt = 0; BASE_DESER((ObTableReplaceSpec, ObTableModifySpec)); OB_UNIS_DECODE(index_cnt); OZ(replace_ctdefs_.allocate_array(alloc_, index_cnt)); ObDMLCtDefAllocator replace_ctdef_allocator(alloc_); for (int64_t i = 0; OB_SUCC(ret) && i < index_cnt; ++i) { ObReplaceCtDef *replace_ctdef = replace_ctdef_allocator.alloc(); if (OB_ISNULL(replace_ctdef)) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("alloc insert_up_ctdef failed", K(ret)); } OB_UNIS_DECODE(*replace_ctdef); replace_ctdefs_.at(i) = replace_ctdef; } OB_UNIS_DECODE(only_one_unique_key_); OB_UNIS_DECODE(conflict_checker_ctdef_); return ret; } OB_DEF_SERIALIZE_SIZE(ObTableReplaceSpec) { int64_t len = 0; int64_t index_cnt = replace_ctdefs_.count(); BASE_ADD_LEN((ObTableReplaceSpec, ObTableModifySpec)); OB_UNIS_ADD_LEN(index_cnt); for (int64_t i = 0; i < index_cnt; ++i) { ObReplaceCtDef *replace_ctdef = replace_ctdefs_.at(i); if (replace_ctdef != nullptr) { OB_UNIS_ADD_LEN(*replace_ctdef); } else { LOG_WARN("replace_ctdef is null unexpected"); } } OB_UNIS_ADD_LEN(only_one_unique_key_); OB_UNIS_ADD_LEN(conflict_checker_ctdef_); return len; } int ObTableReplaceOp::inner_open() { int ret = OB_SUCCESS; ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx_); NG_TRACE(replace_open); if (OB_FAIL(check_replace_ctdefs_valid())) { LOG_WARN("replace ctdef is invalid", K(ret)); } else if (OB_FAIL(ObTableModifyOp::inner_open())) { LOG_WARN("inner open ObTableModifyOp failed", K(ret)); } else if (OB_UNLIKELY(MY_SPEC.replace_ctdefs_.empty())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ins ctdef is invalid", K(ret), KP(this)); } else if (OB_UNLIKELY(iter_end_)) { //do nothing } else if (OB_FAIL(replace_row_store_.init(UINT64_MAX, my_session->get_effective_tenant_id(), ObCtxIds::DEFAULT_CTX_ID, "replace_row_store", false/*enable_dump*/))) { LOG_WARN("fail to init replace row store", K(ret)); } else if (OB_FAIL(inner_open_with_das())) { LOG_WARN("inner open with das failed", K(ret)); } else { conflict_checker_.set_local_tablet_loc(MY_INPUT.get_tablet_loc()); } return ret; } OB_INLINE int ObTableReplaceOp::inner_open_with_das() { int ret = OB_SUCCESS; const ObExprFrameInfo *expr_frame_info = NULL; ObDASTableLoc *table_loc = nullptr; expr_frame_info = nullptr != MY_SPEC.expr_frame_info_ ? MY_SPEC.expr_frame_info_ : &MY_SPEC.plan_->get_expr_frame_info(); if (OB_FAIL(init_replace_rtdef())) { LOG_WARN("init replace rtdef failed", K(ret), K(MY_SPEC.replace_ctdefs_.count())); } else if (OB_ISNULL(table_loc = replace_rtdefs_.at(0).ins_rtdef_.das_rtdef_.table_loc_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("table location is invalid", K(ret)); } else if (OB_FAIL(conflict_checker_.init_conflict_checker(expr_frame_info, table_loc))) { LOG_WARN("init conflict_checker fail", K(ret)); } return ret; } OB_INLINE int ObTableReplaceOp::init_replace_rtdef() { int ret = OB_SUCCESS; if (OB_FAIL(replace_rtdefs_.allocate_array(ctx_.get_allocator(), MY_SPEC.replace_ctdefs_.count()))) { LOG_WARN("allocate insert rtdef failed", K(ret), K(MY_SPEC.replace_ctdefs_.count())); } trigger_clear_exprs_.reset(); for (int64_t i = 0; OB_SUCC(ret) && i < replace_rtdefs_.count(); ++i) { ObReplaceCtDef *replace_ctdef = MY_SPEC.replace_ctdefs_.at(i); const ObInsCtDef *ins_ctdef = replace_ctdef->ins_ctdef_; const ObDelCtDef *del_ctdef = replace_ctdef->del_ctdef_; ObInsRtDef &ins_rtdef = replace_rtdefs_.at(i).ins_rtdef_; ObDelRtDef &del_rtdef = replace_rtdefs_.at(i).del_rtdef_; OZ(ObDMLService::init_ins_rtdef(dml_rtctx_, ins_rtdef, *ins_ctdef, trigger_clear_exprs_)); OZ(ObDMLService::init_del_rtdef(dml_rtctx_, del_rtdef, *del_ctdef)); if (OB_SUCC(ret)) { ins_rtdef.das_rtdef_.table_loc_->is_writing_ = true; } } return ret; } int ObTableReplaceOp::inner_rescan() { int ret = OB_SUCCESS; if (OB_FAIL(ObTableModifyOp::inner_rescan())) { LOG_WARN("rescan child operator failed", K(ret)); } else { conflict_checker_.set_local_tablet_loc(MY_INPUT.get_tablet_loc()); replace_rtdefs_.release_array(); if (OB_UNLIKELY(iter_end_)) { //do nothing } else if (OB_FAIL(init_replace_rtdef())) { LOG_WARN("init replace rtdef failed", K(ret)); } else if (OB_FAIL(reuse())) { LOG_WARN("reuse op fail", K(ret)); } } return ret; } int ObTableReplaceOp::inner_close() { int ret = OB_SUCCESS; NG_TRACE(replace_inner_close); if (OB_FAIL(conflict_checker_.close())) { LOG_WARN("fail to close conflict_checker", K(ret)); } // close dml das tasks int close_ret = ObTableModifyOp::inner_close(); return (OB_SUCCESS == ret) ? close_ret : ret; } int ObTableReplaceOp::inner_get_next_row() { int ret = OB_SUCCESS; ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_); if (iter_end_) { LOG_DEBUG("can't get gi task, iter end", K(MY_SPEC.id_), K(iter_end_)); ret = OB_ITER_END; } else { if (OB_FAIL(try_check_status())) { LOG_WARN("check status failed", K(ret)); } else if (OB_FAIL(do_replace_into())) { LOG_WARN("fail to do replace into", K(ret)); } else if (OB_FAIL(plan_ctx->sync_last_value_local())) { LOG_WARN("failed to sync value globally", K(ret)); } else { plan_ctx->set_row_matched_count(insert_rows_); plan_ctx->set_affected_rows(insert_rows_ + delete_rows_); plan_ctx->set_row_duplicated_count(delete_rows_); } int sync_ret = OB_SUCCESS; if (OB_SUCCESS != (sync_ret = plan_ctx->sync_last_value_global())) { // sync last user specified value after iter ends(compatible with MySQL) LOG_WARN("failed to sync last value", K(sync_ret)); } if (OB_SUCC(ret)) { ret = OB_SUCCESS == sync_ret ? OB_ITER_END : sync_ret; } } return ret; } OB_INLINE int ObTableReplaceOp::get_next_row_from_child() { int ret = OB_SUCCESS; clear_evaluated_flag(); if (OB_FAIL(child_->get_next_row())) { if (OB_ITER_END != ret) { LOG_WARN("fail to get next row", K(ret)); } } else { insert_rows_++; LOG_TRACE("child output row", "output row", ROWEXPR2STR(eval_ctx_, child_->get_spec().output_)); } return ret; } OB_INLINE int ObTableReplaceOp::load_all_replace_row(bool &is_iter_end) { int ret = OB_SUCCESS; is_iter_end = false; ObInsCtDef *primary_ins_ctdef = NULL; int64_t row_cnt = 0; ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_); int64_t simulate_batch_row_cnt = - EVENT_CALL(EventTable::EN_TABLE_REPLACE_BATCH_ROW_COUNT); int64_t default_row_batch_cnt = simulate_batch_row_cnt > 0 ? simulate_batch_row_cnt : DEFAULT_REPLACE_BATCH_ROW_COUNT; LOG_DEBUG("simulate lookup row batch count", K(simulate_batch_row_cnt), K(default_row_batch_cnt)); while (OB_SUCC(ret) && ++row_cnt < default_row_batch_cnt) { // todo @kaizhan.dkz @wangbo.wb 增加行前trigger逻辑在这里 // 新行的外键检查也在这里做 if (OB_FAIL(get_next_row_from_child())) { if (OB_ITER_END != ret) { LOG_WARN("fail to load next row from child", K(ret)); } } else if (OB_FAIL(insert_row_to_das(true))) { LOG_WARN("insert row to das", K(ret)); } else if (OB_FAIL(replace_row_store_.add_row(get_primary_table_new_row(), &eval_ctx_))) { LOG_WARN("add replace row to row store failed", K(ret)); } else { plan_ctx->record_last_insert_id_cur_stmt(); } } if (OB_ITER_END == ret) { ret = OB_SUCCESS; is_iter_end = true; } return ret; } int ObTableReplaceOp::insert_row_to_das(bool need_do_trigger) { int ret = OB_SUCCESS; bool is_skipped = false; // 尝试写入数据到所有的表 for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.replace_ctdefs_.count(); ++i) { // insert each table with fetched row const ObReplaceCtDef &replace_ctdef = *(MY_SPEC.replace_ctdefs_.at(i)); const ObInsCtDef &ins_ctdef = *(replace_ctdef.ins_ctdef_); ObReplaceRtDef &replace_rtdef = replace_rtdefs_.at(i); ObInsRtDef &ins_rtdef = replace_rtdef.ins_rtdef_; ObDASTabletLoc *tablet_loc = nullptr; ++ins_rtdef.cur_row_num_; if (need_do_trigger && OB_FAIL(ObDMLService::init_heap_table_pk_for_ins(ins_ctdef, eval_ctx_))) { LOG_WARN("fail to init heap table pk to null", K(ret)); } else if (need_do_trigger && OB_FAIL(ObDMLService::process_insert_row( ins_ctdef, ins_rtdef, *this, is_skipped))) { LOG_WARN("process insert row failed", K(ret)); } else if (OB_UNLIKELY(is_skipped)) { break; } else if (OB_FAIL(calc_insert_tablet_loc(ins_ctdef, ins_rtdef, tablet_loc))) { LOG_WARN("calc partition key failed", K(ret)); } else if (need_do_trigger && OB_FAIL(ObDMLService::set_heap_table_hidden_pk(ins_ctdef, tablet_loc->tablet_id_, eval_ctx_))) { LOG_WARN("set_heap_table_hidden_pk failed", K(ret), KPC(tablet_loc), K(ins_ctdef)); } else if (OB_FAIL(ObDMLService::insert_row(ins_ctdef, ins_rtdef, tablet_loc, dml_rtctx_))) { LOG_WARN("insert row with das failed", K(ret)); // TODO(yikang): fix trigger related for heap table } else if (need_do_trigger && ins_ctdef.is_primary_index_ && OB_FAIL(TriggerHandle::do_handle_after_row(*this, ins_ctdef.trig_ctdef_, ins_rtdef.trig_rtdef_, ObTriggerEvents::get_insert_event()))) { LOG_WARN("failed to handle before trigger", K(ret)); } else { LOG_DEBUG("insert one row", KPC(tablet_loc), "ins row", ROWEXPR2STR(eval_ctx_, ins_ctdef.new_row_)); } } return ret; } int ObTableReplaceOp::delete_row_to_das(bool need_do_trigger) { int ret = OB_SUCCESS; for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.replace_ctdefs_.count(); ++i) { bool is_skipped = false; ObDASTabletLoc *tablet_loc = nullptr; const ObReplaceCtDef &replace_ctdef = *(MY_SPEC.replace_ctdefs_.at(i)); const ObDelCtDef &del_ctdef = *(replace_ctdef.del_ctdef_); ObReplaceRtDef &replace_rtdef = replace_rtdefs_.at(i); ObDelRtDef &del_rtdef = replace_rtdef.del_rtdef_; if (need_do_trigger && OB_FAIL(ObDMLService::process_delete_row(del_ctdef, del_rtdef, is_skipped, *this))) { LOG_WARN("process delete row failed", K(ret)); } else if (OB_UNLIKELY(is_skipped)) { //this row has been skipped, so can not write to DAS buffer(include its global index) //so need to break this loop break; } else if (OB_FAIL(calc_delete_tablet_loc(del_ctdef, del_rtdef, tablet_loc))) { LOG_WARN("calc partition key failed", K(ret)); } else if (OB_FAIL(ObDMLService::delete_row(del_ctdef, del_rtdef, tablet_loc, dml_rtctx_))) { LOG_WARN("insert row with das failed", K(ret)); } else { LOG_DEBUG("delete one row", KPC(tablet_loc), "del row", ROWEXPR2STR(eval_ctx_, del_ctdef.old_row_)); } } return ret; } int ObTableReplaceOp::fetch_conflict_rowkey() { int ret = OB_SUCCESS; bool got_row = false; NG_TRACE_TIMES(2, replace_start_lookup); DASTaskIter task_iter = dml_rtctx_.das_ref_.begin_task_iter(); while (OB_SUCC(ret) && !task_iter.is_end()) { // 不需要clear rowkey表达式的eval_flag,因为主键使用的是column_ref表达式,不存在eval_fun if (OB_FAIL(get_next_conflict_rowkey(task_iter))) { if (OB_ITER_END != ret) { LOG_WARN("fail to get next conflict rowkey from das_result", K(ret)); } } else if (OB_FAIL(conflict_checker_.build_primary_table_lookup_das_task())) { LOG_WARN("fail to build lookup_das_task", K(ret)); } } ret = (ret == OB_ITER_END ? OB_SUCCESS : ret); return ret; } int ObTableReplaceOp::get_next_conflict_rowkey(DASTaskIter &task_iter) { int ret = OB_SUCCESS; bool got_row = false; while (OB_SUCC(ret) && !got_row) { ObNewRow *dup_row = nullptr; ObChunkDatumStore::StoredRow *stored_row = nullptr; ObDASWriteBuffer::DmlShadowRow ssr; ObDASInsertOp *ins_op = static_cast(*task_iter); ObNewRowIterator *conflict_result = ins_op->get_duplicated_result(); const ObDASInsCtDef *ins_ctdef = static_cast(ins_op->get_ctdef()); // 因为返回的都是主表的主键,主表的主键一定是在存储层有储存的,是不需要再收起来层再做运算的, // 所以这里不需要clear eval flag // clear_datum_eval_flag(); if (OB_ISNULL(conflict_result)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("duplicted key result is null", K(ret)); } else if (OB_FAIL(conflict_result->get_next_row(dup_row))) { if (OB_ITER_END == ret) { ++task_iter; if (!task_iter.is_end()) { ret = OB_SUCCESS; } } else { LOG_WARN("get next row from das result failed", K(ret)); } } else if (OB_FAIL(ssr.init(dml_rtctx_.get_das_alloc(), ins_ctdef->table_rowkey_types_, false))) { LOG_WARN("init shadow stored row failed", K(ret), K(ins_ctdef->table_rowkey_types_)); } else if (OB_FAIL(ssr.shadow_copy(*dup_row))) { LOG_WARN("shadow copy ob new row failed", K(ret)); } else if (OB_ISNULL(stored_row = ssr.get_store_row())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("stored row is null", K(ret)); } else if (OB_FAIL(stored_row->to_expr_skip_const( conflict_checker_.checker_ctdef_.data_table_rowkey_expr_, conflict_checker_.eval_ctx_))) { if (OB_ITER_END != ret) { LOG_WARN("get next row from result iterator failed", K(ret)); } } else { got_row = true; } } return ret; } int ObTableReplaceOp::post_all_dml_das_task() { int ret = OB_SUCCESS; NG_TRACE_TIMES(2, replace_try_insert); if (dml_rtctx_.das_ref_.has_task()) { if (OB_FAIL(dml_rtctx_.das_ref_.execute_all_task())) { LOG_WARN("execute all delete das task failed", K(ret)); } } return ret; } int ObTableReplaceOp::do_replace_into() { int ret = OB_SUCCESS; bool is_iter_end = false; while (OB_SUCC(ret) && !is_iter_end) { int64_t savepoint_no = 0; // must set conflict_row fetch flag add_need_conflict_result_flag(); NG_TRACE_TIMES(2, replace_load_all_row); if (OB_FAIL(ObSqlTransControl::create_anonymous_savepoint(ctx_, savepoint_no))) { LOG_WARN("fail to create save_point", K(ret)); } else if (OB_FAIL(load_all_replace_row(is_iter_end))) { LOG_WARN("fail to load all row", K(ret)); } else if (OB_FAIL(post_all_dml_das_task())) { LOG_WARN("fail to post all das task", K(ret)); } else if (!check_is_duplicated()) { LOG_DEBUG("try insert is not duplicated", K(ret)); } else if (OB_FAIL(fetch_conflict_rowkey())) { LOG_WARN("fail to fetch conflict row", K(ret)); } else if (OB_FAIL(reset_das_env())) { // 这里需要reuse das 相关信息 LOG_WARN("fail to reset das env", K(ret)); } else if (OB_FAIL(ObSqlTransControl::rollback_savepoint(ctx_, savepoint_no))) { // 本次插入存在冲突, 回滚到save_point LOG_WARN("fail to rollback to save_point", K(ret)); } else if (OB_FAIL(conflict_checker_.do_lookup_and_build_base_map( replace_row_store_.get_row_cnt()))) { LOG_WARN("fail to do table lookup", K(ret)); } else if (OB_FAIL(replace_conflict_row_cache())) { LOG_WARN("fail to shuff all replace row", K(ret)); } else if (OB_FAIL(prepare_final_replace_task())) { LOG_WARN("fail to prepare final das task", K(ret)); } else if (OB_FAIL(post_all_dml_das_task())) { LOG_WARN("do insert rows post process failed", K(ret)); } if (OB_SUCC(ret) && !is_iter_end) { // 只有还有下一个batch时才需要做reuse,如果没有下一个batch,close和destroy中会释放内存 // 前边逻辑执行成功,这一批batch成功完成replace, reuse环境, 准备下一个batch if (OB_FAIL(reuse())) { LOG_WARN("reuse failed", K(ret)); } } } return ret; } bool ObTableReplaceOp::check_is_duplicated() { int bret = false; for (int64_t i = 0; i < replace_rtdefs_.count(); ++i) { ObReplaceRtDef &replace_rtdef = replace_rtdefs_.at(i); ObDASInsRtDef &ins_rtdef = replace_rtdef.ins_rtdef_.das_rtdef_; if (ins_rtdef.is_duplicated_) { bret = true; } } return bret; } int ObTableReplaceOp::replace_conflict_row_cache() { int ret = OB_SUCCESS; ObSEArray constraint_values; ObChunkDatumStore::Iterator replace_row_iter; bool skip_delete = false; // useless, only meet the function entry requirements bool is_skipped = false; const ObReplaceCtDef &replace_ctdef = *(MY_SPEC.replace_ctdefs_.at(0)); const ObInsCtDef &ins_ctdef = *(replace_ctdef.ins_ctdef_); const ObDelCtDef &del_ctdef = *(replace_ctdef.del_ctdef_); ObReplaceRtDef &replace_rtdef = replace_rtdefs_.at(0); ObInsRtDef &ins_rtdef = replace_rtdef.ins_rtdef_; ObDelRtDef &del_rtdef = replace_rtdef.del_rtdef_; NG_TRACE_TIMES(2, replace_start_shuff); OZ(replace_row_store_.begin(replace_row_iter)); const ObChunkDatumStore::StoredRow *replace_row = NULL; // 构建冲突的hash map的时候也使用的是column_ref, 回表也是使用的column_ref expr来读取scan的结果 // 因为constarain_info中使用的column_ref expr,所以此处需要使用table_column_old_exprs (column_ref exprs) while (OB_SUCC(ret) && OB_SUCC(replace_row_iter.get_next_row(get_primary_table_new_row(), eval_ctx_, &replace_row))) { constraint_values.reuse(); ObChunkDatumStore::StoredRow *insert_new_row = NULL; OZ(conflict_checker_.check_duplicate_rowkey(replace_row, constraint_values, false)); for (int64_t i = 0; OB_SUCC(ret) && i < constraint_values.count(); ++i) { //delete duplicated row const ObChunkDatumStore::StoredRow *delete_row = constraint_values.at(i).current_datum_row_; bool same_row = false; CK(OB_NOT_NULL(delete_row)); // dup checker依赖table column exprs OZ(delete_row->to_expr_skip_const(get_primary_table_old_row(), eval_ctx_)); // 外键的检查已经下方到dml_service逻辑中了 OZ(ObDMLService::process_delete_row(del_ctdef, del_rtdef, skip_delete, *this)); OZ(conflict_checker_.delete_old_row(delete_row, ObNewRowSource::FROM_INSERT)); if (OB_SUCC(ret) && OB_LIKELY(MY_SPEC.only_one_unique_key_)) { OZ(check_values(same_row, replace_row, delete_row)); } if (OB_SUCC(ret) && !same_row) { delete_rows_++; } } OZ(replace_row->to_expr_skip_const(get_primary_table_new_row(), eval_ctx_)); OZ(ObDMLService::process_insert_row(ins_ctdef, ins_rtdef, *this, is_skipped)); // TODO(yikang): fix trigger related for heap table if (OB_SUCC(ret) && ins_ctdef.is_primary_index_ && OB_FAIL(TriggerHandle::do_handle_after_row(*this, ins_ctdef.trig_ctdef_, ins_rtdef.trig_rtdef_, ObTriggerEvents::get_insert_event()))) { LOG_WARN("failed to handle before trigger", K(ret)); } if (OB_SUCC(ret) && OB_UNLIKELY(is_skipped)) { continue; } OZ(conflict_checker_.convert_exprs_to_stored_row(get_primary_table_new_row(), insert_new_row)); // add new row to conflict_checker map // 在insert_new_row 函数内部,会把replace_row to_expr到table_column_exprs中 OZ(conflict_checker_.insert_new_row(insert_new_row, ObNewRowSource::FROM_INSERT)); } // while row store end ret = OB_ITER_END == ret ? OB_SUCCESS : ret; return ret; } int ObTableReplaceOp::prepare_final_replace_task() { int ret = OB_SUCCESS; ObConflictRowMap *primary_map = NULL; NG_TRACE_TIMES(2, replace_final_shuff); OZ(conflict_checker_.get_primary_table_map(primary_map)); CK(OB_NOT_NULL(primary_map)); OZ(do_delete(primary_map)); OZ(do_insert(primary_map)); return ret; } int ObTableReplaceOp::do_delete(ObConflictRowMap *primary_map) { int ret = OB_SUCCESS; ObConflictRowMap::iterator start_row_iter = primary_map->begin(); ObConflictRowMap::iterator end_row_iter = primary_map->end(); for (; OB_SUCC(ret) && start_row_iter != end_row_iter; ++start_row_iter) { clear_datum_eval_flag(); ObConflictValue &constraint_value = start_row_iter->second; LOG_DEBUG("get one constraint_value from primary hash map", K(constraint_value)); if (NULL != constraint_value.baseline_datum_row_) { //baseline row is not empty, delete it if (OB_FAIL(constraint_value.baseline_datum_row_->to_expr_skip_const( get_primary_table_old_row(), eval_ctx_))) { LOG_WARN("stored row to expr faild", K(ret)); } else if (OB_FAIL(delete_row_to_das(false))) { LOG_WARN("shuffle delete row failed", K(ret), K(constraint_value)); } else { LOG_TRACE("delete one row from primary hash map", "real delete row", ROWEXPR2STR(eval_ctx_, get_primary_table_old_row())); } } } return ret; } int ObTableReplaceOp::do_insert(ObConflictRowMap *primary_map) { int ret = OB_SUCCESS; ObConflictRowMap::iterator start_row_iter = primary_map->begin(); ObConflictRowMap::iterator end_row_iter = primary_map->end(); for (; OB_SUCC(ret) && start_row_iter != end_row_iter; ++start_row_iter) { clear_datum_eval_flag(); ObConflictValue &constraint_value = start_row_iter->second; if (OB_SUCC(ret) && NULL != constraint_value.current_datum_row_) { //current row is not empty, insert new row if (OB_FAIL(constraint_value.current_datum_row_->to_expr_skip_const( get_primary_table_new_row(), eval_ctx_))) { LOG_WARN("stored row to expr faild", K(ret)); } else if (OB_FAIL(insert_row_to_das(false))) { LOG_WARN("shuffle insert row failed", K(ret), K(constraint_value)); } else { LOG_TRACE("insert one row from primary hash map", "real insert row", ROWEXPR2STR(eval_ctx_, get_primary_table_new_row())); } } } return ret; } int ObTableReplaceOp::calc_insert_tablet_loc(const ObInsCtDef &ins_ctdef, ObInsRtDef &ins_rtdef, ObDASTabletLoc *&tablet_loc) { int ret = OB_SUCCESS; if (MY_SPEC.use_dist_das_) { if (ins_ctdef.multi_ctdef_ != nullptr) { ObExpr *calc_part_id_expr = ins_ctdef.multi_ctdef_->calc_part_id_expr_; ObObjectID partition_id = OB_INVALID_ID; ObTabletID tablet_id; ObDASTableLoc &table_loc = *ins_rtdef.das_rtdef_.table_loc_; if (OB_FAIL(ObExprCalcPartitionBase::calc_part_and_tablet_id(calc_part_id_expr, eval_ctx_, partition_id, tablet_id))) { LOG_WARN("calc part and tablet id by expr failed", K(ret)); } else if (!ins_ctdef.multi_ctdef_->hint_part_ids_.empty() && !has_exist_in_array(ins_ctdef.multi_ctdef_->hint_part_ids_, partition_id)) { ret = OB_PARTITION_NOT_MATCH; LOG_DEBUG("Partition not match", K(ret), K(partition_id), K(ins_ctdef.multi_ctdef_->hint_part_ids_)); } else if (OB_FAIL(DAS_CTX(ctx_).extended_tablet_loc(table_loc, tablet_id, tablet_loc))) { LOG_WARN("extended tablet loc failed", K(ret)); } } } else { //direct write insert row to storage tablet_loc = MY_INPUT.get_tablet_loc(); } return ret; } int ObTableReplaceOp::calc_delete_tablet_loc(const ObDelCtDef &del_ctdef, ObDelRtDef &del_rtdef, ObDASTabletLoc *&tablet_loc) { int ret = OB_SUCCESS; if (MY_SPEC.use_dist_das_) { if (del_ctdef.multi_ctdef_ != nullptr) { ObExpr *calc_part_id_expr = del_ctdef.multi_ctdef_->calc_part_id_expr_; ObObjectID partition_id = OB_INVALID_ID; ObTabletID tablet_id; ObDASTableLoc &table_loc = *del_rtdef.das_rtdef_.table_loc_; if (OB_FAIL(ObExprCalcPartitionBase::calc_part_and_tablet_id(calc_part_id_expr, eval_ctx_, partition_id, tablet_id))) { LOG_WARN("calc part and tablet id by expr failed", K(ret)); } else if (OB_FAIL(DAS_CTX(ctx_).extended_tablet_loc(table_loc, tablet_id, tablet_loc))) { LOG_WARN("extended tablet loc failed", K(ret)); } } } else { //direct write delete row to storage tablet_loc = MY_INPUT.get_tablet_loc(); } return ret; } int ObTableReplaceOp::check_replace_ctdefs_valid() const { int ret = OB_SUCCESS; CK(MY_SPEC.replace_ctdefs_.count() > 0); for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.replace_ctdefs_.count(); ++i) { const ObReplaceCtDef *replace_ctdef = NULL; const ObInsCtDef *insert_ctdef = NULL; const ObDelCtDef *del_ctdef = NULL; CK(OB_NOT_NULL(replace_ctdef = MY_SPEC.replace_ctdefs_.at(i))); CK(OB_NOT_NULL(insert_ctdef = replace_ctdef->ins_ctdef_)); CK(OB_NOT_NULL(del_ctdef = replace_ctdef->del_ctdef_)); } return ret; } const ObIArray &ObTableReplaceOp::get_primary_table_new_row() { return MY_SPEC.replace_ctdefs_.at(0)->ins_ctdef_->new_row_; } const ObIArray &ObTableReplaceOp::get_primary_table_old_row() { return MY_SPEC.replace_ctdefs_.at(0)->del_ctdef_->old_row_; } int ObTableReplaceOp::reset_das_env() { int ret = OB_SUCCESS; // 释放第一次try insert的das task if (OB_FAIL(dml_rtctx_.das_ref_.close_all_task())) { LOG_WARN("close all das task failed", K(ret)); } else { dml_rtctx_.das_ref_.reuse(); } // 因为第二次插入不需要fetch conflict result了,如果有conflict // 就说明replace into的某些逻辑处理有问题 for (int64_t i = 0; OB_SUCC(ret) && i < replace_rtdefs_.count(); ++i) { ObInsRtDef &ins_rtdef = replace_rtdefs_.at(i).ins_rtdef_; ins_rtdef.das_rtdef_.need_fetch_conflict_ = false; ins_rtdef.das_rtdef_.is_duplicated_ = false; } return ret; } int ObTableReplaceOp::reuse() { int ret = OB_SUCCESS; if (dml_rtctx_.das_ref_.has_task()) { if (OB_FAIL(dml_rtctx_.das_ref_.close_all_task())) { LOG_WARN("close all insert das task failed", K(ret)); } else { dml_rtctx_.das_ref_.reuse(); } } if (OB_SUCC(ret)) { if (OB_FAIL(conflict_checker_.reuse())) { LOG_WARN("fail to reuse conflict checker", K(ret)); } else { replace_row_store_.reset(); } } return ret; } void ObTableReplaceOp::add_need_conflict_result_flag() { for (int64_t i = 0; i < replace_rtdefs_.count(); ++i) { ObInsRtDef &ins_rtdef = replace_rtdefs_.at(i).ins_rtdef_; ins_rtdef.das_rtdef_.need_fetch_conflict_ = true; } dml_rtctx_.set_pick_del_task_first(); dml_rtctx_.set_non_sub_full_task(); } int ObTableReplaceOp::check_values(bool &is_equal, const ObChunkDatumStore::StoredRow *replace_row, const ObChunkDatumStore::StoredRow *delete_row) { int ret = OB_SUCCESS; is_equal = true; const ObIArray &new_row = get_primary_table_new_row(); const ObIArray &old_row = get_primary_table_old_row(); OZ(check_replace_ctdefs_valid()); CK(OB_NOT_NULL(delete_row)); CK(OB_NOT_NULL(replace_row)); CK(replace_row->cnt_ == new_row.count()); for (int64_t i = 0; OB_SUCC(ret) && i < new_row.count(); ++i) { const UIntFixedArray &column_ids = MY_SPEC.replace_ctdefs_.at(0)->ins_ctdef_->column_ids_; CK(new_row.at(i)->basic_funcs_->null_first_cmp_ == old_row.at(i)->basic_funcs_->null_first_cmp_); if (OB_SUCC(ret)) { if (share::schema::ObColumnSchemaV2::is_hidden_pk_column_id(column_ids[i])) { //隐藏主键列不处理 } else { const ObDatum &insert_datum = replace_row->cells()[i]; const ObDatum &del_datum = delete_row->cells()[i]; if (0 != new_row.at(i)->basic_funcs_->null_first_cmp_(insert_datum, del_datum)) { is_equal = false; } } } } return ret; } } //namespace sql } //namespace oceanbase