From 61b351a14f14a1b4f27e0596c78541d66f4b8181 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 12 Mar 2022 18:02:56 +0800 Subject: [PATCH] [td-13039] refactor. --- source/libs/executor/inc/executorimpl.h | 8 +- source/libs/executor/src/executorimpl.c | 125 +++++++++++------------- 2 files changed, 62 insertions(+), 71 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index ee841e3ce9..5368b2397a 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -240,12 +240,12 @@ typedef struct STaskAttr { SArray* pUdfInfo; // no need to free } STaskAttr; -typedef int32_t (*__optr_open_fn_t)(void* param); -typedef SSDataBlock* (*__optr_fn_t)(void* param, bool* newgroup); -typedef void (*__optr_close_fn_t)(void* param, int32_t num); - struct SOperatorInfo; +typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* param); +typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* param, bool* newgroup); +typedef void (*__optr_close_fn_t)(void* param, int32_t num); + typedef struct STaskIdInfo { uint64_t queryId; // this is also a request id uint64_t subplanId; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 420d70233c..f472d1e00c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -227,8 +227,7 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) { #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) #define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) -static int32_t operatorDummyOpenFn(void* param) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; +static int32_t operatorDummyOpenFn(SOperatorInfo *pOperator) { OPTR_SET_OPENED(pOperator); return TSDB_CODE_SUCCESS; } @@ -4681,9 +4680,7 @@ static void doCloseAllTimeWindow(STaskRuntimeEnv* pRuntimeEnv) { } } -static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) { - SOperatorInfo *pOperator = (SOperatorInfo*) param; - +static SSDataBlock* doTableScanImpl(SOperatorInfo *pOperator, bool* newgroup) { STableScanInfo *pTableScanInfo = pOperator->info; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; @@ -4731,9 +4728,7 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) { return NULL; } -static SSDataBlock* doTableScan(void* param, bool *newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; - +static SSDataBlock* doTableScan(SOperatorInfo *pOperator, bool *newgroup) { STableScanInfo *pTableScanInfo = pOperator->info; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; @@ -4804,8 +4799,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { return p; } -static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) { - SOperatorInfo *pOperator = (SOperatorInfo*)param; +static SSDataBlock* doBlockInfoScan(SOperatorInfo *pOperator, bool* newgroup) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -4852,9 +4846,7 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) { #endif } -static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*)param; - +static SSDataBlock* doStreamBlockScan(SOperatorInfo *pOperator, bool* newgroup) { // NOTE: this operator never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamBlockScanInfo* pInfo = pOperator->info; @@ -5170,8 +5162,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { } } -static int32_t prepareLoadRemoteData(void* param) { - SOperatorInfo *pOperator = (SOperatorInfo*) param; +static int32_t prepareLoadRemoteData(SOperatorInfo *pOperator) { if (OPTR_IS_OPENED(pOperator)) { return TSDB_CODE_SUCCESS; } @@ -5190,9 +5181,7 @@ static int32_t prepareLoadRemoteData(void* param) { return TSDB_CODE_SUCCESS; } -static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { - SOperatorInfo *pOperator = (SOperatorInfo*) param; - +static SSDataBlock* doLoadRemoteData(SOperatorInfo *pOperator, bool* newgroup) { SExchangeInfo *pExchangeInfo = pOperator->info; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; @@ -5418,7 +5407,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim pOperator->info = pInfo; pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->getNextFn = doTableScanImpl; + pOperator->getNextFn = doTableScanImpl; return pOperator; } @@ -5497,9 +5486,8 @@ static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t tsem_post(&pSourceDataInfo->pEx->ready); } -static SSDataBlock* doSysTableScan(void* param, bool* newgroup) { +static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) { // build message and send to mnode to fetch the content of system tables. - SOperatorInfo* pOperator = (SOperatorInfo*) param; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSysTableScanInfo* pInfo = pOperator->info; @@ -5989,8 +5977,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { return (pInfo->binfo.pRes->info.rows > 0)? pInfo->binfo.pRes:NULL; } -static SSDataBlock* doSortedMerge(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; +static SSDataBlock* doSortedMerge(SOperatorInfo *pOperator, bool* newgroup) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -6160,8 +6147,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t return NULL; } -static SSDataBlock* doSort(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; +static SSDataBlock* doSort(SOperatorInfo *pOperator, bool* newgroup) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -6247,46 +6233,63 @@ static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { } // this is a blocking operator -static SSDataBlock* doAggregate(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; - if (pOperator->status == OP_EXEC_DONE) { - return NULL; +static int32_t doOpenAggregateOptr(SOperatorInfo *pOperator) { + if (OPTR_IS_OPENED(pOperator)) { + return TSDB_CODE_SUCCESS; } SAggOperatorInfo* pAggInfo = pOperator->info; - SOptrBasicInfo* pInfo = &pAggInfo->binfo; + SOptrBasicInfo* pInfo = &pAggInfo->binfo; - int32_t order = TSDB_ORDER_ASC; + int32_t order = TSDB_ORDER_ASC; SOperatorInfo* downstream = pOperator->pDownstream[0]; - while(1) { + bool newgroup = true; + while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->getNextFn(downstream, &newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; } -// if (pAggInfo->current != NULL) { -// setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfOutput); -// } + // if (pAggInfo->current != NULL) { + // setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfOutput); + // } // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); doAggregateImpl(pOperator, 0, pInfo->pCtx); } - doSetOperatorCompleted(pOperator); - finalizeQueryResult(pOperator, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); + + OPTR_SET_OPENED(pOperator); + return TSDB_CODE_SUCCESS; +} + +static SSDataBlock* getAggregateResult(SOperatorInfo *pOperator, bool* newgroup) { + SAggOperatorInfo *pAggInfo = pOperator->info; + SOptrBasicInfo* pInfo = &pAggInfo->binfo; + + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; + pTaskInfo->code = pOperator->_openFn(pOperator); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + return NULL; + } + getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, pInfo->pRes); + doSetOperatorCompleted(pOperator); return (blockDataGetNumOfRows(pInfo->pRes) != 0)? pInfo->pRes:NULL; } -static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; +static SSDataBlock* doMultiTableAggregate(SOperatorInfo *pOperator, bool* newgroup) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -6354,9 +6357,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { return pInfo->pRes; } -static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; - +static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup) { SProjectOperatorInfo* pProjectInfo = pOperator->info; STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SOptrBasicInfo *pInfo = &pProjectInfo->binfo; @@ -6443,8 +6444,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; } -static SSDataBlock* doLimit(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*)param; +static SSDataBlock* doLimit(SOperatorInfo *pOperator, bool* newgroup) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -6526,8 +6526,7 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) { return NULL; } -static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; +static SSDataBlock* doIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -6582,8 +6581,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { return pInfo->binfo.pRes->info.rows == 0? NULL:pInfo->binfo.pRes; } -static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; +static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -6642,8 +6640,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { return pIntervalInfo->binfo.pRes->info.rows == 0? NULL:pIntervalInfo->binfo.pRes; } -static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; +static SSDataBlock* doSTableIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -6702,8 +6699,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { return pIntervalInfo->binfo.pRes; } -static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; +static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -6836,8 +6832,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI // pSDataBlock->info.rows, pOperator->numOfOutput); } -static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; +static SSDataBlock* doStateWindowAgg(SOperatorInfo *pOperator, bool* newgroup) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -6894,8 +6889,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes; } -static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; +static SSDataBlock* doSessionWindowAgg(SOperatorInfo *pOperator, bool* newgroup) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -6954,8 +6948,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes; } -static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; +static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgroup) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -7044,9 +7037,7 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, STaskRunti } } -static SSDataBlock* doFill(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; - +static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) { SFillOperatorInfo *pInfo = pOperator->info; pInfo->pRes->info.rows = 0; @@ -7225,7 +7216,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->pTaskInfo = pTaskInfo; - pOperator->getNextFn = doAggregate; + pOperator->_openFn = doOpenAggregateOptr; + pOperator->getNextFn = getAggregateResult; pOperator->closeFn = destroyAggOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7364,7 +7356,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExp SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "ProjectOperator"; - // pOperator->operatorType = OP_Project; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; pOperator->blockingOptr = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; @@ -7704,7 +7696,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI return pOperator; } -static SSDataBlock* doTagScan(void* param, bool* newgroup) { +static SSDataBlock* doTagScan(SOperatorInfo *pOperator, bool* newgroup) { #if 0 SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { @@ -7909,8 +7901,7 @@ static void buildMultiDistinctKey(SDistinctOperatorInfo *pInfo, SSDataBlock *pBl } } -static SSDataBlock* hashDistinct(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; +static SSDataBlock* hashDistinct(SOperatorInfo *pOperator, bool* newgroup) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } -- GitLab