From 0322fdc1fc9a54e92b2933e52de570a6337d471b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 24 Jun 2023 23:43:43 +0800 Subject: [PATCH] fix(stream): fix memory leak. --- source/common/src/tdatablock.c | 3 +- source/libs/stream/src/streamDispatch.c | 47 +++++++++++++++++-------- 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index cc49716644..dc6e0d2cb7 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1185,8 +1185,7 @@ void blockDataFreeRes(SSDataBlock* pBlock) { colDataDestroy(pColInfoData); } - taosArrayDestroy(pBlock->pDataBlock); - pBlock->pDataBlock = NULL; + pBlock->pDataBlock = taosArrayDestroy(pBlock->pDataBlock); taosMemoryFreeClear(pBlock->pBlockAgg); memset(&pBlock->info, 0, sizeof(SDataBlockInfo)); } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 7ce85b4e65..c42320ad13 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -16,7 +16,9 @@ #include "ttimer.h" #include "streamInc.h" -#define MAX_BLOCK_NAME_NUM 1024 +#define MAX_BLOCK_NAME_NUM 1024 +#define DISPATCH_RETRY_INTERVAL_MS 300 +#define MAX_CONTINUE_RETRY_COUNT 5 typedef struct SBlockName { uint32_t hashValue; @@ -324,7 +326,10 @@ int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, in // serialize int32_t tlen; tEncodeSize(tEncodeStreamDispatchReq, pReq, tlen, code); - if (code < 0) goto FAIL; + if (code < 0) { + goto FAIL; + } + code = -1; buf = rpcMallocCont(sizeof(SMsgHead) + tlen); if (buf == NULL) { @@ -346,13 +351,13 @@ int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, in msg.msgType = pTask->msgInfo.msgType; qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId); - tmsgSendReq(pEpSet, &msg); - - code = 0; - return 0; + return tmsgSendReq(pEpSet, &msg); FAIL: - if (buf) rpcFreeCont(buf); + if (buf) { + rpcFreeCont(buf); + } + return code; } @@ -403,13 +408,16 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S for (j = 0; j < vgSz; j++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); ASSERT(pVgInfo->vgId > 0); + if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { return -1; } + if (pReqs[j].blockNum == 0) { atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); } + pReqs[j].blockNum++; found = true; break; @@ -510,7 +518,8 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->info.selfChildId, pReqs[i].blockNum, pVgInfo->vgId); - if (doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) { + code = doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet); + if (code < 0) { goto FAIL_SHUFFLE_DISPATCH; } } @@ -536,7 +545,9 @@ static void doRetryDispatchData(void* param, void* tmrId) { int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData); if (code != TSDB_CODE_SUCCESS) { - streamRetryDispatchStreamBlock(pTask, 300); + qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr); + atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0); + streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); } } @@ -584,12 +595,20 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } qDebug("s-task:%s failed to dispatch msg to downstream, code:%s, output status:%d, retry cnt:%d", pTask->id.idStr, - tstrerror(code), pTask->outputStatus, retryCount); + tstrerror(terrno), pTask->outputStatus, retryCount); + + // todo deal with only partially success dispatch case + atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0); + if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore + destroyStreamDataBlock(pTask->msgInfo.pData); + pTask->msgInfo.pData = NULL; + return code; + } - if (++retryCount > 5) { // add to timer to retry - qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, retry in %dms", pTask->id.idStr, - retryCount, tstrerror(code), 300); - streamRetryDispatchStreamBlock(pTask, 300); + if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry + qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms", pTask->id.idStr, + retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS); + streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); break; } } -- GitLab