未验证 提交 11ec00ab 编写于 作者: W wade zhang 提交者: GitHub

Merge pull request #22296 from taosdata/fix/TD-25532

set dummy delete data request
......@@ -3574,6 +3574,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
qDebug("===stream=== stream session agg");
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
} else if (pOperator->status == OP_RES_TO_RETURN) {
......@@ -3736,6 +3737,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true);
if (winNum > 0) {
qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, winInfo.sessionWin.win.skey, winInfo.sessionWin.groupId);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResult(winInfo, pInfo->pStUpdated);
} else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
......@@ -3754,7 +3756,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream);
}
}
}
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
......@@ -3863,6 +3865,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
SExprSupp* pSup = &pOperator->exprSupp;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
qDebug("===stream=== stream session semi agg");
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......
......@@ -364,6 +364,15 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// 7. pause allowed.
streamTaskEnablePause(pStreamTask);
if (taosQueueEmpty(pTask->inputQueue->queue)) {
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);;
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
pDelBlock->info.rows = 0;
pDelBlock->info.version = 0;
pItem->type = STREAM_INPUT__REF_DATA_BLOCK;
pItem->pBlock = pDelBlock;
tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pItem);
}
streamSchedExec(pStreamTask);
streamMetaReleaseTask(pMeta, pStreamTask);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册