From 3d12401119b73740401f985d134e8dcb2ff92774 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 2 Mar 2022 10:34:29 +0800 Subject: [PATCH] [td-13039] refactor. --- source/libs/executor/inc/executorimpl.h | 30 +++--- source/libs/executor/src/executorMain.c | 2 +- source/libs/executor/src/executorimpl.c | 112 ++++++++++---------- source/libs/executor/test/executorTests.cpp | 12 +-- 4 files changed, 77 insertions(+), 79 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 0ae54f7ad1..10ee4d157f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -234,8 +234,8 @@ typedef struct STaskAttr { } STaskAttr; typedef int32_t (*__optr_open_fn_t)(void* param); -typedef SSDataBlock* (*__operator_fn_t)(void* param, bool* newgroup); -typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num); +typedef SSDataBlock* (*__optr_fn_t)(void* param, bool* newgroup); +typedef void (*__optr_close_fn_t)(void* param, int32_t num); struct SOperatorInfo; @@ -306,21 +306,21 @@ enum { }; typedef struct SOperatorInfo { - uint8_t operatorType; - bool blockingOptr; // block operator or not - uint8_t status; // denote if current operator is completed - int32_t numOfOutput; // number of columns of the current operator results - char* name; // name, used to show the query execution plan - void* info; // extension attribution - SExprInfo* pExpr; - STaskRuntimeEnv* pRuntimeEnv; // todo remove it - SExecTaskInfo* pTaskInfo; + uint8_t operatorType; + bool blockingOptr; // block operator or not + uint8_t status; // denote if current operator is completed + int32_t numOfOutput; // number of columns of the current operator results + char* name; // name, used to show the query execution plan + void* info; // extension attribution + SExprInfo* pExpr; + STaskRuntimeEnv* pRuntimeEnv; // todo remove it + SExecTaskInfo* pTaskInfo; struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator - __optr_open_fn_t prepareFn; - __operator_fn_t exec; - __optr_cleanup_fn_t cleanupFn; + __optr_open_fn_t openFn; + __optr_fn_t nextDataFn; + __optr_close_fn_t closeFn; } SOperatorInfo; typedef struct { @@ -654,8 +654,6 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOf SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); -// SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); -// SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); // SSDataBlock* doSLimit(void* param, bool* newgroup); // int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId); diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 1642120d15..461235f2ab 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -158,7 +158,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { int64_t st = 0; st = taosGetTimestampUs(); - *pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup); + *pRes = pTaskInfo->pRoot->nextDataFn(pTaskInfo->pRoot, &newgroup); uint64_t el = (taosGetTimestampUs() - st); pTaskInfo->cost.elapsedTime += el; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a8109e7363..18cfe86553 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5281,7 +5281,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = size; - pOperator->exec = doLoadRemoteData; + pOperator->nextDataFn = doLoadRemoteData; pOperator->pTaskInfo = pTaskInfo; #if 1 @@ -5361,7 +5361,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = numOfOutput; - pOperator->exec = doTableScan; + pOperator->nextDataFn = doTableScan; pOperator->pTaskInfo = pTaskInfo; return pOperator; @@ -5386,7 +5386,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim pOperator->info = pInfo; pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->exec = doTableScanImpl; + pOperator->nextDataFn = doTableScanImpl; return pOperator; } @@ -5410,7 +5410,7 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; // pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; - pOperator->exec = doBlockInfoScan; + pOperator->nextDataFn = doBlockInfoScan; return pOperator; } @@ -5452,7 +5452,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = numOfOutput; - pOperator->exec = doStreamBlockScan; + pOperator->nextDataFn = doStreamBlockScan; pOperator->pTaskInfo = pTaskInfo; return pOperator; } @@ -5663,7 +5663,7 @@ SSDataBlock* loadNextDataBlock(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; bool newgroup = false; - return pOperator->exec(pOperator, &newgroup); + return pOperator->nextDataFn(pOperator, &newgroup); } static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char **buf, int32_t rowIndex) { @@ -5983,8 +5983,8 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->pTaskInfo = pTaskInfo; - pOperator->exec = doSortedMerge; - pOperator->cleanupFn = destroySortedMergeOperatorInfo; + pOperator->nextDataFn = doSortedMerge; + pOperator->closeFn = destroySortedMergeOperatorInfo; code = appendDownstream(pOperator, downstream, numOfDownstream); if (code != TSDB_CODE_SUCCESS) { @@ -6079,8 +6079,8 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->exec = doSort; - pOperator->cleanupFn = destroyOrderOperatorInfo; + pOperator->nextDataFn = doSort; + pOperator->closeFn = destroyOrderOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -6105,7 +6105,7 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->exec(downstream, newgroup); + SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6155,7 +6155,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->exec(downstream, newgroup); + SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6241,7 +6241,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { // The downstream exec may change the value of the newgroup, so use a local variable instead. publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); + SSDataBlock* pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6299,7 +6299,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { SSDataBlock* pBlock = NULL; while (1) { publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); + pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6350,7 +6350,7 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) { while (1) { publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock *pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); + SSDataBlock *pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6393,7 +6393,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->exec(downstream, newgroup); + SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6453,7 +6453,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->exec(downstream, newgroup); + SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6516,7 +6516,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->exec(downstream, newgroup); + SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6571,7 +6571,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->exec(downstream, newgroup); + SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6706,7 +6706,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->exec(downstream, newgroup); + SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6768,7 +6768,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->exec(downstream, newgroup); + SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -6821,7 +6821,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->exec(downstream, newgroup); + SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -6906,7 +6906,7 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); + SSDataBlock* pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (*newgroup) { @@ -6979,8 +6979,8 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { return; } - if (pOperator->cleanupFn != NULL) { - pOperator->cleanupFn(pOperator->info, pOperator->numOfOutput); + if (pOperator->closeFn != NULL) { + pOperator->closeFn(pOperator->info, pOperator->numOfOutput); } if (pOperator->pDownstream != NULL) { @@ -7067,8 +7067,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->pTaskInfo = pTaskInfo; - pOperator->exec = doAggregate; - pOperator->cleanupFn = destroyAggOperatorInfo; + pOperator->nextDataFn = doAggregate; + pOperator->closeFn = destroyAggOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -7164,8 +7164,8 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->numOfOutput = numOfOutput; - pOperator->exec = doMultiTableAggregate; - pOperator->cleanupFn = destroyAggOperatorInfo; + pOperator->nextDataFn = doMultiTableAggregate; + pOperator->closeFn = destroyAggOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -7194,8 +7194,8 @@ SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator pOperator->numOfOutput = numOfOutput; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->exec = doProjectOperation; - pOperator->cleanupFn = destroyProjectOperatorInfo; + pOperator->nextDataFn = doProjectOperation; + pOperator->closeFn = destroyProjectOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -7250,10 +7250,10 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI pOperator->status = OP_IN_EXECUTING; pOperator->numOfOutput = numOfOutput; pOperator->pExpr = pExpr; - pOperator->exec = doFilter; + pOperator->nextDataFn = doFilter; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->cleanupFn = destroyConditionOperatorInfo; + pOperator->closeFn = destroyConditionOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -7269,7 +7269,7 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn // pOperator->operatorType = OP_Limit; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; - pOperator->exec = doLimit; + pOperator->nextDataFn = doLimit; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7311,8 +7311,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pEx pOperator->pTaskInfo = pTaskInfo; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; - pOperator->exec = doIntervalAgg; - pOperator->cleanupFn = destroyBasicOperatorInfo; + pOperator->nextDataFn = doIntervalAgg; + pOperator->closeFn = destroyBasicOperatorInfo; code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -7336,8 +7336,8 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->exec = doAllIntervalAgg; - pOperator->cleanupFn = destroyBasicOperatorInfo; + pOperator->nextDataFn = doAllIntervalAgg; + pOperator->closeFn = destroyBasicOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -7360,8 +7360,8 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->exec = doStateWindowAgg; - pOperator->cleanupFn = destroyStateWindowOperatorInfo; + pOperator->nextDataFn = doStateWindowAgg; + pOperator->closeFn = destroyStateWindowOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -7385,8 +7385,8 @@ SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->exec = doSessionWindowAgg; - pOperator->cleanupFn = destroySWindowOperatorInfo; + pOperator->nextDataFn = doSessionWindowAgg; + pOperator->closeFn = destroySWindowOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -7409,8 +7409,8 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->exec = doSTableIntervalAgg; - pOperator->cleanupFn = destroyBasicOperatorInfo; + pOperator->nextDataFn = doSTableIntervalAgg; + pOperator->closeFn = destroyBasicOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -7433,8 +7433,8 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->exec = doAllSTableIntervalAgg; - pOperator->cleanupFn = destroyBasicOperatorInfo; + pOperator->nextDataFn = doAllSTableIntervalAgg; + pOperator->closeFn = destroyBasicOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7465,8 +7465,8 @@ SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->exec = hashGroupbyAggregate; - pOperator->cleanupFn = destroyGroupbyOperatorInfo; + pOperator->nextDataFn = hashGroupbyAggregate; + pOperator->closeFn = destroyGroupbyOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -7504,8 +7504,8 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->exec = doFill; - pOperator->cleanupFn = destroySFillOperatorInfo; + pOperator->nextDataFn = doFill; + pOperator->closeFn = destroySFillOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -7553,7 +7553,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI // pOperator->exec = doSLimit; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->cleanupFn = destroySlimitOperatorInfo; + pOperator->closeFn = destroySlimitOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -7707,11 +7707,11 @@ SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; - pOperator->exec = doTagScan; + pOperator->nextDataFn = doTagScan; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->cleanupFn = destroyTagScanOperatorInfo; + pOperator->closeFn = destroyTagScanOperatorInfo; return pOperator; } @@ -7777,7 +7777,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); + pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -7849,9 +7849,9 @@ SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperato pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->exec = hashDistinct; + pOperator->nextDataFn = hashDistinct; pOperator->pExpr = pExpr; - pOperator->cleanupFn = destroyDistinctOperatorInfo; + pOperator->closeFn = destroyDistinctOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 262232ceb1..85f644335e 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -201,9 +201,9 @@ SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_ pOperator->name = "dummyInputOpertor4Test"; if (numOfCols == 1) { - pOperator->exec = getDummyBlock; + pOperator->nextDataFn = getDummyBlock; } else { - pOperator->exec = get2ColsDummyBlock; + pOperator->nextDataFn = get2ColsDummyBlock; } SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo)); @@ -435,7 +435,7 @@ TEST(testCase, external_sort_Test) { int64_t s2 = taosGetTimestampUs(); printf("total:%ld\n", s2 - s1); - pOperator->cleanupFn(pOperator->info, 2); + pOperator->closeFn(pOperator->info, 2); tfree(exp); tfree(exp1); taosArrayDestroy(pExprInfo); @@ -507,7 +507,7 @@ TEST(testCase, sorted_merge_Test) { int64_t s2 = taosGetTimestampUs(); printf("total:%ld\n", s2 - s1); - pOperator->cleanupFn(pOperator->info, 2); + pOperator->closeFn(pOperator->info, 2); tfree(exp); tfree(exp1); taosArrayDestroy(pExprInfo); @@ -560,7 +560,7 @@ TEST(testCase, time_interval_Operator_Test) { while(1) { int64_t s = taosGetTimestampUs(); - pRes = pOperator->exec(pOperator, &newgroup); + pRes = pOperator->nextDataFn(pOperator, &newgroup); int64_t e = taosGetTimestampUs(); if (t++ == 1) { @@ -583,7 +583,7 @@ TEST(testCase, time_interval_Operator_Test) { int64_t s2 = taosGetTimestampUs(); printf("total:%ld\n", s2 - s1); - pOperator->cleanupFn(pOperator->info, 2); + pOperator->closeFn(pOperator->info, 2); tfree(exp); tfree(exp1); taosArrayDestroy(pExprInfo); -- GitLab