From aaeedc652de5dfa5efeebfa044bace75a1ecaba3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 3 Mar 2021 16:19:44 +0800 Subject: [PATCH] [td-2895] refactor. --- src/query/inc/qExecutor.h | 56 +++++++-------- src/query/src/qExecutor.c | 140 ++++++++++---------------------------- src/query/src/queryMain.c | 9 ++- 3 files changed, 66 insertions(+), 139 deletions(-) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 9e8ab5518d..5f101e9d5f 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -226,7 +226,6 @@ typedef struct SQuery { SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query. SSingleColumnFilterInfo* pFilterInfo; - uint32_t status; // query status STableQueryInfo* current; void* tsdb; SMemRef memRef; @@ -239,35 +238,32 @@ typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num); struct SOperatorInfo; -typedef struct { - FILE* file; // file struct pointer -} SFileResultInfo; - typedef struct SQueryRuntimeEnv { - jmp_buf env; - SQuery* pQuery; - void* qinfo; - uint16_t scanFlag; // denotes reversed scan of data or not - SFillInfo* pFillInfo; // todo move to operatorInfo - void* pQueryHandle; - - int32_t prevGroupId; // previous executed group id - SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file - SHashObj* pResultRowHashTable; // quick locate the window object for each result - char* keyBuf; // window key buffer - SResultRowPool* pool; // window result object pool - char** prevRow; - - SArray* prevResult; // intermediate result, SArray - STSBuf* pTsBuf; // timestamp filter list - STSCursor cur; - - char* tagVal; // tag value of current data block - SArithmeticSupport *sasArray; - - SSDataBlock *outputBuf; - int32_t tableIndex; //TODO remove it - STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure + jmp_buf env; + SQuery* pQuery; + uint32_t status; // query status + void* qinfo; + uint16_t scanFlag; // denotes reversed scan of data or not + SFillInfo* pFillInfo; // todo move to operatorInfo + void* pQueryHandle; + + int32_t prevGroupId; // previous executed group id + SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file + SHashObj* pResultRowHashTable; // quick locate the window object for each result + char* keyBuf; // window key buffer + SResultRowPool* pool; // window result object pool + char** prevRow; + + SArray* prevResult; // intermediate result, SArray + STSBuf* pTsBuf; // timestamp filter list + STSCursor cur; + + char* tagVal; // tag value of current data block + SArithmeticSupport *sasArray; + + SSDataBlock *outputBuf; + int32_t tableIndex; //TODO remove it + STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure struct SOperatorInfo *proot; struct SOperatorInfo *pTableScanner; // table scan operator SGroupResInfo groupResInfo; @@ -431,7 +427,7 @@ void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); bool isQueryKilled(SQInfo *pQInfo); int32_t checkForQueryBuf(size_t numOfTables); bool doBuildResCheck(SQInfo* pQInfo); -void setQueryStatus(SQuery *pQuery, int8_t status); +void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status); bool onlyQueryTags(SQuery* pQuery); void buildTagQueryResult(SQInfo *pQInfo); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 9abf2152fc..0db15d8c1c 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -157,8 +157,6 @@ static void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); -static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo, - SDataStatis *pStatis, SExprInfo* pExprInfo); static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex); static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo); @@ -460,33 +458,6 @@ static bool hasNullRv(SColIndex* pColIndex, SDataStatis *pStatis) { return true; } -/** - * @param pQuery - * @param col - * @param pDataBlockInfo - * @param pStatis - * @param pColStatis - * @return - */ -static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis **pColStatis) { - if (pStatis != NULL && TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) { - *pColStatis = &pStatis[pColIndex->colIndex]; - assert((*pColStatis)->colId == pColIndex->colId); - } else { - *pColStatis = NULL; - } - - if (TSDB_COL_IS_TAG(pColIndex->flag) || TSDB_COL_IS_UD_COL(pColIndex->flag) || pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { - return false; - } - - if ((*pColStatis) != NULL && (*pColStatis)->numOfNull == 0) { - return false; - } - - return true; -} - static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData, int16_t bytes, bool masterscan, uint64_t uid) { bool existed = false; @@ -1583,47 +1554,6 @@ void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColInde #endif } -void UNUSED_FUNC setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo, - SDataStatis *pStatis, SExprInfo* pExprInfo) { - -// int32_t functionId = pExprInfo->base.functionId; -// int32_t colId = pExprInfo->base.colInfo.colId; - - SDataStatis *tpField = NULL; - pCtx->hasNull = hasNullValue(&pExprInfo->base.colInfo, pStatis, &tpField); - - if (tpField != NULL) { - pCtx->preAggVals.isSet = true; - pCtx->preAggVals.statis = *tpField; - assert(pCtx->preAggVals.statis.numOfNull <= pBlockInfo->rows); - } else { - pCtx->preAggVals.isSet = false; - } - - pCtx->preAggVals.dataBlockLoaded = (inputData != NULL); - - // limit/offset query will affect this value -// pCtx->size = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->rows - pQuery->pos : pQuery->pos + 1; - - // set the start position in current block -// int32_t offset = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->pos: (pQuery->pos - pCtx->size + 1); -// if (inputData != NULL) { -// pCtx->pInput = (char*)inputData + offset * pCtx->inputBytes; -// } -// -// uint32_t status = aAggs[functionId].status; -// if (((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) && (tsCol != NULL)) { -// pCtx->ptsList = tsCol + offset; -// } - - // set the statistics data for primary time stamp column -// if (functionId == TSDB_FUNC_SPREAD && colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { -// pCtx->preAggVals.isSet = true; -// pCtx->preAggVals.statis.min = pBlockInfo->window.skey; -// pCtx->preAggVals.statis.max = pBlockInfo->window.ekey; -// } -} - // set the output buffer for the selectivity + tag query static int32_t setCtxTagColumnInfo(SQLFunctionCtx *pCtx, int32_t numOfOutput) { if (!isSelectivityWithTagsQuery(pCtx, numOfOutput)) { @@ -3107,8 +3037,12 @@ int32_t initResultRow(SResultRow *pResultRow) { return TSDB_CODE_SUCCESS; } -void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, - SSDataBlock* pDataBlock, int32_t* rowCellInfoOffset, int64_t uid) { +void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, int64_t uid) { + SQLFunctionCtx* pCtx = pInfo->pCtx; + SSDataBlock* pDataBlock = pInfo->pRes; + int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset; + SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo; + int32_t tid = 0; SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&tid, sizeof(tid), true, uid); @@ -3180,13 +3114,13 @@ void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size) { } } -void setQueryStatus(SQuery *pQuery, int8_t status) { +void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status) { if (status == QUERY_NOT_COMPLETED) { - pQuery->status = status; + pRuntimeEnv->status = status; } else { // QUERY_NOT_COMPLETED is not compatible with any other status, so clear its position first - CLEAR_QUERY_STATUS(pQuery, QUERY_NOT_COMPLETED); - pQuery->status |= status; + CLEAR_QUERY_STATUS(pRuntimeEnv, QUERY_NOT_COMPLETED); + pRuntimeEnv->status |= status; } } @@ -3204,7 +3138,7 @@ static void setEnvBeforeReverseScan_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow SWITCH_ORDER(pQuery->order.order); SET_REVERSE_SCAN_FLAG(pRuntimeEnv); - setQueryStatus(pQuery, QUERY_NOT_COMPLETED); + setQueryStatus(pRuntimeEnv, QUERY_NOT_COMPLETED); switchCtxOrder(pCtx, numOfOutput); // disableFuncInReverseScan(pRuntimeEnv, pResultRowInfo, pCtx, numOfOutput); @@ -3719,7 +3653,7 @@ bool hasNotReturnedResults(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupR SQuery *pQuery = pRuntimeEnv->pQuery; SFillInfo *pFillInfo = pRuntimeEnv->pFillInfo; - if (!Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + if (!Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)) { return false; } @@ -3745,7 +3679,7 @@ bool hasNotReturnedResults(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupR int32_t numOfTotal = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, pQuery->window.ekey, (int32_t)pRuntimeEnv->resultInfo.capacity); return numOfTotal > 0; } else { // there are results waiting for returned to client. - if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED) && hasRemainDataInCurrentGroup(pGroupResInfo) && + if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) && hasRemainDataInCurrentGroup(pGroupResInfo) && (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery))) { return true; } @@ -3801,14 +3735,14 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data qDebug("QInfo:%p set %d subscribe info", pQInfo, total); // Check if query is completed or not for stable query or normal table query respectively. - if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)) { if (pQInfo->query.stableQuery) { if (IS_STASBLE_QUERY_OVER(&pQInfo->runtimeEnv)) { - setQueryStatus(pQuery, QUERY_OVER); + setQueryStatus(pRuntimeEnv, QUERY_OVER); } } else { if (!hasNotReturnedResults(&pQInfo->runtimeEnv, &pRuntimeEnv->groupResInfo)) { - setQueryStatus(pQuery, QUERY_OVER); + setQueryStatus(pRuntimeEnv, QUERY_OVER); } } } @@ -4365,7 +4299,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts pQuery->fillType, pColInfo, pQInfo); } - setQueryStatus(pQuery, QUERY_NOT_COMPLETED); + setQueryStatus(pRuntimeEnv, QUERY_NOT_COMPLETED); return TSDB_CODE_SUCCESS; } @@ -4517,7 +4451,7 @@ static SSDataBlock* doTableScan(void* param) { STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); tsdbResetQueryHandle(pTableScanInfo->pQueryHandle, &cond); - setQueryStatus(pQuery, QUERY_NOT_COMPLETED); + setQueryStatus(pRuntimeEnv, QUERY_NOT_COMPLETED); pRuntimeEnv->scanFlag = REPEAT_SCAN; if (pRuntimeEnv->pTsBuf) { @@ -4741,7 +4675,7 @@ static SSDataBlock* doAggregate(void* param) { } pOperator->status = OP_EXEC_DONE; - setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); + setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); finalizeQueryResult_rv(pOperator, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); pInfo->pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); @@ -4829,7 +4763,7 @@ static SSDataBlock* doArithmeticOperation(void* param) { while(1) { SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream); if (pBlock == NULL) { - setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); + setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); break; } @@ -4860,7 +4794,7 @@ static SSDataBlock* doLimit(void* param) { SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream); if (pBlock == NULL) { - setQueryStatus(pOperator->pRuntimeEnv->pQuery, QUERY_COMPLETED); + setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; return NULL; } @@ -4870,7 +4804,7 @@ static SSDataBlock* doLimit(void* param) { pInfo->total = pInfo->limit; - setQueryStatus(pOperator->pRuntimeEnv->pQuery, QUERY_COMPLETED); + setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; } else { pInfo->total += pBlock->info.rows; @@ -4891,7 +4825,7 @@ static SSDataBlock* doOffset(void* param) { while (1) { SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream); if (pBlock == NULL) { - setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); + setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; return NULL; } @@ -4959,7 +4893,7 @@ static SSDataBlock* doIntervalAgg(void* param) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pIntervalInfo->resultRowInfo); - setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); + setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); finalizeQueryResult_rv(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo, 0); @@ -5014,7 +4948,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param) { pOperator->status = OP_RES_TO_RETURN; pQuery->order.order = order; // TODO : restore the order doCloseAllTimeWindow(pRuntimeEnv); - setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); + setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); copyResToQueryResultBuf_rv(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { @@ -5063,7 +4997,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pInfo->binfo.resultRowInfo); - setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); + setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); if (!pRuntimeEnv->pQuery->stableQuery) { finalizeQueryResult_rv(pOperator, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); @@ -5107,7 +5041,7 @@ static SSDataBlock* doFill(void* param) { } else { pInfo->totalInputRows += pBlock->info.rows; - int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->pQuery->status, QUERY_COMPLETED)?pRuntimeEnv->pQuery->window.ekey:pBlock->info.window.ekey; + int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQuery->window.ekey:pBlock->info.window.ekey; taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pBlock->info.rows, ekey); taosFillSetInputDataBlock(pRuntimeEnv->pFillInfo, pBlock); @@ -5156,8 +5090,7 @@ static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); pInfo->seed = rand(); - setDefaultOutputBuf(pRuntimeEnv, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.pRes, - pInfo->binfo.rowCellInfoOffset, pInfo->seed); + setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableAggregate"; @@ -5239,8 +5172,7 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset); initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); - setDefaultOutputBuf(pRuntimeEnv, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->pRes, pBInfo->rowCellInfoOffset, - pInfo->seed); + setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "ArithmeticOp"; @@ -5595,7 +5527,7 @@ void buildTableBlockDistResult(SQInfo *pQInfo) { freeTableBlockDist(pTableBlockDist); // pRuntimeEnv->resultInfo.rows = 1; - setQueryStatus(pQuery, QUERY_COMPLETED); + setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); } void stableQueryImpl(SQInfo *pQInfo) { @@ -6611,7 +6543,7 @@ int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *p (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) { qDebug("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey, pQuery->window.ekey, pQuery->order.order); - setQueryStatus(pQuery, QUERY_COMPLETED); + setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); pRuntimeEnv->tableqinfoGroupInfo.numOfTables = 0; // todo free memory return TSDB_CODE_SUCCESS; @@ -6619,7 +6551,7 @@ int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *p if (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 0) { qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo); - setQueryStatus(pQuery, QUERY_COMPLETED); + setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); return TSDB_CODE_SUCCESS; } @@ -6789,8 +6721,8 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { } // all data returned, set query over - if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - setQueryStatus(pQuery, QUERY_OVER); + if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)) { + setQueryStatus(pRuntimeEnv, QUERY_OVER); } } else { doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data); @@ -6802,7 +6734,7 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { if (pQuery->limit.limit > 0 && pQuery->limit.limit == pRuntimeEnv->resultInfo.total) { qDebug("QInfo:%p results limitation reached, limitation:%"PRId64, pQInfo, pQuery->limit.limit); - setQueryStatus(pQuery, QUERY_OVER); + setQueryStatus(pRuntimeEnv, QUERY_OVER); } return TSDB_CODE_SUCCESS; @@ -6973,7 +6905,7 @@ void buildTagQueryResult(SQInfo* pQInfo) { } pRuntimeEnv->resultInfo.rows = count; - setQueryStatus(pQuery, QUERY_COMPLETED); + setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); #endif } diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index d78cea741f..1c0b4ab070 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -218,7 +218,7 @@ bool qTableQuery(qinfo_t qinfo) { if (pQInfo->runtimeEnv.tableqinfoGroupInfo.numOfTables == 0) { qDebug("QInfo:%p no table exists for query, abort", pQInfo); - setQueryStatus(pQInfo->runtimeEnv.pQuery, QUERY_COMPLETED); + setQueryStatus(&pQInfo->runtimeEnv, QUERY_COMPLETED); return doBuildResCheck(pQInfo); } @@ -340,13 +340,13 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) { doDumpQueryResult(pQInfo, (*pRsp)->data); } else { - setQueryStatus(pQuery, QUERY_OVER); + setQueryStatus(pRuntimeEnv, QUERY_OVER); } pQInfo->rspContext = NULL; pQInfo->dataReady = QUERY_RESULT_NOT_READY; - if (IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { + if (IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_OVER)) { // here current thread hold the refcount, so it is safe to free tsdbQueryHandle. *continueExec = false; (*pRsp)->completed = 1; // notify no more result to client @@ -390,8 +390,7 @@ int32_t qQueryCompleted(qinfo_t qinfo) { return TSDB_CODE_QRY_INVALID_QHANDLE; } - SQuery* pQuery = pQInfo->runtimeEnv.pQuery; - return isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER); + return isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQInfo->runtimeEnv.status, QUERY_OVER); } void qDestroyQueryInfo(qinfo_t qHandle) { -- GitLab