diff --git a/README-CN.md b/README-CN.md index 53abc5c006445f33053c6364227efb11e0f7823b..2b1790f4bbd5cb9862c62c450cb6cb484526cbb0 100644 --- a/README-CN.md +++ b/README-CN.md @@ -175,7 +175,7 @@ cd TDengine ```bash mkdir debug cd debug -cmake .. -DBUILD_TOOLS=true +cmake .. -DBUILD_TOOLS=true -DBUILD_CONTRIB=true make ``` diff --git a/README.md b/README.md index 73df4fb187fedcfdd93dafec7e95f7ccfe0da465..a8c20ea3f606e2b68b76c5c203bb02b20b588105 100644 --- a/README.md +++ b/README.md @@ -183,7 +183,7 @@ It equals to execute following commands: ```bash mkdir debug cd debug -cmake .. -DBUILD_TOOLS=true +cmake .. -DBUILD_TOOLS=true -DBUILD_CONTRIB=true make ``` diff --git a/build.sh b/build.sh index 78f08afa7a772e2a9f05178244f2443e0ec45925..04ca7a11a0d8907b63848504beeb6ff96db73dd3 100755 --- a/build.sh +++ b/build.sh @@ -4,5 +4,5 @@ if [ ! -d debug ]; then mkdir debug || echo -e "failed to make directory for build" fi -cd debug && cmake .. -DBUILD_TOOLS=true && make +cd debug && cmake .. -DBUILD_TOOLS=true -DBUILD_CONTRIB=true && make diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8839cef3149a968d576d76f9e33b6cf4cbd6f58d..03f24919f28396e5ed16bcf40571f9ec1a46c4da 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3574,6 +3574,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SOptrBasicInfo* pBInfo = &pInfo->binfo; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + qDebug("===stream=== stream session agg"); if (pOperator->status == OP_EXEC_DONE) { return NULL; } else if (pOperator->status == OP_RES_TO_RETURN) { @@ -3736,6 +3737,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo); int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true); if (winNum > 0) { + qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, winInfo.sessionWin.win.skey, winInfo.sessionWin.groupId); if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { saveResult(winInfo, pInfo->pStUpdated); } else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { @@ -3754,7 +3756,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.reloadStreamStateFn) { downstream->fpSet.reloadStreamStateFn(downstream); - } + } } SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, @@ -3863,6 +3865,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { SExprSupp* pSup = &pOperator->exprSupp; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + qDebug("===stream=== stream session semi agg"); if (pOperator->status == OP_EXEC_DONE) { return NULL; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 34370ebce9a8962b6a5c81fd3de112753d779471..d3ab7e820a59a33195f62d685c7c02f82b1260e7 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -364,6 +364,15 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // 7. pause allowed. streamTaskEnablePause(pStreamTask); + if (taosQueueEmpty(pTask->inputQueue->queue)) { + SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);; + SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); + pDelBlock->info.rows = 0; + pDelBlock->info.version = 0; + pItem->type = STREAM_INPUT__REF_DATA_BLOCK; + pItem->pBlock = pDelBlock; + tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pItem); + } streamSchedExec(pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask);