From 51db5f573cafe0656a0227e2d8bb3876d9c82139 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 17 Feb 2021 22:56:42 +0800 Subject: [PATCH] [td-225] refactor --- src/client/src/tscServer.c | 29 ++--- src/query/inc/qExecutor.h | 15 ++- src/query/src/qAggMain.c | 1 + src/query/src/qExecutor.c | 248 ++++++++++++++++++++++++++++++++----- src/query/src/queryMain.c | 2 +- 5 files changed, 239 insertions(+), 56 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index e49eb6c034..d2cfa89ed5 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -856,33 +856,28 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { for (int32_t i = 0; i < output; ++i) { SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i); SSqlExpr *pExpr = pField->pSqlExpr; + + // this should be switched to projection query if (pExpr != NULL) { if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) { tscError("%p table schema is not matched with parsed sql", pSql); return TSDB_CODE_TSC_INVALID_SQL; } - pSqlFuncExpr1->colInfo.colId = htons(pExpr->colInfo.colId); - pSqlFuncExpr1->colInfo.colIndex = htons(pExpr->colInfo.colIndex); - pSqlFuncExpr1->colInfo.flag = htons(pExpr->colInfo.flag); - - pSqlFuncExpr1->functionId = htons(pExpr->functionId); - pSqlFuncExpr1->numOfParams = htons(pExpr->numOfParams); - pMsg += sizeof(SSqlFuncMsg); + pSqlFuncExpr1->numOfParams = 0; // no params for projection query + pSqlFuncExpr1->functionId = htons(TSDB_FUNC_PRJ); + pSqlFuncExpr1->colInfo.colId = htons(pExpr->resColId); + pSqlFuncExpr1->colInfo.flag = htons(TSDB_COL_NORMAL); - for (int32_t j = 0; j < pExpr->numOfParams; ++j) { - // todo add log - pSqlFuncExpr1->arg[j].argType = htons((uint16_t)pExpr->param[j].nType); - pSqlFuncExpr1->arg[j].argBytes = htons(pExpr->param[j].nLen); - - if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) { - memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen); - pMsg += pExpr->param[j].nLen; - } else { - pSqlFuncExpr1->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64); + for (int32_t f = 0; f < tscSqlExprNumOfExprs(pQueryInfo); ++f) { + SSqlExpr *pe = tscSqlExprGet(pQueryInfo, f); + if (pe == pExpr) { + pSqlFuncExpr1->colInfo.colIndex = htons(f); + break; } } + pMsg += sizeof(SSqlFuncMsg); pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg; } else { assert(pField->pArithExprInfo != NULL); diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 682686e737..a412bd123a 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -283,12 +283,12 @@ typedef struct SQueryRuntimeEnv { SArithmeticSupport *sasArray; SOperatorInfo* pi; - SSDataBlock *outputBuf; + SSDataBlock *outputBuf; int32_t groupIndex; int32_t tableIndex; STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure - SOperatorInfo* proot; + SOperatorInfo *proot; SGroupResInfo groupResInfo; } SQueryRuntimeEnv; @@ -362,8 +362,6 @@ typedef struct STableScanInfo { int64_t elapsedTime; } STableScanInfo; -SOperatorInfo optrList[5]; - typedef struct SAggOperatorInfo { SResultRowInfo *pResultRowInfo; STableQueryInfo *pTableQueryInfo; @@ -396,10 +394,19 @@ typedef struct SHashIntervalOperatorInfo { SQLFunctionCtx *pCtx; } SHashIntervalOperatorInfo; +typedef struct SFillOperatorInfo { + SResultRowInfo *pResultRowInfo; + STableQueryInfo *pTableQueryInfo; + SQueryRuntimeEnv *pRuntimeEnv; +} SFillOperatorInfo; + void freeParam(SQueryParam *param); int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param); int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg, SColumnInfo* pTagCols); +int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, + SSqlFuncMsg **pExprMsg, SExprInfo *prevExpr); + SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code); SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql); diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 8d4ce5ff86..82953203f3 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -1925,6 +1925,7 @@ static void last_function(SQLFunctionCtx *pCtx) { continue; } } + memcpy(pCtx->pOutput, data, pCtx->inputBytes); TSKEY ts = GET_TS_DATA(pCtx, i); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index a61616badb..247fa8e012 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1220,10 +1220,11 @@ static void doWindowBorderInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc } -static void setInputSDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) { +static void setInputSDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock, int32_t order) { if (pCtx[0].pInput == NULL && pSDataBlock->pDataBlock != NULL) { for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { pCtx[i].size = pSDataBlock->info.rows; + pCtx[i].order = order; SColIndex *pCol = &pOperator->pExpr[i].base.colInfo; if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) { @@ -1234,16 +1235,17 @@ static void setInputSDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, S SQLFunctionCtx* pCtx1 = &pCtx[i]; pCtx1->pInput = p->pData; - uint32_t status = aAggs[pCtx->functionId].status; + uint32_t status = aAggs[pCtx1->functionId].status; if ((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) { SColumnInfoData *tsInfo = taosArrayGet(pSDataBlock->pDataBlock, 0); - pCtx->ptsList = tsInfo->pData; + pCtx1->ptsList = tsInfo->pData; } } } } else { for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { pCtx[i].size = pSDataBlock->info.rows; + pCtx[i].order = order; } } } @@ -6307,7 +6309,7 @@ static UNUSED_FUNC int32_t getTableScanId(STableScanInfo* pTableScanInfo) { return pTableScanInfo->current; } -static UNUSED_FUNC int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { +static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { return pTableScanInfo->order; } @@ -6324,9 +6326,12 @@ static SSDataBlock* doAggregation(void* param) { SQuery* pQuery = pRuntimeEnv->pQuery; SQLFunctionCtx* pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId); - SOperatorInfo* upstream = pOperator->upstream; - setDefaultOutputBuf(pRuntimeEnv, pCtx, pRuntimeEnv->outputBuf); + SSDataBlock* pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); + setDefaultOutputBuf(pRuntimeEnv, pCtx, pRes); + + int32_t order = pQuery->order.order; + SOperatorInfo* upstream = pOperator->upstream; pQuery->pos = 0; while(1) { @@ -6335,8 +6340,13 @@ static SSDataBlock* doAggregation(void* param) { break; } + if (strncasecmp(upstream->name, "BidirectionSeqScanTableOp", strlen("BidirectionSeqScanTableOp")) == 0) { + STableScanInfo* pScanInfo = upstream->optInfo; + order = getTableScanOrder(pScanInfo); + } + // the pDataBlock are always the same one, no need to call this again - setInputSDataBlock(pOperator, pCtx, pBlock); + setInputSDataBlock(pOperator, pCtx, pBlock, order); aggApplyFunctions(pRuntimeEnv, pOperator, pCtx, pBlock); } @@ -6344,10 +6354,10 @@ static SSDataBlock* doAggregation(void* param) { setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); finalizeQueryResult(pRuntimeEnv); - pRuntimeEnv->outputBuf->info.rows = getNumOfResult(pRuntimeEnv); + pRes->info.rows = getNumOfResult(pRuntimeEnv); + destroySQLFunctionCtx(pCtx, pRes->info.numOfCols); - destroySQLFunctionCtx(pCtx, pRuntimeEnv->outputBuf->info.numOfCols); - return pRuntimeEnv->outputBuf; + return pRes; } static SSDataBlock* doArithmeticOperation(void* param) { @@ -6357,19 +6367,17 @@ static SSDataBlock* doArithmeticOperation(void* param) { SQueryRuntimeEnv* pRuntimeEnv = pArithInfo->pRuntimeEnv; SQuery* pQuery = pRuntimeEnv->pQuery; - SSDataBlock* pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); + SSDataBlock* pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); if (pArithInfo->pCtx == NULL) { pArithInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId); } setDefaultOutputBuf(pRuntimeEnv, pArithInfo->pCtx, pRes); - SOperatorInfo* upstream = pOperator->upstream; pRuntimeEnv->pQuery->pos = 0; - while(1) { - SSDataBlock* pBlock = upstream->exec(upstream); + SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream); if (pBlock == NULL) { setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); break; @@ -6387,6 +6395,8 @@ static SSDataBlock* doArithmeticOperation(void* param) { SColumnInfoData *p = taosArrayGet(pBlock->pDataBlock, j); if (p->info.colId == pCol->colId) { pArithInfo->pCtx[i].pInput = p->pData; + pArithInfo->pCtx[i].inputType = p->info.type; + pArithInfo->pCtx[i].inputBytes = p->info.bytes; break; } } @@ -6401,7 +6411,6 @@ static SSDataBlock* doArithmeticOperation(void* param) { } } - pRuntimeEnv->outputBuf = pRes; return pRes; } @@ -6413,8 +6422,7 @@ static SSDataBlock* doLimit(void* param) { SLimitOperatorInfo* pInfo = pOperator->optInfo; - SOperatorInfo* upstream = pOperator->upstream; - SSDataBlock* pBlock = upstream->exec(upstream); + SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream); if (pBlock == NULL) { setQueryStatus(pInfo->pRuntimeEnv->pQuery, QUERY_COMPLETED); pOperator->completed = true; @@ -6435,10 +6443,9 @@ static SSDataBlock* doOffset(void* param) { SOperatorInfo *pOperator = (SOperatorInfo *)param; SOffsetOperatorInfo *pInfo = pOperator->optInfo; - SOperatorInfo* upstream = pOperator->upstream; while (1) { - SSDataBlock *pBlock = upstream->exec(upstream); + SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream); if (pBlock == NULL) { setQueryStatus(pInfo->pRuntimeEnv->pQuery, QUERY_COMPLETED); return NULL; @@ -6471,14 +6478,17 @@ static SSDataBlock* doHashIntervalAgg(void* param) { return NULL; } - SAggOperatorInfo* pAggInfo = pOperator->optInfo; - SQueryRuntimeEnv* pRuntimeEnv = pAggInfo->pRuntimeEnv; + SHashIntervalOperatorInfo* pIntervalInfo = pOperator->optInfo; + + SQueryRuntimeEnv* pRuntimeEnv = pIntervalInfo->pRuntimeEnv; SQuery* pQuery = pRuntimeEnv->pQuery; SQLFunctionCtx* pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId); - SOperatorInfo* upstream = pOperator->upstream; + SSDataBlock* pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); + int32_t order = pQuery->order.order; + SOperatorInfo* upstream = pOperator->upstream; pQuery->pos = 0; while(1) { @@ -6487,8 +6497,13 @@ static SSDataBlock* doHashIntervalAgg(void* param) { break; } + if (strncasecmp(upstream->name, "BidirectionSeqScanTableOp", strlen("BidirectionSeqScanTableOp")) == 0) { + STableScanInfo* pScanInfo = upstream->optInfo; + order = getTableScanOrder(pScanInfo); + } + // the pDataBlock are always the same one, no need to call this again - setInputSDataBlock(pOperator, pCtx, pBlock); + setInputSDataBlock(pOperator, pCtx, pBlock, order); hashIntervalAgg(pRuntimeEnv, pOperator, pCtx, pBlock); } @@ -6501,10 +6516,47 @@ static SSDataBlock* doHashIntervalAgg(void* param) { destroySQLFunctionCtx(pCtx, pOperator->numOfOutput); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, pQuery->limit.offset); - toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pRuntimeEnv->outputBuf); + toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pRes); -// pRuntimeEnv->outputBuf->info.rows = getNumOfResult(pRuntimeEnv); - return pRuntimeEnv->outputBuf; + return pRes; +} + +static SSDataBlock* doFill(void* param) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + if (pOperator->completed) { + return NULL; + } + + SFillOperatorInfo *pInfo = pOperator->optInfo; + SQueryRuntimeEnv* pRuntimeEnv = pInfo->pRuntimeEnv; + SQuery* pQuery = pRuntimeEnv->pQuery; + + while(1) { + SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream); + if (pBlock == NULL) { + setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); + return NULL; + } + + for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + memcpy(pQuery->sdata[i]->data, pColInfoData->pData, pColInfoData->info.bytes*pBlock->info.rows); + } + + taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey); + taosFillSetDataBlockFromFilePage(pRuntimeEnv->pFillInfo, (const tFilePage **)pQuery->sdata); + + pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata); + + // here the pQuery->rec.rows == 0 + if (!hasRemainData(&pRuntimeEnv->groupResInfo) && !taosFillHasMoreResults(pRuntimeEnv->pFillInfo)) { + break; + } + + return NULL; + } + + return NULL; } // todo set the attribute of query scan count @@ -6630,6 +6682,28 @@ static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQ return pOperator; } +static UNUSED_FUNC SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) { + SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo)); + + pInfo->pRuntimeEnv = pRuntimeEnv; + pInfo->pTableQueryInfo = pTableQueryInfo; + + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + + pOperator->name = "FillOp"; + pOperator->blockingOptr = false; + pOperator->completed = false; + pOperator->upstream = inputOptr; + pOperator->exec = doFill; + pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1; + pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput; + pOperator->optInfo = pInfo; + + return pOperator; +} + + + /* * in each query, this function will be called only once, no retry for further result. * @@ -6644,8 +6718,8 @@ void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { return; } - SSDataBlock* pResBlock = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); - pQuery->rec.rows = pResBlock->info.rows; + pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); + pQuery->rec.rows = pRuntimeEnv->outputBuf->info.rows; } static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { @@ -6664,8 +6738,8 @@ static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) // return; // } - SSDataBlock* pResBlock = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); - pQuery->rec.rows = (pResBlock != NULL)? pResBlock->info.rows : 0; + pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); + pQuery->rec.rows = (pRuntimeEnv->outputBuf != NULL)? pRuntimeEnv->outputBuf->info.rows : 0; #if 0 while (1) { @@ -6756,8 +6830,8 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { } } - SSDataBlock* pResBlock = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); - pQuery->rec.rows = pResBlock->info.rows; + pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); + pQuery->rec.rows = pRuntimeEnv->outputBuf->info.rows; #if 0 // scanOneTableDataBlocks(pRuntimeEnv, newStartKey); // finalizeQueryResult(pRuntimeEnv); @@ -7469,6 +7543,114 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutpu return TSDB_CODE_SUCCESS; } +int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, + SSqlFuncMsg **pExprMsg, SExprInfo *prevExpr) { + *pExprInfo = NULL; + int32_t code = TSDB_CODE_SUCCESS; + + SExprInfo *pExprs = (SExprInfo *)calloc(numOfOutput, sizeof(SExprInfo)); + if (pExprs == NULL) { + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType); + + for (int32_t i = 0; i < numOfOutput; ++i) { + pExprs[i].base = *pExprMsg[i]; + pExprs[i].bytes = 0; + + int16_t type = 0; + int16_t bytes = 0; + + // parse the arithmetic expression + if (pExprs[i].base.functionId == TSDB_FUNC_ARITHM) { + code = buildArithmeticExprFromMsg(&pExprs[i], pQueryMsg); + + if (code != TSDB_CODE_SUCCESS) { + tfree(pExprs); + return code; + } + + type = TSDB_DATA_TYPE_DOUBLE; + bytes = tDataTypes[type].bytes; +// } else if (pExprs[i].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX && pExprs[i].base.functionId == TSDB_FUNC_TAGPRJ) { // parse the normal column +// SSchema* s = tGetTbnameColumnSchema(); +// type = s->type; +// bytes = s->bytes; +// } else if (pExprs[i].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) { +// SSchema s = tGetBlockDistColumnSchema(); +// type = s.type; +// bytes = s.bytes; +// } else if (pExprs[i].base.colInfo.colId <= TSDB_UD_COLUMN_INDEX) { +// // it is a user-defined constant value column +// assert(pExprs[i].base.functionId == TSDB_FUNC_PRJ); +// +// type = pExprs[i].base.arg[1].argType; +// bytes = pExprs[i].base.arg[1].argBytes; +// +// if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { +// bytes += VARSTR_HEADER_SIZE; +// } + } else { + int32_t index = pExprs[i].base.colInfo.colIndex; + assert(prevExpr[index].base.resColId == pExprs[i].base.colInfo.colId); + +// int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols); +// if (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag)) { +// if (j < TSDB_BLOCK_DIST_COLUMN_INDEX || j >= pQueryMsg->numOfTags) { +// return TSDB_CODE_QRY_INVALID_MSG; +// } +// } else { +// if (j < PRIMARYKEY_TIMESTAMP_COL_INDEX || j >= pQueryMsg->numOfCols) { +// return TSDB_CODE_QRY_INVALID_MSG; +// } +// } +// +// if (pExprs[i].base.colInfo.colId != TSDB_TBNAME_COLUMN_INDEX && j >= 0) { +// SColumnInfo* pCol = (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag))? &pTagCols[j]:&pQueryMsg->colList[j]; +// type = pCol->type; +// bytes = pCol->bytes; +// } else { +// SSchema* s = tGetTbnameColumnSchema(); + + type = prevExpr[index].type; + bytes = prevExpr[index].bytes; +// } + } + + int32_t param = (int32_t)pExprs[i].base.arg[0].argValue.i64; + if (getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].type, &pExprs[i].bytes, + &pExprs[i].interBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) { + tfree(pExprs); + return TSDB_CODE_QRY_INVALID_MSG; + } + + assert(isValidDataType(pExprs[i].type)); + } + +// // TODO refactor +// for (int32_t i = 0; i < numOfOutput; ++i) { +// pExprs[i].base = *pExprMsg[i]; +// int16_t functId = pExprs[i].base.functionId; +// +// if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) { +// int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols); +// if (j < 0 || j >= pQueryMsg->numOfCols) { +// return TSDB_CODE_QRY_INVALID_MSG; +// } else { +// SColumnInfo *pCol = &pQueryMsg->colList[j]; +// int32_t ret = +// getResultDataInfo(pCol->type, pCol->bytes, functId, (int32_t)pExprs[i].base.arg[0].argValue.i64, +// &pExprs[i].type, &pExprs[i].bytes, &pExprs[i].interBytes, tagLen, isSuperTable); +// assert(ret == TSDB_CODE_SUCCESS); +// } +// } +// } + + *pExprInfo = pExprs; + return TSDB_CODE_SUCCESS; +} + SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code) { if (pQueryMsg->numOfGroupCols == 0) { return NULL; @@ -7667,8 +7849,6 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr } doUpdateExprColumnIndex(pQuery); - pQInfo->runtimeEnv.outputBuf = createOutputBuf(pQuery->pExpr1, pQuery->numOfOutput); - int32_t ret = createFilterInfo(pQInfo, pQuery); if (ret != TSDB_CODE_SUCCESS) { goto _cleanup; diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index 4ba1c94097..02e7fb9a8c 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -96,7 +96,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi } if (param.pSecExprMsg != NULL) { - if ((code = createQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->secondStageOutput, ¶m.pSecExprs, param.pSecExprMsg, param.pTagColumnInfo)) != TSDB_CODE_SUCCESS) { + if ((code = createIndirectQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->secondStageOutput, ¶m.pSecExprs, param.pSecExprMsg, param.pExprs)) != TSDB_CODE_SUCCESS) { goto _over; } } -- GitLab