提交 26529621 编写于 作者: C chinaxing 提交者: wangzelin.wzl

[4.1] revert [adjust savepoint rollback msg]

上级 f71e5149
......@@ -1108,6 +1108,7 @@ int ObTransService::create_tx_ctx_(const share::ObLSID &ls_id,
{
int ret = OB_SUCCESS;
bool existed = false;
int64_t epoch = 0;
ObTxCreateArg arg(tx.can_elr_, /* can_elr */
false, /* for_replay */
tx.tenant_id_,
......@@ -1378,6 +1379,7 @@ int ObTransService::acquire_local_snapshot_(const share::ObLSID &ls_id,
int64_t &snapshot)
{
int ret = OB_SUCCESS;
int64_t epoch = 0;
bool leader = false;
int64_t snapshot0 = 0;
ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL;
......@@ -1483,14 +1485,19 @@ int ObTransService::batch_post_tx_msg_(ObTxRollbackSPMsg &msg,
{
int ret = OB_SUCCESS;
int last_ret = OB_SUCCESS;
const ObTxDesc *tx_ptr = msg.tx_ptr_;
ARRAY_FOREACH_NORET(list, idx) {
auto &p = list.at(idx);
msg.receiver_ = p.left_;
msg.epoch_ = p.right_;
if (msg.epoch_ > 0) {
msg.tx_ptr_ = NULL;
}
if (OB_FAIL(rpc_->post_msg(p.left_, msg))) {
TRANS_LOG(WARN, "post msg falied", K(ret), K(msg), K(p));
last_ret = ret;
}
msg.tx_ptr_ = tx_ptr;
}
return last_ret;
}
......@@ -1675,45 +1682,22 @@ int ObTransService::handle_trans_abort_request(ObTxAbortMsg &abort_req, ObTransR
return ret;
}
int ObTransService::create_tx_ctx_(ObTxRollbackSPMsg &msg, ObPartTransCtx *&ctx)
{
int ret = OB_SUCCESS;
bool existed = false;
ObTxCreateArg arg(msg.can_elr_, /* can_elr */
false, /* for_replay */
msg.tenant_id_,
msg.tx_id_,
msg.receiver_,
msg.cluster_id_,
msg.cluster_version_,
msg.session_id_, /*session_id*/
msg.tx_addr_,
msg.tx_expire_ts_,
this);
ret = tx_ctx_mgr_.create_tx_ctx(arg, existed, ctx);
if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "create tx ctx fail", K(ret), K(msg), K(arg));
ctx = NULL;
}
TRANS_LOG(TRACE, "create tx ctx for savepoint rollback", K(ret), K(msg), K(arg));
return ret;
}
int ObTransService::handle_sp_rollback_request(ObTxRollbackSPMsg &msg,
obrpc::ObTxRpcRollbackSPResult &result)
{
int ret = OB_SUCCESS;
int64_t ctx_born_epoch = -1;
ObFunction<int(ObPartTransCtx*&)> create_tx_ctx_func = [this, &msg](ObPartTransCtx *&ctx) -> int {
return this->create_tx_ctx_(msg, ctx);
};
ret = ls_rollback_to_savepoint_(msg.tx_id_,
msg.receiver_,
msg.epoch_,
msg.op_sn_,
msg.savepoint_,
ctx_born_epoch,
create_tx_ctx_func);
msg.tx_ptr_);
if (OB_NOT_NULL(msg.tx_ptr_)) {
ob_free((void*)msg.tx_ptr_);
msg.tx_ptr_ = NULL;
}
result.status_ = ret;
result.addr_ = self_;
result.born_epoch_ = ctx_born_epoch;
......
......@@ -173,7 +173,7 @@ int rollback_savepoint_slowpath_(ObTxDesc &tx,
int create_tx_ctx_(const share::ObLSID &ls_id,
const ObTxDesc &tx,
ObPartTransCtx *&ctx);
int create_tx_ctx_(ObTxRollbackSPMsg &msg, ObPartTransCtx *&ctx);
int create_tx_ctx_(const share::ObLSID &ls_id,
ObLS *ls,
const ObTxDesc &tx,
......@@ -281,7 +281,7 @@ int ls_rollback_to_savepoint_(const ObTransID &tx_id,
const int64_t op_sn,
const int64_t savepoint,
int64_t &ctx_born_epoch,
ObFunction<int(ObPartTransCtx*&)> &func,
const ObTxDesc *tx,
int64_t expire_ts = -1);
int sync_rollback_savepoint__(ObTxDesc &tx,
ObTxRollbackSPMsg &msg,
......
......@@ -1277,16 +1277,13 @@ int ObTransService::rollback_savepoint_(ObTxDesc &tx,
slowpath = false;
ObTxPart &p = parts[0];
int64_t born_epoch = 0;
ObFunction<int(ObPartTransCtx*&)> create_tx_ctx_func = [this, &p, &tx](ObPartTransCtx *&ctx) -> int {
return this->create_tx_ctx_(p.id_, tx, ctx);
};
if (OB_FAIL(ls_rollback_to_savepoint_(tx.tx_id_,
p.id_,
p.epoch_,
tx.op_sn_,
savepoint,
born_epoch,
create_tx_ctx_func,
&tx,
expire_ts))) {
if (OB_NOT_MASTER == ret) {
slowpath = true;
......@@ -1340,7 +1337,7 @@ int ObTransService::ls_rollback_to_savepoint_(const ObTransID &tx_id,
const int64_t op_sn,
const int64_t savepoint,
int64_t &ctx_born_epoch,
ObFunction<int(ObPartTransCtx*&)> &func,
const ObTxDesc *tx,
int64_t expire_ts)
{
int ret = OB_SUCCESS;
......@@ -1349,8 +1346,8 @@ int ObTransService::ls_rollback_to_savepoint_(const ObTransID &tx_id,
if (OB_FAIL(get_tx_ctx_(ls, tx_id, ctx))) {
if (OB_NOT_MASTER == ret) {
} else if (OB_TRANS_CTX_NOT_EXIST == ret && verify_epoch <= 0) {
if (OB_FAIL(func(ctx))) {
TRANS_LOG(WARN, "create tx ctx fail", K(ret), K(ls), K(tx_id));
if (OB_FAIL(create_tx_ctx_(ls, *tx, ctx))) {
TRANS_LOG(WARN, "create tx ctx fail", K(ret), K(ls), KPC(tx));
}
} else {
TRANS_LOG(WARN, "get transaction context error", K(ret), K(tx_id), K(ls));
......@@ -1401,12 +1398,32 @@ inline int ObTransService::rollback_savepoint_slowpath_(ObTxDesc &tx,
msg.tx_id_ = tx.tx_id_;
msg.savepoint_ = savepoint;
msg.op_sn_ = tx.op_sn_;
msg.can_elr_ = tx.can_elr_;
msg.session_id_ = tx.sess_id_;
msg.tx_addr_ = tx.addr_;
msg.tx_expire_ts_ = tx.get_expire_ts();
msg.epoch_ = -1;
msg.request_id_ = tx.op_sn_;
// prepare msg.tx_ptr_ if required
// TODO(yunxing.cyx) : in 4.1 rework here, won't serialize txDesc
ObTxDesc *tmp_tx_desc = NULL;
ARRAY_FOREACH_NORET(parts, i) {
if (parts[i].epoch_ <= 0) {
int64_t len = tx.get_serialize_size() + sizeof(ObTxDesc);
char *buf = (char*)ob_malloc(len);
int64_t pos = sizeof(ObTxDesc);
if (OB_FAIL(tx.serialize(buf, len, pos))) {
TRANS_LOG(WARN, "serialize tx fail", KR(ret), K(tx));
ob_free(buf);
} else {
tmp_tx_desc = new(buf)ObTxDesc();
pos = sizeof(ObTxDesc);
if (OB_FAIL(tmp_tx_desc->deserialize(buf, len, pos))) {
TRANS_LOG(WARN, "deserialize tx fail", KR(ret));
} else {
tmp_tx_desc->parts_.reset();
msg.tx_ptr_ = tmp_tx_desc;
}
}
break;
}
}
int64_t start_ts = ObTimeUtility::current_time();
int retries = 0;
if (OB_SUCC(ret)) {
......@@ -1427,6 +1444,11 @@ inline int ObTransService::rollback_savepoint_slowpath_(ObTxDesc &tx,
// clear interrupt flag
tx.flags_.INTERRUPTED_ = false;
}
if (OB_NOT_NULL(tmp_tx_desc)) {
msg.tx_ptr_ = NULL;
tmp_tx_desc->~ObTxDesc();
ob_free(tmp_tx_desc);
}
auto elapsed_us = ObTimeUtility::current_time() - start_ts;
TRANS_LOG(INFO, "rollback savepoint slowpath", K(ret),
K_(tx.tx_id), K(start_ts), K(retries),
......
......@@ -53,7 +53,56 @@ OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareRedoReqMsg, ObTxMsg, xid_, upstream_, ap
OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareRedoRespMsg, ObTxMsg);
OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareVersionReqMsg, ObTxMsg);
OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareVersionRespMsg, ObTxMsg, prepare_version_, prepare_info_array_);
OB_SERIALIZE_MEMBER_INHERIT(ObTxRollbackSPMsg, ObTxMsg, savepoint_, op_sn_, can_elr_, session_id_, tx_addr_, tx_expire_ts_);
OB_DEF_SERIALIZE_SIZE(ObTxRollbackSPMsg)
{
int len = 0;
len += ObTxMsg::get_serialize_size();
LST_DO_CODE(OB_UNIS_ADD_LEN, savepoint_, op_sn_, branch_id_);
if (OB_NOT_NULL(tx_ptr_)) {
OB_UNIS_ADD_LEN(true);
OB_UNIS_ADD_LEN(*tx_ptr_);
} else {
OB_UNIS_ADD_LEN(false);
}
return len;
}
OB_DEF_SERIALIZE(ObTxRollbackSPMsg)
{
int ret = ObTxMsg::serialize(buf, buf_len, pos);
if (OB_SUCC(ret)) {
LST_DO_CODE(OB_UNIS_ENCODE, savepoint_, op_sn_, branch_id_);
if (OB_NOT_NULL(tx_ptr_)) {
OB_UNIS_ENCODE(true);
OB_UNIS_ENCODE(*tx_ptr_);
} else {
OB_UNIS_ENCODE(false);
}
}
return ret;
}
OB_DEF_DESERIALIZE(ObTxRollbackSPMsg)
{
int ret = ObTxMsg::deserialize(buf, data_len, pos);
if (OB_SUCC(ret)) {
LST_DO_CODE(OB_UNIS_DECODE, savepoint_, op_sn_, branch_id_);
bool has_tx_ptr = false;
OB_UNIS_DECODE(has_tx_ptr);
if (has_tx_ptr) {
void *buffer = ob_malloc(sizeof(ObTxDesc));
if (OB_ISNULL(buffer)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
ObTxDesc *tmp = new(buffer)ObTxDesc();
OB_UNIS_DECODE(*tmp);
tx_ptr_ = tmp;
}
}
}
return ret;
}
bool ObTxMsg::is_valid() const
{
......@@ -159,9 +208,7 @@ bool ObTxRollbackSPMsg::is_valid() const
{
bool ret = false;
if (ObTxMsg::is_valid() && type_ == ROLLBACK_SAVEPOINT
&& savepoint_ > -1 && op_sn_ > -1
&& session_id_ > 0 && tx_addr_.is_valid()
&& tx_expire_ts_ > 0) {
&& savepoint_ > -1 && op_sn_ > -1) {
ret = true;
}
return ret;
......
......@@ -229,21 +229,26 @@ namespace transaction
ObTxMsg(ROLLBACK_SAVEPOINT),
savepoint_(-1),
op_sn_(-1),
can_elr_(false),
session_id_(0),
tx_addr_(),
tx_expire_ts_(-1)
//todo:后续branch_id使用方式确定后,需要相应修改
branch_id_(-1),
tx_ptr_(NULL)
{}
~ObTxRollbackSPMsg() {}
~ObTxRollbackSPMsg() {
if (OB_NOT_NULL(tx_ptr_)) {
tx_ptr_->~ObTxDesc();
ob_free((void*)tx_ptr_);
tx_ptr_ = NULL;
}
}
int64_t savepoint_;
int64_t op_sn_;
bool can_elr_;
uint32_t session_id_;
ObAddr tx_addr_;
int64_t tx_expire_ts_;
//todo:后期设计中操作编号是否等于branch_id
int64_t branch_id_;
const ObTxDesc *tx_ptr_;
bool is_valid() const;
INHERIT_TO_STRING_KV("txMsg", ObTxMsg,
K_(savepoint), K_(op_sn), K_(can_elr), K_(session_id), K_(tx_addr), K_(tx_expire_ts));
K_(savepoint), K_(op_sn), K_(branch_id),
KP_(tx_ptr));
OB_UNIS_VERSION(1);
};
......
......@@ -121,10 +121,8 @@ public:
msg.request_id_ = op_sn_;
msg.savepoint_ = 1;
msg.op_sn_ = op_sn_;
msg.can_elr_ = true;
msg.session_id_ = 202;
msg.tx_addr_ = ObAddr(ObAddr::VER::IPV4, "127.1.1.2", 8919);
msg.tx_expire_ts_ = 120000;
msg.branch_id_ = 1;
msg.tx_ptr_ = tx;
}
void build_tx_keepalive_msg(ObTxKeepaliveMsg &msg)
{
......@@ -434,10 +432,11 @@ TEST_F(TestObTxMsg, trans_rollback_sp_msg)
EXPECT_EQ(msg.cluster_id_, msg1.cluster_id_);
EXPECT_EQ(msg.savepoint_, msg1.savepoint_);
EXPECT_EQ(msg.op_sn_, msg1.op_sn_);
EXPECT_EQ(msg.can_elr_, msg1.can_elr_);
EXPECT_EQ(msg.session_id_, msg1.session_id_);
EXPECT_EQ(msg.tx_addr_, msg1.tx_addr_);
EXPECT_EQ(msg.tx_expire_ts_, msg1.tx_expire_ts_);
EXPECT_EQ(msg.branch_id_, msg1.branch_id_);
EXPECT_EQ(msg.tx_ptr_->parts_[0].id_, msg1.tx_ptr_->parts_[0].id_);
if (OB_NOT_NULL(msg.tx_ptr_)) {
msg.tx_ptr_ = NULL;
}
}
TEST_F(TestObTxMsg, trans_keepalive_msg)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册