diff --git a/src/observer/table_load/ob_table_load_instance.cpp b/src/observer/table_load/ob_table_load_instance.cpp index 55d695a0e0381a552a061fb832e6b11d5293fdf7..357f54524516378fa3daf304de2929aa3cf7e990 100644 --- a/src/observer/table_load/ob_table_load_instance.cpp +++ b/src/observer/table_load/ob_table_load_instance.cpp @@ -105,6 +105,7 @@ int ObTableLoadInstance::create_table_ctx(ObTableLoadParam ¶m, start_arg.tenant_id_ = param.tenant_id_; start_arg.table_id_ = param.table_id_; start_arg.parallelism_ = param.session_count_; + start_arg.is_load_data_ = !param.px_mode_; if (OB_FAIL(GET_MIN_DATA_VERSION(param.tenant_id_, data_version))) { LOG_WARN("fail to get tenant data version", KR(ret)); } else if (OB_FAIL(ObTableLoadRedefTable::start(start_arg, start_res, *session_info_))) { diff --git a/src/observer/table_load/ob_table_load_redef_table.cpp b/src/observer/table_load/ob_table_load_redef_table.cpp index bae89714497aa268f82880f1aa23cb5e4e76e103..428a5b1d1636f58b885904cd3ba87956fb9a6be6 100644 --- a/src/observer/table_load/ob_table_load_redef_table.cpp +++ b/src/observer/table_load/ob_table_load_redef_table.cpp @@ -36,7 +36,7 @@ int ObTableLoadRedefTable::start(const ObTableLoadRedefTableStartArg &arg, create_table_arg.table_id_ = arg.table_id_; create_table_arg.dest_tenant_id_ = arg.tenant_id_; create_table_arg.parallelism_ = arg.parallelism_; - create_table_arg.ddl_type_ = share::DDL_DIRECT_LOAD; + create_table_arg.ddl_type_ = arg.is_load_data_ ? share::DDL_DIRECT_LOAD : share::DDL_DIRECT_LOAD_INSERT; create_table_arg.session_id_ = session_info.get_sessid_for_table(); create_table_arg.sql_mode_ = session_info.get_sql_mode(); create_table_arg.tz_info_ = session_info.get_tz_info_wrap().get_tz_info_offset(); diff --git a/src/observer/table_load/ob_table_load_redef_table.h b/src/observer/table_load/ob_table_load_redef_table.h index 4c5390b50635675e43393aa4511330820888878c..c62dc43448e4c3f4bb9c4ff2e829c3f38335f6ce 100644 --- a/src/observer/table_load/ob_table_load_redef_table.h +++ b/src/observer/table_load/ob_table_load_redef_table.h @@ -20,7 +20,8 @@ struct ObTableLoadRedefTableStartArg { public: ObTableLoadRedefTableStartArg() - : tenant_id_(common::OB_INVALID_ID), table_id_(common::OB_INVALID_ID), parallelism_(0) + : tenant_id_(common::OB_INVALID_ID), table_id_(common::OB_INVALID_ID), parallelism_(0), + is_load_data_(false) { } ~ObTableLoadRedefTableStartArg() = default; @@ -29,17 +30,19 @@ public: tenant_id_ = common::OB_INVALID_ID; table_id_ = common::OB_INVALID_ID; parallelism_ = 0; + is_load_data_ = false; } bool is_valid() const { return common::OB_INVALID_ID != tenant_id_ && common::OB_INVALID_ID != table_id_ && 0 != parallelism_; } - TO_STRING_KV(K_(tenant_id), K_(table_id), K_(parallelism)); + TO_STRING_KV(K_(tenant_id), K_(table_id), K_(parallelism), K_(is_load_data)); public: uint64_t tenant_id_; uint64_t table_id_; uint64_t parallelism_; + bool is_load_data_; }; struct ObTableLoadRedefTableStartRes diff --git a/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp b/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp index 01e5ebf944daeb3cbc877f3f27bd1544f8acefc9..9f76f06761f2627e289df22d18a5ea268d8e7657 100644 --- a/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp @@ -1419,7 +1419,9 @@ int ObDDLRedefinitionTask::sync_stats_info() if (OB_ISNULL(root_service)) { ret = OB_ERR_SYS; LOG_WARN("error sys, root service must not be nullptr", K(ret)); - } else if (has_synced_stats_info_) { + } else if (has_synced_stats_info_ || task_type_ == DDL_DIRECT_LOAD) { + // bugfix: https://work.aone.alibaba-inc.com/issue/48313634 + // shouldn't sync stats if the ddl task is from load data's direct_load } else { ObMultiVersionSchemaService &schema_service = root_service->get_schema_service(); ObMySQLTransaction trans; diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp index e121032fe25338728ee5ca1c802ead5938c00f21..e60e59eb98130b481af99aabbee7a34711990f81 100644 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp @@ -689,6 +689,7 @@ int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam ¶m, case DDL_CONVERT_TO_CHARACTER: case DDL_TABLE_REDEFINITION: case DDL_DIRECT_LOAD: + case DDL_DIRECT_LOAD_INSERT: if (OB_FAIL(create_table_redefinition_task(proxy, param.type_, param.src_table_schema_, @@ -1647,6 +1648,7 @@ int ObDDLScheduler::schedule_ddl_task(const ObDDLTaskRecord &record) case DDL_CONVERT_TO_CHARACTER: case DDL_TABLE_REDEFINITION: case DDL_DIRECT_LOAD: + case DDL_DIRECT_LOAD_INSERT: ret = schedule_table_redefinition_task(record); break; case DDL_DROP_COLUMN: @@ -1757,7 +1759,8 @@ int ObDDLScheduler::schedule_table_redefinition_task(const ObDDLTaskRecord &task if (OB_ENTRY_EXIST != ret) { LOG_WARN("inner schedule task failed", K(ret), K(*redefinition_task)); } - } else if (ObDDLType::DDL_DIRECT_LOAD == task_record.ddl_type_ + } else if ((ObDDLType::DDL_DIRECT_LOAD == task_record.ddl_type_ + || ObDDLType::DDL_DIRECT_LOAD_INSERT == task_record.ddl_type_) && OB_FAIL(manager_reg_heart_beat_task_.update_task_active_time(task_record.task_id_))) { LOG_WARN("register_task_time recover fail", K(ret)); } @@ -2062,6 +2065,7 @@ int ObDDLScheduler::on_sstable_complement_job_reply( case ObDDLType::DDL_CONVERT_TO_CHARACTER: case ObDDLType::DDL_TABLE_REDEFINITION: case ObDDLType::DDL_DIRECT_LOAD: + case ObDDLType::DDL_DIRECT_LOAD_INSERT: if (OB_FAIL(static_cast(&task)->update_complete_sstable_job_status(tablet_id, snapshot_version, execution_id, ret_code, addition_info))) { LOG_WARN("update complete sstable job status", K(ret)); } @@ -2136,6 +2140,7 @@ int ObDDLScheduler::notify_update_autoinc_end(const ObDDLTaskKey &task_key, case ObDDLType::DDL_ALTER_PARTITION_BY: case ObDDLType::DDL_TABLE_REDEFINITION: case ObDDLType::DDL_DIRECT_LOAD: + case ObDDLType::DDL_DIRECT_LOAD_INSERT: if (OB_FAIL(static_cast(&task)->notify_update_autoinc_finish(autoinc_val, ret_code))) { LOG_WARN("update complete sstable job status", K(ret)); } diff --git a/src/rootserver/ddl_task/ob_ddl_task.cpp b/src/rootserver/ddl_task/ob_ddl_task.cpp index e80654481f8ad1492f6cdca5f099f9a119911df5..9197d1382d22c1b754afc1dc682fd63298da637b 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_task.cpp @@ -663,6 +663,9 @@ int ObDDLTask::get_ddl_type_str(const int64_t ddl_type, const char *&ddl_type_st case DDL_DIRECT_LOAD: ddl_type_str = "direct load"; break; + case DDL_DIRECT_LOAD_INSERT: + ddl_type_str = "direct load insert"; + break; case DDL_MODIFY_AUTO_INCREMENT: ddl_type_str = "modify auto increment"; break; @@ -2652,7 +2655,8 @@ int ObDDLTaskRecordOperator::check_has_conflict_ddl( case ObDDLType::DDL_ADD_COLUMN_OFFLINE: case ObDDLType::DDL_COLUMN_REDEFINITION: case ObDDLType::DDL_TABLE_REDEFINITION: - case ObDDLType::DDL_DIRECT_LOAD: { + case ObDDLType::DDL_DIRECT_LOAD: + case ObDDLType::DDL_DIRECT_LOAD_INSERT: { has_conflict_ddl = true; break; } diff --git a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp index 523121739713d36feec2509298c1a9c9bb07cc8f..1ee1135ca3c1ebc77bc1bac2dccd279d86f8c938 100644 --- a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp @@ -146,7 +146,8 @@ int ObTableRedefinitionTask::update_complete_sstable_job_status(const common::Ob check_table_empty_job_ret_code_ = ret_code; } else { switch(task_type_) { - case ObDDLType::DDL_DIRECT_LOAD: { + case ObDDLType::DDL_DIRECT_LOAD: + case ObDDLType::DDL_DIRECT_LOAD_INSERT: { complete_sstable_job_ret_code_ = ret_code; LOG_INFO("table redefinition task callback", K(complete_sstable_job_ret_code_)); break; @@ -174,7 +175,8 @@ int ObTableRedefinitionTask::send_build_replica_request() { int ret = OB_SUCCESS; switch (task_type_) { - case DDL_DIRECT_LOAD: { + case DDL_DIRECT_LOAD: + case DDL_DIRECT_LOAD_INSERT: { // do nothing break; } @@ -369,7 +371,8 @@ int ObTableRedefinitionTask::replica_end_check(const int ret_code) { int ret = OB_SUCCESS; switch(task_type_) { - case DDL_DIRECT_LOAD : { + case DDL_DIRECT_LOAD : + case DDL_DIRECT_LOAD_INSERT : { break; } default : { @@ -799,6 +802,7 @@ int ObTableRedefinitionTask::repending(const share::ObDDLTaskStatus next_task_st } else { switch (task_type_) { case DDL_DIRECT_LOAD: + case DDL_DIRECT_LOAD_INSERT: if (get_is_do_finish()) { if (OB_FAIL(switch_status(next_task_status, true, ret))) { LOG_WARN("fail to switch status", K(ret)); @@ -1182,7 +1186,9 @@ int ObTableRedefinitionTask::collect_longops_stat(ObLongopsValue &value) } // append direct load information to the message - if (OB_SUCC(ret) && (ObDDLType::DDL_DIRECT_LOAD == get_task_type())) { + if (OB_SUCC(ret) + && (ObDDLType::DDL_DIRECT_LOAD == get_task_type() + || ObDDLType::DDL_DIRECT_LOAD_INSERT == get_task_type())) { common::ObArenaAllocator allocator(lib::ObLabel("RedefTask")); sql::ObLoadDataStat job_stat; if (OB_FAIL(get_direct_load_job_stat(allocator, job_stat))) { diff --git a/src/rootserver/ob_ddl_service.cpp b/src/rootserver/ob_ddl_service.cpp index f7838cb1513843bb47b78866f2cb48c653c405a4..d7c2f5a981390ad85e4cef29147b1729a97a1938 100644 --- a/src/rootserver/ob_ddl_service.cpp +++ b/src/rootserver/ob_ddl_service.cpp @@ -9128,6 +9128,8 @@ const char* ObDDLService::ddl_type_str(const ObDDLType ddl_type) str = "table redefinition"; } else if (DDL_DIRECT_LOAD == ddl_type) { str = "direct load"; + } else if (DDL_DIRECT_LOAD_INSERT == ddl_type) { + str = "direct load insert"; } else if (DDL_MODIFY_AUTO_INCREMENT == ddl_type) { str = "modify auto_increment"; } else if (DDL_CONVERT_TO_CHARACTER == ddl_type) { diff --git a/src/share/ob_ddl_common.h b/src/share/ob_ddl_common.h index b5d9419e156e6e7f5d02a9fffbff31ea6673ef09..9ecc9f176218bb99c09d896c708503d846f84fcb 100644 --- a/src/share/ob_ddl_common.h +++ b/src/share/ob_ddl_common.h @@ -77,7 +77,8 @@ enum ObDDLType DDL_ADD_COLUMN_OFFLINE = 1008, // only add columns DDL_COLUMN_REDEFINITION = 1009, // only add/drop columns DDL_TABLE_REDEFINITION = 1010, - DDL_DIRECT_LOAD = 1011, + DDL_DIRECT_LOAD = 1011, // load data + DDL_DIRECT_LOAD_INSERT = 1012, // insert into select // @note new normal ddl type to be defined here !!!