From 4e4e7b2454572024b0f1a2333f5056cafbae2c8e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 12 Apr 2021 22:48:21 +0800 Subject: [PATCH] [td-2895]enable arithmetic calculation for super table. --- src/client/src/tscUtil.c | 67 ++++++++++++++++++++++----------------- src/query/inc/qExecutor.h | 1 + src/query/src/qExecutor.c | 13 ++++---- src/query/src/qPlan.c | 17 ++++++---- 4 files changed, 57 insertions(+), 41 deletions(-) diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 9e15d71350..44849171a0 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -3278,8 +3278,9 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf for (int32_t i = 0; i < pQueryAttr->numOfExpr2; ++i) { SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i); SExprInfo* pExpr = pField->pExpr; +// SExprInfo *pExpr = &pQueryAttr->pExpr3[i]; - SSqlExpr* pse = &pQueryAttr->pExpr2[i].base; + SSqlExpr *pse = &pQueryAttr->pExpr2[i].base; pse->uid = pTableMetaInfo->pTableMeta->id.uid; pse->resColId = pExpr->base.resColId; @@ -3295,10 +3296,16 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf } pse->colInfo.flag = TSDB_COL_NORMAL; - pse->colType = pExpr->base.resType; - pse->colBytes = pExpr->base.resBytes; pse->resType = pExpr->base.resType; pse->resBytes = pExpr->base.resBytes; + + // TODO restore refactor + int32_t inter = 0; + getResultDataInfo(pExpr->base.colType, pExpr->base.colBytes, pExpr->base.functionId, 0, &pse->resType, + &pse->resBytes, &inter, 0, false); + pse->colType = pse->resType; + pse->colBytes = pse->resBytes; + } else { // arithmetic expression pse->colInfo.colId = pExpr->base.colInfo.colId; pse->colType = pExpr->base.colType; @@ -3311,6 +3318,7 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) { tVariantAssign(&pse->param[j], &pExpr->base.param[j]); + buildArithmeticExprFromMsg(&pQueryAttr->pExpr2[i], NULL); } } } @@ -3321,7 +3329,7 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInfo) { assert(tscIsTwoStageSTableQuery(pQueryInfo, 0)); - pQueryAttr->numOfExpr3 = tscNumOfFields(pQueryInfo); + pQueryAttr->numOfExpr3 = tscSqlExprNumOfExprs(pQueryInfo); pQueryAttr->pExpr3 = calloc(pQueryAttr->numOfExpr3, sizeof(SExprInfo)); if (pQueryAttr->pExpr3 == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -3343,32 +3351,33 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu tVariantAssign(&pse->param[j], &pExpr->base.param[j]); } } - { - for (int32_t i = 0; i < pQueryAttr->numOfExpr3; ++i) { - SExprInfo* pExpr = &pQueryAttr->pExpr1[i]; - SSqlExpr* pse = &pQueryAttr->pExpr3[i].base; - // the final result size and type in the same as query on single table. - // so here, set the flag to be false; - int32_t inter = 0; + { + for (int32_t i = 0; i < pQueryAttr->numOfExpr3; ++i) { + SExprInfo* pExpr = &pQueryAttr->pExpr1[i]; + SSqlExpr* pse = &pQueryAttr->pExpr3[i].base; - int32_t functionId = pExpr->base.functionId; - if (functionId >= TSDB_FUNC_TS && functionId <= TSDB_FUNC_DIFF) { - continue; - } + // the final result size and type in the same as query on single table. + // so here, set the flag to be false; + int32_t inter = 0; - if (functionId == TSDB_FUNC_FIRST_DST) { - functionId = TSDB_FUNC_FIRST; - } else if (functionId == TSDB_FUNC_LAST_DST) { - functionId = TSDB_FUNC_LAST; - } else if (functionId == TSDB_FUNC_STDDEV_DST) { - functionId = TSDB_FUNC_STDDEV; - } + int32_t functionId = pExpr->base.functionId; + if (functionId >= TSDB_FUNC_TS && functionId <= TSDB_FUNC_DIFF) { + continue; + } - getResultDataInfo(pExpr->base.colType, pExpr->base.colBytes, functionId, 0, &pse->resType, - &pse->resBytes, &inter, 0, false); + if (functionId == TSDB_FUNC_FIRST_DST) { + functionId = TSDB_FUNC_FIRST; + } else if (functionId == TSDB_FUNC_LAST_DST) { + functionId = TSDB_FUNC_LAST; + } else if (functionId == TSDB_FUNC_STDDEV_DST) { + functionId = TSDB_FUNC_STDDEV; } + + getResultDataInfo(pExpr->base.colType, pExpr->base.colBytes, functionId, 0, &pse->resType, &pse->resBytes, &inter, + 0, false); } + } return TSDB_CODE_SUCCESS; } @@ -3470,17 +3479,17 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt pQueryAttr->colList[i].filterInfo = tFilterInfoDup(pCol->info.filterInfo, pQueryAttr->colList[i].numOfFilters); } + // global aggregate query + if (pQueryAttr->stableQuery && (pQueryAttr->simpleAgg || pQueryAttr->interval.interval > 0) && tscIsTwoStageSTableQuery(pQueryInfo, 0)) { + createGlobalAggregateExpr(pQueryAttr, pQueryInfo); + } + // for simple table, not for super table int32_t code = createSecondaryExpr(pQueryAttr, pQueryInfo, pTableMetaInfo); if (code != TSDB_CODE_SUCCESS) { return code; } - // global aggregate query - if (pQueryAttr->stableQuery && (pQueryAttr->simpleAgg || pQueryAttr->interval.interval > 0) && tscIsTwoStageSTableQuery(pQueryInfo, 0)) { - createGlobalAggregateExpr(pQueryAttr, pQueryInfo); - } - // tag column info code = createTagColumnInfo(pQueryAttr, pQueryInfo, pTableMetaInfo); if (code != TSDB_CODE_SUCCESS) { diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 6b968d8542..ebc751c3c9 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -522,6 +522,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, SQInfo* pQInfo, SQueryPara void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); STableQueryInfo *createTableQueryInfo(SQueryAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf); +int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg); bool isQueryKilled(SQInfo *pQInfo); int32_t checkForQueryBuf(size_t numOfTables); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 0cb6580f05..fc031e8876 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -913,7 +913,7 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, // in case of the block distribution query, the inputBytes is not a constant value. pCtx[i].pInput = p->pData; - assert(p->info.colId == pColIndex->colId && pCtx[i].inputType == p->info.type);// && pCtx[i].inputBytes == p->info.bytes); + assert(p->info.colId == pColIndex->colId && pCtx[i].inputType == p->info.type); uint32_t status = aAggs[pCtx[i].functionId].status; if ((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) { @@ -3936,6 +3936,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, SArray* prevResult, void* ts // create runtime environment int32_t numOfTables = (int32_t)pQueryAttr->tableGroupInfo.numOfTables; pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo)); + code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQueryAttr->tableGroupInfo.numOfTables, pOperator, param); if (code != TSDB_CODE_SUCCESS) { return code; @@ -5890,24 +5891,24 @@ _cleanup: return code; } -static int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg) { +int32_t buildArithmeticExprFromMsg(SExprInfo *pExprInfo, void *pQueryMsg) { qDebug("qmsg:%p create arithmetic expr from binary", pQueryMsg); tExprNode* pExprNode = NULL; TRY(TSDB_MAX_TAG_CONDITIONS) { - pExprNode = exprTreeFromBinary(pArithExprInfo->base.param[0].pz, pArithExprInfo->base.param[0].nLen); + pExprNode = exprTreeFromBinary(pExprInfo->base.param[0].pz, pExprInfo->base.param[0].nLen); } CATCH( code ) { CLEANUP_EXECUTE(); - qError("qmsg:%p failed to create arithmetic expression string from:%s, reason: %s", pQueryMsg, pArithExprInfo->base.param[0].pz, tstrerror(code)); + qError("qmsg:%p failed to create arithmetic expression string from:%s, reason: %s", pQueryMsg, pExprInfo->base.param[0].pz, tstrerror(code)); return code; } END_TRY if (pExprNode == NULL) { - qError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pArithExprInfo->base.param[0].pz); + qError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pExprInfo->base.param[0].pz); return TSDB_CODE_QRY_APP_ERROR; } - pArithExprInfo->pExpr = pExprNode; + pExprInfo->pExpr = pExprNode; return TSDB_CODE_SUCCESS; } diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c index 9d18bae5cd..3c4a9f9fef 100644 --- a/src/query/src/qPlan.c +++ b/src/query/src/qPlan.c @@ -136,12 +136,6 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) { int32_t op = OP_MultiwaySort; taosArrayPush(plan, &op); - // fill operator - if (pQueryAttr->fillType != TSDB_FILL_NONE && (!pQueryAttr->pointInterpQuery)) { - op = OP_Fill; - taosArrayPush(plan, &op); - } - // arithmetic operator if (!pQueryAttr->simpleAgg && pQueryAttr->interval.interval == 0) { op = OP_Arithmetic; @@ -149,6 +143,17 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) { } else { op = OP_GlobalAggregate; taosArrayPush(plan, &op); + + if (pQueryAttr->pExpr2 != NULL) { + op = OP_Arithmetic; + taosArrayPush(plan, &op); + } + } + + // fill operator + if (pQueryAttr->fillType != TSDB_FILL_NONE && (!pQueryAttr->pointInterpQuery)) { + op = OP_Fill; + taosArrayPush(plan, &op); } // limit/offset operator -- GitLab