提交 aaeedc65 编写于 作者: H Haojun Liao

[td-2895] refactor.

上级 a260707c
...@@ -226,7 +226,6 @@ typedef struct SQuery { ...@@ -226,7 +226,6 @@ typedef struct SQuery {
SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query. SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query.
SSingleColumnFilterInfo* pFilterInfo; SSingleColumnFilterInfo* pFilterInfo;
uint32_t status; // query status
STableQueryInfo* current; STableQueryInfo* current;
void* tsdb; void* tsdb;
SMemRef memRef; SMemRef memRef;
...@@ -239,35 +238,32 @@ typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num); ...@@ -239,35 +238,32 @@ typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num);
struct SOperatorInfo; struct SOperatorInfo;
typedef struct {
FILE* file; // file struct pointer
} SFileResultInfo;
typedef struct SQueryRuntimeEnv { typedef struct SQueryRuntimeEnv {
jmp_buf env; jmp_buf env;
SQuery* pQuery; SQuery* pQuery;
void* qinfo; uint32_t status; // query status
uint16_t scanFlag; // denotes reversed scan of data or not void* qinfo;
SFillInfo* pFillInfo; // todo move to operatorInfo uint16_t scanFlag; // denotes reversed scan of data or not
void* pQueryHandle; SFillInfo* pFillInfo; // todo move to operatorInfo
void* pQueryHandle;
int32_t prevGroupId; // previous executed group id
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file int32_t prevGroupId; // previous executed group id
SHashObj* pResultRowHashTable; // quick locate the window object for each result SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
char* keyBuf; // window key buffer SHashObj* pResultRowHashTable; // quick locate the window object for each result
SResultRowPool* pool; // window result object pool char* keyBuf; // window key buffer
char** prevRow; SResultRowPool* pool; // window result object pool
char** prevRow;
SArray* prevResult; // intermediate result, SArray<SInterResult>
STSBuf* pTsBuf; // timestamp filter list SArray* prevResult; // intermediate result, SArray<SInterResult>
STSCursor cur; STSBuf* pTsBuf; // timestamp filter list
STSCursor cur;
char* tagVal; // tag value of current data block
SArithmeticSupport *sasArray; char* tagVal; // tag value of current data block
SArithmeticSupport *sasArray;
SSDataBlock *outputBuf;
int32_t tableIndex; //TODO remove it SSDataBlock *outputBuf;
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure int32_t tableIndex; //TODO remove it
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
struct SOperatorInfo *proot; struct SOperatorInfo *proot;
struct SOperatorInfo *pTableScanner; // table scan operator struct SOperatorInfo *pTableScanner; // table scan operator
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
...@@ -431,7 +427,7 @@ void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); ...@@ -431,7 +427,7 @@ void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
bool isQueryKilled(SQInfo *pQInfo); bool isQueryKilled(SQInfo *pQInfo);
int32_t checkForQueryBuf(size_t numOfTables); int32_t checkForQueryBuf(size_t numOfTables);
bool doBuildResCheck(SQInfo* pQInfo); bool doBuildResCheck(SQInfo* pQInfo);
void setQueryStatus(SQuery *pQuery, int8_t status); void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status);
bool onlyQueryTags(SQuery* pQuery); bool onlyQueryTags(SQuery* pQuery);
void buildTagQueryResult(SQInfo *pQInfo); void buildTagQueryResult(SQInfo *pQInfo);
......
...@@ -157,8 +157,6 @@ static void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe ...@@ -157,8 +157,6 @@ static void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo,
SDataStatis *pStatis, SExprInfo* pExprInfo);
static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex); static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex);
static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo); static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo);
...@@ -460,33 +458,6 @@ static bool hasNullRv(SColIndex* pColIndex, SDataStatis *pStatis) { ...@@ -460,33 +458,6 @@ static bool hasNullRv(SColIndex* pColIndex, SDataStatis *pStatis) {
return true; return true;
} }
/**
* @param pQuery
* @param col
* @param pDataBlockInfo
* @param pStatis
* @param pColStatis
* @return
*/
static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis **pColStatis) {
if (pStatis != NULL && TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) {
*pColStatis = &pStatis[pColIndex->colIndex];
assert((*pColStatis)->colId == pColIndex->colId);
} else {
*pColStatis = NULL;
}
if (TSDB_COL_IS_TAG(pColIndex->flag) || TSDB_COL_IS_UD_COL(pColIndex->flag) || pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return false;
}
if ((*pColStatis) != NULL && (*pColStatis)->numOfNull == 0) {
return false;
}
return true;
}
static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData, static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData,
int16_t bytes, bool masterscan, uint64_t uid) { int16_t bytes, bool masterscan, uint64_t uid) {
bool existed = false; bool existed = false;
...@@ -1583,47 +1554,6 @@ void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColInde ...@@ -1583,47 +1554,6 @@ void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColInde
#endif #endif
} }
void UNUSED_FUNC setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo,
SDataStatis *pStatis, SExprInfo* pExprInfo) {
// int32_t functionId = pExprInfo->base.functionId;
// int32_t colId = pExprInfo->base.colInfo.colId;
SDataStatis *tpField = NULL;
pCtx->hasNull = hasNullValue(&pExprInfo->base.colInfo, pStatis, &tpField);
if (tpField != NULL) {
pCtx->preAggVals.isSet = true;
pCtx->preAggVals.statis = *tpField;
assert(pCtx->preAggVals.statis.numOfNull <= pBlockInfo->rows);
} else {
pCtx->preAggVals.isSet = false;
}
pCtx->preAggVals.dataBlockLoaded = (inputData != NULL);
// limit/offset query will affect this value
// pCtx->size = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->rows - pQuery->pos : pQuery->pos + 1;
// set the start position in current block
// int32_t offset = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->pos: (pQuery->pos - pCtx->size + 1);
// if (inputData != NULL) {
// pCtx->pInput = (char*)inputData + offset * pCtx->inputBytes;
// }
//
// uint32_t status = aAggs[functionId].status;
// if (((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) && (tsCol != NULL)) {
// pCtx->ptsList = tsCol + offset;
// }
// set the statistics data for primary time stamp column
// if (functionId == TSDB_FUNC_SPREAD && colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
// pCtx->preAggVals.isSet = true;
// pCtx->preAggVals.statis.min = pBlockInfo->window.skey;
// pCtx->preAggVals.statis.max = pBlockInfo->window.ekey;
// }
}
// set the output buffer for the selectivity + tag query // set the output buffer for the selectivity + tag query
static int32_t setCtxTagColumnInfo(SQLFunctionCtx *pCtx, int32_t numOfOutput) { static int32_t setCtxTagColumnInfo(SQLFunctionCtx *pCtx, int32_t numOfOutput) {
if (!isSelectivityWithTagsQuery(pCtx, numOfOutput)) { if (!isSelectivityWithTagsQuery(pCtx, numOfOutput)) {
...@@ -3107,8 +3037,12 @@ int32_t initResultRow(SResultRow *pResultRow) { ...@@ -3107,8 +3037,12 @@ int32_t initResultRow(SResultRow *pResultRow) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, int64_t uid) {
SSDataBlock* pDataBlock, int32_t* rowCellInfoOffset, int64_t uid) { SQLFunctionCtx* pCtx = pInfo->pCtx;
SSDataBlock* pDataBlock = pInfo->pRes;
int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset;
SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
int32_t tid = 0; int32_t tid = 0;
SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&tid, sizeof(tid), true, uid); SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&tid, sizeof(tid), true, uid);
...@@ -3180,13 +3114,13 @@ void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size) { ...@@ -3180,13 +3114,13 @@ void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size) {
} }
} }
void setQueryStatus(SQuery *pQuery, int8_t status) { void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status) {
if (status == QUERY_NOT_COMPLETED) { if (status == QUERY_NOT_COMPLETED) {
pQuery->status = status; pRuntimeEnv->status = status;
} else { } else {
// QUERY_NOT_COMPLETED is not compatible with any other status, so clear its position first // QUERY_NOT_COMPLETED is not compatible with any other status, so clear its position first
CLEAR_QUERY_STATUS(pQuery, QUERY_NOT_COMPLETED); CLEAR_QUERY_STATUS(pRuntimeEnv, QUERY_NOT_COMPLETED);
pQuery->status |= status; pRuntimeEnv->status |= status;
} }
} }
...@@ -3204,7 +3138,7 @@ static void setEnvBeforeReverseScan_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow ...@@ -3204,7 +3138,7 @@ static void setEnvBeforeReverseScan_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow
SWITCH_ORDER(pQuery->order.order); SWITCH_ORDER(pQuery->order.order);
SET_REVERSE_SCAN_FLAG(pRuntimeEnv); SET_REVERSE_SCAN_FLAG(pRuntimeEnv);
setQueryStatus(pQuery, QUERY_NOT_COMPLETED); setQueryStatus(pRuntimeEnv, QUERY_NOT_COMPLETED);
switchCtxOrder(pCtx, numOfOutput); switchCtxOrder(pCtx, numOfOutput);
// disableFuncInReverseScan(pRuntimeEnv, pResultRowInfo, pCtx, numOfOutput); // disableFuncInReverseScan(pRuntimeEnv, pResultRowInfo, pCtx, numOfOutput);
...@@ -3719,7 +3653,7 @@ bool hasNotReturnedResults(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupR ...@@ -3719,7 +3653,7 @@ bool hasNotReturnedResults(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupR
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
SFillInfo *pFillInfo = pRuntimeEnv->pFillInfo; SFillInfo *pFillInfo = pRuntimeEnv->pFillInfo;
if (!Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { if (!Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)) {
return false; return false;
} }
...@@ -3745,7 +3679,7 @@ bool hasNotReturnedResults(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupR ...@@ -3745,7 +3679,7 @@ bool hasNotReturnedResults(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupR
int32_t numOfTotal = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, pQuery->window.ekey, (int32_t)pRuntimeEnv->resultInfo.capacity); int32_t numOfTotal = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, pQuery->window.ekey, (int32_t)pRuntimeEnv->resultInfo.capacity);
return numOfTotal > 0; return numOfTotal > 0;
} else { // there are results waiting for returned to client. } else { // there are results waiting for returned to client.
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED) && hasRemainDataInCurrentGroup(pGroupResInfo) && if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) && hasRemainDataInCurrentGroup(pGroupResInfo) &&
(pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery))) { (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery))) {
return true; return true;
} }
...@@ -3801,14 +3735,14 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data ...@@ -3801,14 +3735,14 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
qDebug("QInfo:%p set %d subscribe info", pQInfo, total); qDebug("QInfo:%p set %d subscribe info", pQInfo, total);
// Check if query is completed or not for stable query or normal table query respectively. // Check if query is completed or not for stable query or normal table query respectively.
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)) {
if (pQInfo->query.stableQuery) { if (pQInfo->query.stableQuery) {
if (IS_STASBLE_QUERY_OVER(&pQInfo->runtimeEnv)) { if (IS_STASBLE_QUERY_OVER(&pQInfo->runtimeEnv)) {
setQueryStatus(pQuery, QUERY_OVER); setQueryStatus(pRuntimeEnv, QUERY_OVER);
} }
} else { } else {
if (!hasNotReturnedResults(&pQInfo->runtimeEnv, &pRuntimeEnv->groupResInfo)) { if (!hasNotReturnedResults(&pQInfo->runtimeEnv, &pRuntimeEnv->groupResInfo)) {
setQueryStatus(pQuery, QUERY_OVER); setQueryStatus(pRuntimeEnv, QUERY_OVER);
} }
} }
} }
...@@ -4365,7 +4299,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts ...@@ -4365,7 +4299,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pQuery->fillType, pColInfo, pQInfo); pQuery->fillType, pColInfo, pQInfo);
} }
setQueryStatus(pQuery, QUERY_NOT_COMPLETED); setQueryStatus(pRuntimeEnv, QUERY_NOT_COMPLETED);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -4517,7 +4451,7 @@ static SSDataBlock* doTableScan(void* param) { ...@@ -4517,7 +4451,7 @@ static SSDataBlock* doTableScan(void* param) {
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
tsdbResetQueryHandle(pTableScanInfo->pQueryHandle, &cond); tsdbResetQueryHandle(pTableScanInfo->pQueryHandle, &cond);
setQueryStatus(pQuery, QUERY_NOT_COMPLETED); setQueryStatus(pRuntimeEnv, QUERY_NOT_COMPLETED);
pRuntimeEnv->scanFlag = REPEAT_SCAN; pRuntimeEnv->scanFlag = REPEAT_SCAN;
if (pRuntimeEnv->pTsBuf) { if (pRuntimeEnv->pTsBuf) {
...@@ -4741,7 +4675,7 @@ static SSDataBlock* doAggregate(void* param) { ...@@ -4741,7 +4675,7 @@ static SSDataBlock* doAggregate(void* param) {
} }
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
finalizeQueryResult_rv(pOperator, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); finalizeQueryResult_rv(pOperator, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
pInfo->pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); pInfo->pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
...@@ -4829,7 +4763,7 @@ static SSDataBlock* doArithmeticOperation(void* param) { ...@@ -4829,7 +4763,7 @@ static SSDataBlock* doArithmeticOperation(void* param) {
while(1) { while(1) {
SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream); SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
break; break;
} }
...@@ -4860,7 +4794,7 @@ static SSDataBlock* doLimit(void* param) { ...@@ -4860,7 +4794,7 @@ static SSDataBlock* doLimit(void* param) {
SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream); SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv->pQuery, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
return NULL; return NULL;
} }
...@@ -4870,7 +4804,7 @@ static SSDataBlock* doLimit(void* param) { ...@@ -4870,7 +4804,7 @@ static SSDataBlock* doLimit(void* param) {
pInfo->total = pInfo->limit; pInfo->total = pInfo->limit;
setQueryStatus(pOperator->pRuntimeEnv->pQuery, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} else { } else {
pInfo->total += pBlock->info.rows; pInfo->total += pBlock->info.rows;
...@@ -4891,7 +4825,7 @@ static SSDataBlock* doOffset(void* param) { ...@@ -4891,7 +4825,7 @@ static SSDataBlock* doOffset(void* param) {
while (1) { while (1) {
SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream); SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
return NULL; return NULL;
} }
...@@ -4959,7 +4893,7 @@ static SSDataBlock* doIntervalAgg(void* param) { ...@@ -4959,7 +4893,7 @@ static SSDataBlock* doIntervalAgg(void* param) {
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pIntervalInfo->resultRowInfo); closeAllResultRows(&pIntervalInfo->resultRowInfo);
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
finalizeQueryResult_rv(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); finalizeQueryResult_rv(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo, 0); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo, 0);
...@@ -5014,7 +4948,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param) { ...@@ -5014,7 +4948,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param) {
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
pQuery->order.order = order; // TODO : restore the order pQuery->order.order = order; // TODO : restore the order
doCloseAllTimeWindow(pRuntimeEnv); doCloseAllTimeWindow(pRuntimeEnv);
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
copyResToQueryResultBuf_rv(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); copyResToQueryResultBuf_rv(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
...@@ -5063,7 +4997,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { ...@@ -5063,7 +4997,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) {
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pInfo->binfo.resultRowInfo); closeAllResultRows(&pInfo->binfo.resultRowInfo);
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
if (!pRuntimeEnv->pQuery->stableQuery) { if (!pRuntimeEnv->pQuery->stableQuery) {
finalizeQueryResult_rv(pOperator, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); finalizeQueryResult_rv(pOperator, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
...@@ -5107,7 +5041,7 @@ static SSDataBlock* doFill(void* param) { ...@@ -5107,7 +5041,7 @@ static SSDataBlock* doFill(void* param) {
} else { } else {
pInfo->totalInputRows += pBlock->info.rows; pInfo->totalInputRows += pBlock->info.rows;
int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->pQuery->status, QUERY_COMPLETED)?pRuntimeEnv->pQuery->window.ekey:pBlock->info.window.ekey; int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQuery->window.ekey:pBlock->info.window.ekey;
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pBlock->info.rows, ekey); taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pBlock->info.rows, ekey);
taosFillSetInputDataBlock(pRuntimeEnv->pFillInfo, pBlock); taosFillSetInputDataBlock(pRuntimeEnv->pFillInfo, pBlock);
...@@ -5156,8 +5090,7 @@ static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, ...@@ -5156,8 +5090,7 @@ static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pInfo->seed = rand(); pInfo->seed = rand();
setDefaultOutputBuf(pRuntimeEnv, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.pRes, setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed);
pInfo->binfo.rowCellInfoOffset, pInfo->seed);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableAggregate"; pOperator->name = "TableAggregate";
...@@ -5239,8 +5172,7 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -5239,8 +5172,7 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset); pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset);
initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
setDefaultOutputBuf(pRuntimeEnv, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->pRes, pBInfo->rowCellInfoOffset, setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed);
pInfo->seed);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "ArithmeticOp"; pOperator->name = "ArithmeticOp";
...@@ -5595,7 +5527,7 @@ void buildTableBlockDistResult(SQInfo *pQInfo) { ...@@ -5595,7 +5527,7 @@ void buildTableBlockDistResult(SQInfo *pQInfo) {
freeTableBlockDist(pTableBlockDist); freeTableBlockDist(pTableBlockDist);
// pRuntimeEnv->resultInfo.rows = 1; // pRuntimeEnv->resultInfo.rows = 1;
setQueryStatus(pQuery, QUERY_COMPLETED); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
} }
void stableQueryImpl(SQInfo *pQInfo) { void stableQueryImpl(SQInfo *pQInfo) {
...@@ -6611,7 +6543,7 @@ int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *p ...@@ -6611,7 +6543,7 @@ int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *p
(!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) { (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) {
qDebug("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey, qDebug("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey,
pQuery->window.ekey, pQuery->order.order); pQuery->window.ekey, pQuery->order.order);
setQueryStatus(pQuery, QUERY_COMPLETED); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
pRuntimeEnv->tableqinfoGroupInfo.numOfTables = 0; pRuntimeEnv->tableqinfoGroupInfo.numOfTables = 0;
// todo free memory // todo free memory
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -6619,7 +6551,7 @@ int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *p ...@@ -6619,7 +6551,7 @@ int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *p
if (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 0) { if (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 0) {
qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo); qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo);
setQueryStatus(pQuery, QUERY_COMPLETED); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -6789,8 +6721,8 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { ...@@ -6789,8 +6721,8 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
} }
// all data returned, set query over // all data returned, set query over
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)) {
setQueryStatus(pQuery, QUERY_OVER); setQueryStatus(pRuntimeEnv, QUERY_OVER);
} }
} else { } else {
doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data); doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data);
...@@ -6802,7 +6734,7 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { ...@@ -6802,7 +6734,7 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
if (pQuery->limit.limit > 0 && pQuery->limit.limit == pRuntimeEnv->resultInfo.total) { if (pQuery->limit.limit > 0 && pQuery->limit.limit == pRuntimeEnv->resultInfo.total) {
qDebug("QInfo:%p results limitation reached, limitation:%"PRId64, pQInfo, pQuery->limit.limit); qDebug("QInfo:%p results limitation reached, limitation:%"PRId64, pQInfo, pQuery->limit.limit);
setQueryStatus(pQuery, QUERY_OVER); setQueryStatus(pRuntimeEnv, QUERY_OVER);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -6973,7 +6905,7 @@ void buildTagQueryResult(SQInfo* pQInfo) { ...@@ -6973,7 +6905,7 @@ void buildTagQueryResult(SQInfo* pQInfo) {
} }
pRuntimeEnv->resultInfo.rows = count; pRuntimeEnv->resultInfo.rows = count;
setQueryStatus(pQuery, QUERY_COMPLETED); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
#endif #endif
} }
......
...@@ -218,7 +218,7 @@ bool qTableQuery(qinfo_t qinfo) { ...@@ -218,7 +218,7 @@ bool qTableQuery(qinfo_t qinfo) {
if (pQInfo->runtimeEnv.tableqinfoGroupInfo.numOfTables == 0) { if (pQInfo->runtimeEnv.tableqinfoGroupInfo.numOfTables == 0) {
qDebug("QInfo:%p no table exists for query, abort", pQInfo); qDebug("QInfo:%p no table exists for query, abort", pQInfo);
setQueryStatus(pQInfo->runtimeEnv.pQuery, QUERY_COMPLETED); setQueryStatus(&pQInfo->runtimeEnv, QUERY_COMPLETED);
return doBuildResCheck(pQInfo); return doBuildResCheck(pQInfo);
} }
...@@ -340,13 +340,13 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co ...@@ -340,13 +340,13 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) { if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) {
doDumpQueryResult(pQInfo, (*pRsp)->data); doDumpQueryResult(pQInfo, (*pRsp)->data);
} else { } else {
setQueryStatus(pQuery, QUERY_OVER); setQueryStatus(pRuntimeEnv, QUERY_OVER);
} }
pQInfo->rspContext = NULL; pQInfo->rspContext = NULL;
pQInfo->dataReady = QUERY_RESULT_NOT_READY; pQInfo->dataReady = QUERY_RESULT_NOT_READY;
if (IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { if (IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_OVER)) {
// here current thread hold the refcount, so it is safe to free tsdbQueryHandle. // here current thread hold the refcount, so it is safe to free tsdbQueryHandle.
*continueExec = false; *continueExec = false;
(*pRsp)->completed = 1; // notify no more result to client (*pRsp)->completed = 1; // notify no more result to client
...@@ -390,8 +390,7 @@ int32_t qQueryCompleted(qinfo_t qinfo) { ...@@ -390,8 +390,7 @@ int32_t qQueryCompleted(qinfo_t qinfo) {
return TSDB_CODE_QRY_INVALID_QHANDLE; return TSDB_CODE_QRY_INVALID_QHANDLE;
} }
SQuery* pQuery = pQInfo->runtimeEnv.pQuery; return isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQInfo->runtimeEnv.status, QUERY_OVER);
return isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER);
} }
void qDestroyQueryInfo(qinfo_t qHandle) { void qDestroyQueryInfo(qinfo_t qHandle) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册