提交 8bc6420d 编写于 作者: J jiajingbin

Merge branch 'main' of https://github.com/taosdata/TDengine into main

......@@ -175,7 +175,7 @@ cd TDengine
```bash
mkdir debug
cd debug
cmake .. -DBUILD_TOOLS=true
cmake .. -DBUILD_TOOLS=true -DBUILD_CONTRIB=true
make
```
......
......@@ -183,7 +183,7 @@ It equals to execute following commands:
```bash
mkdir debug
cd debug
cmake .. -DBUILD_TOOLS=true
cmake .. -DBUILD_TOOLS=true -DBUILD_CONTRIB=true
make
```
......
......@@ -4,5 +4,5 @@ if [ ! -d debug ]; then
mkdir debug || echo -e "failed to make directory for build"
fi
cd debug && cmake .. -DBUILD_TOOLS=true && make
cd debug && cmake .. -DBUILD_TOOLS=true -DBUILD_CONTRIB=true && make
......@@ -3574,6 +3574,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
qDebug("===stream=== stream session agg");
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
} else if (pOperator->status == OP_RES_TO_RETURN) {
......@@ -3736,6 +3737,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true);
if (winNum > 0) {
qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, winInfo.sessionWin.win.skey, winInfo.sessionWin.groupId);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResult(winInfo, pInfo->pStUpdated);
} else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
......@@ -3863,6 +3865,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
SExprSupp* pSup = &pOperator->exprSupp;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
qDebug("===stream=== stream session semi agg");
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......
......@@ -364,6 +364,15 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// 7. pause allowed.
streamTaskEnablePause(pStreamTask);
if (taosQueueEmpty(pTask->inputQueue->queue)) {
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);;
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
pDelBlock->info.rows = 0;
pDelBlock->info.version = 0;
pItem->type = STREAM_INPUT__REF_DATA_BLOCK;
pItem->pBlock = pDelBlock;
tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pItem);
}
streamSchedExec(pStreamTask);
streamMetaReleaseTask(pMeta, pStreamTask);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册