From 3464979489d1d653f2a6edcf2f1e0c26c548bf9e Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 2 Aug 2023 17:33:45 +0800 Subject: [PATCH] set dummy delete data request --- source/libs/executor/src/timewindowoperator.c | 5 ++++- source/libs/stream/src/streamExec.c | 9 +++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 3a5ff91f68..96734b27c3 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3574,6 +3574,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SOptrBasicInfo* pBInfo = &pInfo->binfo; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + qDebug("===stream=== stream session agg"); if (pOperator->status == OP_EXEC_DONE) { return NULL; } else if (pOperator->status == OP_RES_TO_RETURN) { @@ -3736,6 +3737,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo); int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true); if (winNum > 0) { + qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, winInfo.sessionWin.win.skey, winInfo.sessionWin.groupId); if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { saveResult(winInfo, pInfo->pStUpdated); } else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { @@ -3754,7 +3756,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.reloadStreamStateFn) { downstream->fpSet.reloadStreamStateFn(downstream); - } + } } SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, @@ -3863,6 +3865,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { SExprSupp* pSup = &pOperator->exprSupp; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + qDebug("===stream=== stream session semi agg"); if (pOperator->status == OP_EXEC_DONE) { return NULL; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 34370ebce9..d3ab7e820a 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -364,6 +364,15 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // 7. pause allowed. streamTaskEnablePause(pStreamTask); + if (taosQueueEmpty(pTask->inputQueue->queue)) { + SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);; + SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); + pDelBlock->info.rows = 0; + pDelBlock->info.version = 0; + pItem->type = STREAM_INPUT__REF_DATA_BLOCK; + pItem->pBlock = pDelBlock; + tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pItem); + } streamSchedExec(pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask); -- GitLab