diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c9d0eebe2bf3308eaf374691f5a9923c1035cbb9..d561d864729b4a39ce32b45a9a55f8c76d52c842 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -297,7 +297,6 @@ typedef struct SPartitionBySupporter { typedef struct SPartitionDataInfo { uint64_t groupId; char* tbname; - SArray* tags; SArray* rowIds; } SPartitionDataInfo; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 280edf8b53d89ef05c7cd4f08060a51312754ca7..e2ba2a07c8943b0af26bc6b4a7cec3d31538bffe 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1215,6 +1215,11 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) { return pBlock; } +void freePartItem(void* ptr) { + SPartitionDataInfo* pPart = (SPartitionDataInfo*)ptr; + taosArrayDestroy(pPart->rowIds); +} + SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) { int32_t code = TSDB_CODE_SUCCESS; @@ -1293,6 +1298,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); + taosHashSetFreeFp(pInfo->pPartitions, freePartItem); pInfo->tsColIndex = 0; pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c2e32f1027d9b1e8c65d91fb94dd848fd385559b..95d9dda0578b22d05140175ad39a3327ddbe1d62 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -149,7 +149,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { int32_t batchCnt = 0; while (1) { if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { - taosArrayDestroy(pRes); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return 0; } @@ -203,6 +203,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { code = streamTaskOutput(pTask, qRes); if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { taosFreeQitem(pRes); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return code; }