From 266b878ba2c6c84d48f9c7a15152913662ed8836 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 6 May 2021 22:57:57 +0800 Subject: [PATCH] [td-3299] --- src/query/inc/qExecutor.h | 3 +- src/query/inc/qUtil.h | 1 + src/query/src/qExecutor.c | 94 ++++++++++++++++++++++++--------------- src/query/src/qPlan.c | 9 +++- 4 files changed, 68 insertions(+), 39 deletions(-) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index c028aa0768..3e0f1f5e8f 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -514,7 +514,8 @@ SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SEx int32_t numOfRows, void* merger, bool groupMix); SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param); SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger); -SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, + int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index c340d8f952..0756e41785 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -48,6 +48,7 @@ void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, in SResultRowCellInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset); void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr); +void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols); static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) { assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e979cf3f12..da857533ac 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -167,6 +167,8 @@ static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, S static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo); static bool hasMainOutput(SQueryAttr *pQueryAttr); +static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols); + static int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, tVariant* pTag, STableQueryInfo *pTableQueryInfo); static void releaseQueryBuf(size_t numOfTables); static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order); @@ -1765,12 +1767,25 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } case OP_Filter: { // todo refactor - if (pQueryAttr->stableQuery) { - pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, pQueryAttr->numOfExpr3); + int32_t numOfFilterCols = 0; + if (pQueryAttr->numOfFilterCols > 0) { + pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, + pQueryAttr->numOfOutput, pQueryAttr->tableCols, pQueryAttr->numOfFilterCols); } else { - pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + if (pQueryAttr->stableQuery) { + SColumnInfo* pColInfo = + extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols); + pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, + pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols); + freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3); + } else { + SColumnInfo* pColInfo = + extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols); + pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, + pQueryAttr->numOfOutput, pColInfo, numOfFilterCols); + freeColumnInfo(pColInfo, pQueryAttr->numOfOutput); + } } - break; } @@ -5409,38 +5424,37 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI return pOperator; } -SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, - int32_t numOfOutput) { - SFilterOperatorInfo* pInfo = calloc(1, sizeof(SFilterOperatorInfo)); +static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols) { + SColumnInfo* pCols = calloc(numOfOutput, sizeof(SColumnInfo)); - { - SColumnInfo* pCols = calloc(numOfOutput, sizeof(SColumnInfo)); + int32_t numOfFilter = 0; + for(int32_t i = 0; i < numOfOutput; ++i) { + if (pExpr[i].base.flist.numOfFilters > 0) { + numOfFilter += 1; + } - int32_t numOfFilter = 0; - for(int32_t i = 0; i < numOfOutput; ++i) { - if (pExpr[i].base.flist.numOfFilters > 0) { - numOfFilter += 1; - } + pCols[i].type = pExpr[i].base.resType; + pCols[i].bytes = pExpr[i].base.resBytes; + pCols[i].colId = pExpr[i].base.resColId; - pCols[i].type = pExpr[i].base.resType; - pCols[i].bytes = pExpr[i].base.resBytes; - pCols[i].colId = pExpr[i].base.resColId; + pCols[i].flist.numOfFilters = pExpr[i].base.flist.numOfFilters; + pCols[i].flist.filterInfo = calloc(pCols[i].flist.numOfFilters, sizeof(SColumnFilterInfo)); + memcpy(pCols[i].flist.filterInfo, pExpr[i].base.flist.filterInfo, pCols[i].flist.numOfFilters * sizeof(SColumnFilterInfo)); + } - pCols[i].flist.numOfFilters = pExpr[i].base.flist.numOfFilters; - pCols[i].flist.filterInfo = calloc(pCols[i].flist.numOfFilters, sizeof(SColumnFilterInfo)); - memcpy(pCols[i].flist.filterInfo, pExpr[i].base.flist.filterInfo, pCols[i].flist.numOfFilters * sizeof(SColumnFilterInfo)); - } + assert(numOfFilter > 0); - assert(numOfFilter > 0); - doCreateFilterInfo(pCols, numOfOutput, numOfFilter, &pInfo->pFilterInfo, 0); - pInfo->numOfFilterCols = numOfFilter; + *numOfFilterCols = numOfFilter; + return pCols; +} - for(int32_t i = 0; i < numOfOutput; ++i) { - tfree(pCols[i].flist.filterInfo); - } +SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, + int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter) { + SFilterOperatorInfo* pInfo = calloc(1, sizeof(SFilterOperatorInfo)); - tfree(pCols); - } + assert(numOfFilter > 0 && pCols != NULL); + doCreateFilterInfo(pCols, numOfOutput, numOfFilter, &pInfo->pFilterInfo, 0); + pInfo->numOfFilterCols = numOfFilter; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -7097,6 +7111,20 @@ void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr) { return NULL; } +void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols) { + if (pColumnInfo != NULL) { + assert(numOfCols > 0); + + for (int32_t i = 0; i < numOfCols; i++) { + freeColumnFilterInfo(pColumnInfo[i].flist.filterInfo, pColumnInfo[i].flist.numOfFilters); + } + + tfree(pColumnInfo); + } + + return NULL; +} + void freeQInfo(SQInfo *pQInfo) { if (!isValidQInfo(pQInfo)) { return; @@ -7281,13 +7309,7 @@ void freeQueryAttr(SQueryAttr* pQueryAttr) { tfree(pQueryAttr->tagColList); tfree(pQueryAttr->pFilterInfo); - if (pQueryAttr->tableCols != NULL) { - for (int32_t i = 0; i < pQueryAttr->numOfCols; i++) { - SColumnInfo* column = pQueryAttr->tableCols + i; - freeColumnFilterInfo(column->flist.filterInfo, column->flist.numOfFilters); - } - tfree(pQueryAttr->tableCols); - } + pQueryAttr->tableCols = freeColumnInfo(pQueryAttr->tableCols, pQueryAttr->numOfCols); if (pQueryAttr->pGroupbyExpr != NULL) { taosArrayDestroy(pQueryAttr->pGroupbyExpr->columnInfo); diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c index 2686473d87..c46257ba9e 100644 --- a/src/query/src/qPlan.c +++ b/src/query/src/qPlan.c @@ -615,8 +615,13 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { taosArrayPush(plan, &op); } } else { // diff/add/multiply/subtract/division - op = OP_Arithmetic; - taosArrayPush(plan, &op); + if (pQueryAttr->numOfFilterCols > 0 && pQueryAttr->vgId == 0) { // todo refactor + op = OP_Filter; + taosArrayPush(plan, &op); + } else { + op = OP_Arithmetic; + taosArrayPush(plan, &op); + } } if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0) { -- GitLab