提交 5b8cc223 编写于 作者: L Liu Jicong

fix: mem leak

上级 1e51c1b4
...@@ -186,10 +186,10 @@ void taos_free_result(TAOS_RES *res) { ...@@ -186,10 +186,10 @@ void taos_free_result(TAOS_RES *res) {
destroyRequest(pRequest); destroyRequest(pRequest);
} else if (TD_RES_TMQ_METADATA(res)) { } else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res; SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res;
if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen); taosArrayDestroy(pRsp->rsp.blockDataLen);
if (pRsp->rsp.withTbName) taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree); taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
// taosx // taosx
taosArrayDestroy(pRsp->rsp.createTableLen); taosArrayDestroy(pRsp->rsp.createTableLen);
taosArrayDestroyP(pRsp->rsp.createTableReq, taosMemoryFree); taosArrayDestroyP(pRsp->rsp.createTableReq, taosMemoryFree);
...@@ -199,10 +199,10 @@ void taos_free_result(TAOS_RES *res) { ...@@ -199,10 +199,10 @@ void taos_free_result(TAOS_RES *res) {
taosMemoryFree(pRsp); taosMemoryFree(pRsp);
} else if (TD_RES_TMQ(res)) { } else if (TD_RES_TMQ(res)) {
SMqRspObj *pRsp = (SMqRspObj *)res; SMqRspObj *pRsp = (SMqRspObj *)res;
if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen); taosArrayDestroy(pRsp->rsp.blockDataLen);
if (pRsp->rsp.withTbName) taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree); taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
pRsp->resInfo.pRspMsg = NULL; pRsp->resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&pRsp->resInfo); doFreeReqResultInfo(&pRsp->resInfo);
taosMemoryFree(pRsp); taosMemoryFree(pRsp);
......
...@@ -814,24 +814,55 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) { ...@@ -814,24 +814,55 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
return 0; return 0;
} }
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
// do nothing
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
tDeleteSMqAskEpRsp(&pEpRspWrapper->msg);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosArrayDestroyP(pRsp->dataRsp.blockData, taosMemoryFree);
taosArrayDestroy(pRsp->dataRsp.blockDataLen);
taosArrayDestroyP(pRsp->dataRsp.blockTbName, taosMemoryFree);
taosArrayDestroyP(pRsp->dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosMemoryFree(pRsp->metaRsp.metaRsp);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosArrayDestroyP(pRsp->taosxRsp.blockData, taosMemoryFree);
taosArrayDestroy(pRsp->taosxRsp.blockDataLen);
taosArrayDestroyP(pRsp->taosxRsp.blockTbName, taosMemoryFree);
taosArrayDestroyP(pRsp->taosxRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
// taosx
taosArrayDestroy(pRsp->taosxRsp.createTableLen);
taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree);
}
}
void tmqClearUnhandleMsg(tmq_t* tmq) { void tmqClearUnhandleMsg(tmq_t* tmq) {
SMqRspWrapper* msg = NULL; SMqRspWrapper* rspWrapper = NULL;
while (1) { while (1) {
taosGetQitem(tmq->qall, (void**)&msg); taosGetQitem(tmq->qall, (void**)&rspWrapper);
if (msg) if (rspWrapper) {
taosFreeQitem(msg); tmqFreeRspWrapper(rspWrapper);
else taosFreeQitem(rspWrapper);
} else {
break; break;
}
} }
msg = NULL; rspWrapper = NULL;
taosReadAllQitems(tmq->mqueue, tmq->qall); taosReadAllQitems(tmq->mqueue, tmq->qall);
while (1) { while (1) {
taosGetQitem(tmq->qall, (void**)&msg); taosGetQitem(tmq->qall, (void**)&rspWrapper);
if (msg) if (rspWrapper) {
taosFreeQitem(msg); tmqFreeRspWrapper(rspWrapper);
else taosFreeQitem(rspWrapper);
} else {
break; break;
}
} }
} }
...@@ -1644,6 +1675,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) ...@@ -1644,6 +1675,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
tDeleteSMqAskEpRsp(rspMsg); tDeleteSMqAskEpRsp(rspMsg);
*pReset = true; *pReset = true;
} else { } else {
tmqFreeRspWrapper(rspWrapper);
*pReset = false; *pReset = false;
} }
} else { } else {
...@@ -1695,6 +1727,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1695,6 +1727,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else { } else {
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
pollRspWrapper->dataRsp.head.epoch, consumerEpoch); pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
...@@ -1713,6 +1746,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1713,6 +1746,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else { } else {
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
pollRspWrapper->metaRsp.head.epoch, consumerEpoch); pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
...@@ -1743,6 +1777,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1743,6 +1777,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else { } else {
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
} else { } else {
...@@ -1794,7 +1829,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { ...@@ -1794,7 +1829,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
while (1) { while (1) {
tmqHandleAllDelayedTask(tmq); tmqHandleAllDelayedTask(tmq);
if (tmqPollImpl(tmq, timeout) < 0) { if (tmqPollImpl(tmq, timeout) < 0) {
tscDebug("return since poll err"); tscDebug("consumer:%" PRId64 " return since poll err", tmq->consumerId);
/*return NULL;*/ /*return NULL;*/
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册