From 428bd3b742d4f1d946b9e7b551ca3afd123eaf31 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 7 Feb 2021 23:11:30 +0800 Subject: [PATCH] [td-225] refactor --- src/query/inc/qExecutor.h | 55 ++++++----- src/query/inc/qUtil.h | 2 +- src/query/src/qExecutor.c | 198 ++++++++++++++++++-------------------- 3 files changed, 127 insertions(+), 128 deletions(-) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index f63ba8604a..ab26808fef 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -186,31 +186,45 @@ typedef struct SSDataBlock { } SSDataBlock; typedef struct SQuery { + SLimitVal limit; + + bool stableQuery; // super table query or not + bool topBotQuery; // TODO used bitwise flag + bool groupbyColumn; // denote if this is a groupby normal column query + bool hasTagResults; // if there are tag values in final result or not + bool timeWindowInterpo;// if the time window start/end required interpolation + bool queryWindowIdentical; // all query time windows are identical for all tables in one group + bool queryBlockDist; // if query data block distribution + bool stabledev; // super table stddev query + int32_t interBufSize; // intermediate buffer sizse + + SOrderVal order; + int16_t numOfCols; int16_t numOfTags; - SOrderVal order; + STimeWindow window; SInterval interval; int16_t precision; int16_t numOfOutput; int16_t fillType; int16_t checkResultBuf; // check if the buffer is full during scan each block - SLimitVal limit; int32_t srcRowSize; // todo extract struct int32_t resultRowSize; int32_t maxSrcColumnSize; int32_t tagLen; // tag value length of current query - SSqlGroupbyExpr* pGroupbyExpr; SExprInfo* pExpr1; SExprInfo* pExpr2; int32_t numOfExpr2; - SColumnInfo* colList; SColumnInfo* tagColList; int32_t numOfFilterCols; int64_t* fillVal; + SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query. + SSingleColumnFilterInfo* pFilterInfo; + uint32_t status; // query status SResultRec rec; int32_t pos; @@ -218,33 +232,23 @@ typedef struct SQuery { STableQueryInfo* current; int32_t numOfCheckedBlocks; // number of check data blocks - SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query. - SSingleColumnFilterInfo* pFilterInfo; - SSDataBlock *ouptputBuf; + void* tsdb; + SMemRef memRef; } SQuery; typedef struct SQueryRuntimeEnv { jmp_buf env; SQuery* pQuery; + SQLFunctionCtx* pCtx; int32_t numOfRowsPerPage; uint16_t* offset; uint16_t scanFlag; // denotes reversed scan of data or not SFillInfo* pFillInfo; SResultRowInfo resultRowInfo; - - SQueryCostInfo summary; void* pQueryHandle; void* pSecQueryHandle; // another thread for - bool stableQuery; // super table query or not - bool topBotQuery; // TODO used bitwise flag - bool groupbyColumn; // denote if this is a groupby normal column query - bool hasTagResults; // if there are tag values in final result or not - bool timeWindowInterpo;// if the time window start/end required interpolation - bool queryWindowIdentical; // all query time windows are identical for all tables in one group - bool queryBlockDist; // if query data block distribution - bool stabledev; // super table stddev query - int32_t interBufSize; // intermediate buffer sizse + int32_t prevGroupId; // previous executed group id SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file SHashObj* pResultRowHashTable; // quick locate the window object for each result @@ -262,6 +266,11 @@ typedef struct SQueryRuntimeEnv { SArithmeticSupport *sasArray; struct STableScanInfo* pi; + SSDataBlock *ouptputBuf; + + int32_t groupIndex; + STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure + } SQueryRuntimeEnv; enum { @@ -273,14 +282,14 @@ typedef struct SQInfo { void* signature; int32_t code; // error code to returned to client int64_t owner; // if it is in execution - void* tsdb; - SMemRef memRef; + int32_t vgId; STableGroupInfo tableGroupInfo; // table list SArray - STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure + SQueryRuntimeEnv runtimeEnv; + SQuery query; + SHashObj* arrTableIdInfo; - int32_t groupIndex; /* * the query is executed position on which meter of the whole list. @@ -296,6 +305,8 @@ typedef struct SQInfo { void* rspContext; // response context int64_t startExecTs; // start to exec timestamp char* sql; // query sql string + SQueryCostInfo summary; + } SQInfo; typedef struct SQueryParam { diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index d4a0c25886..3698de044f 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -57,7 +57,7 @@ static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int3 SQuery *pQuery = pRuntimeEnv->pQuery; - int32_t realRowId = (int32_t)(pResult->rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery)); + int32_t realRowId = (int32_t)(pResult->rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery)); return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage + pQuery->pExpr1[columnIndex].bytes * realRowId; } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e12d1fba1a..af4e5fbbf3 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -190,6 +190,8 @@ static SSDataBlock* createOutputBuf(SQuery* pQuery) { idata.pData = calloc(4096, idata.info.bytes); taosArrayPush(res->pDataBlock, &idata); } + + return res; } bool doFilterData(SQuery *pQuery, int32_t elemPos) { @@ -263,9 +265,7 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) { /* * the value of number of result needs to be update due to offset value upated. */ -void updateNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfRes) { - SQuery *pQuery = pRuntimeEnv->pQuery; - +void updateNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQuery* pQuery, int32_t numOfRes) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { SResultRowCellInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]); @@ -813,7 +813,7 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo return num; } -static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int32_t col, int32_t size, SArray *pDataBlock); +static char *getDataBlock(SQuery* pQuery, SArithmeticSupport *sas, int32_t col, int32_t size, SArray *pDataBlock); static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pWin, int32_t offset, int32_t forwardStep, TSKEY *tsCol, int32_t numOfTotal, SArray *pDataBlock) { @@ -826,7 +826,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow pCtx[k].size = forwardStep; pCtx[k].startTs = pWin->skey; - char *dataBlock = getDataBlock(pRuntimeEnv, &pRuntimeEnv->sasArray[k], k, numOfTotal, pDataBlock); + char *dataBlock = getDataBlock(pQuery, &pRuntimeEnv->sasArray[k], k, numOfTotal, pDataBlock); int32_t pos = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1); if (dataBlock != NULL) { @@ -854,8 +854,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow } } -static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pWin, int32_t offset) { - SQuery *pQuery = pRuntimeEnv->pQuery; +static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQuery* pQuery, STimeWindow *pWin, int32_t offset) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { @@ -868,10 +867,8 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow * } } -static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pNext, SDataBlockInfo *pDataBlockInfo, +static int32_t getNextQualifiedWindow(SQuery* pQuery, STimeWindow *pNext, SDataBlockInfo *pDataBlockInfo, TSKEY *primaryKeys, __block_search_fn_t searchFn, int32_t prevPosition) { - SQuery *pQuery = pRuntimeEnv->pQuery; - getNextTimeWindow(pQuery, pNext); // next time window is not in current block @@ -976,14 +973,12 @@ static void* getDataBlockImpl(SArray* pDataBlock, int32_t colId) { } // todo refactor -static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int32_t col, int32_t size, SArray *pDataBlock) { +static char *getDataBlock(SQuery* pQuery, SArithmeticSupport *sas, int32_t col, int32_t size, SArray *pDataBlock) { if (pDataBlock == NULL) { return NULL; } char *dataBlock = NULL; - SQuery *pQuery = pRuntimeEnv->pQuery; - int32_t functionId = pQuery->pExpr1[col].base.functionId; if (functionId == TSDB_FUNC_ARITHM) { sas->pArithExpr = &pQuery->pExpr1[col]; @@ -1121,13 +1116,13 @@ static TSKEY getStartTsKey(SQuery* pQuery, SDataBlockInfo* pDataBlockInfo, TSKEY static void doWindowBorderInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray *pDataBlock, SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardStep) { - if (!pRuntimeEnv->timeWindowInterpo) { + SQuery* pQuery = pRuntimeEnv->pQuery; + if (!pQuery->timeWindowInterpo) { return; } assert(pDataBlock != NULL); - SQuery* pQuery = pRuntimeEnv->pQuery; int32_t fillType = pQuery->fillType; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); @@ -1166,7 +1161,7 @@ static void doWindowBorderInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc } } -static void aggApplyFunctions_rv(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, +static void aggApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, SArray *pDataBlock) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQuery * pQuery = pRuntimeEnv->pQuery; @@ -1178,15 +1173,10 @@ static void aggApplyFunctions_rv(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pSt } for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - char *dataBlock = getDataBlock(pRuntimeEnv, &pRuntimeEnv->sasArray[k], k, pDataBlockInfo->rows, pDataBlock); + char *dataBlock = getDataBlock(pQuery, &pRuntimeEnv->sasArray[k], k, pDataBlockInfo->rows, pDataBlock); setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &pQuery->pExpr1[k]); } - - /* - * the sqlfunctionCtx parameters should be set done before all functions are invoked, - * since the selectivity + tag_prj query needs all parameters been set done. - * tag_prj function are changed to be TSDB_FUNC_TAG_DUMMY - */ + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { int32_t functionId = pQuery->pExpr1[k].base.functionId; if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { @@ -1221,7 +1211,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * } for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - char *dataBlock = getDataBlock(pRuntimeEnv, &pRuntimeEnv->sasArray[k], k, pDataBlockInfo->rows, pDataBlock); + char *dataBlock = getDataBlock(pQuery, &pRuntimeEnv->sasArray[k], k, pDataBlockInfo->rows, pDataBlock); setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &pQuery->pExpr1[k]); } @@ -1246,7 +1236,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * // prev time window not interpolation yet. int32_t curIndex = curTimeWindowIndex(pWindowResInfo); - if (prevIndex != -1 && prevIndex < curIndex && pRuntimeEnv->timeWindowInterpo) { + if (prevIndex != -1 && prevIndex < curIndex && pQuery->timeWindowInterpo) { for(int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already. SResultRow *pRes = pWindowResInfo->pResult[j]; if (pRes->closed) { @@ -1278,7 +1268,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * STimeWindow nextWin = win; while (1) { int32_t prevEndPos = (forwardStep - 1) * step + startPos; - startPos = getNextQualifiedWindow(pRuntimeEnv, &nextWin, pDataBlockInfo, tsCols, searchFn, prevEndPos); + startPos = getNextQualifiedWindow(pQuery, &nextWin, pDataBlockInfo, tsCols, searchFn, prevEndPos); if (startPos < 0) { break; } @@ -1313,7 +1303,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * } _end: - if (pRuntimeEnv->timeWindowInterpo) { + if (pQuery->timeWindowInterpo) { int32_t rowIndex = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->rows-1:0; saveDataBlockLastRow(pRuntimeEnv, pDataBlockInfo, pDataBlock, rowIndex); } @@ -1572,7 +1562,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS SColumnInfoData* pColumnInfoData = (SColumnInfoData *)taosArrayGet(pDataBlock, 0); TSKEY *tsCols = (pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (TSKEY*) pColumnInfoData->pData:NULL; - bool groupbyColumnValue = pRuntimeEnv->groupbyColumn; + bool groupbyColumnValue = pQuery->groupbyColumn; int16_t type = 0; int16_t bytes = 0; @@ -1584,7 +1574,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - char *dataBlock = getDataBlock(pRuntimeEnv, &pRuntimeEnv->sasArray[k], k, pDataBlockInfo->rows, pDataBlock); + char *dataBlock = getDataBlock(pQuery, &pRuntimeEnv->sasArray[k], k, pDataBlockInfo->rows, pDataBlock); setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &pQuery->pExpr1[k]); pCtx[k].size = 1; } @@ -1641,7 +1631,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } // window start key interpolation - if (pRuntimeEnv->timeWindowInterpo) { + if (pQuery->timeWindowInterpo) { // check for the time window end time interpolation int32_t curIndex = curTimeWindowIndex(pWindowResInfo); if (prevWindowIndex != -1 && prevWindowIndex < curIndex) { @@ -1711,7 +1701,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } } - if (pRuntimeEnv->stabledev) { + if (pQuery->stabledev) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { int32_t functionId = pQuery->pExpr1[k].base.functionId; if (functionId != TSDB_FUNC_STDDEV_DST) { @@ -1767,7 +1757,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } // In case of all rows in current block are not qualified - if (pRuntimeEnv->timeWindowInterpo && prevRowIndex != -1) { + if (pQuery->timeWindowInterpo && prevRowIndex != -1) { saveDataBlockLastRow(pRuntimeEnv, pDataBlockInfo, pDataBlock, prevRowIndex); } @@ -1783,7 +1773,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl STableQueryInfo* pTableQueryInfo = pQuery->current; SResultRowInfo* pResultRowInfo = &pRuntimeEnv->resultRowInfo; - if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || pRuntimeEnv->groupbyColumn) { + if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || pQuery->groupbyColumn) { rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, pDataBlock); } else { blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, searchFn, pDataBlock); @@ -1795,9 +1785,9 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl // interval query with limit applied int32_t numOfRes = 0; - if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyColumn) { + if (QUERY_IS_INTERVAL_QUERY(pQuery) || pQuery->groupbyColumn) { numOfRes = pResultRowInfo->size; - updateResultRowIndex(pResultRowInfo, pTableQueryInfo, QUERY_IS_ASC_QUERY(pQuery), pRuntimeEnv->timeWindowInterpo); + updateResultRowIndex(pResultRowInfo, pTableQueryInfo, QUERY_IS_ASC_QUERY(pQuery), pQuery->timeWindowInterpo); } else { // projection query numOfRes = (int32_t) getNumOfResult(pRuntimeEnv); @@ -2054,7 +2044,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf // if it is group by normal column, do not set output buffer, the output buffer is pResult // fixed output query/multi-output query for normal table - if (!pRuntimeEnv->groupbyColumn && !pRuntimeEnv->stableQuery && !QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) { + if (!pQuery->groupbyColumn && !pRuntimeEnv->stableQuery && !QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) { resetDefaultResInfoOutputBuf(pRuntimeEnv); } @@ -2189,7 +2179,7 @@ static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) { } // Note:top/bottom query is fixed output query - if (pRuntimeEnv->topBotQuery || pRuntimeEnv->groupbyColumn) { + if (pRuntimeEnv->topBotQuery || pQuery->groupbyColumn) { return true; } @@ -2496,7 +2486,7 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) { if (isGroupbyColumn(pQuery->pGroupbyExpr)) { num = 128; } else if (QUERY_IS_INTERVAL_QUERY(pQuery)) { // time window query, allocate one page for each table - size_t s = pQInfo->tableqinfoGroupInfo.numOfTables; + size_t s = pRuntimeEnv->tableqinfoGroupInfo.numOfTables; num = (int32_t)(MAX(s, INITIAL_RESULT_ROWS_VALUE)); } else { // for super table query, one page for each subset num = 1; // pQInfo->pSidSet->numOfSubSet; @@ -2822,7 +2812,7 @@ static void expandBuffer(SQueryRuntimeEnv* pRuntimeEnv, int32_t newSize, void* q static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, int32_t numOfRows) { // in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block SQuery* pQuery = pRuntimeEnv->pQuery; - if (!QUERY_IS_INTERVAL_QUERY(pQuery) && !pRuntimeEnv->groupbyColumn && !isFixedOutputQuery(pRuntimeEnv) && !isTsCompQuery(pQuery)) { + if (!QUERY_IS_INTERVAL_QUERY(pQuery) && !pQuery->groupbyColumn && !isFixedOutputQuery(pRuntimeEnv) && !isTsCompQuery(pQuery)) { SResultRec *pRec = &pQuery->rec; int32_t remain = (int32_t)(pRec->capacity - pRec->rows); @@ -3229,7 +3219,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) { // group by normal columns and interval query on normal table SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo; - if (pRuntimeEnv->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) { + if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) { disableFuncInReverseScanImpl(pRuntimeEnv, pWindowResInfo, order); } else { // for simple result of table query, for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { // todo refactor @@ -3417,7 +3407,7 @@ bool needRepeatScan(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; bool toContinue = false; - if (pRuntimeEnv->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) { + if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) { // for each group result, call the finalize function for each column SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo; @@ -3532,22 +3522,10 @@ static void setEnvBeforeReverseScan_rv(SQueryRuntimeEnv *pRuntimeEnv) { } SET_REVERSE_SCAN_FLAG(pRuntimeEnv); - STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); - setQueryStatus(pQuery, QUERY_NOT_COMPLETED); switchCtxOrder(pRuntimeEnv); disableFuncInReverseScan(pQInfo); setupQueryRangeForReverseScan(pQInfo); - - // clean unused handle - if (pRuntimeEnv->pSecQueryHandle != NULL) { - tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); - } - - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef); - if (pRuntimeEnv->pSecQueryHandle == NULL) { - longjmp(pRuntimeEnv->env, terrno); - } } static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusInfo *pStatus) { @@ -3651,7 +3629,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { SQueryStatusInfo qstatus = getQueryStatusInfo(pRuntimeEnv, start); SET_MASTER_SCAN_FLAG(pRuntimeEnv); - if (!pRuntimeEnv->groupbyColumn && pRuntimeEnv->hasTagResults) { + if (!pQuery->groupbyColumn && pRuntimeEnv->hasTagResults) { setTagVal(pRuntimeEnv, pTableQueryInfo->pTable); } @@ -3716,10 +3694,10 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - if (pRuntimeEnv->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) { + if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) { // for each group result, call the finalize function for each column SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo; - if (pRuntimeEnv->groupbyColumn) { + if (pQuery->groupbyColumn) { closeAllResultRows(pWindowResInfo); } @@ -3928,7 +3906,7 @@ int32_t setTimestampListJoinInfo(SQInfo *pQInfo, STableQueryInfo *pTableQueryInf int32_t setParamValue(SQueryRuntimeEnv* pRuntimeEnv) { SQuery* pQuery = pRuntimeEnv->pQuery; - if (pRuntimeEnv->prevResult == NULL || pRuntimeEnv->groupbyColumn) { + if (pRuntimeEnv->prevResult == NULL || pQuery->groupbyColumn) { return TSDB_CODE_SUCCESS; } @@ -4146,14 +4124,14 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc SResultRowInfo * pResultRowInfo = &pTableQueryInfo->resInfo; pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1; - if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || pRuntimeEnv->groupbyColumn) { + if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || pQuery->groupbyColumn) { rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, pDataBlock); } else { blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, searchFn, pDataBlock); } if (QUERY_IS_INTERVAL_QUERY(pQuery)) { - updateResultRowIndex(pResultRowInfo, pTableQueryInfo, QUERY_IS_ASC_QUERY(pQuery), pRuntimeEnv->timeWindowInterpo); + updateResultRowIndex(pResultRowInfo, pTableQueryInfo, QUERY_IS_ASC_QUERY(pQuery), pQuery->timeWindowInterpo); } } @@ -4188,7 +4166,7 @@ bool hasNotReturnedResults(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupR return numOfTotal > 0; } else { // there are results waiting for returned to client. if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED) && hasRemainData(pGroupResInfo) && - (pRuntimeEnv->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery))) { + (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery))) { return true; } } @@ -4304,7 +4282,7 @@ void queryCostStatis(SQInfo *pQInfo) { SQueryCostInfo *pSummary = &pRuntimeEnv->summary; uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pResultRowHashTable); - hashSize += taosHashGetMemSize(pQInfo->tableqinfoGroupInfo.map); + hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map); pSummary->hashSize = hashSize; // add the merge time @@ -4475,7 +4453,7 @@ static TSKEY doSkipIntervalProcess(SQueryRuntimeEnv* pRuntimeEnv, STimeWindow* w tw = *win; int32_t startPos = - getNextQualifiedWindow(pRuntimeEnv, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1); + getNextQualifiedWindow(pQuery, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1); assert(startPos >= 0); // set the abort info @@ -4595,7 +4573,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { } else { tw = win; int32_t startPos = - getNextQualifiedWindow(pRuntimeEnv, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1); + getNextQualifiedWindow(pQuery, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1); assert(startPos >= 0); // set the abort info @@ -4635,7 +4613,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); if (!isSTableQuery - && (pQInfo->tableqinfoGroupInfo.numOfTables == 1) + && (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1) && (cond.order == TSDB_ORDER_ASC) && (!QUERY_IS_INTERVAL_QUERY(pQuery)) && (!isGroupbyColumn(pQuery->pGroupbyExpr)) @@ -4653,7 +4631,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) // update the query time window pQuery->window = cond.twindow; if (pQInfo->tableGroupInfo.numOfTables == 0) { - pQInfo->tableqinfoGroupInfo.numOfTables = 0; + pRuntimeEnv->tableqinfoGroupInfo.numOfTables = 0; } else { size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo); for(int32_t i = 0; i < numOfGroups; ++i) { @@ -4711,7 +4689,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery); pRuntimeEnv->hasTagResults = hasTagValOutput(pQuery); - pRuntimeEnv->timeWindowInterpo = timeWindowInterpoRequired(pQuery); + pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQuery); pRuntimeEnv->prevResult = prevResult; setScanLimitationByResultBuffer(pQuery); @@ -4730,7 +4708,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts pRuntimeEnv->cur.vgroupIndex = -1; pRuntimeEnv->stableQuery = isSTableQuery; pRuntimeEnv->prevGroupId = INT32_MIN; - pRuntimeEnv->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr); + pQuery->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr); if (needReverseScan(pQuery)) { pRuntimeEnv->pi = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pQInfo, getNumOfScanTimes(pQuery), 1); @@ -4756,7 +4734,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts if (!QUERY_IS_INTERVAL_QUERY(pQuery)) { int16_t type = TSDB_DATA_TYPE_NULL; - if (pRuntimeEnv->groupbyColumn) { // group by columns not tags; + if (pQuery->groupbyColumn) { // group by columns not tags; type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); } else { type = TSDB_DATA_TYPE_INT; // group id @@ -4767,7 +4745,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts return code; } } - } else if (pRuntimeEnv->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery) || (!isSTableQuery)) { + } else if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery) || (!isSTableQuery)) { int32_t numOfResultRows = getInitialPageNum(pQInfo); getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize); code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TENMB, pQInfo); @@ -4776,7 +4754,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts } int16_t type = TSDB_DATA_TYPE_NULL; - if (pRuntimeEnv->groupbyColumn) { + if (pQuery->groupbyColumn) { type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); } else { type = TSDB_DATA_TYPE_TIMESTAMP; @@ -4877,7 +4855,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { } tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo); - STableQueryInfo **pTableQueryInfo = (STableQueryInfo**) taosHashGet(pQInfo->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid)); + STableQueryInfo **pTableQueryInfo = (STableQueryInfo**) taosHashGet(pRuntimeEnv->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid)); if(pTableQueryInfo == NULL) { break; } @@ -4885,7 +4863,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { pQuery->current = *pTableQueryInfo; doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo); - if (!pRuntimeEnv->groupbyColumn) { + if (!pQuery->groupbyColumn) { setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo); } @@ -5154,7 +5132,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { break; } } - } else if (pRuntimeEnv->groupbyColumn) { // group-by on normal columns query + } else if (pQuery->groupbyColumn) { // group-by on normal columns query while (pQInfo->groupIndex < numOfGroups) { SArray *group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex); @@ -5233,8 +5211,8 @@ static void sequentialTableProcess(SQInfo *pQInfo) { resetDefaultResInfoOutputBuf(pRuntimeEnv); SArray *group = GET_TABLEGROUP(pQInfo, 0); - assert(taosArrayGetSize(group) == pQInfo->tableqinfoGroupInfo.numOfTables && - 1 == taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList)); + assert(taosArrayGetSize(group) == pRuntimeEnv->tableqinfoGroupInfo.numOfTables && + 1 == taosArrayGetSize(pRuntimeEnv->tableqinfoGroupInfo.pGroupList)); void *pQueryHandle = pRuntimeEnv->pQueryHandle; if (pQueryHandle == NULL) { @@ -5268,7 +5246,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo); STableQueryInfo **pTableQueryInfo = - (STableQueryInfo **) taosHashGet(pQInfo->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid)); + (STableQueryInfo **) taosHashGet(pRuntimeEnv->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid)); if (pTableQueryInfo == NULL) { break; } @@ -5380,7 +5358,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } // all data have returned already - if (pQInfo->tableIndex >= pQInfo->tableqinfoGroupInfo.numOfTables) { + if (pQInfo->tableIndex >= pRuntimeEnv->tableqinfoGroupInfo.numOfTables) { return; } @@ -5388,10 +5366,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) { resetResultRowInfo(pRuntimeEnv, &pRuntimeEnv->resultRowInfo); SArray *group = GET_TABLEGROUP(pQInfo, 0); - assert(taosArrayGetSize(group) == pQInfo->tableqinfoGroupInfo.numOfTables && - 1 == taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList)); + assert(taosArrayGetSize(group) == pRuntimeEnv->tableqinfoGroupInfo.numOfTables && + 1 == taosArrayGetSize(pRuntimeEnv->tableqinfoGroupInfo.pGroupList)); - while (pQInfo->tableIndex < pQInfo->tableqinfoGroupInfo.numOfTables) { + while (pQInfo->tableIndex < pRuntimeEnv->tableqinfoGroupInfo.numOfTables) { if (isQueryKilled(pQInfo)) { longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -5454,7 +5432,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } } - if (pQInfo->tableIndex >= pQInfo->tableqinfoGroupInfo.numOfTables) { + if (pQInfo->tableIndex >= pRuntimeEnv->tableqinfoGroupInfo.numOfTables) { setQueryStatus(pQuery, QUERY_COMPLETED); } @@ -5479,7 +5457,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { qDebug("QInfo %p numOfTables:%" PRIu64 ", index:%d, numOfGroups:%" PRIzu ", %" PRId64 " points returned, total:%" PRId64 ", offset:%" PRId64, - pQInfo, (uint64_t)pQInfo->tableqinfoGroupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, pQuery->rec.rows, + pQInfo, (uint64_t)pRuntimeEnv->tableqinfoGroupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, pQuery->rec.rows, pQuery->rec.total, pQuery->limit.offset); } } @@ -5680,7 +5658,7 @@ static void doSecondaryArithmeticProcess(SQuery* pQuery) { tfree(arithSup.data); } -static SSDataBlock* doScanImpl(STableScanInfo *pTableScanInfo) { +static SSDataBlock* doScanTableImpl(STableScanInfo *pTableScanInfo) { SSDataBlock *pBlock = &pTableScanInfo->block; while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) { @@ -5710,7 +5688,7 @@ static SSDataBlock* doTableScan(void* param) { SQueryRuntimeEnv *pRuntimeEnv = &pTableScanInfo->pQInfo->runtimeEnv; while (pTableScanInfo->current < pTableScanInfo->times) { - SSDataBlock* p = doScanImpl(pTableScanInfo); + SSDataBlock* p = doScanTableImpl(pTableScanInfo); if (p != NULL) { return p; } @@ -5758,7 +5736,7 @@ static SSDataBlock* doTableScan(void* param) { pTableScanInfo->times = 1; pTableScanInfo->current = 0; - SSDataBlock* p = doScanImpl(pTableScanInfo); + SSDataBlock* p = doScanTableImpl(pTableScanInfo); if (p != NULL) { return p; } @@ -5799,11 +5777,17 @@ static UNUSED_FUNC int32_t getTableScanTime(STableScanInfo* pTableScanInfo) { return pTableScanInfo->current; } +static UNUSED_FUNC int32_t getScanOrder(STableScanInfo* pTableScanInfo) { + return pTableScanInfo->order; +} + // this is a blocking operator static SSDataBlock* doAggOperator(void* param) { SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param; int32_t countId = 0; + int32_t order = getScanOrder(pInfo->pTableScanInfo); + while(1) { SSDataBlock* pBlock = pInfo->pTableScanInfo->apply(pInfo->pTableScanInfo); if (pBlock == NULL) { @@ -5812,16 +5796,22 @@ static SSDataBlock* doAggOperator(void* param) { if (countId != getTableScanTime(pInfo->pTableScanInfo)) { needRepeatScan(pInfo->pRuntimeEnv); + countId = getTableScanTime(pInfo->pTableScanInfo); + } + + if (order != getScanOrder(pInfo->pTableScanInfo)) { + setEnvBeforeReverseScan_rv(pInfo->pRuntimeEnv); + order = getScanOrder(pInfo->pTableScanInfo); } - aggApplyFunctions_rv(pInfo->pRuntimeEnv, pBlock->pBlockStatis, &pBlock->info, pBlock->pDataBlock); + aggApplyFunctions(pInfo->pRuntimeEnv, pBlock->pBlockStatis, &pBlock->info, pBlock->pDataBlock); } - setQueryStatus(pQuery, QUERY_COMPLETED); + setQueryStatus(pInfo->pRuntimeEnv->pQuery, QUERY_COMPLETED); finalizeQueryResult(pInfo->pRuntimeEnv); - res->info.rows = getNumOfResult(pInfo->pRuntimeEnv); - return res; + pInfo->pRuntimeEnv->pQuery->ouptputBuf->info.rows = getNumOfResult(pInfo->pRuntimeEnv); + return pInfo->pRuntimeEnv->pQuery->ouptputBuf; } // todo set the attribute of query scan count @@ -5866,8 +5856,6 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) SAggOperatorInfo* pAggInfo = createAggOperatorInfo(&pRuntimeEnv->resultRowInfo, pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); SSDataBlock* pResBlock = pAggInfo->apply(pAggInfo); -// scanOneTableDataBlocks(pRuntimeEnv, pTableInfo->lastKey); - // since the numOfRows must be identical for all functions that are allowed to be executed simutaneously. // pQuery->rec.rows = getNumOfResult(pRuntimeEnv); pQuery->rec.rows = pResBlock->info.rows;//getNumOfResult(pRuntimeEnv); @@ -5973,7 +5961,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { TSKEY newStartKey = QUERY_IS_ASC_QUERY(pQuery)? INT64_MIN:INT64_MAX; // skip blocks without load the actual data block from file if no filter condition present - if (!pRuntimeEnv->groupbyColumn) { + if (!pQuery->groupbyColumn) { skipTimeInterval(pRuntimeEnv, &newStartKey); if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols == 0 && pRuntimeEnv->pFillInfo == NULL) { setQueryStatus(pQuery, QUERY_COMPLETED); @@ -6049,14 +6037,14 @@ void tableQueryImpl(SQInfo *pQInfo) { pQuery->rec.rows = 0; int64_t st = taosGetTimestampUs(); - assert(pQInfo->tableqinfoGroupInfo.numOfTables == 1); + assert(pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1); SArray* g = GET_TABLEGROUP(pQInfo, 0); STableQueryInfo* item = taosArrayGetP(g, 0); pQuery->current = item; // group by normal column, sliding window query, interval query are handled by interval query processor - if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyColumn) { // interval (down sampling operation) + if (QUERY_IS_INTERVAL_QUERY(pQuery) || pQuery->groupbyColumn) { // interval (down sampling operation) tableIntervalProcess(pQInfo, item); } else if (isFixedOutputQuery(pRuntimeEnv)) { tableAggregationProcess(pQInfo, item); @@ -6067,7 +6055,7 @@ void tableQueryImpl(SQInfo *pQInfo) { // record the total elapsed time pRuntimeEnv->summary.elapsedTime += (taosGetTimestampUs() - st); - assert(pQInfo->tableqinfoGroupInfo.numOfTables == 1); + assert(pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1); } void buildTableBlockDistResult(SQInfo *pQInfo) { @@ -6131,10 +6119,10 @@ void stableQueryImpl(SQInfo *pQInfo) { int64_t st = taosGetTimestampUs(); if (QUERY_IS_INTERVAL_QUERY(pQuery) || - (isFixedOutputQuery(pRuntimeEnv) && (!isPointInterpoQuery(pQuery)) && (!pRuntimeEnv->groupbyColumn))) { + (isFixedOutputQuery(pRuntimeEnv) && (!isPointInterpoQuery(pQuery)) && (!pQuery->groupbyColumn))) { multiTableQueryProcess(pQInfo); } else { - assert(pQuery->checkResultBuf == 1 || isPointInterpoQuery(pQuery) || pRuntimeEnv->groupbyColumn); + assert(pQuery->checkResultBuf == 1 || isPointInterpoQuery(pQuery) || pQuery->groupbyColumn); sequentialTableProcess(pQInfo); } @@ -6938,7 +6926,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr size_t numOfGroups = 0; if (pTableGroupInfo->pGroupList != NULL) { numOfGroups = taosArrayGetSize(pTableGroupInfo->pGroupList); - STableGroupInfo* pTableqinfo = &pQInfo->tableqinfoGroupInfo; + STableGroupInfo* pTableqinfo = &pRuntimeEnv->tableqinfoGroupInfo; pTableqinfo->pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); pTableqinfo->numOfTables = pTableGroupInfo->numOfTables; @@ -6977,7 +6965,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr goto _cleanup; } - taosArrayPush(pQInfo->tableqinfoGroupInfo.pGroupList, &p1); + taosArrayPush(pRuntimeEnv->tableqinfoGroupInfo.pGroupList, &p1); for(int32_t j = 0; j < s; ++j) { STableKeyInfo* info = taosArrayGet(pa, j); @@ -6997,7 +6985,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr taosArrayPush(p1, &item); STableId* id = TSDB_TABLEID(info->pTable); - taosHashPut(pQInfo->tableqinfoGroupInfo.map, &id->tid, sizeof(id->tid), &item, POINTER_BYTES); + taosHashPut(pRuntimeEnv->tableqinfoGroupInfo.map, &id->tid, sizeof(id->tid), &item, POINTER_BYTES); index += 1; } } @@ -7076,12 +7064,12 @@ int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *p qDebug("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey, pQuery->window.ekey, pQuery->order.order); setQueryStatus(pQuery, QUERY_COMPLETED); - pQInfo->tableqinfoGroupInfo.numOfTables = 0; + pRuntimeEnv->tableqinfoGroupInfo.numOfTables = 0; // todo free memory return TSDB_CODE_SUCCESS; } - if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) { + if (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 0) { qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo); setQueryStatus(pQuery, QUERY_COMPLETED); return TSDB_CODE_SUCCESS; @@ -7161,7 +7149,7 @@ void freeQInfo(SQInfo *pQInfo) { qDebug("QInfo:%p start to free QInfo", pQInfo); - releaseQueryBuf(pQInfo->tableqinfoGroupInfo.numOfTables); + releaseQueryBuf(pRuntimeEnv->tableqinfoGroupInfo.numOfTables); teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); SQuery *pQuery = pQInfo->runtimeEnv.pQuery; @@ -7206,7 +7194,7 @@ void freeQInfo(SQInfo *pQInfo) { tfree(pQuery); } - doDestroyTableQueryInfo(&pQInfo->tableqinfoGroupInfo); + doDestroyTableQueryInfo(&pRuntimeEnv->tableqinfoGroupInfo); tfree(pQInfo->pBuf); tfree(pQInfo->sql); @@ -7348,7 +7336,7 @@ void buildTagQueryResult(SQInfo* pQInfo) { SArray* pa = GET_TABLEGROUP(pQInfo, 0); size_t num = taosArrayGetSize(pa); - assert(num == pQInfo->tableqinfoGroupInfo.numOfTables); + assert(num == pRuntimeEnv->tableqinfoGroupInfo.numOfTables); int32_t count = 0; int32_t functionId = pQuery->pExpr1[0].base.functionId; -- GitLab