diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 47cfa5d4106c5368105abe18d2970a05203e4d8d..cd21de52edffad82caefa4523c0a4cad415369c8 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1752,7 +1752,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; int32_t len = 0; - len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|group id:%" PRIu64 "| uid:%ld\n", flag, + len += snprintf(dumpBuf + len, size - len, "%s |block type %d |child id %d|group id:%" PRIu64 "| uid:%ld|======\n", "dumpBlockData", (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId, pDataBlock->info.uid); if (len >= size - 1) return dumpBuf; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index f110b1c92e4ca29b9926641480598c2eb181288f..cd50a13eddafae18fd4a407e69ef4b668b39fc28 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -371,6 +371,13 @@ typedef struct SessionWindowSupporter { uint8_t parentType; } SessionWindowSupporter; +typedef struct STimeWindowSupp { + int8_t calTrigger; + int64_t waterMark; + TSKEY maxTs; + SColumnInfoData timeWindowData; // query time window info for scalar function execution. +} STimeWindowAggSupp; + typedef struct SStreamScanInfo { uint64_t tableUid; // queried super table uid SExprInfo* pPseudoExpr; @@ -407,6 +414,7 @@ typedef struct SStreamScanInfo { SSDataBlock* pDeleteDataRes; // delete data SSDataBlock int32_t deleteDataIndex; STimeWindow updateWin; + STimeWindowAggSupp twAggSup; // status for tmq // SSchemaWrapper schema; @@ -452,13 +460,6 @@ typedef struct SAggSupporter { int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row } SAggSupporter; -typedef struct STimeWindowSupp { - int8_t calTrigger; - int64_t waterMark; - TSKEY maxTs; - SColumnInfoData timeWindowData; // query time window info for scalar function execution. -} STimeWindowAggSupp; - typedef struct SIntervalAggOperatorInfo { // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; // basic info @@ -952,6 +953,7 @@ bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap); int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted); bool functionNeedToExecute(SqlFunctionCtx* pCtx); +bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset, diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index afb5e5ed45b3a0a1990eb9afd50af53bd6f17899..a81a796fa736de1f87ecf51ae71f54dcae3b9c8d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -802,7 +802,12 @@ static bool isStateWindow(SStreamScanInfo* pInfo) { static bool isIntervalWindow(SStreamScanInfo* pInfo) { return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || - pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL; + pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || + pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL; +} + +static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) { + return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL; } static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) { @@ -1130,9 +1135,14 @@ static void setUpdateData(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlo static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) { SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); - TSKEY* ts = (TSKEY*)pColDataInfo->pData; + TSKEY* tsCol = (TSKEY*)pColDataInfo->pData; for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) { - if (updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, ts[rowId]) && out) { + SResultRowInfo dumyInfo; + dumyInfo.cur.pageId = -1; + STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC); + // must check update info first. + bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]); + if ( (update || (isSignleIntervalWindow(pInfo) && isCloseWindow(&win, &pInfo->twAggSup)) ) && out) { taosArrayPush(pInfo->tsArray, &rowId); } } @@ -1413,6 +1423,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pInfo->tsArrayIndex = 0; checkUpdateData(pInfo, true, pInfo->pRes, true); setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey); if (pInfo->pUpdateRes->info.rows > 0) { if (pInfo->pUpdateRes->info.type == STREAM_CLEAR) { pInfo->updateResIndex = 0; @@ -1584,13 +1595,14 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pUpdateRes = createResDataBlock(pDescNode); pInfo->pCondition = pScanPhyNode->node.pConditions; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1}; + pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN}; pInfo->groupId = 0; pInfo->pPullDataRes = createPullDataBlock(); pInfo->pStreamScanOp = pOperator; pInfo->deleteDataIndex = 0; pInfo->pDeleteDataRes = createPullDataBlock(); pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX}; + pInfo->twAggSup = *pTwSup; pOperator->name = "StreamScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index acc56af5cd84e721cc76bc7da8e8da624e4f34c9..71eb221c645768517b50d4023dfb7cf9df2c127c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -652,8 +652,8 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num } void printDataBlock(SSDataBlock* pBlock, const char* flag) { - if (pBlock == NULL) { - qDebug("======printDataBlock Block is Null"); + if (!pBlock || pBlock->info.rows == 0) { + qDebug("======printDataBlock: Block is Null or Empty"); return; } char* pBuf = NULL; @@ -1355,7 +1355,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, int32_t size = taosArrayGetSize(chAy); qDebug("window %" PRId64 " wait child size:%d", win.skey, size); for (int32_t i = 0; i < size; i++) { - qDebug("window %" PRId64 " wait chid id:%d", win.skey, *(int32_t*)taosArrayGet(chAy, i)); + qDebug("window %" PRId64 " wait child id:%d", win.skey, *(int32_t*)taosArrayGet(chAy, i)); } continue; } else if (pPullDataMap) { @@ -1626,6 +1626,12 @@ SSDataBlock* createDeleteBlock() { return pBlock; } +void initIntervalDownStream(SOperatorInfo* downstream, uint8_t type) { + ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); + SStreamScanInfo* pScanInfo = downstream->info; + pScanInfo->sessionSup.parentType = type; +} + SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode, @@ -1701,6 +1707,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL, destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); + if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) { + initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL); + } + code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -2476,12 +2486,14 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc } else { int32_t index = -1; SArray* chArray = NULL; + int32_t chId = 0; if (chIds) { chArray = *(void**)chIds; - int32_t chId = getChildIndex(pSDataBlock); + chId = getChildIndex(pSDataBlock); index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ); } if (index != -1 && pSDataBlock->info.type == STREAM_PULL_DATA) { + qDebug("======delete child id %d", chId); taosArrayRemove(chArray, index); if (taosArrayGetSize(chArray) == 0) { // pull data is over @@ -3010,6 +3022,7 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num pDummy[i].functionId = pCtx[i].functionId; } } + void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int64_t gap, int64_t waterMark, uint8_t type) { ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 871bcf015b37eba431c3ce79645a24aa65785ca7..ae0ccb1c5139434e9f473080b99a32bd965d94c5 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -198,8 +198,7 @@ static bool stbSplHasGatherExecFunc(const SNodeList* pFuncs) { } static bool stbSplIsMultiTbScan(bool streamQuery, SScanLogicNode* pScan) { - return (NULL != pScan->pVgroupList && pScan->pVgroupList->numOfVgroups > 1) || - (streamQuery && TSDB_SUPER_TABLE == pScan->tableType); + return (NULL != pScan->pVgroupList && pScan->pVgroupList->numOfVgroups > 1); } static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) { diff --git a/tests/script/tsim/stream/sliding.sim b/tests/script/tsim/stream/sliding.sim index 1ffca2a67bbad0180428178e26395dfca4e6bcde..f34a50de9dfa3ec89f5e55493e03f6d402434be6 100644 --- a/tests/script/tsim/stream/sliding.sim +++ b/tests/script/tsim/stream/sliding.sim @@ -366,4 +366,143 @@ if $data32 != 8 then goto loop1 endi +sql drop database IF EXISTS test2; +sql drop stream IF EXISTS streams21; +sql drop stream IF EXISTS streams22; + +sql create database test2 vgroups 2; +sql use test2; +sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams21 trigger at_once into streamt as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s); +sql create stream streams22 trigger at_once into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s); + +sql insert into t1 values(1648791213000,1,1,1,1.0); +sql insert into t1 values(1648791223001,2,2,2,1.1); +sql insert into t1 values(1648791233002,3,3,3,2.1); +sql insert into t1 values(1648791243003,4,4,4,3.1); +sql insert into t1 values(1648791213004,4,5,5,4.1); + +sql insert into t2 values(1648791213000,1,6,6,1.0); +sql insert into t2 values(1648791223001,2,7,7,1.1); +sql insert into t2 values(1648791233002,3,8,8,2.1); +sql insert into t2 values(1648791243003,4,9,9,3.1); +sql insert into t2 values(1648791213004,4,10,10,4.1); + +$loop_count = 0 + +loop2: +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt; + +# row 0 +if $data01 != 2 then + print =====data01=$data01 + goto loop2 +endi + +if $data02 != 5 then + print =====data02=$data02 + goto loop2 +endi + +# row 1 +if $data11 != 1 then + print =====data11=$data11 + goto loop2 +endi + +if $data12 != 2 then + print =====data12=$data12 + goto loop2 +endi + +# row 2 +if $data21 != 1 then + print =====data21=$data21 + goto loop2 +endi + +if $data22 != 3 then + print =====data22=$data22 + goto loop2 +endi + +# row 3 +if $data31 != 1 then + print =====data31=$data31 + goto loop2 +endi + +if $data32 != 4 then + print =====data32=$data32 + goto loop2 +endi + +print step 6 + +$loop_count = 0 + +loop3: +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt2; + +# row 0 +if $data01 != 4 then + print =====data01=$data01 + # goto loop3 +endi + +if $data02 != 10 then + print =====data02=$data02 + goto loop3 +endi + +# row 1 +if $data11 != 2 then + print =====data11=$data11 + goto loop3 +endi + +if $data12 != 4 then + print =====data12=$data12 + goto loop3 +endi + +# row 2 +if $data21 != 2 then + print =====data21=$data21 + goto loop3 +endi + +if $data22 != 6 then + print =====data22=$data22 + goto loop3 +endi + +# row 3 +if $data31 != 2 then + print =====data31=$data31 + goto loop3 +endi + +if $data32 != 8 then + print =====data32=$data32 + goto loop3 +endi + system sh/stop_dnodes.sh \ No newline at end of file