diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 3c42f7b832ef5d3c106c1886a2609677b17767bf..4e93a1d96ee0eb3b70742eee69d892b8cb1dc451 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -174,6 +174,7 @@ typedef struct { void* param; char opername[TSDB_TRANS_OPER_LEN]; SArray* pRpcArray; + SRWLatch lockRpcArray; } STrans; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index d2fc2dc9b1ffed70f80a1571d22479e36e38d423..b92be19741d72d4b30cd1595a894371005fba023 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -628,6 +628,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo)); + taosInitRWLatch(&pTrans->lockRpcArray); if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL || pTrans->pRpcArray == NULL) { @@ -737,12 +738,14 @@ int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, c if (pTrans->oper == oper) { if (strcasecmp(dbname, pTrans->dbname) == 0) { mInfo("trans:%d, db:%s oper:%d matched with input", pTrans->id, dbname, oper); + taosWLockLatch(&pTrans->lockRpcArray); if (pTrans->pRpcArray == NULL) { - pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo)); + pTrans->pRpcArray = taosArrayInit(4, sizeof(SRpcHandleInfo)); } if (pTrans->pRpcArray != NULL && taosArrayPush(pTrans->pRpcArray, &pMsg->info) != NULL) { code = 0; } + taosWUnLockLatch(&pTrans->lockRpcArray); sdbRelease(pMnode->pSdb, pTrans); break; @@ -944,8 +947,12 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { pTrans->failedTimes, code); } + taosWLockLatch(&pTrans->lockRpcArray); int32_t size = taosArrayGetSize(pTrans->pRpcArray); - if (size <= 0) return; + if (size <= 0) { + taosWUnLockLatch(&pTrans->lockRpcArray); + return; + } for (int32_t i = 0; i < size; ++i) { SRpcHandleInfo *pInfo = taosArrayGet(pTrans->pRpcArray, i); @@ -997,6 +1004,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { } } taosArrayClear(pTrans->pRpcArray); + taosWUnLockLatch(&pTrans->lockRpcArray); } int32_t mndTransProcessRsp(SRpcMsg *pRsp) {