提交 3b4a2fc3 编写于 作者: O obdev 提交者: ob-robot

fix finish_redef_table() bug

上级 a1e27ff6
......@@ -133,13 +133,8 @@ int ObTableLoadRedefTable::finish()
build_single_replica_response_arg.snapshot_version_ = 1;
build_single_replica_response_arg.schema_version_ = schema_version_;
build_single_replica_response_arg.execution_id_ = 1;
if (OB_FAIL(ObDDLServerClient::finish_redef_table(finish_redef_table_arg))) {
if (OB_FAIL(ObDDLServerClient::finish_redef_table(finish_redef_table_arg, build_single_replica_response_arg, *session_info_))) {
LOG_WARN("failed to finish redef table", KR(ret), K(finish_redef_table_arg));
} else if (OB_FAIL(ObDDLServerClient::build_ddl_single_replica_response(build_single_replica_response_arg))) {
LOG_WARN("failed to build ddl single replica reponse", KR(ret));
} else if (OB_FAIL(ObDDLExecutorUtil::wait_ddl_finish(ctx_->param_.tenant_id_, ddl_task_id_,
*session_info_, common_rpc_proxy))) {
LOG_WARN("failed to wait ddl finish", KR(ret));
} else {
is_finish_or_abort_called_ = true;
LOG_INFO("succeed to finish redef table", KR(ret), K(finish_redef_table_arg));
......@@ -163,17 +158,9 @@ int ObTableLoadRedefTable::abort()
obrpc::ObAbortRedefTableArg abort_redef_table_arg;
abort_redef_table_arg.task_id_ = ddl_task_id_;
abort_redef_table_arg.tenant_id_ = ctx_->param_.tenant_id_;
if (OB_FAIL(ObDDLServerClient::abort_redef_table(abort_redef_table_arg))) {
if (OB_FAIL(ObDDLServerClient::abort_redef_table(abort_redef_table_arg, *session_info_))) {
LOG_WARN("failed to abort redef table", KR(ret), K(abort_redef_table_arg));
} else if (OB_FAIL(ObDDLExecutorUtil::wait_ddl_finish(ctx_->param_.tenant_id_, ddl_task_id_,
*session_info_, common_rpc_proxy))) {
if (OB_CANCELED == ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to wait ddl finish", KR(ret));
}
}
if (OB_SUCC(ret)) {
} else {
LOG_INFO("abort_redef_table success", KR(ret));
}
THIS_WORKER.set_timeout_ts(origin_timeout_ts);
......
......@@ -49,7 +49,7 @@ int ObDDLServerClient::create_hidden_table(const obrpc::ObCreateHiddenTableArg &
obrpc::ObAbortRedefTableArg abort_redef_table_arg;
abort_redef_table_arg.task_id_ = res.task_id_;
abort_redef_table_arg.tenant_id_ = arg.tenant_id_;
if (OB_FAIL(abort_redef_table(abort_redef_table_arg))) {
if (OB_FAIL(abort_redef_table(abort_redef_table_arg, session))) {
LOG_WARN("failed to abort redef table", K(ret), K(abort_redef_table_arg));
}
}
......@@ -81,7 +81,7 @@ int ObDDLServerClient::start_redef_table(const obrpc::ObStartRedefTableArg &arg,
obrpc::ObAbortRedefTableArg abort_redef_table_arg;
abort_redef_table_arg.task_id_ = res.task_id_;
abort_redef_table_arg.tenant_id_ = arg.orig_tenant_id_;
if (OB_FAIL(abort_redef_table(abort_redef_table_arg))) {
if (OB_FAIL(abort_redef_table(abort_redef_table_arg, session))) {
LOG_WARN("failed to abort redef table", K(ret), K(abort_redef_table_arg));
}
}
......@@ -106,7 +106,7 @@ int ObDDLServerClient::copy_table_dependents(const obrpc::ObCopyTableDependentsA
return ret;
}
int ObDDLServerClient::abort_redef_table(const obrpc::ObAbortRedefTableArg &arg)
int ObDDLServerClient::abort_redef_table(const obrpc::ObAbortRedefTableArg &arg, sql::ObSQLSessionInfo &session)
{
int ret = OB_SUCCESS;
const int64_t retry_interval = 100 * 1000L;
......@@ -138,6 +138,10 @@ int ObDDLServerClient::abort_redef_table(const obrpc::ObAbortRedefTableArg &arg)
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
}
int tmp_ret = 0;
if (OB_TMP_FAIL(sql::ObDDLExecutorUtil::wait_ddl_finish(arg.tenant_id_, arg.task_id_, session, common_rpc_proxy))) {
LOG_WARN("wait ddl finish failed", K(tmp_ret), K(arg.tenant_id_), K(arg.task_id_));
}
if (OB_SUCC(ret)) {
if (OB_FAIL(OB_DDL_HEART_BEAT_TASK_CONTAINER.remove_register_task_id(arg.task_id_))) {
if (OB_HASH_NOT_EXIST == ret) {
......@@ -151,20 +155,30 @@ int ObDDLServerClient::abort_redef_table(const obrpc::ObAbortRedefTableArg &arg)
return ret;
}
int ObDDLServerClient::finish_redef_table(const obrpc::ObFinishRedefTableArg &arg)
int ObDDLServerClient::finish_redef_table(const obrpc::ObFinishRedefTableArg &finish_redef_arg,
const obrpc::ObDDLBuildSingleReplicaResponseArg &build_single_arg,
sql::ObSQLSessionInfo &session)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
obrpc::ObCommonRpcProxy *common_rpc_proxy = GCTX.rs_rpc_proxy_;
if (OB_UNLIKELY(!arg.is_valid())) {
if (OB_UNLIKELY(!finish_redef_arg.is_valid() || !build_single_arg.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(arg));
LOG_WARN("invalid arg", K(ret), K(finish_redef_arg), K(build_single_arg));
} else if (OB_ISNULL(common_rpc_proxy)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("common rpc proxy is null", K(ret));
} else if (OB_FAIL(common_rpc_proxy->finish_redef_table(arg))) {
LOG_WARN("failed to finish redef table", K(ret), K(arg));
} else if (OB_FAIL(OB_DDL_HEART_BEAT_TASK_CONTAINER.remove_register_task_id(arg.task_id_))) {
LOG_ERROR("failed to remove register task id", K(ret), K(arg));
} else if (OB_FAIL(common_rpc_proxy->finish_redef_table(finish_redef_arg))) {
LOG_WARN("failed to finish redef table", K(ret), K(finish_redef_arg));
} else if (OB_FAIL(build_ddl_single_replica_response(build_single_arg))) {
LOG_WARN("build ddl single replica response", K(ret), K(build_single_arg));
} else if (OB_TMP_FAIL(sql::ObDDLExecutorUtil::wait_ddl_finish(finish_redef_arg.tenant_id_, finish_redef_arg.task_id_, session, common_rpc_proxy))) {
LOG_WARN("failed to wait ddl finish", K(tmp_ret), K(finish_redef_arg.tenant_id_), K(finish_redef_arg.task_id_));
}
if (OB_SUCC(ret)) {
if (OB_FAIL(OB_DDL_HEART_BEAT_TASK_CONTAINER.remove_register_task_id(finish_redef_arg.task_id_))) {
LOG_ERROR("failed to remove register task id", K(ret), K(finish_redef_arg));
}
}
return ret;
}
......
......@@ -14,6 +14,7 @@
#define OCEANBASE_STORAGE_OB_DDL_SERVER_CLIENT_H
#include "share/ob_rpc_struct.h"
#include "sql/session/ob_sql_session_info.h"
namespace oceanbase
{
......@@ -25,8 +26,10 @@ public:
static int create_hidden_table(const obrpc::ObCreateHiddenTableArg &arg, obrpc::ObCreateHiddenTableRes &res, sql::ObSQLSessionInfo &session);
static int start_redef_table(const obrpc::ObStartRedefTableArg &arg, obrpc::ObStartRedefTableRes &res, sql::ObSQLSessionInfo &session);
static int copy_table_dependents(const obrpc::ObCopyTableDependentsArg &arg);
static int finish_redef_table(const obrpc::ObFinishRedefTableArg &arg);
static int abort_redef_table(const obrpc::ObAbortRedefTableArg &arg);
static int finish_redef_table(const obrpc::ObFinishRedefTableArg &finish_redef_arg,
const obrpc::ObDDLBuildSingleReplicaResponseArg &build_single_arg,
sql::ObSQLSessionInfo &session);
static int abort_redef_table(const obrpc::ObAbortRedefTableArg &arg, sql::ObSQLSessionInfo &session);
static int build_ddl_single_replica_response(const obrpc::ObDDLBuildSingleReplicaResponseArg &arg);
private:
static int wait_table_lock(const uint64_t tenant_id, const int64_t task_id, ObMySQLProxy &sql_proxy, sql::ObSQLSessionInfo &session);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册