diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index 4d05637a2b3ab872d5a0ba99c022e21f79ca9b05..67675b5400b9412c923ed4f9971fb3d2bda8da2c 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -77,7 +77,7 @@ static SClusterObj *mndAcquireCluster(SMnode *pMnode, void **ppIter) { if (pIter == NULL) break; *ppIter = pIter; - + sdbCancelFetch(pSdb, pIter); return pCluster; } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index bb92bfb4c7607aab971d576252f3cfc295aef2f5..47029c8df135ca2c7e299dcbbb6194d7ad8809b0 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -706,6 +706,7 @@ _OVER: } else { mndReleaseDnode(pMnode, pDnode); } + sdbCancelFetch(pSdb, pIter); mndTransDrop(pTrans); sdbFreeRaw(pRaw); return terrno; diff --git a/source/dnode/mnode/impl/src/mndIndex.c b/source/dnode/mnode/impl/src/mndIndex.c index 2d2637b8ce2e334a40e644bb846c726da6c19879..8f977dacb73bfe3e90d7ea0423b1f84fbd63bcee 100644 --- a/source/dnode/mnode/impl/src/mndIndex.c +++ b/source/dnode/mnode/impl/src/mndIndex.c @@ -831,6 +831,7 @@ int32_t mndGetIdxsByTagName(SMnode *pMnode, SStbObj *pStb, char *tagName, SIdxOb if (pIdx->stbUid == pStb->uid && strcasecmp(pIdx->colName, tagName) == 0) { memcpy((char *)idx, (char *)pIdx, sizeof(SIdxObj)); sdbRelease(pSdb, pIdx); + sdbCancelFetch(pSdb, pIter); return 0; } diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 381b1e64ed97080a38b3e45e53fe74c18ea3dc15..55cca5a30c03b82152aff0e4d0a08b3c55085a66 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -199,6 +199,7 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs) pGid->syncCanRead = 0; roleChanged = true; } + sdbCancelFetch(pSdb, pIter); break; } } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 9a611fe46af1494bdcb9f77530452512a0d978a2..b95f4d6a007b69e5baf169bb24b2ff0c8a6d790b 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -168,6 +168,7 @@ SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) { void* pIter = NULL; // TODO random fetch pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void**)&pObj); + sdbCancelFetch(pMnode->pSdb, pIter); return pObj; } @@ -435,6 +436,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { terrno = TSDB_CODE_OUT_OF_MEMORY; sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); + sdbCancelFetch(pSdb, pIter); return -1; } @@ -444,6 +446,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); + sdbCancelFetch(pSdb, pIter); return -1; } @@ -453,6 +456,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { if (code != TSDB_CODE_SUCCESS) { terrno = code; qDestroyQueryPlan(pPlan); + sdbCancelFetch(pSdb, pIter); return -1; } } @@ -492,6 +496,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { if (code != TSDB_CODE_SUCCESS) { qDestroyQueryPlan(pPlan); + sdbCancelFetch(pSdb, pIter); return -1; } } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 6969e4387f118e621431136d979166c25cce8803..c1186d068f8f84aa34e8f556ba0b8a1373d487d8 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -900,7 +900,6 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) { SMsgHead *pHead = rpcMallocCont(contLen); if (pHead == NULL) { - sdbCancelFetch(pSdb, pVgroup); sdbRelease(pSdb, pVgroup); continue; } @@ -1289,6 +1288,7 @@ static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION; mError("stream:%s, create ast error", pStream->name); sdbRelease(pSdb, pStream); + sdbCancelFetch(pSdb, pIter); return -1; } @@ -1308,6 +1308,7 @@ static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName nodesDestroyNode(pAst); nodesDestroyList(pNodeList); sdbRelease(pSdb, pStream); + sdbCancelFetch(pSdb, pIter); return -1; } mInfo("stream:%s, check colId:%d passed", pStream->name, pCol->colId); @@ -1337,6 +1338,7 @@ static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName, terrno = TSDB_CODE_SDB_INVALID_DATA_CONTENT; mError("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d failed since parse AST err", pSma->name, stbFullName, suid, colId); + sdbCancelFetch(pSdb, pIter); return -1; } @@ -1357,6 +1359,7 @@ static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName, nodesDestroyNode(pAst); nodesDestroyList(pNodeList); sdbRelease(pSdb, pSma); + sdbCancelFetch(pSdb, pIter); return -1; } mInfo("tsma:%s, check colId:%d passed", pSma->name, pCol->colId); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 63f49cfe2be3a17ad99db4fa578c60eea6f7dc48..028c482e6c26997162631b0a25be0aed2d0cf5c1 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -705,12 +705,14 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { if (numOfStream > MND_STREAM_MAX_NUM) { mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM); terrno = TSDB_CODE_MND_TOO_MANY_STREAMS; + sdbCancelFetch(pMnode->pSdb, pIter); goto _OVER; } if (pStream->targetStbUid == streamObj.targetStbUid) { mError("Cannot write the same stable as other stream:%s", pStream->name); terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE; + sdbCancelFetch(pMnode->pSdb, pIter); goto _OVER; } } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index b2235c8b503d503d63a03c066931d4cd7e81c356..f51a61eda38dc3d1d44d4b6ac251dcbf106f77eb 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -1104,6 +1104,7 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) if (taosHashGetSize(pSub->consumerHash) != 0) { sdbRelease(pSdb, pSub); terrno = TSDB_CODE_MND_IN_REBALANCE; + sdbCancelFetch(pSdb, pIter); return -1; } int32_t sz = taosArrayGetSize(pSub->unassignedVgs); @@ -1122,12 +1123,14 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); sdbRelease(pSdb, pSub); + sdbCancelFetch(pSdb, pIter); return -1; } } if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) { sdbRelease(pSdb, pSub); + sdbCancelFetch(pSdb, pIter); goto END; } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index dbcf1fa2ad288f221bb5d51ad6eab0d394c9775e..85e6f1caf6007ca53da9b08c2ad07b8147121516 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -513,6 +513,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * tEncodeSize(tEncodeSTqCheckInfo, &info, len, code); if (code < 0) { sdbRelease(pSdb, pVgroup); + sdbCancelFetch(pSdb, pIter); goto _OUT; } void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); @@ -522,6 +523,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * if (tEncodeSTqCheckInfo(&encoder, &info) < 0) { taosMemoryFree(buf); sdbRelease(pSdb, pVgroup); + sdbCancelFetch(pSdb, pIter); goto _OUT; } tEncoderClear(&encoder); @@ -535,6 +537,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(buf); sdbRelease(pSdb, pVgroup); + sdbCancelFetch(pSdb, pIter); goto _OUT; } buf = NULL; @@ -697,6 +700,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { if (strcmp(name, pTopic->name) == 0) { mndReleaseConsumer(pMnode, pConsumer); mndReleaseTopic(pMnode, pTopic); + sdbCancelFetch(pSdb, pIter); terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", dropReq.name, pConsumer->consumerId, pConsumer->cgroup); @@ -710,6 +714,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { if (strcmp(name, pTopic->name) == 0) { mndReleaseConsumer(pMnode, pConsumer); mndReleaseTopic(pMnode, pTopic); + sdbCancelFetch(pSdb, pIter); terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)", dropReq.name, pConsumer->consumerId, pConsumer->cgroup); @@ -723,6 +728,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { if (strcmp(name, pTopic->name) == 0) { mndReleaseConsumer(pMnode, pConsumer); mndReleaseTopic(pMnode, pTopic); + sdbCancelFetch(pSdb, pIter); terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)", dropReq.name, pConsumer->consumerId, pConsumer->cgroup); @@ -789,6 +795,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { taosMemoryFree(buf); sdbRelease(pSdb, pVgroup); mndReleaseTopic(pMnode, pTopic); + sdbCancelFetch(pSdb, pIter); mndTransDrop(pTrans); return -1; } diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 1fc2e42b8ca9be737adfb60e8312c390dbf3f5b7..d3b2ef7344e45937a37d121ecc5af0b7901b3e17 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -1444,7 +1444,10 @@ int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db) { if (pIter == NULL) break; code = -1; - if (mndUserDupObj(pUser, &newUser) != 0) break; + if (mndUserDupObj(pUser, &newUser) != 0) { + sdbCancelFetch(pSdb, pIter); + break; + } bool inRead = (taosHashGet(newUser.readDbs, db, len) != NULL); bool inWrite = (taosHashGet(newUser.writeDbs, db, len) != NULL); @@ -1453,7 +1456,10 @@ int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db) { (void)taosHashRemove(newUser.writeDbs, db, len); SSdbRaw *pCommitRaw = mndUserActionEncode(&newUser); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) break; + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + sdbCancelFetch(pSdb, pIter); + break; + } (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); } @@ -1484,6 +1490,7 @@ int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic) { code = -1; if (mndUserDupObj(pUser, &newUser) != 0) { + sdbCancelFetch(pSdb, pIter); break; } @@ -1491,7 +1498,10 @@ int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic) { if (inTopic) { (void)taosHashRemove(newUser.topics, topic, len); SSdbRaw *pCommitRaw = mndUserActionEncode(&newUser); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) break; + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + sdbCancelFetch(pSdb, pIter); + break; + } (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); } diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index a82e49f397abe38164e67f89f214c822ae5afdaa..8b313695a195169023a06a9b944bcaafa5751f0a 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -2591,6 +2591,7 @@ static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) { pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode); if (pIter == NULL) break; if (!mndIsDnodeOnline(pDnode, curMs)) { + sdbCancelFetch(pMnode->pSdb, pIter); terrno = TSDB_CODE_MND_HAS_OFFLINE_DNODE; mError("failed to balance vgroup since %s, dnode:%d", terrstr(), pDnode->id); sdbRelease(pMnode->pSdb, pDnode);