diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 362f56c865ad331f31dfe55c936c0e59f5820fdc..cc51f96f6c7c0a5fed3b231a360335ad92429cf8 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -50,6 +50,7 @@ typedef enum EStreamType { STREAM_INVERT, STREAM_REPROCESS, STREAM_INVALID, + STREAM_GET_ALL, } EStreamType; typedef struct { diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d5946516e8adee3c6ad37553b04cb9c873cdc6a3..e051a43f21b72aec5dbac196afb3f7407fa150fd 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1496,6 +1496,7 @@ typedef struct { #define STREAM_TRIGGER_AT_ONCE 1 #define STREAM_TRIGGER_WINDOW_CLOSE 2 +#define STREAM_TRIGGER_MAX_DELAY 3 typedef struct { char name[TSDB_TABLE_FNAME_LEN]; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index deb73821839b8625a12c4b6e7f21a02be8f26c85..d41928e9095e409edac7f893d1f1723171297cb3 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -167,17 +167,17 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj* } } } - } else { - pTask->dispatchType = TASK_DISPATCH__FIXED; - pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; - SArray* pArray = taosArrayGetP(pStream->tasks, 0); - // one sink only - ASSERT(taosArrayGetSize(pArray) == 1); - SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); - pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; - pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId; - pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet; } + } else { + pTask->dispatchType = TASK_DISPATCH__FIXED; + pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; + SArray* pArray = taosArrayGetP(pStream->tasks, 0); + // one sink only + ASSERT(taosArrayGetSize(pArray) == 1); + SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); + pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; + pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId; + pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet; } return 0; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 71e6a3ad0922ec41ec5fdb57d67a35a8508597e6..ab595a3e341ea5fa15be60fbad94d183f2dcb072 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -750,14 +750,15 @@ int64_t getReskey(void* data, int32_t index) { return *(int64_t*)pos->key; } -static int32_t saveResult(SResultRow* result, uint64_t groupId, SArray* pUpdated) { +static int32_t saveResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, + SArray* pUpdated) { int32_t size = taosArrayGetSize(pUpdated); - int32_t index = binarySearch(pUpdated, size, result->win.skey, TSDB_ORDER_DESC, getReskey); + int32_t index = binarySearch(pUpdated, size, ts, TSDB_ORDER_DESC, getReskey); if (index == -1) { index = 0; } else { TSKEY resTs = getReskey(pUpdated, index); - if (resTs < result->win.skey) { + if (resTs < ts) { index++; } else { return TSDB_CODE_SUCCESS; @@ -769,14 +770,18 @@ static int32_t saveResult(SResultRow* result, uint64_t groupId, SArray* pUpdated return TSDB_CODE_OUT_OF_MEMORY; } newPos->groupId = groupId; - newPos->pos = (SResultRowPosition){.pageId = result->pageId, .offset = result->offset}; - *(int64_t*)newPos->key = result->win.skey; + newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset}; + *(int64_t*)newPos->key = ts; if (taosArrayInsert(pUpdated, index, &newPos) == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } return TSDB_CODE_SUCCESS; } +static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) { + return saveResult(result->win.skey, result->pageId, result->offset, groupId, pUpdated); +} + static void removeResult(SArray* pUpdated, TSKEY key) { int32_t size = taosArrayGetSize(pUpdated); int32_t index = binarySearch(pUpdated, size, key, TSDB_ORDER_DESC, getReskey); @@ -819,7 +824,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { - saveResult(pResult, tableGroupId, pUpdated); + saveResultRow(pResult, tableGroupId, pUpdated); } } @@ -869,7 +874,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { - saveResult(pResult, tableGroupId, pUpdated); + saveResultRow(pResult, tableGroupId, pUpdated); } } @@ -1243,6 +1248,23 @@ static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, SInterva } } +static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) { + void* pIte = NULL; + size_t keyLen = 0; + while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { + void* key = taosHashGetKey(pIte, &keyLen); + uint64_t groupId = *(uint64_t*)key; + ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))); + TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t)); + SResultRowPosition* pPos = (SResultRowPosition*)pIte; + int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, resWins); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + return TSDB_CODE_SUCCESS; +} + static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval, SArray* closeWins) { void* pIte = NULL; @@ -1259,16 +1281,12 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))]; SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId); taosHashRemove(pHashMap, keyBuf, keyLen); - SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); - if (pos == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - pos->groupId = groupId; - pos->pos = *(SResultRowPosition*)pIte; - *(int64_t*)pos->key = ts; - if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE && !taosArrayPush(closeWins, &pos)) { - taosMemoryFree(pos); - return TSDB_CODE_OUT_OF_MEMORY; + SResultRowPosition* pPos = (SResultRowPosition*)pIte; + if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, closeWins); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } } @@ -1296,8 +1314,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); - SArray* pClosed = taosArrayInit(4, POINTER_BYTES); - while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { @@ -1316,19 +1332,18 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, 0, pOperator->numOfExprs, pBlock, NULL); qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); continue; + } else if (pBlock->info.type == STREAM_GET_ALL && + pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) { + getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated); + continue; } pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated); } + closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated); - closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pClosed); - finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, pInfo->binfo.rowCellInfoOffset); - taosArrayAddAll(pUpdated, pClosed); - - taosArrayDestroy(pClosed); finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); - initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); @@ -1898,7 +1913,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) { - saveResult(pResult, tableGroupId, pUpdated); + saveResultRow(pResult, tableGroupId, pUpdated); } // window start(end) key interpolation // doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, @@ -1993,7 +2008,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info; SOperatorInfo* downstream = pOperator->pDownstream[0]; SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); - SArray* pClosed = taosArrayInit(4, POINTER_BYTES); if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -2042,7 +2056,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex); taosArrayDestroy(pUpWins); break; + } else if (pBlock->info.type == STREAM_GET_ALL && isFinalInterval(pInfo) && + pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) { + getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated); + continue; } + if (isFinalInterval(pInfo)) { int32_t chIndex = getChildIndex(pBlock); int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -2064,13 +2083,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } if (isFinalInterval(pInfo)) { - closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pClosed); - finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, pInfo->binfo.rowCellInfoOffset); - if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { - taosArrayAddAll(pUpdated, pClosed); - } + closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated); } - taosArrayDestroy(pClosed); finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);