diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 253d91f1d650fbcbfa5ea7a322757d92f1104364..2c8c8dd413669c6681002672a8acb900a22dc7bf 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -1018,6 +1018,7 @@ bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap); int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted); bool functionNeedToExecute(SqlFunctionCtx* pCtx); +bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup); bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup); void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 6de77f63f4444f723dcb62999a0224930b869c02..3ac08bc20e31ca829b733387f33b8413cb9a60f1 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1277,8 +1277,12 @@ void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) { } void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) { + bool init = false; for (int32_t i = 0; i < numOfOutput; ++i) { pCtx[i].resultInfo = getResultEntryInfo(pResult, i, rowEntryInfoOffset); + if (init) { + continue; + } struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo; if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) { @@ -1295,6 +1299,8 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO } else { pResInfo->initialized = true; } + } else { + init = true; } } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2f6d04f684bcc6e6c4c2791139166a0d8c283d1d..d8de8df163207b89120c03dc3b8c42dcff3776d9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1174,10 +1174,15 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) { SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; - STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC); + bool isClosed = false; + STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX}; + if (isOverdue(tsCol[rowId], &pInfo->twAggSup)) { + win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC); + isClosed = isCloseWindow(&win, &pInfo->twAggSup); + } // must check update info first. bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]); - if ((update || (isSignleIntervalWindow(pInfo) && isCloseWindow(&win, &pInfo->twAggSup) && + if ((update || (isSignleIntervalWindow(pInfo) && isClosed && isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup))) && out) { appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid); } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index f46294820baa913d8b499846db89cff8fc04aa1f..2b67f2bf01e9eb05f29c7846300dcf0952778cac 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -851,23 +851,34 @@ static int32_t saveResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t g return TSDB_CODE_SUCCESS; } -static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) { - return saveResult(result->win.skey, result->pageId, result->offset, groupId, pUpdated); +static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SHashObj* pUpdatedMap) { + SResKeyPos* newPos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); + if (newPos == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + newPos->groupId = groupId; + newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset}; + *(int64_t*)newPos->key = ts; + SWinRes key = {.ts = ts, .groupId = groupId}; + if (taosHashPut(pUpdatedMap, &key, sizeof(SWinRes), &newPos, sizeof(void*)) != TSDB_CODE_SUCCESS) { + taosMemoryFree(newPos); + } + return TSDB_CODE_SUCCESS; } -static void removeResult(SArray* pUpdated, SWinRes* pKey) { - int32_t size = taosArrayGetSize(pUpdated); - int32_t index = binarySearchCom(pUpdated, size, pKey, TSDB_ORDER_DESC, compareResKey); - if (index >= 0 && 0 == compareResKey(pKey, pUpdated, index)) { - taosArrayRemove(pUpdated, index); - } +static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj* pUpdatedMap) { + return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap);; +} + +static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) { + return saveResult(result->win.skey, result->pageId, result->offset, groupId, pUpdated); } -static void removeResults(SArray* pWins, SArray* pUpdated) { +static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) { int32_t size = taosArrayGetSize(pWins); for (int32_t i = 0; i < size; i++) { SWinRes* pW = taosArrayGet(pWins, i); - removeResult(pUpdated, pW); + taosHashRemove(pUpdatedMap, pW, sizeof(SWinRes)); } } @@ -894,11 +905,14 @@ int32_t compareWinRes(void* pKey, void* data, int32_t index) { return -1; } -static void removeDeleteResults(SArray* pUpdated, SArray* pDelWins) { - int32_t upSize = taosArrayGetSize(pUpdated); +static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) { + if (!pUpdatedMap || taosHashGetSize(pUpdatedMap) == 0) { + return; + } int32_t delSize = taosArrayGetSize(pDelWins); - for (int32_t i = 0; i < upSize; i++) { - SResKeyPos* pResKey = taosArrayGetP(pUpdated, i); + void* pIte = NULL; + while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { + SResKeyPos* pResKey = (SResKeyPos*)pIte; int32_t index = binarySearchCom(pDelWins, delSize, pResKey, TSDB_ORDER_DESC, compareWinRes); if (index >= 0 && 0 == compareWinRes(pResKey, pDelWins, index)) { taosArrayRemove(pDelWins, index); @@ -914,7 +928,7 @@ bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup) { bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup) { return isOverdue(pWin->ekey, pSup); } static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, - int32_t scanFlag, SArray* pUpdated) { + int32_t scanFlag, SHashObj* pUpdatedMap) { SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; @@ -940,7 +954,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { - saveResultRow(pResult, tableGroupId, pUpdated); + saveWinResultRow(pResult, tableGroupId, pUpdatedMap); setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur); } } @@ -997,7 +1011,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { - saveResultRow(pResult, tableGroupId, pUpdated); + saveWinResultRow(pResult, tableGroupId, pUpdatedMap); setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur); } @@ -1437,7 +1451,7 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* } } -static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) { +static int32_t getAllIntervalWindow(SHashObj* pHashMap, SHashObj* resWins) { void* pIte = NULL; size_t keyLen = 0; while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { @@ -1446,7 +1460,7 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) { ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))); TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t)); SResultRowPosition* pPos = (SResultRowPosition*)pIte; - int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, resWins); + int32_t code = saveWinResult(ts, pPos->pageId, pPos->offset, groupId, resWins); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1455,7 +1469,7 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) { } static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval, - SHashObj* pPullDataMap, SArray* closeWins, SArray* pRecyPages, + SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pRecyPages, SDiskbasedBuf* pDiscBuf) { qDebug("===stream===close interval window"); void* pIte = NULL; @@ -1487,7 +1501,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, } SResultRowPosition* pPos = (SResultRowPosition*)pIte; if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { - int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, closeWins); + int32_t code = saveWinResult(ts, pPos->pageId, pPos->offset, groupId, closeWins); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1577,11 +1591,14 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP); + SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { break; } + // qInfo("===stream===%ld", pBlock->info.version); printDataBlock(pBlock, "single interval recv"); if (pBlock->info.type == STREAM_CLEAR) { @@ -1594,7 +1611,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, pInfo->pDelWins, &pInfo->interval); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { - getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated); + getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); continue; } @@ -1617,17 +1634,24 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); - hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated); + hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap); } pOperator->status = OP_RES_TO_RETURN; - closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdated, + closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); + void* pIte = NULL; + while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { + taosArrayPush(pUpdated, pIte); + } + taosHashCleanup(pUpdatedMap); + taosArraySort(pUpdated, resultrowComparAsc); + finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); - removeDeleteResults(pUpdated, pInfo->pDelWins); + removeDeleteResults(pUpdatedMap, pInfo->pDelWins); doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows > 0) { return pInfo->pDelRes; @@ -2867,7 +2891,7 @@ STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) { } static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId, - SArray* pUpdated) { + SHashObj* pUpdatedMap) { SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info; SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; @@ -2949,8 +2973,8 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); } - if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) { - saveResultRow(pResult, tableGroupId, pUpdated); + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) { + saveWinResultRow(pResult, tableGroupId, pUpdatedMap); setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur); } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); @@ -3056,6 +3080,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info; SOperatorInfo* downstream = pOperator->pDownstream[0]; SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP); + SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); TSKEY maxTs = INT64_MIN; SExprSupp* pSup = &pOperator->exprSupp; @@ -3113,7 +3139,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { clearSpecialDataBlock(pInfo->pUpdateRes); - removeDeleteResults(pUpdated, pInfo->pDelWins); + removeDeleteResults(pUpdatedMap, pInfo->pDelWins); pOperator->status = OP_RES_TO_RETURN; qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); break; @@ -3140,7 +3166,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { taosArrayDestroy(pUpWins); continue; } - removeResults(pUpWins, pUpdated); + removeResults(pUpWins, pUpdatedMap); copyDataBlock(pInfo->pUpdateRes, pBlock); // copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex); pInfo->returnUpdate = true; @@ -3158,15 +3184,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { pOperator->exprSupp.numOfExprs, pOperator->pTaskInfo, pUpdated); continue; } - removeResults(pInfo->pDelWins, pUpdated); + removeResults(pInfo->pDelWins, pUpdatedMap); break; } else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) { - getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated); + getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) { SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes)); doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins); - removeResults(pUpWins, pUpdated); + removeResults(pUpWins, pUpdatedMap); taosArrayDestroy(pUpWins); if (taosArrayGetSize(pUpdated) > 0) { break; @@ -3182,7 +3208,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); } setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, MAIN_SCAN, true); - doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated); + doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap); if (IS_FINAL_OP(pInfo)) { int32_t chIndex = getChildIndex(pBlock); int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -3207,12 +3233,19 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); if (IS_FINAL_OP(pInfo)) { closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pInfo->pPullDataMap, - pUpdated, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); + pUpdatedMap, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); closeChildIntervalWindow(pInfo->pChildren, pInfo->twAggSup.maxTs); } else { pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs; } + void* pIte = NULL; + while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { + taosArrayPush(pUpdated, pIte); + } + taosHashCleanup(pUpdatedMap); + taosArraySort(pUpdated, resultrowComparAsc); + finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index c686fa05cecf7fa611df67addd6ab9867feb3529..0b1ce27b7702e23066ff27fdd0c78e9d7dc8cd6e 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -124,7 +124,7 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma } pInfo->numBuckets = DEFAULT_BUCKET_SIZE; pInfo->pCloseWinSBF = NULL; - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT); pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK); pInfo->maxVersion = 0; pInfo->scanGroupId = 0;