提交 da9e3043 编写于 作者: wmmhello's avatar wmmhello

fix:add sdbFetchCancel to release hash node

上级 8d8e12ee
...@@ -77,7 +77,7 @@ static SClusterObj *mndAcquireCluster(SMnode *pMnode, void **ppIter) { ...@@ -77,7 +77,7 @@ static SClusterObj *mndAcquireCluster(SMnode *pMnode, void **ppIter) {
if (pIter == NULL) break; if (pIter == NULL) break;
*ppIter = pIter; *ppIter = pIter;
sdbCancelFetch(pSdb, pIter);
return pCluster; return pCluster;
} }
......
...@@ -706,6 +706,7 @@ _OVER: ...@@ -706,6 +706,7 @@ _OVER:
} else { } else {
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
} }
sdbCancelFetch(pSdb, pIter);
mndTransDrop(pTrans); mndTransDrop(pTrans);
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
return terrno; return terrno;
......
...@@ -831,6 +831,7 @@ int32_t mndGetIdxsByTagName(SMnode *pMnode, SStbObj *pStb, char *tagName, SIdxOb ...@@ -831,6 +831,7 @@ int32_t mndGetIdxsByTagName(SMnode *pMnode, SStbObj *pStb, char *tagName, SIdxOb
if (pIdx->stbUid == pStb->uid && strcasecmp(pIdx->colName, tagName) == 0) { if (pIdx->stbUid == pStb->uid && strcasecmp(pIdx->colName, tagName) == 0) {
memcpy((char *)idx, (char *)pIdx, sizeof(SIdxObj)); memcpy((char *)idx, (char *)pIdx, sizeof(SIdxObj));
sdbRelease(pSdb, pIdx); sdbRelease(pSdb, pIdx);
sdbCancelFetch(pSdb, pIter);
return 0; return 0;
} }
......
...@@ -199,6 +199,7 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs) ...@@ -199,6 +199,7 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs)
pGid->syncCanRead = 0; pGid->syncCanRead = 0;
roleChanged = true; roleChanged = true;
} }
sdbCancelFetch(pSdb, pIter);
break; break;
} }
} }
......
...@@ -168,6 +168,7 @@ SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) { ...@@ -168,6 +168,7 @@ SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) {
void* pIter = NULL; void* pIter = NULL;
// TODO random fetch // TODO random fetch
pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void**)&pObj); pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void**)&pObj);
sdbCancelFetch(pMnode->pSdb, pIter);
return pObj; return pObj;
} }
...@@ -435,6 +436,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -435,6 +436,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
sdbCancelFetch(pSdb, pIter);
return -1; return -1;
} }
...@@ -444,6 +446,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -444,6 +446,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) { if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
sdbCancelFetch(pSdb, pIter);
return -1; return -1;
} }
...@@ -453,6 +456,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -453,6 +456,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
sdbCancelFetch(pSdb, pIter);
return -1; return -1;
} }
} }
...@@ -492,6 +496,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -492,6 +496,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
sdbCancelFetch(pSdb, pIter);
return -1; return -1;
} }
} }
......
...@@ -900,7 +900,6 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) { ...@@ -900,7 +900,6 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
SMsgHead *pHead = rpcMallocCont(contLen); SMsgHead *pHead = rpcMallocCont(contLen);
if (pHead == NULL) { if (pHead == NULL) {
sdbCancelFetch(pSdb, pVgroup);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
continue; continue;
} }
...@@ -1289,6 +1288,7 @@ static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName ...@@ -1289,6 +1288,7 @@ static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName
terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION; terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION;
mError("stream:%s, create ast error", pStream->name); mError("stream:%s, create ast error", pStream->name);
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
sdbCancelFetch(pSdb, pIter);
return -1; return -1;
} }
...@@ -1308,6 +1308,7 @@ static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName ...@@ -1308,6 +1308,7 @@ static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName
nodesDestroyNode(pAst); nodesDestroyNode(pAst);
nodesDestroyList(pNodeList); nodesDestroyList(pNodeList);
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
sdbCancelFetch(pSdb, pIter);
return -1; return -1;
} }
mInfo("stream:%s, check colId:%d passed", pStream->name, pCol->colId); mInfo("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
...@@ -1337,6 +1338,7 @@ static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName, ...@@ -1337,6 +1338,7 @@ static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName,
terrno = TSDB_CODE_SDB_INVALID_DATA_CONTENT; terrno = TSDB_CODE_SDB_INVALID_DATA_CONTENT;
mError("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d failed since parse AST err", mError("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d failed since parse AST err",
pSma->name, stbFullName, suid, colId); pSma->name, stbFullName, suid, colId);
sdbCancelFetch(pSdb, pIter);
return -1; return -1;
} }
...@@ -1357,6 +1359,7 @@ static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName, ...@@ -1357,6 +1359,7 @@ static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName,
nodesDestroyNode(pAst); nodesDestroyNode(pAst);
nodesDestroyList(pNodeList); nodesDestroyList(pNodeList);
sdbRelease(pSdb, pSma); sdbRelease(pSdb, pSma);
sdbCancelFetch(pSdb, pIter);
return -1; return -1;
} }
mInfo("tsma:%s, check colId:%d passed", pSma->name, pCol->colId); mInfo("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
......
...@@ -705,12 +705,14 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { ...@@ -705,12 +705,14 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
if (numOfStream > MND_STREAM_MAX_NUM) { if (numOfStream > MND_STREAM_MAX_NUM) {
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM); mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS; terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
sdbCancelFetch(pMnode->pSdb, pIter);
goto _OVER; goto _OVER;
} }
if (pStream->targetStbUid == streamObj.targetStbUid) { if (pStream->targetStbUid == streamObj.targetStbUid) {
mError("Cannot write the same stable as other stream:%s", pStream->name); mError("Cannot write the same stable as other stream:%s", pStream->name);
terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE; terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE;
sdbCancelFetch(pMnode->pSdb, pIter);
goto _OVER; goto _OVER;
} }
} }
......
...@@ -1104,6 +1104,7 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) ...@@ -1104,6 +1104,7 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
if (taosHashGetSize(pSub->consumerHash) != 0) { if (taosHashGetSize(pSub->consumerHash) != 0) {
sdbRelease(pSdb, pSub); sdbRelease(pSdb, pSub);
terrno = TSDB_CODE_MND_IN_REBALANCE; terrno = TSDB_CODE_MND_IN_REBALANCE;
sdbCancelFetch(pSdb, pIter);
return -1; return -1;
} }
int32_t sz = taosArrayGetSize(pSub->unassignedVgs); int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
...@@ -1122,12 +1123,14 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) ...@@ -1122,12 +1123,14 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
sdbRelease(pSdb, pSub); sdbRelease(pSdb, pSub);
sdbCancelFetch(pSdb, pIter);
return -1; return -1;
} }
} }
if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) { if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {
sdbRelease(pSdb, pSub); sdbRelease(pSdb, pSub);
sdbCancelFetch(pSdb, pIter);
goto END; goto END;
} }
......
...@@ -513,6 +513,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -513,6 +513,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
tEncodeSize(tEncodeSTqCheckInfo, &info, len, code); tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
if (code < 0) { if (code < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
sdbCancelFetch(pSdb, pIter);
goto _OUT; goto _OUT;
} }
void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
...@@ -522,6 +523,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -522,6 +523,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
if (tEncodeSTqCheckInfo(&encoder, &info) < 0) { if (tEncodeSTqCheckInfo(&encoder, &info) < 0) {
taosMemoryFree(buf); taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
sdbCancelFetch(pSdb, pIter);
goto _OUT; goto _OUT;
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
...@@ -535,6 +537,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -535,6 +537,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf); taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
sdbCancelFetch(pSdb, pIter);
goto _OUT; goto _OUT;
} }
buf = NULL; buf = NULL;
...@@ -697,6 +700,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -697,6 +700,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
if (strcmp(name, pTopic->name) == 0) { if (strcmp(name, pTopic->name) == 0) {
mndReleaseConsumer(pMnode, pConsumer); mndReleaseConsumer(pMnode, pConsumer);
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
sdbCancelFetch(pSdb, pIter);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
dropReq.name, pConsumer->consumerId, pConsumer->cgroup); dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
...@@ -710,6 +714,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -710,6 +714,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
if (strcmp(name, pTopic->name) == 0) { if (strcmp(name, pTopic->name) == 0) {
mndReleaseConsumer(pMnode, pConsumer); mndReleaseConsumer(pMnode, pConsumer);
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
sdbCancelFetch(pSdb, pIter);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)", mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)",
dropReq.name, pConsumer->consumerId, pConsumer->cgroup); dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
...@@ -723,6 +728,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -723,6 +728,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
if (strcmp(name, pTopic->name) == 0) { if (strcmp(name, pTopic->name) == 0) {
mndReleaseConsumer(pMnode, pConsumer); mndReleaseConsumer(pMnode, pConsumer);
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
sdbCancelFetch(pSdb, pIter);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)", mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)",
dropReq.name, pConsumer->consumerId, pConsumer->cgroup); dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
...@@ -789,6 +795,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -789,6 +795,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
taosMemoryFree(buf); taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
sdbCancelFetch(pSdb, pIter);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
} }
......
...@@ -1444,7 +1444,10 @@ int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db) { ...@@ -1444,7 +1444,10 @@ int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db) {
if (pIter == NULL) break; if (pIter == NULL) break;
code = -1; code = -1;
if (mndUserDupObj(pUser, &newUser) != 0) break; if (mndUserDupObj(pUser, &newUser) != 0) {
sdbCancelFetch(pSdb, pIter);
break;
}
bool inRead = (taosHashGet(newUser.readDbs, db, len) != NULL); bool inRead = (taosHashGet(newUser.readDbs, db, len) != NULL);
bool inWrite = (taosHashGet(newUser.writeDbs, db, len) != NULL); bool inWrite = (taosHashGet(newUser.writeDbs, db, len) != NULL);
...@@ -1453,7 +1456,10 @@ int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db) { ...@@ -1453,7 +1456,10 @@ int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db) {
(void)taosHashRemove(newUser.writeDbs, db, len); (void)taosHashRemove(newUser.writeDbs, db, len);
SSdbRaw *pCommitRaw = mndUserActionEncode(&newUser); SSdbRaw *pCommitRaw = mndUserActionEncode(&newUser);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) break; if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
sdbCancelFetch(pSdb, pIter);
break;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
} }
...@@ -1484,6 +1490,7 @@ int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic) { ...@@ -1484,6 +1490,7 @@ int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic) {
code = -1; code = -1;
if (mndUserDupObj(pUser, &newUser) != 0) { if (mndUserDupObj(pUser, &newUser) != 0) {
sdbCancelFetch(pSdb, pIter);
break; break;
} }
...@@ -1491,7 +1498,10 @@ int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic) { ...@@ -1491,7 +1498,10 @@ int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic) {
if (inTopic) { if (inTopic) {
(void)taosHashRemove(newUser.topics, topic, len); (void)taosHashRemove(newUser.topics, topic, len);
SSdbRaw *pCommitRaw = mndUserActionEncode(&newUser); SSdbRaw *pCommitRaw = mndUserActionEncode(&newUser);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) break; if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
sdbCancelFetch(pSdb, pIter);
break;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
} }
......
...@@ -2591,6 +2591,7 @@ static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) { ...@@ -2591,6 +2591,7 @@ static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode); pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode);
if (pIter == NULL) break; if (pIter == NULL) break;
if (!mndIsDnodeOnline(pDnode, curMs)) { if (!mndIsDnodeOnline(pDnode, curMs)) {
sdbCancelFetch(pMnode->pSdb, pIter);
terrno = TSDB_CODE_MND_HAS_OFFLINE_DNODE; terrno = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
mError("failed to balance vgroup since %s, dnode:%d", terrstr(), pDnode->id); mError("failed to balance vgroup since %s, dnode:%d", terrstr(), pDnode->id);
sdbRelease(pMnode->pSdb, pDnode); sdbRelease(pMnode->pSdb, pDnode);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册