未验证 提交 2198001b 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #12122 from taosdata/feature/dnode

test: add unitest for transaction
...@@ -61,6 +61,7 @@ void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb); ...@@ -61,6 +61,7 @@ void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
void mndTransProcessRsp(SNodeMsg *pRsp); void mndTransProcessRsp(SNodeMsg *pRsp);
void mndTransPullup(SMnode *pMnode); void mndTransPullup(SMnode *pMnode);
int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -997,7 +997,12 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr ...@@ -997,7 +997,12 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
pAction->msgReceived = 0; pAction->msgReceived = 0;
pAction->errCode = 0; pAction->errCode = 0;
} else { } else {
if (terrno == TSDB_CODE_INVALID_PTR) rpcFreeCont(rpcMsg.pCont); pAction->msgSent = 0;
pAction->msgReceived = 0;
pAction->errCode = terrno;
if (terrno == TSDB_CODE_INVALID_PTR || terrno == TSDB_CODE_NODE_OFFLINE) {
rpcFreeCont(rpcMsg.pCont);
}
mError("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr()); mError("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr());
return -1; return -1;
} }
...@@ -1275,7 +1280,7 @@ static int32_t mndProcessTransReq(SNodeMsg *pReq) { ...@@ -1275,7 +1280,7 @@ static int32_t mndProcessTransReq(SNodeMsg *pReq) {
return 0; return 0;
} }
static int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) { int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
SArray *pArray = NULL; SArray *pArray = NULL;
if (pTrans->stage == TRN_STAGE_REDO_ACTION) { if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
pArray = pTrans->redoActions; pArray = pTrans->redoActions;
...@@ -1293,14 +1298,14 @@ static int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) { ...@@ -1293,14 +1298,14 @@ static int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
if (pAction == NULL) continue; if (pAction == NULL) continue;
if (pAction->msgReceived == 0) { if (pAction->msgReceived == 0) {
mInfo("trans:%d, action:%d set processed", pTrans->id, i); mInfo("trans:%d, action:%d set processed for kill msg received", pTrans->id, i);
pAction->msgSent = 1; pAction->msgSent = 1;
pAction->msgReceived = 1; pAction->msgReceived = 1;
pAction->errCode = 0; pAction->errCode = 0;
} }
if (pAction->errCode != 0) { if (pAction->errCode != 0) {
mInfo("trans:%d, action:%d set processed, errCode from %s to success", pTrans->id, i, mInfo("trans:%d, action:%d set processed for kill msg received, errCode from %s to success", pTrans->id, i,
tstrerror(pAction->errCode)); tstrerror(pAction->errCode));
pAction->msgSent = 1; pAction->msgSent = 1;
pAction->msgReceived = 1; pAction->msgReceived = 1;
......
...@@ -86,7 +86,7 @@ class MndTestTrans2 : public ::testing::Test { ...@@ -86,7 +86,7 @@ class MndTestTrans2 : public ::testing::Test {
void SetUp() override {} void SetUp() override {}
void TearDown() override {} void TearDown() override {}
int32_t CreateUserLog(const char *acct, const char *user) { int32_t CreateUserLog(const char *acct, const char *user, ETrnType type, SDbObj *pDb) {
SUserObj userObj = {0}; SUserObj userObj = {0};
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass); taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
tstrncpy(userObj.user, user, TSDB_USER_LEN); tstrncpy(userObj.user, user, TSDB_USER_LEN);
...@@ -96,7 +96,7 @@ class MndTestTrans2 : public ::testing::Test { ...@@ -96,7 +96,7 @@ class MndTestTrans2 : public ::testing::Test {
userObj.superUser = 1; userObj.superUser = 1;
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, &rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, type, &rpcMsg);
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj); SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
mndTransAppendRedolog(pTrans, pRedoRaw); mndTransAppendRedolog(pTrans, pRedoRaw);
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY); sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
...@@ -108,13 +108,18 @@ class MndTestTrans2 : public ::testing::Test { ...@@ -108,13 +108,18 @@ class MndTestTrans2 : public ::testing::Test {
char *param = strdup("====> test log <====="); char *param = strdup("====> test log <=====");
mndTransSetCb(pTrans, TEST_TRANS_START_FUNC, TEST_TRANS_STOP_FUNC, param, strlen(param) + 1); mndTransSetCb(pTrans, TEST_TRANS_START_FUNC, TEST_TRANS_STOP_FUNC, param, strlen(param) + 1);
if (pDb != NULL) {
mndTransSetDbInfo(pTrans, pDb);
}
int32_t code = mndTransPrepare(pMnode, pTrans); int32_t code = mndTransPrepare(pMnode, pTrans);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;
} }
int32_t CreateUserAction(const char *acct, const char *user, bool hasUndoAction, ETrnPolicy policy) { int32_t CreateUserAction(const char *acct, const char *user, bool hasUndoAction, ETrnPolicy policy, ETrnType type,
SDbObj *pDb) {
SUserObj userObj = {0}; SUserObj userObj = {0};
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass); taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
tstrncpy(userObj.user, user, TSDB_USER_LEN); tstrncpy(userObj.user, user, TSDB_USER_LEN);
...@@ -124,7 +129,7 @@ class MndTestTrans2 : public ::testing::Test { ...@@ -124,7 +129,7 @@ class MndTestTrans2 : public ::testing::Test {
userObj.superUser = 1; userObj.superUser = 1;
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
STrans *pTrans = mndTransCreate(pMnode, policy, TRN_TYPE_CREATE_USER, &rpcMsg); STrans *pTrans = mndTransCreate(pMnode, policy, type, &rpcMsg);
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj); SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
mndTransAppendRedolog(pTrans, pRedoRaw); mndTransAppendRedolog(pTrans, pRedoRaw);
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY); sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
...@@ -170,6 +175,44 @@ class MndTestTrans2 : public ::testing::Test { ...@@ -170,6 +175,44 @@ class MndTestTrans2 : public ::testing::Test {
mndTransAppendUndoAction(pTrans, &action); mndTransAppendUndoAction(pTrans, &action);
} }
{
void *pRsp = taosMemoryCalloc(1, 256);
strcpy((char *)pRsp, "simple rsponse");
mndTransSetRpcRsp(pTrans, pRsp, 256);
}
if (pDb != NULL) {
mndTransSetDbInfo(pTrans, pDb);
}
int32_t code = mndTransPrepare(pMnode, pTrans);
mndTransDrop(pTrans);
return code;
}
int32_t CreateUserGlobal(const char *acct, const char *user) {
SUserObj userObj = {0};
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
tstrncpy(userObj.user, user, TSDB_USER_LEN);
tstrncpy(userObj.acct, acct, TSDB_USER_LEN);
userObj.createdTime = taosGetTimestampMs();
userObj.updateTime = userObj.createdTime;
userObj.superUser = 1;
SRpcMsg rpcMsg = {0};
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, &rpcMsg);
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
mndTransAppendRedolog(pTrans, pRedoRaw);
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
SSdbRaw *pUndoRaw = mndUserActionEncode(&userObj);
mndTransAppendUndolog(pTrans, pUndoRaw);
sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED);
char *param = strdup("====> test log <=====");
mndTransSetCb(pTrans, TEST_TRANS_START_FUNC, TEST_TRANS_STOP_FUNC, param, strlen(param) + 1);
int32_t code = mndTransPrepare(pMnode, pTrans); int32_t code = mndTransPrepare(pMnode, pTrans);
mndTransDrop(pTrans); mndTransDrop(pTrans);
...@@ -189,12 +232,12 @@ TEST_F(MndTestTrans2, 01_Log) { ...@@ -189,12 +232,12 @@ TEST_F(MndTestTrans2, 01_Log) {
ASSERT_NE(pMnode, nullptr); ASSERT_NE(pMnode, nullptr);
EXPECT_EQ(CreateUserLog(acct, user1), 0); EXPECT_EQ(CreateUserLog(acct, user1, TRN_TYPE_CREATE_USER, NULL), 0);
pUser1 = mndAcquireUser(pMnode, user1); pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_NE(pUser1, nullptr); ASSERT_NE(pUser1, nullptr);
// failed to create user and rollback // failed to create user and rollback
EXPECT_EQ(CreateUserLog(acct_invalid, user2), 0); EXPECT_EQ(CreateUserLog(acct_invalid, user2, TRN_TYPE_CREATE_USER, NULL), 0);
pUser2 = mndAcquireUser(pMnode, user2); pUser2 = mndAcquireUser(pMnode, user2);
ASSERT_EQ(pUser2, nullptr); ASSERT_EQ(pUser2, nullptr);
...@@ -214,44 +257,46 @@ TEST_F(MndTestTrans2, 02_Action) { ...@@ -214,44 +257,46 @@ TEST_F(MndTestTrans2, 02_Action) {
ASSERT_NE(pMnode, nullptr); ASSERT_NE(pMnode, nullptr);
// failed to create user and rollback
EXPECT_EQ(CreateUserAction(acct, user1, false, TRN_POLICY_ROLLBACK), 0);
pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_EQ(pUser1, nullptr);
mndReleaseUser(pMnode, pUser1);
// create user, and fake a response
{ {
EXPECT_EQ(CreateUserAction(acct, user1, true, TRN_POLICY_ROLLBACK), 0); // failed to create user and rollback
EXPECT_EQ(CreateUserAction(acct, user1, false, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, NULL), 0);
pUser1 = mndAcquireUser(pMnode, user1); pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_NE(pUser1, nullptr); ASSERT_EQ(pUser1, nullptr);
mndReleaseUser(pMnode, pUser1); mndReleaseUser(pMnode, pUser1);
transId = 4; // create user, and fake a response
pTrans = mndAcquireTrans(pMnode, transId); {
EXPECT_EQ(pTrans->code, TSDB_CODE_INVALID_PTR); EXPECT_EQ(CreateUserAction(acct, user1, true, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, NULL), 0);
EXPECT_EQ(pTrans->stage, TRN_STAGE_UNDO_ACTION); pUser1 = mndAcquireUser(pMnode, user1);
EXPECT_EQ(pTrans->failedTimes, 1); ASSERT_NE(pUser1, nullptr);
mndReleaseUser(pMnode, pUser1);
STransAction *pAction = (STransAction *)taosArrayGet(pTrans->undoActions, action); transId = 4;
pAction->msgSent = 1; pTrans = mndAcquireTrans(pMnode, transId);
EXPECT_EQ(pTrans->code, TSDB_CODE_INVALID_PTR);
EXPECT_EQ(pTrans->stage, TRN_STAGE_UNDO_ACTION);
EXPECT_EQ(pTrans->failedTimes, 1);
SNodeMsg rspMsg = {0}; STransAction *pAction = (STransAction *)taosArrayGet(pTrans->undoActions, action);
rspMsg.pNode = pMnode; pAction->msgSent = 1;
int64_t signature = transId;
signature = (signature << 32);
signature += action;
rspMsg.rpcMsg.ahandle = (void *)signature;
mndTransProcessRsp(&rspMsg);
mndReleaseTrans(pMnode, pTrans);
pUser1 = mndAcquireUser(pMnode, user1); SNodeMsg rspMsg = {0};
ASSERT_EQ(pUser1, nullptr); rspMsg.pNode = pMnode;
mndReleaseUser(pMnode, pUser1); int64_t signature = transId;
signature = (signature << 32);
signature += action;
rspMsg.rpcMsg.ahandle = (void *)signature;
mndTransProcessRsp(&rspMsg);
mndReleaseTrans(pMnode, pTrans);
pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_EQ(pUser1, nullptr);
mndReleaseUser(pMnode, pUser1);
}
} }
{ {
EXPECT_EQ(CreateUserAction(acct, user1, false, TRN_POLICY_RETRY), 0); EXPECT_EQ(CreateUserAction(acct, user1, false, TRN_POLICY_RETRY, TRN_TYPE_CREATE_USER, NULL), 0);
pUser1 = mndAcquireUser(pMnode, user1); pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_NE(pUser1, nullptr); ASSERT_NE(pUser1, nullptr);
mndReleaseUser(pMnode, pUser1); mndReleaseUser(pMnode, pUser1);
...@@ -305,4 +350,164 @@ TEST_F(MndTestTrans2, 02_Action) { ...@@ -305,4 +350,164 @@ TEST_F(MndTestTrans2, 02_Action) {
mndReleaseUser(pMnode, pUser1); mndReleaseUser(pMnode, pUser1);
} }
} }
{
EXPECT_EQ(CreateUserAction(acct, user2, true, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, NULL), 0);
SUserObj *pUser2 = (SUserObj *)sdbAcquire(pMnode->pSdb, SDB_USER, user2);
ASSERT_NE(pUser2, nullptr);
mndReleaseUser(pMnode, pUser2);
{
transId = 6;
pTrans = mndAcquireTrans(pMnode, transId);
EXPECT_EQ(pTrans->code, TSDB_CODE_INVALID_PTR);
EXPECT_EQ(pTrans->stage, TRN_STAGE_UNDO_ACTION);
EXPECT_EQ(pTrans->failedTimes, 1);
SNodeMsg rspMsg = {0};
rspMsg.pNode = pMnode;
int64_t signature = transId;
signature = (signature << 32);
signature += action;
rspMsg.rpcMsg.ahandle = (void *)signature;
rspMsg.rpcMsg.code = 0;
mndTransProcessRsp(&rspMsg);
mndReleaseTrans(pMnode, pTrans);
pUser2 = mndAcquireUser(pMnode, user2);
ASSERT_NE(pUser2, nullptr);
mndReleaseUser(pMnode, pUser2);
}
{
transId = 6;
pTrans = mndAcquireTrans(pMnode, transId);
EXPECT_EQ(pTrans->code, TSDB_CODE_INVALID_PTR);
EXPECT_EQ(pTrans->stage, TRN_STAGE_UNDO_ACTION);
EXPECT_EQ(pTrans->failedTimes, 2);
STransAction *pAction = (STransAction *)taosArrayGet(pTrans->undoActions, action);
pAction->msgSent = 1;
SNodeMsg rspMsg = {0};
rspMsg.pNode = pMnode;
int64_t signature = transId;
signature = (signature << 32);
signature += action;
rspMsg.rpcMsg.ahandle = (void *)signature;
mndTransProcessRsp(&rspMsg);
mndReleaseTrans(pMnode, pTrans);
pUser2 = mndAcquireUser(pMnode, user2);
ASSERT_EQ(pUser2, nullptr);
mndReleaseUser(pMnode, pUser2);
}
}
} }
TEST_F(MndTestTrans2, 03_Kill) {
const char *acct = "root";
const char *user1 = "kill1";
const char *user2 = "kill2";
SUserObj *pUser1 = NULL;
SUserObj *pUser2 = NULL;
STrans *pTrans = NULL;
int32_t transId = 0;
int32_t action = 0;
ASSERT_NE(pMnode, nullptr);
{
EXPECT_EQ(CreateUserAction(acct, user1, true, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, NULL), 0);
pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_NE(pUser1, nullptr);
mndReleaseUser(pMnode, pUser1);
transId = 7;
pTrans = mndAcquireTrans(pMnode, transId);
EXPECT_EQ(pTrans->code, TSDB_CODE_INVALID_PTR);
EXPECT_EQ(pTrans->stage, TRN_STAGE_UNDO_ACTION);
EXPECT_EQ(pTrans->failedTimes, 1);
mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_EQ(pUser1, nullptr);
mndReleaseUser(pMnode, pUser1);
}
}
TEST_F(MndTestTrans2, 04_Conflict) {
const char *acct = "root";
const char *user1 = "conflict1";
const char *user2 = "conflict2";
const char *user3 = "conflict3";
const char *user4 = "conflict4";
const char *user5 = "conflict5";
const char *user6 = "conflict6";
const char *user7 = "conflict7";
const char *user8 = "conflict8";
SUserObj *pUser = NULL;
STrans *pTrans = NULL;
int32_t transId = 0;
int32_t code = 0;
ASSERT_NE(pMnode, nullptr);
{
SDbObj dbObj1 = {0};
dbObj1.uid = 9521;
strcpy(dbObj1.name, "db");
SDbObj dbObj2 = {0};
dbObj2.uid = 9522;
strcpy(dbObj2.name, "conflict db");
EXPECT_EQ(CreateUserAction(acct, user1, true, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STB, &dbObj1), 0);
pUser = mndAcquireUser(pMnode, user1);
ASSERT_NE(pUser, nullptr);
mndReleaseUser(pMnode, pUser);
transId = 8;
pTrans = mndAcquireTrans(pMnode, transId);
EXPECT_EQ(pTrans->code, TSDB_CODE_INVALID_PTR);
EXPECT_EQ(pTrans->stage, TRN_STAGE_UNDO_ACTION);
// stb scope
EXPECT_EQ(CreateUserLog(acct, user2, TRN_TYPE_CREATE_DNODE, NULL), -1);
code = terrno;
EXPECT_EQ(code, TSDB_CODE_MND_TRANS_CONFLICT);
EXPECT_EQ(CreateUserLog(acct, user2, TRN_TYPE_CREATE_DB, &dbObj1), -1);
EXPECT_EQ(CreateUserLog(acct, user2, TRN_TYPE_CREATE_DB, &dbObj2), 0);
EXPECT_EQ(CreateUserLog(acct, user3, TRN_TYPE_CREATE_STB, &dbObj1), 0);
// db scope
pTrans->type = TRN_TYPE_CREATE_DB;
EXPECT_EQ(CreateUserLog(acct, user4, TRN_TYPE_CREATE_DNODE, NULL), -1);
EXPECT_EQ(CreateUserLog(acct, user4, TRN_TYPE_CREATE_DB, &dbObj1), -1);
EXPECT_EQ(CreateUserLog(acct, user4, TRN_TYPE_CREATE_DB, &dbObj2), 0);
EXPECT_EQ(CreateUserLog(acct, user5, TRN_TYPE_CREATE_STB, &dbObj1), -1);
EXPECT_EQ(CreateUserLog(acct, user5, TRN_TYPE_CREATE_STB, &dbObj2), 0);
// global scope
pTrans->type = TRN_TYPE_CREATE_DNODE;
EXPECT_EQ(CreateUserLog(acct, user6, TRN_TYPE_CREATE_DNODE, NULL), 0);
EXPECT_EQ(CreateUserLog(acct, user7, TRN_TYPE_CREATE_DB, &dbObj1), -1);
EXPECT_EQ(CreateUserLog(acct, user7, TRN_TYPE_CREATE_DB, &dbObj2), -1);
EXPECT_EQ(CreateUserLog(acct, user7, TRN_TYPE_CREATE_STB, &dbObj1), -1);
EXPECT_EQ(CreateUserLog(acct, user7, TRN_TYPE_CREATE_STB, &dbObj2), -1);
// global scope
pTrans->type = TRN_TYPE_CREATE_USER;
EXPECT_EQ(CreateUserLog(acct, user7, TRN_TYPE_CREATE_DB, &dbObj1), 0);
EXPECT_EQ(CreateUserLog(acct, user8, TRN_TYPE_CREATE_DB, &dbObj2), 0);
mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
pUser = mndAcquireUser(pMnode, user1);
ASSERT_EQ(pUser, nullptr);
mndReleaseUser(pMnode, pUser);
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册