diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 5ad5aa549d28a6b8c4835177dcb11df5418fe57c..149b1a84478bf73e96f4d2f504da60731e800891 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -137,6 +137,8 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { streamDispatch(pTask); } + } else { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 53e49a6ba5e0be669329c5ff6414c3648ee14fb3..cf72533b31a044fffcca1bd1fab315408f09d52a 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -262,12 +262,14 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) { if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) { tdbFree(pKey); tdbFree(pVal); + tdbTbcClose(pCur); return -1; } if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { tdbFree(pKey); tdbFree(pVal); + tdbTbcClose(pCur); return -1; } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index ce5917de296c317f739e79cb78cda21660769aa8..530493819591e2ebe6c87d2190b14937b57e2ba2 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -119,7 +119,10 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { for (int32_t i = 0; i < epSz; i++) { SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo)); if (pInfo == NULL) return -1; - if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) return -1; + if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) { + taosMemoryFreeClear(pInfo); + return -1; + } taosArrayPush(pTask->childEpInfo, &pInfo); }