diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index aa7784a8b3856f2caf9f17471474b1732e17cb3a..a937e68b72ce6f189012d44acebe748708f667de 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -234,6 +234,7 @@ typedef struct SQuery { void* tsdb; SMemRef memRef; + STableGroupInfo tableGroupInfo; // table list SArray } SQuery; typedef struct SQueryRuntimeEnv { @@ -285,7 +286,6 @@ typedef struct SQInfo { int64_t owner; // if it is in execution int32_t vgId; - STableGroupInfo tableGroupInfo; // table list SArray SQueryRuntimeEnv runtimeEnv; SQuery query; @@ -305,8 +305,7 @@ typedef struct SQInfo { void* rspContext; // response context int64_t startExecTs; // start to exec timestamp char* sql; // query sql string - SQueryCostInfo summary; - + SQueryCostInfo summary; } SQInfo; typedef struct SQueryParam { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 3e00883cc22ae107b34284c90e0a6ca08a372347..79ddb857390f0948717f56bc8668e5072047c46e 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2373,9 +2373,10 @@ static bool onlyFirstQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSD static bool onlyLastQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST); } static void doExchangeTimeWindow(SQInfo* pQInfo, STimeWindow* win) { - size_t t = taosArrayGetSize(pQInfo->tableGroupInfo.pGroupList); + SQuery* pQuery = &pQInfo->query; + size_t t = taosArrayGetSize(pQuery->tableGroupInfo.pGroupList); for(int32_t i = 0; i < t; ++i) { - SArray* p1 = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, i); + SArray* p1 = taosArrayGetP(pQuery->tableGroupInfo.pGroupList, i); size_t len = taosArrayGetSize(p1); for(int32_t j = 0; j < len; ++j) { @@ -3220,8 +3221,7 @@ static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SResultR } } -void disableFuncInReverseScan(SQInfo *pQInfo) { - SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; +void disableFuncInReverseScan(SQueryRuntimeEnv* pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; int32_t order = pQuery->order.order; @@ -3255,7 +3255,7 @@ static void setupQueryRangeForReverseScan(SQInfo* pQInfo) { for(int32_t i = 0; i < numOfGroups; ++i) { SArray *group = GET_TABLEGROUP(pRuntimeEnv, i); - SArray *tableKeyGroup = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, i); + SArray *tableKeyGroup = taosArrayGetP(pQuery->tableGroupInfo.pGroupList, i); size_t t = taosArrayGetSize(group); for (int32_t j = 0; j < t; ++j) { @@ -3496,7 +3496,7 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI setQueryStatus(pQuery, QUERY_NOT_COMPLETED); switchCtxOrder(pRuntimeEnv); - disableFuncInReverseScan(pQInfo); + disableFuncInReverseScan(pRuntimeEnv); setupQueryRangeForReverseScan(pQInfo); // clean unused handle @@ -3504,7 +3504,7 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); } - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQuery->memRef); + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo, pQInfo, &pQuery->memRef); if (pRuntimeEnv->pSecQueryHandle == NULL) { longjmp(pRuntimeEnv->env, terrno); } @@ -3533,7 +3533,7 @@ static void setEnvBeforeReverseScan_rv(SQueryRuntimeEnv *pRuntimeEnv) { SET_REVERSE_SCAN_FLAG(pRuntimeEnv); setQueryStatus(pQuery, QUERY_NOT_COMPLETED); switchCtxOrder(pRuntimeEnv); - disableFuncInReverseScan(pQInfo); + disableFuncInReverseScan(pRuntimeEnv); setupQueryRangeForReverseScan(pQInfo); } @@ -3668,8 +3668,8 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { } STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); - restoreTimeWindow(&pQInfo->tableGroupInfo, &cond); - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQuery->memRef); + restoreTimeWindow(&pQuery->tableGroupInfo, &cond); + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo, pQInfo, &pQuery->memRef); if (pRuntimeEnv->pSecQueryHandle == NULL) { longjmp(pRuntimeEnv->env, terrno); } @@ -4635,11 +4635,11 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) terrno = TSDB_CODE_SUCCESS; if (isFirstLastRowQuery(pQuery)) { - pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQuery->memRef); + pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQuery->tableGroupInfo, pQInfo, &pQuery->memRef); // update the query time window pQuery->window = cond.twindow; - if (pQInfo->tableGroupInfo.numOfTables == 0) { + if (pQuery->tableGroupInfo.numOfTables == 0) { pRuntimeEnv->tableqinfoGroupInfo.numOfTables = 0; } else { size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); @@ -4656,9 +4656,9 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) } } } else if (isPointInterpoQuery(pQuery)) { - pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQuery->memRef); + pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQuery->tableGroupInfo, pQInfo, &pQuery->memRef); } else { - pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQuery->memRef); + pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQuery->tableGroupInfo, pQInfo, &pQuery->memRef); } return terrno; @@ -4776,9 +4776,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts } // create runtime environment - int32_t numOfTables = pQInfo->tableGroupInfo.numOfTables; + int32_t numOfTables = pQuery->tableGroupInfo.numOfTables; pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo)); - code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQInfo->tableGroupInfo.numOfTables, pQuery->order.order, pQInfo->vgId); + code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQuery->tableGroupInfo.numOfTables, pQuery->order.order, pQInfo->vgId); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -5086,7 +5086,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); while (pRuntimeEnv->groupIndex < numOfGroups) { - SArray *group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pRuntimeEnv->groupIndex); + SArray *group = taosArrayGetP(pQuery->tableGroupInfo.pGroupList, pRuntimeEnv->groupIndex); qDebug("QInfo:%p point interpolation query on group:%d, total group:%" PRIzu ", current group:%p", pQInfo, pRuntimeEnv->groupIndex, numOfGroups, group); @@ -5145,7 +5145,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } } else if (pQuery->groupbyColumn) { // group-by on normal columns query while (pRuntimeEnv->groupIndex < numOfGroups) { - SArray *group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pRuntimeEnv->groupIndex); + SArray *group = taosArrayGetP(pQuery->tableGroupInfo.pGroupList, pRuntimeEnv->groupIndex); qDebug("QInfo:%p group by normal columns group:%d, total group:%" PRIzu "", pQInfo, pRuntimeEnv->groupIndex, numOfGroups); @@ -5228,7 +5228,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { void *pQueryHandle = pRuntimeEnv->pQueryHandle; if (pQueryHandle == NULL) { STsdbQueryCond con = createTsdbQueryCond(pQuery, &pQuery->window); - pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQuery->tsdb, &con, &pQInfo->tableGroupInfo, pQInfo, &pQuery->memRef); + pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQuery->tsdb, &con, &pQuery->tableGroupInfo, pQInfo, &pQuery->memRef); pQueryHandle = pRuntimeEnv->pQueryHandle; } @@ -5494,11 +5494,11 @@ static int32_t doSaveContext(SQInfo *pQInfo) { setQueryStatus(pQuery, QUERY_NOT_COMPLETED); switchCtxOrder(pRuntimeEnv); - disableFuncInReverseScan(pQInfo); + disableFuncInReverseScan(pRuntimeEnv); setupQueryRangeForReverseScan(pQInfo); pRuntimeEnv->prevGroupId = INT32_MIN; - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQuery->memRef); + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo, pQInfo, &pQuery->memRef); return (pRuntimeEnv->pSecQueryHandle == NULL)? -1:0; } @@ -5700,6 +5700,7 @@ static SSDataBlock* doScanTableImpl(STableScanInfo *pTableScanInfo) { static SSDataBlock* doTableScan(void* param) { STableScanInfo * pTableScanInfo = (STableScanInfo *)param; SQueryRuntimeEnv *pRuntimeEnv = &pTableScanInfo->pQInfo->runtimeEnv; + SQuery* pQuery = pRuntimeEnv->pQuery; while (pTableScanInfo->current < pTableScanInfo->times) { SSDataBlock* p = doScanTableImpl(pTableScanInfo); @@ -5713,16 +5714,16 @@ static SSDataBlock* doTableScan(void* param) { // do prepare for the next round table scan operation tsdbCleanupQueryHandle(pTableScanInfo->pQueryHandle); - STsdbQueryCond cond = createTsdbQueryCond(pRuntimeEnv->pQuery, &pRuntimeEnv->pQuery->window); + STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); pTableScanInfo->pQueryHandle = - tsdbQueryTables(pTableScanInfo->pQInfo->query.tsdb, &cond, &pTableScanInfo->pQInfo->tableGroupInfo, - pTableScanInfo->pQInfo, &pTableScanInfo->pQInfo->query.memRef); + tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo, + pTableScanInfo->pQInfo, &pQuery->memRef); if (pTableScanInfo->pQueryHandle == NULL) { longjmp(pRuntimeEnv->env, terrno); } pRuntimeEnv->resultRowInfo.curIndex = 0; - setQueryStatus(pRuntimeEnv->pQuery, QUERY_NOT_COMPLETED); + setQueryStatus(pQuery, QUERY_NOT_COMPLETED); pRuntimeEnv->scanFlag = REPEAT_SCAN; if (pRuntimeEnv->pTsBuf) { @@ -5739,9 +5740,9 @@ static SSDataBlock* doTableScan(void* param) { tsdbCleanupQueryHandle(pTableScanInfo->pQueryHandle); - STsdbQueryCond cond = createTsdbQueryCond(pRuntimeEnv->pQuery, &pRuntimeEnv->pQuery->window); + STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); pTableScanInfo->pQueryHandle = - tsdbQueryTables(pTableScanInfo->pQInfo->query.tsdb, &cond, &pTableScanInfo->pQInfo->tableGroupInfo, + tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo, pTableScanInfo->pQInfo, &pTableScanInfo->pQInfo->query.memRef); qDebug("QInfo:%p start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, @@ -5798,6 +5799,7 @@ static UNUSED_FUNC int32_t getScanOrder(STableScanInfo* pTableScanInfo) { // this is a blocking operator static SSDataBlock* doAggOperator(void* param) { SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param; + SQueryRuntimeEnv* pRuntimeEnv = pInfo->pRuntimeEnv; int32_t countId = 0; int32_t order = getScanOrder(pInfo->pTableScanInfo); @@ -5809,23 +5811,23 @@ static SSDataBlock* doAggOperator(void* param) { } if (countId != getTableScanTime(pInfo->pTableScanInfo)) { - needRepeatScan(pInfo->pRuntimeEnv); + needRepeatScan(pRuntimeEnv); countId = getTableScanTime(pInfo->pTableScanInfo); } if (order != getScanOrder(pInfo->pTableScanInfo)) { - setEnvBeforeReverseScan_rv(pInfo->pRuntimeEnv); + setEnvBeforeReverseScan_rv(pRuntimeEnv); order = getScanOrder(pInfo->pTableScanInfo); } - aggApplyFunctions(pInfo->pRuntimeEnv, pBlock->pBlockStatis, &pBlock->info, pBlock->pDataBlock); + aggApplyFunctions(pRuntimeEnv, pBlock->pBlockStatis, &pBlock->info, pBlock->pDataBlock); } - setQueryStatus(pInfo->pRuntimeEnv->pQuery, QUERY_COMPLETED); - finalizeQueryResult(pInfo->pRuntimeEnv); + setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); + finalizeQueryResult(pRuntimeEnv); - pInfo->pRuntimeEnv->ouptputBuf->info.rows = getNumOfResult(pInfo->pRuntimeEnv); - return pInfo->pRuntimeEnv->ouptputBuf; + pRuntimeEnv->ouptputBuf->info.rows = getNumOfResult(pRuntimeEnv); + return pRuntimeEnv->ouptputBuf; } // todo set the attribute of query scan count @@ -5871,8 +5873,7 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) SSDataBlock* pResBlock = pAggInfo->apply(pAggInfo); // since the numOfRows must be identical for all functions that are allowed to be executed simutaneously. -// pQuery->rec.rows = getNumOfResult(pRuntimeEnv); - pQuery->rec.rows = pResBlock->info.rows;//getNumOfResult(pRuntimeEnv); + pQuery->rec.rows = pResBlock->info.rows; // doSecondaryArithmeticProcess(pQuery); // TODO limit/offset refactor to be one operator @@ -6840,15 +6841,10 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr // to make sure third party won't overwrite this structure pQInfo->signature = pQInfo; - pQInfo->tableGroupInfo = *pTableGroupInfo; - - SQuery *pQuery = calloc(1, sizeof(SQuery)); - if (pQuery == NULL) { - goto _cleanup_query; - } - + SQuery* pQuery = &pQInfo->query; pQInfo->runtimeEnv.pQuery = pQuery; + pQuery->tableGroupInfo = *pTableGroupInfo; pQuery->numOfCols = numOfCols; pQuery->numOfOutput = numOfOutput; pQuery->limit.limit = pQueryMsg->limit; @@ -6972,7 +6968,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr int32_t index = 0; for(int32_t i = 0; i < numOfGroups; ++i) { - SArray* pa = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, i); + SArray* pa = taosArrayGetP(pQuery->tableGroupInfo.pGroupList, i); size_t s = taosArrayGetSize(pa); SArray* p1 = taosArrayInit(s, POINTER_BYTES); @@ -7016,7 +7012,6 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr _cleanup_qinfo: tsdbDestroyTableGroup(pTableGroupInfo); -_cleanup_query: if (pGroupbyExpr != NULL) { taosArrayDestroy(pGroupbyExpr->columnInfo); free(pGroupbyExpr); @@ -7208,8 +7203,6 @@ void freeQInfo(SQInfo *pQInfo) { taosArrayDestroy(pQuery->pGroupbyExpr->columnInfo); tfree(pQuery->pGroupbyExpr); } - - tfree(pQuery); } doDestroyTableQueryInfo(&pRuntimeEnv->tableqinfoGroupInfo); @@ -7217,7 +7210,7 @@ void freeQInfo(SQInfo *pQInfo) { tfree(pQInfo->pBuf); tfree(pQInfo->sql); - tsdbDestroyTableGroup(&pQInfo->tableGroupInfo); + tsdbDestroyTableGroup(&pQuery->tableGroupInfo); taosHashCleanup(pQInfo->arrTableIdInfo); taosArrayDestroy(pQInfo->groupResInfo.pRows);