提交 e1046c7a 编写于 作者: L Liu Jicong

fix: memory leak

上级 7dfc465c
...@@ -65,7 +65,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers); ...@@ -65,7 +65,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
* @return * @return
*/ */
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols,
SSchemaWrapper** pSchemaWrapper, int64_t* ntbUid); SSchemaWrapper** pSchemaWrapper);
/** /**
* Set the input data block for the stream scan. * Set the input data block for the stream scan.
......
...@@ -110,9 +110,6 @@ typedef struct { ...@@ -110,9 +110,6 @@ typedef struct {
// exec // exec
STqExecHandle execHandle; STqExecHandle execHandle;
// prevent drop
int64_t ntbUid;
SArray* colIdList; // SArray<int32_t>
} STqHandle; } STqHandle;
struct STQ { struct STQ {
......
...@@ -272,6 +272,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -272,6 +272,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t code = 0; int32_t code = 0;
STqOffsetVal reqOffset = pReq->reqOffset; STqOffsetVal reqOffset = pReq->reqOffset;
STqOffsetVal fetchOffsetNew; STqOffsetVal fetchOffsetNew;
SWalCkHead* pCkHead = NULL;
// 1.find handle // 1.find handle
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
...@@ -461,6 +462,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -461,6 +462,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} }
OVER: OVER:
if (pCkHead) taosMemoryFree(pCkHead);
// TODO wrap in destroy func // TODO wrap in destroy func
taosArrayDestroy(dataRsp.blockDataLen); taosArrayDestroy(dataRsp.blockDataLen);
taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree); taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree);
...@@ -542,7 +544,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -542,7 +544,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
}; };
pHandle->execHandle.execCol.task = pHandle->execHandle.execCol.task =
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols,
&pHandle->execHandle.pSchemaWrapper, &pHandle->ntbUid); &pHandle->execHandle.pSchemaWrapper);
ASSERT(pHandle->execHandle.execCol.task); ASSERT(pHandle->execHandle.execCol.task);
void* scanner = NULL; void* scanner = NULL;
qExtractStreamScanner(pHandle->execHandle.execCol.task, &scanner); qExtractStreamScanner(pHandle->execHandle.execCol.task, &scanner);
......
...@@ -201,10 +201,12 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR ...@@ -201,10 +201,12 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader->msgIter.uid; int64_t uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
blockDataFreeRes(&block);
continue; continue;
} }
} }
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
blockDataFreeRes(&block);
tqAddBlockSchemaToRsp(pExec, pRsp); tqAddBlockSchemaToRsp(pExec, pRsp);
pRsp->blockNum++; pRsp->blockNum++;
} }
...@@ -220,10 +222,12 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR ...@@ -220,10 +222,12 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader->msgIter.uid; int64_t uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
blockDataFreeRes(&block);
continue; continue;
} }
} }
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
blockDataFreeRes(&block);
tqAddBlockSchemaToRsp(pExec, pRsp); tqAddBlockSchemaToRsp(pExec, pRsp);
pRsp->blockNum++; pRsp->blockNum++;
} }
......
...@@ -89,9 +89,8 @@ int32_t tqMetaOpen(STQ* pTq) { ...@@ -89,9 +89,8 @@ int32_t tqMetaOpen(STQ* pTq) {
.version = handle.snapshotVer, .version = handle.snapshotVer,
}; };
handle.execHandle.execCol.task = handle.execHandle.execCol.task = qCreateQueueExecTaskInfo(
qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper);
&handle.execHandle.pSchemaWrapper, &handle.ntbUid);
ASSERT(handle.execHandle.execCol.task); ASSERT(handle.execHandle.execCol.task);
void* scanner = NULL; void* scanner = NULL;
qExtractStreamScanner(handle.execHandle.execCol.task, &scanner); qExtractStreamScanner(handle.execHandle.execCol.task, &scanner);
......
...@@ -340,29 +340,30 @@ FAIL: ...@@ -340,29 +340,30 @@ FAIL:
void tqReaderSetColIdList(STqReader* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; } void tqReaderSetColIdList(STqReader* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; }
int tqReaderSetTbUidList(STqReader* pHandle, const SArray* tbUidList) { int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) {
if (pHandle->tbIdHash) { if (pReader->tbIdHash) {
taosHashClear(pHandle->tbIdHash); taosHashClear(pReader->tbIdHash);
} else {
pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
} }
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); if (pReader->tbIdHash == NULL) {
if (pHandle->tbIdHash == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) { for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i); int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0); taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
} }
return 0; return 0;
} }
int tqReaderAddTbUidList(STqReader* pHandle, const SArray* tbUidList) { int tqReaderAddTbUidList(STqReader* pReader, const SArray* tbUidList) {
if (pHandle->tbIdHash == NULL) { if (pReader->tbIdHash == NULL) {
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (pHandle->tbIdHash == NULL) { if (pReader->tbIdHash == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -370,18 +371,18 @@ int tqReaderAddTbUidList(STqReader* pHandle, const SArray* tbUidList) { ...@@ -370,18 +371,18 @@ int tqReaderAddTbUidList(STqReader* pHandle, const SArray* tbUidList) {
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) { for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i); int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0); taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
} }
return 0; return 0;
} }
int tqReaderRemoveTbUidList(STqReader* pHandle, const SArray* tbUidList) { int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
ASSERT(pHandle->tbIdHash != NULL); ASSERT(pReader->tbIdHash != NULL);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) { for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i); int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
taosHashRemove(pHandle->tbIdHash, pKey, sizeof(int64_t)); taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t));
} }
return 0; return 0;
......
...@@ -157,7 +157,6 @@ typedef struct { ...@@ -157,7 +157,6 @@ typedef struct {
SQueryTableDataCond tableCond; SQueryTableDataCond tableCond;
int64_t recoverStartVer; int64_t recoverStartVer;
int64_t recoverEndVer; int64_t recoverEndVer;
int64_t ntbUid;
} SStreamTaskInfo; } SStreamTaskInfo;
typedef struct { typedef struct {
......
...@@ -121,7 +121,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO ...@@ -121,7 +121,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
} }
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols,
SSchemaWrapper** pSchemaWrapper, int64_t* ntbUid) { SSchemaWrapper** pSchemaWrapper) {
if (msg == NULL) { if (msg == NULL) {
// TODO create raw scan // TODO create raw scan
return NULL; return NULL;
...@@ -156,7 +156,6 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n ...@@ -156,7 +156,6 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
} }
*pSchemaWrapper = tCloneSSchemaWrapper(((SExecTaskInfo*)pTaskInfo)->schemaInfo.qsw); *pSchemaWrapper = tCloneSSchemaWrapper(((SExecTaskInfo*)pTaskInfo)->schemaInfo.qsw);
*ntbUid = ((SExecTaskInfo*)pTaskInfo)->streamInfo.ntbUid;
return pTaskInfo; return pTaskInfo;
} }
......
...@@ -4165,9 +4165,6 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, ...@@ -4165,9 +4165,6 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode,
pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version; pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
} else { } else {
pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
pTaskInfo->streamInfo.ntbUid = mr.me.uid;
}
} }
metaReaderClear(&mr); metaReaderClear(&mr);
...@@ -4443,10 +4440,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4443,10 +4440,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo); return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
STimeWindowAggSupp aggSup = (STimeWindowAggSupp){ STimeWindowAggSupp aggSup = (STimeWindowAggSupp){
.waterMark = pTableScanNode->watermark, .waterMark = pTableScanNode->watermark,
.calTrigger = pTableScanNode->triggerType, .calTrigger = pTableScanNode->triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
}; };
if (pHandle->vnode) { if (pHandle->vnode) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册