diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index ab26808fef3d727136b39912cea5a44283471d82..aa7784a8b3856f2caf9f17471474b1732e17cb3a 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -269,6 +269,7 @@ typedef struct SQueryRuntimeEnv { SSDataBlock *ouptputBuf; int32_t groupIndex; + int32_t tableIndex; STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure } SQueryRuntimeEnv; @@ -295,7 +296,6 @@ typedef struct SQInfo { * the query is executed position on which meter of the whole list. * when the index reaches the last one of the list, it means the query is completed. */ - int32_t tableIndex; SGroupResInfo groupResInfo; void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index af4e5fbbf31e1a25d603a29118f29931699d0a11..3e00883cc22ae107b34284c90e0a6ca08a372347 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -265,7 +265,9 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) { /* * the value of number of result needs to be update due to offset value upated. */ -void updateNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQuery* pQuery, int32_t numOfRes) { +void updateNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfRes) { + SQuery* pQuery = pRuntimeEnv->pQuery; + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { SResultRowCellInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]); @@ -854,7 +856,8 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow } } -static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQuery* pQuery, STimeWindow *pWin, int32_t offset) { +static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pWin, int32_t offset) { + SQuery* pQuery = pRuntimeEnv->pQuery; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { @@ -1902,8 +1905,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf qDebug("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv)); SQuery *pQuery = pRuntimeEnv->pQuery; - pRuntimeEnv->interBufSize = getOutputInterResultBufSize(pQuery); - pRuntimeEnv->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo)); + pQuery->interBufSize = getOutputInterResultBufSize(pQuery); pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pRuntimeEnv->keyBuf = malloc(pQuery->maxSrcColumnSize + sizeof(int64_t)); @@ -1972,7 +1974,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf pCtx->order = pQuery->order.order; pCtx->functionId = pSqlFuncMsg->functionId; - pCtx->stableQuery = pRuntimeEnv->stableQuery; + pCtx->stableQuery = pQuery->stableQuery; pCtx->interBufBytes = pQuery->pExpr1[i].interBytes; pCtx->start.key = INT64_MIN; pCtx->end.key = INT64_MIN; @@ -2044,7 +2046,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf // if it is group by normal column, do not set output buffer, the output buffer is pResult // fixed output query/multi-output query for normal table - if (!pQuery->groupbyColumn && !pRuntimeEnv->stableQuery && !QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) { + if (!pQuery->groupbyColumn && !pQuery->stableQuery && !QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) { resetDefaultResInfoOutputBuf(pRuntimeEnv); } @@ -2070,6 +2072,7 @@ _clean: static void doFreeQueryHandle(SQInfo* pQInfo) { SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + SQuery* pQuery = pRuntimeEnv->pQuery; tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); @@ -2077,7 +2080,7 @@ static void doFreeQueryHandle(SQInfo* pQInfo) { pRuntimeEnv->pQueryHandle = NULL; pRuntimeEnv->pSecQueryHandle = NULL; - SMemRef* pMemRef = &pQInfo->memRef; + SMemRef* pMemRef = &pQuery->memRef; assert(pMemRef->ref == 0 && pMemRef->imem == NULL && pMemRef->mem == NULL); } @@ -2179,7 +2182,7 @@ static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) { } // Note:top/bottom query is fixed output query - if (pRuntimeEnv->topBotQuery || pQuery->groupbyColumn) { + if (pQuery->topBotQuery || pQuery->groupbyColumn) { return true; } @@ -2478,9 +2481,10 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo } static int32_t getInitialPageNum(SQInfo *pQInfo) { - SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + SQuery *pQuery = pRuntimeEnv->pQuery; + int32_t INITIAL_RESULT_ROWS_VALUE = 16; - int32_t num = 0; if (isGroupbyColumn(pQuery->pGroupbyExpr)) { @@ -2500,7 +2504,7 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i SQuery* pQuery = pRuntimeEnv->pQuery; int32_t MIN_ROWS_PER_PAGE = 4; - *rowsize = (int32_t)(pQuery->resultRowSize * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery)); + *rowsize = (int32_t)(pQuery->resultRowSize * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery)); int32_t overhead = sizeof(tFilePage); // one page contains at least two rows @@ -2518,7 +2522,7 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx, int32_t numOfRows) { SQuery* pQuery = pRuntimeEnv->pQuery; - if (pDataStatis == NULL || (pQuery->numOfFilterCols == 0 && (!pRuntimeEnv->topBotQuery))) { + if (pDataStatis == NULL || (pQuery->numOfFilterCols == 0 && (!pQuery->topBotQuery))) { return true; } @@ -2577,7 +2581,7 @@ static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDat } } - if (pRuntimeEnv->topBotQuery) { + if (pQuery->topBotQuery) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = pQuery->pExpr1[i].base.functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { @@ -2644,7 +2648,8 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pW SQuery *pQuery = pRuntimeEnv->pQuery; int64_t groupId = pQuery->current->groupIndex; - SQueryCostInfo* pCost = &pRuntimeEnv->summary; + SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); + SQueryCostInfo* pCost = &pQInfo->summary; if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf > 0) { *status = BLK_DATA_ALL_NEEDED; @@ -2855,7 +2860,9 @@ static void doSetInitialTimewindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; STableQueryInfo* pTableQueryInfo = pQuery->current; - SQueryCostInfo* summary = &pRuntimeEnv->summary; + + SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); + SQueryCostInfo* summary = &pQInfo->summary; qDebug("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d", GET_QINFO_ADDR(pRuntimeEnv), pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, pTableQueryInfo->lastKey, @@ -2982,7 +2989,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable) { SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); SExprInfo *pExprInfo = &pQuery->pExpr1[0]; - if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP && pRuntimeEnv->stableQuery) { + if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP && pQuery->stableQuery) { assert(pExprInfo->base.numOfParams == 1); int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64; @@ -3143,6 +3150,7 @@ void UNUSED_FUNC displayInterResult(tFilePage **pdata, SQueryRuntimeEnv* pRuntim static int32_t doCopyToSData(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType); void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo; while(pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) { @@ -3160,7 +3168,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { if (!hasRemainData(pGroupResInfo)) { cleanupGroupResInfo(pGroupResInfo); if (!incNextGroup(pGroupResInfo)) { - SET_STABLE_QUERY_OVER(pQInfo); + SET_STABLE_QUERY_OVER(pRuntimeEnv); } } @@ -3241,11 +3249,12 @@ void disableFuncInReverseScan(SQInfo *pQInfo) { } static void setupQueryRangeForReverseScan(SQInfo* pQInfo) { + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQuery* pQuery = pQInfo->runtimeEnv.pQuery; - int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo)); + int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv)); for(int32_t i = 0; i < numOfGroups; ++i) { - SArray *group = GET_TABLEGROUP(pQInfo, i); + SArray *group = GET_TABLEGROUP(pRuntimeEnv, i); SArray *tableKeyGroup = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, i); size_t t = taosArrayGetSize(group); @@ -3495,7 +3504,7 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); } - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef); + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQuery->memRef); if (pRuntimeEnv->pSecQueryHandle == NULL) { longjmp(pRuntimeEnv->env, terrno); } @@ -3565,8 +3574,8 @@ static void handleInterpolationQuery(SQInfo* pQInfo) { return; } - SArray *prev = tsdbGetExternalRow(pRuntimeEnv->pQueryHandle, &pQInfo->memRef, TSDB_PREV_ROW); - SArray *next = tsdbGetExternalRow(pRuntimeEnv->pQueryHandle, &pQInfo->memRef, TSDB_NEXT_ROW); + SArray *prev = tsdbGetExternalRow(pRuntimeEnv->pQueryHandle, &pQuery->memRef, TSDB_PREV_ROW); + SArray *next = tsdbGetExternalRow(pRuntimeEnv->pQueryHandle, &pQuery->memRef, TSDB_NEXT_ROW); if (prev == NULL || next == NULL) { return; } @@ -3629,7 +3638,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { SQueryStatusInfo qstatus = getQueryStatusInfo(pRuntimeEnv, start); SET_MASTER_SCAN_FLAG(pRuntimeEnv); - if (!pQuery->groupbyColumn && pRuntimeEnv->hasTagResults) { + if (!pQuery->groupbyColumn && pQuery->hasTagResults) { setTagVal(pRuntimeEnv, pTableQueryInfo->pTable); } @@ -3660,7 +3669,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); restoreTimeWindow(&pQInfo->tableGroupInfo, &cond); - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef); + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQuery->memRef); if (pRuntimeEnv->pSecQueryHandle == NULL) { longjmp(pRuntimeEnv->env, terrno); } @@ -4221,8 +4230,8 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data // Check if query is completed or not for stable query or normal table query respectively. if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - if (pQInfo->runtimeEnv.stableQuery) { - if (IS_STASBLE_QUERY_OVER(pQInfo)) { + if (pQInfo->query.stableQuery) { + if (IS_STASBLE_QUERY_OVER(&pQInfo->runtimeEnv)) { setQueryStatus(pQuery, QUERY_OVER); } } else { @@ -4279,7 +4288,7 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst) { void queryCostStatis(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQueryCostInfo *pSummary = &pRuntimeEnv->summary; + SQueryCostInfo *pSummary = &pQInfo->summary; uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pResultRowHashTable); hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map); @@ -4619,23 +4628,23 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) && (!isGroupbyColumn(pQuery->pGroupbyExpr)) && (!isFixedOutputQuery(pRuntimeEnv)) ) { - SArray* pa = GET_TABLEGROUP(pQInfo, 0); + SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0); STableQueryInfo* pCheckInfo = taosArrayGetP(pa, 0); cond.twindow = pCheckInfo->win; } terrno = TSDB_CODE_SUCCESS; if (isFirstLastRowQuery(pQuery)) { - pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef); + pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQuery->memRef); // update the query time window pQuery->window = cond.twindow; if (pQInfo->tableGroupInfo.numOfTables == 0) { pRuntimeEnv->tableqinfoGroupInfo.numOfTables = 0; } else { - size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo); + size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); for(int32_t i = 0; i < numOfGroups; ++i) { - SArray *group = GET_TABLEGROUP(pQInfo, i); + SArray *group = GET_TABLEGROUP(pRuntimeEnv, i); size_t t = taosArrayGetSize(group); for (int32_t j = 0; j < t; ++j) { @@ -4647,9 +4656,9 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) } } } else if (isPointInterpoQuery(pQuery)) { - pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef); + pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQuery->memRef); } else { - pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef); + pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQuery->memRef); } return terrno; @@ -4687,8 +4696,8 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery); - pRuntimeEnv->hasTagResults = hasTagValOutput(pQuery); + pQuery->topBotQuery = isTopBottomQuery(pQuery); + pQuery->hasTagResults = hasTagValOutput(pQuery); pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQuery); pRuntimeEnv->prevResult = prevResult; @@ -4699,14 +4708,14 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts return code; } - pQInfo->tsdb = tsdb; + pQuery->tsdb = tsdb; pQInfo->vgId = vgId; - pQInfo->groupResInfo.totalGroup = isSTableQuery? GET_NUM_OF_TABLEGROUP(pQInfo):0; + pQInfo->groupResInfo.totalGroup = isSTableQuery? GET_NUM_OF_TABLEGROUP(pRuntimeEnv):0; pRuntimeEnv->pQuery = pQuery; pRuntimeEnv->pTsBuf = pTsBuf; pRuntimeEnv->cur.vgroupIndex = -1; - pRuntimeEnv->stableQuery = isSTableQuery; + pQuery->stableQuery = isSTableQuery; pRuntimeEnv->prevGroupId = INT32_MIN; pQuery->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr); @@ -4767,6 +4776,8 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts } // create runtime environment + int32_t numOfTables = pQInfo->tableGroupInfo.numOfTables; + pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo)); code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQInfo->tableGroupInfo.numOfTables, pQuery->order.order, pQInfo->vgId); if (code != TSDB_CODE_SUCCESS) { return code; @@ -4806,7 +4817,7 @@ static FORCE_INLINE void setEnvForEachBlock(SQInfo* pQInfo, STableQueryInfo* pTa SQuery* pQuery = pQInfo->runtimeEnv.pQuery; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTsBuf != NULL) { + if (pQuery->hasTagResults || pRuntimeEnv->pTsBuf != NULL) { setTagVal(pRuntimeEnv, pTableQueryInfo->pTable); } @@ -4838,7 +4849,7 @@ static void doTableQueryInfoTimeWindowCheck(SQuery* pQuery, STableQueryInfo* pTa static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery* pQuery = pRuntimeEnv->pQuery; - SQueryCostInfo* summary = &pRuntimeEnv->summary; + SQueryCostInfo* summary = &pQInfo->summary; int64_t st = taosGetTimestampMs(); @@ -4867,7 +4878,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo); } - if (pRuntimeEnv->stabledev) { + if (pQuery->stabledev) { for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { if (pQuery->pExpr1[i].base.functionId == TSDB_FUNC_STDDEV_DST) { setParamValue(pRuntimeEnv); @@ -4914,10 +4925,10 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { SQuery * pQuery = pRuntimeEnv->pQuery; setQueryStatus(pQuery, QUERY_NOT_COMPLETED); - SArray *group = GET_TABLEGROUP(pQInfo, 0); + SArray *group = GET_TABLEGROUP(pRuntimeEnv, 0); STableQueryInfo* pCheckInfo = taosArrayGetP(group, index); - if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTsBuf != NULL) { + if (pQuery->hasTagResults || pRuntimeEnv->pTsBuf != NULL) { setTagVal(pRuntimeEnv, pCheckInfo->pTable); } @@ -4949,7 +4960,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { pRuntimeEnv->pQueryHandle = NULL; } - pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, pQInfo, &pQInfo->memRef); + pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &gp, pQInfo, &pQuery->memRef); taosArrayDestroy(tx); taosArrayDestroy(g1); if (pRuntimeEnv->pQueryHandle == NULL) { @@ -5068,17 +5079,17 @@ static void sequentialTableProcess(SQInfo *pQInfo) { SQuery * pQuery = pRuntimeEnv->pQuery; setQueryStatus(pQuery, QUERY_COMPLETED); - size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo); + size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); if (isPointInterpoQuery(pQuery)) { resetDefaultResInfoOutputBuf(pRuntimeEnv); assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); - while (pQInfo->groupIndex < numOfGroups) { - SArray *group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex); + while (pRuntimeEnv->groupIndex < numOfGroups) { + SArray *group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pRuntimeEnv->groupIndex); qDebug("QInfo:%p point interpolation query on group:%d, total group:%" PRIzu ", current group:%p", pQInfo, - pQInfo->groupIndex, numOfGroups, group); + pRuntimeEnv->groupIndex, numOfGroups, group); STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); SArray *g1 = taosArrayInit(1, POINTER_BYTES); @@ -5093,7 +5104,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { pRuntimeEnv->pQueryHandle = NULL; } - pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(pQInfo->tsdb, &cond, &gp, pQInfo, &pQInfo->memRef); + pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(pQuery->tsdb, &cond, &gp, pQInfo, &pQuery->memRef); taosArrayDestroy(tx); taosArrayDestroy(g1); @@ -5110,7 +5121,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { taosArrayDestroy(s); // here we simply set the first table as current table - SArray *first = GET_TABLEGROUP(pQInfo, pQInfo->groupIndex); + SArray *first = GET_TABLEGROUP(pRuntimeEnv, pRuntimeEnv->groupIndex); pQuery->current = taosArrayGetP(first, 0); scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey); @@ -5122,7 +5133,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } skipResults(pRuntimeEnv); - pQInfo->groupIndex += 1; + pRuntimeEnv->groupIndex += 1; // enable execution for next table, when handling the projection query enableExecutionForNextTable(pRuntimeEnv); @@ -5133,10 +5144,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } } } else if (pQuery->groupbyColumn) { // group-by on normal columns query - while (pQInfo->groupIndex < numOfGroups) { - SArray *group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex); + while (pRuntimeEnv->groupIndex < numOfGroups) { + SArray *group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pRuntimeEnv->groupIndex); - qDebug("QInfo:%p group by normal columns group:%d, total group:%" PRIzu "", pQInfo, pQInfo->groupIndex, + qDebug("QInfo:%p group by normal columns group:%d, total group:%" PRIzu "", pQInfo, pRuntimeEnv->groupIndex, numOfGroups); STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); @@ -5154,7 +5165,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } // no need to update the lastkey for each table - pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, pQInfo, &pQInfo->memRef); + pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &gp, pQInfo, &pQuery->memRef); taosArrayDestroy(g1); taosArrayDestroy(tx); @@ -5169,7 +5180,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { // here we simply set the first table as current table scanMultiTableDataBlocks(pQInfo); - pQInfo->groupIndex += 1; + pRuntimeEnv->groupIndex += 1; taosArrayDestroy(s); @@ -5190,7 +5201,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } qDebug("QInfo:%p generated groupby columns results %d rows for group %d completed", pQInfo, pWindowResInfo->size, - pQInfo->groupIndex); + pRuntimeEnv->groupIndex); pQuery->rec.rows = 0; if (pWindowResInfo->size > pQuery->rec.capacity) { @@ -5205,19 +5216,19 @@ static void sequentialTableProcess(SQInfo *pQInfo) { cleanupGroupResInfo(&pQInfo->groupResInfo); break; } - } else if (pRuntimeEnv->queryWindowIdentical && pRuntimeEnv->pTsBuf == NULL && !isTsCompQuery(pQuery)) { + } else if (pQuery->queryWindowIdentical && pRuntimeEnv->pTsBuf == NULL && !isTsCompQuery(pQuery)) { //super table projection query with identical query time range for all tables. SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; resetDefaultResInfoOutputBuf(pRuntimeEnv); - SArray *group = GET_TABLEGROUP(pQInfo, 0); + SArray *group = GET_TABLEGROUP(pRuntimeEnv, 0); assert(taosArrayGetSize(group) == pRuntimeEnv->tableqinfoGroupInfo.numOfTables && 1 == taosArrayGetSize(pRuntimeEnv->tableqinfoGroupInfo.pGroupList)); void *pQueryHandle = pRuntimeEnv->pQueryHandle; if (pQueryHandle == NULL) { STsdbQueryCond con = createTsdbQueryCond(pQuery, &pQuery->window); - pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &con, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef); + pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQuery->tsdb, &con, &pQInfo->tableGroupInfo, pQInfo, &pQuery->memRef); pQueryHandle = pRuntimeEnv->pQueryHandle; } @@ -5236,7 +5247,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { bool hasMoreBlock = true; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - SQueryCostInfo *summary = &pRuntimeEnv->summary; + SQueryCostInfo *summary = &pQInfo->summary; while ((hasMoreBlock = tsdbNextDataBlock(pQueryHandle)) == true) { summary->totalBlocks += 1; @@ -5254,7 +5265,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { pQuery->current = *pTableQueryInfo; doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo); - if (pRuntimeEnv->hasTagResults) { + if (pQuery->hasTagResults) { setTagVal(pRuntimeEnv, pQuery->current->pTable); } @@ -5326,7 +5337,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { // the limitation of output result is reached, set the query completed skipResults(pRuntimeEnv); if (limitOperator(pQuery, pQInfo)) { - SET_STABLE_QUERY_OVER(pQInfo); + SET_STABLE_QUERY_OVER(pRuntimeEnv); break; } } @@ -5339,7 +5350,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { if (!hasMoreBlock) { setQueryStatus(pQuery, QUERY_COMPLETED); - SET_STABLE_QUERY_OVER(pQInfo); + SET_STABLE_QUERY_OVER(pRuntimeEnv); } } else { /* @@ -5358,32 +5369,32 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } // all data have returned already - if (pQInfo->tableIndex >= pRuntimeEnv->tableqinfoGroupInfo.numOfTables) { + if (pRuntimeEnv->tableIndex >= pRuntimeEnv->tableqinfoGroupInfo.numOfTables) { return; } resetDefaultResInfoOutputBuf(pRuntimeEnv); resetResultRowInfo(pRuntimeEnv, &pRuntimeEnv->resultRowInfo); - SArray *group = GET_TABLEGROUP(pQInfo, 0); + SArray *group = GET_TABLEGROUP(pRuntimeEnv, 0); assert(taosArrayGetSize(group) == pRuntimeEnv->tableqinfoGroupInfo.numOfTables && 1 == taosArrayGetSize(pRuntimeEnv->tableqinfoGroupInfo.pGroupList)); - while (pQInfo->tableIndex < pRuntimeEnv->tableqinfoGroupInfo.numOfTables) { + while (pRuntimeEnv->tableIndex < pRuntimeEnv->tableqinfoGroupInfo.numOfTables) { if (isQueryKilled(pQInfo)) { longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } - pQuery->current = taosArrayGetP(group, pQInfo->tableIndex); - if (!multiTableMultioutputHelper(pQInfo, pQInfo->tableIndex)) { - pQInfo->tableIndex++; + pQuery->current = taosArrayGetP(group, pRuntimeEnv->tableIndex); + if (!multiTableMultioutputHelper(pQInfo, pRuntimeEnv->tableIndex)) { + pRuntimeEnv->tableIndex++; continue; } // TODO handle the limit offset problem if (pQuery->numOfFilterCols == 0 && pQuery->limit.offset > 0) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - pQInfo->tableIndex++; + pRuntimeEnv->tableIndex++; continue; } } @@ -5393,7 +5404,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { // the limitation of output result is reached, set the query completed if (limitOperator(pQuery, pQInfo)) { - SET_STABLE_QUERY_OVER(pQInfo); + SET_STABLE_QUERY_OVER(pRuntimeEnv); break; } @@ -5407,7 +5418,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { * not the consecutive query on meter on which is aborted due to buffer limitation * to ensure that, we can reset the query range once query on a meter is completed. */ - pQInfo->tableIndex++; + pRuntimeEnv->tableIndex++; updateTableIdInfo(pQuery, pQInfo->arrTableIdInfo); // if the buffer is full or group by each table, we need to jump out of the loop @@ -5432,7 +5443,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } } - if (pQInfo->tableIndex >= pRuntimeEnv->tableqinfoGroupInfo.numOfTables) { + if (pRuntimeEnv->tableIndex >= pRuntimeEnv->tableqinfoGroupInfo.numOfTables) { setQueryStatus(pQuery, QUERY_COMPLETED); } @@ -5457,7 +5468,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { qDebug("QInfo %p numOfTables:%" PRIu64 ", index:%d, numOfGroups:%" PRIzu ", %" PRId64 " points returned, total:%" PRId64 ", offset:%" PRId64, - pQInfo, (uint64_t)pRuntimeEnv->tableqinfoGroupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, pQuery->rec.rows, + pQInfo, (uint64_t)pRuntimeEnv->tableqinfoGroupInfo.numOfTables, pRuntimeEnv->tableIndex, numOfGroups, pQuery->rec.rows, pQuery->rec.total, pQuery->limit.offset); } } @@ -5487,7 +5498,7 @@ static int32_t doSaveContext(SQInfo *pQInfo) { setupQueryRangeForReverseScan(pQInfo); pRuntimeEnv->prevGroupId = INT32_MIN; - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef); + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQuery->memRef); return (pRuntimeEnv->pSecQueryHandle == NULL)? -1:0; } @@ -5508,11 +5519,12 @@ static void doRestoreContext(SQInfo *pQInfo) { static void doCloseAllTimeWindow(SQInfo *pQInfo) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; if (QUERY_IS_INTERVAL_QUERY(pQuery)) { - size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo); + size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); for (int32_t i = 0; i < numOfGroup; ++i) { - SArray *group = GET_TABLEGROUP(pQInfo, i); + SArray *group = GET_TABLEGROUP(pRuntimeEnv, i); size_t num = taosArrayGetSize(group); for (int32_t j = 0; j < num; ++j) { @@ -5681,6 +5693,8 @@ static SSDataBlock* doScanTableImpl(STableScanInfo *pTableScanInfo) { return pBlock; } + + return NULL; } static SSDataBlock* doTableScan(void* param) { @@ -5701,8 +5715,8 @@ static SSDataBlock* doTableScan(void* param) { tsdbCleanupQueryHandle(pTableScanInfo->pQueryHandle); STsdbQueryCond cond = createTsdbQueryCond(pRuntimeEnv->pQuery, &pRuntimeEnv->pQuery->window); pTableScanInfo->pQueryHandle = - tsdbQueryTables(pTableScanInfo->pQInfo->tsdb, &cond, &pTableScanInfo->pQInfo->tableGroupInfo, - pTableScanInfo->pQInfo, &pTableScanInfo->pQInfo->memRef); + tsdbQueryTables(pTableScanInfo->pQInfo->query.tsdb, &cond, &pTableScanInfo->pQInfo->tableGroupInfo, + pTableScanInfo->pQInfo, &pTableScanInfo->pQInfo->query.memRef); if (pTableScanInfo->pQueryHandle == NULL) { longjmp(pRuntimeEnv->env, terrno); } @@ -5727,8 +5741,8 @@ static SSDataBlock* doTableScan(void* param) { STsdbQueryCond cond = createTsdbQueryCond(pRuntimeEnv->pQuery, &pRuntimeEnv->pQuery->window); pTableScanInfo->pQueryHandle = - tsdbQueryTables(pTableScanInfo->pQInfo->tsdb, &cond, &pTableScanInfo->pQInfo->tableGroupInfo, - pTableScanInfo->pQInfo, &pTableScanInfo->pQInfo->memRef); + tsdbQueryTables(pTableScanInfo->pQInfo->query.tsdb, &cond, &pTableScanInfo->pQInfo->tableGroupInfo, + pTableScanInfo->pQInfo, &pTableScanInfo->pQInfo->query.memRef); qDebug("QInfo:%p start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, pTableScanInfo->pQInfo, cond.twindow.skey, cond.twindow.ekey); @@ -5810,8 +5824,8 @@ static SSDataBlock* doAggOperator(void* param) { setQueryStatus(pInfo->pRuntimeEnv->pQuery, QUERY_COMPLETED); finalizeQueryResult(pInfo->pRuntimeEnv); - pInfo->pRuntimeEnv->pQuery->ouptputBuf->info.rows = getNumOfResult(pInfo->pRuntimeEnv); - return pInfo->pRuntimeEnv->pQuery->ouptputBuf; + pInfo->pRuntimeEnv->ouptputBuf->info.rows = getNumOfResult(pInfo->pRuntimeEnv); + return pInfo->pRuntimeEnv->ouptputBuf; } // todo set the attribute of query scan count @@ -5849,7 +5863,7 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pRuntimeEnv->pQuery; - if (!pRuntimeEnv->topBotQuery && pQuery->limit.offset > 0) { // no need to execute, since the output will be ignore. + if (!pQuery->topBotQuery && pQuery->limit.offset > 0) { // no need to execute, since the output will be ignore. return; } @@ -6038,7 +6052,7 @@ void tableQueryImpl(SQInfo *pQInfo) { int64_t st = taosGetTimestampUs(); assert(pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1); - SArray* g = GET_TABLEGROUP(pQInfo, 0); + SArray* g = GET_TABLEGROUP(pRuntimeEnv, 0); STableQueryInfo* item = taosArrayGetP(g, 0); pQuery->current = item; @@ -6054,7 +6068,7 @@ void tableQueryImpl(SQInfo *pQInfo) { } // record the total elapsed time - pRuntimeEnv->summary.elapsedTime += (taosGetTimestampUs() - st); + pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st); assert(pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1); } @@ -6127,7 +6141,7 @@ void stableQueryImpl(SQInfo *pQInfo) { } // record the total elapsed time - pQInfo->runtimeEnv.summary.elapsedTime += (taosGetTimestampUs() - st); + pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st); } static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) { @@ -6881,7 +6895,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr } doUpdateExprColumnIndex(pQuery); - pQuery->ouptputBuf = createOutputBuf(pQuery); + pQInfo->runtimeEnv.ouptputBuf = createOutputBuf(pQuery); int32_t ret = createFilterInfo(pQInfo, pQuery); if (ret != TSDB_CODE_SUCCESS) { @@ -6926,7 +6940,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr size_t numOfGroups = 0; if (pTableGroupInfo->pGroupList != NULL) { numOfGroups = taosArrayGetSize(pTableGroupInfo->pGroupList); - STableGroupInfo* pTableqinfo = &pRuntimeEnv->tableqinfoGroupInfo; + STableGroupInfo* pTableqinfo = &pQInfo->runtimeEnv.tableqinfoGroupInfo; pTableqinfo->pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); pTableqinfo->numOfTables = pTableGroupInfo->numOfTables; @@ -6950,7 +6964,8 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr pQuery->window = pQueryMsg->window; changeExecuteScanOrder(pQInfo, pQueryMsg, stableQuery); - pQInfo->runtimeEnv.queryWindowIdentical = true; + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + pQuery->queryWindowIdentical = true; bool groupByCol = isGroupbyColumn(pQuery->pGroupbyExpr); STimeWindow window = pQuery->window; @@ -6972,7 +6987,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr window.skey = info->lastKey; if (info->lastKey != pQuery->window.skey) { - pQInfo->runtimeEnv.queryWindowIdentical = false; + pQInfo->query.queryWindowIdentical = false; } void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo); @@ -6993,7 +7008,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr colIdCheck(pQuery); // todo refactor - pQInfo->runtimeEnv.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX); + pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX); qDebug("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo); return pQInfo; @@ -7039,6 +7054,8 @@ bool isValidQInfo(void *param) { int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, SQueryParam* param, bool isSTable) { int32_t code = TSDB_CODE_SUCCESS; + + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pQInfo->runtimeEnv.pQuery; STSBuf *pTsBuf = NULL; @@ -7149,6 +7166,7 @@ void freeQInfo(SQInfo *pQInfo) { qDebug("QInfo:%p start to free QInfo", pQInfo); + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; releaseQueryBuf(pRuntimeEnv->tableqinfoGroupInfo.numOfTables); teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); @@ -7326,14 +7344,14 @@ void buildTagQueryResult(SQInfo* pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; - size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo); + size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); assert(numOfGroup == 0 || numOfGroup == 1); if (numOfGroup == 0) { return; } - SArray* pa = GET_TABLEGROUP(pQInfo, 0); + SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0); size_t num = taosArrayGetSize(pa); assert(num == pRuntimeEnv->tableqinfoGroupInfo.numOfTables); @@ -7358,8 +7376,8 @@ void buildTagQueryResult(SQInfo* pQInfo) { } } - while(pQInfo->tableIndex < num && count < pQuery->rec.capacity) { - int32_t i = pQInfo->tableIndex++; + while(pRuntimeEnv->tableIndex < num && count < pQuery->rec.capacity) { + int32_t i = pRuntimeEnv->tableIndex++; STableQueryInfo *item = taosArrayGetP(pa, i); char *output = pQuery->sdata[0]->data + count * rsize; @@ -7397,7 +7415,7 @@ void buildTagQueryResult(SQInfo* pQInfo) { *(int64_t*) pQuery->sdata[0]->data = num; count = 1; - SET_STABLE_QUERY_OVER(pQInfo); + SET_STABLE_QUERY_OVER(pRuntimeEnv); qDebug("QInfo:%p create count(tbname) query, res:%d rows:1", pQInfo, count); } else { // return only the tags|table name etc. count = 0; @@ -7408,8 +7426,8 @@ void buildTagQueryResult(SQInfo* pQInfo) { maxNumOfTables = (int32_t)pQuery->limit.limit; } - while(pQInfo->tableIndex < num && count < maxNumOfTables) { - int32_t i = pQInfo->tableIndex++; + while(pRuntimeEnv->tableIndex < num && count < maxNumOfTables) { + int32_t i = pRuntimeEnv->tableIndex++; // discard current result due to offset if (pQuery->limit.offset > 0) { diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 4cb61d1f4827388869a76016ed2f99a5aec10154..e8f6f9133bdc25be69ac283c65a73059474b3cf6 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -164,7 +164,8 @@ SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRo } size_t getResultRowSize(SQueryRuntimeEnv* pRuntimeEnv) { - return (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultRowCellInfo)) + pRuntimeEnv->interBufSize + sizeof(SResultRow); + SQuery* pQuery = pRuntimeEnv->pQuery; + return (pQuery->numOfOutput * sizeof(SResultRowCellInfo)) + pQuery->interBufSize + sizeof(SResultRow); } SResultRowPool* initResultRowPool(size_t size) { @@ -540,11 +541,12 @@ static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupRes int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQInfo *pQInfo) { int64_t st = taosGetTimestampUs(); + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) { - SArray *group = GET_TABLEGROUP(pQInfo, pGroupResInfo->currentGroup); + SArray *group = GET_TABLEGROUP(pRuntimeEnv, pGroupResInfo->currentGroup); - int32_t ret = mergeIntoGroupResultImpl(&pQInfo->runtimeEnv, pGroupResInfo, group, pQInfo); + int32_t ret = mergeIntoGroupResultImpl(pRuntimeEnv, pGroupResInfo, group, pQInfo); if (ret != TSDB_CODE_SUCCESS) { return ret; } @@ -560,13 +562,13 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQInfo *pQInfo) { } if (pGroupResInfo->currentGroup >= pGroupResInfo->totalGroup && !hasRemainData(pGroupResInfo)) { - SET_STABLE_QUERY_OVER(pQInfo); + SET_STABLE_QUERY_OVER(pRuntimeEnv); } int64_t elapsedTime = taosGetTimestampUs() - st; qDebug("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", pQInfo, pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime); - pQInfo->runtimeEnv.summary.firstStageMergeTime += elapsedTime; + pQInfo->summary.firstStageMergeTime += elapsedTime; return TSDB_CODE_SUCCESS; } diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index 1eca162870b4bbd2d585a04b4e61e7b23b47b403..4ba1c940974eb6ec7e474d852109794b12651bf9 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -216,7 +216,7 @@ bool qTableQuery(qinfo_t qinfo) { return doBuildResCheck(pQInfo); } - if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) { + if (pQInfo->runtimeEnv.tableqinfoGroupInfo.numOfTables == 0) { qDebug("QInfo:%p no table exists for query, abort", pQInfo); setQueryStatus(pQInfo->runtimeEnv.pQuery, QUERY_COMPLETED); return doBuildResCheck(pQInfo); @@ -236,9 +236,9 @@ bool qTableQuery(qinfo_t qinfo) { if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) { assert(pQInfo->runtimeEnv.pQueryHandle == NULL); buildTagQueryResult(pQInfo); - } else if (pQInfo->runtimeEnv.stableQuery) { + } else if (pQInfo->query.stableQuery) { stableQueryImpl(pQInfo); - } else if (pQInfo->runtimeEnv.queryBlockDist){ + } else if (pQInfo->query.queryBlockDist){ buildTableBlockDistResult(pQInfo); } else { tableQueryImpl(pQInfo); @@ -248,7 +248,7 @@ bool qTableQuery(qinfo_t qinfo) { if (isQueryKilled(pQInfo)) { qDebug("QInfo:%p query is killed", pQInfo); } else if (pQuery->rec.rows == 0) { - qDebug("QInfo:%p over, %" PRIzu " tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total); + qDebug("QInfo:%p over, %" PRIzu " tables queried, %"PRId64" rows are returned", pQInfo, pRuntimeEnv->tableqinfoGroupInfo.numOfTables, pQuery->rec.total); } else { qDebug("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows", pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); @@ -309,7 +309,6 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co return TSDB_CODE_QRY_INVALID_QHANDLE; } - SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pQInfo->runtimeEnv.pQuery; size_t size = getResultSize(pQInfo, &pQuery->rec.rows); @@ -328,10 +327,10 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co if (pQInfo->code == TSDB_CODE_SUCCESS) { (*pRsp)->offset = htobe64(pQuery->limit.offset); - (*pRsp)->useconds = htobe64(pRuntimeEnv->summary.elapsedTime); + (*pRsp)->useconds = htobe64(pQInfo->summary.elapsedTime); } else { (*pRsp)->offset = 0; - (*pRsp)->useconds = htobe64(pRuntimeEnv->summary.elapsedTime); + (*pRsp)->useconds = htobe64(pQInfo->summary.elapsedTime); } (*pRsp)->precision = htons(pQuery->precision);