diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index e90e974318205c9d2031933075bb03966b37af3f..ff7d7a0963f5a4bae459466d274b49a647e99005 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -119,6 +119,7 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock));//pBlock; taosArrayPush(pChkpoint->blocks, pBlock); + taosMemoryFree(pBlock); if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pChkpoint) < 0) { taosFreeQitem(pChkpoint); return TSDB_CODE_OUT_OF_MEMORY; @@ -163,6 +164,8 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc // set the task status pTask->checkpointingId = checkpointId; + + // set task status pTask->status.taskStatus = TASK_STATUS__CK; //todo fix race condition: set the status and append checkpoint block @@ -211,6 +214,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc code = continueDispatchCheckpointBlock(pBlock, pTask); } + streamFreeQitem((SStreamQueueItem*)pBlock); return code; }