From ed0b92bb33479a5c7de5e1874ef1ef25e87a5c4f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 20 Sep 2022 14:12:39 +0800 Subject: [PATCH] refactor(query): opt perf. --- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executorimpl.c | 21 ++++++++++++------- source/libs/executor/src/groupoperator.c | 2 +- source/libs/executor/src/joinoperator.c | 2 +- source/libs/executor/src/projectoperator.c | 8 +++---- source/libs/executor/src/scanoperator.c | 15 ++++++++----- source/libs/executor/src/sortoperator.c | 2 +- source/libs/executor/src/timewindowoperator.c | 12 +++++------ 8 files changed, 38 insertions(+), 26 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index c7541583cd..a2828779a2 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -917,7 +917,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); void doSetOperatorCompleted(SOperatorInfo* pOperator); -void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo); +void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo, SFilterInfo* pFilterInfo); int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock, const char* idStr); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c0a88b066e..7b2ca35c29 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1117,18 +1117,22 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep, int32_t status); -void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo) { +void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo, SFilterInfo* pFilterInfo) { if (pFilterNode == NULL || pBlock->info.rows == 0) { return; } - SFilterInfo* filter = NULL; + SFilterInfo* filter = pFilterInfo; int64_t st = taosGetTimestampUs(); - pError("start filter") + pError("start filter"); + // todo move to the initialization function - int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); + int32_t code = 0; + if (filter == NULL) { + code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); + } int64_t st1 = taosGetTimestampUs(); pError("init completed, el: %d us", st1-st); @@ -1145,7 +1149,10 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM // todo the keep seems never to be True?? int32_t status = 0; bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols, &status); - filterFreeInfo(filter); + + if (pFilterInfo == NULL) { + filterFreeInfo(filter); + } int64_t st3 = taosGetTimestampUs(); pError("do filter, el: %d us", st3-st2); @@ -2456,7 +2463,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); while (1) { doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf); - doFilter(pAggInfo->pCondition, pInfo->pRes, NULL); + doFilter(pAggInfo->pCondition, pInfo->pRes, NULL, NULL); if (!hasRemainResults(&pAggInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); @@ -2850,7 +2857,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) { break; } - doFilter(pInfo->pCondition, fillResult, pInfo->pColMatchColInfo); + doFilter(pInfo->pCondition, fillResult, pInfo->pColMatchColInfo, NULL); if (fillResult->info.rows > 0) { break; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 5eb6557dbd..d30d8ff02e 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -311,7 +311,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) { SSDataBlock* pRes = pInfo->binfo.pRes; while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pRes, NULL); + doFilter(pInfo->pCondition, pRes, NULL, NULL); if (!hasRemainResults(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 1bc7d458e0..13e4c2cc65 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -386,7 +386,7 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { break; } if (pJoinInfo->pCondAfterMerge != NULL) { - doFilter(pJoinInfo->pCondAfterMerge, pRes, NULL); + doFilter(pJoinInfo->pCondAfterMerge, pRes, NULL, NULL); } if (pRes->info.rows >= pOperator->resultInfo.threshold) { break; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 2f12a0d19b..82c6033575 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -294,7 +294,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } // do apply filter - doFilter(pProjectInfo->pFilterNode, pFinalRes, NULL); + doFilter(pProjectInfo->pFilterNode, pFinalRes, NULL, NULL); // when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint. if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) { @@ -303,7 +303,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } else { // do apply filter if (pRes->info.rows > 0) { - doFilter(pProjectInfo->pFilterNode, pRes, NULL); + doFilter(pProjectInfo->pFilterNode, pRes, NULL, NULL); if (pRes->info.rows == 0) { continue; } @@ -496,7 +496,7 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) { } } - doFilter(pIndefInfo->pCondition, pInfo->pRes, NULL); + doFilter(pIndefInfo->pCondition, pInfo->pRes, NULL, NULL); size_t rows = pInfo->pRes->info.rows; if (rows > 0 || pOperator->status == OP_EXEC_DONE) { break; @@ -598,7 +598,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) { } pRes->info.rows = 1; - doFilter(pProjectInfo->pFilterNode, pRes, NULL); + doFilter(pProjectInfo->pFilterNode, pRes, NULL, NULL); /*int32_t status = */ doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index edc7ddb92e..60e1c2d86d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -380,7 +380,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca if (pTableScanInfo->pFilterNode != NULL) { int64_t st = taosGetTimestampUs(); - doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo); + doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo, pOperator->exprSupp.pFilterInfo); double el = (taosGetTimestampUs() - st) / 1000.0; pTableScanInfo->readRecorder.filterTime += el; @@ -747,6 +747,11 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; pInfo->pResBlock = createResDataBlock(pDescNode); pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; + + if (pInfo->pFilterNode != NULL) { + code = filterInitFromNode((SNode*)pInfo->pFilterNode, &pOperator->exprSupp.pFilterInfo, 0); + } + pInfo->scanFlag = MAIN_SCAN; pInfo->pColMatchInfo = pColList; pInfo->currentGroupId = -1; @@ -1112,7 +1117,7 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32 return NULL; } - doFilter(pInfo->pCondition, pResult, NULL); + doFilter(pInfo->pCondition, pResult, NULL, NULL); if (pResult->info.rows == 0) { continue; } @@ -1401,7 +1406,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock } } - doFilter(pInfo->pCondition, pInfo->pRes, NULL); + doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL); blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataFreeRes((SSDataBlock*)pBlock); return 0; @@ -2147,7 +2152,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; } - doFilter(pInfo->pCondition, pInfo->pRes, NULL); + doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL); return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; } @@ -3139,7 +3144,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc if (pTableScanInfo->pFilterNode != NULL) { int64_t st = taosGetTimestampMs(); - doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo); + doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo, NULL); double el = (taosGetTimestampUs() - st) / 1000.0; pTableScanInfo->readRecorder.filterTime += el; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index e2014ec973..b4872508be 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -216,7 +216,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { return NULL; } - doFilter(pInfo->pCondition, pBlock, pInfo->pColMatchInfo); + doFilter(pInfo->pCondition, pBlock, pInfo->pColMatchInfo, NULL); if (blockDataGetNumOfRows(pBlock) == 0) { continue; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index c221aaf4fd..5b38b81b30 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1215,7 +1215,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_RES_TO_RETURN) { while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBInfo->pRes, NULL); + doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL); bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { @@ -1254,7 +1254,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBInfo->pRes, NULL); + doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL); bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { @@ -1291,7 +1291,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity); while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBlock, NULL); + doFilter(pInfo->pCondition, pBlock, NULL, NULL); bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { @@ -1865,7 +1865,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_RES_TO_RETURN) { while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBInfo->pRes, NULL); + doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL); bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { @@ -1908,7 +1908,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBInfo->pRes, NULL); + doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL); bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { @@ -5041,7 +5041,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pSup->pCtx, pBlock, pIaInfo->inputOrder, scanFlag, true); doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes); - doFilter(pMiaInfo->pCondition, pRes, NULL); + doFilter(pMiaInfo->pCondition, pRes, NULL, NULL); if (pRes->info.rows >= pOperator->resultInfo.capacity) { break; } -- GitLab