提交 ccf6ae5c 编写于 作者: C chinaxing 提交者: wangzelin.wzl

[4.0] adjust rollback savepoint msg, remove deps on TxDesc

上级 e1e7a62c
...@@ -1108,7 +1108,6 @@ int ObTransService::create_tx_ctx_(const share::ObLSID &ls_id, ...@@ -1108,7 +1108,6 @@ int ObTransService::create_tx_ctx_(const share::ObLSID &ls_id,
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
bool existed = false; bool existed = false;
int64_t epoch = 0;
ObTxCreateArg arg(tx.can_elr_, /* can_elr */ ObTxCreateArg arg(tx.can_elr_, /* can_elr */
false, /* for_replay */ false, /* for_replay */
tx.tenant_id_, tx.tenant_id_,
...@@ -1379,7 +1378,6 @@ int ObTransService::acquire_local_snapshot_(const share::ObLSID &ls_id, ...@@ -1379,7 +1378,6 @@ int ObTransService::acquire_local_snapshot_(const share::ObLSID &ls_id,
int64_t &snapshot) int64_t &snapshot)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t epoch = 0;
bool leader = false; bool leader = false;
int64_t snapshot0 = 0; int64_t snapshot0 = 0;
ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL; ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL;
...@@ -1485,19 +1483,14 @@ int ObTransService::batch_post_tx_msg_(ObTxRollbackSPMsg &msg, ...@@ -1485,19 +1483,14 @@ int ObTransService::batch_post_tx_msg_(ObTxRollbackSPMsg &msg,
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int last_ret = OB_SUCCESS; int last_ret = OB_SUCCESS;
const ObTxDesc *tx_ptr = msg.tx_ptr_;
ARRAY_FOREACH_NORET(list, idx) { ARRAY_FOREACH_NORET(list, idx) {
auto &p = list.at(idx); auto &p = list.at(idx);
msg.receiver_ = p.left_; msg.receiver_ = p.left_;
msg.epoch_ = p.right_; msg.epoch_ = p.right_;
if (msg.epoch_ > 0) {
msg.tx_ptr_ = NULL;
}
if (OB_FAIL(rpc_->post_msg(p.left_, msg))) { if (OB_FAIL(rpc_->post_msg(p.left_, msg))) {
TRANS_LOG(WARN, "post msg falied", K(ret), K(msg), K(p)); TRANS_LOG(WARN, "post msg falied", K(ret), K(msg), K(p));
last_ret = ret; last_ret = ret;
} }
msg.tx_ptr_ = tx_ptr;
} }
return last_ret; return last_ret;
} }
...@@ -1682,22 +1675,45 @@ int ObTransService::handle_trans_abort_request(ObTxAbortMsg &abort_req, ObTransR ...@@ -1682,22 +1675,45 @@ int ObTransService::handle_trans_abort_request(ObTxAbortMsg &abort_req, ObTransR
return ret; return ret;
} }
int ObTransService::create_tx_ctx_(ObTxRollbackSPMsg &msg, ObPartTransCtx *&ctx)
{
int ret = OB_SUCCESS;
bool existed = false;
ObTxCreateArg arg(msg.can_elr_, /* can_elr */
false, /* for_replay */
msg.tenant_id_,
msg.tx_id_,
msg.receiver_,
msg.cluster_id_,
msg.cluster_version_,
msg.session_id_, /*session_id*/
msg.tx_addr_,
msg.tx_expire_ts_,
this);
ret = tx_ctx_mgr_.create_tx_ctx(arg, existed, ctx);
if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "create tx ctx fail", K(ret), K(msg), K(arg));
ctx = NULL;
}
TRANS_LOG(TRACE, "create tx ctx for savepoint rollback", K(ret), K(msg), K(arg));
return ret;
}
int ObTransService::handle_sp_rollback_request(ObTxRollbackSPMsg &msg, int ObTransService::handle_sp_rollback_request(ObTxRollbackSPMsg &msg,
obrpc::ObTxRpcRollbackSPResult &result) obrpc::ObTxRpcRollbackSPResult &result)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t ctx_born_epoch = -1; int64_t ctx_born_epoch = -1;
ObFunction<int(ObPartTransCtx*&)> create_tx_ctx_func = [this, &msg](ObPartTransCtx *&ctx) -> int {
return this->create_tx_ctx_(msg, ctx);
};
ret = ls_rollback_to_savepoint_(msg.tx_id_, ret = ls_rollback_to_savepoint_(msg.tx_id_,
msg.receiver_, msg.receiver_,
msg.epoch_, msg.epoch_,
msg.op_sn_, msg.op_sn_,
msg.savepoint_, msg.savepoint_,
ctx_born_epoch, ctx_born_epoch,
msg.tx_ptr_); create_tx_ctx_func);
if (OB_NOT_NULL(msg.tx_ptr_)) {
ob_free((void*)msg.tx_ptr_);
msg.tx_ptr_ = NULL;
}
result.status_ = ret; result.status_ = ret;
result.addr_ = self_; result.addr_ = self_;
result.born_epoch_ = ctx_born_epoch; result.born_epoch_ = ctx_born_epoch;
......
...@@ -173,7 +173,7 @@ int rollback_savepoint_slowpath_(ObTxDesc &tx, ...@@ -173,7 +173,7 @@ int rollback_savepoint_slowpath_(ObTxDesc &tx,
int create_tx_ctx_(const share::ObLSID &ls_id, int create_tx_ctx_(const share::ObLSID &ls_id,
const ObTxDesc &tx, const ObTxDesc &tx,
ObPartTransCtx *&ctx); ObPartTransCtx *&ctx);
int create_tx_ctx_(ObTxRollbackSPMsg &msg, ObPartTransCtx *&ctx);
int create_tx_ctx_(const share::ObLSID &ls_id, int create_tx_ctx_(const share::ObLSID &ls_id,
ObLS *ls, ObLS *ls,
const ObTxDesc &tx, const ObTxDesc &tx,
...@@ -281,7 +281,7 @@ int ls_rollback_to_savepoint_(const ObTransID &tx_id, ...@@ -281,7 +281,7 @@ int ls_rollback_to_savepoint_(const ObTransID &tx_id,
const int64_t op_sn, const int64_t op_sn,
const int64_t savepoint, const int64_t savepoint,
int64_t &ctx_born_epoch, int64_t &ctx_born_epoch,
const ObTxDesc *tx, ObFunction<int(ObPartTransCtx*&)> &func,
int64_t expire_ts = -1); int64_t expire_ts = -1);
int sync_rollback_savepoint__(ObTxDesc &tx, int sync_rollback_savepoint__(ObTxDesc &tx,
ObTxRollbackSPMsg &msg, ObTxRollbackSPMsg &msg,
......
...@@ -1277,13 +1277,16 @@ int ObTransService::rollback_savepoint_(ObTxDesc &tx, ...@@ -1277,13 +1277,16 @@ int ObTransService::rollback_savepoint_(ObTxDesc &tx,
slowpath = false; slowpath = false;
ObTxPart &p = parts[0]; ObTxPart &p = parts[0];
int64_t born_epoch = 0; int64_t born_epoch = 0;
ObFunction<int(ObPartTransCtx*&)> create_tx_ctx_func = [this, &p, &tx](ObPartTransCtx *&ctx) -> int {
return this->create_tx_ctx_(p.id_, tx, ctx);
};
if (OB_FAIL(ls_rollback_to_savepoint_(tx.tx_id_, if (OB_FAIL(ls_rollback_to_savepoint_(tx.tx_id_,
p.id_, p.id_,
p.epoch_, p.epoch_,
tx.op_sn_, tx.op_sn_,
savepoint, savepoint,
born_epoch, born_epoch,
&tx, create_tx_ctx_func,
expire_ts))) { expire_ts))) {
if (OB_NOT_MASTER == ret) { if (OB_NOT_MASTER == ret) {
slowpath = true; slowpath = true;
...@@ -1337,7 +1340,7 @@ int ObTransService::ls_rollback_to_savepoint_(const ObTransID &tx_id, ...@@ -1337,7 +1340,7 @@ int ObTransService::ls_rollback_to_savepoint_(const ObTransID &tx_id,
const int64_t op_sn, const int64_t op_sn,
const int64_t savepoint, const int64_t savepoint,
int64_t &ctx_born_epoch, int64_t &ctx_born_epoch,
const ObTxDesc *tx, ObFunction<int(ObPartTransCtx*&)> &func,
int64_t expire_ts) int64_t expire_ts)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
...@@ -1346,8 +1349,8 @@ int ObTransService::ls_rollback_to_savepoint_(const ObTransID &tx_id, ...@@ -1346,8 +1349,8 @@ int ObTransService::ls_rollback_to_savepoint_(const ObTransID &tx_id,
if (OB_FAIL(get_tx_ctx_(ls, tx_id, ctx))) { if (OB_FAIL(get_tx_ctx_(ls, tx_id, ctx))) {
if (OB_NOT_MASTER == ret) { if (OB_NOT_MASTER == ret) {
} else if (OB_TRANS_CTX_NOT_EXIST == ret && verify_epoch <= 0) { } else if (OB_TRANS_CTX_NOT_EXIST == ret && verify_epoch <= 0) {
if (OB_FAIL(create_tx_ctx_(ls, *tx, ctx))) { if (OB_FAIL(func(ctx))) {
TRANS_LOG(WARN, "create tx ctx fail", K(ret), K(ls), KPC(tx)); TRANS_LOG(WARN, "create tx ctx fail", K(ret), K(ls), K(tx_id));
} }
} else { } else {
TRANS_LOG(WARN, "get transaction context error", K(ret), K(tx_id), K(ls)); TRANS_LOG(WARN, "get transaction context error", K(ret), K(tx_id), K(ls));
...@@ -1398,32 +1401,12 @@ inline int ObTransService::rollback_savepoint_slowpath_(ObTxDesc &tx, ...@@ -1398,32 +1401,12 @@ inline int ObTransService::rollback_savepoint_slowpath_(ObTxDesc &tx,
msg.tx_id_ = tx.tx_id_; msg.tx_id_ = tx.tx_id_;
msg.savepoint_ = savepoint; msg.savepoint_ = savepoint;
msg.op_sn_ = tx.op_sn_; msg.op_sn_ = tx.op_sn_;
msg.can_elr_ = tx.can_elr_;
msg.session_id_ = tx.sess_id_;
msg.tx_addr_ = tx.addr_;
msg.tx_expire_ts_ = tx.get_expire_ts();
msg.epoch_ = -1; msg.epoch_ = -1;
msg.request_id_ = tx.op_sn_; msg.request_id_ = tx.op_sn_;
// prepare msg.tx_ptr_ if required
// TODO(yunxing.cyx) : in 4.1 rework here, won't serialize txDesc
ObTxDesc *tmp_tx_desc = NULL;
ARRAY_FOREACH_NORET(parts, i) {
if (parts[i].epoch_ <= 0) {
int64_t len = tx.get_serialize_size() + sizeof(ObTxDesc);
char *buf = (char*)ob_malloc(len);
int64_t pos = sizeof(ObTxDesc);
if (OB_FAIL(tx.serialize(buf, len, pos))) {
TRANS_LOG(WARN, "serialize tx fail", KR(ret), K(tx));
ob_free(buf);
} else {
tmp_tx_desc = new(buf)ObTxDesc();
pos = sizeof(ObTxDesc);
if (OB_FAIL(tmp_tx_desc->deserialize(buf, len, pos))) {
TRANS_LOG(WARN, "deserialize tx fail", KR(ret));
} else {
tmp_tx_desc->parts_.reset();
msg.tx_ptr_ = tmp_tx_desc;
}
}
break;
}
}
int64_t start_ts = ObTimeUtility::current_time(); int64_t start_ts = ObTimeUtility::current_time();
int retries = 0; int retries = 0;
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
...@@ -1444,11 +1427,6 @@ inline int ObTransService::rollback_savepoint_slowpath_(ObTxDesc &tx, ...@@ -1444,11 +1427,6 @@ inline int ObTransService::rollback_savepoint_slowpath_(ObTxDesc &tx,
// clear interrupt flag // clear interrupt flag
tx.flags_.INTERRUPTED_ = false; tx.flags_.INTERRUPTED_ = false;
} }
if (OB_NOT_NULL(tmp_tx_desc)) {
msg.tx_ptr_ = NULL;
tmp_tx_desc->~ObTxDesc();
ob_free(tmp_tx_desc);
}
auto elapsed_us = ObTimeUtility::current_time() - start_ts; auto elapsed_us = ObTimeUtility::current_time() - start_ts;
TRANS_LOG(INFO, "rollback savepoint slowpath", K(ret), TRANS_LOG(INFO, "rollback savepoint slowpath", K(ret),
K_(tx.tx_id), K(start_ts), K(retries), K_(tx.tx_id), K(start_ts), K(retries),
......
...@@ -53,56 +53,7 @@ OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareRedoReqMsg, ObTxMsg, xid_, upstream_, ap ...@@ -53,56 +53,7 @@ OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareRedoReqMsg, ObTxMsg, xid_, upstream_, ap
OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareRedoRespMsg, ObTxMsg); OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareRedoRespMsg, ObTxMsg);
OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareVersionReqMsg, ObTxMsg); OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareVersionReqMsg, ObTxMsg);
OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareVersionRespMsg, ObTxMsg, prepare_version_, prepare_info_array_); OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareVersionRespMsg, ObTxMsg, prepare_version_, prepare_info_array_);
OB_SERIALIZE_MEMBER_INHERIT(ObTxRollbackSPMsg, ObTxMsg, savepoint_, op_sn_, can_elr_, session_id_, tx_addr_, tx_expire_ts_);
OB_DEF_SERIALIZE_SIZE(ObTxRollbackSPMsg)
{
int len = 0;
len += ObTxMsg::get_serialize_size();
LST_DO_CODE(OB_UNIS_ADD_LEN, savepoint_, op_sn_, branch_id_);
if (OB_NOT_NULL(tx_ptr_)) {
OB_UNIS_ADD_LEN(true);
OB_UNIS_ADD_LEN(*tx_ptr_);
} else {
OB_UNIS_ADD_LEN(false);
}
return len;
}
OB_DEF_SERIALIZE(ObTxRollbackSPMsg)
{
int ret = ObTxMsg::serialize(buf, buf_len, pos);
if (OB_SUCC(ret)) {
LST_DO_CODE(OB_UNIS_ENCODE, savepoint_, op_sn_, branch_id_);
if (OB_NOT_NULL(tx_ptr_)) {
OB_UNIS_ENCODE(true);
OB_UNIS_ENCODE(*tx_ptr_);
} else {
OB_UNIS_ENCODE(false);
}
}
return ret;
}
OB_DEF_DESERIALIZE(ObTxRollbackSPMsg)
{
int ret = ObTxMsg::deserialize(buf, data_len, pos);
if (OB_SUCC(ret)) {
LST_DO_CODE(OB_UNIS_DECODE, savepoint_, op_sn_, branch_id_);
bool has_tx_ptr = false;
OB_UNIS_DECODE(has_tx_ptr);
if (has_tx_ptr) {
void *buffer = ob_malloc(sizeof(ObTxDesc));
if (OB_ISNULL(buffer)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
ObTxDesc *tmp = new(buffer)ObTxDesc();
OB_UNIS_DECODE(*tmp);
tx_ptr_ = tmp;
}
}
}
return ret;
}
bool ObTxMsg::is_valid() const bool ObTxMsg::is_valid() const
{ {
...@@ -208,7 +159,9 @@ bool ObTxRollbackSPMsg::is_valid() const ...@@ -208,7 +159,9 @@ bool ObTxRollbackSPMsg::is_valid() const
{ {
bool ret = false; bool ret = false;
if (ObTxMsg::is_valid() && type_ == ROLLBACK_SAVEPOINT if (ObTxMsg::is_valid() && type_ == ROLLBACK_SAVEPOINT
&& savepoint_ > -1 && op_sn_ > -1) { && savepoint_ > -1 && op_sn_ > -1
&& session_id_ > 0 && tx_addr_.is_valid()
&& tx_expire_ts_ > 0) {
ret = true; ret = true;
} }
return ret; return ret;
......
...@@ -229,26 +229,21 @@ namespace transaction ...@@ -229,26 +229,21 @@ namespace transaction
ObTxMsg(ROLLBACK_SAVEPOINT), ObTxMsg(ROLLBACK_SAVEPOINT),
savepoint_(-1), savepoint_(-1),
op_sn_(-1), op_sn_(-1),
//todo:后续branch_id使用方式确定后,需要相应修改 can_elr_(false),
branch_id_(-1), session_id_(0),
tx_ptr_(NULL) tx_addr_(),
tx_expire_ts_(-1)
{} {}
~ObTxRollbackSPMsg() { ~ObTxRollbackSPMsg() {}
if (OB_NOT_NULL(tx_ptr_)) {
tx_ptr_->~ObTxDesc();
ob_free((void*)tx_ptr_);
tx_ptr_ = NULL;
}
}
int64_t savepoint_; int64_t savepoint_;
int64_t op_sn_; int64_t op_sn_;
//todo:后期设计中操作编号是否等于branch_id bool can_elr_;
int64_t branch_id_; uint32_t session_id_;
const ObTxDesc *tx_ptr_; ObAddr tx_addr_;
int64_t tx_expire_ts_;
bool is_valid() const; bool is_valid() const;
INHERIT_TO_STRING_KV("txMsg", ObTxMsg, INHERIT_TO_STRING_KV("txMsg", ObTxMsg,
K_(savepoint), K_(op_sn), K_(branch_id), K_(savepoint), K_(op_sn), K_(can_elr), K_(session_id), K_(tx_addr), K_(tx_expire_ts));
KP_(tx_ptr));
OB_UNIS_VERSION(1); OB_UNIS_VERSION(1);
}; };
......
...@@ -121,8 +121,10 @@ public: ...@@ -121,8 +121,10 @@ public:
msg.request_id_ = op_sn_; msg.request_id_ = op_sn_;
msg.savepoint_ = 1; msg.savepoint_ = 1;
msg.op_sn_ = op_sn_; msg.op_sn_ = op_sn_;
msg.branch_id_ = 1; msg.can_elr_ = true;
msg.tx_ptr_ = tx; msg.session_id_ = 202;
msg.tx_addr_ = ObAddr(ObAddr::VER::IPV4, "127.1.1.2", 8919);
msg.tx_expire_ts_ = 120000;
} }
void build_tx_keepalive_msg(ObTxKeepaliveMsg &msg) void build_tx_keepalive_msg(ObTxKeepaliveMsg &msg)
{ {
...@@ -432,11 +434,10 @@ TEST_F(TestObTxMsg, trans_rollback_sp_msg) ...@@ -432,11 +434,10 @@ TEST_F(TestObTxMsg, trans_rollback_sp_msg)
EXPECT_EQ(msg.cluster_id_, msg1.cluster_id_); EXPECT_EQ(msg.cluster_id_, msg1.cluster_id_);
EXPECT_EQ(msg.savepoint_, msg1.savepoint_); EXPECT_EQ(msg.savepoint_, msg1.savepoint_);
EXPECT_EQ(msg.op_sn_, msg1.op_sn_); EXPECT_EQ(msg.op_sn_, msg1.op_sn_);
EXPECT_EQ(msg.branch_id_, msg1.branch_id_); EXPECT_EQ(msg.can_elr_, msg1.can_elr_);
EXPECT_EQ(msg.tx_ptr_->parts_[0].id_, msg1.tx_ptr_->parts_[0].id_); EXPECT_EQ(msg.session_id_, msg1.session_id_);
if (OB_NOT_NULL(msg.tx_ptr_)) { EXPECT_EQ(msg.tx_addr_, msg1.tx_addr_);
msg.tx_ptr_ = NULL; EXPECT_EQ(msg.tx_expire_ts_, msg1.tx_expire_ts_);
}
} }
TEST_F(TestObTxMsg, trans_keepalive_msg) TEST_F(TestObTxMsg, trans_keepalive_msg)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册