diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 57f947692270c8c4512abb7ee9b01f20f3dfb86b..2832af5d10a1623335d88482dd42c3959105b3c6 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -26,12 +26,12 @@ #include "tlog.h" #include "ttime.h" -#define IS_FINAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) -#define IS_FINAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) -#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); +#define IS_FINAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) +#define IS_FINAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) +#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); #define STREAM_INTERVAL_OP_STATE_NAME "StreamIntervalHistoryState" -#define STREAM_SESSION_OP_STATE_NAME "StreamSessionHistoryState" -#define STREAM_STATE_OP_STATE_NAME "StreamStateHistoryState" +#define STREAM_SESSION_OP_STATE_NAME "StreamSessionHistoryState" +#define STREAM_STATE_OP_STATE_NAME "StreamStateHistoryState" typedef struct SStateWindowInfo { SResultWindowInfo winInfo; @@ -456,13 +456,14 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3 } } -bool hasIntervalWindow(void* pState, SWinKey* pKey, SStateStore* pStore) { return pStore->streamStateCheck(pState, pKey); } +bool hasIntervalWindow(void* pState, SWinKey* pKey, SStateStore* pStore) { + return pStore->streamStateCheck(pState, pKey); +} int32_t setIntervalOutputBuf(void* pState, STimeWindow* win, SRowBuffPos** pResult, int64_t groupId, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup, SStateStore* pStore) { - - SWinKey key = { .ts = win->skey, .groupId = groupId }; + SWinKey key = {.ts = win->skey, .groupId = groupId}; char* value = NULL; int32_t size = pAggSup->resultRowSize; @@ -479,7 +480,8 @@ int32_t setIntervalOutputBuf(void* pState, STimeWindow* win, SRowBuffPos** pResu return TSDB_CODE_SUCCESS; } -bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, void* pState, STimeWindowAggSupp* pTwSup, SStateStore* pStore) { +bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, void* pState, STimeWindowAggSupp* pTwSup, + SStateStore* pStore) { if (pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) { SWinKey key = {.ts = pWin->skey, .groupId = groupId}; if (!hasIntervalWindow(pState, &key, pStore)) { @@ -549,7 +551,8 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB blockDataUpdateTsWindow(pBlock, 0); } -void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, SArray* pPullWins, int32_t numOfCh, SOperatorInfo* pOperator) { +void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, SArray* pPullWins, + int32_t numOfCh, SOperatorInfo* pOperator) { SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); TSKEY* tsData = (TSKEY*)pStartCol->pData; SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); @@ -572,13 +575,13 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, S // pull data is over taosArrayDestroy(chArray); taosHashRemove(pMap, &winRes, sizeof(SWinKey)); - qDebug("===stream===retrive pull data over.window %" PRId64 , winRes.ts); + qDebug("===stream===retrive pull data over.window %" PRId64, winRes.ts); void* pFinalCh = taosHashGet(pFinalMap, &winRes, sizeof(SWinKey)); if (pFinalCh) { taosHashRemove(pFinalMap, &winRes, sizeof(SWinKey)); doDeleteWindow(pOperator, winRes.ts, winRes.groupId); - STimeWindow nextWin = getFinalTimeWindow(winRes.ts, pInterval); + STimeWindow nextWin = getFinalTimeWindow(winRes.ts, pInterval); SPullWindowInfo pull = {.window = nextWin, .groupId = winRes.groupId, .calWin.skey = nextWin.skey, @@ -617,7 +620,7 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo, i } else { SArray* chArray = *(void**)chIds; int32_t index = taosArraySearchIdx(chArray, &childId, compareInt32Val, TD_EQ); - qDebug("===stream===check final retrive %" PRId64",chid:%d", winKey->ts, index); + qDebug("===stream===check final retrive %" PRId64 ",chid:%d", winKey->ts, index); if (index == -1) { qDebug("===stream===add final retrive %" PRId64, winKey->ts); taosHashPut(pInfo->pFinalPullDataMap, winKey, sizeof(SWinKey), NULL, 0); @@ -638,8 +641,8 @@ int32_t getOutputBuf(void* pState, SRowBuffPos* pPos, SResultRow** pResult, SSta int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; SExprInfo* pExprInfo = pSup->pExprInfo; int32_t numOfExprs = pSup->numOfExprs; @@ -775,7 +778,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat } while (1) { bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); - if ((pInfo->ignoreExpiredData && isClosed && !IS_FINAL_INTERVAL_OP(pOperator)) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { + if ((pInfo->ignoreExpiredData && isClosed && !IS_FINAL_INTERVAL_OP(pOperator)) || + !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); if (startPos < 0) { break; @@ -790,7 +794,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat .groupId = groupId, }; void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); - if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->statestore) && isClosed && !chIds) { + if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->statestore) && isClosed && + !chIds) { SPullWindowInfo pull = { .window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; // add pull data request @@ -910,19 +915,18 @@ static void resetUnCloseWinInfo(SSHashObj* winMap) { } static char* getStreamOpName(uint16_t opType) { switch (opType) { - case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: - return "interval single"; - case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: - return "interval final"; - case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: - return "interval semi"; - default: - return ""; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: + return "interval single"; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: + return "interval final"; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: + return "interval semi"; + default: + return ""; } - } -static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator){ +static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; uint16_t opType = pOperator->operatorType; @@ -953,10 +957,10 @@ static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator){ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; - SOperatorInfo* downstream = pOperator->pDownstream[0]; - SExprSupp* pSup = &pOperator->exprSupp; + SOperatorInfo* downstream = pOperator->pDownstream[0]; + SExprSupp* pSup = &pOperator->exprSupp; qDebug("stask:%s %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status); @@ -1032,7 +1036,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { break; } pInfo->numOfDatapack++; - printDataBlock(pBlock, IS_FINAL_INTERVAL_OP(pOperator) ? "interval final recv" : "interval semi recv", GET_TASKID(pTaskInfo)); + printDataBlock(pBlock, IS_FINAL_INTERVAL_OP(pOperator) ? "interval final recv" : "interval semi recv", + GET_TASKID(pTaskInfo)); if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { pInfo->binfo.pRes->info.type = pBlock->info.type; @@ -1056,7 +1061,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows != 0) { // process the rest of the data - printDataBlock(pInfo->pDelRes, IS_FINAL_INTERVAL_OP(pOperator) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo)); + printDataBlock(pInfo->pDelRes, IS_FINAL_INTERVAL_OP(pOperator) ? "interval final" : "interval semi", + GET_TASKID(pTaskInfo)); if (pBlock->info.type == STREAM_CLEAR) { pInfo->pDelRes->info.type = STREAM_CLEAR; } else { @@ -1077,7 +1083,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } continue; } else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_INTERVAL_OP(pOperator)) { - processPullOver(pBlock, pInfo->pPullDataMap, pInfo->pFinalPullDataMap, &pInfo->interval, pInfo->pPullWins, pInfo->numOfChild, pOperator); + processPullOver(pBlock, pInfo->pPullDataMap, pInfo->pFinalPullDataMap, &pInfo->interval, pInfo->pPullWins, + pInfo->numOfChild, pOperator); continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { return pBlock; @@ -1151,8 +1158,8 @@ static int32_t getSelectivityBufSize(SqlFunctionCtx* pCtx) { static int32_t getMaxFunResSize(SExprSupp* pSup, int32_t numOfCols) { int32_t size = 0; for (int32_t i = 0; i < numOfCols; ++i) { - int32_t resSize = getSelectivityBufSize(pSup->pCtx + i); - size = TMAX(size, resSize); + int32_t resSize = getSelectivityBufSize(pSup->pCtx + i); + size = TMAX(size, resSize); } return size; } @@ -1160,11 +1167,12 @@ static int32_t getMaxFunResSize(SExprSupp* pSup, int32_t numOfCols) { static void streamIntervalReleaseState(SOperatorInfo* pOperator) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; - int32_t resSize = sizeof(TSKEY); - pInfo->statestore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, strlen(STREAM_INTERVAL_OP_STATE_NAME), &pInfo->twAggSup.maxTs, resSize); + int32_t resSize = sizeof(TSKEY); + pInfo->statestore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, + strlen(STREAM_INTERVAL_OP_STATE_NAME), &pInfo->twAggSup.maxTs, resSize); } SStreamIntervalOperatorInfo* pInfo = pOperator->info; - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; pAPI->stateStore.streamStateCommit(pInfo->pState); SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.releaseStreamStateFn) { @@ -1175,11 +1183,11 @@ static void streamIntervalReleaseState(SOperatorInfo* pOperator) { void streamIntervalReloadState(SOperatorInfo* pOperator) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; - int32_t size = 0; - void* pBuf = NULL; + int32_t size = 0; + void* pBuf = NULL; int32_t code = pInfo->statestore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, strlen(STREAM_INTERVAL_OP_STATE_NAME), &pBuf, &size); - TSKEY ts = *(TSKEY*)pBuf; + TSKEY ts = *(TSKEY*)pBuf; taosMemoryFree(pBuf); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts); pInfo->statestore.streamStateReloadInfo(pInfo->pState, ts); @@ -1227,7 +1235,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, if (pIntervalPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0; SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar); - int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); + int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -1282,9 +1290,10 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->numOfDatapack = 0; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; - int32_t funResSize= getMaxFunResSize(&pOperator->exprSupp, numOfCols); - pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, - compareTs, pInfo->pState, pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo)); + int32_t funResSize = getMaxFunResSize(&pOperator->exprSupp, numOfCols); + pInfo->pState->pFileState = + pAPI->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, + compareTs, pInfo->pState, pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo)); pInfo->dataVersion = 0; pInfo->statestore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false; @@ -1392,7 +1401,8 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin } int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap, - SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, SReadHandle* pHandle, SStorageAPI* pApi) { + SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, + SReadHandle* pHandle, SStorageAPI* pApi) { pSup->resultRowSize = keySize + getResultRowSize(pCtx, numOfOutput); pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR); pSup->gap = gap; @@ -1495,7 +1505,8 @@ void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT int32_t getSessionWinBuf(SStreamAggSupporter* pAggSup, SStreamStateCur* pCur, SResultWindowInfo* pWinInfo) { int32_t size = 0; - int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, &pWinInfo->pOutputBuf, &size); + int32_t code = + pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, &pWinInfo->pOutputBuf, &size); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1608,7 +1619,8 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* setSessionWinOutputInfo(pStUpdated, pNextWin); int32_t size = 0; pNextWin->sessionWin = pCurWin->sessionWin; - int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, &pNextWin->pOutputBuf, &size); + int32_t code = + pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, &pNextWin->pOutputBuf, &size); if (code != TSDB_CODE_SUCCESS) { taosMemoryFreeClear(pNextWin->pOutputBuf); SET_SESSION_WIN_INVALID(*pNextWin); @@ -1617,7 +1629,7 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* } static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated, - SSHashObj* pStDeleted, bool addGap) { + SSHashObj* pStDeleted, bool addGap) { SExprSupp* pSup = &pOperator->exprSupp; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; @@ -1661,7 +1673,8 @@ static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* } int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) { - saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize, &pAggSup->stateStore); + saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize, + &pAggSup->stateStore); pWinInfo->pOutputBuf = NULL; return TSDB_CODE_SUCCESS; } @@ -1693,7 +1706,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData pEndTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); } - TSKEY* endTsCols = (int64_t*)pEndTsCol->pData; + TSKEY* endTsCols = (int64_t*)pEndTsCol->pData; for (int32_t i = 0; i < rows;) { if (pInfo->ignoreExpiredData && isOverdue(endTsCols[i], &pInfo->twAggSup)) { i++; @@ -1837,9 +1850,9 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo } static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SSHashObj* pStUpdated) { - SExprSupp* pSup = &pOperator->exprSupp; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SExprSupp* pSup = &pOperator->exprSupp; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; int32_t size = taosArrayGetSize(pWinArray); SStreamSessionAggOperatorInfo* pInfo = pOperator->info; @@ -1878,7 +1891,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS } } num++; - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin.sessionWin.win, pAggSup->gap); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin.sessionWin.win, pAggSup->gap); initSessionOutputBuf(&childWin, &pChResult, pChild->exprSupp.pCtx, numOfOutput, pChild->exprSupp.rowEntryInfoOffset); compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); @@ -1953,8 +1966,7 @@ void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL pGroupResInfo->pBuf = NULL; } -void doBuildSessionResult(SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo, - SSDataBlock* pBlock) { +void doBuildSessionResult(SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo, SSDataBlock* pBlock) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; // set output datablock version pBlock->info.version = pTaskInfo->version; @@ -2005,12 +2017,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { } else if (pOperator->status == OP_RES_TO_RETURN) { doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", GET_TASKID(pTaskInfo)); + printDataBlock(pInfo->pDelRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", + GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes); if (pBInfo->pRes->info.rows > 0) { - printDataBlock(pBInfo->pRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", GET_TASKID(pTaskInfo)); + printDataBlock(pBInfo->pRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", + GET_TASKID(pTaskInfo)); return pBInfo->pRes; } @@ -2031,7 +2045,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { if (pBlock == NULL) { break; } - printDataBlock(pBlock, IS_FINAL_SESSION_OP(pOperator) ? "final session recv" : "single session recv", GET_TASKID(pTaskInfo)); + printDataBlock(pBlock, IS_FINAL_SESSION_OP(pOperator) ? "final session recv" : "single session recv", + GET_TASKID(pTaskInfo)); if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { @@ -2065,7 +2080,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); - doStreamSessionAggImpl(pOperator, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, IS_FINAL_SESSION_OP(pOperator), true); + doStreamSessionAggImpl(pOperator, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, IS_FINAL_SESSION_OP(pOperator), + true); if (IS_FINAL_SESSION_OP(pOperator)) { int32_t chIndex = getChildIndex(pBlock); int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -2094,7 +2110,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated); tSimpleHashCleanup(pInfo->pStUpdated); pInfo->pStUpdated = NULL; - if(pInfo->isHistoryOp) { + if (pInfo->isHistoryOp) { getMaxTsWins(pInfo->pUpdated, pInfo->historyWins); } initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); @@ -2109,13 +2125,15 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", GET_TASKID(pTaskInfo)); + printDataBlock(pInfo->pDelRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", + GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes); if (pBInfo->pRes->info.rows > 0) { - printDataBlock(pBInfo->pRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", GET_TASKID(pTaskInfo)); + printDataBlock(pBInfo->pRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", + GET_TASKID(pTaskInfo)); return pBInfo->pRes; } @@ -2126,8 +2144,10 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { void streamSessionReleaseState(SOperatorInfo* pOperator) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); - pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData, resSize); + int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); + pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, + strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData, + resSize); } SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.releaseStreamStateFn) { @@ -2142,16 +2162,16 @@ void resetWinRange(STimeWindow* winRange) { void streamSessionReloadState(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; resetWinRange(&pAggSup->winRange); SResultWindowInfo winInfo = {0}; - int32_t size = 0; - void* pBuf = NULL; - int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME, - strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size); - int32_t num = size / sizeof(SSessionKey); - SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf; + int32_t size = 0; + void* pBuf = NULL; + int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME, + strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size); + int32_t num = size / sizeof(SSessionKey); + SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf; ASSERT(size == num * sizeof(SSessionKey)); if (!pInfo->pStUpdated && num > 0) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); @@ -2162,7 +2182,8 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo); int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true); if (winNum > 0) { - qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, winInfo.sessionWin.win.skey, winInfo.sessionWin.groupId); + qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, winInfo.sessionWin.win.skey, + winInfo.sessionWin.groupId); if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { saveResult(winInfo, pInfo->pStUpdated); } else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { @@ -2216,7 +2237,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh } code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, pSessionNode->gap, - pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pTaskInfo->storageAPI); + pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle, + &pTaskInfo->storageAPI); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -2399,7 +2421,8 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle) { + SExecTaskInfo* pTaskInfo, int32_t numOfChild, + SReadHandle* pHandle) { int32_t code = TSDB_CODE_OUT_OF_MEMORY; SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle); if (pOperator == NULL) { @@ -2503,9 +2526,9 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, pCurWin->winInfo.sessionWin.groupId = groupId; pCurWin->winInfo.sessionWin.win.skey = ts; pCurWin->winInfo.sessionWin.win.ekey = ts; - int32_t code = - pAggSup->stateStore.streamStateStateAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin, pKeyData, pAggSup->stateKeySize, - compareStateKey, &pCurWin->winInfo.pOutputBuf, &size); + int32_t code = pAggSup->stateStore.streamStateStateAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin, + pKeyData, pAggSup->stateKeySize, compareStateKey, + &pCurWin->winInfo.pOutputBuf, &size); pCurWin->pStateKey = (SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize)); pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys); @@ -2515,10 +2538,11 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->winInfo.sessionWin.win)) { code = TSDB_CODE_FAILED; - releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore); + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->winInfo.pOutputBuf, + &pAggSup->pSessionAPI->stateStore); pCurWin->winInfo.pOutputBuf = taosMemoryCalloc(1, size); pCurWin->pStateKey = - (SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize)); + (SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize)); pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys); pCurWin->pStateKey->type = pAggSup->stateKeyType; pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys); @@ -2526,7 +2550,8 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, pCurWin->winInfo.sessionWin.groupId = groupId; pCurWin->winInfo.sessionWin.win.skey = ts; pCurWin->winInfo.sessionWin.win.ekey = ts; - qDebug("===stream===reset state win key. skey:%" PRId64 ", endkey:%" PRId64, pCurWin->winInfo.sessionWin.win.skey, pCurWin->winInfo.sessionWin.win.ekey); + qDebug("===stream===reset state win key. skey:%" PRId64 ", endkey:%" PRId64, pCurWin->winInfo.sessionWin.win.skey, + pCurWin->winInfo.sessionWin.win.ekey); } if (code == TSDB_CODE_SUCCESS) { @@ -2541,14 +2566,16 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, } pNextWin->winInfo.sessionWin = pCurWin->winInfo.sessionWin; - SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pNextWin->winInfo.sessionWin); + SStreamStateCur* pCur = + pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pNextWin->winInfo.sessionWin); int32_t nextSize = pAggSup->resultRowSize; - code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, &pNextWin->winInfo.pOutputBuf, &nextSize); + code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, + &pNextWin->winInfo.pOutputBuf, &nextSize); if (code != TSDB_CODE_SUCCESS) { SET_SESSION_WIN_INVALID(pNextWin->winInfo); } else { pNextWin->pStateKey = - (SStateKeys*)((char*)pNextWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize)); + (SStateKeys*)((char*)pNextWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize)); pNextWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys); pNextWin->pStateKey->type = pAggSup->stateKeyType; pNextWin->pStateKey->pData = (char*)pNextWin->pStateKey + sizeof(SStateKeys); @@ -2619,7 +2646,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl return; } - int32_t rows = pSDataBlock->info.rows; + int32_t rows = pSDataBlock->info.rows; blockDataEnsureCapacity(pAggSup->pScanBlock, rows); SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId); for (int32_t i = 0; i < rows; i += winRows) { @@ -2747,7 +2774,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { tSimpleHashCleanup(pInfo->pSeUpdated); pInfo->pSeUpdated = NULL; - if(pInfo->isHistoryOp) { + if (pInfo->isHistoryOp) { getMaxTsWins(pInfo->pUpdated, pInfo->historyWins); } @@ -2778,9 +2805,11 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { void streamStateReleaseState(SOperatorInfo* pOperator) { SStreamStateAggOperatorInfo* pInfo = pOperator->info; - int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); + int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); qDebug("===stream=== relase state. save result count:%d", (int32_t)taosArrayGetSize(pInfo->historyWins)); - pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME, strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData, resSize); + pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME, + strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData, + resSize); SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.releaseStreamStateFn) { downstream->fpSet.releaseStreamStateFn(downstream); @@ -2789,14 +2818,14 @@ void streamStateReleaseState(SOperatorInfo* pOperator) { static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SResultWindowInfo* pNextWin, SSHashObj* pStUpdated, SSHashObj* pStDeleted) { - SExprSupp* pSup = &pOperator->exprSupp; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SExprSupp* pSup = &pOperator->exprSupp; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; - SStreamStateAggOperatorInfo* pInfo = pOperator->info; - SResultRow* pCurResult = NULL; - int32_t numOfOutput = pOperator->exprSupp.numOfExprs; - SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SStreamStateAggOperatorInfo* pInfo = pOperator->info; + SResultRow* pCurResult = NULL; + int32_t numOfOutput = pOperator->exprSupp.numOfExprs; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; initSessionOutputBuf(pCurWin, &pCurResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); SResultRow* pWinResult = NULL; initSessionOutputBuf(pNextWin, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset); @@ -2806,7 +2835,8 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); tSimpleHashRemove(pStUpdated, &pNextWin->sessionWin, sizeof(SSessionKey)); if (pNextWin->isOutput && pStDeleted) { - qDebug("===stream=== save delete window info %" PRId64 ", %" PRIu64, pNextWin->sessionWin.win.skey, pNextWin->sessionWin.groupId); + qDebug("===stream=== save delete window info %" PRId64 ", %" PRIu64, pNextWin->sessionWin.win.skey, + pNextWin->sessionWin.groupId); saveDeleteRes(pStDeleted, pNextWin->sessionWin); } removeSessionResult(pStUpdated, pAggSup->pResultRows, pNextWin->sessionWin); @@ -2816,17 +2846,17 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur void streamStateReloadState(SOperatorInfo* pOperator) { SStreamStateAggOperatorInfo* pInfo = pOperator->info; - SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; resetWinRange(&pAggSup->winRange); SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0}; - int32_t size = 0; - void* pBuf = NULL; - int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_STATE_OP_STATE_NAME, - strlen(STREAM_STATE_OP_STATE_NAME), &pBuf, &size); - int32_t num = size / sizeof(SSessionKey); + int32_t size = 0; + void* pBuf = NULL; + int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_STATE_OP_STATE_NAME, + strlen(STREAM_STATE_OP_STATE_NAME), &pBuf, &size); + int32_t num = size / sizeof(SSessionKey); qDebug("===stream=== reload state. get result count:%d", num); - SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf; + SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf; ASSERT(size == num * sizeof(SSessionKey)); if (!pInfo->pSeUpdated && num > 0) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); @@ -2840,13 +2870,16 @@ void streamStateReloadState(SOperatorInfo* pOperator) { SStateWindowInfo curInfo = {0}; SStateWindowInfo nextInfo = {0}; SStateWindowInfo dummy = {0}; - qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, i); + qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pSeKeyBuf[i].win.skey, + pSeKeyBuf[i].groupId, i); setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo); - bool cpRes = compareWinStateKey(curInfo.pStateKey,nextInfo.pStateKey); - qDebug("===stream=== reload state. next window info %" PRId64 ", %" PRIu64 ", compare:%d", nextInfo.winInfo.sessionWin.win.skey, nextInfo.winInfo.sessionWin.groupId, cpRes); + bool cpRes = compareWinStateKey(curInfo.pStateKey, nextInfo.pStateKey); + qDebug("===stream=== reload state. next window info %" PRId64 ", %" PRIu64 ", compare:%d", + nextInfo.winInfo.sessionWin.win.skey, nextInfo.winInfo.sessionWin.groupId, cpRes); if (cpRes) { compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pSeUpdated, pInfo->pSeDeleted); - qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, curInfo.winInfo.sessionWin.win.skey, curInfo.winInfo.sessionWin.groupId); + qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, curInfo.winInfo.sessionWin.win.skey, + curInfo.winInfo.sessionWin.groupId); if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { saveResult(curInfo.winInfo, pInfo->pSeUpdated); } else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { @@ -2858,7 +2891,8 @@ void streamStateReloadState(SOperatorInfo* pOperator) { tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curInfo.winInfo, sizeof(SResultWindowInfo)); } } else if (IS_VALID_SESSION_WIN(nextInfo.winInfo)) { - releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)nextInfo.winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore); + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)nextInfo.winInfo.pOutputBuf, + &pAggSup->pSessionAPI->stateStore); } if (IS_VALID_SESSION_WIN(curInfo.winInfo)) { @@ -2870,7 +2904,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.reloadStreamStateFn) { downstream->fpSet.reloadStreamStateFn(downstream); - } + } } SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, @@ -2980,7 +3014,7 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; SExprSupp* pSup = &pOperator->exprSupp; @@ -3164,8 +3198,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1); size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, - pInfo->pState, &pTaskInfo->storageAPI.functionStore); + code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState, + &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -3197,7 +3231,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->numOfDatapack = 0; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; - int32_t funResSize= getMaxFunResSize(pSup, numOfCols); + int32_t funResSize = getMaxFunResSize(pSup, numOfCols); pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,