未验证 提交 394438d4 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #12106 from taosdata/feature/dnode

test: add action test for transaction
...@@ -44,6 +44,8 @@ typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen); ...@@ -44,6 +44,8 @@ typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen);
int32_t mndInitTrans(SMnode *pMnode); int32_t mndInitTrans(SMnode *pMnode);
void mndCleanupTrans(SMnode *pMnode); void mndCleanupTrans(SMnode *pMnode);
STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId);
void mndReleaseTrans(SMnode *pMnode, STrans *pTrans);
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const SRpcMsg *pReq); STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const SRpcMsg *pReq);
void mndTransDrop(STrans *pTrans); void mndTransDrop(STrans *pTrans);
......
...@@ -537,7 +537,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { ...@@ -537,7 +537,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
return 0; return 0;
} }
static STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) { STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
STrans *pTrans = sdbAcquire(pMnode->pSdb, SDB_TRANS, &transId); STrans *pTrans = sdbAcquire(pMnode->pSdb, SDB_TRANS, &transId);
if (pTrans == NULL) { if (pTrans == NULL) {
terrno = TSDB_CODE_MND_TRANS_NOT_EXIST; terrno = TSDB_CODE_MND_TRANS_NOT_EXIST;
...@@ -545,7 +545,7 @@ static STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) { ...@@ -545,7 +545,7 @@ static STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
return pTrans; return pTrans;
} }
static void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) { void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pTrans); sdbRelease(pSdb, pTrans);
} }
...@@ -919,27 +919,41 @@ static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) { ...@@ -919,27 +919,41 @@ static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) {
if (arraySize == 0) return 0; if (arraySize == 0) return 0;
int32_t code = 0;
for (int32_t i = 0; i < arraySize; ++i) { for (int32_t i = 0; i < arraySize; ++i) {
SSdbRaw *pRaw = taosArrayGetP(pArray, i); SSdbRaw *pRaw = taosArrayGetP(pArray, i);
int32_t code = sdbWriteWithoutFree(pSdb, pRaw); if (sdbWriteWithoutFree(pSdb, pRaw) != 0) {
if (code != 0) { code = ((terrno != 0) ? terrno : -1);
return code;
} }
} }
return 0; terrno = code;
return code;
} }
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) {
return mndTransExecuteLogs(pMnode, pTrans->redoLogs); int32_t code = mndTransExecuteLogs(pMnode, pTrans->redoLogs);
if (code != 0) {
mError("failed to execute redoLogs since %s", terrstr());
}
return code;
} }
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) {
return mndTransExecuteLogs(pMnode, pTrans->undoLogs); int32_t code = mndTransExecuteLogs(pMnode, pTrans->undoLogs);
if (code != 0) {
mError("failed to execute undoLogs since %s, return success", terrstr());
}
return 0; // return success in any case
} }
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
return mndTransExecuteLogs(pMnode, pTrans->commitLogs); int32_t code = mndTransExecuteLogs(pMnode, pTrans->commitLogs);
if (code != 0) {
mError("failed to execute commitLogs since %s", terrstr());
}
return code;
} }
static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
...@@ -983,6 +997,7 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr ...@@ -983,6 +997,7 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
pAction->msgReceived = 0; pAction->msgReceived = 0;
pAction->errCode = 0; pAction->errCode = 0;
} else { } else {
if (terrno == TSDB_CODE_INVALID_PTR) rpcFreeCont(rpcMsg.pCont);
mError("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr()); mError("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr());
return -1; return -1;
} }
...@@ -1029,11 +1044,19 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA ...@@ -1029,11 +1044,19 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
} }
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) {
return mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions); int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions);
if (code != 0) {
mError("failed to execute redoActions since %s", terrstr());
}
return code;
} }
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
return mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions); int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions);
if (code != 0) {
mError("failed to execute undoActions since %s", terrstr());
}
return code;
} }
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
......
...@@ -86,7 +86,7 @@ class MndTestTrans2 : public ::testing::Test { ...@@ -86,7 +86,7 @@ class MndTestTrans2 : public ::testing::Test {
void SetUp() override {} void SetUp() override {}
void TearDown() override {} void TearDown() override {}
int32_t CreateUser(const char *acct, const char *user) { int32_t CreateUserLog(const char *acct, const char *user) {
SUserObj userObj = {0}; SUserObj userObj = {0};
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass); taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
tstrncpy(userObj.user, user, TSDB_USER_LEN); tstrncpy(userObj.user, user, TSDB_USER_LEN);
...@@ -101,7 +101,11 @@ class MndTestTrans2 : public ::testing::Test { ...@@ -101,7 +101,11 @@ class MndTestTrans2 : public ::testing::Test {
mndTransAppendRedolog(pTrans, pRedoRaw); mndTransAppendRedolog(pTrans, pRedoRaw);
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY); sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
char *param = strdup("====> test param <====="); SSdbRaw *pUndoRaw = mndUserActionEncode(&userObj);
mndTransAppendUndolog(pTrans, pUndoRaw);
sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED);
char *param = strdup("====> test log <=====");
mndTransSetCb(pTrans, TEST_TRANS_START_FUNC, TEST_TRANS_STOP_FUNC, param, strlen(param) + 1); mndTransSetCb(pTrans, TEST_TRANS_START_FUNC, TEST_TRANS_STOP_FUNC, param, strlen(param) + 1);
int32_t code = mndTransPrepare(pMnode, pTrans); int32_t code = mndTransPrepare(pMnode, pTrans);
...@@ -109,29 +113,196 @@ class MndTestTrans2 : public ::testing::Test { ...@@ -109,29 +113,196 @@ class MndTestTrans2 : public ::testing::Test {
return code; return code;
} }
int32_t CreateUserAction(const char *acct, const char *user, bool hasUndoAction, ETrnPolicy policy) {
SUserObj userObj = {0};
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
tstrncpy(userObj.user, user, TSDB_USER_LEN);
tstrncpy(userObj.acct, acct, TSDB_USER_LEN);
userObj.createdTime = taosGetTimestampMs();
userObj.updateTime = userObj.createdTime;
userObj.superUser = 1;
SRpcMsg rpcMsg = {0};
STrans *pTrans = mndTransCreate(pMnode, policy, TRN_TYPE_CREATE_USER, &rpcMsg);
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
mndTransAppendRedolog(pTrans, pRedoRaw);
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
SSdbRaw *pUndoRaw = mndUserActionEncode(&userObj);
mndTransAppendUndolog(pTrans, pUndoRaw);
sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED);
char *param = strdup("====> test action <=====");
mndTransSetCb(pTrans, TEST_TRANS_START_FUNC, TEST_TRANS_STOP_FUNC, param, strlen(param) + 1);
{
STransAction action = {0};
action.epSet.inUse = 0;
action.epSet.numOfEps = 1;
action.epSet.eps[0].port = 9040;
strcpy(action.epSet.eps[0].fqdn, "localhost");
int32_t contLen = 1024;
void *pReq = taosMemoryCalloc(1, contLen);
strcpy((char *)pReq, "hello world redo");
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_CREATE_MNODE;
action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED;
mndTransAppendRedoAction(pTrans, &action);
}
if (hasUndoAction) {
STransAction action = {0};
action.epSet.inUse = 0;
action.epSet.numOfEps = 1;
action.epSet.eps[0].port = 9040;
strcpy(action.epSet.eps[0].fqdn, "localhost");
int32_t contLen = 1024;
void *pReq = taosMemoryCalloc(1, contLen);
strcpy((char *)pReq, "hello world undo");
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_CREATE_MNODE;
action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED;
mndTransAppendUndoAction(pTrans, &action);
}
int32_t code = mndTransPrepare(pMnode, pTrans);
mndTransDrop(pTrans);
return code;
}
}; };
SMnode *MndTestTrans2::pMnode; SMnode *MndTestTrans2::pMnode;
TEST_F(MndTestTrans2, 01_CbFunc) { TEST_F(MndTestTrans2, 01_Log) {
const char *acct = "root"; const char *acct = "root";
const char *acct_invalid = "root1"; const char *acct_invalid = "root1";
const char *user1 = "test1"; const char *user1 = "log1";
const char *user2 = "test2"; const char *user2 = "log2";
SUserObj *pUser1 = NULL; SUserObj *pUser1 = NULL;
SUserObj *pUser2 = NULL; SUserObj *pUser2 = NULL;
ASSERT_NE(pMnode, nullptr); ASSERT_NE(pMnode, nullptr);
// create user success EXPECT_EQ(CreateUserLog(acct, user1), 0);
EXPECT_EQ(CreateUser(acct, user1), 0);
pUser1 = mndAcquireUser(pMnode, user1); pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_NE(pUser1, nullptr); ASSERT_NE(pUser1, nullptr);
// failed to create user and rollback // failed to create user and rollback
EXPECT_EQ(CreateUser(acct_invalid, user2), 0); EXPECT_EQ(CreateUserLog(acct_invalid, user2), 0);
pUser2 = mndAcquireUser(pMnode, user2); pUser2 = mndAcquireUser(pMnode, user2);
ASSERT_EQ(pUser2, nullptr); ASSERT_EQ(pUser2, nullptr);
mndTransPullup(pMnode); mndTransPullup(pMnode);
} }
TEST_F(MndTestTrans2, 02_Action) {
const char *acct = "root";
const char *acct_invalid = "root1";
const char *user1 = "action1";
const char *user2 = "action2";
SUserObj *pUser1 = NULL;
SUserObj *pUser2 = NULL;
STrans *pTrans = NULL;
int32_t transId = 0;
int32_t action = 0;
ASSERT_NE(pMnode, nullptr);
// failed to create user and rollback
EXPECT_EQ(CreateUserAction(acct, user1, false, TRN_POLICY_ROLLBACK), 0);
pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_EQ(pUser1, nullptr);
mndReleaseUser(pMnode, pUser1);
// create user, and fake a response
{
EXPECT_EQ(CreateUserAction(acct, user1, true, TRN_POLICY_ROLLBACK), 0);
pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_NE(pUser1, nullptr);
mndReleaseUser(pMnode, pUser1);
transId = 4;
pTrans = mndAcquireTrans(pMnode, transId);
EXPECT_EQ(pTrans->code, TSDB_CODE_INVALID_PTR);
EXPECT_EQ(pTrans->stage, TRN_STAGE_UNDO_ACTION);
EXPECT_EQ(pTrans->failedTimes, 1);
STransAction *pAction = (STransAction *)taosArrayGet(pTrans->undoActions, action);
pAction->msgSent = 1;
SNodeMsg rspMsg = {0};
rspMsg.pNode = pMnode;
int64_t signature = transId;
signature = (signature << 32);
signature += action;
rspMsg.rpcMsg.ahandle = (void *)signature;
mndTransProcessRsp(&rspMsg);
mndReleaseTrans(pMnode, pTrans);
pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_EQ(pUser1, nullptr);
mndReleaseUser(pMnode, pUser1);
}
{
EXPECT_EQ(CreateUserAction(acct, user1, false, TRN_POLICY_RETRY), 0);
pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_NE(pUser1, nullptr);
mndReleaseUser(pMnode, pUser1);
{
transId = 5;
pTrans = mndAcquireTrans(pMnode, transId);
EXPECT_EQ(pTrans->code, TSDB_CODE_INVALID_PTR);
EXPECT_EQ(pTrans->stage, TRN_STAGE_REDO_ACTION);
EXPECT_EQ(pTrans->failedTimes, 1);
STransAction *pAction = (STransAction *)taosArrayGet(pTrans->redoActions, action);
pAction->msgSent = 1;
SNodeMsg rspMsg = {0};
rspMsg.pNode = pMnode;
int64_t signature = transId;
signature = (signature << 32);
signature += action;
rspMsg.rpcMsg.ahandle = (void *)signature;
rspMsg.rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
mndTransProcessRsp(&rspMsg);
mndReleaseTrans(pMnode, pTrans);
pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_NE(pUser1, nullptr);
mndReleaseUser(pMnode, pUser1);
}
{
transId = 5;
pTrans = mndAcquireTrans(pMnode, transId);
EXPECT_EQ(pTrans->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
EXPECT_EQ(pTrans->stage, TRN_STAGE_REDO_ACTION);
EXPECT_EQ(pTrans->failedTimes, 2);
STransAction *pAction = (STransAction *)taosArrayGet(pTrans->redoActions, action);
pAction->msgSent = 1;
SNodeMsg rspMsg = {0};
rspMsg.pNode = pMnode;
int64_t signature = transId;
signature = (signature << 32);
signature += action;
rspMsg.rpcMsg.ahandle = (void *)signature;
mndTransProcessRsp(&rspMsg);
mndReleaseTrans(pMnode, pTrans);
pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_NE(pUser1, nullptr);
mndReleaseUser(pMnode, pUser1);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册