提交 42b0938e 编写于 作者: H Hongqin-Li 提交者: ob-robot

Fix illegal snapshot version used by direct load to create ddl major

上级 ef1c90b7
......@@ -113,7 +113,6 @@ int ObTableLoadBeginP::process()
param.online_opt_stat_gather_ = false;
param.data_type_ = static_cast<ObTableLoadDataType>(arg_.config_.flag_.data_type_);
param.dup_action_ = ObLoadDupActionType::LOAD_STOP_ON_DUP;
param.snapshot_version_ = ObTimeUtil::current_time_ns();
if (OB_FAIL(param.normalize())) {
LOG_WARN("fail to normalize param", KR(ret));
}
......@@ -207,6 +206,7 @@ int ObTableLoadBeginP::create_table_ctx(const ObTableLoadParam &param,
ddl_param.dest_table_id_ = start_res.dest_table_id_;
ddl_param.task_id_ = start_res.task_id_;
ddl_param.schema_version_ = start_res.schema_version_;
ddl_param.snapshot_version_ = start_res.snapshot_version_;
ddl_param.data_version_ = data_version;
}
if (OB_SUCC(ret)) {
......@@ -275,12 +275,12 @@ int ObTableLoadPreBeginPeerP::process()
param.px_mode_ = arg_.px_mode_;
param.online_opt_stat_gather_ = arg_.online_opt_stat_gather_;
param.dup_action_ = arg_.dup_action_;
param.snapshot_version_ = arg_.snapshot_version_;
ObTableLoadDDLParam ddl_param;
uint64_t data_version = 0;
ddl_param.dest_table_id_ = arg_.dest_table_id_;
ddl_param.task_id_ = arg_.task_id_;
ddl_param.schema_version_ = arg_.schema_version_;
ddl_param.snapshot_version_ = arg_.snapshot_version_;
ddl_param.data_version_ = arg_.data_version_;
if (OB_FAIL(create_table_ctx(param, ddl_param, table_ctx))) {
LOG_WARN("fail to create table ctx", KR(ret));
......
......@@ -225,10 +225,10 @@ int ObTableLoadCoordinator::pre_begin_peers()
request.dup_action_ = param_.dup_action_;
request.px_mode_ = param_.px_mode_;
request.online_opt_stat_gather_ = param_.online_opt_stat_gather_;
request.snapshot_version_ = param_.snapshot_version_;
request.dest_table_id_ = ctx_->ddl_param_.dest_table_id_;
request.task_id_ = ctx_->ddl_param_.task_id_;
request.schema_version_ = ctx_->ddl_param_.schema_version_;
request.snapshot_version_ = ctx_->ddl_param_.snapshot_version_;
request.data_version_ = ctx_->ddl_param_.data_version_;
for (int64_t i = 0; OB_SUCC(ret) && i < all_leader_info_array.count(); ++i) {
const ObTableLoadPartitionLocation::LeaderInfo &leader_info = all_leader_info_array.at(i);
......
......@@ -114,6 +114,7 @@ int ObTableLoadInstance::create_table_ctx(ObTableLoadParam &param,
ddl_param.dest_table_id_ = start_res.dest_table_id_;
ddl_param.task_id_ = start_res.task_id_;
ddl_param.schema_version_ = start_res.schema_version_;
ddl_param.snapshot_version_ = start_res.snapshot_version_;
ddl_param.data_version_ = data_version;
}
if (OB_SUCC(ret)) {
......
......@@ -190,7 +190,7 @@ int ObTableLoadMerger::build_merge_ctx()
merge_param.target_table_id_ = store_ctx_->ctx_->ddl_param_.dest_table_id_;
merge_param.rowkey_column_num_ = store_ctx_->ctx_->schema_.rowkey_column_count_;
merge_param.store_column_count_ = store_ctx_->ctx_->schema_.store_column_count_;
merge_param.snapshot_version_ = param_.snapshot_version_;
merge_param.snapshot_version_ = store_ctx_->ctx_->ddl_param_.snapshot_version_;
merge_param.table_data_desc_ = store_ctx_->table_data_desc_;
merge_param.datum_utils_ = &(store_ctx_->ctx_->schema_.datum_utils_);
merge_param.col_descs_ = &(store_ctx_->ctx_->schema_.column_descs_);
......
......@@ -30,6 +30,7 @@ int ObTableLoadRedefTable::start(const ObTableLoadRedefTableStartArg &arg,
const int64_t origin_timeout_ts = THIS_WORKER.get_timeout_ts();
ObCreateHiddenTableArg create_table_arg;
ObCreateHiddenTableRes create_table_res;
int64_t snapshot_version = OB_INVALID_VERSION;
create_table_arg.reset();
create_table_arg.exec_tenant_id_ = arg.tenant_id_;
create_table_arg.tenant_id_ = arg.tenant_id_;
......@@ -45,12 +46,16 @@ int ObTableLoadRedefTable::start(const ObTableLoadRedefTableStartArg &arg,
create_table_arg.nls_formats_[ObNLSFormatEnum::NLS_TIMESTAMP_TZ] = session_info.get_local_nls_timestamp_tz_format();
if (OB_FAIL(create_table_arg.tz_info_wrap_.deep_copy(session_info.get_tz_info_wrap()))) {
LOG_WARN("failed to deep copy tz_info_wrap", KR(ret));
} else if (OB_FAIL(ObDDLServerClient::create_hidden_table(create_table_arg, create_table_res, session_info))) {
} else if (OB_FAIL(ObDDLServerClient::create_hidden_table(create_table_arg, create_table_res, snapshot_version, session_info))) {
LOG_WARN("failed to create hidden table", KR(ret), K(create_table_arg));
} else if (OB_UNLIKELY(snapshot_version <= 0)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid snapshot version", K(ret));
} else {
res.dest_table_id_ = create_table_res.dest_table_id_;
res.task_id_ = create_table_res.task_id_;
res.schema_version_ = create_table_res.schema_version_;
res.snapshot_version_ = snapshot_version;
LOG_INFO("succeed to create hidden table", K(arg), K(res));
}
THIS_WORKER.set_timeout_ts(origin_timeout_ts);
......
......@@ -49,7 +49,7 @@ struct ObTableLoadRedefTableStartRes
{
public:
ObTableLoadRedefTableStartRes()
: dest_table_id_(common::OB_INVALID_ID), task_id_(0), schema_version_(0)
: dest_table_id_(common::OB_INVALID_ID), task_id_(0), schema_version_(0), snapshot_version_(0)
{
}
~ObTableLoadRedefTableStartRes() = default;
......@@ -58,12 +58,14 @@ public:
dest_table_id_ = common::OB_INVALID_ID;
task_id_ = 0;
schema_version_ = 0;
snapshot_version_ = 0;
}
TO_STRING_KV(K_(dest_table_id), K_(task_id), K_(schema_version));
TO_STRING_KV(K_(dest_table_id), K_(task_id), K_(schema_version), K_(snapshot_version));
public:
uint64_t dest_table_id_;
int64_t task_id_;
int64_t schema_version_;
int64_t snapshot_version_;
};
struct ObTableLoadRedefTableFinishArg
......
......@@ -72,7 +72,7 @@ int ObTableLoadStoreCtx::init(
ObDirectLoadInsertTableParam insert_table_param;
insert_table_param.table_id_ = ctx_->param_.table_id_;
insert_table_param.schema_version_ = ctx_->ddl_param_.schema_version_;
insert_table_param.snapshot_version_ = ctx_->param_.snapshot_version_;
insert_table_param.snapshot_version_ = ctx_->ddl_param_.snapshot_version_;
insert_table_param.ddl_task_id_ = ctx_->ddl_param_.task_id_;
insert_table_param.execution_id_ = 1; //仓氐说暂时设置为1,不然后面检测过不了
insert_table_param.data_version_ = ctx_->ddl_param_.data_version_;
......
......@@ -107,8 +107,7 @@ struct ObTableLoadParam
px_mode_(false),
online_opt_stat_gather_(false),
data_type_(ObTableLoadDataType::RAW_STRING),
dup_action_(sql::ObLoadDupActionType::LOAD_INVALID_MODE),
snapshot_version_(0)
dup_action_(sql::ObLoadDupActionType::LOAD_INVALID_MODE)
{
}
......@@ -129,13 +128,12 @@ struct ObTableLoadParam
common::OB_INVALID_ID != table_id_ &&
session_count_ > 0 && session_count_ <= MAX_TABLE_LOAD_SESSION_COUNT &&
batch_size_ > 0 &&
column_count_ > 0 &&
snapshot_version_ > 0;
column_count_ > 0;
}
TO_STRING_KV(K_(tenant_id), K_(table_id), K_(session_count), K_(batch_size),
K_(max_error_row_count), K_(sql_mode), K_(column_count), K_(need_sort), K_(px_mode),
K_(online_opt_stat_gather), K_(data_type), K_(dup_action), K_(snapshot_version));
K_(online_opt_stat_gather), K_(data_type), K_(dup_action));
public:
uint64_t tenant_id_;
uint64_t table_id_;
......@@ -149,32 +147,39 @@ public:
bool online_opt_stat_gather_;
ObTableLoadDataType data_type_;
sql::ObLoadDupActionType dup_action_;
int64_t snapshot_version_;
};
struct ObTableLoadDDLParam
{
public:
ObTableLoadDDLParam()
: dest_table_id_(common::OB_INVALID_ID), task_id_(0), schema_version_(0), data_version_(0) {}
: dest_table_id_(common::OB_INVALID_ID),
task_id_(0),
schema_version_(0),
snapshot_version_(0),
data_version_(0)
{
}
void reset()
{
dest_table_id_ = common::OB_INVALID_ID;
task_id_ = 0;
schema_version_ = 0;
snapshot_version_ = 0;
data_version_ = 0;
}
bool is_valid() const
{
return common::OB_INVALID_ID != dest_table_id_ && 0 != task_id_ && 0 != schema_version_ &&
0 != data_version_;
0 != snapshot_version_ && 0 != data_version_;
}
TO_STRING_KV(K_(dest_table_id), K_(task_id), K_(schema_version), K_(data_version));
TO_STRING_KV(K_(dest_table_id), K_(task_id), K_(schema_version), K_(snapshot_version),
K_(data_version));
public:
uint64_t dest_table_id_;
int64_t task_id_;
int64_t schema_version_;
int64_t snapshot_version_;
int64_t data_version_;
};
......
......@@ -187,7 +187,7 @@ int ObTableLoadTransStoreWriter::init_session_ctx_array()
}
}
ObDirectLoadTableStoreParam param;
param.snapshot_version_ = param_.snapshot_version_;
param.snapshot_version_ = trans_ctx_->ctx_->ddl_param_.snapshot_version_;
param.table_data_desc_ = *table_data_desc_;
param.datum_utils_ = &(trans_ctx_->ctx_->schema_.datum_utils_);
param.col_descs_ = &(trans_ctx_->ctx_->schema_.column_descs_);
......
......@@ -38,10 +38,10 @@ OB_SERIALIZE_MEMBER(ObTableLoadPreBeginPeerRequest,
dup_action_,
px_mode_,
online_opt_stat_gather_,
snapshot_version_,
dest_table_id_,
task_id_,
schema_version_,
snapshot_version_,
data_version_,
partition_id_array_,
target_partition_id_array_);
......
......@@ -64,10 +64,10 @@ public:
dup_action_(sql::ObLoadDupActionType::LOAD_INVALID_MODE),
px_mode_(false),
online_opt_stat_gather_(false),
snapshot_version_(0),
dest_table_id_(common::OB_INVALID_ID),
task_id_(0),
schema_version_(0),
snapshot_version_(0),
data_version_(0)
{
}
......@@ -77,10 +77,10 @@ public:
K_(dup_action),
K_(px_mode),
K_(online_opt_stat_gather),
K_(snapshot_version),
K_(dest_table_id),
K_(task_id),
K_(schema_version),
K_(snapshot_version),
K_(data_version),
K_(partition_id_array),
K_(target_partition_id_array));
......@@ -92,11 +92,11 @@ public:
sql::ObLoadDupActionType dup_action_;
bool px_mode_;
bool online_opt_stat_gather_;
int64_t snapshot_version_;
// ddl param
uint64_t dest_table_id_;
int64_t task_id_;
int64_t schema_version_;
int64_t snapshot_version_;
int64_t data_version_;
// partition info
ObTableLoadArray<ObTableLoadLSIdAndPartitionId> partition_id_array_;//orig table
......
......@@ -2048,7 +2048,6 @@ int ObLoadDataDirectImpl::init_execute_context()
load_param.sql_mode_ = execute_param_.sql_mode_;
load_param.px_mode_ = false;
load_param.online_opt_stat_gather_ = execute_param_.online_opt_stat_gather_;
load_param.snapshot_version_ = ObTimeUtil::current_time_ns();
if (OB_FAIL(direct_loader_.init(load_param,
execute_param_.store_column_idxs_, &execute_ctx_))) {
LOG_WARN("fail to init direct loader", KR(ret));
......
......@@ -65,7 +65,6 @@ int ObTableDirectInsertCtx::init(ObExecContext *exec_ctx,
param.max_error_row_count_ = 0;
param.dup_action_ = sql::ObLoadDupActionType::LOAD_STOP_ON_DUP;
param.sql_mode_ = sql_mode;
param.snapshot_version_ = ObTimeUtil::current_time_ns();
if (OB_FAIL(table_load_instance_->init(param, store_column_idxs, load_exec_ctx_))) {
LOG_WARN("failed to init direct loader", KR(ret));
} else {
......
......@@ -28,7 +28,7 @@ namespace oceanbase
namespace storage
{
int ObDDLServerClient::create_hidden_table(const obrpc::ObCreateHiddenTableArg &arg, obrpc::ObCreateHiddenTableRes &res, sql::ObSQLSessionInfo &session)
int ObDDLServerClient::create_hidden_table(const obrpc::ObCreateHiddenTableArg &arg, obrpc::ObCreateHiddenTableRes &res, int64_t &snapshot_version, sql::ObSQLSessionInfo &session)
{
int ret = OB_SUCCESS;
obrpc::ObCommonRpcProxy *common_rpc_proxy = GCTX.rs_rpc_proxy_;
......@@ -44,7 +44,7 @@ int ObDDLServerClient::create_hidden_table(const obrpc::ObCreateHiddenTableArg &
LOG_WARN("failed to set register task id", K(ret), K(res));
}
if (OB_SUCC(ret)) {
if (OB_FAIL(wait_task_reach_pending(arg.tenant_id_, res.task_id_, *GCTX.sql_proxy_, session))) {
if (OB_FAIL(wait_task_reach_pending(arg.tenant_id_, res.task_id_, snapshot_version, *GCTX.sql_proxy_, session))) {
LOG_WARN("failed to wait table lock. remove register task id and abort redef table task.", K(ret), K(arg), K(res));
}
#ifdef ERRSIM
......@@ -71,6 +71,7 @@ int ObDDLServerClient::start_redef_table(const obrpc::ObStartRedefTableArg &arg,
{
int ret = OB_SUCCESS;
obrpc::ObCommonRpcProxy *common_rpc_proxy = GCTX.rs_rpc_proxy_;
int64_t unused_snapshot_version = OB_INVALID_VERSION;
if (OB_UNLIKELY(!arg.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(arg));
......@@ -81,7 +82,7 @@ int ObDDLServerClient::start_redef_table(const obrpc::ObStartRedefTableArg &arg,
LOG_WARN("failed to start redef table", KR(ret), K(arg));
} else if (OB_FAIL(OB_DDL_HEART_BEAT_TASK_CONTAINER.set_register_task_id(res.task_id_, res.tenant_id_))) {
LOG_WARN("failed to set register task id", K(ret), K(res));
} else if (OB_FAIL(wait_task_reach_pending(arg.orig_tenant_id_, res.task_id_, *GCTX.sql_proxy_, session))) {
} else if (OB_FAIL(wait_task_reach_pending(arg.orig_tenant_id_, res.task_id_, unused_snapshot_version, *GCTX.sql_proxy_, session))) {
LOG_WARN("failed to wait table lock. remove register task id and abort redef table task.", K(ret), K(arg), K(res));
int tmp_ret = OB_SUCCESS;
obrpc::ObAbortRedefTableArg abort_redef_table_arg;
......@@ -254,7 +255,7 @@ int ObDDLServerClient::build_ddl_single_replica_response(const obrpc::ObDDLBuild
return ret;
}
int ObDDLServerClient::wait_task_reach_pending(const uint64_t tenant_id, const int64_t task_id, ObMySQLProxy &sql_proxy, sql::ObSQLSessionInfo &session)
int ObDDLServerClient::wait_task_reach_pending(const uint64_t tenant_id, const int64_t task_id, int64_t &snapshot_version, ObMySQLProxy &sql_proxy, sql::ObSQLSessionInfo &session)
{
int ret = OB_SUCCESS;
const int64_t retry_interval = 100 * 1000;
......@@ -267,7 +268,7 @@ int ObDDLServerClient::wait_task_reach_pending(const uint64_t tenant_id, const i
LOG_WARN("invalid argument", K(ret), K(task_id), K(tenant_id));
} else {
while (OB_SUCC(ret)) {
if (OB_FAIL(sql_string.assign_fmt("SELECT status FROM %s WHERE task_id = %lu", share::OB_ALL_DDL_TASK_STATUS_TNAME, task_id))) {
if (OB_FAIL(sql_string.assign_fmt("SELECT status, snapshot_version FROM %s WHERE task_id = %lu", share::OB_ALL_DDL_TASK_STATUS_TNAME, task_id))) {
LOG_WARN("assign sql string failed", K(ret), K(task_id));
} else if (OB_FAIL(sql_proxy.read(res, tenant_id, sql_string.ptr()))) {
LOG_WARN("fail to execute sql", K(ret), K(sql_string));
......@@ -283,6 +284,7 @@ int ObDDLServerClient::wait_task_reach_pending(const uint64_t tenant_id, const i
} else {
int task_status = 0;
EXTRACT_INT_FIELD_MYSQL(*result, "status", task_status, int);
EXTRACT_UINT_FIELD_MYSQL(*result, "snapshot_version", snapshot_version, uint64_t);
share::ObDDLTaskStatus task_cur_status = static_cast<share::ObDDLTaskStatus>(task_status);
if (rootserver::ObTableRedefinitionTask::check_task_status_before_pending(task_cur_status)) {
LOG_INFO("task status not equal REPENDING, Please Keep Waiting", K(task_status));
......
......@@ -23,7 +23,7 @@ namespace storage
class ObDDLServerClient final
{
public:
static int create_hidden_table(const obrpc::ObCreateHiddenTableArg &arg, obrpc::ObCreateHiddenTableRes &res, sql::ObSQLSessionInfo &session);
static int create_hidden_table(const obrpc::ObCreateHiddenTableArg &arg, obrpc::ObCreateHiddenTableRes &res, int64_t &snapshot_version, sql::ObSQLSessionInfo &session);
static int start_redef_table(const obrpc::ObStartRedefTableArg &arg, obrpc::ObStartRedefTableRes &res, sql::ObSQLSessionInfo &session);
static int copy_table_dependents(const obrpc::ObCopyTableDependentsArg &arg);
static int finish_redef_table(const obrpc::ObFinishRedefTableArg &finish_redef_arg,
......@@ -32,7 +32,7 @@ public:
static int abort_redef_table(const obrpc::ObAbortRedefTableArg &arg, sql::ObSQLSessionInfo &session);
static int build_ddl_single_replica_response(const obrpc::ObDDLBuildSingleReplicaResponseArg &arg);
private:
static int wait_task_reach_pending(const uint64_t tenant_id, const int64_t task_id, ObMySQLProxy &sql_proxy, sql::ObSQLSessionInfo &session);
static int wait_task_reach_pending(const uint64_t tenant_id, const int64_t task_id, int64_t &snapshot_version, ObMySQLProxy &sql_proxy, sql::ObSQLSessionInfo &session);
static int heart_beat_clear(const int64_t task_id);
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册