From c1659805b623ea742d4dcf3b2fd0ee11ae725642 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 5 May 2022 18:54:19 +0800 Subject: [PATCH] fix(query): do perform arithmetic operator before apply the sort procedure. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 2 +- source/libs/executor/inc/executorimpl.h | 20 +- source/libs/executor/inc/tsort.h | 2 +- source/libs/executor/src/executorimpl.c | 198 ++++-------------- source/libs/executor/src/groupoperator.c | 28 +-- source/libs/executor/src/scanoperator.c | 12 +- source/libs/executor/src/timewindowoperator.c | 46 ++-- source/libs/executor/src/tsort.c | 20 +- 8 files changed, 110 insertions(+), 218 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index dc27087b5c..d945ae6a33 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3279,7 +3279,7 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa pDataBlockInfo->rows = cur->rows; pDataBlockInfo->window = cur->win; - pDataBlockInfo->numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pHandle)); +// ASSERT(pDataBlockInfo->numOfCols >= (int32_t)(QH_GET_NUM_OF_COLS(pHandle)); } /* diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 0404b4d447..bc6139c304 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -277,7 +277,7 @@ typedef struct SOperatorInfo { uint8_t operatorType; bool blocking; // block operator or not uint8_t status; // denote if current operator is completed - int32_t numOfOutput; // number of columns of the current operator results + int32_t numOfExprs; // number of columns of the current operator results char* name; // name, used to show the query execution plan void* info; // extension attribution SExprInfo* pExpr; @@ -415,8 +415,6 @@ typedef struct SOptrBasicInfo { // TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset typedef struct SAggSupporter { SHashObj* pResultRowHashTable; // quick locate the window object for each result -// SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not -// SArray* pResultRowArrayList; // The array list that contains the Result rows char* keyBuf; // window key buffer SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row @@ -577,13 +575,13 @@ typedef struct SSortedMergeOperatorInfo { } SSortedMergeOperatorInfo; typedef struct SSortOperatorInfo { + SOptrBasicInfo binfo; uint32_t sortBufSize; // max buffer size for in-memory sort - SSDataBlock* pDataBlock; SArray* pSortInfo; SSortHandle* pSortHandle; SArray* inputSlotMap; // for index map from table scan output int32_t bufPageSize; - int32_t numOfRowsInRes; +// int32_t numOfRowsInRes; // TODO extact struct int64_t startTs; // sort start time @@ -645,10 +643,13 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols); void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow); void cleanupAggSup(SAggSupporter* pAggSup); -void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); +void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); +void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); + +SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity); +SSDataBlock* loadNextDataBlock(void* param); -void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, - int32_t* rowCellInfoOffset); +void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData, int16_t bytes, bool masterscan, uint64_t groupId, @@ -663,7 +664,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo); -SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pIndexMap, SExecTaskInfo* pTaskInfo); +SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols, + SArray* pIndexMap, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index c584df05dd..2072707b30 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -89,7 +89,7 @@ int32_t tsortClose(SSortHandle* pHandle); * * @return */ -int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp); +int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*), void* param); /** * diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a91d204efa..a237eb0e7d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -202,9 +202,9 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData idata = {{0}}; SSlotDescNode* pDescNode = nodesListGetNode(pNode->pSlots, i); - if (!pDescNode->output) { - continue; - } +// if (!pDescNode->output) { // todo disable it temporarily +// continue; +// } idata.info.type = pDescNode->dataType.type; idata.info.bytes = pDescNode->dataType.bytes; @@ -651,7 +651,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { - for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { + for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { pCtx[i].order = order; pCtx[i].size = pBlock->info.rows; setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock); @@ -713,7 +713,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt bool createDummyCol) { int32_t code = TSDB_CODE_SUCCESS; - for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { + for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { pCtx[i].order = order; pCtx[i].size = pBlock->info.rows; pCtx[i].pSrcBlock = pBlock; @@ -798,7 +798,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt } static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) { - for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { + for (int32_t k = 0; k < pOperator->numOfExprs; ++k) { if (functionNeedToExecute(&pCtx[k])) { pCtx[k].startTs = startTs; // this can be set during create the struct pCtx[k].fpSet.process(&pCtx[k]); @@ -2815,7 +2815,6 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf // NOTE: sources columns are more than the destination SSDatablock columns. void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols) { size_t numOfSrcCols = taosArrayGetSize(pCols); - ASSERT(numOfSrcCols >= pBlock->info.numOfCols); int32_t i = 0, j = 0; while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) { @@ -3287,7 +3286,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->numOfOutput = pBlock->info.numOfCols; + pOperator->numOfExprs = pBlock->info.numOfCols; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, @@ -3345,49 +3344,6 @@ static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) { cleanupAggSup(&pInfo->aggSup); } -// TODO merge aggregate super table -static void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) { - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - - bool isNull = tsortIsNullVal(pTupleHandle, i); - if (isNull) { - colDataAppend(pColInfo, pBlock->info.rows, NULL, true); - } else { - char* pData = tsortGetValue(pTupleHandle, i); - colDataAppend(pColInfo, pBlock->info.rows, pData, false); - } - } - - pBlock->info.rows += 1; -} - -SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity) { - blockDataCleanup(pDataBlock); - blockDataEnsureCapacity(pDataBlock, capacity); - - blockDataEnsureCapacity(pDataBlock, capacity); - - while (1) { - STupleHandle* pTupleHandle = tsortNextTuple(pHandle); - if (pTupleHandle == NULL) { - break; - } - - appendOneRowToDataBlock(pDataBlock, pTupleHandle); - if (pDataBlock->info.rows >= capacity) { - return pDataBlock; - } - } - - return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; -} - -SSDataBlock* loadNextDataBlock(void* param) { - SOperatorInfo* pOperator = (SOperatorInfo*)param; - return pOperator->fpSet.getNextFn(pOperator); -} - static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) { size_t size = taosArrayGetSize(groupInfo); if (size == 0) { @@ -3490,8 +3446,8 @@ static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock doMergeResultImpl(pInfo, pCtx, numOfExpr, i); } else { doFinalizeResultImpl(pCtx, numOfExpr); - int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfOutput, NULL); - // setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows); + int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfExprs, NULL); + // setTagValueForMultipleRows(pCtx, pOperator->numOfExprs, numOfRows); // TODO check for available buffer; @@ -3541,13 +3497,13 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pInfo->binfo.pCtx, pDataBlock, TSDB_ORDER_ASC, true); // updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor, // pOperator->pRuntimeEnv, true); - doMergeImpl(pOperator, pOperator->numOfOutput, pDataBlock); + doMergeImpl(pOperator, pOperator->numOfExprs, pDataBlock); // flush to tuple store, and after all data have been handled, return to upstream node or sink node } - doFinalizeResultImpl(pInfo->binfo.pCtx, pOperator->numOfOutput); - int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfOutput, NULL); - // setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows); + doFinalizeResultImpl(pInfo->binfo.pCtx, pOperator->numOfExprs); + int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfExprs, NULL); + // setTagValueForMultipleRows(pCtx, pOperator->numOfExprs, numOfRows); // TODO check for available buffer; @@ -3571,7 +3527,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) { pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, NULL, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)"); - tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); + tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); @@ -3678,7 +3634,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->numOfOutput = num; + pOperator->numOfExprs = num; pOperator->pExpr = pExprInfo; pOperator->pTaskInfo = pTaskInfo; @@ -3703,79 +3659,6 @@ _error: return NULL; } -static SSDataBlock* doSort(SOperatorInfo* pOperator) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SSortOperatorInfo* pInfo = pOperator->info; - - if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); - } - - int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->inputSlotMap, SORT_SINGLESOURCE_SORT, - pInfo->bufPageSize, numOfBufPage, pInfo->pDataBlock, pTaskInfo->id.str); - - tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); - - SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); - ps->param = pOperator->pDownstream[0]; - tsortAddSource(pInfo->pSortHandle, ps); - - int32_t code = tsortOpen(pInfo->pSortHandle); - taosMemoryFreeClear(ps); - if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, terrno); - } - - pOperator->status = OP_RES_TO_RETURN; - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); -} - -SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, - SArray* pIndexMap, SExecTaskInfo* pTaskInfo) { - SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - int32_t rowSize = pResBlock->info.rowSize; - - if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) { - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; - } - - pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize + header - - pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer - pInfo->numOfRowsInRes = 1024; - pInfo->pDataBlock = pResBlock; - pInfo->pSortInfo = pSortInfo; - pInfo->inputSlotMap = pIndexMap; - - pOperator->name = "SortOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - - pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, NULL); - - int32_t code = appendDownstream(pOperator, &downstream, 1); - return pOperator; - -_error: - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(pInfo); - taosMemoryFree(pOperator); - return NULL; -} - int32_t getTableScanOrder(SOperatorInfo* pOperator) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { @@ -3813,7 +3696,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { break; } // if (pAggInfo->current != NULL) { - // setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs); // } // there is an scalar expression that needs to be calculated before apply the group aggregation. @@ -3827,7 +3710,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again - setExecutionContext(pOperator->numOfOutput, pBlock->info.groupId, pTaskInfo, pAggInfo); + setExecutionContext(pOperator->numOfExprs, pBlock->info.groupId, pTaskInfo, pAggInfo); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, true); doAggregateImpl(pOperator, 0, pInfo->pCtx); @@ -3848,7 +3731,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } closeAllResultRows(&pAggInfo->binfo.resultRowInfo); - finalizeMultiTupleQueryResult(pAggInfo->binfo.pCtx, pOperator->numOfOutput, pAggInfo->aggSup.pResultBuf, + finalizeMultiTupleQueryResult(pAggInfo->binfo.pCtx, pOperator->numOfExprs, pAggInfo->aggSup.pResultBuf, &pAggInfo->binfo.resultRowInfo, pAggInfo->binfo.rowCellInfoOffset); initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, false); @@ -4092,17 +3975,17 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // todo dynamic set tags // if (pTableQueryInfo != NULL) { - // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfExprs); // } // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC); blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows); - projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput); + projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs); if (pRes->info.rows >= pProjectInfo->binfo.capacity * 0.8) { - copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); - resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput); + copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfExprs); + resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfExprs); return pRes; } } @@ -4127,14 +4010,14 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { pProjectInfo->existDataBlock = pBlock; break; } else { // init output buffer for a new group data - initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfOutput); + initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfExprs); } } // todo dynamic set tags // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; // if (pTableQueryInfo != NULL) { - // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfExprs); // } // the pDataBlock are always the same one, no need to call this again @@ -4143,7 +4026,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, false); blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); - projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput, + projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs, pProjectInfo->pPseudoColInfo); int32_t status = handleLimitOffset(pOperator, pBlock); @@ -4156,7 +4039,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { pProjectInfo->curOutput += pInfo->pRes->info.rows; - // copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); + // copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfExprs); return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL; } @@ -4289,7 +4172,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { } if (pOperator->fpSet.closeFn != NULL) { - pOperator->fpSet.closeFn(pOperator->info, pOperator->numOfOutput); + pOperator->fpSet.closeFn(pOperator->info, pOperator->numOfExprs); } if (pOperator->pDownstream != NULL) { @@ -4425,7 +4308,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo, @@ -4477,14 +4360,6 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { doDestroyBasicInfo(&pInfo->binfo, numOfOutput); } -static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { - SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param; - pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock); - - taosArrayDestroy(pInfo->pSortInfo); - taosArrayDestroy(pInfo->inputSlotMap); -} - void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { SExchangeInfo* pExInfo = (SExchangeInfo*)param; taosArrayDestroy(pExInfo->pSources); @@ -4538,7 +4413,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = num; + pOperator->numOfExprs = num; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL, destroyProjectOperatorInfo, NULL, NULL, NULL); @@ -4621,7 +4496,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp pOperator->status = OP_NOT_OPENED; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL; pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->fpSet = @@ -4979,7 +4854,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SArray* info = createSortInfo(pSortPhyNode->pSortKeys, pSortPhyNode->pTargets); SArray* slotMap = createIndexMap(pSortPhyNode->pTargets); - pOptr = createSortOperatorInfo(ops[0], pResBlock, info, slotMap, pTaskInfo); + + int32_t numOfCols = 0; + SExprInfo* pExprInfo = NULL; + if (pSortPhyNode->pExprs != NULL) { + pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols); + } + + pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, slotMap, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; @@ -5566,7 +5448,7 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { // only the timestamp match support for ordinary table ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); if (*(int64_t*)pLeftVal == *(int64_t*)pRightVal) { - for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { + for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i); SExprInfo* pExprInfo = &pOperator->pExpr[i]; @@ -5633,7 +5515,7 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index f5dd67217e..58efd75a0b 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -227,16 +227,16 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); - int32_t ret = setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); + int32_t ret = setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfExprs, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } int32_t rowIndex = j - num; - doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC); // assign the group keys or user input constant values if required - doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex); + doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex); recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); num = 1; } @@ -244,15 +244,15 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { if (num > 0) { len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); int32_t ret = - setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, + setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfExprs, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } int32_t rowIndex = pBlock->info.rows - num; - doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); - doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex); + doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC); + doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex); } } @@ -291,19 +291,19 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL); } - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs); doHashGroupbyAgg(pOperator, pBlock); } pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pInfo->binfo.resultRowInfo); - finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, + finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); // if (!stableQuery) { // finalize include the update of result rows - // finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput); + // finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs); // } else { - // updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, + // updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfExprs, &pInfo->binfo.resultRowInfo, // pInfo->binfo.rowCellInfoOffset); // } @@ -357,7 +357,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pOperator->status = OP_NOT_OPENED; // pOperator->operatorType = OP_Groupby; pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; @@ -392,7 +392,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { int32_t* rows = (int32_t*) pPage; - size_t numOfCols = pOperator->numOfOutput; + size_t numOfCols = pOperator->numOfExprs; for(int32_t i = 0; i < numOfCols; ++i) { SExprInfo* pExpr = &pOperator->pExpr[i]; int32_t slotId = pExpr->base.pParam[0].pCol->slotId; @@ -565,7 +565,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { break; } - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs); doHashPartition(pOperator, pBlock); } @@ -616,7 +616,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->status = OP_NOT_OPENED; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; pInfo->binfo.pRes = pResultBlock; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->pExpr = pExprInfo; pOperator->info = pInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7e721455c3..8c9fdfe4e6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -386,7 +386,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->numOfOutput = numOfOutput; + pOperator->numOfExprs = numOfOutput; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, NULL, NULL, NULL, NULL); @@ -646,7 +646,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->numOfOutput = pResBlock->info.numOfCols; + pOperator->numOfExprs = pResBlock->info.numOfCols; pOperator->fpSet._openFn = operatorDummyOpenFn; pOperator->fpSet.getNextFn = doStreamBlockScan; pOperator->fpSet.closeFn = operatorDummyCloseFn; @@ -1020,7 +1020,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp; setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data, - pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL, pInfo->scanCols); + pTableRsp->compLen, pOperator->numOfExprs, startTs, NULL, pInfo->scanCols); // todo log the filter info doFilterResult(pInfo); @@ -1150,7 +1150,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->numOfOutput = pResBlock->info.numOfCols; + pOperator->numOfExprs = pResBlock->info.numOfCols; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL); pOperator->pTaskInfo = pTaskInfo; @@ -1247,7 +1247,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { char *data = NULL, *dst = NULL; int16_t type = 0, bytes = 0; - for(int32_t j = 0; j < pOperator->numOfOutput; ++j) { + for(int32_t j = 0; j < pOperator->numOfExprs; ++j) { // not assign value in case of user defined constant output column if (TSDB_COL_IS_UD_COL(pExprInfo[j].base.pColumns->flag)) { continue; @@ -1308,7 +1308,7 @@ SOperatorInfo* createTagScanOperatorInfo(void* readHandle, SExprInfo* pExpr, int pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfOutput; + pOperator->numOfExprs = numOfOutput; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 580614dc02..224f3db912 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -325,7 +325,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SqlFunctionCtx* pCtx = pInfo->pCtx; - for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { + for (int32_t k = 0; k < pOperator->numOfExprs; ++k) { int32_t functionId = pCtx[k].functionId; if (functionId != FUNCTION_TWA && functionId != FUNCTION_INTERP) { pCtx[k].start.key = INT64_MIN; @@ -406,12 +406,12 @@ static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SqlF // start exactly from this point, no need to do interpolation TSKEY key = ascQuery ? win->skey : win->ekey; if (key == curTs) { - setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP); + setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP); return true; } if (lastTs == INT64_MIN && ((pos == 0 && ascQuery) || (pos == (numOfRows - 1) && !ascQuery))) { - setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP); + setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP); return true; } @@ -427,7 +427,7 @@ static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SqlFun SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey, STimeWindow* win) { int32_t order = TSDB_ORDER_ASC; - int32_t numOfOutput = pOperatorInfo->numOfOutput; + int32_t numOfOutput = pOperatorInfo->numOfExprs; TSKEY actualEndKey = tsCols[endRowIndex]; TSKEY key = order ? win->ekey : win->skey; @@ -572,7 +572,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); } } else { - setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP); + setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP); } // point interpolation does not require the end key time window interpolation. @@ -592,7 +592,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); } } else { - setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_END_INTERP); + setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_END_INTERP); } } @@ -612,7 +612,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; - int32_t numOfOutput = pOperatorInfo->numOfOutput; + int32_t numOfOutput = pOperatorInfo->numOfExprs; SArray* pUpdated = NULL; if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { @@ -683,7 +683,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); - setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP); + setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP); doApplyFunctions(pInfo->binfo.pCtx, &w, &pInfo->timeWindowData, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } @@ -773,7 +773,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { break; } - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs); // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true); STableQueryInfo* pTableQueryInfo = pInfo->pCurrent; @@ -798,7 +798,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { } closeAllResultRows(&pInfo->binfo.resultRowInfo); - finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, + finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); @@ -813,7 +813,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI int64_t gid = pBlock->info.groupId; bool masterScan = true; - int32_t numOfOutput = pOperator->numOfOutput; + int32_t numOfOutput = pOperator->numOfExprs; int16_t bytes = pStateColInfoData->info.bytes; SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); @@ -916,7 +916,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); - finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, + finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); @@ -1013,13 +1013,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { // The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the // caller. Note that all the time window are not close till now. - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs); // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true); pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); } - finalizeUpdatedResult(pOperator->numOfOutput, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); + finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); @@ -1082,7 +1082,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL, @@ -1141,7 +1141,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doStreamIntervalAgg, doStreamIntervalAgg, NULL, @@ -1169,7 +1169,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); bool masterScan = true; - int32_t numOfOutput = pOperator->numOfOutput; + int32_t numOfOutput = pOperator->numOfExprs; int64_t gid = pBlock->info.groupId; int64_t gap = pInfo->gap; @@ -1270,7 +1270,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { // restore the value pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); - finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, + finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); @@ -1309,7 +1309,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) { break; } - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfExprs); // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, true); // hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0); @@ -1319,7 +1319,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pSliceInfo->binfo.resultRowInfo); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - // finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput); + // finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfExprs); // initGroupedResultInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo); // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes); @@ -1346,7 +1346,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; @@ -1388,7 +1388,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; pOperator->info = pInfo; @@ -1440,7 +1440,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 46fc8da00c..50aa4cfc01 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -42,11 +42,7 @@ struct SSortHandle { _sort_fetch_block_fn_t fetchfp; _sort_merge_compar_fn_t comparFn; - - void *pParam; SMultiwayMergeTreeInfo *pMergeTree; - int32_t numOfCols; - int64_t startTs; uint64_t sortElapsed; uint64_t totalElapsed; @@ -61,6 +57,9 @@ struct SSortHandle { bool inMemSort; bool needAdjust; STupleHandle tupleHandle; + + void *param; + void (*beforeFp)(SSDataBlock* pBlock, void* param); }; static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param); @@ -533,6 +532,13 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) { pHandle->pDataBlock = createOneDataBlock(pBlock, false); } + // perform the scalar function calculation before apply the sort + if (pHandle->beforeFp != NULL) { + pHandle->beforeFp(pBlock, pHandle->param); + } + + // todo relocate the columns + int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock, pHandle->pIndexMap); if (code != 0) { return code; @@ -623,8 +629,10 @@ int32_t tsortClose(SSortHandle* pHandle) { return TSDB_CODE_SUCCESS; } -int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp) { - pHandle->fetchfp = fp; +int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*), void* param) { + pHandle->fetchfp = fetchFp; + pHandle->beforeFp = fp; + pHandle->param = param; return TSDB_CODE_SUCCESS; } -- GitLab