diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 53ffd8698fd4bdf350cb9b437bfc726e19367f4f..c5b18bcde0649e94ef1a8ef815dda8e285591776 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -420,7 +420,10 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pDb->uid) continue; + if (pVgroup->dbUid != pDb->uid) { + sdbRelease(pSdb, pVgroup); + continue; + } void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen); if (pReq == NULL) { @@ -455,7 +458,10 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pDb->uid) continue; + if (pVgroup->dbUid != pDb->uid) { + sdbRelease(pSdb, pVgroup); + continue; + } int32_t contLen = 0; void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen); @@ -942,7 +948,10 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pDb->uid) continue; + if (pVgroup->dbUid != pDb->uid) { + sdbRelease(pSdb, pVgroup); + continue; + } void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen); if (pReq == NULL) { @@ -1116,7 +1125,10 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pDb->uid) continue; + if (pVgroup->dbUid != pDb->uid) { + sdbRelease(pSdb, pVgroup); + continue; + } int32_t contLen = 0; void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 840326d3183bd889ef71368bd6c32dee9dc20e74..ff64a3cdd5972f05e4e99e9e3ab667d6a996a0ba 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -134,8 +134,6 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) { ASSERT(pConsumerEp->oldConsumerId != -1); - int32_t vgId = pConsumerEp->vgId; - SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); void *buf; int32_t tlen; @@ -143,6 +141,9 @@ static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqC return -1; } + int32_t vgId = pConsumerEp->vgId; + SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); + STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.pCont = buf; @@ -180,15 +181,15 @@ static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsum } static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) { - int32_t vgId = pConsumerEp->vgId; - SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); - void *buf; int32_t tlen; if (mndBuildCancelConnReq(&buf, &tlen, pConsumerEp) < 0) { return -1; } + int32_t vgId = pConsumerEp->vgId; + SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); + STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.pCont = buf; @@ -714,7 +715,10 @@ static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SM while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pTopic->dbUid) continue; + if (pVgroup->dbUid != pTopic->dbUid) { + sdbRelease(pSdb, pVgroup); + continue; + } pSub->vgNum++; plan->execNode.nodeId = pVgroup->vgId; @@ -748,7 +752,6 @@ static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqT const SMqConsumerEp *pConsumerEp) { ASSERT(pConsumerEp->oldConsumerId == -1); int32_t vgId = pConsumerEp->vgId; - SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); SMqSetCVgReq req = { .vgId = vgId, @@ -776,6 +779,8 @@ static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqT void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tEncodeSMqSetCVgReq(&abuf, &req); + SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); + STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.pCont = buf;