diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index d48d7d5ea104d98ee0877d9a411d6919023e4f3a..37fe302d43a20dff401c96f5a9861b5c01db64cd 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2185,43 +2185,43 @@ static bool overlapWithTimeWindow(SQuery* pQuery, SDataBlockInfo* pBlockInfo) { return false; } -int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock) { +int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock, uint32_t* status) { SQuery *pQuery = pRuntimeEnv->pQuery; - uint32_t status = 0; + *status = 0; if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf > 0) { - status = BLK_DATA_ALL_NEEDED; + *status = BLK_DATA_ALL_NEEDED; } else { // check if this data block is required to load // Calculate all time windows that are overlapping or contain current data block. // If current data block is contained by all possible time window, do not load current data block. if (QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, pBlockInfo)) { - status = BLK_DATA_ALL_NEEDED; + *status = BLK_DATA_ALL_NEEDED; } - if (status != BLK_DATA_ALL_NEEDED) { + if ((*status) != BLK_DATA_ALL_NEEDED) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SSqlFuncMsg* pSqlFunc = &pQuery->pSelectExpr[i].base; int32_t functionId = pSqlFunc->functionId; int32_t colId = pSqlFunc->colInfo.colId; - status |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pBlockInfo->window.skey, pBlockInfo->window.ekey, colId); - if ((status & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { + (*status) |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pBlockInfo->window.skey, pBlockInfo->window.ekey, colId); + if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { break; } } } } - if (status == BLK_DATA_NO_NEEDED) { + if ((*status) == BLK_DATA_NO_NEEDED) { qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", GET_QINFO_ADDR(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pRuntimeEnv->summary.discardBlocks += 1; - } else if (status == BLK_DATA_STATIS_NEEDED) { - if (tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) { - // return DISK_DATA_LOAD_FAILED; - } + } else if ((*status) == BLK_DATA_STATIS_NEEDED) { + + // this function never returns error? + tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis); pRuntimeEnv->summary.loadBlockStatis += 1; @@ -2230,24 +2230,26 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, pRuntimeEnv->summary.totalCheckedRows += pBlockInfo->rows; } } else { - assert(status == BLK_DATA_ALL_NEEDED); + assert((*status) == BLK_DATA_ALL_NEEDED); // load the data block statistics to perform further filter pRuntimeEnv->summary.loadBlockStatis += 1; - if (tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) { - } + tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis); if (!needToLoadDataBlock(pRuntimeEnv, *pStatis, pRuntimeEnv->pCtx, pBlockInfo->rows)) { // current block has been discard due to filter applied pRuntimeEnv->summary.discardBlocks += 1; qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", GET_QINFO_ADDR(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); - return BLK_DATA_DISCARD; + (*status) = BLK_DATA_DISCARD; } pRuntimeEnv->summary.totalCheckedRows += pBlockInfo->rows; pRuntimeEnv->summary.loadBlocks += 1; *pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL); + if (*pDataBlock == NULL) { + return terrno; + } } return TSDB_CODE_SUCCESS; @@ -2431,17 +2433,20 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ensureOutputBuffer(pRuntimeEnv, &blockInfo); SDataStatis *pStatis = NULL; - SArray *pDataBlock = NULL; - if (loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock) == BLK_DATA_DISCARD) { - pQuery->current->lastKey = QUERY_IS_ASC_QUERY(pQuery)? blockInfo.window.ekey + step:blockInfo.window.skey + step; - continue; - } + SArray * pDataBlock = NULL; + uint32_t status = 0; - if (terrno != TSDB_CODE_SUCCESS) { // load data block failed, abort query - longjmp(pRuntimeEnv->env, terrno); + int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status); + if (ret != TSDB_CODE_SUCCESS) { break; } + if (status == BLK_DATA_DISCARD) { + pQuery->current->lastKey = + QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step; + continue; + } + // query start position can not move into tableApplyFunctionsOnBlock due to limit/offset condition pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : blockInfo.rows - 1; int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock); @@ -4651,9 +4656,17 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { } SDataStatis *pStatis = NULL; - SArray *pDataBlock = NULL; - if (loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock) == BLK_DATA_DISCARD) { - pQuery->current->lastKey = QUERY_IS_ASC_QUERY(pQuery)? blockInfo.window.ekey + step:blockInfo.window.skey + step; + SArray * pDataBlock = NULL; + uint32_t status = 0; + + int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status); + if (ret != TSDB_CODE_SUCCESS) { + break; + } + + if (status == BLK_DATA_DISCARD) { + pQuery->current->lastKey = + QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step; continue; }