Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
0218def4
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
0218def4
编写于
5月 04, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
test: add action test for transaction
上级
74daf8d1
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
156 addition
and
19 deletion
+156
-19
source/dnode/mnode/impl/inc/mndTrans.h
source/dnode/mnode/impl/inc/mndTrans.h
+2
-0
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+34
-11
source/dnode/mnode/impl/test/trans/trans2.cpp
source/dnode/mnode/impl/test/trans/trans2.cpp
+120
-8
未找到文件。
source/dnode/mnode/impl/inc/mndTrans.h
浏览文件 @
0218def4
...
...
@@ -44,6 +44,8 @@ typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen);
int32_t
mndInitTrans
(
SMnode
*
pMnode
);
void
mndCleanupTrans
(
SMnode
*
pMnode
);
STrans
*
mndAcquireTrans
(
SMnode
*
pMnode
,
int32_t
transId
);
void
mndReleaseTrans
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
STrans
*
mndTransCreate
(
SMnode
*
pMnode
,
ETrnPolicy
policy
,
ETrnType
type
,
const
SRpcMsg
*
pReq
);
void
mndTransDrop
(
STrans
*
pTrans
);
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
0218def4
...
...
@@ -537,7 +537,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
return
0
;
}
static
STrans
*
mndAcquireTrans
(
SMnode
*
pMnode
,
int32_t
transId
)
{
STrans
*
mndAcquireTrans
(
SMnode
*
pMnode
,
int32_t
transId
)
{
STrans
*
pTrans
=
sdbAcquire
(
pMnode
->
pSdb
,
SDB_TRANS
,
&
transId
);
if
(
pTrans
==
NULL
)
{
terrno
=
TSDB_CODE_MND_TRANS_NOT_EXIST
;
...
...
@@ -545,7 +545,7 @@ static STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
return
pTrans
;
}
static
void
mndReleaseTrans
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
void
mndReleaseTrans
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbRelease
(
pSdb
,
pTrans
);
}
...
...
@@ -919,27 +919,41 @@ static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) {
if
(
arraySize
==
0
)
return
0
;
int32_t
code
=
0
;
for
(
int32_t
i
=
0
;
i
<
arraySize
;
++
i
)
{
SSdbRaw
*
pRaw
=
taosArrayGetP
(
pArray
,
i
);
int32_t
code
=
sdbWriteWithoutFree
(
pSdb
,
pRaw
);
if
(
code
!=
0
)
{
return
code
;
if
(
sdbWriteWithoutFree
(
pSdb
,
pRaw
)
!=
0
)
{
code
=
((
terrno
!=
0
)
?
terrno
:
-
1
);
}
}
return
0
;
terrno
=
code
;
return
code
;
}
static
int32_t
mndTransExecuteRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
return
mndTransExecuteLogs
(
pMnode
,
pTrans
->
redoLogs
);
int32_t
code
=
mndTransExecuteLogs
(
pMnode
,
pTrans
->
redoLogs
);
if
(
code
!=
0
)
{
mError
(
"failed to execute redoLogs since %s"
,
terrstr
());
}
return
code
;
}
static
int32_t
mndTransExecuteUndoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
return
mndTransExecuteLogs
(
pMnode
,
pTrans
->
undoLogs
);
int32_t
code
=
mndTransExecuteLogs
(
pMnode
,
pTrans
->
undoLogs
);
if
(
code
!=
0
)
{
mError
(
"failed to execute undoLogs since %s, return success"
,
terrstr
());
}
return
0
;
// return success in any case
}
static
int32_t
mndTransExecuteCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
return
mndTransExecuteLogs
(
pMnode
,
pTrans
->
commitLogs
);
int32_t
code
=
mndTransExecuteLogs
(
pMnode
,
pTrans
->
commitLogs
);
if
(
code
!=
0
)
{
mError
(
"failed to execute commitLogs since %s"
,
terrstr
());
}
return
code
;
}
static
void
mndTransResetActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SArray
*
pArray
)
{
...
...
@@ -983,6 +997,7 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
pAction
->
msgReceived
=
0
;
pAction
->
errCode
=
0
;
}
else
{
if
(
terrno
==
TSDB_CODE_INVALID_PTR
)
rpcFreeCont
(
rpcMsg
.
pCont
);
mError
(
"trans:%d, action:%d not send since %s"
,
pTrans
->
id
,
action
,
terrstr
());
return
-
1
;
}
...
...
@@ -1029,11 +1044,19 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
}
static
int32_t
mndTransExecuteRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
return
mndTransExecuteActions
(
pMnode
,
pTrans
,
pTrans
->
redoActions
);
int32_t
code
=
mndTransExecuteActions
(
pMnode
,
pTrans
,
pTrans
->
redoActions
);
if
(
code
!=
0
)
{
mError
(
"failed to execute redoActions since %s"
,
terrstr
());
}
return
code
;
}
static
int32_t
mndTransExecuteUndoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
return
mndTransExecuteActions
(
pMnode
,
pTrans
,
pTrans
->
undoActions
);
int32_t
code
=
mndTransExecuteActions
(
pMnode
,
pTrans
,
pTrans
->
undoActions
);
if
(
code
!=
0
)
{
mError
(
"failed to execute undoActions since %s"
,
terrstr
());
}
return
code
;
}
static
bool
mndTransPerformPrepareStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
...
...
source/dnode/mnode/impl/test/trans/trans2.cpp
浏览文件 @
0218def4
...
...
@@ -86,7 +86,7 @@ class MndTestTrans2 : public ::testing::Test {
void
SetUp
()
override
{}
void
TearDown
()
override
{}
int32_t
CreateUser
(
const
char
*
acct
,
const
char
*
user
)
{
int32_t
CreateUser
Log
(
const
char
*
acct
,
const
char
*
user
)
{
SUserObj
userObj
=
{
0
};
taosEncryptPass_c
((
uint8_t
*
)
"taosdata"
,
strlen
(
"taosdata"
),
userObj
.
pass
);
tstrncpy
(
userObj
.
user
,
user
,
TSDB_USER_LEN
);
...
...
@@ -101,7 +101,11 @@ class MndTestTrans2 : public ::testing::Test {
mndTransAppendRedolog
(
pTrans
,
pRedoRaw
);
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_READY
);
char
*
param
=
strdup
(
"====> test param <====="
);
SSdbRaw
*
pUndoRaw
=
mndUserActionEncode
(
&
userObj
);
mndTransAppendUndolog
(
pTrans
,
pUndoRaw
);
sdbSetRawStatus
(
pUndoRaw
,
SDB_STATUS_DROPPED
);
char
*
param
=
strdup
(
"====> test log <====="
);
mndTransSetCb
(
pTrans
,
TEST_TRANS_START_FUNC
,
TEST_TRANS_STOP_FUNC
,
param
,
strlen
(
param
)
+
1
);
int32_t
code
=
mndTransPrepare
(
pMnode
,
pTrans
);
...
...
@@ -109,29 +113,137 @@ class MndTestTrans2 : public ::testing::Test {
return
code
;
}
int32_t
CreateUserAction
(
const
char
*
acct
,
const
char
*
user
,
bool
hasUndoAction
)
{
SUserObj
userObj
=
{
0
};
taosEncryptPass_c
((
uint8_t
*
)
"taosdata"
,
strlen
(
"taosdata"
),
userObj
.
pass
);
tstrncpy
(
userObj
.
user
,
user
,
TSDB_USER_LEN
);
tstrncpy
(
userObj
.
acct
,
acct
,
TSDB_USER_LEN
);
userObj
.
createdTime
=
taosGetTimestampMs
();
userObj
.
updateTime
=
userObj
.
createdTime
;
userObj
.
superUser
=
1
;
SRpcMsg
rpcMsg
=
{
0
};
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_TYPE_CREATE_USER
,
&
rpcMsg
);
SSdbRaw
*
pRedoRaw
=
mndUserActionEncode
(
&
userObj
);
mndTransAppendRedolog
(
pTrans
,
pRedoRaw
);
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_READY
);
SSdbRaw
*
pUndoRaw
=
mndUserActionEncode
(
&
userObj
);
mndTransAppendUndolog
(
pTrans
,
pUndoRaw
);
sdbSetRawStatus
(
pUndoRaw
,
SDB_STATUS_DROPPED
);
char
*
param
=
strdup
(
"====> test action <====="
);
mndTransSetCb
(
pTrans
,
TEST_TRANS_START_FUNC
,
TEST_TRANS_STOP_FUNC
,
param
,
strlen
(
param
)
+
1
);
{
STransAction
action
=
{
0
};
action
.
epSet
.
inUse
=
0
;
action
.
epSet
.
numOfEps
=
1
;
action
.
epSet
.
eps
[
0
].
port
=
9040
;
strcpy
(
action
.
epSet
.
eps
[
0
].
fqdn
,
"localhost"
);
int32_t
contLen
=
1024
;
void
*
pReq
=
taosMemoryCalloc
(
1
,
contLen
);
strcpy
((
char
*
)
pReq
,
"hello world redo"
);
action
.
pCont
=
pReq
;
action
.
contLen
=
contLen
;
action
.
msgType
=
TDMT_DND_CREATE_MNODE
;
action
.
acceptableCode
=
TSDB_CODE_NODE_ALREADY_DEPLOYED
;
mndTransAppendRedoAction
(
pTrans
,
&
action
);
}
if
(
hasUndoAction
)
{
STransAction
action
=
{
0
};
action
.
epSet
.
inUse
=
0
;
action
.
epSet
.
numOfEps
=
1
;
action
.
epSet
.
eps
[
0
].
port
=
9040
;
strcpy
(
action
.
epSet
.
eps
[
0
].
fqdn
,
"localhost"
);
int32_t
contLen
=
1024
;
void
*
pReq
=
taosMemoryCalloc
(
1
,
contLen
);
strcpy
((
char
*
)
pReq
,
"hello world undo"
);
action
.
pCont
=
pReq
;
action
.
contLen
=
contLen
;
action
.
msgType
=
TDMT_DND_CREATE_MNODE
;
action
.
acceptableCode
=
TSDB_CODE_NODE_ALREADY_DEPLOYED
;
mndTransAppendUndoAction
(
pTrans
,
&
action
);
}
int32_t
code
=
mndTransPrepare
(
pMnode
,
pTrans
);
mndTransDrop
(
pTrans
);
return
code
;
}
};
SMnode
*
MndTestTrans2
::
pMnode
;
TEST_F
(
MndTestTrans2
,
01
_
CbFunc
)
{
TEST_F
(
MndTestTrans2
,
01
_
Log
)
{
const
char
*
acct
=
"root"
;
const
char
*
acct_invalid
=
"root1"
;
const
char
*
user1
=
"
test
1"
;
const
char
*
user2
=
"
test
2"
;
const
char
*
user1
=
"
log
1"
;
const
char
*
user2
=
"
log
2"
;
SUserObj
*
pUser1
=
NULL
;
SUserObj
*
pUser2
=
NULL
;
ASSERT_NE
(
pMnode
,
nullptr
);
// create user success
EXPECT_EQ
(
CreateUser
(
acct
,
user1
),
0
);
EXPECT_EQ
(
CreateUserLog
(
acct
,
user1
),
0
);
pUser1
=
mndAcquireUser
(
pMnode
,
user1
);
ASSERT_NE
(
pUser1
,
nullptr
);
// failed to create user and rollback
EXPECT_EQ
(
CreateUser
(
acct_invalid
,
user2
),
0
);
EXPECT_EQ
(
CreateUser
Log
(
acct_invalid
,
user2
),
0
);
pUser2
=
mndAcquireUser
(
pMnode
,
user2
);
ASSERT_EQ
(
pUser2
,
nullptr
);
mndTransPullup
(
pMnode
);
}
TEST_F
(
MndTestTrans2
,
02
_Action
)
{
const
char
*
acct
=
"root"
;
const
char
*
acct_invalid
=
"root1"
;
const
char
*
user1
=
"action1"
;
const
char
*
user2
=
"action2"
;
SUserObj
*
pUser1
=
NULL
;
SUserObj
*
pUser2
=
NULL
;
STrans
*
pTrans
=
NULL
;
int32_t
transId
=
4
;
int32_t
action
=
0
;
ASSERT_NE
(
pMnode
,
nullptr
);
// failed to create user and rollback
EXPECT_EQ
(
CreateUserAction
(
acct
,
user1
,
false
),
0
);
pUser1
=
mndAcquireUser
(
pMnode
,
user1
);
ASSERT_EQ
(
pUser1
,
nullptr
);
mndReleaseUser
(
pMnode
,
pUser1
);
// create user, and fake a response
EXPECT_EQ
(
CreateUserAction
(
acct
,
user1
,
true
),
0
);
pUser1
=
mndAcquireUser
(
pMnode
,
user1
);
ASSERT_NE
(
pUser1
,
nullptr
);
mndReleaseUser
(
pMnode
,
pUser1
);
pTrans
=
mndAcquireTrans
(
pMnode
,
transId
);
EXPECT_EQ
(
pTrans
->
code
,
TSDB_CODE_INVALID_PTR
);
EXPECT_EQ
(
pTrans
->
stage
,
TRN_STAGE_UNDO_ACTION
);
EXPECT_EQ
(
pTrans
->
failedTimes
,
1
);
STransAction
*
pAction
=
(
STransAction
*
)
taosArrayGet
(
pTrans
->
undoActions
,
action
);
pAction
->
msgSent
=
1
;
SNodeMsg
rspMsg
=
{
0
};
rspMsg
.
pNode
=
pMnode
;
int64_t
signature
=
transId
;
signature
=
(
signature
<<
32
);
signature
+=
action
;
rspMsg
.
rpcMsg
.
ahandle
=
(
void
*
)
signature
;
mndTransProcessRsp
(
&
rspMsg
);
mndReleaseTrans
(
pMnode
,
pTrans
);
pUser1
=
mndAcquireUser
(
pMnode
,
user1
);
ASSERT_EQ
(
pUser1
,
nullptr
);
mndReleaseUser
(
pMnode
,
pUser1
);
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录