From 270771e39ef6685d9157da49065faf017b02e9ea Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 26 May 2022 14:06:06 +0800 Subject: [PATCH] feat(stream): stream final session operator --- .gitignore | 1 + include/libs/nodes/nodes.h | 1 + source/libs/executor/inc/executorimpl.h | 6 +- source/libs/executor/src/executorimpl.c | 2 +- source/libs/executor/src/scanoperator.c | 23 +- source/libs/executor/src/timewindowoperator.c | 227 +++++++++++++++--- 6 files changed, 205 insertions(+), 55 deletions(-) diff --git a/.gitignore b/.gitignore index f70f5987b2..d5c7f763cf 100644 --- a/.gitignore +++ b/.gitignore @@ -110,3 +110,4 @@ contrib/* !contrib/CMakeLists.txt !contrib/test sql +debug*/ diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 3c5278011a..3860266725 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -214,6 +214,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_FILL, QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW, QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW, + QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION_WINDOW, QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW, QUERY_NODE_PHYSICAL_PLAN_PARTITION, QUERY_NODE_PHYSICAL_PLAN_DISPATCH, diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index ec204a8f60..b36d3566bf 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -465,6 +465,7 @@ typedef struct SStreamFinalIntervalOperatorInfo { SAggSupporter aggSup; // aggregate supporter int32_t order; // current SSDataBlock scan order STimeWindowAggSupp twAggSup; + SArray* pChildren; } SStreamFinalIntervalOperatorInfo; typedef struct SAggOperatorInfo { @@ -581,6 +582,7 @@ typedef struct SStreamSessionAggOperatorInfo { SSDataBlock* pDelRes; SHashObj* pStDeleted; void* pDelIterator; + SArray* pChildren; // cache for children's result; } SStreamSessionAggOperatorInfo; typedef struct STimeSliceOperatorInfo { @@ -722,7 +724,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); + STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, @@ -797,7 +799,7 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order); int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); -int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey, +int32_t initCacheSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey, const char* pDir); int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey); SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 593b79ecc8..c983dd5c2c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5162,7 +5162,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo return TSDB_CODE_SUCCESS; } -int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey, +int32_t initCacheSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey, const char* pDir) { pCatchSup->keySize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY); pCatchSup->pKeyBuf = taosMemoryCalloc(1, pCatchSup->keySize); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6c9ad4579c..75907d1a0d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -833,15 +833,6 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { size_t total = taosArrayGetSize(pInfo->pBlockLists); if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) { - if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) { - SSDataBlock* pDB = getDataFromCatch(pInfo); - if (pDB != NULL) { - return pDB; - } else { - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - } - } - if (pInfo->validBlockIndex >= total) { doClearBufferedBlocks(pInfo); pOperator->status = OP_EXEC_DONE; @@ -849,17 +840,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { } int32_t current = pInfo->validBlockIndex++; - SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); - if (pBlock->info.type == STREAM_REPROCESS) { - pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES; - } else { - int32_t code = catchDatablock(pBlock, &pInfo->childAggSup, pInfo->primaryTsIndex, 0); - if (code != TDB_CODE_SUCCESS) { - pTaskInfo->code = code; - longjmp(pTaskInfo->env, code); - } - } - return pBlock; + return taosArrayGetP(pInfo->pBlockLists, current); } else { if (pInfo->scanMode == STREAM_SCAN_FROM_RES) { blockDataDestroy(pInfo->pUpdateRes); @@ -1023,7 +1004,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR pInfo->interval = pSTInfo->interval; pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1}; - initCatchSupporter(&pInfo->childAggSup, 1024, "StreamFinalInterval", "/tmp/"); // TODO(liuyao) get row size from phy plan + initCacheSupporter(&pInfo->childAggSup, 1024, "StreamFinalInterval", "/tmp/"); // TODO(liuyao) get row size from phy plan pOperator->name = "StreamBlockScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 9346dbf54a..6965771c73 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1067,7 +1067,8 @@ void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData, } static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, - SInterval* pIntrerval, int32_t tsIndex, int32_t numOfOutput, SSDataBlock* pBlock) { + SInterval* pIntrerval, int32_t tsIndex, int32_t numOfOutput, SSDataBlock* pBlock, + SArray* pUpWins) { SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex); TSKEY *tsCols = (TSKEY*)pColDataInfo->pData; int32_t step = 0; @@ -1079,6 +1080,9 @@ static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); doClearWindow(pSup, pBinfo, (char*)&win.skey, sizeof(TKEY), pBlock->info.groupId, numOfOutput); + if (pUpWins) { + taosArrayPush(pUpWins, &win); + } } } @@ -1119,7 +1123,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { if (pBlock->info.type == STREAM_REPROCESS) { doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, 0, - pOperator->numOfExprs, pBlock); + pOperator->numOfExprs, pBlock, NULL); qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); continue; } @@ -1154,6 +1158,15 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo *)param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); cleanupAggSup(&pInfo->aggSup); + if (pInfo->pChildren) { + int32_t size = taosArrayGetSize(pInfo->pChildren); + for (int32_t i = 0; i < size; i++) { + SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i); + destroyIntervalOperatorInfo(pChildOp->info, numOfOutput); + taosMemoryFreeClear(pChildOp->info); + taosMemoryFreeClear(pChildOp); + } + } } bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) { @@ -1228,32 +1241,38 @@ _error: SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo, - SExecTaskInfo* pTaskInfo) { + STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) { SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->order = TSDB_ORDER_ASC; pInfo->interval = *pInterval; pInfo->twAggSup = *pTwAggSupp; pInfo->primaryTsIndex = primaryTsSlotId; - size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(pOperator, 4096); - int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); - initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); if (code != TSDB_CODE_SUCCESS) { goto _error; } - initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); + int32_t numOfChild = 8;// Todo(liuyao) get it from phy plan + pInfo->pChildren = taosArrayInit(numOfChild, sizeof(SOperatorInfo)); + for (int32_t i = 0; i < numOfChild; i++) { + SSDataBlock* chRes = createOneDataBlock(pResBlock, false); + SOperatorInfo* pChildOp = createIntervalOperatorInfo(NULL, pExprInfo, numOfCols, + chRes, pInterval, primaryTsSlotId, pTwAggSupp, NULL, pTaskInfo); + if (pChildOp && chRes) { + taosArrayPush(pInfo->pChildren, &pChildOp); + continue; + } + goto _error; + } pOperator->name = "StreamFinalIntervalOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL; @@ -1703,6 +1722,51 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB return pUpdated; } +bool isFinalInterval(SStreamFinalIntervalOperatorInfo* pInfo) { + return pInfo->pChildren != NULL; +} + +void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, + int32_t numOfOutput, SExecTaskInfo* pTaskInfo) { + for (int32_t k = 0; k < numOfOutput; ++k) { + if (fmIsWindowPseudoColumnFunc(pDestCtx[k].functionId)) { + continue; + } + int32_t code = TSDB_CODE_SUCCESS; + if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) { + code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]); + if (code != TSDB_CODE_SUCCESS) { + qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code)); + pTaskInfo->code = code; + longjmp(pTaskInfo->env, code); + } + } + } +} + +static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SArray *pWinArray, + int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) { + int32_t size = taosArrayGetSize(pWinArray); + ASSERT(pInfo->pChildren); + for (int32_t i = 0; i < size; i++) { + STimeWindow* pParentWin = taosArrayGet(pWinArray, i); + SResultRow* pCurResult = NULL; + setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, pParentWin, true, &pCurResult, 0, + pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, + pTaskInfo); + int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren); + for (int32_t j = 0; j < numOfChildren; j++) { + SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, j); + SIntervalAggOperatorInfo* pChInfo = pChildOp->info; + SResultRow* pChResult = NULL; + setTimeWindowOutputBuf(&pChInfo->binfo.resultRowInfo, pParentWin, true, &pChResult, + 0, pChInfo->binfo.pCtx, pChildOp->numOfExprs, pChInfo->binfo.rowCellInfoOffset, + &pChInfo->aggSup, pTaskInfo); + compactFunctions(pInfo->binfo.pCtx, pChInfo->binfo.pCtx, numOfOutput, pTaskInfo); + } + } +} + static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info; SOperatorInfo* downstream = pOperator->pDownstream[0]; @@ -1726,10 +1790,26 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true); if (pBlock->info.type == STREAM_REPROCESS) { + SArray *pUpWins = taosArrayInit(8, sizeof(STimeWindow)); doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, - pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock); + pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock, pUpWins); + if (isFinalInterval(pInfo)) { + int32_t childIndex = 0; //Todo(liuyao) get child id from SSDataBlock + SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); + SIntervalAggOperatorInfo* pChildInfo = pChildOp->info; + doClearWindows(&pChildInfo->aggSup, &pChildInfo->binfo, &pChildInfo->interval, + pChildInfo->primaryTsIndex, pChildOp->numOfExprs, pBlock, NULL); + rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId, + pOperator->numOfExprs, pOperator->pTaskInfo); + } + taosArrayDestroy(pUpWins); continue; } + if (isFinalInterval(pInfo)) { + int32_t chIndex = 1; //Todo(liuyao) get it from SSDataBlock + SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex); + doStreamIntervalAgg(pChildOp); + } pUpdated = doHashInterval(pOperator, pBlock, 0); } @@ -1752,6 +1832,16 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) { doDestroyBasicInfo(&pInfo->binfo, numOfOutput); destroyStreamAggSupporter(&pInfo->streamAggSup); cleanupGroupResInfo(&pInfo->groupResInfo); + if (pInfo->pChildren != NULL) { + int32_t size = taosArrayGetSize(pInfo->pChildren); + for (int32_t i = 0; i < size; i++) { + SOperatorInfo *pChild = taosArrayGetP(pInfo->pChildren, i); + SStreamSessionAggOperatorInfo* pChInfo = pChild->info; + destroyStreamSessionAggOperatorInfo(pChInfo, numOfOutput); + taosMemoryFreeClear(pChild); + taosMemoryFreeClear(pChInfo); + } + } } int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo, @@ -1780,6 +1870,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamSessionAggOperatorInfo* pI SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) { + int32_t code = TSDB_CODE_OUT_OF_MEMORY; SStreamSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamSessionAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -1789,7 +1880,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, initResultSizeInfo(pOperator, 4096); - int32_t code = initStreamAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo"); + code = initStreamAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo"); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -1820,6 +1911,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, pInfo->pDelIterator = NULL; pInfo->pDelRes = createOneDataBlock(pResBlock, false); blockDataEnsureCapacity(pInfo->pDelRes, 64); + pInfo->pChildren = NULL; pOperator->name = "StreamSessionWindowAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW; @@ -2068,24 +2160,6 @@ int32_t getNumCompactWindow(SArray* pWinInfos, int32_t startIndex, int64_t gap) return size - startIndex - 1; } -void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, - int32_t numOfOutput, SExecTaskInfo* pTaskInfo) { - for (int32_t k = 0; k < numOfOutput; ++k) { - if (fmIsWindowPseudoColumnFunc(pDestCtx[k].functionId)) { - continue; - } - int32_t code = TSDB_CODE_SUCCESS; - if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) { - code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]); - if (code != TSDB_CODE_SUCCESS) { - qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code)); - pTaskInfo->code = code; - longjmp(pTaskInfo->env, code); - } - } - } -} - void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, int32_t num, int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SHashObj* pStUpdated, SHashObj* pStDeleted) { SResultWindowInfo* pCurWin = taosArrayGet(pInfo->streamAggSup.pResultRows, startIndex); @@ -2164,7 +2238,7 @@ static void doStreamSessionWindowAggImpl(SOperatorInfo* pOperator, } static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SOptrBasicInfo* pBinfo, - SSDataBlock* pBlock, int32_t tsIndex, int32_t numOfOutput, int64_t gap) { + SSDataBlock* pBlock, int32_t tsIndex, int32_t numOfOutput, int64_t gap, SArray* result) { SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex); TSKEY *tsCols = (TSKEY*)pColDataInfo->pData; int32_t step = 0; @@ -2173,7 +2247,11 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SOptrBasicInfo* SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup->pResultRows, tsCols[i], gap, &winIndex); step = updateSessionWindowInfo(pCurWin, tsCols, pBlock->info.rows, i, gap, NULL); + ASSERT(isInWindow(pCurWin, tsCols[i], gap)); doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pBinfo, numOfOutput); + if (result) { + taosArrayPush(result, pCurWin); + } } } @@ -2215,6 +2293,42 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It } } +static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray *pWinArray, + int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) { + int32_t size = taosArrayGetSize(pWinArray); + ASSERT(pInfo->pChildren); + for (int32_t i = 0; i < size; i++) { + SResultWindowInfo* pParentWin = taosArrayGet(pWinArray, i); + SResultRow* pCurResult = NULL; + setWindowOutputBuf(pParentWin, &pCurResult, pInfo->binfo.pCtx, groupId, + numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->streamAggSup, pTaskInfo); + int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren); + for (int32_t j = 0; j < numOfChildren; j++) { + SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j); + SStreamSessionAggOperatorInfo* pChInfo = pChild->info; + SArray* pChWins = pChInfo->streamAggSup.pResultRows; + int32_t chWinSize = taosArrayGetSize(pChWins); + int32_t index = binarySearch(pChWins, chWinSize, pParentWin->win.skey, + TSDB_ORDER_DESC, getSessionWindowEndkey); + for (int32_t k = index; k > 0 && k < chWinSize; k++) { + SResultWindowInfo* pcw = taosArrayGet(pChWins, k); + if (pParentWin->win.skey <= pcw->win.skey && pcw->win.ekey <= pParentWin->win.ekey) { + SResultRow* pChResult = NULL; + setWindowOutputBuf(pcw, &pChResult, pChInfo->binfo.pCtx, groupId, + numOfOutput, pChInfo->binfo.rowCellInfoOffset, &pChInfo->streamAggSup, pTaskInfo); + compactFunctions(pInfo->binfo.pCtx, pChInfo->binfo.pCtx, numOfOutput, pTaskInfo); + continue; + } + break; + } + } + } +} + +bool isFinalSession(SStreamSessionAggOperatorInfo* pInfo) { + return pInfo->pChildren != NULL; +} + static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -2247,10 +2361,25 @@ static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); if (pBlock->info.type == STREAM_REPROCESS) { + SArray *pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); doClearSessionWindows(&pInfo->streamAggSup, &pInfo->binfo, pBlock, 0, - pOperator->numOfExprs, pInfo->gap); + pOperator->numOfExprs, pInfo->gap, pWins); + if (isFinalSession(pInfo)) { + int32_t childIndex = 0; //Todo(liuyao) get child id from SSDataBlock + SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); + SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info; + doClearSessionWindows(&pChildInfo->streamAggSup, &pChildInfo->binfo, pBlock, 0, + pChildOp->numOfExprs, pChildInfo->gap, NULL); + rebuildTimeWindow(pInfo, pWins, pInfo->binfo.pRes->info.groupId, pOperator->numOfExprs, pOperator->pTaskInfo); + } + taosArrayDestroy(pWins); continue; } + if (isFinalSession(pInfo)) { + int32_t childIndex = 0; //Todo(liuyao) get child id from SSDataBlock + SOptrBasicInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); + doStreamSessionWindowAggImpl(pOperator, pBlock, NULL, NULL); + } doStreamSessionWindowAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted); } @@ -2271,3 +2400,39 @@ static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) { pInfo->streamAggSup.pResultBuf); return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; } + +SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, + SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, + int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) { + int32_t code = TSDB_CODE_OUT_OF_MEMORY; + SStreamSessionAggOperatorInfo* pInfo = NULL; + SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pExprInfo, + numOfCols, pResBlock, gap, tsSlotId, pTwAggSupp, pTaskInfo); + if (pOperator == NULL) { + goto _error; + } + pOperator->name = "StreamFinalSessionWindowAggOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION_WINDOW; + int32_t numOfChild = 1; //Todo(liuyao) get it from phy plan + pInfo = pOperator->info; + pInfo->pChildren = taosArrayInit(8, sizeof(void *)); + for (int32_t i = 0; i < numOfChild; i++) { + SOperatorInfo* pChild = createStreamSessionAggOperatorInfo(NULL, pExprInfo, + numOfCols, NULL, gap, tsSlotId, pTwAggSupp, pTaskInfo); + if (pChild == NULL) { + goto _error; + } + taosArrayPush(pInfo->pChildren, &pChild); + } + return pOperator; + +_error: + if (pInfo != NULL) { + destroyStreamSessionAggOperatorInfo(pInfo, numOfCols); + } + + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + pTaskInfo->code = code; + return NULL; +} -- GitLab