From 246562024590f03e0dbaaa22cb5c34647a5cc31e Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Sun, 10 Jul 2022 16:09:54 +0800 Subject: [PATCH] feat(stream): support expressions --- source/common/src/tdatablock.c | 2 +- source/libs/executor/inc/executorimpl.h | 6 +- source/libs/executor/src/timewindowoperator.c | 64 +++++++++++++++++-- 3 files changed, 65 insertions(+), 7 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 4e17a9aea0..3bb829f77a 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1746,7 +1746,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) for (int32_t k = 0; k < colNum; k++) { SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); - if (colDataIsNull(pColInfoData, rows, j, NULL)) { + if (colDataIsNull(pColInfoData, rows, j, NULL) || !var) { len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL"); if (len >= size -1) return dumpBuf; continue; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 5f13c83eda..3c674e8d10 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -481,7 +481,7 @@ typedef struct SStreamFinalIntervalOperatorInfo { // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; // basic info SAggSupporter aggSup; // aggregate supporter - + SExprSupp scalarSupp; // supporter for perform scalar function SGroupResInfo groupResInfo; // multiple results build supporter SInterval interval; // interval info int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. @@ -629,6 +629,7 @@ typedef struct SStateWindowInfo { typedef struct SStreamSessionAggOperatorInfo { SOptrBasicInfo binfo; SStreamAggSupporter streamAggSup; + SExprSupp scalarSupp; // supporter for perform scalar function SGroupResInfo groupResInfo; int64_t gap; // session window gap int32_t primaryTsIndex; // primary timestamp slot id @@ -679,11 +680,12 @@ typedef struct SStateWindowOperatorInfo { typedef struct SStreamStateAggOperatorInfo { SOptrBasicInfo binfo; SStreamAggSupporter streamAggSup; + SExprSupp scalarSupp; // supporter for perform scalar function SGroupResInfo groupResInfo; int32_t primaryTsIndex; // primary timestamp slot id int32_t order; // current SSDataBlock scan order STimeWindowAggSupp twAggSup; - SColumn stateCol; // start row index + SColumn stateCol; SqlFunctionCtx* pDummyCtx; // for combine SSDataBlock* pDelRes; SHashObj* pSeDeleted; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index ed8ea56329..78775073a4 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2401,6 +2401,12 @@ void addPullWindow(SHashObj* pMap, SWinRes* pWinRes, int32_t size) { static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; } +STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) { + STimeWindow w = {.skey = ts, .ekey = INT64_MAX}; + w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; + return w; +} + static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId, SArray* pUpdated) { SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info; @@ -2420,8 +2426,12 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1); TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols); - STimeWindow nextWin = - getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->order); + STimeWindow nextWin = {0}; + if (IS_FINAL_OP(pInfo)) { + nextWin = getFinalTimeWindow(ts, &pInfo->interval); + } else { + nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->order); + } while (1) { bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); if (pInfo->ignoreExpiredData && isClosed) { @@ -2478,8 +2488,12 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, + if (IS_FINAL_OP(pInfo)) { + forwardRows = 1; + } else { + forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); + } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) { saveResultRow(pResult, tableGroupId, pUpdated); } @@ -2645,7 +2659,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { clearSpecialDataBlock(pInfo->pUpdateRes); removeDeleteResults(pUpdated, pInfo->pDelWins); pOperator->status = OP_RES_TO_RETURN; - qInfo("Stream Final Interval return data"); + qInfo("%s return data", IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); break; } printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval Final recv" : "interval Semi recv"); @@ -2705,6 +2719,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { continue; } + if (pInfo->scalarSupp.pExprInfo != NULL) { + SExprSupp* pExprSup = &pInfo->scalarSupp; + projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); + } setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, MAIN_SCAN, true); doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated); if (IS_FINAL_OP(pInfo)) { @@ -2822,6 +2840,15 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(pOperator, 4096); + if (pIntervalPhyNode->window.pExprs != NULL) { + int32_t numOfScalar = 0; + SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar); + int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } + int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); @@ -2988,6 +3015,14 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh } initResultSizeInfo(pOperator, 4096); + if (pSessionNode->window.pExprs != NULL) { + int32_t numOfScalar = 0; + SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar); + int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } SExprSupp* pSup = &pOperator->exprSupp; code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock); @@ -3656,6 +3691,10 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { continue; } + if (pInfo->scalarSupp.pExprInfo != NULL) { + SExprSupp* pExprSup = &pInfo->scalarSupp; + projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); + } // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pSup->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted, IS_FINAL_OP(pInfo)); @@ -3788,6 +3827,10 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { continue; } + if (pInfo->scalarSupp.pExprInfo != NULL) { + SExprSupp* pExprSup = &pInfo->scalarSupp; + projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); + } // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pSup->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted, false); @@ -4184,6 +4227,10 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { continue; } + if (pInfo->scalarSupp.pExprInfo != NULL) { + SExprSupp* pExprSup = &pInfo->scalarSupp; + projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); + } // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pSup->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); doStreamStateAggImpl(pOperator, pBlock, pSeUpdated, pInfo->pSeDeleted); @@ -4237,6 +4284,15 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->stateCol = extractColumnFromColumnNode(pColNode); initResultSizeInfo(pOperator, 4096); + if (pStateNode->window.pExprs != NULL) { + int32_t numOfScalar = 0; + SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar); + int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } + initResultRowInfo(&pInfo->binfo.resultRowInfo); pInfo->twAggSup = (STimeWindowAggSupp){ .waterMark = pStateNode->window.watermark, -- GitLab