From aa20cfedcde7c8941c43f22f93f025cf98492894 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 27 May 2022 16:03:43 +0800 Subject: [PATCH] feat(stream): watermark and trigger --- include/libs/nodes/plannodes.h | 6 + source/libs/executor/inc/executorimpl.h | 15 +- source/libs/executor/src/executil.c | 2 +- source/libs/executor/src/executorimpl.c | 23 +- source/libs/executor/src/scanoperator.c | 100 +----- source/libs/executor/src/timewindowoperator.c | 292 ++++++++++++------ source/libs/nodes/src/nodesCodeFuncs.c | 21 ++ source/libs/planner/src/planOptimizer.c | 3 + source/libs/planner/src/planPhysiCreater.c | 3 + source/libs/stream/src/tstreamUpdate.c | 23 +- tests/script/tsim/stream/triggerInterval0.sim | 185 +++++++++++ tests/script/tsim/stream/triggerSession0.sim | 105 +++++++ tools/taos-tools | 2 +- 13 files changed, 560 insertions(+), 220 deletions(-) create mode 100644 tests/script/tsim/stream/triggerInterval0.sim create mode 100644 tests/script/tsim/stream/triggerSession0.sim diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 3ae2d18e5d..2648a468dd 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -56,6 +56,9 @@ typedef struct SScanLogicNode { int8_t intervalUnit; int8_t slidingUnit; SNode* pTagCond; + int8_t triggerType; + int64_t watermark; + int16_t tsColId; } SScanLogicNode; typedef struct SJoinLogicNode { @@ -216,6 +219,9 @@ typedef struct STableScanPhysiNode { int64_t sliding; int8_t intervalUnit; int8_t slidingUnit; + int8_t triggerType; + int64_t watermark; + int16_t tsColId; } STableScanPhysiNode; typedef STableScanPhysiNode STableSeqScanPhysiNode; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 9e15f3b12d..9ea327636d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -397,10 +397,9 @@ typedef struct SStreamBlockScanInfo { SReadHandle readHandle; uint64_t tableUid; // queried super table uid EStreamScanMode scanMode; - SOperatorInfo* pOperatorDumy; - SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. - SCatchSupporter childAggSup; - SArray* childIds; + SOperatorInfo* pOperatorDumy; + SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. + SArray* childIds; SessionWindowSupporter sessionSup; bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA. } SStreamBlockScanInfo; @@ -441,6 +440,7 @@ typedef struct SAggSupporter { typedef struct STimeWindowSupp { int8_t calTrigger; int64_t waterMark; + TSKEY maxTs; SColumnInfoData timeWindowData; // query time window info for scalar function execution. } STimeWindowAggSupp; @@ -572,6 +572,7 @@ typedef struct SResultWindowInfo { SResultRowPosition pos; STimeWindow win; bool isOutput; + bool isClosed; } SResultWindowInfo; typedef struct SStreamSessionAggOperatorInfo { @@ -742,7 +743,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, + SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, + STimeWindowAggSupp* pTwSup, int16_t tsColId); SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, @@ -799,8 +802,6 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order); int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); -int32_t initCacheSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey, - const char* pDir); int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey); SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize); SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap, diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 5a02547f58..aea9d70f31 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -233,7 +233,7 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) { if (pGroupResInfo->pRows != NULL) { - taosArrayDestroy(pGroupResInfo->pRows); + taosArrayDestroyP(pGroupResInfo->pRows, taosMemoryFree); } pGroupResInfo->pRows = pArrayList; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3e52ff1836..ac7c325478 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4492,7 +4492,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - + STimeWindowAggSupp twSup = {.waterMark = pTableScanNode->watermark, + .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN}; tsdbReaderT pDataReader = NULL; if (pHandle->vnode) { pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); @@ -4508,7 +4509,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } SArray* tableIdList = extractTableIdList(pTableListInfo); - SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, tableIdList, pTableScanNode, pTaskInfo); + SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, + tableIdList, pTableScanNode, pTaskInfo, &twSup, pTableScanNode->tsColId); taosArrayDestroy(tableIdList); return pOperator; @@ -4608,7 +4610,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision}; STimeWindowAggSupp as = {.waterMark = pIntervalPhyNode->window.watermark, - .calTrigger = pIntervalPhyNode->window.triggerType}; + .calTrigger = pIntervalPhyNode->window.triggerType, + .maxTs = INT64_MIN}; int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo); @@ -5169,20 +5172,6 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo return TSDB_CODE_SUCCESS; } -int32_t initCacheSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey, const char* pDir) { - pCatchSup->keySize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY); - pCatchSup->pKeyBuf = taosMemoryCalloc(1, pCatchSup->keySize); - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - pCatchSup->pWindowHashTable = taosHashInit(10000, hashFn, true, HASH_NO_LOCK); - if (pCatchSup->pKeyBuf == NULL || pCatchSup->pWindowHashTable == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - int32_t pageSize = rowSize * 32; - int32_t bufSize = pageSize * 4096; - return createDiskbasedBuf(&pCatchSup->pDataBuf, pageSize, bufSize, pKey, pDir); -} - int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey) { pSup->keySize = sizeof(int64_t) + sizeof(TSKEY); pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index fe7668f370..b954eb3a22 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -755,91 +755,6 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool inverti return NULL; } -void static setSupKeyBuf(SCatchSupporter* pSup, int64_t groupId, int64_t childId, TSKEY ts) { - int64_t* pKey = (int64_t*)pSup->pKeyBuf; - pKey[0] = groupId; - pKey[1] = childId; - pKey[2] = ts; -} - -static int32_t catchWidonwInfo(SSDataBlock* pDataBlock, SCatchSupporter* pSup, int32_t pageId, int32_t tsIndex, - int64_t childId) { - SColumnInfoData* pColDataInfo = taosArrayGet(pDataBlock->pDataBlock, tsIndex); - TSKEY* tsCols = (int64_t*)pColDataInfo->pData; - for (int32_t i = 0; i < pDataBlock->info.rows; i++) { - setSupKeyBuf(pSup, pDataBlock->info.groupId, childId, tsCols[i]); - SWindowPosition* p1 = (SWindowPosition*)taosHashGet(pSup->pWindowHashTable, pSup->pKeyBuf, pSup->keySize); - if (p1 == NULL) { - SWindowPosition pos = {.pageId = pageId, .rowId = i}; - int32_t code = taosHashPut(pSup->pWindowHashTable, pSup->pKeyBuf, pSup->keySize, &pos, sizeof(SWindowPosition)); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } else { - p1->pageId = pageId; - p1->rowId = i; - } - } - return TSDB_CODE_SUCCESS; -} - -static int32_t catchDatablock(SSDataBlock* pDataBlock, SCatchSupporter* pSup, int32_t tsIndex, int64_t childId) { - int32_t start = 0; - int32_t stop = 0; - int32_t pageSize = getBufPageSize(pSup->pDataBuf); - while (start < pDataBlock->info.rows) { - blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pageSize); - SSDataBlock* pDB = blockDataExtractBlock(pDataBlock, start, stop - start + 1); - if (pDB == NULL) { - return terrno; - } - int32_t pageId = -1; - void* pPage = getNewBufPage(pSup->pDataBuf, pDataBlock->info.groupId, &pageId); - if (pPage == NULL) { - blockDataDestroy(pDB); - return terrno; - } - int32_t size = blockDataGetSize(pDB) + sizeof(int32_t) + pDB->info.numOfCols * sizeof(int32_t); - assert(size <= pageSize); - blockDataToBuf(pPage, pDB); - setBufPageDirty(pPage, true); - releaseBufPage(pSup->pDataBuf, pPage); - blockDataDestroy(pDB); - start = stop + 1; - int32_t code = catchWidonwInfo(pDB, pSup, pageId, tsIndex, childId); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } - return TSDB_CODE_SUCCESS; -} - -static SSDataBlock* getDataFromCatch(SStreamBlockScanInfo* pInfo) { - SSDataBlock* pBlock = pInfo->pUpdateRes; - if (pInfo->updateResIndex < pBlock->info.rows) { - blockDataCleanup(pInfo->pRes); - SCatchSupporter* pCSup = &pInfo->childAggSup; - SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, 0); - TSKEY* tsCols = (TSKEY*)pColDataInfo->pData; - int32_t size = taosArrayGetSize(pInfo->childIds); - for (int32_t i = 0; i < size; i++) { - int64_t id = *(int64_t*)taosArrayGet(pInfo->childIds, i); - setSupKeyBuf(pCSup, pBlock->info.groupId, id, tsCols[pInfo->updateResIndex]); - SWindowPosition* pos = (SWindowPosition*)taosHashGet(pCSup->pWindowHashTable, pCSup->pKeyBuf, pCSup->keySize); - void* buf = getBufPage(pCSup->pDataBuf, pos->pageId); - SSDataBlock* pDB = createOneDataBlock(pInfo->pRes, false); - blockDataFromBuf(pDB, buf); - SSDataBlock* pSub = blockDataExtractBlock(pDB, pos->rowId, 1); - blockDataMerge(pInfo->pRes, pSub); - blockDataDestroy(pDB); - blockDataDestroy(pSub); - } - pInfo->updateResIndex++; - return pInfo->pRes; - } - return NULL; -} - static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -977,7 +892,9 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { } } -SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, + SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, + STimeWindowAggSupp* pTwSup, int16_t tsColId) { SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -1022,9 +939,9 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan goto _error; } - pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan + pInfo->primaryTsIndex = tsColId; if (pSTInfo->interval.interval > 0) { - pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, 10000); // TODO(liuyao) get watermark from physical plan + pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark); } else { pInfo->pUpdateInfo = NULL; } @@ -1045,9 +962,6 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan pInfo->interval = pSTInfo->interval; pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1}; - // TODO(liuyao) get row size from phy plan - initCacheSupporter(&pInfo->childAggSup, 1024, "StreamFinalInterval", TD_TMP_DIR_PATH); - pOperator->name = "StreamBlockScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->blocking = false; @@ -1056,8 +970,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan pOperator->numOfExprs = pInfo->pRes->info.numOfCols; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, + NULL, operatorDummyCloseFn, NULL, NULL, NULL); return pOperator; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 5d0a0a0270..829968d37f 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -622,18 +622,103 @@ static void saveDataBlockLastRow(char** pRow, SArray* pDataBlock, int32_t rowInd } } -static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, - uint64_t tableGroupId) { +typedef int64_t (*__get_value_fn_t)(void* data, int32_t index); + +int32_t binarySearch(void* keyList, int num, TSKEY key, int order, + __get_value_fn_t getValuefn) { + int firstPos = 0, lastPos = num - 1, midPos = -1; + int numOfRows = 0; + + if (num <= 0) return -1; + if (order == TSDB_ORDER_DESC) { + // find the first position which is smaller or equal than the key + while (1) { + if (key >= getValuefn(keyList, lastPos)) return lastPos; + if (key == getValuefn(keyList, firstPos)) return firstPos; + if (key < getValuefn(keyList, firstPos)) return firstPos - 1; + + numOfRows = lastPos - firstPos + 1; + midPos = (numOfRows >> 1) + firstPos; + + if (key < getValuefn(keyList, midPos)) { + lastPos = midPos - 1; + } else if (key > getValuefn(keyList, midPos)) { + firstPos = midPos + 1; + } else { + break; + } + } + + } else { + // find the first position which is bigger or equal than the key + while (1) { + if (key <= getValuefn(keyList, firstPos)) return firstPos; + if (key == getValuefn(keyList, lastPos)) return lastPos; + + if (key > getValuefn(keyList, lastPos)) { + lastPos = lastPos + 1; + if (lastPos >= num) + return -1; + else + return lastPos; + } + + numOfRows = lastPos - firstPos + 1; + midPos = (numOfRows >> 1) + firstPos; + + if (key < getValuefn(keyList, midPos)) { + lastPos = midPos - 1; + } else if (key > getValuefn(keyList, midPos)) { + firstPos = midPos + 1; + } else { + break; + } + } + } + + return midPos; +} + +int64_t getReskey(void* data, int32_t index) { + SArray* res = (SArray*) data; + SResKeyPos* pos = taosArrayGetP(res, index); + return *(int64_t*)pos->key; +} + +static int32_t saveResult(SResultRow* result, uint64_t groupId, SArray* pUpdated) { + int32_t size = taosArrayGetSize(pUpdated); + int32_t index = binarySearch(pUpdated, size, result->win.skey, TSDB_ORDER_DESC, getReskey); + if (index == -1) { + index = 0; + } else { + TSKEY resTs = getReskey(pUpdated, index); + if (resTs < result->win.skey) { + index++; + } else { + return TSDB_CODE_SUCCESS; + } + } + + SResKeyPos* newPos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); + if (newPos == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + newPos->groupId = groupId; + newPos->pos = (SResultRowPosition){.pageId = result->pageId, .offset = result->offset}; + *(int64_t*)newPos->key = result->win.skey; + if (taosArrayInsert(pUpdated, index, &newPos) == NULL ){ + return TSDB_CODE_OUT_OF_MEMORY; + } + return TSDB_CODE_SUCCESS; +} + +static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, + uint64_t tableGroupId, SArray* pUpdated) { SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; int32_t numOfOutput = pOperatorInfo->numOfExprs; - SArray* pUpdated = NULL; - if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { - pUpdated = taosArrayInit(4, POINTER_BYTES); - } - int32_t step = 1; bool ascScan = (pInfo->order == TSDB_ORDER_ASC); @@ -663,13 +748,10 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { - SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); - pos->groupId = tableGroupId; - pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; - *(int64_t*)pos->key = pResult->win.skey; - - taosArrayPush(pUpdated, &pos); + if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && + (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || + pInfo->twAggSup.calTrigger == 0) ) { + saveResult(pResult, tableGroupId, pUpdated); } int32_t forwardStep = 0; @@ -742,13 +824,10 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { - SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); - pos->groupId = tableGroupId; - pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; - *(int64_t*)pos->key = pResult->win.skey; - - taosArrayPush(pUpdated, &pos); + if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && + (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || + pInfo->twAggSup.calTrigger == 0) ) { + saveResult(pResult, tableGroupId, pUpdated); } ekey = ascScan? nextWin.ekey:nextWin.skey; @@ -769,7 +848,6 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe saveDataBlockLastRow(pInfo->pRow, pBlock->pDataBlock, rowIndex, pBlock->info.numOfCols); } - return pUpdated; // updateResultRowInfoActiveIndex(pResultRowInfo, &pInfo->win, pRuntimeEnv->current->lastKey, true, false); } @@ -799,7 +877,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { STableQueryInfo* pTableQueryInfo = pInfo->pCurrent; setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window); - hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, pBlock->info.groupId); + hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, pBlock->info.groupId, NULL); #if 0 // test for encode/decode result info if(pOperator->encodeResultRow){ @@ -1067,7 +1145,7 @@ void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData, } static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, - SInterval* pIntrerval, int32_t tsIndex, int32_t numOfOutput, SSDataBlock* pBlock, + SInterval* pInterval, int32_t tsIndex, int32_t numOfOutput, SSDataBlock* pBlock, SArray* pUpWins) { SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex); TSKEY *tsCols = (TSKEY*)pColDataInfo->pData; @@ -1075,8 +1153,8 @@ static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, for (int32_t i = 0; i < pBlock->info.rows; i += step) { SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; - STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pIntrerval, - pIntrerval->precision, NULL); + STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pInterval, + pInterval->precision, NULL); step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); doClearWindow(pSup, pBinfo, (char*)&win.skey, sizeof(TKEY), pBlock->info.groupId, numOfOutput); @@ -1086,6 +1164,39 @@ static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, } } +static int32_t closeIntervalWindow(SHashObj *pHashMap, STimeWindowAggSupp *pSup, + SInterval* pInterval, SArray* closeWins) { + void *pIte = NULL; + size_t keyLen = 0; + while((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { + void* key = taosHashGetKey(pIte, &keyLen); + uint64_t groupId = *(uint64_t*) key; + ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))); + TSKEY ts = *(uint64_t*) ((char*)key + sizeof(uint64_t)); + SResultRowInfo dumyInfo; + dumyInfo.cur.pageId = -1; + STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, + pInterval->precision, NULL); + if (win.ekey < pSup->maxTs - pSup->waterMark) { + char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))]; + SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId); + taosHashRemove(pHashMap, keyBuf, keyLen); + SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); + if (pos == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pos->groupId = groupId; + pos->pos = *(SResultRowPosition*) pIte; + *(int64_t*)pos->key = ts; + if (!taosArrayPush(closeWins, &pos)) { + taosMemoryFree(pos); + return TSDB_CODE_OUT_OF_MEMORY; + } + } + } + return TSDB_CODE_SUCCESS; +} + static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SIntervalAggOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -1106,7 +1217,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; - SArray* pUpdated = NULL; + SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); + SArray* pClosed = taosArrayInit(4, POINTER_BYTES); + while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { @@ -1128,10 +1241,19 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { continue; } - pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, pBlock->info.groupId); + hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, pBlock->info.groupId, pUpdated); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); } - - finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); + closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, + &pInfo->interval, pClosed); + finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, + pInfo->binfo.rowCellInfoOffset); + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER__WINDOW_CLOSE) { + taosArrayAddAll(pUpdated, pClosed); + } + taosArrayDestroy(pClosed); + finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, + pInfo->binfo.rowCellInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); @@ -1935,63 +2057,6 @@ _error: return NULL; } -typedef int64_t (*__get_value_fn_t)(void* data, int32_t index); - -int32_t binarySearch(void* keyList, int num, TSKEY key, int order, - __get_value_fn_t getValuefn) { - int firstPos = 0, lastPos = num - 1, midPos = -1; - int numOfRows = 0; - - if (num <= 0) return -1; - if (order == TSDB_ORDER_DESC) { - // find the first position which is smaller than the key - while (1) { - if (key >= getValuefn(keyList, lastPos)) return lastPos; - if (key == getValuefn(keyList, firstPos)) return firstPos; - if (key < getValuefn(keyList, firstPos)) return firstPos - 1; - - numOfRows = lastPos - firstPos + 1; - midPos = (numOfRows >> 1) + firstPos; - - if (key < getValuefn(keyList, midPos)) { - lastPos = midPos - 1; - } else if (key > getValuefn(keyList, midPos)) { - firstPos = midPos + 1; - } else { - break; - } - } - - } else { - // find the first position which is bigger than the key - while (1) { - if (key <= getValuefn(keyList, firstPos)) return firstPos; - if (key == getValuefn(keyList, lastPos)) return lastPos; - - if (key > getValuefn(keyList, lastPos)) { - lastPos = lastPos + 1; - if (lastPos >= num) - return -1; - else - return lastPos; - } - - numOfRows = lastPos - firstPos + 1; - midPos = (numOfRows >> 1) + firstPos; - - if (key < getValuefn(keyList, midPos)) { - lastPos = midPos - 1; - } else if (key > getValuefn(keyList, midPos)) { - firstPos = midPos + 1; - } else { - break; - } - } - } - - return midPos; -} - int64_t getSessionWindowEndkey(void* data, int32_t index) { SArray* pWinInfos = (SArray*) data; SResultWindowInfo* pWin = taosArrayGet(pWinInfos, index); @@ -2223,12 +2288,14 @@ static void doStreamSessionWindowAggImpl(SOperatorInfo* pOperator, if (winNum > 0) { compactTimeWindow(pInfo, winIndex, winNum, groupId, numOfOutput, pTaskInfo, pStUpdated, pStDeleted); } - - code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &(pCurWin->win.skey), sizeof(TSKEY)); - if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + pCurWin->isClosed = false; + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { + code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &(pCurWin->win.skey), sizeof(TSKEY)); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + pCurWin->isOutput = true; } - pCurWin->isOutput = true; i += winRows; } } @@ -2325,6 +2392,37 @@ bool isFinalSession(SStreamSessionAggOperatorInfo* pInfo) { return pInfo->pChildren != NULL; } +int32_t closeSessionWindow(SArray *pWins, STimeWindowAggSupp *pTwSup, SArray *pClosed, + int8_t calTrigger) { + // Todo(liuyao) save window to tdb + int32_t size = taosArrayGetSize(pWins); + for (int32_t i = 0; i < size; i++) { + SResultWindowInfo *pSeWin = taosArrayGet(pWins, i); + if (pSeWin->win.ekey < pTwSup->maxTs - pTwSup->waterMark) { + if (!pSeWin->isClosed) { + SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); + if (pos == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pos->groupId = 0; + pos->pos = pSeWin->pos; + *(int64_t*)pos->key = pSeWin->win.ekey; + if (!taosArrayPush(pClosed, &pos)) { + taosMemoryFree(pos); + return TSDB_CODE_OUT_OF_MEMORY; + } + pSeWin->isClosed = true; + if (calTrigger == STREAM_TRIGGER__WINDOW_CLOSE) { + pSeWin->isOutput = true; + } + } + continue; + } + break; + } + return TSDB_CODE_SUCCESS; +} + static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -2377,13 +2475,21 @@ static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) { doStreamSessionWindowAggImpl(pOperator, pBlock, NULL, NULL); } doStreamSessionWindowAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); } - // restore the value pOperator->status = OP_RES_TO_RETURN; + + SArray* pClosed = taosArrayInit(16, POINTER_BYTES); + closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pClosed, + pInfo->twAggSup.calTrigger); SArray* pUpdated = taosArrayInit(16, POINTER_BYTES); copyUpdateResult(pStUpdated, pUpdated, pBInfo->pRes->info.groupId); taosHashCleanup(pStUpdated); + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER__WINDOW_CLOSE) { + taosArrayAddAll(pUpdated, pClosed); + } + finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 8887b9841a..9c742cfb46 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1130,6 +1130,9 @@ static const char* jkTableScanPhysiPlanOffset = "Offset"; static const char* jkTableScanPhysiPlanSliding = "Sliding"; static const char* jkTableScanPhysiPlanIntervalUnit = "intervalUnit"; static const char* jkTableScanPhysiPlanSlidingUnit = "slidingUnit"; +static const char* jkTableScanPhysiPlanTriggerType = "triggerType"; +static const char* jkTableScanPhysiPlanWatermark = "watermark"; +static const char* jkTableScanPhysiPlanTsColId = "tsColId"; static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj; @@ -1171,6 +1174,15 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanSlidingUnit, pNode->slidingUnit); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanTriggerType, pNode->triggerType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanWatermark, pNode->watermark); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanTsColId, pNode->tsColId); + } return code; } @@ -1221,6 +1233,15 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSlidingUnit, pNode->slidingUnit, code); ; } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkTableScanPhysiPlanTriggerType, pNode->triggerType, code); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkTableScanPhysiPlanWatermark, pNode->watermark, code); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkTableScanPhysiPlanTsColId, pNode->tsColId, code); + } return code; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 4d489f68e7..adc07fcd0d 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -223,6 +223,9 @@ static void setScanWindowInfo(SScanLogicNode* pScan) { pScan->sliding = ((SWindowLogicNode*)pScan->node.pParent)->sliding; pScan->intervalUnit = ((SWindowLogicNode*)pScan->node.pParent)->intervalUnit; pScan->slidingUnit = ((SWindowLogicNode*)pScan->node.pParent)->slidingUnit; + pScan->triggerType = ((SWindowLogicNode*)pScan->node.pParent)->triggerType; + pScan->watermark = ((SWindowLogicNode*)pScan->node.pParent)->watermark; + pScan->tsColId = ((SColumnNode*)((SWindowLogicNode*)pScan->node.pParent)->pTspk)->colId; } } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 0f88a54e91..a45eabefb9 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -503,6 +503,9 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp pTableScan->sliding = pScanLogicNode->sliding; pTableScan->intervalUnit = pScanLogicNode->intervalUnit; pTableScan->slidingUnit = pScanLogicNode->slidingUnit; + pTableScan->triggerType = pScanLogicNode->triggerType; + pTableScan->watermark = pScanLogicNode->watermark; + pTableScan->tsColId = pScanLogicNode->tsColId; return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode); } diff --git a/source/libs/stream/src/tstreamUpdate.c b/source/libs/stream/src/tstreamUpdate.c index 75319a2354..7587fcecc9 100644 --- a/source/libs/stream/src/tstreamUpdate.c +++ b/source/libs/stream/src/tstreamUpdate.c @@ -72,12 +72,14 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) { return val; } -static int64_t adjustWatermark(int64_t interval, int64_t watermark) { - if (watermark <= 0 || watermark > MAX_NUM_SCALABLE_BF * interval) { - watermark = MAX_NUM_SCALABLE_BF * interval; - } else if (watermark < MIN_NUM_SCALABLE_BF * interval) { - watermark = MIN_NUM_SCALABLE_BF * interval; - } +static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) { + if (watermark <= 0) { + watermark = TMIN(originInt/adjInterval, 1) * adjInterval; + } else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) { + watermark = MAX_NUM_SCALABLE_BF * adjInterval; + }/* else if (watermark < MIN_NUM_SCALABLE_BF * adjInterval) { + watermark = MIN_NUM_SCALABLE_BF * adjInterval; + }*/ // Todo(liuyao) save window info to tdb return watermark; } @@ -94,7 +96,7 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma pInfo->pTsSBFs = NULL; pInfo->minTS = -1; pInfo->interval = adjustInterval(interval, precision); - pInfo->watermark = adjustWatermark(pInfo->interval, watermark); + pInfo->watermark = adjustWatermark(pInfo->interval, interval, watermark); uint64_t bfSize = (uint64_t)(pInfo->watermark / pInfo->interval); @@ -149,13 +151,18 @@ static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) { bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) { int32_t res = TSDB_CODE_FAILED; uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets; + TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index); + if (ts < maxTs - pInfo->watermark) { + // this window has been closed. + return true; + } + SScalableBf *pSBf = getSBf(pInfo, ts); // pSBf may be a null pointer if (pSBf) { res = tScalableBfPut(pSBf, &ts, sizeof(TSKEY)); } - TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index); if (maxTs < ts) { taosArraySet(pInfo->pTsBuckets, index, &ts); return false; diff --git a/tests/script/tsim/stream/triggerInterval0.sim b/tests/script/tsim/stream/triggerInterval0.sim new file mode 100644 index 0000000000..6f1d8f4b7b --- /dev/null +++ b/tests/script/tsim/stream/triggerInterval0.sim @@ -0,0 +1,185 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print =============== create database +sql create database test vgroups 1 +sql show databases +if $rows != 3 then + return -1 +endi + +print $data00 $data01 $data02 + +sql use test +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams1 trigger window_close into streamt as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s); + +sql insert into t1 values(1648791213001,1,2,3,1.0); +sleep 300 +sql select * from streamt; +if $rows != 0 then + print ======$rows + return -1 +endi + +sql insert into t1 values(1648791223001,2,2,3,1.1); +sql insert into t1 values(1648791223002,2,2,3,1.1); +sql insert into t1 values(1648791223003,2,2,3,1.1); +sql insert into t1 values(1648791223001,2,2,3,1.1); +sleep 300 +sql select * from streamt; +if $rows != 1 then + print ======$rows + return -1 +endi + +if $data01 != 1 then + print ======$data01 + return -1 +endi + +sql insert into t1 values(1648791233001,2,2,3,1.1); +sleep 300 +sql select * from streamt; +if $rows != 2 then + print ======$rows + return -1 +endi +if $data01 != 1 then + print ======$data01 + return -1 +endi +if $data11 != 3 then + print ======$data11 + return -1 +endi + +sql insert into t1 values(1648791223004,2,2,3,1.1); +sql insert into t1 values(1648791223004,2,2,3,1.1); +sql insert into t1 values(1648791223005,2,2,3,1.1); +sleep 300 +sql select * from streamt; +if $rows != 2 then + print ======$rows + return -1 +endi +if $data01 != 1 then + print ======$data01 + return -1 +endi +if $data11 != 5 then + print ======$data11 + return -1 +endi + + +sql insert into t1 values(1648791233002,3,2,3,2.1); +sql insert into t1 values(1648791213002,4,2,3,3.1) +sql insert into t1 values(1648791213002,4,2,3,4.1); +sleep 300 +sql select * from streamt; +if $rows != 2 then + print ======$rows + return -1 +endi +if $data01 != 2 then + print ======$data01 + return -1 +endi +if $data11 != 5 then + print ======$data11 + return -1 +endi + +sql create table t2(ts timestamp, a int, b int , c int, d double); +sql create stream streams2 trigger window_close watermark 20s into streamt2 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t2 interval(10s); +sql insert into t2 values(1648791213000,1,2,3,1.0); +sql insert into t2 values(1648791239999,1,2,3,1.0); +sleep 300 +sql select * from streamt2; +if $rows != 0 then + print ======$rows + return -1 +endi + +sql insert into t2 values(1648791240000,1,2,3,1.0); +sleep 300 +sql select * from streamt2; +if $rows != 1 then + print ======$rows + return -1 +endi +if $data01 != 1 then + print ======$data01 + return -1 +endi + +sql insert into t2 values(1648791250001,1,2,3,1.0) (1648791250002,1,2,3,1.0) (1648791250003,1,2,3,1.0) (1648791240000,1,2,3,1.0); +sleep 300 +sql select * from streamt2; +if $rows != 1 then + print ======$rows + return -1 +endi +if $data01 != 1 then + print ======$data01 + return -1 +endi + +sql insert into t2 values(1648791280000,1,2,3,1.0); +sleep 300 +sql select * from streamt2; +if $rows != 4 then + print ======$rows + return -1 +endi +if $data01 != 1 then + print ======$data01 + return -1 +endi +if $data11 != 1 then + print ======$data11 + return -1 +endi +if $data21 != 1 then + print ======$data21 + return -1 +endi +if $data31 != 3 then + print ======$data31 + return -1 +endi + +sql insert into t2 values(1648791250001,1,2,3,1.0) (1648791250002,1,2,3,1.0) (1648791250003,1,2,3,1.0) (1648791280000,1,2,3,1.0) (1648791280001,1,2,3,1.0) (1648791280002,1,2,3,1.0) (1648791310000,1,2,3,1.0) (1648791280001,1,2,3,1.0); +sleep 300 +sql select * from streamt2; + +if $rows != 5 then + print ======$rows + return -1 +endi +if $data01 != 1 then + print ======$data01 + return -1 +endi +if $data11 != 1 then + print ======$data11 + return -1 +endi +if $data21 != 1 then + print ======$data21 + return -1 +endi +if $data31 != 3 then + print ======$data31 + return -1 +endi +if $data41 != 3 then + print ======$data31 + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/tsim/stream/triggerSession0.sim b/tests/script/tsim/stream/triggerSession0.sim new file mode 100644 index 0000000000..fb0666fdcf --- /dev/null +++ b/tests/script/tsim/stream/triggerSession0.sim @@ -0,0 +1,105 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print =============== create database +sql create database test vgroups 1 +sql show databases +if $rows != 3 then + return -1 +endi + +print $data00 $data01 $data02 + +sql use test +sql create table t2(ts timestamp, a int, b int , c int, d double); +sql create stream streams2 trigger window_close into streamt2 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t2 session(ts, 10s); + +sql insert into t2 values(1648791213000,1,2,3,1.0); +sql insert into t2 values(1648791222999,1,2,3,1.0); +sql insert into t2 values(1648791223000,1,2,3,1.0); +sql insert into t2 values(1648791223001,1,2,3,1.0); +sql insert into t2 values(1648791233001,1,2,3,1.0); +sleep 300 +sql select * from streamt2; +if $rows != 0 then + print ======$rows + return -1 +endi + +sql insert into t2 values(1648791243002,1,2,3,1.0); +sleep 300 +sql select * from streamt2; +if $rows != 1 then + print ======$rows + return -1 +endi + +if $data01 != 5 then + print ======$data01 + return -1 +endi + +sql insert into t2 values(1648791223001,1,2,3,1.0) (1648791223002,1,2,3,1.0) (1648791222999,1,2,3,1.0); +sleep 300 +sql select * from streamt2; +if $rows != 1 then + print ======$rows + return -1 +endi + +if $data01 != 6 then + print ======$data01 + return -1 +endi + +sql insert into t2 values(1648791233002,1,2,3,1.0); +sleep 300 +sql select * from streamt2; +if $rows != 1 then + print ======$rows + return -1 +endi + +if $data01 != 6 then + print ======$data01 + return -1 +endi + +sql insert into t2 values(1648791253003,1,2,3,1.0); +sleep 300 +sql select * from streamt2; +if $rows != 1 then + print ======$rows + return -1 +endi + +if $data01 != 8 then + print ======$data01 + return -1 +endi + +sql insert into t2 values(1648791243003,1,2,3,1.0) (1648791243002,1,2,3,1.0) (1648791270004,1,2,3,1.0) (1648791280005,1,2,3,1.0) (1648791290006,1,2,3,1.0); +sleep 500 +sql select * from streamt2; +if $rows != 3 then + print ======$rows + return -1 +endi + +if $data01 != 10 then + print ======$data01 + return -1 +endi +if $data11 != 1 then + print ======$data11 + return -1 +endi +if $data21 != 1 then + print ======$data21 + return -1 +endi + +#system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tools/taos-tools b/tools/taos-tools index 4d83d8c629..2f3dfddd4d 160000 --- a/tools/taos-tools +++ b/tools/taos-tools @@ -1 +1 @@ -Subproject commit 4d83d8c62973506f760bcaa3a33f4665ed9046d0 +Subproject commit 2f3dfddd4d9a869e706ba3cf98fb6d769404cd7c -- GitLab