未验证 提交 f35bdc0c 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #12816 from taosdata/feature/stream

fix(tmq): memory leak
......@@ -171,6 +171,7 @@ tmq_t* build_consumer() {
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq);
tmq_conf_destroy(conf);
return tmq;
}
......
......@@ -2523,11 +2523,9 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
buf = taosDecodeFixedI32(buf, &pRsp->blockNum);
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(int32_t));
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
if (pRsp->blockNum != 0) {
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(int32_t));
buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
buf = taosDecodeFixedI8(buf, &pRsp->withSchema);
buf = taosDecodeFixedI8(buf, &pRsp->withTag);
......@@ -2540,14 +2538,20 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
taosArrayPush(pRsp->blockDataLen, &bLen);
taosArrayPush(pRsp->blockData, &data);
if (pRsp->withSchema) {
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
buf = taosDecodeSSchemaWrapper(buf, pSW);
taosArrayPush(pRsp->blockSchema, &pSW);
} else {
pRsp->blockSchema = NULL;
}
if (pRsp->withTbName) {
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
char* name = NULL;
buf = taosDecodeString(buf, &name);
taosArrayPush(pRsp->blockTbName, &name);
} else {
pRsp->blockTbName = NULL;
}
}
}
......
......@@ -60,7 +60,7 @@ static void registerRequest(SRequestObj *pRequest) {
static void deregisterRequest(SRequestObj *pRequest) {
assert(pRequest != NULL);
STscObj * pTscObj = pRequest->pTscObj;
STscObj *pTscObj = pRequest->pTscObj;
SInstanceSummary *pActivity = &pTscObj->pAppInfo->summary;
int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
......@@ -313,7 +313,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
return 0;
}
SConfig * pCfg = taosGetCfg();
SConfig *pCfg = taosGetCfg();
SConfigItem *pItem = NULL;
switch (option) {
......
......@@ -390,7 +390,7 @@ void freeRequestRes(SRequestObj* pRequest, void* res) {
if (TDMT_VND_SUBMIT == pRequest->type) {
tFreeSSubmitRsp((SSubmitRsp*)res);
} else if (TDMT_VND_QUERY == pRequest->type) {
taosArrayDestroy((SArray *)res);
taosArrayDestroy((SArray*)res);
}
}
......
......@@ -146,10 +146,10 @@ void taos_free_result(TAOS_RES *res) {
SMqRspObj *pRsp = (SMqRspObj *)res;
if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen);
if (pRsp->rsp.blockSchema) taosArrayDestroy(pRsp->rsp.blockSchema);
if (pRsp->rsp.blockTbName) taosArrayDestroy(pRsp->rsp.blockTbName);
if (pRsp->rsp.blockTags) taosArrayDestroy(pRsp->rsp.blockTags);
if (pRsp->rsp.blockTagSchema) taosArrayDestroy(pRsp->rsp.blockTagSchema);
if (pRsp->rsp.withTbName) taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
pRsp->resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&pRsp->resInfo);
}
......
......@@ -202,7 +202,12 @@ tmq_conf_t* tmq_conf_new() {
}
void tmq_conf_destroy(tmq_conf_t* conf) {
if (conf) taosMemoryFree(conf);
if (conf) {
if (conf->ip) taosMemoryFree(conf->ip);
if (conf->user) taosMemoryFree(conf->user);
if (conf->pass) taosMemoryFree(conf->pass);
taosMemoryFree(conf);
}
}
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
......@@ -497,6 +502,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
} else {
ASSERT(0);
}
taosFreeQitem(pTaskType);
}
taosFreeQall(qall);
return 0;
......@@ -954,8 +960,12 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqClientVg* pVg = pParam->pVg;
SMqClientTopic* pTopic = pParam->pTopic;
tmq_t* tmq = pParam->tmq;
int32_t vgId = pParam->vgId;
int32_t epoch = pParam->epoch;
taosMemoryFree(pParam);
if (code != 0) {
tscWarn("msg discard from vg %d, epoch %d, code:%x", pParam->vgId, pParam->epoch, code);
tscWarn("msg discard from vg %d, epoch %d, code:%x", vgId, epoch, code);
if (pMsg->pData) taosMemoryFree(pMsg->pData);
goto CREATE_MSG_FAIL;
}
......@@ -963,19 +973,21 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
if (msgEpoch < tmqEpoch) {
// do not write into queue since updating epoch reset
tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch,
tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
tmqEpoch);
tsem_post(&tmq->rspSem);
taosMemoryFree(pMsg->pData);
return 0;
}
if (msgEpoch != tmqEpoch) {
tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch);
tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
}
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
if (pRspWrapper == NULL) {
tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch);
taosMemoryFree(pMsg->pData);
tscWarn("msg discard from vg %d, epoch %d since out of memory", vgId, epoch);
goto CREATE_MSG_FAIL;
}
......@@ -986,6 +998,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
memcpy(&pRspWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg);
taosMemoryFree(pMsg->pData);
tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId,
pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset);
......@@ -995,7 +1008,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
return 0;
CREATE_MSG_FAIL:
if (pParam->epoch == tmq->epoch) {
if (epoch == tmq->epoch) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
}
tsem_post(&tmq->rspSem);
......@@ -1088,6 +1101,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
tmq_t* tmq = pParam->tmq;
int8_t async = pParam->async;
pParam->code = code;
if (code != 0) {
tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
......@@ -1104,7 +1118,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
goto END;
}
if (!pParam->async) {
if (!async) {
SMqAskEpRsp rsp;
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
/*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
......@@ -1125,13 +1139,14 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
taosWriteQitem(tmq->mqueue, pWrapper);
tsem_post(&tmq->rspSem);
taosMemoryFree(pParam);
}
END:
/*atomic_store_8(&tmq->epStatus, 0);*/
if (!pParam->async) {
if (!async) {
tsem_post(&pParam->rspSem);
} else {
taosMemoryFree(pParam);
}
return code;
}
......@@ -1279,7 +1294,6 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
}
taosFreeQitem(pWrapper);
return pRspObj;
}
......@@ -1401,6 +1415,7 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t waitTime, bool pollIfReset) {
}
// build rsp
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
taosFreeQitem(pollRspWrapper);
return pRsp;
} else {
/*printf("epoch mismatch\n");*/
......
......@@ -57,6 +57,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
void tqClose(STQ* pTq) {
if (pTq) {
taosMemoryFreeClear(pTq->path);
taosHashCleanup(pTq->execs);
taosHashCleanup(pTq->pStreamTasks);
taosHashCleanup(pTq->pushMgr);
taosMemoryFree(pTq);
}
// TODO
......@@ -409,9 +412,9 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
pTopic->buffer.output[j].status = 0;
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = {
.reader = pReadHandle,
.meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb,
.reader = pReadHandle,
.meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb,
};
pTopic->buffer.output[j].pReadHandle = pReadHandle;
pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
......@@ -1000,10 +1003,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
for (int32_t i = 0; i < parallel; i++) {
STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = {
.reader = pStreamReader,
.meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb,
.vnode = pTq->pVnode,
.reader = pStreamReader,
.meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb,
.vnode = pTq->pVnode,
};
pTask->exec.runners[i].inputHandle = pStreamReader;
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册