From 60010d2e2595db5b600ad36053fa8314b5afe9a5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Mar 2021 12:07:10 +0800 Subject: [PATCH] [td-2819] refactor codes. --- src/client/inc/tscUtil.h | 12 +- src/client/src/tscSQLParser.c | 4 - src/client/src/tscServer.c | 66 +++----- src/client/src/tscSub.c | 2 + src/client/src/tscSubquery.c | 12 +- src/client/src/tscUtil.c | 281 ++++++++++++++++++---------------- src/query/inc/qExecutor.h | 4 +- src/query/src/qExecutor.c | 157 ++++++------------- 8 files changed, 227 insertions(+), 311 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index a4f720edc4..5f26725f4b 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -127,10 +127,10 @@ bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo); bool tscGroupbyColumn(SQueryInfo* pQueryInfo); bool tscIsTopBotQuery(SQueryInfo* pQueryInfo); bool hasTagValOutput(SQueryInfo* pQueryInfo); -bool timeWindowInterpoRequired(SQueryInfo *pQueryNodeInfo); +bool timeWindowInterpoRequired(SQueryInfo *pQueryInfo); bool isStabledev(SQueryInfo* pQueryInfo); -bool isTsCompQuery(SQueryInfo* pQueryNodeInfo); -bool isSimpleAggregate(SQueryInfo* pQueryNodeInfo); +bool isTsCompQuery(SQueryInfo* pQueryInfo); +bool isSimpleAggregate(SQueryInfo* pQueryInfo); bool isBlockDistQuery(SQueryInfo* pQueryInfo); bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex); @@ -300,11 +300,11 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta); uint32_t tscGetTableMetaMaxSize(); int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name); STableMeta* tscTableMetaDup(STableMeta* pTableMeta); -int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryNodeInfo, SQuery* pQuery); +int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQuery* pQuery, void* addr); void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchemaEx* pSchema); -void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, - uint64_t* qId, char* sql); +void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, + uint64_t* qId, char* sql, void* addr); void* malloc_throw(size_t size); void* calloc_throw(size_t nmemb, size_t size); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index dd17977949..4a4def1c0c 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -6642,10 +6642,6 @@ static STableMeta* extractTempTableMetaFromNestQuery(SQueryInfo* pUpstream) { return meta; } -//static SColumnInfo* getColumnInfoFromSchema(SQueryInfo* pUpstream) { -// -//} - int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t index) { assert(pQuerySqlNode != NULL && (pQuerySqlNode->from == NULL || taosArrayGetSize(pQuerySqlNode->from->tableList) > 0)); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index df146a6412..1dea0aa6d7 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -426,15 +426,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { (*pSql->fp)(pSql->param, pSql, rpcMsg->code); } - - if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it taosRemoveRef(tscObjRef, handle); tscDebug("%p sqlObj is automatically freed", pSql); } taosReleaseRef(tscObjRef, handle); - rpcFreeCont(rpcMsg->pCont); } @@ -707,35 +704,14 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd); SQuery query = {0}; - tscCreateQueryFromQueryInfo(pQueryInfo, &query); + tscCreateQueryFromQueryInfo(pQueryInfo, &query, pSql); SArray* tableScanOperator = createTableScanPlan(&query); SArray* queryOperator = createExecOperatorPlan(&query); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; -/* - size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList); - if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo) && !tscQueryBlockInfo(pQueryInfo)) { - tscError("%p illegal value of numOfCols in query msg: %" PRIu64 ", table cols:%d", pSql, (uint64_t)numOfSrcCols, - tscGetNumOfColumns(pTableMeta)); - - return TSDB_CODE_TSC_INVALID_SQL; - } - - if (pQueryInfo->interval.interval < 0) { - tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, pSql, (int64_t)pQueryInfo->interval.interval); - return TSDB_CODE_TSC_INVALID_SQL; - } - - if (pQueryInfo->groupbyExpr.numOfGroupCols < 0) { - tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols); - return TSDB_CODE_TSC_INVALID_SQL; - } -*/ { - - SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload; tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version)); @@ -764,17 +740,29 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->offset = htobe64(query.limit.offset); pQueryMsg->numOfCols = htons(query.numOfCols); - pQueryMsg->interval.interval = htobe64(query.interval.interval); - pQueryMsg->interval.sliding = htobe64(query.interval.sliding); - pQueryMsg->interval.offset = htobe64(query.interval.offset); + pQueryMsg->interval.interval = htobe64(query.interval.interval); + pQueryMsg->interval.sliding = htobe64(query.interval.sliding); + pQueryMsg->interval.offset = htobe64(query.interval.offset); pQueryMsg->interval.intervalUnit = query.interval.intervalUnit; pQueryMsg->interval.slidingUnit = query.interval.slidingUnit; pQueryMsg->interval.offsetUnit = query.interval.offsetUnit; - pQueryMsg->numOfTags = htonl(numOfTags); - pQueryMsg->sqlstrLen = htonl(sqlLen); - pQueryMsg->sw.gap = htobe64(query.sw.gap); - pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX); + pQueryMsg->stableQuery = query.stableQuery; + pQueryMsg->topBotQuery = query.topBotQuery; + pQueryMsg->groupbyColumn = query.groupbyColumn; + pQueryMsg->hasTagResults = query.hasTagResults; + pQueryMsg->timeWindowInterpo = query.timeWindowInterpo; + pQueryMsg->queryBlockDist = query.queryBlockDist; + pQueryMsg->stabledev = query.stabledev; + pQueryMsg->tsCompQuery = query.tsCompQuery; + pQueryMsg->simpleAgg = query.simpleAgg; + pQueryMsg->pointInterpQuery = query.pointInterpQuery; + pQueryMsg->needReverseScan = query.needReverseScan; + + pQueryMsg->numOfTags = htonl(numOfTags); + pQueryMsg->sqlstrLen = htonl(sqlLen); + pQueryMsg->sw.gap = htobe64(query.sw.gap); + pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX); pQueryMsg->secondStageOutput = htonl(query.numOfExpr2); pQueryMsg->numOfOutput = htons((int16_t)query.numOfOutput); // this is the stage one output column number @@ -825,20 +813,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } } - { - pQueryMsg->stableQuery = query.stableQuery; - pQueryMsg->topBotQuery = query.topBotQuery; - pQueryMsg->groupbyColumn = query.groupbyColumn; - pQueryMsg->hasTagResults = query.hasTagResults; - pQueryMsg->timeWindowInterpo = query.timeWindowInterpo; - pQueryMsg->queryBlockDist = query.queryBlockDist; - pQueryMsg->stabledev = query.stabledev; - pQueryMsg->tsCompQuery = query.tsCompQuery; - pQueryMsg->simpleAgg = query.simpleAgg; - pQueryMsg->pointInterpQuery = query.pointInterpQuery; - pQueryMsg->needReverseScan = query.needReverseScan; - } - SSqlExpr *pSqlExpr = (SSqlExpr *)pMsg; for (int32_t i = 0; i < query.numOfOutput; ++i) { diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index ad850d2a9e..7f7d9abe74 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -487,11 +487,13 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { if (pSql == NULL) { return NULL; } + if (pSub->pSql->self != 0) { taosReleaseRef(tscObjRef, pSub->pSql->self); } else { tscFreeSqlObj(pSub->pSql); } + pSub->pSql = pSql; pSql->pSubscription = pSub; } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 3eb53cb863..f0baa0bcea 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -3215,10 +3215,10 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { return hasData; } -void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, - uint64_t* qId, char* sql) { - assert(pQueryNodeInfo != NULL); - int16_t numOfOutput = pQueryNodeInfo->fieldsInfo.numOfOutput; +void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, + uint64_t* qId, char* sql, void* addr) { + assert(pQueryInfo != NULL); + int16_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput; SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); if (pQInfo == NULL) { @@ -3227,10 +3227,10 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryNodeInfo, SExprInfo* pExprs // to make sure third party won't overwrite this structure pQInfo->signature = pQInfo; - SQuery *pQuery = &pQInfo->query; - tscCreateQueryFromQueryInfo(pQueryNodeInfo, pQuery); + SQuery *pQuery = &pQInfo->query; pQInfo->runtimeEnv.pQuery = pQuery; + tscCreateQueryFromQueryInfo(pQueryInfo, pQuery, addr); // calculate the result row size for (int16_t col = 0; col < numOfOutput; ++col) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 82243103b0..941c5a5407 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -104,7 +104,6 @@ bool tscQueryBlockInfo(SQueryInfo* pQueryInfo) { SExprInfo* pExpr = tscSqlExprGet(pQueryInfo, i); int32_t functId = pExpr->base.functionId; - // "select count(tbname)" query if (functId == TSDB_FUNC_BLKINFO) { return true; } @@ -568,7 +567,7 @@ void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) { taosArrayPush(tableGroupInfo.pGroupList, &group); - SQInfo* pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, 0, NULL); + SQInfo* pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, 0, NULL, NULL); printf("%p\n", pQInfo); } } @@ -3073,159 +3072,156 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta) { return p; } -int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQuery* pQuery) { - memset(pQuery, 0, sizeof(SQuery)); +static int32_t createSecondaryExpr(SQuery* pQuery, SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo) { + if (!tscIsSecondStageQuery(pQueryInfo)) { + return TSDB_CODE_SUCCESS; + } - pQuery->tsdb = NULL; - pQuery->topBotQuery = tscIsTopBotQuery(pQueryInfo); - pQuery->hasTagResults = hasTagValOutput(pQueryInfo); - pQuery->stabledev = isStabledev(pQueryInfo); - pQuery->tsCompQuery = isTsCompQuery(pQueryInfo); - pQuery->simpleAgg = isSimpleAggregate(pQueryInfo); - pQuery->needReverseScan = tscNeedReverseScan(pQueryInfo); - pQuery->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type); - pQuery->groupbyColumn = tscGroupbyColumn(pQueryInfo); - pQuery->queryBlockDist = isBlockDistQuery(pQueryInfo); - pQuery->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo); - pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo); + pQuery->numOfExpr2 = tscNumOfFields(pQueryInfo); + pQuery->pExpr2 = calloc(pQuery->numOfExpr2, sizeof(SExprInfo)); + if (pQuery->pExpr2 == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } - int16_t numOfCols = taosArrayGetSize(pQueryInfo->colList); - int16_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput; - - pQuery->numOfCols = numOfCols; - pQuery->numOfOutput = numOfOutput; - pQuery->limit = pQueryInfo->limit; - pQuery->order = pQueryInfo->order; - pQuery->pExpr1 = NULL; - pQuery->pExpr2 = NULL; // not support yet. - pQuery->numOfExpr2 = 0; - pQuery->pGroupbyExpr = NULL; - pQuery->fillType = pQueryInfo->fillType; - pQuery->numOfTags = 0; - pQuery->tagColList = NULL; + for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) { + SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i); + SExprInfo* pExpr = pField->pExpr; - memcpy(&pQuery->interval, &pQueryInfo->interval, sizeof(pQuery->interval)); + SSqlExpr* pse = &pQuery->pExpr2[i].base; + pse->uid = pTableMetaInfo->pTableMeta->id.uid; + pse->resColId = pExpr->base.resColId; - STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; + if (pExpr->pExpr == NULL) { // this should be switched to projection query + pse->numOfParams = 0; // no params for projection query + pse->functionId = TSDB_FUNC_PRJ; + pse->colInfo.colId = pExpr->base.resColId; - pQuery->vgId = 0; - pQuery->stableQuery = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); - pQuery->groupbyColumn = tscGroupbyColumn(pQueryInfo); - pQuery->window = pQueryInfo->window; + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { + if (pQuery->pExpr1[j].base.resColId == pse->colInfo.colId) { + pse->colInfo.colIndex = j; + } + } - pQuery->pGroupbyExpr = calloc(1, sizeof(SSqlGroupbyExpr)); - *pQuery->pGroupbyExpr = pQueryInfo->groupbyExpr; + pse->colInfo.flag = TSDB_COL_NORMAL; + pse->colType = pExpr->base.resType; + pse->colBytes = pExpr->base.resBytes; + pse->resType = pExpr->base.resType; + pse->resBytes = pExpr->base.resBytes; + } else { // arithmetic expression + pse->colInfo.colId = pExpr->base.colInfo.colId; + pse->colType = pExpr->base.colType; + pse->colBytes = pExpr->base.colBytes; + pse->resBytes = sizeof(double); + pse->resType = TSDB_DATA_TYPE_DOUBLE; + + pse->functionId = pExpr->base.functionId; + pse->numOfParams = pExpr->base.numOfParams; + + for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) { + tVariantAssign(&pse->param[j], &pExpr->base.param[j]); + } + } + } - { - pQuery->numOfOutput = tscSqlExprNumOfExprs(pQueryInfo); - pQuery->pExpr1 = calloc(pQuery->numOfOutput, sizeof(SExprInfo)); + return TSDB_CODE_SUCCESS; +} - for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { - SExprInfo* pExpr = tscSqlExprGet(pQueryInfo, i); - tscSqlExprAssign(&pQuery->pExpr1[i], pExpr); - } +static int32_t createTagColumnInfo(SQuery* pQuery, SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo) { + pQuery->numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList); + if (pQuery->numOfTags == 0) { + return TSDB_CODE_SUCCESS; + } - pQuery->colList = calloc(numOfCols, sizeof(SColumnInfo)); - for(int32_t i = 0; i < numOfCols; ++i) { - SColumn* pCol = taosArrayGetP(pQueryInfo->colList, i); - if (!isValidDataType(pCol->info.type) || pCol->info.type == TSDB_DATA_TYPE_NULL) { - assert(0); - } + STableMeta* pTableMeta = pQueryInfo->pTableMetaInfo[0]->pTableMeta; - pQuery->colList[i] = pCol->info; - pQuery->colList[i].filterInfo = tFilterInfoDup(pCol->info.filterInfo, pQuery->colList[i].numOfFilters); - } + int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta); + + pQuery->tagColList = calloc(pQuery->numOfTags, sizeof(SColumnInfo)); + if (pQuery->tagColList == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; } - {// for simple table, not for super table - if (tscIsSecondStageQuery(pQueryInfo)) { - pQuery->numOfExpr2 = tscNumOfFields(pQueryInfo); - pQuery->pExpr2 = calloc(pQuery->numOfExpr2, sizeof(SExprInfo)); + SSchema* pSchema = tscGetTableTagSchema(pTableMeta); + for (int32_t i = 0; i < pQuery->numOfTags; ++i) { + SColumn* pCol = taosArrayGetP(pTableMetaInfo->tagColList, i); + SSchema* pColSchema = &pSchema[pCol->colIndex.columnIndex]; - for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) { - SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i); + if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < TSDB_TBNAME_COLUMN_INDEX) || + (!isValidDataType(pColSchema->type))) { + return TSDB_CODE_TSC_INVALID_SQL; + } - SExprInfo* pExpr = pField->pExpr; + SColumnInfo* pTagCol = &pQuery->tagColList[i]; - SSqlExpr* pse = &pQuery->pExpr2[i].base; - pse->uid = pTableMetaInfo->pTableMeta->id.uid; + pTagCol->colId = pColSchema->colId; + pTagCol->bytes = pColSchema->bytes; + pTagCol->type = pColSchema->type; + pTagCol->numOfFilters = 0; + } - // this should be switched to projection query - if (pExpr->pExpr == NULL) { - pse->numOfParams = 0; // no params for projection query - pse->functionId = TSDB_FUNC_PRJ; - pse->colInfo.colId = pExpr->base.resColId; + return TSDB_CODE_SUCCESS; +} - for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - if (pQuery->pExpr1[j].base.resColId == pse->colInfo.colId) { - pse->colInfo.colIndex = j; - } - } - pse->colInfo.flag = TSDB_COL_NORMAL; - pse->colType = pExpr->base.resType; - pse->colBytes = pExpr->base.resBytes; - pse->resType = pExpr->base.resType; - pse->resBytes = pExpr->base.resBytes; - } else { - assert(pField->pExpr->pExpr != NULL); +int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQuery* pQuery, void* addr) { + memset(pQuery, 0, sizeof(SQuery)); - pse->colInfo.colId = pExpr->base.colInfo.colId; - pse->colType = pExpr->base.colType; - pse->colBytes = pExpr->base.colBytes; - pse->resBytes = sizeof(double); - pse->resType = TSDB_DATA_TYPE_DOUBLE; + int16_t numOfCols = taosArrayGetSize(pQueryInfo->colList); + int16_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo); + + pQuery->topBotQuery = tscIsTopBotQuery(pQueryInfo); + pQuery->hasTagResults = hasTagValOutput(pQueryInfo); + pQuery->stabledev = isStabledev(pQueryInfo); + pQuery->tsCompQuery = isTsCompQuery(pQueryInfo); + pQuery->simpleAgg = isSimpleAggregate(pQueryInfo); + pQuery->needReverseScan = tscNeedReverseScan(pQueryInfo); + pQuery->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type); + pQuery->groupbyColumn = tscGroupbyColumn(pQueryInfo); + pQuery->queryBlockDist = isBlockDistQuery(pQueryInfo); + pQuery->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo); + pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo); + + pQuery->numOfCols = numOfCols; + pQuery->numOfOutput = numOfOutput; + pQuery->limit = pQueryInfo->limit; + pQuery->order = pQueryInfo->order; + pQuery->fillType = pQueryInfo->fillType; + pQuery->groupbyColumn = tscGroupbyColumn(pQueryInfo); + pQuery->window = pQueryInfo->window; - pse->functionId = pExpr->base.functionId; - pse->numOfParams = pExpr->base.numOfParams; + memcpy(&pQuery->interval, &pQueryInfo->interval, sizeof(pQuery->interval)); - memset(pse->param, 0, sizeof(tVariant) * tListLen(pse->param)); - for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) { - tVariantAssign(&pse->param[j], &pExpr->base.param[j]); - } - } + STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; - pse->resColId = pExpr->base.resColId; - pse->uid = pTableMetaInfo->pTableMeta->id.uid; - } - } + pQuery->pGroupbyExpr = calloc(1, sizeof(SSqlGroupbyExpr)); + *(pQuery->pGroupbyExpr) = pQueryInfo->groupbyExpr; + + pQuery->pExpr1 = calloc(pQuery->numOfOutput, sizeof(SExprInfo)); + for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { + SExprInfo* pExpr = tscSqlExprGet(pQueryInfo, i); + tscSqlExprAssign(&pQuery->pExpr1[i], pExpr); } - // tag column info - { - pQuery->numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList); - if (pQuery->numOfTags > 0) { // todo index problem - STableMeta* pTableMeta = pQueryInfo->pTableMetaInfo[0]->pTableMeta; - -// int32_t numOfColumns = tscGetNumOfColumns(pTableMeta); - int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta); -// int32_t total = numOfTagColumns + numOfColumns; - - SSchema* pSchema = tscGetTableTagSchema(pTableMeta); - pQuery->tagColList = calloc(pQuery->numOfTags, sizeof(SColumnInfo)); - pQuery->numOfTags = pQuery->numOfTags; - - for (int32_t i = 0; i < pQuery->numOfTags; ++i) { - SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i); - SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex]; - - if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) || - (!isValidDataType(pColSchema->type))) { - char n[TSDB_TABLE_FNAME_LEN] = {0}; - tNameExtractFullName(&pTableMetaInfo->name, n); -// tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s", -// pSql, pTableMeta->id.tid, pTableMeta->id.uid, n, total, numOfTagColumns, pCol->colIndex.columnIndex, pColSchema->name); - - return TSDB_CODE_SUCCESS; - } + pQuery->colList = calloc(numOfCols, sizeof(SColumnInfo)); + for(int32_t i = 0; i < numOfCols; ++i) { + SColumn* pCol = taosArrayGetP(pQueryInfo->colList, i); + if (!isValidDataType(pCol->info.type) || pCol->info.type == TSDB_DATA_TYPE_NULL) { + assert(0); + } - SColumnInfo* pTagCol = &pQuery->tagColList[i]; + pQuery->colList[i] = pCol->info; + pQuery->colList[i].filterInfo = tFilterInfoDup(pCol->info.filterInfo, pQuery->colList[i].numOfFilters); + } - pTagCol->colId = pColSchema->colId; - pTagCol->bytes = pColSchema->bytes; - pTagCol->type = pColSchema->type; - pTagCol->numOfFilters = 0; - } - } + // for simple table, not for super table + int32_t code = createSecondaryExpr(pQuery, pQueryInfo, pTableMetaInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + // tag column info + code = createTagColumnInfo(pQuery, pQueryInfo, pTableMetaInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; } if (pQuery->fillType != TSDB_FILL_NONE) { @@ -3234,14 +3230,33 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQuery* pQuery) { } pQuery->srcRowSize = 0; - pQuery->maxSrcColumnSize = 0; + pQuery->maxTableColumnWidth = 0; for (int16_t i = 0; i < numOfCols; ++i) { pQuery->srcRowSize += pQuery->colList[i].bytes; - if (pQuery->maxSrcColumnSize < pQuery->colList[i].bytes) { - pQuery->maxSrcColumnSize = pQuery->colList[i].bytes; + if (pQuery->maxTableColumnWidth < pQuery->colList[i].bytes) { + pQuery->maxTableColumnWidth = pQuery->colList[i].bytes; } } pQuery->interBufSize = getOutputInterResultBufSize(pQuery); + + if (pQuery->numOfCols <= 0 && !tscQueryTags(pQueryInfo) && !pQuery->queryBlockDist) { + tscError("%p illegal value of numOfCols in query msg: %" PRIu64 ", table cols:%d", addr, + (uint64_t)pQuery->numOfCols, numOfCols); + + return TSDB_CODE_TSC_INVALID_SQL; + } + + if (pQuery->interval.interval < 0) { + tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, addr, + (int64_t)pQueryInfo->interval.interval); + return TSDB_CODE_TSC_INVALID_SQL; + } + + if (pQuery->pGroupbyExpr->numOfGroupCols < 0) { + tscError("%p illegal value of numOfGroupCols in query msg: %d", addr, pQueryInfo->groupbyExpr.numOfGroupCols); + return TSDB_CODE_TSC_INVALID_SQL; + } + return TSDB_CODE_SUCCESS; } diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 9504b44aa1..4967d28efe 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -209,7 +209,7 @@ typedef struct SQuery { int32_t srcRowSize; // todo extract struct int32_t resultRowSize; int32_t intermediateResultRowSize; // intermediate result row size, in case of top-k query. - int32_t maxSrcColumnSize; + int32_t maxTableColumnWidth; int32_t tagLen; // tag value length of current query SSqlGroupbyExpr* pGroupbyExpr; SExprInfo* pExpr1; @@ -222,7 +222,6 @@ typedef struct SQuery { SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query. SSingleColumnFilterInfo* pFilterInfo; - STableQueryInfo* current; void* tsdb; SMemRef memRef; STableGroupInfo tableGroupInfo; // table list SArray @@ -263,6 +262,7 @@ typedef struct SQueryRuntimeEnv { SGroupResInfo groupResInfo; int64_t currentOffset; // dynamic offset value + STableQueryInfo *current; SRspResultInfo resultInfo; SHashObj *pTableRetrieveTsMap; } SQueryRuntimeEnv; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index cc98653722..f22b5eb06f 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -275,37 +275,6 @@ static void clearNumOfRes(SQLFunctionCtx* pCtx, int32_t numOfOutput) { } } -//static bool isGroupbyColumn(SSqlGroupbyExpr *pGroupbyExpr) { -// if (pGroupbyExpr == NULL || pGroupbyExpr->numOfGroupCols == 0) { -// return false; -// } -// -// for (int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) { -// SColIndex *pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, i); -// if (TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) { -// //make sure the normal column locates at the second position if tbname exists in group by clause -// if (pGroupbyExpr->numOfGroupCols > 1) { -// assert(pColIndex->colIndex > 0); -// } -// -// return true; -// } -// } -// -// return false; -//} - -//static UNUSED_FUNC bool isStabledev(SQuery* pQuery) { -// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { -// int32_t functId = pQuery->pExpr1[i].base.functionId; -// if (functId == TSDB_FUNC_STDDEV_DST) { -// return true; -// } -// } -// -// return false; -//} - static bool isSelectivityWithTagsQuery(SQLFunctionCtx *pCtx, int32_t numOfOutput) { bool hasTags = false; int32_t numOfSelectivity = 0; @@ -336,49 +305,6 @@ static bool isProjQuery(SQuery *pQuery) { return true; } -//static bool isTsCompQuery(SQuery *pQuery) { return pQuery->pExpr1[0].base.functionId == TSDB_FUNC_TS_COMP; } -//static UNUSED_FUNC bool isTopBottomQuery(SQuery *pQuery) { -// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { -// int32_t functionId = pQuery->pExpr1[i].base.functionId; -// if (functionId == TSDB_FUNC_TS) { -// continue; -// } -// -// if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { -// return true; -// } -// } -// -// return false; -//} -//static UNUSED_FUNC bool timeWindowInterpoRequired(SQuery *pQuery) { -// for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { -// int32_t functionId = pQuery->pExpr1[i].base.functionId; -// if (functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_INTERP) { -// return true; -// } -// } -// -// return false; -//} -//static UNUSED_FUNC bool hasTagValOutput(SQuery* pQuery) { -// SExprInfo *pExprInfo = &pQuery->pExpr1[0]; -// if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) { -// return true; -// } else { // set tag value, by which the results are aggregated. -// for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) { -// SExprInfo *pLocalExprInfo = &pQuery->pExpr1[idx]; -// -// // ts_comp column required the tag value for join filter -// if (TSDB_COL_IS_TAG(pLocalExprInfo->base.colInfo.flag)) { -// return true; -// } -// } -// } -// -// return false; -//} - static bool hasNullRv(SColIndex* pColIndex, SDataStatis *pStatis) { if (TSDB_COL_IS_TAG(pColIndex->flag) || TSDB_COL_IS_UD_COL(pColIndex->flag) || pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { return false; @@ -736,16 +662,16 @@ static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, SQuer } } -static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo, TSKEY *pPrimaryColumn, +static int32_t getNumOfRowsInTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, TSKEY *pPrimaryColumn, int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, bool updateLastKey) { assert(startPos >= 0 && startPos < pDataBlockInfo->rows); + SQuery* pQuery = pRuntimeEnv->pQuery; + STableQueryInfo* item = pRuntimeEnv->current; int32_t num = -1; int32_t order = pQuery->order.order; int32_t step = GET_FORWARD_DIRECTION_FACTOR(order); - STableQueryInfo* item = pQuery->current; - if (QUERY_IS_ASC_QUERY(pQuery)) { if (ekey < pDataBlockInfo->window.ekey) { num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn); @@ -1246,7 +1172,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul int32_t forwardStep = 0; TSKEY ekey = reviseWindowEkey(pQuery, &win); forwardStep = - getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); + getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); // prev time window not interpolation yet. int32_t curIndex = curTimeWindowIndex(pResultRowInfo); @@ -1305,7 +1231,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } ekey = reviseWindowEkey(pQuery, &nextWin); - forwardStep = getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); + forwardStep = getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); // window start(end) key interpolation doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &nextWin, startPos, forwardStep); @@ -1317,12 +1243,12 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex); } - updateResultRowInfoActiveIndex(pResultRowInfo, pQuery, pQuery->current->lastKey); + updateResultRowInfoActiveIndex(pResultRowInfo, pQuery, pRuntimeEnv->current->lastKey); } static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - STableQueryInfo* item = pRuntimeEnv->pQuery->current; + STableQueryInfo* item = pRuntimeEnv->current; SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex); int16_t bytes = pColInfoData->info.bytes; @@ -1366,7 +1292,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - STableQueryInfo* item = pRuntimeEnv->pQuery->current; + STableQueryInfo* item = pRuntimeEnv->current; // primary timestamp column SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0); @@ -1719,7 +1645,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf pRuntimeEnv->pQuery = pQuery; pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - pRuntimeEnv->keyBuf = malloc(pQuery->maxSrcColumnSize + sizeof(int64_t)); + pRuntimeEnv->keyBuf = malloc(pQuery->maxTableColumnWidth + sizeof(int64_t)); pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv)); pRuntimeEnv->prevRow = malloc(POINTER_BYTES * pQuery->numOfCols + pQuery->srcRowSize); pRuntimeEnv->tagVal = malloc(pQuery->tagLen); @@ -1797,12 +1723,14 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_Arithmetic: { SOperatorInfo* prev = pRuntimeEnv->pTableScanner; - if (i >= 1) { + if (i == 0) { + pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQuery->pExpr1, pQuery->numOfOutput); + setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); + } else { prev = pRuntimeEnv->proot; + pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQuery->pExpr2, pQuery->numOfExpr2); } - pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQuery->pExpr1, pQuery->numOfOutput); - setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); break; } @@ -2421,7 +2349,7 @@ void filterRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInf } // save the cursor status - pRuntimeEnv->pQuery->current->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf); + pRuntimeEnv->current->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf); } else { for (int32_t i = 0; i < numOfRows; ++i) { bool qualified = false; @@ -2566,7 +2494,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa pBlock->pBlockStatis = NULL; SQuery* pQuery = pRuntimeEnv->pQuery; - int64_t groupId = pQuery->current->groupIndex; + int64_t groupId = pRuntimeEnv->current->groupIndex; bool ascQuery = QUERY_IS_ASC_QUERY(pQuery); SQInfo* pQInfo = pRuntimeEnv->qinfo; @@ -2582,8 +2510,8 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa // compare tag first tVariant t = {0}; - doSetTagValueInParam(pQuery->current->pTable, tagId, &t, pColInfo->type, pColInfo->bytes); - setTimestampListJoinInfo(pRuntimeEnv, &t, pQuery->current); + doSetTagValueInParam(pRuntimeEnv->current->pTable, tagId, &t, pColInfo->type, pColInfo->bytes); + setTimestampListJoinInfo(pRuntimeEnv, &t, pRuntimeEnv->current); STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf); if (!tsBufIsValidElem(&elem) || (tsBufIsValidElem(&elem) && (tVariantCompare(&t, elem.tag) != 0))) { @@ -2619,7 +2547,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa } else if (pQuery->stableQuery && (!pQuery->tsCompQuery)) { // stable aggregate, not interval aggregate or normal column aggregate doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx, pTableScanInfo->rowCellInfoOffset, pTableScanInfo->numOfOutput, - pQuery->current->groupIndex); + pRuntimeEnv->current->groupIndex); } (*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock); @@ -3281,7 +3209,7 @@ void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pRe void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, int32_t numOfOutput, int32_t groupIndex, TSKEY nextKey) { - STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current; + STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current; // lastKey needs to be updated pTableQueryInfo->lastKey = nextKey; @@ -3430,7 +3358,7 @@ void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx */ void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key) { SQuery *pQuery = pRuntimeEnv->pQuery; - STableQueryInfo *pTableQueryInfo = pQuery->current; + STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current; SResultRowInfo *pWindowResInfo = &pTableQueryInfo->resInfo; if (pWindowResInfo->prevSKey != TSKEY_INITIAL_VAL) { @@ -3662,7 +3590,7 @@ void queryCostStatis(SQInfo *pQInfo) { //static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) { // SQuery *pQuery = pRuntimeEnv->pQuery; -// STableQueryInfo* pTableQueryInfo = pQuery->current; +// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; // // int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); // @@ -3693,7 +3621,7 @@ void queryCostStatis(SQInfo *pQInfo) { // int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock); // // qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, pRuntimeEnv->qinfo, -// pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey); +// pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pRuntimeEnv->current->lastKey); //} //void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { @@ -3706,7 +3634,7 @@ void queryCostStatis(SQInfo *pQInfo) { // pQuery->pos = 0; // int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); // -// STableQueryInfo* pTableQueryInfo = pQuery->current; +// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; // TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle; // // SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; @@ -3772,7 +3700,7 @@ void queryCostStatis(SQInfo *pQInfo) { // // qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%" PRId64, // pRuntimeEnv->qinfo, pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, -// pQuery->current->lastKey); +// pRuntimeEnv->current->lastKey); // // return key; // } else { // do nothing @@ -3789,9 +3717,9 @@ void queryCostStatis(SQInfo *pQInfo) { //static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { // SQuery *pQuery = pRuntimeEnv->pQuery; // if (QUERY_IS_ASC_QUERY(pQuery)) { -// assert(*start <= pQuery->current->lastKey); +// assert(*start <= pRuntimeEnv->current->lastKey); // } else { -// assert(*start >= pQuery->current->lastKey); +// assert(*start >= pRuntimeEnv->current->lastKey); // } // // // if queried with value filter, do NOT forward query start position @@ -3810,7 +3738,7 @@ void queryCostStatis(SQInfo *pQInfo) { // bool ascQuery = QUERY_IS_ASC_QUERY(pQuery); // // SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo; -// STableQueryInfo *pTableQueryInfo = pQuery->current; +// STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current; // // SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; // while (tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) { @@ -4137,8 +4065,9 @@ static SSDataBlock* doTableScanImpl(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; STableScanInfo* pTableScanInfo = pOperator->info; - SSDataBlock* pBlock = &pTableScanInfo->block; - SQuery* pQuery = pOperator->pRuntimeEnv->pQuery; + SSDataBlock* pBlock = &pTableScanInfo->block; + SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; + SQuery* pQuery = pRuntimeEnv->pQuery; STableGroupInfo* pTableGroupInfo = &pOperator->pRuntimeEnv->tableqinfoGroupInfo; while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) { @@ -4150,14 +4079,14 @@ static SSDataBlock* doTableScanImpl(void* param) { tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info); // todo opt - if (pTableGroupInfo->numOfTables > 1 || (pQuery->current == NULL && pTableGroupInfo->numOfTables == 1)) { + if (pTableGroupInfo->numOfTables > 1 || (pRuntimeEnv->current == NULL && pTableGroupInfo->numOfTables == 1)) { STableQueryInfo** pTableQueryInfo = (STableQueryInfo**)taosHashGet(pTableGroupInfo->map, &pBlock->info.tid, sizeof(pBlock->info.tid)); if (pTableQueryInfo == NULL) { break; } - pQuery->current = *pTableQueryInfo; + pRuntimeEnv->current = *pTableQueryInfo; doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo); } @@ -4285,7 +4214,7 @@ static SSDataBlock* doBlockInfoScan(void* param) { tbufCloseWriter(&bw); SArray* g = GET_TABLEGROUP(pOperator->pRuntimeEnv, 0); - pOperator->pRuntimeEnv->pQuery->current = taosArrayGetP(g, 0); + pOperator->pRuntimeEnv->current = taosArrayGetP(g, 0); pOperator->status = OP_EXEC_DONE; return pBlock; @@ -4459,7 +4388,7 @@ static SSDataBlock* doAggregate(void* param) { break; } - setTagValue(pOperator, pQuery->current->pTable, pInfo->pCtx, pOperator->numOfOutput); + setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); if (upstream->operatorType == OP_DataBlocksOptScan) { STableScanInfo* pScanInfo = upstream->info; @@ -4512,7 +4441,7 @@ static SSDataBlock* doSTableAggregate(void* param) { break; } - setTagValue(pOperator, pRuntimeEnv->pQuery->current->pTable, pInfo->pCtx, pOperator->numOfOutput); + setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); if (upstream->operatorType == OP_DataBlocksOptScan) { STableScanInfo* pScanInfo = upstream->info; @@ -4523,7 +4452,7 @@ static SSDataBlock* doSTableAggregate(void* param) { setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? pBlock->info.window.ekey + 1:pBlock->info.window.skey-1; - setExecutionContext(pRuntimeEnv, pInfo, pOperator->numOfOutput, pQuery->current->groupIndex, key); + setExecutionContext(pRuntimeEnv, pInfo, pOperator->numOfOutput, pRuntimeEnv->current->groupIndex, key); doAggregateImpl(pOperator, pQuery->window.skey, pInfo->pCtx, pBlock); } @@ -4562,7 +4491,7 @@ static SSDataBlock* doArithmeticOperation(void* param) { break; } - STableQueryInfo* pTableQueryInfo = pRuntimeEnv->pQuery->current; + STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; // todo dynamic set tags setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); @@ -4736,7 +4665,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param) { } // the pDataBlock are always the same one, no need to call this again - STableQueryInfo* pTableQueryInfo = pRuntimeEnv->pQuery->current; + STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order); @@ -4843,7 +4772,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pRuntimeEnv->pQuery->order.order); - setTagValue(pOperator, pRuntimeEnv->pQuery->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); + setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); if (pInfo->colIndex == -1) { pInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQuery->pGroupbyExpr, pBlock); } @@ -6226,14 +6155,14 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr } pQuery->srcRowSize = 0; - pQuery->maxSrcColumnSize = 0; + pQuery->maxTableColumnWidth = 0; for (int16_t i = 0; i < numOfCols; ++i) { pQuery->colList[i] = pQueryMsg->colList[i]; pQuery->colList[i].filterInfo = tFilterInfoDup(pQueryMsg->colList[i].filterInfo, pQuery->colList[i].numOfFilters); pQuery->srcRowSize += pQuery->colList[i].bytes; - if (pQuery->maxSrcColumnSize < pQuery->colList[i].bytes) { - pQuery->maxSrcColumnSize = pQuery->colList[i].bytes; + if (pQuery->maxTableColumnWidth < pQuery->colList[i].bytes) { + pQuery->maxTableColumnWidth = pQuery->colList[i].bytes; } } -- GitLab