From d8da10f2bb697dddb79bcdf9958521d1e8e5114e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 28 Nov 2022 00:51:18 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/executor/inc/executil.h | 30 ++--- source/libs/executor/inc/executorimpl.h | 61 ++++------- source/libs/executor/src/executor.c | 17 +-- source/libs/executor/src/executorimpl.c | 103 +----------------- source/libs/executor/src/groupoperator.c | 2 +- source/libs/executor/src/projectoperator.c | 4 +- source/libs/executor/src/timewindowoperator.c | 35 ++---- 7 files changed, 56 insertions(+), 196 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index d5366f1b7a..875528576d 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -38,16 +38,7 @@ memcpy((_k) + sizeof(uint64_t), (_ori), (_len)); \ } while (0) -#define SET_RES_EXT_WINDOW_KEY(_k, _ori, _len, _uid, _buf) \ - do { \ - assert(sizeof(_uid) == sizeof(uint64_t)); \ - *(void**)(_k) = (_buf); \ - *(uint64_t*)((_k) + POINTER_BYTES) = (_uid); \ - memcpy((_k) + POINTER_BYTES + sizeof(uint64_t), (_ori), (_len)); \ - } while (0) - #define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t)) -#define GET_RES_EXT_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t) + POINTER_BYTES) #define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str) @@ -104,16 +95,17 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, SExecTaskInfo* pTaskInfo); STableListInfo* tableListCreate(); -void* tableListDestroy(STableListInfo* pTableListInfo); -void tableListClear(STableListInfo* pTableListInfo); -int32_t tableListGetOutputGroups(const STableListInfo* pTableList); -bool oneTableForEachGroup(const STableListInfo* pTableList); -uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid); -int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid); -int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num); -uint64_t tableListGetSize(const STableListInfo* pTableList); -uint64_t tableListGetSuid(const STableListInfo* pTableList); -STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index); +void* tableListDestroy(STableListInfo* pTableListInfo); +void tableListClear(STableListInfo* pTableListInfo); +int32_t tableListGetOutputGroups(const STableListInfo* pTableList); +bool oneTableForEachGroup(const STableListInfo* pTableList); +uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid); +int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid); +int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, + int32_t* num); +uint64_t tableListGetSize(const STableListInfo* pTableList); +uint64_t tableListGetSuid(const STableListInfo* pTableList); +STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index); size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); void initResultRowInfo(SResultRowInfo* pResultRowInfo); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index bd4472327c..8163217039 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -665,18 +665,25 @@ typedef struct SStreamFillOperatorInfo { SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_explain_fn_t explain); - -int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); -int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); +int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); +int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); +void setOperatorCompleted(SOperatorInfo* pOperator); +void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status, + void* pInfo, SExecTaskInfo* pTaskInfo); +void destroyOperatorInfo(SOperatorInfo* pOperator); void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock); void cleanupBasicInfo(SOptrBasicInfo* pInfo); + int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr); void cleanupExprSupp(SExprSupp* pSup); + void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs); -int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, - const char* pkey); +int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, + const char* pkey); +void cleanupAggSup(SAggSupporter* pAggSup); + void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, @@ -702,14 +709,10 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul extern void doDestroyExchangeOperatorInfo(void* param); -void setOperatorCompleted(SOperatorInfo* pOperator); -void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status, - void* pInfo, SExecTaskInfo* pTaskInfo); void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo); int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock, int32_t rows, const char* idStr, STableMetaCacheInfo* pCache); -void cleanupAggSup(SAggSupporter* pAggSup); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name); @@ -724,6 +727,8 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); + SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo); @@ -779,6 +784,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo); // clang-format on int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, @@ -786,38 +793,22 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol); -bool isTaskKilled(SExecTaskInfo* pTaskInfo); int32_t checkForQueryBuf(size_t numOfTables); +bool isTaskKilled(SExecTaskInfo* pTaskInfo); void setTaskKilled(SExecTaskInfo* pTaskInfo); -void queryCostStatis(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo); -void destroyOperatorInfo(SOperatorInfo* pOperator); -int32_t getMaximumIdleDurationSec(); - -/* - * ops: root operator - * data: *data save the result of encode, need to be freed by caller - * length: *length save the length of *data - * nOptrWithVal: *nOptrWithVal save the number of optr with value - * return: result code, 0 means success - */ -int32_t encodeOperator(SOperatorInfo* ops, char** data, int32_t* length, int32_t* nOptrWithVal); - -/* - * ops: root operator, created by caller - * data: save the result of decode - * length: the length of data - * return: result code, 0 means success - */ -int32_t decodeOperator(SOperatorInfo* ops, const char* data, int32_t length); - void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); + int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, char* sql, EOPTR_EXEC_MODEL model); int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle); int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList); +void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo); + +int32_t getMaximumIdleDurationSec(); + STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, int32_t order); int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey, @@ -840,15 +831,7 @@ void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock); int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, - SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, - SExecTaskInfo* pTaskInfo); - -void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); - bool groupbyTbname(SNodeList* pGroupList); -void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo); int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 9b3bd1d808..34bd9cf8ca 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -712,7 +712,7 @@ void qDestroyTask(qTaskInfo_t qTaskHandle) { qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows); - queryCostStatis(pTaskInfo); // print the query cost summary + printTaskExecCostInLog(pTaskInfo); // print the query cost summary doDestroyTask(pTaskInfo); } @@ -728,12 +728,12 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) { } int32_t nOptrWithVal = 0; - int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal); - if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) { - taosMemoryFreeClear(*pOutput); - *len = 0; - } - return code; +// int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal); +// if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) { +// taosMemoryFreeClear(*pOutput); +// *len = 0; +// } + return 0; } int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) { @@ -743,7 +743,8 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le return TSDB_CODE_INVALID_PARA; } - return decodeOperator(pTaskInfo->pRoot, pInput, len); + return 0; +// return decodeOperator(pTaskInfo->pRoot, pInput, len); } int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 5eaa8ba8dd..5abde1be85 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1335,7 +1335,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG } } -void queryCostStatis(SExecTaskInfo* pTaskInfo) { +void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) { STaskCostInfo* pSummary = &pTaskInfo->cost; SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder; @@ -1958,7 +1958,7 @@ void cleanupAggSup(SAggSupporter* pAggSup) { destroyDiskbasedBuf(pAggSup->pResultBuf); } -int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, +int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, const char* pkey) { int32_t code = initExprSupp(pSup, pExprInfo, numOfCols); if (code != TSDB_CODE_SUCCESS) { @@ -2056,7 +2056,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); - int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); + int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -2734,103 +2734,6 @@ int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHa } #endif -int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32_t* nOptrWithVal) { - int32_t code = TDB_CODE_SUCCESS; - char* pCurrent = NULL; - int32_t currLength = 0; - if (ops->fpSet.encodeResultRow) { - if (result == NULL || length == NULL || nOptrWithVal == NULL) { - return TSDB_CODE_TSC_INVALID_INPUT; - } - code = ops->fpSet.encodeResultRow(ops, &pCurrent, &currLength); - - if (code != TDB_CODE_SUCCESS) { - if (*result != NULL) { - taosMemoryFree(*result); - *result = NULL; - } - return code; - } else if (currLength == 0) { - ASSERT(!pCurrent); - goto _downstream; - } - - ++(*nOptrWithVal); - - ASSERT(currLength >= 0); - - if (*result == NULL) { - *result = (char*)taosMemoryCalloc(1, currLength + sizeof(int32_t)); - if (*result == NULL) { - taosMemoryFree(pCurrent); - return TSDB_CODE_OUT_OF_MEMORY; - } - memcpy(*result + sizeof(int32_t), pCurrent, currLength); - *(int32_t*)(*result) = currLength + sizeof(int32_t); - } else { - int32_t sizePre = *(int32_t*)(*result); - char* tmp = (char*)taosMemoryRealloc(*result, sizePre + currLength); - if (tmp == NULL) { - taosMemoryFree(pCurrent); - taosMemoryFree(*result); - *result = NULL; - return TSDB_CODE_OUT_OF_MEMORY; - } - *result = tmp; - memcpy(*result + sizePre, pCurrent, currLength); - *(int32_t*)(*result) += currLength; - } - taosMemoryFree(pCurrent); - *length = *(int32_t*)(*result); - } - -_downstream: - for (int32_t i = 0; i < ops->numOfDownstream; ++i) { - code = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal); - if (code != TDB_CODE_SUCCESS) { - return code; - } - } - return TDB_CODE_SUCCESS; -} - -int32_t decodeOperator(SOperatorInfo* ops, const char* result, int32_t length) { - int32_t code = TDB_CODE_SUCCESS; - if (ops->fpSet.decodeResultRow) { - if (result == NULL) { - return TSDB_CODE_TSC_INVALID_INPUT; - } - - ASSERT(length == *(int32_t*)result); - - const char* data = result + sizeof(int32_t); - code = ops->fpSet.decodeResultRow(ops, (char*)data); - if (code != TDB_CODE_SUCCESS) { - return code; - } - - int32_t totalLength = *(int32_t*)result; - int32_t dataLength = *(int32_t*)data; - - if (totalLength == dataLength + sizeof(int32_t)) { // the last data - result = NULL; - length = 0; - } else { - result += dataLength; - *(int32_t*)(result) = totalLength - dataLength; - length = totalLength - dataLength; - } - } - - for (int32_t i = 0; i < ops->numOfDownstream; ++i) { - code = decodeOperator(ops->pDownstream[i], result, length); - if (code != TDB_CODE_SUCCESS) { - return code; - } - } - return TDB_CODE_SUCCESS; -} - int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) { SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index bbf9bd2a27..6dc8818900 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -456,7 +456,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); - code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str); + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 819997c521..4bba3a72e1 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -102,7 +102,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys } initResultSizeInfo(&pOperator->resultInfo, numOfRows); - code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -400,7 +400,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy initResultSizeInfo(&pOperator->resultInfo, numOfRows); blockDataEnsureCapacity(pResBlock, numOfRows); - int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str); + int32_t code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 80c3c1c454..0e0ec5b339 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1741,7 +1741,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num); - int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); + int32_t code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -2001,7 +2001,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); initResultSizeInfo(&pOperator->resultInfo, 4096); - code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -2069,7 +2069,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); - int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); + int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -2243,26 +2243,6 @@ static void clearSpecialDataBlock(SSDataBlock* pBlock) { blockDataCleanup(pBlock); } -void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex) { - // ASSERT(pDest->info.capacity >= pSource->info.rows); - blockDataEnsureCapacity(pDest, pSource->info.rows); - clearSpecialDataBlock(pDest); - SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, 0); - SColumnInfoData* pSourceCol = taosArrayGet(pSource->pDataBlock, tsColIndex); - - // copy timestamp column - colDataAssign(pDestCol, pSourceCol, pSource->info.rows, &pDest->info); - for (int32_t i = 1; i < taosArrayGetSize(pDest->pDataBlock); i++) { - SColumnInfoData* pCol = taosArrayGet(pDest->pDataBlock, i); - colDataAppendNNULL(pCol, 0, pSource->info.rows); - } - - pDest->info.rows = pSource->info.rows; - pDest->info.groupId = pSource->info.groupId; - pDest->info.type = pSource->info.type; - blockDataUpdateTsWindow(pDest, 0); -} - static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pBlock) { clearSpecialDataBlock(pBlock); int32_t size = taosArrayGetSize(array); @@ -2707,7 +2687,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); - int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); + int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4346,7 +4326,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num); - code = initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); + code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4651,7 +4631,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); - int32_t code = initAggInfo(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); + int32_t code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4847,7 +4827,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys initResultSizeInfo(&pOperator->resultInfo, 4096); size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); + code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4901,3 +4881,4 @@ _error: pTaskInfo->code = code; return NULL; } + -- GitLab