未验证 提交 016f7642 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18854 from taosdata/feature/stream

fix: mem leak
...@@ -33,16 +33,16 @@ extern "C" { ...@@ -33,16 +33,16 @@ extern "C" {
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", DEBUG_TRACE, wDebugFlag, __VA_ARGS__); }} #define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", DEBUG_TRACE, wDebugFlag, __VA_ARGS__); }}
// clang-format on // clang-format on
#define WAL_PROTO_VER 0 #define WAL_PROTO_VER 0
#define WAL_NOSUFFIX_LEN 20 #define WAL_NOSUFFIX_LEN 20
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1) #define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
#define WAL_LOG_SUFFIX "log" #define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx" #define WAL_INDEX_SUFFIX "idx"
#define WAL_REFRESH_MS 1000 #define WAL_REFRESH_MS 1000
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) #define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32) #define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL #define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3) #define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
typedef enum { typedef enum {
TAOS_WAL_WRITE = 1, TAOS_WAL_WRITE = 1,
......
...@@ -186,10 +186,10 @@ void taos_free_result(TAOS_RES *res) { ...@@ -186,10 +186,10 @@ void taos_free_result(TAOS_RES *res) {
destroyRequest(pRequest); destroyRequest(pRequest);
} else if (TD_RES_TMQ_METADATA(res)) { } else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res; SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res;
if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen); taosArrayDestroy(pRsp->rsp.blockDataLen);
if (pRsp->rsp.withTbName) taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree); taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
// taosx // taosx
taosArrayDestroy(pRsp->rsp.createTableLen); taosArrayDestroy(pRsp->rsp.createTableLen);
taosArrayDestroyP(pRsp->rsp.createTableReq, taosMemoryFree); taosArrayDestroyP(pRsp->rsp.createTableReq, taosMemoryFree);
...@@ -199,10 +199,10 @@ void taos_free_result(TAOS_RES *res) { ...@@ -199,10 +199,10 @@ void taos_free_result(TAOS_RES *res) {
taosMemoryFree(pRsp); taosMemoryFree(pRsp);
} else if (TD_RES_TMQ(res)) { } else if (TD_RES_TMQ(res)) {
SMqRspObj *pRsp = (SMqRspObj *)res; SMqRspObj *pRsp = (SMqRspObj *)res;
if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen); taosArrayDestroy(pRsp->rsp.blockDataLen);
if (pRsp->rsp.withTbName) taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree); taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
pRsp->resInfo.pRspMsg = NULL; pRsp->resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&pRsp->resInfo); doFreeReqResultInfo(&pRsp->resInfo);
taosMemoryFree(pRsp); taosMemoryFree(pRsp);
......
...@@ -814,24 +814,55 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) { ...@@ -814,24 +814,55 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
return 0; return 0;
} }
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
// do nothing
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
tDeleteSMqAskEpRsp(&pEpRspWrapper->msg);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosArrayDestroyP(pRsp->dataRsp.blockData, taosMemoryFree);
taosArrayDestroy(pRsp->dataRsp.blockDataLen);
taosArrayDestroyP(pRsp->dataRsp.blockTbName, taosMemoryFree);
taosArrayDestroyP(pRsp->dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosMemoryFree(pRsp->metaRsp.metaRsp);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosArrayDestroyP(pRsp->taosxRsp.blockData, taosMemoryFree);
taosArrayDestroy(pRsp->taosxRsp.blockDataLen);
taosArrayDestroyP(pRsp->taosxRsp.blockTbName, taosMemoryFree);
taosArrayDestroyP(pRsp->taosxRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
// taosx
taosArrayDestroy(pRsp->taosxRsp.createTableLen);
taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree);
}
}
void tmqClearUnhandleMsg(tmq_t* tmq) { void tmqClearUnhandleMsg(tmq_t* tmq) {
SMqRspWrapper* msg = NULL; SMqRspWrapper* rspWrapper = NULL;
while (1) { while (1) {
taosGetQitem(tmq->qall, (void**)&msg); taosGetQitem(tmq->qall, (void**)&rspWrapper);
if (msg) if (rspWrapper) {
taosFreeQitem(msg); tmqFreeRspWrapper(rspWrapper);
else taosFreeQitem(rspWrapper);
} else {
break; break;
}
} }
msg = NULL; rspWrapper = NULL;
taosReadAllQitems(tmq->mqueue, tmq->qall); taosReadAllQitems(tmq->mqueue, tmq->qall);
while (1) { while (1) {
taosGetQitem(tmq->qall, (void**)&msg); taosGetQitem(tmq->qall, (void**)&rspWrapper);
if (msg) if (rspWrapper) {
taosFreeQitem(msg); tmqFreeRspWrapper(rspWrapper);
else taosFreeQitem(rspWrapper);
} else {
break; break;
}
} }
} }
...@@ -1644,6 +1675,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) ...@@ -1644,6 +1675,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
tDeleteSMqAskEpRsp(rspMsg); tDeleteSMqAskEpRsp(rspMsg);
*pReset = true; *pReset = true;
} else { } else {
tmqFreeRspWrapper(rspWrapper);
*pReset = false; *pReset = false;
} }
} else { } else {
...@@ -1695,6 +1727,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1695,6 +1727,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else { } else {
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
pollRspWrapper->dataRsp.head.epoch, consumerEpoch); pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
...@@ -1713,6 +1746,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1713,6 +1746,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else { } else {
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
pollRspWrapper->metaRsp.head.epoch, consumerEpoch); pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
...@@ -1743,6 +1777,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1743,6 +1777,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else { } else {
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
} else { } else {
...@@ -1794,7 +1829,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { ...@@ -1794,7 +1829,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
while (1) { while (1) {
tmqHandleAllDelayedTask(tmq); tmqHandleAllDelayedTask(tmq);
if (tmqPollImpl(tmq, timeout) < 0) { if (tmqPollImpl(tmq, timeout) < 0) {
tscDebug("return since poll err"); tscDebug("consumer:%" PRId64 " return since poll err", tmq->consumerId);
/*return NULL;*/ /*return NULL;*/
} }
......
...@@ -725,9 +725,15 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL ...@@ -725,9 +725,15 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
} }
taosWUnLockLatch(&pTq->pushLock); taosWUnLockLatch(&pTq->pushLock);
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (code != 0) { if (pHandle) {
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey); if (pHandle->pRef) {
walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
}
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (code != 0) {
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
}
} }
code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey); code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
...@@ -736,7 +742,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL ...@@ -736,7 +742,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
} }
if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) { if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
ASSERT(0); tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
} }
return 0; return 0;
} }
......
...@@ -223,6 +223,7 @@ void walClose(SWal *pWal) { ...@@ -223,6 +223,7 @@ void walClose(SWal *pWal) {
taosMemoryFree(pRef); taosMemoryFree(pRef);
} }
taosHashCleanup(pWal->pRefHash); taosHashCleanup(pWal->pRefHash);
pWal->pRefHash = NULL;
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
taosRemoveRef(tsWal.refSetId, pWal->refId); taosRemoveRef(tsWal.refSetId, pWal->refId);
......
...@@ -32,15 +32,18 @@ SWalRef *walOpenRef(SWal *pWal) { ...@@ -32,15 +32,18 @@ SWalRef *walOpenRef(SWal *pWal) {
return pRef; return pRef;
} }
#if 1
void walCloseRef(SWal *pWal, int64_t refId) { void walCloseRef(SWal *pWal, int64_t refId) {
SWalRef **ppRef = taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t)); SWalRef **ppRef = taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t));
if (ppRef == NULL) return; if (ppRef == NULL) return;
SWalRef *pRef = *ppRef; SWalRef *pRef = *ppRef;
if (pRef) {
wDebug("vgId:%d, wal close ref %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, pRef->refVer, pRef->refId);
} else {
wDebug("vgId:%d, wal close ref null, refId %" PRId64, pWal->cfg.vgId, refId);
}
taosHashRemove(pWal->pRefHash, &refId, sizeof(int64_t)); taosHashRemove(pWal->pRefHash, &refId, sizeof(int64_t));
taosMemoryFree(pRef); taosMemoryFree(pRef);
} }
#endif
int32_t walRefVer(SWalRef *pRef, int64_t ver) { int32_t walRefVer(SWalRef *pRef, int64_t ver) {
SWal *pWal = pRef->pWal; SWal *pWal = pRef->pWal;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册