From a9d13928dde02f233b26c9b7ce1340248f8245e6 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 23 Dec 2021 03:59:42 -0800 Subject: [PATCH] fix bug in transaction --- source/dnode/mgmt/impl/src/dndMnode.c | 6 +++++- source/dnode/mgmt/impl/src/dndTransport.c | 4 ++-- source/dnode/mgmt/impl/test/mnode/mnode.cpp | 22 +++++++++++++++++++++ source/dnode/mnode/impl/inc/mndDef.h | 5 ----- source/dnode/mnode/impl/src/mndDb.c | 8 ++++---- source/dnode/mnode/impl/src/mndMnode.c | 6 ++++-- source/dnode/mnode/impl/src/mndTrans.c | 5 +++-- source/util/src/tworker.c | 2 ++ 8 files changed, 42 insertions(+), 16 deletions(-) diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 4fd6b021c8..c4d69c4626 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -479,9 +479,11 @@ static int32_t dndDropMnode(SDnode *pDnode) { return -1; } + dndReleaseMnode(pDnode, pMnode); dndStopMnodeWorker(pDnode); dndWriteMnodeFile(pDnode); mndClose(pMnode); + pMgmt->pMnode = NULL; mndDestroy(pDnode->dir.mnode); return 0; @@ -661,6 +663,8 @@ void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) { rpcFreeCont(pRpcMsg->pCont); taosFreeQitem(pMsg); } + + dndReleaseMnode(pDnode, pMnode); } void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { @@ -945,7 +949,7 @@ void dndCleanupMnode(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; dInfo("dnode-mnode start to clean up"); - dndStopMnodeWorker(pDnode); + if (pMgmt->pMnode) dndStopMnodeWorker(pDnode); dndCleanupMnodeMgmtWorker(pDnode); dndFreeMnodeMgmtQueue(pDnode); tfree(pMgmt->file); diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index be60d2fcae..69db619aaa 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -136,7 +136,7 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { DndMsgFp fp = pMgmt->msgFp[msgType]; if (fp != NULL) { (*fp)(pDnode, pMsg, pEpSet); - dTrace("RPC %p, rsp:%s is processed, code:0x%0X", pMsg->handle, taosMsg[msgType], pMsg->code & 0XFFFF); + dTrace("RPC %p, rsp:%s is processed, code:0x%x", pMsg->handle, taosMsg[msgType], pMsg->code & 0XFFFF); } else { dError("RPC %p, rsp:%s not processed", pMsg->handle, taosMsg[msgType]); rpcFreeCont(pMsg->pCont); @@ -184,7 +184,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { int32_t msgType = pMsg->msgType; if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) { - dTrace("RPC %p, network test req, app:%p will be processed", pMsg->handle, pMsg->ahandle); + dTrace("RPC %p, network test req, app:%p will be processed, code:0x%x", pMsg->handle, pMsg->ahandle, pMsg->code); dndProcessDnodeReq(pDnode, pMsg, pEpSet); return; } diff --git a/source/dnode/mgmt/impl/test/mnode/mnode.cpp b/source/dnode/mgmt/impl/test/mnode/mnode.cpp index 6f66a0b12f..6724c85500 100644 --- a/source/dnode/mgmt/impl/test/mnode/mnode.cpp +++ b/source/dnode/mgmt/impl/test/mnode/mnode.cpp @@ -140,6 +140,28 @@ TEST_F(DndTestMnode, 04_Create_Mnode) { CheckTimestamp(); CheckTimestamp(); } + + { + // drop mnode + int32_t contLen = sizeof(SDropMnodeMsg); + + SDropMnodeMsg* pReq = (SDropMnodeMsg*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_DROP_MNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + + test.SendShowMetaMsg(TSDB_MGMT_TABLE_MNODE, ""); + test.SendShowRetrieveMsg(); + EXPECT_EQ(test.GetShowRows(), 1); + + CheckInt16(1); + CheckBinary("localhost:9061", TSDB_EP_LEN); + CheckBinary("master", 12); + CheckInt64(0); + CheckTimestamp(); + } } // { // int32_t contLen = sizeof(SDropDnodeMsg); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index da54920574..a0404d5bcc 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -290,11 +290,6 @@ typedef struct SMnodeMsg { char db[TSDB_FULL_DB_NAME_LEN]; int32_t acctId; SMnode *pMnode; - int16_t received; - int16_t successed; - int16_t expected; - int16_t retry; - int32_t code; int64_t createdTime; SRpcMsg rpcMsg; int32_t contLen; diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 108896c121..dd474d85f3 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -828,9 +828,9 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) { static int32_t mndProcessSyncDbMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SSyncDbMsg *pSync = pMsg->rpcMsg.pCont; - SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db); + SDbObj *pDb = mndAcquireDb(pMnode, pSync->db); if (pDb == NULL) { - mError("db:%s, failed to process sync db msg since %s", pMsg->db, terrstr()); + mError("db:%s, failed to process sync db msg since %s", pSync->db, terrstr()); return -1; } @@ -841,9 +841,9 @@ static int32_t mndProcessSyncDbMsg(SMnodeMsg *pMsg) { static int32_t mndProcessCompactDbMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SCompactDbMsg *pCompact = pMsg->rpcMsg.pCont; - SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db); + SDbObj *pDb = mndAcquireDb(pMnode, pCompact->db); if (pDb == NULL) { - mError("db:%s, failed to process compact db msg since %s", pMsg->db, terrstr()); + mError("db:%s, failed to process compact db msg since %s", pCompact->db, terrstr()); return -1; } diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 37c9ac513a..a019c0dc55 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -440,6 +440,8 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode sdbRelease(pSdb, pMObj); } + alterMsg.replica = numOfReplicas; + while (1) { SMnodeObj *pMObj = NULL; pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); @@ -506,12 +508,12 @@ static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pObj) { mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id); - if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, pObj) != 0) { + if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) { mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); goto DROP_MNODE_OVER; } - if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, pObj) != 0) { + if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) { mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); goto DROP_MNODE_OVER; } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 504717117b..53e3bc5203 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -625,7 +625,7 @@ void mndTransHandleActionRsp(SMnodeMsg *pMsg) { pAction->errCode = pMsg->rpcMsg.code; } - mDebug("trans:%d, action:%d response is received, code:0x%x", transId, action, pMsg->code); + mDebug("trans:%d, action:%d response is received, code:0x%x", transId, action, pMsg->rpcMsg.code); mndTransExecute(pMnode, pTrans); HANDLE_ACTION_RSP_OVER: @@ -696,7 +696,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); if (pAction == NULL) continue; - if (pAction->msgSent) continue; + if (pAction->msgReceived && pAction->errCode == 0) continue; int64_t signature = pTrans->id; signature = (signature << 32); @@ -736,6 +736,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA terrno = errorCode; return errorCode; } else { + mDebug("trans:%d, %d of %d actions executed, code:0x%x", pTrans->id, numOfReceivedMsgs, numOfActions, errorCode); return TSDB_CODE_MND_ACTION_IN_PROGRESS; } } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 11972e84cb..fb7b71b845 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -38,6 +38,7 @@ int32_t tWorkerInit(SWorkerPool *pool) { void tWorkerCleanup(SWorkerPool *pool) { for (int i = 0; i < pool->max; ++i) { SWorker *worker = pool->workers + i; + if (worker == NULL) continue; if (taosCheckPthreadValid(worker->thread)) { taosQsetThreadResume(pool->qset); } @@ -45,6 +46,7 @@ void tWorkerCleanup(SWorkerPool *pool) { for (int i = 0; i < pool->max; ++i) { SWorker *worker = pool->workers + i; + if (worker == NULL) continue; if (taosCheckPthreadValid(worker->thread)) { pthread_join(worker->thread, NULL); } -- GitLab