diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index a26f1c74f8b7c376adf189dbb4416f6e6db9a00d..baf7d447cc6f0bbb3e8b552acdefd38abafc98cc 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -88,6 +88,7 @@ struct SqlFunctionCtx; size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); void initResultRowInfo(SResultRowInfo* pResultRowInfo); void closeResultRow(SResultRow* pResultRow); +void resetResultRow(SResultRow* pResultRow, size_t entrySize); struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 6516d768305a1aa9164b4be3386137a54a8d83a1..18f6d3ad2cc803a89698007138f908b98f9b388f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -585,11 +585,12 @@ typedef struct SIntervalAggOperatorInfo { typedef struct SMergeAlignedIntervalAggOperatorInfo { SIntervalAggOperatorInfo* intervalAggOperatorInfo; - bool hasGroupId; +// bool hasGroupId; uint64_t groupId; // current groupId int64_t curTs; // current ts SSDataBlock* prefetchedBlock; SNode* pCondition; + SResultRow* pResultRow; } SMergeAlignedIntervalAggOperatorInfo; typedef struct SStreamIntervalOperatorInfo { @@ -649,7 +650,6 @@ typedef struct SAggOperatorInfo { } SAggOperatorInfo; typedef struct SProjectOperatorInfo { - // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; SAggSupporter aggSup; SNode* pFilterNode; // filter info, which is push down by optimizer @@ -691,7 +691,6 @@ typedef struct SFillOperatorInfo { } SFillOperatorInfo; typedef struct SGroupbyOperatorInfo { - // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; SAggSupporter aggSup; @@ -738,7 +737,6 @@ typedef struct SWindowRowsSup { } SWindowRowsSup; typedef struct SSessionAggOperatorInfo { - // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; SAggSupporter aggSup; @@ -827,7 +825,6 @@ typedef struct SStateWindowOperatorInfo { SStateKeys stateKey; int32_t tsSlotId; // primary timestamp column slot id STimeWindowAggSupp twAggSup; - // bool reptScan; const SNode* pCondition; } SStateWindowOperatorInfo; @@ -848,24 +845,6 @@ typedef struct SStreamStateAggOperatorInfo { bool ignoreExpiredData; } SStreamStateAggOperatorInfo; -typedef struct SSortedMergeOperatorInfo { - // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode - SOptrBasicInfo binfo; - SAggSupporter aggSup; - - SArray* pSortInfo; - int32_t numOfSources; - SSortHandle* pSortHandle; - int32_t bufPageSize; - uint32_t sortBufSize; // max buffer size for in-memory sort - int32_t resultRowFactor; - bool hasGroupVal; - SDiskbasedBuf* pTupleStore; // keep the final results - int32_t numOfResPerPage; - char** groupVal; - SArray* groupInfo; -} SSortedMergeOperatorInfo; - typedef struct SSortOperatorInfo { SOptrBasicInfo binfo; uint32_t sortBufSize; // max buffer size for in-memory sort @@ -873,11 +852,10 @@ typedef struct SSortOperatorInfo { SSortHandle* pSortHandle; SArray* pColMatchInfo; // for index map from table scan output int32_t bufPageSize; - - int64_t startTs; // sort start time - uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. - SLimitInfo limitInfo; - SNode* pCondition; + int64_t startTs; // sort start time + uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. + SLimitInfo limitInfo; + SNode* pCondition; } SSortOperatorInfo; typedef struct STagFilterOperatorInfo { @@ -909,7 +887,6 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_decode_fn_t decode, __optr_explain_fn_t explain); int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); -void operatorDummyCloseFn(void* param, int32_t numOfCols); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock); @@ -944,7 +921,6 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int SSDataBlock* pBlock, const char* idStr); void cleanupAggSup(SAggSupporter* pAggSup); -void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId); @@ -1091,10 +1067,8 @@ void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEn void printDataBlock(SSDataBlock* pBlock, const char* flag); uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId); -int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, - SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, - const int32_t* rowCellOffset, SSDataBlock* pBlock, - SExecTaskInfo* pTaskInfo); +int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, + SExprSupp* pSup, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 0c1728386b4d88d0bebe66e6705c328803985eb0..d75f0580e17d7754a28295c0db790e51bcfc3f8d 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -33,6 +33,17 @@ void initResultRowInfo(SResultRowInfo* pResultRowInfo) { void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; } +void resetResultRow(SResultRow* pResultRow, size_t entrySize) { + pResultRow->numOfRows = 0; + pResultRow->closed = false; + pResultRow->endInterp = false; + pResultRow->startInterp = false; + + if (entrySize > 0) { + memset(pResultRow->pEntryInfo, 0, entrySize); + } +} + // TODO refactor: use macro SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset) { assert(index >= 0 && offset != NULL); @@ -799,9 +810,15 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, taosMemoryFreeClear(pColInfoData); } - for (int i = 0; i < taosArrayGetSize(res); i++) { + size_t numOfTables = taosArrayGetSize(res); + for (int i = 0; i < numOfTables; i++) { STableKeyInfo info = {.uid = *(uint64_t*)taosArrayGet(res, i), .groupId = 0}; - taosArrayPush(pListInfo->pTableList, &info); + void* p = taosArrayPush(pListInfo->pTableList, &info); + if (p == NULL) { + taosArrayDestroy(res); + return TSDB_CODE_OUT_OF_MEMORY; + } + qDebug("tagfilter get uid:%ld", info.uid); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 17954178b137cf3074236c3b37be0d2da32efa82..e0d417093c07b718cbe66262e732aeb2ec6623bd 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -132,8 +132,6 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, return fpSet; } -void operatorDummyCloseFn(void* param, int32_t numOfCols) {} - static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo); @@ -1269,33 +1267,12 @@ static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t nu } } -// todo extract method with copytoSSDataBlock -int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, - SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, - const int32_t* rowCellOffset, SSDataBlock* pBlock, - SExecTaskInfo* pTaskInfo) { - SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId); - SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset); - - doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowCellOffset); - if (pRow->numOfRows == 0) { - releaseBufPage(pBuf, page); - return 0; - } - - while (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { - int32_t code = blockDataEnsureCapacity(pBlock, pBlock->info.capacity * 1.25); - if (TAOS_FAILED(code)) { - releaseBufPage(pBuf, page); - qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); - T_LONG_JMP(pTaskInfo->env, code); - } - } - +static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx, + SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) { for (int32_t j = 0; j < numOfExprs; ++j) { int32_t slotId = pExprInfo[j].base.resSchema.slotId; - pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset); + pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); if (pCtx[j].fpSet.finalize) { int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); if (TAOS_FAILED(code)) { @@ -1303,7 +1280,7 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi T_LONG_JMP(pTaskInfo->env, code); } } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { - // do nothing, todo refactor + // do nothing } else { // expand the result into multiple rows. E.g., _wstart, top(k, 20) // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows. @@ -1314,10 +1291,40 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi } } } +} + +// todo refactor. SResultRow has direct pointer in miainfo +int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup, + SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { + SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId); + SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset); + + SqlFunctionCtx* pCtx = pSup->pCtx; + SExprInfo* pExprInfo = pSup->pExprInfo; + const int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; + + doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset); + if (pRow->numOfRows == 0) { + releaseBufPage(pBuf, page); + return 0; + } + + int32_t size = pBlock->info.capacity; + while (pBlock->info.rows + pRow->numOfRows > size) { + size = size * 1.25; + } + + int32_t code = blockDataEnsureCapacity(pBlock, size); + if (TAOS_FAILED(code)) { + releaseBufPage(pBuf, page); + qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); + T_LONG_JMP(pTaskInfo->env, code); + } + + doCopyResultToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo); releaseBufPage(pBuf, page); pBlock->info.rows += pRow->numOfRows; - return 0; } @@ -1362,32 +1369,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS } pGroupResInfo->index += 1; - - for (int32_t j = 0; j < numOfExprs; ++j) { - int32_t slotId = pExprInfo[j].base.resSchema.slotId; - - pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); - if (pCtx[j].fpSet.finalize) { -#ifdef BUF_PAGE_DEBUG - qDebug("\npage_finalize %d", numOfExprs); -#endif - int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); - if (TAOS_FAILED(code)) { - qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); - T_LONG_JMP(pTaskInfo->env, code); - } - } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { - // do nothing, todo refactor - } else { - // expand the result into multiple rows. E.g., _wstart, top(k, 20) - // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows. - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); - char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); - for (int32_t k = 0; k < pRow->numOfRows; ++k) { - colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); - } - } - } + doCopyResultToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo); releaseBufPage(pBuf, page); pBlock->info.rows += pRow->numOfRows; @@ -1727,22 +1709,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t static void doDestroyTableList(STableListInfo* pTableqinfoList); -static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) { -#if 0 - if (order == TSDB_ORDER_ASC) { - assert( - (pTableQueryInfo->win.skey <= pTableQueryInfo->win.ekey) && - (pTableQueryInfo->lastKey >= pTaskInfo->window.skey) && - (pTableQueryInfo->win.skey >= pTaskInfo->window.skey && pTableQueryInfo->win.ekey <= pTaskInfo->window.ekey)); - } else { - assert( - (pTableQueryInfo->win.skey >= pTableQueryInfo->win.ekey) && - (pTableQueryInfo->lastKey <= pTaskInfo->window.skey) && - (pTableQueryInfo->win.skey <= pTaskInfo->window.skey && pTableQueryInfo->win.ekey >= pTaskInfo->window.ekey)); - } -#endif -} - typedef struct SFetchRspHandleWrapper { uint32_t exchangeId; int32_t sourceIndex; @@ -2307,21 +2273,6 @@ _error: static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, const char* pKey); -static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) { - SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*)param; - taosArrayDestroy(pInfo->pSortInfo); - taosArrayDestroy(pInfo->groupInfo); - - if (pInfo->pSortHandle != NULL) { - tsortDestroySortHandle(pInfo->pSortHandle); - } - - blockDataDestroy(pInfo->binfo.pRes); - cleanupAggSup(&pInfo->aggSup); - - taosMemoryFreeClear(param); -} - static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) { size_t size = taosArrayGetSize(groupInfo); if (size == 0) { @@ -2357,41 +2308,6 @@ static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int3 return 0; } -static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfExpr, - int32_t rowIndex) { - for (int32_t j = 0; j < numOfExpr; ++j) { // TODO set row index - // pCtx[j].startRow = rowIndex; - } - - for (int32_t j = 0; j < numOfExpr; ++j) { - int32_t functionId = pCtx[j].functionId; - // pCtx[j].fpSet->addInput(&pCtx[j]); - - // if (functionId < 0) { - // SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); - // doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE); - // } else { - // assert(!TSDB_FUNC_IS_SCALAR(functionId)); - // aAggs[functionId].mergeFunc(&pCtx[j]); - // } - } -} - -static void doFinalizeResultImpl(SqlFunctionCtx* pCtx, int32_t numOfExpr) { - for (int32_t j = 0; j < numOfExpr; ++j) { - int32_t functionId = pCtx[j].functionId; - // if (functionId == FUNC_TAG_DUMMY || functionId == FUNC_TS_DUMMY) { - // continue; - // } - - // if (functionId < 0) { - // SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); - // doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); - // } else { - // pCtx[j].fpSet.finalize(&pCtx[j]); - } -} - static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex) { int32_t size = (int32_t)taosArrayGetSize(pColumnList); @@ -2406,210 +2322,6 @@ static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock return true; } -static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) { - SSortedMergeOperatorInfo* pInfo = pOperator->info; - - SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; - - for (int32_t i = 0; i < pBlock->info.rows; ++i) { - if (!pInfo->hasGroupVal) { - ASSERT(i == 0); - doMergeResultImpl(pInfo, pCtx, numOfExpr, i); - pInfo->hasGroupVal = saveCurrentTuple(pInfo->groupVal, pInfo->groupInfo, pBlock, i); - } else { - if (needToMerge(pBlock, pInfo->groupInfo, pInfo->groupVal, i)) { - doMergeResultImpl(pInfo, pCtx, numOfExpr, i); - } else { - doFinalizeResultImpl(pCtx, numOfExpr); - int32_t numOfRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL); - // setTagValueForMultipleRows(pCtx, pOperator->exprSupp.numOfExprs, numOfRows); - - // TODO check for available buffer; - - // next group info data - pInfo->binfo.pRes->info.rows += numOfRows; - for (int32_t j = 0; j < numOfExpr; ++j) { - if (pCtx[j].functionId < 0) { - continue; - } - - pCtx[j].fpSet.process(&pCtx[j]); - } - - doMergeResultImpl(pInfo, pCtx, numOfExpr, i); - pInfo->hasGroupVal = saveCurrentTuple(pInfo->groupVal, pInfo->groupInfo, pBlock, i); - } - } - } -} - -static SSDataBlock* doMerge(SOperatorInfo* pOperator) { - SSortedMergeOperatorInfo* pInfo = pOperator->info; - SSortHandle* pHandle = pInfo->pSortHandle; - - SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes, false); - blockDataEnsureCapacity(pDataBlock, pOperator->resultInfo.capacity); - - while (1) { - blockDataCleanup(pDataBlock); - while (1) { - STupleHandle* pTupleHandle = tsortNextTuple(pHandle); - if (pTupleHandle == NULL) { - break; - } - - // build datablock for merge for one group - appendOneRowToDataBlock(pDataBlock, pTupleHandle); - if (pDataBlock->info.rows >= pOperator->resultInfo.capacity) { - break; - } - } - - if (pDataBlock->info.rows == 0) { - break; - } - - setInputDataBlock(pOperator, pOperator->exprSupp.pCtx, pDataBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); - // updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor, - // pOperator->pRuntimeEnv, true); - doMergeImpl(pOperator, pOperator->exprSupp.numOfExprs, pDataBlock); - // flush to tuple store, and after all data have been handled, return to upstream node or sink node - } - - doFinalizeResultImpl(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs); - int32_t numOfRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL); - // setTagValueForMultipleRows(pCtx, pOperator->exprSupp.numOfExprs, numOfRows); - - // TODO check for available buffer; - - // next group info data - pInfo->binfo.pRes->info.rows += numOfRows; - return (pInfo->binfo.pRes->info.rows > 0) ? pInfo->binfo.pRes : NULL; -} - -SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, - SArray* pColMatchInfo, SSortedMergeOperatorInfo* pInfo) { - blockDataCleanup(pDataBlock); - - SSDataBlock* p = tsortGetSortedDataBlock(pHandle); - if (p == NULL) { - return NULL; - } - - blockDataEnsureCapacity(p, capacity); - - while (1) { - STupleHandle* pTupleHandle = tsortNextTuple(pHandle); - if (pTupleHandle == NULL) { - break; - } - - appendOneRowToDataBlock(p, pTupleHandle); - if (p->info.rows >= capacity) { - break; - } - } - - if (p->info.rows > 0) { - int32_t numOfCols = taosArrayGetSize(pColMatchInfo); - for (int32_t i = 0; i < numOfCols; ++i) { - SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i); - ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); - - SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); - SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId); - colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info); - } - - pDataBlock->info.rows = p->info.rows; - pDataBlock->info.capacity = p->info.rows; - } - - blockDataDestroy(p); - return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; -} - -static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SSortedMergeOperatorInfo* pInfo = pOperator->info; - if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedMergeBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, NULL, pInfo); - } - - int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, - pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)"); - - tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); - - for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { - SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); - ps->param = pOperator->pDownstream[i]; - tsortAddSource(pInfo->pSortHandle, ps); - } - - int32_t code = tsortOpen(pInfo->pSortHandle); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, terrno); - } - - pOperator->status = OP_RES_TO_RETURN; - return doMerge(pOperator); -} - -static int32_t initGroupCol(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pGroupInfo, - SSortedMergeOperatorInfo* pInfo) { - if (pGroupInfo == NULL || taosArrayGetSize(pGroupInfo) == 0) { - return 0; - } - - int32_t len = 0; - SArray* plist = taosArrayInit(3, sizeof(SColumn)); - pInfo->groupInfo = taosArrayInit(3, sizeof(int32_t)); - - if (plist == NULL || pInfo->groupInfo == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - size_t numOfGroupCol = taosArrayGetSize(pInfo->groupInfo); - for (int32_t i = 0; i < numOfGroupCol; ++i) { - SColumn* pCol = taosArrayGet(pGroupInfo, i); - for (int32_t j = 0; j < numOfCols; ++j) { - SExprInfo* pe = &pExprInfo[j]; - if (pe->base.resSchema.slotId == pCol->colId) { - taosArrayPush(plist, pCol); - taosArrayPush(pInfo->groupInfo, &j); - len += pCol->bytes; - break; - } - } - } - - ASSERT(taosArrayGetSize(pGroupInfo) == taosArrayGetSize(plist)); - - pInfo->groupVal = taosMemoryCalloc(1, (POINTER_BYTES * numOfGroupCol + len)); - if (pInfo->groupVal == NULL) { - taosArrayDestroy(plist); - return TSDB_CODE_OUT_OF_MEMORY; - } - - int32_t offset = 0; - char* start = (char*)(pInfo->groupVal + (POINTER_BYTES * numOfGroupCol)); - for (int32_t i = 0; i < numOfGroupCol; ++i) { - pInfo->groupVal[i] = start + offset; - SColumn* pCol = taosArrayGet(plist, i); - offset += pCol->bytes; - } - - taosArrayDestroy(plist); - - return TSDB_CODE_SUCCESS; -} - int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) { // todo add more information about exchange operation int32_t type = pOperator->operatorType; @@ -3342,13 +3054,6 @@ void cleanupBasicInfo(SOptrBasicInfo* pInfo) { pInfo->pRes = blockDataDestroy(pInfo->pRes); } -void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { - SOptrBasicInfo* pInfo = (SOptrBasicInfo*)param; - cleanupBasicInfo(pInfo); - - taosMemoryFreeClear(param); -} - static void freeItem(void* pItem) { void** p = pItem; if (*p != NULL) { @@ -3855,7 +3560,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTagCond, pTagIndexCond, pTableListInfo); if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = terrno; + pTaskInfo->code = code; qError("failed to getTableList, code: %s", tstrerror(code)); return NULL; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index fa559f0692dd2408c8c88ea6fddeb405e2b51059..c221aaf4fdde6662fefc39d78c6d55d3570fa15a 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -46,19 +46,6 @@ static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, co uint64_t groupId); static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult); -///* -// * There are two cases to handle: -// * -// * 1. Query range is not set yet (queryRangeSet = 0). we need to set the query range info, including -// * pQueryAttr->lastKey, pQueryAttr->window.skey, and pQueryAttr->eKey. -// * 2. Query range is set and query is in progress. There may be another result with the same query ranges to be -// * merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there -// * is a previous result generated or not. -// */ -// static void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWindow* pQRange) { -// // do nothing -//} - static TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; } static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan, @@ -3011,9 +2998,9 @@ static void addRetriveWindow(SArray* wins, SStreamFinalIntervalOperatorInfo* pIn SPullWindowInfo pull = {.window = nextWin, .groupId = winKey->groupId}; // add pull data request savePullWindow(&pull, pInfo->pPullWins); - int32_t size = taosArrayGetSize(pInfo->pChildren); - addPullWindow(pInfo->pPullDataMap, winKey, size); - qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size); + int32_t size1 = taosArrayGetSize(pInfo->pChildren); + addPullWindow(pInfo->pPullDataMap, winKey, size1); + qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size1); } } } @@ -4895,72 +4882,65 @@ _error: return NULL; } -void destroyMergeAlignedIntervalOperatorInfo(void* param) { +void destroyMAIOperatorInfo(void* param) { SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param; destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo); taosMemoryFreeClear(param); } -static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, - SSDataBlock* pResultBlock, TSKEY wstartTs) { - SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; - - SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo; - SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; - SExprSupp* pSup = &pOperatorInfo->exprSupp; - - SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId); - SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet( - iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); - ASSERT(p1 != NULL); +static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) { + SResultRow* pResult = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize); + pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; + return pResult; +} - finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pSup->pCtx, pSup->pExprInfo, pSup->numOfExprs, - pSup->rowEntryInfoOffset, pResultBlock, pTaskInfo); - tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); - ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0); +static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult, + SExprSupp* pExprSup, SAggSupporter* pAggSup) { + if (*pResult == NULL) { + *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup); + if (*pResult == NULL) { + return terrno; + } + } + // set time window for current result + (*pResult)->win = (*win); + setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); return TSDB_CODE_SUCCESS; } static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, - SSDataBlock* pBlock, int32_t scanFlag, SSDataBlock* pResultBlock) { + SSDataBlock* pBlock, SSDataBlock* pResultBlock) { SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; SExprSupp* pSup = &pOperatorInfo->exprSupp; + SInterval* pInterval = &iaInfo->interval; - int32_t startPos = 0; - int32_t numOfOutput = pSup->numOfExprs; - int64_t* tsCols = extractTsCol(pBlock, iaInfo); - uint64_t tableGroupId = pBlock->info.groupId; - SResultRow* pResult = NULL; + int32_t startPos = 0; + int64_t* tsCols = extractTsCol(pBlock, iaInfo); TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols); // there is an result exists if (miaInfo->curTs != INT64_MIN) { - ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1); - if (ts != miaInfo->curTs) { - outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs); + finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo); + resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow)); miaInfo->curTs = ts; } } else { miaInfo->curTs = ts; - ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0); } STimeWindow win = {0}; win.skey = miaInfo->curTs; - win.ekey = - taosTimeAdd(win.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1; + win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; - // TODO: remove the hash table (groupid + winkey => result row position) - int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, - pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo); - if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup); + if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) { + T_LONG_JMP(pTaskInfo->env, ret); } int32_t currPos = startPos; @@ -4973,21 +4953,19 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, - pBlock->info.rows, numOfOutput); + pBlock->info.rows, pSup->numOfExprs); - outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs); + finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo); + resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow)); miaInfo->curTs = tsCols[currPos]; currWin.skey = miaInfo->curTs; - currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, - iaInfo->interval.precision) - - 1; + currWin.ekey = taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; startPos = currPos; - ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, - numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo); - if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup); + if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) { + T_LONG_JMP(pTaskInfo->env, ret); } miaInfo->curTs = currWin.skey; @@ -4995,68 +4973,79 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, - pBlock->info.rows, numOfOutput); + pBlock->info.rows, pSup->numOfExprs); +} + +static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) { + pRes->info.groupId = pMiaInfo->groupId; + pMiaInfo->curTs = INT64_MIN; + pMiaInfo->groupId = 0; } static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperator->info; - SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo; - - SExprSupp* pSup = &pOperator->exprSupp; - SSDataBlock* pRes = iaInfo->binfo.pRes; + SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info; + SIntervalAggOperatorInfo* pIaInfo = pMiaInfo->intervalAggOperatorInfo; - SOperatorInfo* downstream = pOperator->pDownstream[0]; - int32_t scanFlag = MAIN_SCAN; + SExprSupp* pSup = &pOperator->exprSupp; + SSDataBlock* pRes = pIaInfo->binfo.pRes; + SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo; + SOperatorInfo* downstream = pOperator->pDownstream[0]; + int32_t scanFlag = MAIN_SCAN; while (1) { SSDataBlock* pBlock = NULL; - if (miaInfo->prefetchedBlock == NULL) { + if (pMiaInfo->prefetchedBlock == NULL) { pBlock = downstream->fpSet.getNextFn(downstream); } else { - pBlock = miaInfo->prefetchedBlock; - miaInfo->prefetchedBlock = NULL; + pBlock = pMiaInfo->prefetchedBlock; + pMiaInfo->prefetchedBlock = NULL; - miaInfo->groupId = pBlock->info.groupId; + pMiaInfo->groupId = pBlock->info.groupId; } + // no data exists, all query processing is done if (pBlock == NULL) { - // close last unfinalized time window - if (miaInfo->curTs != INT64_MIN) { - ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1); - outputMergeAlignedIntervalResult(pOperator, miaInfo->groupId, pRes, miaInfo->curTs); - miaInfo->curTs = INT64_MIN; + // close last unclosed time window + if (pMiaInfo->curTs != INT64_MIN) { + finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo); + resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow)); + cleanupAfterGroupResultGen(pMiaInfo, pRes); } doSetOperatorCompleted(pOperator); break; } - if (!miaInfo->hasGroupId) { - miaInfo->hasGroupId = true; - miaInfo->groupId = pBlock->info.groupId; - } else if (miaInfo->groupId != pBlock->info.groupId) { - // if there are unclosed time window, close it firstly. - ASSERT(miaInfo->curTs != INT64_MIN); - outputMergeAlignedIntervalResult(pOperator, miaInfo->groupId, pRes, miaInfo->curTs); - miaInfo->prefetchedBlock = pBlock; - miaInfo->curTs = INT64_MIN; - break; + if (pMiaInfo->groupId == 0) { + if (pMiaInfo->groupId != pBlock->info.groupId) { + pMiaInfo->groupId = pBlock->info.groupId; + } + } else { + if (pMiaInfo->groupId != pBlock->info.groupId) { + // if there are unclosed time window, close it firstly. + ASSERT(pMiaInfo->curTs != INT64_MIN); + finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo); + resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow)); + + pMiaInfo->prefetchedBlock = pBlock; + cleanupAfterGroupResultGen(pMiaInfo, pRes); + break; + } else { + // continue + } } - getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag); - setInputDataBlock(pOperator, pSup->pCtx, pBlock, iaInfo->inputOrder, scanFlag, true); - doMergeAlignedIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); + getTableScanInfo(pOperator, &pIaInfo->inputOrder, &scanFlag); + setInputDataBlock(pOperator, pSup->pCtx, pBlock, pIaInfo->inputOrder, scanFlag, true); + doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes); - doFilter(miaInfo->pCondition, pRes, NULL); + doFilter(pMiaInfo->pCondition, pRes, NULL); if (pRes->info.rows >= pOperator->resultInfo.capacity) { break; } } - - pRes->info.groupId = miaInfo->groupId; - miaInfo->hasGroupId = false; } static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) { @@ -5155,7 +5144,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, pOperator->info = miaInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, NULL, - destroyMergeAlignedIntervalOperatorInfo, NULL, NULL, NULL); + destroyMAIOperatorInfo, NULL, NULL, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -5165,7 +5154,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, return pOperator; _error: - destroyMergeAlignedIntervalOperatorInfo(miaInfo); + destroyMAIOperatorInfo(miaInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -5208,8 +5197,7 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet( iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); ASSERT(p1 != NULL); - finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pExprSup->pCtx, pExprSup->pExprInfo, - pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset, pResultBlock, pTaskInfo); +// finalizeResultRows(iaInfo->aggSup.pResultBuf, p1, pResultBlock, pTaskInfo); tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); return TSDB_CODE_SUCCESS; } @@ -5218,9 +5206,7 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t STimeWindow* newWin) { SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; - SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC); - SExprSupp* pExprSup = &pOperatorInfo->exprSupp; SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin}; tdListAppend(miaInfo->groupIntervals, &groupTimeWindow); @@ -5233,9 +5219,10 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t if (prevGrpWin->groupId != tableGroupId) { continue; } + STimeWindow* prevWin = &prevGrpWin->window; if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) { - finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock); +// finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock); tdListPopNode(miaInfo->groupIntervals, listNode); } } @@ -5395,7 +5382,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { if (listNode != NULL) { SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data); - finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes); +// finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes); pRes->info.groupId = grpWin->groupId; } }