From 9c170ecfa8ab15d7941f12038e54870993ec1c4b Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Sat, 8 Oct 2022 16:14:32 +0800 Subject: [PATCH] fix(stream): filter error occurred when data was modified --- include/common/tcommon.h | 6 +- source/libs/executor/inc/executorimpl.h | 2 - source/libs/executor/src/scanoperator.c | 68 +++--- source/libs/executor/src/timewindowoperator.c | 194 +++++++++--------- source/libs/stream/src/streamState.c | 6 +- tests/script/tsim/stream/basic1.sim | 52 +++++ .../tsim/stream/partitionbyColumnInterval.sim | 5 +- tests/script/tsim/stream/sliding.sim | 74 ++++++- 8 files changed, 260 insertions(+), 147 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 2add3332ab..4957668272 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -49,7 +49,7 @@ typedef struct { TSKEY ts; } SWinKey; -static inline int SWinKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { +static inline int sWinKeyCmprImpl(const void* pKey1, const void* pKey2) { SWinKey* pWin1 = (SWinKey*)pKey1; SWinKey* pWin2 = (SWinKey*)pKey2; @@ -68,6 +68,10 @@ static inline int SWinKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, i return 0; } +static inline int winKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { + return sWinKeyCmprImpl(pKey1, pKey2); +} + typedef struct { uint64_t groupId; TSKEY ts; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 897015c4d3..e9f4f84f17 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -606,8 +606,6 @@ typedef struct SStreamIntervalOperatorInfo { SArray* pDelWins; // SWinRes int32_t delIndex; SSDataBlock* pDelRes; - SSDataBlock* pUpdateRes; - bool returnUpdate; SPhysiNode* pPhyNode; // create new child SHashObj* pPullDataMap; SArray* pPullWins; // SPullWindowInfo diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1f0d96a2e8..a562d77467 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1331,8 +1331,8 @@ void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* colDataAppend(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false); colDataAppend(pUidCol, pBlock->info.rows, (const char*)pUid, false); colDataAppend(pGpCol, pBlock->info.rows, (const char*)pGp, false); - colDataAppendNULL(pCalStartCol, pBlock->info.rows); - colDataAppendNULL(pCalEndCol, pBlock->info.rows); + colDataAppend(pCalStartCol, pBlock->info.rows, (const char*)pStartTs, false); + colDataAppend(pCalEndCol, pBlock->info.rows, (const char*)pEndTs, false); pBlock->info.rows++; } @@ -1376,7 +1376,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock } } -static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock) { +static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) { SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SOperatorInfo* pOperator = pInfo->pStreamScanOp; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -1430,7 +1430,9 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock } } - doFilter(pInfo->pCondition, pInfo->pRes, NULL); + if (filter) { + doFilter(pInfo->pCondition, pInfo->pRes, NULL); + } blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataFreeRes((SSDataBlock*)pBlock); return 0; @@ -1466,7 +1468,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { continue; } - setBlockIntoRes(pInfo, &block); + setBlockIntoRes(pInfo, &block, true); if (pBlockInfo->rows > 0) { return pInfo->pRes; @@ -1507,7 +1509,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { tqNextBlock(pInfo->tqReader, &ret); if (ret.fetchType == FETCH_TYPE__DATA) { blockDataCleanup(pInfo->pRes); - if (setBlockIntoRes(pInfo, &ret.data) < 0) { + if (setBlockIntoRes(pInfo, &ret.data, true) < 0) { ASSERT(0); } if (pInfo->pRes->info.rows > 0) { @@ -1771,6 +1773,7 @@ FETCH_NEXT_BLOCK: // printDataBlock(pSDB, "stream scan update"); return pSDB; } + blockDataCleanup(pInfo->pUpdateDataRes); pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; } break; default: @@ -1821,7 +1824,7 @@ FETCH_NEXT_BLOCK: continue; } - setBlockIntoRes(pInfo, &block); + setBlockIntoRes(pInfo, &block, false); if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId, pInfo->pRes->info.version)) { @@ -1830,11 +1833,30 @@ FETCH_NEXT_BLOCK: continue; } - if (pBlockInfo->rows > 0) { + if (pInfo->pUpdateInfo) { + checkUpdateData(pInfo, true, pInfo->pRes, true); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey); + if (pInfo->pUpdateDataRes->info.rows > 0) { + pInfo->updateResIndex = 0; + if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) { + pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES; + } else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) { + pInfo->scanMode = STREAM_SCAN_FROM_RES; + return pInfo->pUpdateDataRes; + } else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) { + pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA; + } + } + } + + doFilter(pInfo->pCondition, pInfo->pRes, NULL); + blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); + + if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { break; } } - if (pBlockInfo->rows > 0) { + if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { break; } else { pInfo->tqReader->pMsg = NULL; @@ -1848,32 +1870,16 @@ FETCH_NEXT_BLOCK: pOperator->resultInfo.totalRows += pBlockInfo->rows; // printDataBlock(pInfo->pRes, "stream scan"); - if (pBlockInfo->rows == 0) { - updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); - /*pOperator->status = OP_EXEC_DONE;*/ - } else if (pInfo->pUpdateInfo) { - checkUpdateData(pInfo, true, pInfo->pRes, true); - pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey); - if (pInfo->pUpdateDataRes->info.rows > 0) { - pInfo->updateResIndex = 0; - if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) { - pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES; - } else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) { - pInfo->scanMode = STREAM_SCAN_FROM_RES; - return pInfo->pUpdateDataRes; - } else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) { - pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA; - } - } - } - qDebug("scan rows: %d", pBlockInfo->rows); if (pBlockInfo->rows > 0) { return pInfo->pRes; - } else { - goto NEXT_SUBMIT_BLK; } - /*return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;*/ + + if (pInfo->pUpdateDataRes->info.rows > 0) { + goto FETCH_NEXT_BLOCK; + } + + goto NEXT_SUBMIT_BLK; } else { ASSERT(0); return NULL; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8df6f15a1b..d89cfb7183 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -414,14 +414,17 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx return true; } -bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) { - if (pInterval->interval != pInterval->sliding && - (pWin->ekey < pBlockInfo->calWin.skey || pWin->skey > pBlockInfo->calWin.ekey)) { +bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd) { + if (pInterval->interval != pInterval->sliding && (pWin->ekey < calStart || pWin->skey > calEnd)) { return false; } return true; } +bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) { + return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey); +} + static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo, TSKEY* primaryKeys, int32_t prevPosition, int32_t order) { bool ascQuery = (order == TSDB_ORDER_ASC); @@ -912,6 +915,8 @@ int32_t compareWinRes(void* pKey, void* data, int32_t index) { } static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) { + taosArraySort(pDelWins, sWinKeyCmprImpl); + taosArrayRemoveDuplicate(pDelWins, sWinKeyCmprImpl, NULL); int32_t delSize = taosArrayGetSize(pDelWins); if (taosHashGetSize(pUpdatedMap) == 0 || delSize == 0) { return; @@ -1387,7 +1392,7 @@ static bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, return true; } -static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, int32_t numOfOutput) { +static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SWinKey key = {.ts = ts, .groupId = groupId}; tSimpleHashRemove(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey)); @@ -1395,21 +1400,37 @@ static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, return true; } -static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, int32_t numOfOutput, SSDataBlock* pBlock, - SArray* pUpWins, SHashObj* pUpdatedMap) { - SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); - TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData; - SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); - TSKEY* endTsCols = (TSKEY*)pEndTsCol->pData; - SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); - uint64_t* pGpDatas = (uint64_t*)pGpCol->pData; +static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins, + SHashObj* pUpdatedMap) { + SStreamIntervalOperatorInfo* pInfo = pOperator->info; + SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData; + SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); + TSKEY* endTsCols = (TSKEY*)pEndTsCol->pData; + SColumnInfoData* pCalStTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); + TSKEY* calStTsCols = (TSKEY*)pCalStTsCol->pData; + SColumnInfoData* pCalEnTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); + TSKEY* calEnTsCols = (TSKEY*)pCalEnTsCol->pData; + SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); + uint64_t* pGpDatas = (uint64_t*)pGpCol->pData; for (int32_t i = 0; i < pBlock->info.rows; i++) { SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; - STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC); + STimeWindow win = {0}; + if (IS_FINAL_OP(pInfo)) { + win.skey = startTsCols[i]; + win.ekey = endTsCols[i]; + } else { + win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC); + } + do { + if (!inCalSlidingWindow(pInterval, &win, calStTsCols[i], calEnTsCols[i])) { + getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win); + continue; + } uint64_t winGpId = pGpDatas[i]; - bool res = doDeleteWindow(pOperator, win.skey, winGpId, numOfOutput); + bool res = doDeleteWindow(pOperator, win.skey, winGpId); SWinKey winRes = {.ts = win.skey, .groupId = winGpId}; if (pUpWins && res) { taosArrayPush(pUpWins, &winRes); @@ -1511,16 +1532,43 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) { return TSDB_CODE_SUCCESS; } +int32_t compareWinKey(void* pKey, void* data, int32_t index) { + SArray* res = (SArray*)data; + SWinKey* pos = taosArrayGet(res, index); + SWinKey* pData = (SWinKey*)pKey; + if (pData->ts == pos->ts) { + if (pData->groupId > pos->groupId) { + return 1; + } else if (pData->groupId < pos->groupId) { + return -1; + } + return 0; + } else if (pData->ts > pos->ts) { + return 1; + } + return -1; +} + static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval, - SHashObj* pPullDataMap, SHashObj* closeWins, SOperatorInfo* pOperator) { + SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pDelWins, + SOperatorInfo* pOperator) { qDebug("===stream===close interval window"); void* pIte = NULL; size_t keyLen = 0; int32_t iter = 0; SStreamIntervalOperatorInfo* pInfo = pOperator->info; + int32_t delSize = taosArrayGetSize(pDelWins); while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { - void* key = tSimpleHashGetKey(pIte, &keyLen); - SWinKey* pWinKey = (SWinKey*)key; + void* key = tSimpleHashGetKey(pIte, &keyLen); + SWinKey* pWinKey = (SWinKey*)key; + if (delSize > 0) { + int32_t index = binarySearchCom(pDelWins, delSize, pWinKey, TSDB_ORDER_DESC, compareWinKey); + if (index >= 0 && 0 == compareWinKey(pWinKey, pDelWins, index)) { + taosArrayRemove(pDelWins, index); + delSize = taosArrayGetSize(pDelWins); + } + } + void* chIds = taosHashGet(pPullDataMap, pWinKey, sizeof(SWinKey)); STimeWindow win = { .skey = pWinKey->ts, @@ -1624,7 +1672,7 @@ static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren ASSERT(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE); pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs); closeStreamIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL, - pOperator); + NULL, pOperator); } } @@ -1694,7 +1742,6 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { taosHashCleanup(pInfo->pPullDataMap); taosArrayDestroy(pInfo->pPullWins); blockDataDestroy(pInfo->pPullDataRes); - blockDataDestroy(pInfo->pUpdateRes); taosArrayDestroy(pInfo->pDelWins); blockDataDestroy(pInfo->pDelRes); taosMemoryFreeClear(pInfo->pState); @@ -2862,11 +2909,7 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SExprSupp* pSup, SAr isCloseWindow(&parentWin, &pInfo->twAggSup)) { continue; } - int32_t code = setOutputBuf(pInfo->pState, &parentWin, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput, - pSup->rowEntryInfoOffset, &pInfo->aggSup); - if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } + int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren); int32_t num = 0; for (int32_t j = 0; j < numOfChildren; j++) { @@ -2876,6 +2919,13 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SExprSupp* pSup, SAr if (!hasIntervalWindow(pChInfo->pState, pWinRes)) { continue; } + if (num == 0) { + int32_t code = setOutputBuf(pInfo->pState, &parentWin, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput, + pSup->rowEntryInfoOffset, &pInfo->aggSup); + if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) { + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } num++; SResultRow* pChResult = NULL; setOutputBuf(pChInfo->pState, &parentWin, &pChResult, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs, @@ -3214,25 +3264,19 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return NULL; } else { if (!IS_FINAL_OP(pInfo)) { + doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); + if (pInfo->pDelRes->info.rows != 0) { + // process the rest of the data + printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); + return pInfo->pDelRes; + } + doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); if (pInfo->binfo.pRes->info.rows != 0) { printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); return pInfo->binfo.pRes; } } - if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) { - pInfo->returnUpdate = false; - ASSERT(!IS_FINAL_OP(pInfo)); - printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); - // process the rest of the data - return pInfo->pUpdateRes; - } - doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); - if (pInfo->pDelRes->info.rows != 0) { - // process the rest of the data - printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); - return pInfo->pDelRes; - } } SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); @@ -3241,8 +3285,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { - clearSpecialDataBlock(pInfo->pUpdateRes); - removeDeleteResults(pUpdatedMap, pInfo->pDelWins); pOperator->status = OP_RES_TO_RETURN; qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); break; @@ -3252,34 +3294,16 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ASSERT(pBlock->info.type != STREAM_INVERT); if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { pInfo->binfo.pRes->info.type = pBlock->info.type; - } else if (pBlock->info.type == STREAM_CLEAR) { - SArray* pUpWins = taosArrayInit(8, sizeof(SWinKey)); - doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins, NULL); - if (IS_FINAL_OP(pInfo)) { - int32_t childIndex = getChildIndex(pBlock); - SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); - SStreamIntervalOperatorInfo* pChildInfo = pChildOp->info; - SExprSupp* pChildSup = &pChildOp->exprSupp; - - doDeleteWindows(pChildOp, &pChildInfo->interval, pChildOp->exprSupp.numOfExprs, pBlock, NULL, NULL); - rebuildIntervalWindow(pOperator, pSup, pUpWins, pUpdatedMap); - taosArrayDestroy(pUpWins); - continue; - } - removeResults(pUpWins, pUpdatedMap); - copyDataBlock(pInfo->pUpdateRes, pBlock); - pInfo->returnUpdate = true; - taosArrayDestroy(pUpWins); - break; - } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { + } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || + pBlock->info.type == STREAM_CLEAR) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); - doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, delWins, pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pUpdatedMap); if (IS_FINAL_OP(pInfo)) { int32_t childIndex = getChildIndex(pBlock); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SStreamIntervalOperatorInfo* pChildInfo = pChildOp->info; SExprSupp* pChildSup = &pChildOp->exprSupp; - doDeleteWindows(pChildOp, &pChildInfo->interval, pChildOp->exprSupp.numOfExprs, pBlock, NULL, NULL); + doDeleteWindows(pChildOp, &pChildInfo->interval, pBlock, NULL, NULL); rebuildIntervalWindow(pOperator, pSup, delWins, pUpdatedMap); addRetriveWindow(delWins, pInfo); taosArrayAddAll(pInfo->pDelWins, delWins); @@ -3294,7 +3318,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) { - doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, NULL, pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pUpdatedMap); if (taosArrayGetSize(pUpdated) > 0) { break; } @@ -3334,11 +3358,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { minTs = TMIN(minTs, pBlock->info.window.skey); } + removeDeleteResults(pUpdatedMap, pInfo->pDelWins); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs); if (IS_FINAL_OP(pInfo)) { closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, - pInfo->pPullDataMap, pUpdatedMap, pOperator); + pInfo->pPullDataMap, pUpdatedMap, pInfo->pDelWins, pOperator); closeChildIntervalWindow(pOperator, pInfo->pChildren, pInfo->twAggSup.maxTs); } pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs; @@ -3374,13 +3399,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return pInfo->binfo.pRes; } - if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) { - pInfo->returnUpdate = false; - ASSERT(!IS_FINAL_OP(pInfo)); - printDataBlock(pInfo->pUpdateRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); - // process the rest of the data - return pInfo->pUpdateRes; - } return NULL; } @@ -3455,9 +3473,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, goto _error; } } - pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); - blockDataEnsureCapacity(pInfo->pUpdateRes, 128); - pInfo->returnUpdate = false; pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode); @@ -4276,23 +4291,6 @@ static void removeSessionResults(SHashObj* pHashMap, SArray* pWins) { } } -int32_t compareWinKey(void* pKey, void* data, int32_t index) { - SArray* res = (SArray*)data; - SResKeyPos* pos = taosArrayGetP(res, index); - SWinKey* pData = (SWinKey*)pKey; - if (pData->ts == *(int64_t*)pos->key) { - if (pData->groupId > pos->groupId) { - return 1; - } else if (pData->groupId < pos->groupId) { - return -1; - } - return 0; - } else if (pData->ts > *(int64_t*)pos->key) { - return 1; - } - return -1; -} - static void removeSessionDeleteResults(SArray* update, SHashObj* pStDeleted) { int32_t size = taosHashGetSize(pStDeleted); if (size == 0) { @@ -5668,13 +5666,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } printDataBlock(pBlock, "single interval recv"); - if (pBlock->info.type == STREAM_CLEAR) { - doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, NULL, NULL); - qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); - continue; - } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { - doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pInfo->pDelWins, - pUpdatedMap); + if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || + pBlock->info.type == STREAM_CLEAR) { + doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); @@ -5706,8 +5700,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs); pOperator->status = OP_RES_TO_RETURN; + removeDeleteResults(pUpdatedMap, pInfo->pDelWins); closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap, - pOperator); + pInfo->pDelWins, pOperator); void* pIte = NULL; while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { @@ -5717,7 +5712,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); - removeDeleteResults(pUpdatedMap, pInfo->pDelWins); taosHashCleanup(pUpdatedMap); doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); @@ -5803,8 +5797,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); streamStateSetNumber(pInfo->pState, -1); - pInfo->pUpdateRes = NULL; - pInfo->returnUpdate = false; pInfo->pPhyNode = NULL; // create new child pInfo->pPullDataMap = NULL; pInfo->pPullWins = NULL; // SPullWindowInfo diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 596c16747b..ba84b38dd7 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -24,7 +24,7 @@ typedef struct SStateKey { int64_t opNum; } SStateKey; -static inline int SStateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { +static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { SStateKey* pWin1 = (SStateKey*)pKey1; SStateKey* pWin2 = (SStateKey*)pKey2; @@ -67,12 +67,12 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) { } // open state storage backend - if (tdbTbOpen("state.db", sizeof(SStateKey), -1, SStateKeyCmpr, pState->db, &pState->pStateDb) < 0) { + if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->db, &pState->pStateDb) < 0) { goto _err; } // todo refactor - if (tdbTbOpen("func.state.db", sizeof(SWinKey), -1, SWinKeyCmpr, pState->db, &pState->pFillStateDb) < 0) { + if (tdbTbOpen("func.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->db, &pState->pFillStateDb) < 0) { goto _err; } diff --git a/tests/script/tsim/stream/basic1.sim b/tests/script/tsim/stream/basic1.sim index 8942f7f702..b20e2e3592 100644 --- a/tests/script/tsim/stream/basic1.sim +++ b/tests/script/tsim/stream/basic1.sim @@ -622,4 +622,56 @@ if $data12 != 2 then goto loop3 endi + +sql create database test4 vgroups 1; +sql use test4; +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams4 trigger at_once into streamt4 as select _wstart, count(*) c1 from t1 where a > 5 interval(10s); +sql insert into t1 values(1648791213000,1,2,3,1.0); + +sleep 200 +sql select * from streamt4; + +# row 0 +if $rows != 0 then + print =====rows=$rows + return -1 +endi + +sql insert into t1 values(1648791213000,6,2,3,1.0); + +$loop_count = 0 +loop4: +sleep 200 +sql select * from streamt4; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop4 +endi + +sql insert into t1 values(1648791213000,2,2,3,1.0); + +$loop_count = 0 +loop5: +sleep 200 +sql select * from streamt4; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 0 then + print =====rows=$rows + goto loop5 +endi + + + system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/partitionbyColumnInterval.sim b/tests/script/tsim/stream/partitionbyColumnInterval.sim index fd1d796fdb..8375df5064 100644 --- a/tests/script/tsim/stream/partitionbyColumnInterval.sim +++ b/tests/script/tsim/stream/partitionbyColumnInterval.sim @@ -587,8 +587,6 @@ sleep 300 sql delete from st where ts = 1648791223000; -sql select * from test.streamt5; - $loop_count = 0 loop15: @@ -604,11 +602,10 @@ if $rows != 4 then print =====rows=$rows print =====rows=$rows print =====rows=$rows -# goto loop15 + #goto loop15 endi - $loop_all = $loop_all + 1 print ============loop_all=$loop_all diff --git a/tests/script/tsim/stream/sliding.sim b/tests/script/tsim/stream/sliding.sim index b7477fe36c..49e22db1b1 100644 --- a/tests/script/tsim/stream/sliding.sim +++ b/tests/script/tsim/stream/sliding.sim @@ -5,15 +5,15 @@ sleep 50 sql connect print =============== create database -sql create database test vgroups 1 -sql select * from information_schema.ins_databases +sql create database test vgroups 1; +sql select * from information_schema.ins_databases; if $rows != 3 then return -1 endi print $data00 $data01 $data02 -sql use test +sql use test; sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); @@ -48,8 +48,9 @@ if $loop_count == 10 then return -1 endi +print step 0 -sql select * from streamt +sql select * from streamt; # row 0 if $data01 != 1 then @@ -97,7 +98,7 @@ endi print step 1 -sql select * from streamt2 +sql select * from streamt2; # row 0 if $data01 != 1 then @@ -239,6 +240,67 @@ if $data32 != 6 then goto loop0 endi +print step 3.1 + +sql insert into t1 values(1648791216001,2,2,3,1.1); + +$loop_count = 0 + +loop00: +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt2; + +# row 0 +if $data01 != 1 then + print =====data01=$data01 + goto loop00 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop00 +endi + +# row 1 +if $data11 != 3 then + print =====data11=$data11 + goto loop00 +endi + +if $data12 != 5 then + print =====data12=$data12 + goto loop00 +endi + +# row 2 +if $data21 != 3 then + print =====data21=$data21 + goto loop00 +endi + +if $data22 != 7 then + print =====data22=$data22 + goto loop00 +endi + +# row 3 +if $data31 != 1 then + print =====data31=$data31 + goto loop00 +endi + +if $data32 != 3 then + print =====data32=$data32 + goto loop00 +endi + + print step 4 sql create database test1 vgroups 1 @@ -513,6 +575,8 @@ endi $loop_count = 0 +print step 7 + loop4: sleep 100 -- GitLab