From f35f5d5f4fea6b137de412a720c7d7aec75d6f9d Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Mon, 5 Sep 2022 10:52:04 +0800 Subject: [PATCH] feat(stream): refactor single interval --- source/libs/executor/inc/executorimpl.h | 20 + source/libs/executor/src/executorimpl.c | 4 +- source/libs/executor/src/timewindowoperator.c | 488 ++++++++++++------ tests/script/tsim/stream/basic1.sim | 34 ++ .../tsim/stream/distributeInterval0.sim | 41 +- .../script/tsim/stream/partitionbyColumn0.sim | 28 +- .../script/tsim/stream/partitionbyColumn1.sim | 85 +-- .../script/tsim/stream/partitionbyColumn2.sim | 10 + 8 files changed, 492 insertions(+), 218 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 93bb5b0b49..a9826e0018 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -591,6 +591,24 @@ typedef struct SMergeAlignedIntervalAggOperatorInfo { SNode* pCondition; } SMergeAlignedIntervalAggOperatorInfo; +typedef struct SStreamIntervalOperatorInfo { + // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode + SOptrBasicInfo binfo; // basic info + SAggSupporter aggSup; // aggregate supporter + SExprSupp scalarSupp; // supporter for perform scalar function + SGroupResInfo groupResInfo; // multiple results build supporter + SInterval interval; // interval info + int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. + STimeWindowAggSupp twAggSup; + bool invertible; + bool ignoreExpiredData; + SArray* pRecycledPages; + SArray* pDelWins; // SWinRes + int32_t delIndex; + SSDataBlock* pDelRes; + bool isFinal; +} SStreamIntervalOperatorInfo; + typedef struct SStreamFinalIntervalOperatorInfo { // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; // basic info @@ -1003,6 +1021,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); +SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, + SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 0240102437..205bcd58df 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3913,7 +3913,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pAggNode->node.pConditions, pScalarExprInfo, numOfScalarExpr, pAggNode->mergeDataBlock, pTaskInfo); } - } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) { + } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); @@ -3938,6 +3938,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pIntervalPhyNode, pTaskInfo, isStream); + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) { + pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) { SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode; pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b7a0673d88..d773f8a629 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -939,7 +939,7 @@ bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup) { bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup) { return isOverdue(pWin->ekey, pSup); } static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, - int32_t scanFlag, SHashObj* pUpdatedMap) { + int32_t scanFlag) { SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; @@ -955,21 +955,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->inputOrder); - int32_t ret = TSDB_CODE_SUCCESS; - if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && - inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { - ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, - numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); - if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { - saveWinResultRow(pResult, tableGroupId, pUpdatedMap); - setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur); - } + int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, + numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - TSKEY ekey = ascScan ? win.ekey : win.skey; int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder); @@ -991,12 +981,9 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup); } - if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && - inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, - numOfOutput); - } + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, + numOfOutput); doCloseWindow(pResultRowInfo, pInfo, pResult); @@ -1007,13 +994,6 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul if (startPos < 0) { break; } - if (pInfo->ignoreExpiredData && isCloseWindow(&nextWin, &pInfo->twAggSup)) { - ekey = ascScan ? nextWin.ekey : nextWin.skey; - forwardRows = - getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder); - continue; - } - // null data, failed to allocate more memory buffer int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); @@ -1021,11 +1001,6 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { - saveWinResultRow(pResult, tableGroupId, pUpdatedMap); - setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur); - } - ekey = ascScan ? nextWin.ekey : nextWin.skey; forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder); @@ -1130,7 +1105,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->inputOrder, scanFlag, true); blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); - hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL); + hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag); } initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->resultTsOrder); @@ -1581,141 +1556,6 @@ static void doBuildDeleteResult(SArray* pWins, int32_t* index, SSDataBlock* pBlo } } -static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { - SIntervalAggOperatorInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - - pInfo->inputOrder = TSDB_ORDER_ASC; - SExprSupp* pSup = &pOperator->exprSupp; - - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - if (pOperator->status == OP_RES_TO_RETURN) { - doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); - if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, "single interval"); - return pInfo->pDelRes; - } - - doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) { - pOperator->status = OP_EXEC_DONE; - qDebug("===stream===single interval is done"); - freeAllPages(pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); - } - printDataBlock(pInfo->binfo.pRes, "single interval"); - return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; - } - - SOperatorInfo* downstream = pOperator->pDownstream[0]; - - SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); - - SStreamState* pState = pTaskInfo->streamInfo.pState; - - while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); - if (pBlock == NULL) { - break; - } - // qInfo("===stream===%ld", pBlock->info.version); - printDataBlock(pBlock, "single interval recv"); - - if (pBlock->info.type == STREAM_CLEAR) { - doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, - NULL); - qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); - continue; - } - if (pBlock->info.type == STREAM_DELETE_DATA) { - doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, pInfo->pDelWins, &pInfo->interval, pUpdatedMap); - continue; - } else if (pBlock->info.type == STREAM_GET_ALL) { - getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); - continue; - } - - if (pBlock->info.type == STREAM_NORMAL && pBlock->info.version != 0) { - // set input version - pTaskInfo->version = pBlock->info.version; - } - - if (pInfo->scalarSupp.pExprInfo != NULL) { - SExprSupp* pExprSup = &pInfo->scalarSupp; - projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); - } - - // The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the - // caller. Note that all the time window are not close till now. - // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->inputOrder, MAIN_SCAN, true); - if (pInfo->invertible) { - setInverFunction(pSup->pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.type); - } - - pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); - hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap); - } - -#if 0 - if (pState) { - printf(">>>>>>>> stream read backend\n"); - SWinKey key = { - .ts = 1, - .groupId = 2, - }; - char* val = NULL; - int32_t sz; - if (streamStateGet(pState, &key, (void**)&val, &sz) < 0) { - ASSERT(0); - } - printf("stream read %s %d\n", val, sz); - streamFreeVal(val); - - SStreamStateCur* pCur = streamStateGetCur(pState, &key); - ASSERT(pCur); - while (streamStateCurNext(pState, pCur) == 0) { - SWinKey key1; - const void* val1; - if (streamStateGetKVByCur(pCur, &key1, &val1, &sz) < 0) { - break; - } - printf("stream iter key groupId:%d ts:%d, value %s %d\n", key1.groupId, key1.ts, val1, sz); - } - streamStateFreeCur(pCur); - } -#endif - - pOperator->status = OP_RES_TO_RETURN; - closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap, - pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); - - void* pIte = NULL; - while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { - taosArrayPush(pUpdated, pIte); - } - taosArraySort(pUpdated, resultrowComparAsc); - - finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset); - initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); - blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); - removeDeleteResults(pUpdatedMap, pInfo->pDelWins); - taosHashCleanup(pUpdatedMap); - doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); - if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, "single interval"); - return pInfo->pDelRes; - } - - doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - printDataBlock(pInfo->binfo.pRes, "single interval"); - return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; -} - static void destroyStateWindowOperatorInfo(void* param) { SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); @@ -1925,7 +1765,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL, + pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, NULL, destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) { @@ -5627,3 +5467,311 @@ _error: pTaskInfo->code = code; return NULL; } + +static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, + SSDataBlock* pBlock, int32_t scanFlag, SHashObj* pUpdatedMap) { + SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info; + + SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; + SExprSupp* pSup = &pOperatorInfo->exprSupp; + + int32_t startPos = 0; + int32_t numOfOutput = pSup->numOfExprs; + SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); + TSKEY* tsCols = (TSKEY*)pColDataInfo->pData; + uint64_t tableGroupId = pBlock->info.groupId; + bool ascScan = true; + TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols); + SResultRow* pResult = NULL; + + STimeWindow win = + getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC); + int32_t ret = TSDB_CODE_SUCCESS; + if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && + inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { + ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, + numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { + saveWinResultRow(pResult, tableGroupId, pUpdatedMap); + setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur); + } + } + + TSKEY ekey = ascScan ? win.ekey : win.skey; + int32_t forwardRows = + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); + ASSERT(forwardRows > 0); + + if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && + inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, + numOfOutput); + } + + STimeWindow nextWin = win; + while (1) { + int32_t prevEndPos = forwardRows - 1 + startPos; + startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC); + if (startPos < 0) { + break; + } + if (pInfo->ignoreExpiredData && isCloseWindow(&nextWin, &pInfo->twAggSup)) { + ekey = ascScan ? nextWin.ekey : nextWin.skey; + forwardRows = + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); + continue; + } + + // null data, failed to allocate more memory buffer + int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, + pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); + if (code != TSDB_CODE_SUCCESS || pResult == NULL) { + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { + saveWinResultRow(pResult, tableGroupId, pUpdatedMap); + setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur); + } + + ekey = ascScan ? nextWin.ekey : nextWin.skey; + forwardRows = + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, + numOfOutput); + } +} + +static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { + SStreamIntervalOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + int64_t maxTs = INT64_MIN; + SExprSupp* pSup = &pOperator->exprSupp; + + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + if (pOperator->status == OP_RES_TO_RETURN) { + doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); + if (pInfo->pDelRes->info.rows > 0) { + printDataBlock(pInfo->pDelRes, "single interval"); + return pInfo->pDelRes; + } + + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); + if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) { + pOperator->status = OP_EXEC_DONE; + qDebug("===stream===single interval is done"); + freeAllPages(pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); + } + printDataBlock(pInfo->binfo.pRes, "single interval"); + return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; + } + + SOperatorInfo* downstream = pOperator->pDownstream[0]; + + SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); + + SStreamState* pState = pTaskInfo->streamInfo.pState; + + while (1) { + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + if (pBlock == NULL) { + break; + } + printDataBlock(pBlock, "single interval recv"); + + if (pBlock->info.type == STREAM_CLEAR) { + doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, + NULL); + qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); + continue; + } else if (pBlock->info.type == STREAM_DELETE_DATA) { + doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, pInfo->pDelWins, &pInfo->interval, pUpdatedMap); + continue; + } else if (pBlock->info.type == STREAM_GET_ALL) { + getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); + continue; + } + + if (pBlock->info.type == STREAM_NORMAL && pBlock->info.version != 0) { + // set input version + pTaskInfo->version = pBlock->info.version; + } + + if (pInfo->scalarSupp.pExprInfo != NULL) { + SExprSupp* pExprSup = &pInfo->scalarSupp; + projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); + } + + // The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the + // caller. Note that all the time window are not close till now. + // the pDataBlock are always the same one, no need to call this again + setInputDataBlock(pOperator, pSup->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); + if (pInfo->invertible) { + setInverFunction(pSup->pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.type); + } + + maxTs = TMAX(maxTs, pBlock->info.window.ekey); + doStreamIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap); + } + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); + +#if 0 + if (pState) { + printf(">>>>>>>> stream read backend\n"); + SWinKey key = { + .ts = 1, + .groupId = 2, + }; + char* val = NULL; + int32_t sz; + if (streamStateGet(pState, &key, (void**)&val, &sz) < 0) { + ASSERT(0); + } + printf("stream read %s %d\n", val, sz); + streamFreeVal(val); + + SStreamStateCur* pCur = streamStateGetCur(pState, &key); + ASSERT(pCur); + while (streamStateCurNext(pState, pCur) == 0) { + SWinKey key1; + const void* val1; + if (streamStateGetKVByCur(pCur, &key1, &val1, &sz) < 0) { + break; + } + printf("stream iter key groupId:%d ts:%d, value %s %d\n", key1.groupId, key1.ts, val1, sz); + } + streamStateFreeCur(pCur); + } +#endif + + pOperator->status = OP_RES_TO_RETURN; + closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap, + pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); + + void* pIte = NULL; + while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { + taosArrayPush(pUpdated, pIte); + } + taosArraySort(pUpdated, resultrowComparAsc); + + finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset); + initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + removeDeleteResults(pUpdatedMap, pInfo->pDelWins); + taosHashCleanup(pUpdatedMap); + doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); + if (pInfo->pDelRes->info.rows > 0) { + printDataBlock(pInfo->pDelRes, "single interval"); + return pInfo->pDelRes; + } + + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); + printDataBlock(pInfo->binfo.pRes, "single interval"); + return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; +} + +void destroyStreamIntervalOperatorInfo(void* param) { + SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)param; + cleanupBasicInfo(&pInfo->binfo); + cleanupAggSup(&pInfo->aggSup); + pInfo->pRecycledPages = taosArrayDestroy(pInfo->pRecycledPages); + + pInfo->pDelWins = taosArrayDestroy(pInfo->pDelWins); + pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes); + + cleanupGroupResInfo(&pInfo->groupResInfo); + colDataDestroy(&pInfo->twAggSup.timeWindowData); + taosMemoryFreeClear(param); +} + +SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, + SExecTaskInfo* pTaskInfo) { + SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode; + + int32_t numOfCols = 0; + SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); + ASSERT(numOfCols > 0); + SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); + SInterval interval = {.interval = pIntervalPhyNode->interval, + .sliding = pIntervalPhyNode->sliding, + .intervalUnit = pIntervalPhyNode->intervalUnit, + .slidingUnit = pIntervalPhyNode->slidingUnit, + .offset = pIntervalPhyNode->offset, + .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision, }; + STimeWindowAggSupp twAggSupp = {.waterMark = pIntervalPhyNode->window.watermark, + .calTrigger = pIntervalPhyNode->window.triggerType, + .maxTs = INT64_MIN, }; + ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY); + pOperator->pTaskInfo = pTaskInfo; + pInfo->interval = interval; + pInfo->twAggSup = twAggSupp; + pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; + pInfo->isFinal = false; + + if (pIntervalPhyNode->window.pExprs != NULL) { + int32_t numOfScalar = 0; + SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar); + int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } + + pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;; + initResultSizeInfo(&pOperator->resultInfo, 4096); + SExprSupp* pSup = &pOperator->exprSupp; + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + initBasicInfo(&pInfo->binfo, pResBlock); + initStreamFunciton(pSup->pCtx, pSup->numOfExprs); + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); + + pInfo->invertible = allInvertible(pSup->pCtx, numOfCols); + pInfo->invertible = false; // Todo(liuyao): Dependent TSDB API + pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t)); + pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); + pInfo->delIndex = 0; + pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); + initResultRowInfo(&pInfo->binfo.resultRowInfo); + + pOperator->name = "StreamIntervalOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL, + destroyStreamIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); + + initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, pInfo->twAggSup.waterMark); + code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + return pOperator; + +_error: + destroyStreamIntervalOperatorInfo(pInfo); + taosMemoryFreeClear(pOperator); + pTaskInfo->code = code; + return NULL; +} diff --git a/tests/script/tsim/stream/basic1.sim b/tests/script/tsim/stream/basic1.sim index d9777d5133..70ca1179b0 100644 --- a/tests/script/tsim/stream/basic1.sim +++ b/tests/script/tsim/stream/basic1.sim @@ -588,4 +588,38 @@ if $data00 != 5 then goto loop3 endi +#max,min selectivity +sql create database test3 vgroups 1; +sql use test3; +sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create stream stream_t3 trigger at_once into streamtST3 as select ts, min(a) c6, a, b, c, ta, tb, tc from ts1 interval(10s) ; + +sql insert into ts1 values(1648791211000,1,2,3); +sleep 50 +sql insert into ts1 values(1648791222001,2,2,3); +sleep 50 + +$loop_count = 0 +loop3: +sql select * from streamtST3; + +sleep 300 +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $data02 != 1 then + print =====data02=$data02 + goto loop3 +endi + +# row 1 +if $data12 != 2 then + print =====data12=$data12 + goto loop3 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/distributeInterval0.sim b/tests/script/tsim/stream/distributeInterval0.sim index b6b427343e..9b2e940556 100644 --- a/tests/script/tsim/stream/distributeInterval0.sim +++ b/tests/script/tsim/stream/distributeInterval0.sim @@ -198,7 +198,7 @@ endi sql select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5, avg(d) from st interval(10s); -sql create database test1 vgroups 1; +sql create database test1 vgroups 4; sql use test1; sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int); sql create table ts1 using st tags(1,1,1); @@ -232,4 +232,43 @@ if $data11 != 2 then goto loop2 endi +#max,min selectivity +sql create database test3 vgroups 4; +sql use test3; +sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create table ts2 using st tags(2,2,2); +sql create stream stream_t3 trigger at_once into streamtST3 as select ts, min(a) c6, a, b, c, ta, tb, tc from st interval(10s) ; + +sql insert into ts1 values(1648791211000,1,2,3); +sleep 50 +sql insert into ts1 values(1648791222001,2,2,3); +sleep 50 +sql insert into ts2 values(1648791211000,1,2,3); +sleep 50 +sql insert into ts2 values(1648791222001,2,2,3); +sleep 50 + +$loop_count = 0 +loop3: +sql select * from streamtST3; + +sleep 300 +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $data02 != 1 then + print =====data02=$data02 + goto loop3 +endi + +# row 1 +if $data12 != 2 then + print =====data12=$data12 + goto loop3 +endi + system sh/stop_dnodes.sh diff --git a/tests/script/tsim/stream/partitionbyColumn0.sim b/tests/script/tsim/stream/partitionbyColumn0.sim index d91d4b7bf0..24fdb9c994 100644 --- a/tests/script/tsim/stream/partitionbyColumn0.sim +++ b/tests/script/tsim/stream/partitionbyColumn0.sim @@ -24,7 +24,7 @@ sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); $loop_count = 0 loop0: -sleep 100 +sleep 50 sql select * from streamt order by c1, c4, c2, c3; $loop_count = $loop_count + 1 @@ -48,7 +48,7 @@ sql insert into t1 values(1648791213000,1,2,3,1.0); $loop_count = 0 loop1: -sleep 100 +sleep 50 sql select * from streamt order by c1, c4, c2, c3; $loop_count = $loop_count + 1 @@ -71,7 +71,7 @@ sql insert into t1 values(1648791213000,2,2,3,1.0); $loop_count = 0 loop2: -sleep 100 +sleep 50 sql select * from streamt order by c1, c4, c2, c3; $loop_count = $loop_count + 1 @@ -97,7 +97,7 @@ sql insert into t1 values(1648791213002,1,2,3,1.0); $loop_count = 0 loop3: -sleep 100 +sleep 50 sql select * from streamt order by c1, c4, c2, c3; $loop_count = $loop_count + 1 @@ -134,7 +134,7 @@ sql insert into t1 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (16 $loop_count = 0 loop4: -sleep 100 +sleep 50 sql select * from streamt order by c1, c4, c2, c3; $loop_count = $loop_count + 1 @@ -208,7 +208,7 @@ sql insert into t1 values(1648791213001,1,2,3,2.0); $loop_count = 0 loop5: -sleep 100 +sleep 50 sql select * from streamt1 order by c1, c4, c2, c3; $loop_count = $loop_count + 1 @@ -229,7 +229,7 @@ sql insert into t1 values(1648791213001,1,1,6,2.0) (1648791223002,1,1,7,2.0); $loop_count = 0 loop6: -sleep 100 +sleep 50 sql select * from streamt1 order by c1, c4, c2, c3; $loop_count = $loop_count + 1 @@ -294,7 +294,7 @@ sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL); $loop_count = 0 loop7: -sleep 100 +sleep 50 sql select * from test.streamt2 order by c1, c2, c3; $loop_count = $loop_count + 1 @@ -318,7 +318,7 @@ sql insert into t2 values(1648791213000,1,2,3,1.0); $loop_count = 0 loop8: -sleep 100 +sleep 50 sql select * from test.streamt2 order by c1, c2, c3; $loop_count = $loop_count + 1 @@ -342,7 +342,7 @@ sql insert into t2 values(1648791213000,2,2,3,1.0); $loop_count = 0 loop9: -sleep 100 +sleep 50 sql select * from test.streamt2 order by c1, c2, c3; $loop_count = $loop_count + 1 @@ -372,7 +372,7 @@ sql insert into t2 values(1648791213002,1,2,3,1.0); $loop_count = 0 loop10: -sleep 100 +sleep 50 sql select * from test.streamt2 order by c1, c2, c3; $loop_count = $loop_count + 1 @@ -414,7 +414,7 @@ sql insert into t2 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (16 $loop_count = 0 loop11: -sleep 100 +sleep 50 sql select * from test.streamt2 order by c1, c2, c3; $loop_count = $loop_count + 1 @@ -492,7 +492,7 @@ sql insert into t4 values(1648791213000,1,2,3,1.0); $loop_count = 0 loop13: -sleep 100 +sleep 50 sql select * from test.streamt4 order by c1, c2, c3; $loop_count = $loop_count + 1 @@ -534,7 +534,7 @@ sql insert into t1 values(1648791213000,1,2,3,1.0); $loop_count = 0 loop14: -sleep 100 +sleep 50 sql select * from test.streamt4 order by c1, c2, c3; $loop_count = $loop_count + 1 diff --git a/tests/script/tsim/stream/partitionbyColumn1.sim b/tests/script/tsim/stream/partitionbyColumn1.sim index 7f5c53ebe3..1742d52cf0 100644 --- a/tests/script/tsim/stream/partitionbyColumn1.sim +++ b/tests/script/tsim/stream/partitionbyColumn1.sim @@ -24,11 +24,11 @@ sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); $loop_count = 0 loop0: -sleep 300 +sleep 50 sql select * from streamt order by c1, c4, c2, c3; $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 20 then return -1 endi @@ -45,12 +45,14 @@ endi sql insert into t1 values(1648791213000,1,2,3,1.0); +$loop_count = 0 + loop1: -sleep 300 +sleep 50 sql select * from streamt order by c1, c4, c2, c3; $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 20 then return -1 endi @@ -66,12 +68,14 @@ endi sql insert into t1 values(1648791213000,2,2,3,1.0); +$loop_count = 0 + loop2: -sleep 300 +sleep 50 sql select * from streamt order by c1, c4, c2, c3; $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 20 then return -1 endi @@ -90,12 +94,14 @@ sql insert into t1 values(1648791213001,2,2,3,1.0); sql insert into t1 values(1648791213002,2,2,3,1.0); sql insert into t1 values(1648791213002,1,2,3,1.0); +$loop_count = 0 + loop3: -sleep 300 +sleep 50 sql select * from streamt order by c1, c4, c2, c3; $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 20 then return -1 endi @@ -125,12 +131,14 @@ sql insert into t1 values(1648791223002,3,2,3,1.0); sql insert into t1 values(1648791223003,3,2,3,1.0); sql insert into t1 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0); +$loop_count = 0 + loop4: -sleep 300 +sleep 50 sql select * from streamt order by c1, c4, c2, c3; $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 20 then return -1 endi @@ -199,11 +207,11 @@ sql insert into t1 values(1648791213001,1,2,3,2.0); $loop_count = 0 loop5: -sleep 300 +sleep 50 sql select * from streamt1 order by c1, c4, c2, c3; $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 20 then return -1 endi @@ -217,12 +225,14 @@ sql insert into t1 values(1648791223001,1,2,5,2.0); sql insert into t1 values(1648791223002,1,2,5,2.0); sql insert into t1 values(1648791213001,1,1,6,2.0) (1648791223002,1,1,7,2.0); +$loop_count = 0 + loop6: -sleep 300 +sleep 50 sql select * from streamt1 order by c1, c4, c2, c3; $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 20 then return -1 endi @@ -282,11 +292,11 @@ sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL); $loop_count = 0 loop7: -sleep 300 +sleep 50 sql select * from test.streamt2 order by c1, c2, c3; $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 20 then return -1 endi @@ -303,12 +313,14 @@ endi sql insert into t1 values(1648791213000,1,2,3,1.0); sql insert into t2 values(1648791213000,1,2,3,1.0); +$loop_count = 0 + loop8: -sleep 300 +sleep 50 sql select * from test.streamt2 order by c1, c2, c3; $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 20 then return -1 endi @@ -324,12 +336,15 @@ endi sql insert into t1 values(1648791213000,2,2,3,1.0); sql insert into t2 values(1648791213000,2,2,3,1.0); + +$loop_count = 0 + loop9: -sleep 300 +sleep 50 sql select * from test.streamt2 order by c1, c2, c3; $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 20 then return -1 endi @@ -352,12 +367,14 @@ sql insert into t2 values(1648791213001,2,2,3,1.0); sql insert into t2 values(1648791213002,2,2,3,1.0); sql insert into t2 values(1648791213002,1,2,3,1.0); +$loop_count = 0 + loop10: -sleep 300 +sleep 50 sql select * from test.streamt2 order by c1, c2, c3; $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 20 then return -1 endi @@ -373,7 +390,7 @@ endi if $data11 != 2 thenloop4 print =====data11=$data11 - goto loop3 + goto loop10 endi if $data12 != 1 then @@ -392,12 +409,14 @@ sql insert into t2 values(1648791223002,3,2,3,1.0); sql insert into t2 values(1648791223003,3,2,3,1.0); sql insert into t2 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0); +$loop_count = 0 + loop11: -sleep 300 +sleep 50 sql select * from test.streamt2 order by c1, c2, c3; $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 20 then return -1 endi @@ -470,17 +489,17 @@ sql insert into t4 values(1648791213000,1,2,3,1.0); $loop_count = 0 loop13: -sleep 300 +sleep 50 sql select * from test.streamt4 order by c1, c2, c3; $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 20 then return -1 endi if $rows != 2 then print =====rows=$rows - goto loop14 + goto loop13 endi if $data01 != 1 then @@ -495,12 +514,12 @@ endi if $data11 != 3 then print =====data11=$data11 - goto loop11 + goto loop13 endi if $data12 != 2 then print =====data12=$data12 - goto loop11 + goto loop13 endi sql insert into t4 values(1648791213000,2,2,3,1.0); @@ -509,12 +528,14 @@ sql insert into t1 values(1648791233000,2,2,3,1.0); sql insert into t1 values(1648791213000,1,2,3,1.0); +$loop_count = 0 + loop14: -sleep 300 +sleep 50 sql select * from test.streamt4 order by c1, c2, c3; $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 20 then return -1 endi diff --git a/tests/script/tsim/stream/partitionbyColumn2.sim b/tests/script/tsim/stream/partitionbyColumn2.sim index 3d9acbcac5..75d01b17ec 100644 --- a/tests/script/tsim/stream/partitionbyColumn2.sim +++ b/tests/script/tsim/stream/partitionbyColumn2.sim @@ -40,6 +40,8 @@ endi sql insert into t1 values(1648791213000,1,1,3,1.0); +$loop_count = 0 + loop1: sleep 300 sql select * from streamt order by c1, c4, c2, c3; @@ -61,6 +63,8 @@ endi sql insert into t1 values(1648791213000,2,1,3,1.0); +$loop_count = 0 + loop2: sleep 300 sql select * from streamt order by c1, c4, c2, c3; @@ -85,6 +89,8 @@ sql insert into t1 values(1648791213001,2,1,3,1.0); sql insert into t1 values(1648791213002,2,1,3,1.0); sql insert into t1 values(1648791213002,1,1,3,1.0); +$loop_count = 0 + loop3: sleep 300 sql select * from streamt order by c1, c4, c2, c3; @@ -120,6 +126,8 @@ sql insert into t1 values(1648791223002,3,2,3,1.0); sql insert into t1 values(1648791223003,3,2,3,1.0); sql insert into t1 values(1648791213001,1,1,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0); +$loop_count = 0 + loop4: sleep 300 sql select * from streamt order by c1, c4, c2, c3; @@ -212,6 +220,8 @@ sql insert into t1 values(1648791223001,1,2,2,5); sql insert into t1 values(1648791223002,1,2,2,6); sql insert into t1 values(1648791213001,1,1,1,7) (1648791223002,1,1,2,8); +$loop_count = 0 + loop6: sleep 300 sql select * from streamt1 order by c1, c4, c2, c3; -- GitLab