From f40dd6025e5d7232aaf4f1d41549ab2db8a70a5c Mon Sep 17 00:00:00 2001 From: obdev Date: Sun, 6 Nov 2022 02:15:17 +0000 Subject: [PATCH] fix prepare task failed causes drop column wait data complement. --- src/observer/ob_service.cpp | 12 +-- src/share/ob_ddl_task_executor.h | 4 +- src/storage/ddl/ob_complement_data_task.cpp | 104 ++++++++++---------- src/storage/ddl/ob_complement_data_task.h | 4 +- 4 files changed, 56 insertions(+), 68 deletions(-) diff --git a/src/observer/ob_service.cpp b/src/observer/ob_service.cpp index 51eaa79b1..47cb5f375 100644 --- a/src/observer/ob_service.cpp +++ b/src/observer/ob_service.cpp @@ -1848,7 +1848,6 @@ int ObService::build_ddl_single_replica_request(const ObDDLBuildSingleReplicaReq MTL_SWITCH(arg.tenant_id_) { ObTenantDagScheduler *dag_scheduler = nullptr; ObComplementDataDag *dag = nullptr; - ObComplementPrepareTask *prepare_task = nullptr; if (OB_ISNULL(dag_scheduler = MTL(ObTenantDagScheduler *))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("dag scheduler is null", K(ret)); @@ -1859,15 +1858,8 @@ int ObService::build_ddl_single_replica_request(const ObDDLBuildSingleReplicaReq LOG_WARN("unexpected error, dag is null", K(ret), KP(dag)); } else if (OB_FAIL(dag->init(arg))) { LOG_WARN("fail to init complement data dag", K(ret), K(arg)); - } else if (OB_FAIL(dag->alloc_task(prepare_task))) { - LOG_WARN("fail to alloc complement prepare task", K(ret)); - } else if (OB_ISNULL(prepare_task)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected error, prepare task is null", K(ret), KP(prepare_task)); - } else if (OB_FAIL(prepare_task->init(dag->get_param(), dag->get_context()))) { - LOG_WARN("fail to init complement prepare task", K(ret)); - } else if (OB_FAIL(dag->add_task(*prepare_task))) { - LOG_WARN("fail to add complement prepare task to dag", K(ret)); + } else if (OB_FAIL(dag->create_first_task())) { + LOG_WARN("create first task failed", K(ret)); } else if (OB_FAIL(dag_scheduler->add_dag(dag))) { if (OB_EAGAIN == ret) { ret = OB_SUCCESS; diff --git a/src/share/ob_ddl_task_executor.h b/src/share/ob_ddl_task_executor.h index 5b491b0fb..2d1807aa2 100644 --- a/src/share/ob_ddl_task_executor.h +++ b/src/share/ob_ddl_task_executor.h @@ -20,6 +20,7 @@ #include "lib/lock/ob_thread_cond.h" #include "lib/profile/ob_trace_id.h" #include "lib/thread/thread_mgr_interface.h" +#include "share/location_cache/ob_location_struct.h" #include "share/ob_errno.h" #include "share/ob_thread_pool.h" @@ -58,7 +59,8 @@ public: || common::OB_SCHEMA_EAGAIN == ret_code || common::OB_GTS_NOT_READY == ret_code || common::OB_ERR_SHARED_LOCK_CONFLICT == ret_code || common::OB_PARTITION_NOT_EXIST == ret_code || common::OB_PG_IS_REMOVED == ret_code || common::OB_TENANT_NOT_EXIST == ret_code || common::OB_RPC_SEND_ERROR == ret_code || common::OB_RPC_CONNECT_ERROR == ret_code || common::OB_DDL_SCHEMA_VERSION_NOT_MATCH == ret_code - || OB_TRANS_ROLLBACKED == ret_code || OB_TRANS_TIMEOUT == ret_code || OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH == ret_code; + || OB_TRANS_ROLLBACKED == ret_code || OB_TRANS_TIMEOUT == ret_code || OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH == ret_code + || is_location_service_renew_error(ret_code); } static bool in_ddl_retry_black_list(const int ret_code) { diff --git a/src/storage/ddl/ob_complement_data_task.cpp b/src/storage/ddl/ob_complement_data_task.cpp index b9a756b65..9e30a3e74 100644 --- a/src/storage/ddl/ob_complement_data_task.cpp +++ b/src/storage/ddl/ob_complement_data_task.cpp @@ -362,6 +362,51 @@ int ObComplementDataDag::init(const ObDDLBuildSingleReplicaRequestArg &arg) return ret; } +int ObComplementDataDag::create_first_task() +{ + int ret = OB_SUCCESS; + ObComplementPrepareTask *prepare_task = nullptr; + ObComplementWriteTask *write_task = nullptr; + ObComplementMergeTask *merge_task = nullptr; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", K(ret)); + } else if (OB_FAIL(alloc_task(prepare_task))) { + LOG_WARN("allocate task failed", K(ret)); + } else if (OB_ISNULL(prepare_task)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected nullptr task", K(ret)); + } else if (OB_FAIL(prepare_task->init(param_, context_))) { + LOG_WARN("init prepare task failed", K(ret)); + } else if (OB_FAIL(add_task(*prepare_task))) { + LOG_WARN("add task failed", K(ret)); + } else if (OB_FAIL(alloc_task(write_task))) { + LOG_WARN("alloc task failed", K(ret)); + } else if (OB_ISNULL(write_task)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected nullptr task", K(ret)); + } else if (OB_FAIL(write_task->init(0, param_, context_))) { + LOG_WARN("init write task failed", K(ret)); + } else if (OB_FAIL(prepare_task->add_child(*write_task))) { + LOG_WARN("add child task failed", K(ret)); + } else if (OB_FAIL(add_task(*write_task))) { + LOG_WARN("add task failed", K(ret)); + } else if (OB_FAIL(alloc_task(merge_task))) { + LOG_WARN("alloc task failed", K(ret)); + } else if (OB_ISNULL(merge_task)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected nullptr task", K(ret)); + } else if (OB_FAIL(merge_task->init(param_, context_))) { + LOG_WARN("init merge task failed", K(ret)); + } else if (OB_FAIL(write_task->add_child(*merge_task))) { + LOG_WARN("add child task failed", K(ret)); + } else if (OB_FAIL(add_task(*merge_task))) { + LOG_WARN("add task failed"); + } + + return ret; +} + int ObComplementDataDag::prepare_context() { int ret = OB_SUCCESS; @@ -419,8 +464,9 @@ int64_t ObComplementDataDag::hash() const tmp_ret = OB_ERR_SYS; LOG_ERROR("table schema must not be NULL", K(tmp_ret), K(is_inited_), K(param_)); } else { - hash_val = param_.ls_id_.hash() + param_.source_tablet_id_.hash() + param_.dest_tablet_id_.hash() + - param_.data_table_schema_->get_table_id() + param_.hidden_table_schema_->get_table_id() + ObDagType::DAG_TYPE_DDL; + hash_val = param_.tenant_id_ + param_.ls_id_.hash() + + param_.data_table_schema_->get_table_id() + param_.hidden_table_schema_->get_table_id() + + param_.source_tablet_id_.hash() + param_.dest_tablet_id_.hash() + ObDagType::DAG_TYPE_DDL; } return hash_val; } @@ -437,11 +483,10 @@ bool ObComplementDataDag::operator==(const ObIDag &other) const tmp_ret = OB_ERR_SYS; LOG_ERROR("invalid argument", K(tmp_ret), K(param_), K(dag.param_)); } else { - is_equal = (param_.ls_id_ == dag.param_.ls_id_) && (param_.tenant_id_ == dag.param_.tenant_id_) && - (param_.source_tablet_id_ == dag.param_.source_tablet_id_) && (param_.dest_tablet_id_ == dag.param_.dest_tablet_id_) && + is_equal = (param_.tenant_id_ == dag.param_.tenant_id_) && (param_.ls_id_ == dag.param_.ls_id_) && (param_.data_table_schema_->get_table_id() == dag.param_.data_table_schema_->get_table_id()) && (param_.hidden_table_schema_->get_table_id() == dag.param_.hidden_table_schema_->get_table_id()) && - (param_.compat_mode_ == dag.param_.compat_mode_); + (param_.source_tablet_id_ == dag.param_.source_tablet_id_) && (param_.dest_tablet_id_ == dag.param_.dest_tablet_id_); } } return is_equal; @@ -562,10 +607,6 @@ int ObComplementPrepareTask::process() LOG_WARN("prepare complement context failed", K(ret)); } else if (OB_FAIL(context_->write_start_log(*param_))) { LOG_WARN("write start log failed", K(ret), KPC(param_)); - } else if (OB_FAIL(generate_complement_write_task(dag, write_task))) { - LOG_WARN("fail to generate complement write task", K(ret)); - } else if (OB_FAIL(generate_complement_merge_task(dag, write_task, merge_task))) { - LOG_WARN("fail to generate complement merge task", K(ret)); } else { LOG_INFO("finish the complement prepare task", K(ret)); } @@ -577,51 +618,6 @@ int ObComplementPrepareTask::process() return ret; } -int ObComplementPrepareTask::generate_complement_write_task(ObComplementDataDag *dag, - ObComplementWriteTask *&write_task) -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(!is_inited_)) { - ret = OB_NOT_INIT; - LOG_WARN("ObComplementPrepareTask has not been inited", K(ret)); - } else if (OB_ISNULL(dag)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), KP(dag)); - } else if (OB_FAIL(dag->alloc_task(write_task))) { - LOG_WARN("fail to alloc write task", K(ret)); - } else if (OB_FAIL(write_task->init(0, *param_, *context_))) { - LOG_WARN("fail to init complement write task", K(ret)); - } else if (OB_FAIL(add_child(*write_task))) { - LOG_WARN("fail to add child for complement prepare task", K(ret)); - } else if (OB_FAIL(dag->add_task(*write_task))) { - LOG_WARN("fail to add complement write task to dag", K(ret)); - } - return ret; -} - -int ObComplementPrepareTask::generate_complement_merge_task(ObComplementDataDag *dag, - ObComplementWriteTask *write_task, ObComplementMergeTask *&merge_task) -{ - int ret = OB_SUCCESS; - merge_task = NULL; - if (OB_UNLIKELY(!is_inited_)) { - ret = OB_NOT_INIT; - LOG_WARN("ObComplementPrepareTask has not been inited", K(ret)); - } else if (OB_ISNULL(dag) || OB_ISNULL(write_task)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), KP(dag), KP(write_task)); - } else if (OB_FAIL(dag->alloc_task(merge_task))) { - LOG_WARN("fail to alloc merge task", K(ret)); - } else if (OB_FAIL(merge_task->init(*param_, *context_))) { - LOG_WARN("fail to init merge task", K(ret)); - } else if (OB_FAIL(write_task->add_child(*merge_task))) { - LOG_WARN("fail to add child for write task", K(ret)); - } else if (OB_FAIL(dag->add_task(*merge_task))) { - LOG_WARN("fail to add merge task", K(ret)); - } - return ret; -} - ObComplementWriteTask::ObComplementWriteTask() : ObITask(TASK_TYPE_COMPLEMENT_WRITE), is_inited_(false), task_id_(0), param_(nullptr), context_(nullptr), write_row_(), diff --git a/src/storage/ddl/ob_complement_data_task.h b/src/storage/ddl/ob_complement_data_task.h index 85bda2902..51872f6f1 100644 --- a/src/storage/ddl/ob_complement_data_task.h +++ b/src/storage/ddl/ob_complement_data_task.h @@ -144,6 +144,7 @@ public: int fill_dag_key(char *buf, const int64_t buf_len) const override; virtual lib::Worker::CompatMode get_compat_mode() const override { return param_.compat_mode_; } + virtual int create_first_task() override; // report replica build status to RS. int report_replica_build_status(); private: @@ -160,9 +161,6 @@ public: ~ObComplementPrepareTask(); int init(ObComplementDataParam ¶m, ObComplementDataContext &context); int process() override; -private: - int generate_complement_write_task(ObComplementDataDag *dag, ObComplementWriteTask *&write_task); - int generate_complement_merge_task(ObComplementDataDag *dag, ObComplementWriteTask *write_task, ObComplementMergeTask *&merge_task); private: bool is_inited_; ObComplementDataParam *param_; -- GitLab