提交 f40dd602 编写于 作者: O obdev 提交者: wangzelin.wzl

fix prepare task failed causes drop column wait data complement.

上级 38a93056
......@@ -1848,7 +1848,6 @@ int ObService::build_ddl_single_replica_request(const ObDDLBuildSingleReplicaReq
MTL_SWITCH(arg.tenant_id_) {
ObTenantDagScheduler *dag_scheduler = nullptr;
ObComplementDataDag *dag = nullptr;
ObComplementPrepareTask *prepare_task = nullptr;
if (OB_ISNULL(dag_scheduler = MTL(ObTenantDagScheduler *))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("dag scheduler is null", K(ret));
......@@ -1859,15 +1858,8 @@ int ObService::build_ddl_single_replica_request(const ObDDLBuildSingleReplicaReq
LOG_WARN("unexpected error, dag is null", K(ret), KP(dag));
} else if (OB_FAIL(dag->init(arg))) {
LOG_WARN("fail to init complement data dag", K(ret), K(arg));
} else if (OB_FAIL(dag->alloc_task(prepare_task))) {
LOG_WARN("fail to alloc complement prepare task", K(ret));
} else if (OB_ISNULL(prepare_task)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error, prepare task is null", K(ret), KP(prepare_task));
} else if (OB_FAIL(prepare_task->init(dag->get_param(), dag->get_context()))) {
LOG_WARN("fail to init complement prepare task", K(ret));
} else if (OB_FAIL(dag->add_task(*prepare_task))) {
LOG_WARN("fail to add complement prepare task to dag", K(ret));
} else if (OB_FAIL(dag->create_first_task())) {
LOG_WARN("create first task failed", K(ret));
} else if (OB_FAIL(dag_scheduler->add_dag(dag))) {
if (OB_EAGAIN == ret) {
ret = OB_SUCCESS;
......
......@@ -20,6 +20,7 @@
#include "lib/lock/ob_thread_cond.h"
#include "lib/profile/ob_trace_id.h"
#include "lib/thread/thread_mgr_interface.h"
#include "share/location_cache/ob_location_struct.h"
#include "share/ob_errno.h"
#include "share/ob_thread_pool.h"
......@@ -58,7 +59,8 @@ public:
|| common::OB_SCHEMA_EAGAIN == ret_code || common::OB_GTS_NOT_READY == ret_code || common::OB_ERR_SHARED_LOCK_CONFLICT == ret_code
|| common::OB_PARTITION_NOT_EXIST == ret_code || common::OB_PG_IS_REMOVED == ret_code || common::OB_TENANT_NOT_EXIST == ret_code
|| common::OB_RPC_SEND_ERROR == ret_code || common::OB_RPC_CONNECT_ERROR == ret_code || common::OB_DDL_SCHEMA_VERSION_NOT_MATCH == ret_code
|| OB_TRANS_ROLLBACKED == ret_code || OB_TRANS_TIMEOUT == ret_code || OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH == ret_code;
|| OB_TRANS_ROLLBACKED == ret_code || OB_TRANS_TIMEOUT == ret_code || OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH == ret_code
|| is_location_service_renew_error(ret_code);
}
static bool in_ddl_retry_black_list(const int ret_code)
{
......
......@@ -362,6 +362,51 @@ int ObComplementDataDag::init(const ObDDLBuildSingleReplicaRequestArg &arg)
return ret;
}
int ObComplementDataDag::create_first_task()
{
int ret = OB_SUCCESS;
ObComplementPrepareTask *prepare_task = nullptr;
ObComplementWriteTask *write_task = nullptr;
ObComplementMergeTask *merge_task = nullptr;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (OB_FAIL(alloc_task(prepare_task))) {
LOG_WARN("allocate task failed", K(ret));
} else if (OB_ISNULL(prepare_task)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected nullptr task", K(ret));
} else if (OB_FAIL(prepare_task->init(param_, context_))) {
LOG_WARN("init prepare task failed", K(ret));
} else if (OB_FAIL(add_task(*prepare_task))) {
LOG_WARN("add task failed", K(ret));
} else if (OB_FAIL(alloc_task(write_task))) {
LOG_WARN("alloc task failed", K(ret));
} else if (OB_ISNULL(write_task)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected nullptr task", K(ret));
} else if (OB_FAIL(write_task->init(0, param_, context_))) {
LOG_WARN("init write task failed", K(ret));
} else if (OB_FAIL(prepare_task->add_child(*write_task))) {
LOG_WARN("add child task failed", K(ret));
} else if (OB_FAIL(add_task(*write_task))) {
LOG_WARN("add task failed", K(ret));
} else if (OB_FAIL(alloc_task(merge_task))) {
LOG_WARN("alloc task failed", K(ret));
} else if (OB_ISNULL(merge_task)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected nullptr task", K(ret));
} else if (OB_FAIL(merge_task->init(param_, context_))) {
LOG_WARN("init merge task failed", K(ret));
} else if (OB_FAIL(write_task->add_child(*merge_task))) {
LOG_WARN("add child task failed", K(ret));
} else if (OB_FAIL(add_task(*merge_task))) {
LOG_WARN("add task failed");
}
return ret;
}
int ObComplementDataDag::prepare_context()
{
int ret = OB_SUCCESS;
......@@ -419,8 +464,9 @@ int64_t ObComplementDataDag::hash() const
tmp_ret = OB_ERR_SYS;
LOG_ERROR("table schema must not be NULL", K(tmp_ret), K(is_inited_), K(param_));
} else {
hash_val = param_.ls_id_.hash() + param_.source_tablet_id_.hash() + param_.dest_tablet_id_.hash() +
param_.data_table_schema_->get_table_id() + param_.hidden_table_schema_->get_table_id() + ObDagType::DAG_TYPE_DDL;
hash_val = param_.tenant_id_ + param_.ls_id_.hash()
+ param_.data_table_schema_->get_table_id() + param_.hidden_table_schema_->get_table_id()
+ param_.source_tablet_id_.hash() + param_.dest_tablet_id_.hash() + ObDagType::DAG_TYPE_DDL;
}
return hash_val;
}
......@@ -437,11 +483,10 @@ bool ObComplementDataDag::operator==(const ObIDag &other) const
tmp_ret = OB_ERR_SYS;
LOG_ERROR("invalid argument", K(tmp_ret), K(param_), K(dag.param_));
} else {
is_equal = (param_.ls_id_ == dag.param_.ls_id_) && (param_.tenant_id_ == dag.param_.tenant_id_) &&
(param_.source_tablet_id_ == dag.param_.source_tablet_id_) && (param_.dest_tablet_id_ == dag.param_.dest_tablet_id_) &&
is_equal = (param_.tenant_id_ == dag.param_.tenant_id_) && (param_.ls_id_ == dag.param_.ls_id_) &&
(param_.data_table_schema_->get_table_id() == dag.param_.data_table_schema_->get_table_id()) &&
(param_.hidden_table_schema_->get_table_id() == dag.param_.hidden_table_schema_->get_table_id()) &&
(param_.compat_mode_ == dag.param_.compat_mode_);
(param_.source_tablet_id_ == dag.param_.source_tablet_id_) && (param_.dest_tablet_id_ == dag.param_.dest_tablet_id_);
}
}
return is_equal;
......@@ -562,10 +607,6 @@ int ObComplementPrepareTask::process()
LOG_WARN("prepare complement context failed", K(ret));
} else if (OB_FAIL(context_->write_start_log(*param_))) {
LOG_WARN("write start log failed", K(ret), KPC(param_));
} else if (OB_FAIL(generate_complement_write_task(dag, write_task))) {
LOG_WARN("fail to generate complement write task", K(ret));
} else if (OB_FAIL(generate_complement_merge_task(dag, write_task, merge_task))) {
LOG_WARN("fail to generate complement merge task", K(ret));
} else {
LOG_INFO("finish the complement prepare task", K(ret));
}
......@@ -577,51 +618,6 @@ int ObComplementPrepareTask::process()
return ret;
}
int ObComplementPrepareTask::generate_complement_write_task(ObComplementDataDag *dag,
ObComplementWriteTask *&write_task)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObComplementPrepareTask has not been inited", K(ret));
} else if (OB_ISNULL(dag)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(dag));
} else if (OB_FAIL(dag->alloc_task(write_task))) {
LOG_WARN("fail to alloc write task", K(ret));
} else if (OB_FAIL(write_task->init(0, *param_, *context_))) {
LOG_WARN("fail to init complement write task", K(ret));
} else if (OB_FAIL(add_child(*write_task))) {
LOG_WARN("fail to add child for complement prepare task", K(ret));
} else if (OB_FAIL(dag->add_task(*write_task))) {
LOG_WARN("fail to add complement write task to dag", K(ret));
}
return ret;
}
int ObComplementPrepareTask::generate_complement_merge_task(ObComplementDataDag *dag,
ObComplementWriteTask *write_task, ObComplementMergeTask *&merge_task)
{
int ret = OB_SUCCESS;
merge_task = NULL;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObComplementPrepareTask has not been inited", K(ret));
} else if (OB_ISNULL(dag) || OB_ISNULL(write_task)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(dag), KP(write_task));
} else if (OB_FAIL(dag->alloc_task(merge_task))) {
LOG_WARN("fail to alloc merge task", K(ret));
} else if (OB_FAIL(merge_task->init(*param_, *context_))) {
LOG_WARN("fail to init merge task", K(ret));
} else if (OB_FAIL(write_task->add_child(*merge_task))) {
LOG_WARN("fail to add child for write task", K(ret));
} else if (OB_FAIL(dag->add_task(*merge_task))) {
LOG_WARN("fail to add merge task", K(ret));
}
return ret;
}
ObComplementWriteTask::ObComplementWriteTask()
: ObITask(TASK_TYPE_COMPLEMENT_WRITE), is_inited_(false), task_id_(0), param_(nullptr),
context_(nullptr), write_row_(),
......
......@@ -144,6 +144,7 @@ public:
int fill_dag_key(char *buf, const int64_t buf_len) const override;
virtual lib::Worker::CompatMode get_compat_mode() const override
{ return param_.compat_mode_; }
virtual int create_first_task() override;
// report replica build status to RS.
int report_replica_build_status();
private:
......@@ -160,9 +161,6 @@ public:
~ObComplementPrepareTask();
int init(ObComplementDataParam &param, ObComplementDataContext &context);
int process() override;
private:
int generate_complement_write_task(ObComplementDataDag *dag, ObComplementWriteTask *&write_task);
int generate_complement_merge_task(ObComplementDataDag *dag, ObComplementWriteTask *write_task, ObComplementMergeTask *&merge_task);
private:
bool is_inited_;
ObComplementDataParam *param_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册