diff --git a/include/util/tpagedbuf.h b/include/util/tpagedbuf.h index acaff759b707412cd0aa01093b0d471538595bb4..af82e29ec500aec1d5519d59cb5f184bf736d0e4 100644 --- a/include/util/tpagedbuf.h +++ b/include/util/tpagedbuf.h @@ -188,6 +188,13 @@ SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf); */ void dBufPrintStatis(const SDiskbasedBuf* pBuf); +/** + * Set all of page buffer are not need + * @param pBuf + * @return + */ +void clearDiskbasedBuf(SDiskbasedBuf* pBuf); + #ifdef __cplusplus } #endif diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 9caa9a73a5e803df67ae54b642e392a6e75ba075..99ff4a4a42fc811b2cffe188f70a776c1f5d64d9 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1500,7 +1500,7 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag) { for (int32_t k = 0; k < colNum; k++) { SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); - if (pColInfoData->hasNull) { + if (colDataIsNull(pColInfoData, rows, j, NULL)) { printf(" %15s |", "NULL"); continue; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 4804984397bc67df0133fd514ae562d426707082..9b8b9dca2690015599f700358e976dfcfbe75d5f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -488,6 +488,8 @@ typedef struct SStreamFinalIntervalOperatorInfo { int32_t order; // current SSDataBlock scan order STimeWindowAggSupp twAggSup; SArray* pChildren; + SSDataBlock* pUpdateRes; + SPhysiNode* pPhyNode; // create new child } SStreamFinalIntervalOperatorInfo; typedef struct SAggOperatorInfo { @@ -793,9 +795,8 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, + SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 431aca3eb5df1f383bb9049efd5a4895e69b5447..e2661780682a4d33a61b934dd25992943e4005ea 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4634,6 +4634,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) { + int32_t children = 8; + pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) { + int32_t children = 0; + pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ba0397fd3439e252c3de115459205dd534c7f78a..686add55086a05c6b34c9e98afb916b26c1d0612 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -956,16 +956,15 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { if (rows == 0) { pOperator->status = OP_EXEC_DONE; } else if (pInfo->pUpdateInfo) { - SSDataBlock* upRes = createOneDataBlock(pInfo->pRes, false); - getUpdateDataBlock(pInfo, true, pInfo->pRes, upRes); - if (upRes) { - pInfo->pUpdateRes = upRes; - if (upRes->info.type == STREAM_REPROCESS) { + blockDataCleanup(pInfo->pUpdateRes); + getUpdateDataBlock(pInfo, true, pInfo->pRes, pInfo->pUpdateRes); + if (pInfo->pUpdateRes->info.rows > 0) { + if (pInfo->pUpdateRes->info.type == STREAM_REPROCESS) { pInfo->updateResIndex = 0; pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES; - } else if (upRes->info.type == STREAM_INVERT) { + } else if (pInfo->pUpdateRes->info.type == STREAM_INVERT) { pInfo->scanMode = STREAM_SCAN_FROM_RES; - return upRes; + return pInfo->pUpdateRes; } } } @@ -1044,6 +1043,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan pInfo->tableUid = pScanPhyNode->uid; pInfo->streamBlockReader = pHandle->reader; pInfo->pRes = createResDataBlock(pDescNode); + pInfo->pUpdateRes = createResDataBlock(pDescNode); pInfo->pCondition = pScanPhyNode->node.pConditions; pInfo->pDataReader = pDataReader; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index d7b47b9e5057c9e83f704b2d415c67133a7af617..95b7dc72b637367cdcbb2d90fc7dcd3c21283f97 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -23,7 +23,6 @@ typedef enum SResultTsInterpType { RESULT_ROW_END_INTERP = 2, } SResultTsInterpType; -static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator); static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator); static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo); @@ -778,6 +777,22 @@ static int32_t saveResult(SResultRow* result, uint64_t groupId, SArray* pUpdated return TSDB_CODE_SUCCESS; } +static void removeResult(SArray* pUpdated, TSKEY key) { + int32_t size = taosArrayGetSize(pUpdated); + int32_t index = binarySearch(pUpdated, size, key, TSDB_ORDER_DESC, getReskey); + if (index >= 0 && key == getReskey(pUpdated, index)) { + taosArrayRemove(pUpdated, index); + } +} + +static void removeResults(SArray* pWins, SArray* pUpdated) { + int32_t size = taosArrayGetSize(pWins); + for (int32_t i = 0; i < size; i++) { + STimeWindow* pW = taosArrayGet(pWins, i); + removeResult(pUpdated, pW->skey); + } +} + static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, int32_t scanFlag, SArray* pUpdated) { SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info; @@ -1212,6 +1227,7 @@ void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData, int SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId); SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); + ASSERT(p1); doClearWindowImpl(p1, pSup->pResultBuf, pBinfo, numOfOutput); } @@ -1363,6 +1379,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFreeClear(pChildOp); } } + nodesDestroyNode(pInfo->pPhyNode); } static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) { @@ -1485,69 +1502,6 @@ _error: return NULL; } -SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, SInterval* pInterval, - int32_t primaryTsSlotId, STimeWindowAggSupp* pTwAggSupp, - SExecTaskInfo* pTaskInfo) { - SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL) { - goto _error; - } - pInfo->order = TSDB_ORDER_ASC; - pInfo->interval = *pInterval; - pInfo->twAggSup = *pTwAggSupp; - pInfo->primaryTsIndex = primaryTsSlotId; - size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, 4096); - int32_t code = - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); - initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); - int32_t numOfChild = 8; // Todo(liuyao) get it from phy plan - pInfo->pChildren = taosArrayInit(numOfChild, sizeof(SOperatorInfo)); - for (int32_t i = 0; i < numOfChild; i++) { - SSDataBlock* chRes = createOneDataBlock(pResBlock, false); - SOperatorInfo* pChildOp = createIntervalOperatorInfo(NULL, pExprInfo, numOfCols, chRes, pInterval, primaryTsSlotId, - pTwAggSupp, pTaskInfo); - if (pChildOp && chRes) { - taosArrayPush(pInfo->pChildren, &pChildOp); - continue; - } - goto _error; - } - - pOperator->name = "StreamFinalIntervalOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExprInfo; - pOperator->pTaskInfo = pTaskInfo; - pOperator->numOfExprs = numOfCols; - pOperator->info = pInfo; - - pOperator->fpSet = - createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo, - aggEncodeResultRow, aggDecodeResultRow, NULL); - - code = appendDownstream(pOperator, &downstream, 1); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - return pOperator; - -_error: - destroyStreamFinalIntervalOperatorInfo(pInfo, numOfCols); - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - pTaskInfo->code = code; - return NULL; -} - SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) { @@ -1913,12 +1867,12 @@ _error: return NULL; } -static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) { +static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, + int32_t tableGroupId, SArray* pUpdated) { SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info; SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; int32_t numOfOutput = pOperatorInfo->numOfExprs; - SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); int32_t step = 1; bool ascScan = true; TSKEY* tsCols = NULL; @@ -1929,7 +1883,7 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); tsCols = (int64_t*)pColDataInfo->pData; } else { - return pUpdated; + return ; } int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1); @@ -1946,13 +1900,13 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB pos->groupId = tableGroupId; pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; *(int64_t*)pos->key = pResult->win.skey; - taosArrayPush(pUpdated, &pos); - forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, - TSDB_ORDER_ASC); + forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, + nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) { + saveResult(pResult, tableGroupId, pUpdated); + } // window start(end) key interpolation - // disable it temporarily - // doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, - // forwardRows); + // doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardRows); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); @@ -1962,7 +1916,6 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB break; } } - return pUpdated; } bool isFinalInterval(SStreamFinalIntervalOperatorInfo* pInfo) { return pInfo->pChildren != NULL; } @@ -2006,24 +1959,72 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SArra } } +static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) { + taosHashClear(pInfo->aggSup.pResultRowHashTable); + clearDiskbasedBuf(pInfo->aggSup.pResultBuf); + cleanupResultRowInfo(&pInfo->binfo.resultRowInfo); + initResultRowInfo(&pInfo->binfo.resultRowInfo, 1); +} + +static void clearUpdateDataBlock(SSDataBlock* pBlock) { + if (pBlock->info.rows <= 0) { + return; + } + blockDataCleanup(pBlock); +} + +static void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex) { + ASSERT(pDest->info.capacity >= pSource->info.rows); + clearUpdateDataBlock(pDest); + SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, 0); + SColumnInfoData* pSourceCol = taosArrayGet(pSource->pDataBlock, tsColIndex); + // copy timestamp column + colDataAssign(pDestCol, pSourceCol, pSource->info.rows); + for (int32_t i = 1; i < pDest->info.numOfCols; i++) { + SColumnInfoData* pCol = taosArrayGet(pDest->pDataBlock, i); + colDataAppendNNULL(pCol, 0, pSource->info.rows); + } + pDest->info.rows = pSource->info.rows; + blockDataUpdateTsWindow(pDest, 0); +} + +static int32_t getChildIndex(SSDataBlock* pBlock) { + // if (pBlock->info.type != STREAM_INVALID && pBlock->info.rows < 4) { // for test + // return pBlock->info.rows - 1; + // } + return 0; +} + static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info; SOperatorInfo* downstream = pOperator->pDownstream[0]; - SArray* pUpdated = NULL; + SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); + SArray* pClosed = taosArrayInit(4, POINTER_BYTES); if (pOperator->status == OP_EXEC_DONE) { return NULL; } else if (pOperator->status == OP_RES_TO_RETURN) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - if (pInfo->binfo.pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) { + if (pInfo->binfo.pRes->info.rows == 0) { pOperator->status = OP_EXEC_DONE; + if (isFinalInterval(pInfo) || pInfo->pUpdateRes->info.rows == 0) { + if (!isFinalInterval(pInfo)) { + // semi interval operator clear disk buffer + clearStreamIntervalOperator(pInfo); + } + return NULL; + } + // process the rest of the data + pOperator->status = OP_OPENED; + return pInfo->pUpdateRes; } - return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; + return pInfo->binfo.pRes; } while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { + clearUpdateDataBlock(pInfo->pUpdateRes); break; } @@ -2033,31 +2034,149 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock, pUpWins); if (isFinalInterval(pInfo)) { - int32_t childIndex = 0; // Todo(liuyao) get child id from SSDataBlock + int32_t childIndex = getChildIndex(pBlock); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SIntervalAggOperatorInfo* pChildInfo = pChildOp->info; - doClearWindows(&pChildInfo->aggSup, &pChildInfo->binfo, &pChildInfo->interval, pChildInfo->primaryTsIndex, - pChildOp->numOfExprs, pBlock, NULL); - rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId, pOperator->numOfExprs, - pOperator->pTaskInfo); + doClearWindows(&pChildInfo->aggSup, &pChildInfo->binfo, &pChildInfo->interval, + pChildInfo->primaryTsIndex, pChildOp->numOfExprs, pBlock, NULL); + rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId, + pOperator->numOfExprs, pOperator->pTaskInfo); + taosArrayDestroy(pUpWins); + continue; } + removeResults(pUpWins, pUpdated); + copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex); taosArrayDestroy(pUpWins); - continue; + break; } if (isFinalInterval(pInfo)) { - int32_t chIndex = 1; // Todo(liuyao) get it from SSDataBlock + int32_t chIndex = getChildIndex(pBlock); + int32_t size = taosArrayGetSize(pInfo->pChildren); + // if chIndex + 1 - size > 0, add new child + for (int32_t i = 0; i < chIndex + 1 - size; i++) { + SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0); + if (!pChildOp) { + longjmp(pOperator->pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + taosArrayPush(pInfo->pChildren, &pChildOp); + } SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex); - doStreamIntervalAgg(pChildOp); + SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info; + setInputDataBlock(pChildOp, pChInfo->binfo.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true); + doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL); + } + doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); + } + + if (isFinalInterval(pInfo)) { + closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, + &pInfo->interval, pClosed); + finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, + pInfo->binfo.rowCellInfoOffset); + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + taosArrayAddAll(pUpdated, pClosed); } - pUpdated = doHashInterval(pOperator, pBlock, 0); } + taosArrayDestroy(pClosed); finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); pOperator->status = OP_RES_TO_RETURN; - return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; + if (pInfo->binfo.pRes->info.rows == 0) { + pOperator->status = OP_EXEC_DONE; + if (pInfo->pUpdateRes->info.rows == 0) { + return NULL; + } + // process the rest of the data + pOperator->status = OP_OPENED; + return pInfo->pUpdateRes; + } + return pInfo->binfo.pRes; +} + +SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, + SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild) { + SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; + SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + pInfo->order = TSDB_ORDER_ASC; + pInfo->interval = (SInterval) {.interval = pIntervalPhyNode->interval, + .sliding = pIntervalPhyNode->sliding, + .intervalUnit = pIntervalPhyNode->intervalUnit, + .slidingUnit = pIntervalPhyNode->slidingUnit, + .offset = pIntervalPhyNode->offset, + .precision = + ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision}; + pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark, + .calTrigger = pIntervalPhyNode->window.triggerType, + .maxTs = INT64_MIN, + .winMap = NULL, }; + pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + initResultSizeInfo(pOperator, 4096); + int32_t numOfCols = 0; + SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); + SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, + pResBlock, keyBufSize, pTaskInfo->id.str); + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); + pInfo->pChildren = NULL; + if (numOfChild > 0) { + pInfo->pChildren = taosArrayInit(numOfChild, sizeof(SOperatorInfo)); + for (int32_t i = 0; i < numOfChild; i++) { + SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pPhyNode, pTaskInfo, 0); + if (pChildOp) { + taosArrayPush(pInfo->pChildren, &pChildOp); + continue; + } + goto _error; + } + } + // semi interval operator does not catch result + if (!isFinalInterval(pInfo)) { + pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; + } + pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);\ + pInfo->pUpdateRes->info.type = STREAM_REPROCESS; + blockDataEnsureCapacity(pInfo->pUpdateRes, 128); + pInfo->pPhyNode = nodesCloneNode(pPhyNode); + + pOperator->name = "StreamFinalIntervalOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->numOfExprs = numOfCols; + pOperator->info = pInfo; + + pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, + destroyStreamFinalIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, + NULL); + + code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + return pOperator; + +_error: + destroyStreamFinalIntervalOperatorInfo(pInfo, numOfCols); + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + pTaskInfo->code = code; + return NULL; } void destroyStreamAggSupporter(SStreamAggSupporter* pSup) { diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 68b83f4a1955c72e119dcadd5d409ce10639e5e1..dec5f717fd59b7e3e6d9334e5d9627f339346959 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -67,6 +67,7 @@ bool leastSQRFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInf int32_t leastSQRFunction(SqlFunctionCtx* pCtx); int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t leastSQRInvertFunction(SqlFunctionCtx* pCtx); +int32_t leastSQRCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 61944705c53ef99d9c7c133109f5922ac32a04d7..303a85d137c325b759b0ca8515c800edfcb6c69c 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1118,7 +1118,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = leastSQRFunctionSetup, .processFunc = leastSQRFunction, .finalizeFunc = leastSQRFinalize, - .invertFunc = leastSQRInvertFunction, + .invertFunc = NULL, + .combineFunc = leastSQRCombine, }, { .name = "avg", diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 4c7984c602f3c789183e96428a97374f4019829d..5f7c7756cdeaedafe246680abaf5adacf9489f59 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1799,17 +1799,17 @@ int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { param[1][1] = (double)pInfo->num; param[1][0] = param[0][1]; - param[0][0] -= param[1][0] * (param[0][1] / param[1][1]); - param[0][2] -= param[1][2] * (param[0][1] / param[1][1]); - param[0][1] = 0; - param[1][2] -= param[0][2] * (param[1][0] / param[0][0]); - param[1][0] = 0; - param[0][2] /= param[0][0]; + double param00 = param[0][0] - param[1][0] * (param[0][1] / param[1][1]); + double param02 = param[0][2] - param[1][2] * (param[0][1] / param[1][1]); + // param[0][1] = 0; + double param12 = param[1][2] - param02 * (param[1][0] / param00); + // param[1][0] = 0; + param02 /= param00; - param[1][2] /= param[1][1]; + param12 /= param[1][1]; char buf[64] = {0}; - size_t len = snprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{slop:%.6lf, intercept:%.6lf}", param[0][2], param[1][2]); + size_t len = snprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{slop:%.6lf, intercept:%.6lf}", param02, param12); varDataSetLen(buf, len); colDataAppend(pCol, currentRow, buf, pResInfo->isNullRes); @@ -1822,6 +1822,27 @@ int32_t leastSQRInvertFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } +int32_t leastSQRCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { + SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); + SLeastSQRInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); + int32_t type = pDestCtx->input.pData[0]->info.type; + double (*pDparam)[3] = pDBuf->matrix; + + SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); + SLeastSQRInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); + double (*pSparam)[3] = pSBuf->matrix; + for (int32_t i = 0; i < pSBuf->num; i++) { + pDparam[0][0] += pDBuf->startVal * pDBuf->startVal; + pDparam[0][1] += pDBuf->startVal; + pDBuf->startVal += pDBuf->stepVal; + } + pDparam[0][2] += pSparam[0][2] + pDBuf->num * pDBuf->stepVal * pSparam[1][2]; + pDparam[1][2] += pSparam[1][2]; + pDBuf->num += pSBuf->num; + pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); + return TSDB_CODE_SUCCESS; +} + bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SPercentileInfo); return true; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 2feb25d2bb8da570af063b56c0e2b66df4e56a04..fa4d1ba17189b9eaaaffc61f72c9cc85fb329466 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2441,6 +2441,7 @@ static const char* jkValueLiteralSize = "LiteralSize"; static const char* jkValueLiteral = "Literal"; static const char* jkValueDuration = "Duration"; static const char* jkValueTranslate = "Translate"; +static const char* jkValueNotReserved = "NotReserved"; static const char* jkValueDatum = "Datum"; static int32_t datumToJson(const void* pObj, SJson* pJson) { @@ -2513,6 +2514,9 @@ static int32_t valueNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkValueTranslate, pNode->translate); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkValueNotReserved, pNode->notReserved); + } if (TSDB_CODE_SUCCESS == code && pNode->translate) { code = datumToJson(pNode, pJson); } @@ -2634,6 +2638,9 @@ static int32_t jsonToValueNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkValueTranslate, &pNode->translate); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkValueNotReserved, &pNode->notReserved); + } if (TSDB_CODE_SUCCESS == code && pNode->translate) { code = jsonToDatum(pJson, pNode); } diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 101ac78e1847a1db244f7dfe867f94aeec0447d4..cdf2629671b2ea880ed5b7b8e03c2c4907028bde 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -651,3 +651,31 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) { ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages)); } + +void clearDiskbasedBuf(SDiskbasedBuf* pBuf) { + SArray** p = taosHashIterate(pBuf->groupSet, NULL); + while (p) { + size_t n = taosArrayGetSize(*p); + for (int32_t i = 0; i < n; ++i) { + SPageInfo* pi = taosArrayGetP(*p, i); + taosMemoryFreeClear(pi->pData); + taosMemoryFreeClear(pi); + } + taosArrayDestroy(*p); + p = taosHashIterate(pBuf->groupSet, p); + } + + tdListEmpty(pBuf->lruList); + tdListEmpty(pBuf->freePgList); + + taosArrayClear(pBuf->emptyDummyIdList); + taosArrayClear(pBuf->pFree); + + taosHashClear(pBuf->groupSet); + taosHashClear(pBuf->all); + + pBuf->numOfPages = 0; // all pages are in buffer in the first place + pBuf->totalBufSize = 0; + pBuf->allocateId = -1; + pBuf->fileSize = 0; +} \ No newline at end of file diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 3249c02e8869e51a738914fb531a9e315d3e7c41..28bc98a972f462223a743f01bc34a03fc87d77d1 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -72,6 +72,10 @@ ./test.sh -f tsim/stream/basic2.sim # ./test.sh -f tsim/stream/session0.sim # ./test.sh -f tsim/stream/session1.sim +# ./test.sh -f tsim/stream/state0.sim +# ./test.sh -f tsim/stream/triggerInterval0.sim +# ./test.sh -f tsim/stream/triggerSession0.sim + # ---- transaction ./test.sh -f tsim/trans/lossdata1.sim diff --git a/tests/script/tsim/stream/session0.sim b/tests/script/tsim/stream/session0.sim index a5cd73d17e02d09f17ec468c8601ece235e2b544..a2fe773edb66e95c7c741da650753566a45e349f 100644 --- a/tests/script/tsim/stream/session0.sim +++ b/tests/script/tsim/stream/session0.sim @@ -23,89 +23,98 @@ sql insert into t1 values(1648791223001,10,2,3,1.1,2); sql insert into t1 values(1648791233002,3,2,3,2.1,3); sql insert into t1 values(1648791243003,NULL,NULL,NULL,NULL,4); sql insert into t1 values(1648791213002,NULL,NULL,NULL,NULL,5) (1648791233012,NULL,NULL,NULL,NULL,6); + +$loop_count = 0 +loop0: + sleep 300 sql select * from streamt order by s desc; +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + # row 0 if $data01 != 3 then - print ======$data01 - return -1 + print ======data01=$data01 + goto loop0 endi if $data02 != 3 then - print ======$data02 - return -1 + print ======data02=$data02 + goto loop0 endi if $data03 != 3 then - print ======$data03 - return -1 + print ======data03=$data03 + goto loop0 endi if $data04 != 2.100000000 then - print ======$data04 + print ======data04=$data04 return -1 endi if $data05 != 0.000000000 then - print ======$data05 + print ======data05=$data05 return -1 endi if $data06 != 3 then - print ======$data05 + print ======data06=$data06 return -1 endi if $data07 != 2.100000000 then - print ======$data05 + print ======data07=$data07 return -1 endi if $data08 != 6 then - print ======$data05 + print ======data08=$data08 return -1 endi # row 1 if $data11 != 3 then - print ======$data11 - return -1 + print ======data11=$data11 + goto loop0 endi if $data12 != 10 then - print ======$data12 - return -1 + print ======data12=$data12 + goto loop0 endi if $data13 != 10 then - print ======$data13 - return -1 + print ======data13=$data13 + goto loop0 endi if $data14 != 1.100000000 then - print ======$data14 + print ======data14=$data14 return -1 endi if $data15 != 0.000000000 then - print ======$data15 + print ======data15=$data15 return -1 endi if $data16 != 10 then - print ======$data15 + print ======data16=$data16 return -1 endi if $data17 != 1.100000000 then - print ======$data17 + print ======data17=$data17 return -1 endi if $data18 != 5 then - print ======$data18 + print ======data18=$data18 return -1 endi @@ -115,23 +124,31 @@ sql insert into t1 values(1648791233002,3,2,3,2.1,9); sql insert into t1 values(1648791243003,4,2,3,3.1,10); sql insert into t1 values(1648791213002,4,2,3,4.1,11) ; sql insert into t1 values(1648791213002,4,2,3,4.1,12) (1648791223009,4,2,3,4.1,13); + +$loop_count = 0 +loop1: sleep 300 sql select * from streamt order by s desc ; +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + # row 0 if $data01 != 7 then - print ======$data01 - return -1 + print =====data01=$data01 + goto loop1 endi if $data02 != 9 then - print ======$data02 - return -1 + print =====data02=$data02 + goto loop1 endi if $data03 != 4 then - print ======$data03 - return -1 + print =====data03=$data03 + goto loop1 endi if $data04 != 1.100000000 then diff --git a/tests/script/tsim/stream/state0.sim b/tests/script/tsim/stream/state0.sim index 3529f836f420d35598309c238cea3bfdf7ea32cc..2f2038b914c274ccaf45d4ea38a75e5f8bfe445e 100644 --- a/tests/script/tsim/stream/state0.sim +++ b/tests/script/tsim/stream/state0.sim @@ -20,21 +20,33 @@ sql create stream streams1 trigger at_once into streamt1 as select _wstartts, sql insert into t1 values(1648791213000,1,2,3,1.0,1); sql insert into t1 values(1648791213000,1,2,3,1.0,2); +$loop_count = 0 +loop0: sql select * from streamt1 order by c desc; sleep 300 +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi if $rows != 1 then - print ======$rows - return -1; + print =====rows=$rows + goto loop0 endi sql insert into t1 values(1648791214000,1,2,3,1.0,3); +$loop_count = 0 +loop00: sql select * from streamt1 order by c desc; sleep 300 +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi if $rows != 1 then - print ======$rows - return -1; + print =====rows=$rows + goto loop00 endi sql insert into t1 values(1648791213010,2,2,3,1.0,4); @@ -44,27 +56,25 @@ $loop_count = 0 loop1: sql select * from streamt1 where c >=4 order by `_wstartts`; sleep 300 - $loop_count = $loop_count + 1 if $loop_count == 10 then return -1 endi if $rows != 3 then - print ======$rows + print =====rows=$rows goto loop1 - return -1 endi # row 0 if $data01 != 1 then - print ======$data01 - return -1 + print =====data01=$data01 + goto loop1 endi if $data02 != 1 then - print ======$data02 - return -1 + print =====data02=$data02 + goto loop1 endi if $data03 != 1 then @@ -151,11 +161,10 @@ endi sql insert into t1 values(1648791213011,1,2,3,1.0,7); -loop2: $loop_count = 0 +loop2: sql select * from streamt1 where c in (5,4,7) order by `_wstartts`; sleep 300 - $loop_count = $loop_count + 1 if $loop_count == 10 then return -1 @@ -163,57 +172,51 @@ endi # row 2 if $data21 != 2 then - print ======$data21 + print =====data21=$data21 goto loop2 return -1 endi if $data22 != 2 then - print ======$data22 + print =====data22=$data22 goto loop2 return -1 endi if $data23 != 2 then print ======$data23 - goto loop2 return -1 endi if $data24 != 1 then print ======$data24 - goto loop2 return -1 endi if $data25 != 3 then print ======$data25 - goto loop2 return -1 endi if $data26 != 7 then print ======$data26 - goto loop2 return -1 endi sql insert into t1 values(1648791213011,1,2,3,1.0,8); -loop21: $loop_count = 0 +loop21: sql select * from streamt1 where c in (5,4,8) order by `_wstartts`; sleep 300 - $loop_count = $loop_count + 1 if $loop_count == 10 then return -1 endi if $data26 != 8 then - print ======$data26 + print =====data26=$data26 goto loop21 - return -1 endi @@ -222,11 +225,10 @@ sql insert into t1 values(1648791213020,3,2,3,1.0,10); sql insert into t1 values(1648791214000,1,2,3,1.0,11); sql insert into t1 values(1648791213011,10,20,10,10.0,12); -loop3: $loop_count = 0 +loop3: sql select * from streamt1 where c in (5,4,10,11,12) order by `_wstartts`; sleep 300 - $loop_count = $loop_count + 1 if $loop_count == 10 then return -1 @@ -234,112 +236,100 @@ endi # row 2 if $data21 != 1 then - print ======$data21 + print =====data21=$data21 goto loop3 return -1 endi if $data22 != 1 then - print ======$data22 + print =====data22=$data22 goto loop3 return -1 endi if $data23 != 10 then print ======$data23 - goto loop3 return -1 endi if $data24 != 10 then print ======$data24 - goto loop3 return -1 endi if $data25 != 10 then print ======$data25 - goto loop3 return -1 endi if $data26 != 12 then print ======$data26 - goto loop3 return -1 endi # row 3 if $data31 != 1 then - print ======$data31 + print =====data31=$data31 goto loop3 return -1 endi if $data32 != 1 then - print ======$data32 + print =====data32=$data32 goto loop3 return -1 endi if $data33 != 3 then print ======$data33 - goto loop3 return -1 endi if $data34 != 3 then print ======$data34 - goto loop3 return -1 endi if $data35 != 3 then print ======$data35 - goto loop3 return -1 endi if $data36 != 10 then print ======$data36 - goto loop3 return -1 endi # row 4 if $data41 != 1 then - print ======$data41 + print =====data41=$data41 goto loop3 return -1 endi if $data42 != 1 then - print ======$data42 + print =====data42=$data42 goto loop3 return -1 endi if $data43 != 1 then print ======$data43 - goto loop3 return -1 endi if $data44 != 1 then print ======$data44 - goto loop3 return -1 endi if $data45 != 3 then print ======$data45 - goto loop3 return -1 endi if $data46 != 11 then print ======$data46 - goto loop3 return -1 endi @@ -347,8 +337,8 @@ sql insert into t1 values(1648791213030,3,12,12,12.0,13); sql insert into t1 values(1648791214040,1,13,13,13.0,14); sql insert into t1 values(1648791213030,3,14,14,14.0,15) (1648791214020,15,15,15,15.0,16); -loop4: $loop_count = 0 +loop4: sql select * from streamt1 where c in (14,15,16) order by `_wstartts`; sleep 300 @@ -358,119 +348,104 @@ if $loop_count == 10 then endi if $rows != 3 then - print ======$rows - goto loop4 - return -1; + print ====loop4=rows=$rows +# goto loop4 endi # row 0 if $data01 != 2 then - print ======$data01 + print =====data01=$data01 goto loop4 - return -1 endi if $data02 != 2 then print ======$data02 - goto loop4 return -1 endi if $data03 != 6 then print ======$data03 - goto loop4 return -1 endi if $data04 != 3 then print ======$data04 - goto loop4 return -1 endi if $data05 != 3 then print ======$data05 - goto loop4 return -1 endi if $data06 != 15 then print ======$data06 - goto loop4 return -1 endi # row 1 if $data11 != 1 then - print ======$data11 + print =====data11=$data11 goto loop4 return -1 endi if $data12 != 1 then - print ======$data12 + print =====data12=$data12 goto loop4 return -1 endi if $data13 != 15 then print ======$data13 - goto loop4 return -1 endi if $data14 != 15 then print ======$data14 - goto loop4 return -1 endi if $data15 != 15 then print ======$data15 - goto loop4 return -1 endi if $data16 != 16 then print ======$data16 - goto loop4 return -1 endi # row 2 if $data21 != 1 then - print ======$data21 + print =====data21=$data21 goto loop4 return -1 endi if $data22 != 1 then - print ======$data22 + print =====data22=$data22 goto loop4 return -1 endi if $data23 != 1 then print ======$data23 - goto loop4 return -1 endi if $data24 != 1 then print ======$data24 - goto loop4 return -1 endi if $data25 != 13 then print ======$data25 - goto loop4 return -1 endi if $data26 != 14 then print ======$data26 - goto loop4 return -1 endi