未验证 提交 ea0b090c 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18147 from taosdata/feature/stream

fix: memory leak
......@@ -2627,6 +2627,7 @@ typedef struct {
int32_t tEncodeSTqCheckInfo(SEncoder* pEncoder, const STqCheckInfo* pInfo);
int32_t tDecodeSTqCheckInfo(SDecoder* pDecoder, STqCheckInfo* pInfo);
void tDeleteSTqCheckInfo(STqCheckInfo* pInfo);
typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
......
......@@ -5903,6 +5903,7 @@ int32_t tDecodeSTqCheckInfo(SDecoder *pDecoder, STqCheckInfo *pInfo) {
}
return 0;
}
void tDeleteSTqCheckInfo(STqCheckInfo *pInfo) { taosArrayDestroy(pInfo->colIdList); }
int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) {
int32_t nUid = taosArrayGetSize(pRes->uidList);
......
......@@ -235,6 +235,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64, pVgEp->vgId, consumerId);
}
taosArrayDestroy(pConsumerEp->vgs);
taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
// put into removed
taosArrayPush(pOutput->removedConsumers, &consumerId);
......
......@@ -88,8 +88,7 @@ typedef struct {
STqExecTb execTb;
STqExecDb execDb;
};
int32_t numOfCols; // number of out pout column, temporarily used
SSchemaWrapper* pSchemaWrapper; // columns that are involved in query
int32_t numOfCols; // number of out pout column, temporarily used
} STqExecHandle;
typedef struct {
......
......@@ -55,6 +55,7 @@ static void destroySTqHandle(void* data) {
STqHandle* pData = (STqHandle*)data;
qDestroyTask(pData->execHandle.task);
if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
} else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
tqCloseReader(pData->execHandle.pExecReader);
walCloseReader(pData->pWalReader);
......@@ -80,7 +81,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
pTq->pVnode = pVnode;
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
taosHashSetFreeFp(pTq->pHandle, destroySTqHandle);
taosInitRWLatch(&pTq->pushLock);
......@@ -88,6 +88,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree);
pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
if (tqMetaOpen(pTq) < 0) {
ASSERT(0);
......@@ -779,6 +780,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
}
if (req.newConsumerId == -1) {
tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
taosMemoryFree(req.qmsg);
return 0;
}
STqHandle tqHandle = {0};
......@@ -815,8 +817,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
req.qmsg = NULL;
pHandle->execHandle.task =
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols,
&pHandle->execHandle.pSchemaWrapper);
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, NULL);
ASSERT(pHandle->execHandle.task);
void* scanner = NULL;
qExtractStreamScanner(pHandle->execHandle.task, &scanner);
......@@ -864,6 +865,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
atomic_store_32(&pHandle->epoch, -1);
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_add_fetch_32(&pHandle->epoch, 1);
taosMemoryFree(req.qmsg);
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
// TODO
ASSERT(0);
......
......@@ -305,8 +305,8 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
};
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
handle.execHandle.task = qCreateQueueExecTaskInfo(
handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper);
handle.execHandle.task =
qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, NULL);
ASSERT(handle.execHandle.task);
void* scanner = NULL;
qExtractStreamScanner(handle.execHandle.task, &scanner);
......
......@@ -246,7 +246,9 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
}
}
*pSchema = tCloneSSchemaWrapper(((SExecTaskInfo*)pTaskInfo)->schemaInfo.qsw);
if (pSchema) {
*pSchema = tCloneSSchemaWrapper(((SExecTaskInfo*)pTaskInfo)->schemaInfo.qsw);
}
return pTaskInfo;
}
......@@ -659,7 +661,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
return pTaskInfo->code;
}
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo *pInfo) {
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
taosWLockLatch(&pTaskInfo->stopInfo.lock);
taosArrayPush(pTaskInfo->stopInfo.pStopInfo, pInfo);
taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
......@@ -680,7 +682,7 @@ int32_t stopInfoComp(void const* lp, void const* rp) {
return 0;
}
void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo *pInfo) {
void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
taosWLockLatch(&pTaskInfo->stopInfo.lock);
int32_t idx = taosArraySearchIdx(pTaskInfo->stopInfo.pStopInfo, pInfo, stopInfoComp, TD_EQ);
if (idx >= 0) {
......@@ -696,8 +698,8 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
int32_t num = taosArrayGetSize(pTaskInfo->stopInfo.pStopInfo);
for (int32_t i = 0; i < num; ++i) {
SExchangeOpStopInfo *pStop = taosArrayGet(pTaskInfo->stopInfo.pStopInfo, i);
SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pStop->refId);
SExchangeOpStopInfo* pStop = taosArrayGet(pTaskInfo->stopInfo.pStopInfo, i);
SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pStop->refId);
if (pExchangeInfo) {
tsem_post(&pExchangeInfo->ready);
taosReleaseRef(exchangeObjRefPool, pStop->refId);
......@@ -715,11 +717,11 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
}
qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
setTaskKilled(pTaskInfo);
qStopTaskOperators(pTaskInfo);
return TSDB_CODE_SUCCESS;
}
......@@ -1178,4 +1180,4 @@ void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
rpcFreeCont(pMsg->pCont);
destroySendMsgInfo(pSendInfo);
}
\ No newline at end of file
}
......@@ -203,6 +203,14 @@ void walClose(SWal *pWal) {
pWal->pIdxFile = NULL;
taosArrayDestroy(pWal->fileInfoSet);
pWal->fileInfoSet = NULL;
void *pIter = NULL;
while (1) {
pIter = taosHashIterate(pWal->pRefHash, pIter);
if (pIter == NULL) break;
SWalRef *pRef = *(SWalRef **)pIter;
taosMemoryFree(pRef);
}
taosHashCleanup(pWal->pRefHash);
taosThreadMutexUnlock(&pWal->mutex);
......
......@@ -26,7 +26,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
while (1) {
pIter = taosHashIterate(pWal->pRefHash, pIter);
if (pIter == NULL) break;
SWalRef *pRef = (SWalRef *)pIter;
SWalRef *pRef = *(SWalRef **)pIter;
if (pRef->refVer != -1 && pRef->refVer <= ver) {
taosHashCancelIterate(pWal->pRefHash, pIter);
taosThreadMutexUnlock(&pWal->mutex);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册