diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 73aea04f92941161870dd0075dc0a3904adc21cf..ae05f92ab54f87f6f33f927564fde20ece7962f2 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1771,6 +1771,7 @@ void streamScanOperatorSaveCheckpoint(SStreamScanInfo* pInfo) { void* pBuf = NULL; int32_t len = streamScanOperatorEncode(pInfo, &pBuf); pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len); + taosMemoryFree(pBuf); } // other properties are recovered from the execution plan diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 55f0920d2b77b5c605836009e8657a2b8debbdef..a0bf9d052a1386f7429d94bb7778ac1a1d590a9c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2673,6 +2673,7 @@ void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) { len = doStreamIntervalEncodeOpState(&pBuf, pOperator); pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), buf, len); + taosMemoryFree(buf); } static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { @@ -4675,6 +4676,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { return pBlock; + } else if (pBlock->info.type == STREAM_CHECKPOINT) { + doStreamSessionSaveCheckpoint(pOperator); + pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState); + setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pInfo->streamAggSup.pState->checkPointId); + copyDataBlock(pInfo->pCheckpointRes, pBlock); + continue; } else { ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); }