diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index e958a53e87e9b687af920c8e896bef89a131c40d..c1208dbc0b41ce0ac4d24f16b636f72c5419b444 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -765,8 +765,8 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus } static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pNextWin, - SWindowResInfo *pWindowResInfo, SDataBlockInfo *pDataBlockInfo, - TSKEY *primaryKeys, __block_search_fn_t searchFn) { + SDataBlockInfo *pDataBlockInfo, TSKEY *primaryKeys, + __block_search_fn_t searchFn) { SQuery *pQuery = pRuntimeEnv->pQuery; while (1) { @@ -945,8 +945,7 @@ static void blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStati STimeWindow nextWin = win; while (1) { - int32_t startPos = - getNextQualifiedWindow(pRuntimeEnv, &nextWin, pWindowResInfo, pDataBlockInfo, primaryKeyCol, searchFn); + int32_t startPos = getNextQualifiedWindow(pRuntimeEnv, &nextWin, pDataBlockInfo, primaryKeyCol, searchFn); if (startPos < 0) { break; } @@ -1357,10 +1356,10 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, TSKEY } else { pCtx->preAggVals.isSet = false; } - - pCtx->startOffset = QUERY_IS_ASC_QUERY(pQuery)? pQuery->pos : 0; - pCtx->size = QUERY_IS_ASC_QUERY(pQuery)? size - pQuery->pos : pQuery->pos + 1; - + + pCtx->startOffset = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->pos : 0; + pCtx->size = QUERY_IS_ASC_QUERY(pQuery) ? size - pQuery->pos : pQuery->pos + 1; + uint32_t status = aAggs[functionId].nStatus; if (((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) && (tsCol != NULL)) { pCtx->ptsList = &tsCol[pCtx->startOffset]; @@ -1370,7 +1369,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, TSKEY // last_dist or first_dist function // store the first&last timestamp into the intermediate buffer [1], the true // value may be null but timestamp will never be null -// pCtx->ptsList = tsCol; + // pCtx->ptsList = tsCol; } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_DIFF || (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) { /* @@ -1386,12 +1385,12 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, TSKEY pTWAInfo->EKey = pQuery->window.ekey; } -// pCtx->ptsList = tsCol; + // pCtx->ptsList = tsCol; } else if (functionId == TSDB_FUNC_ARITHM) { pCtx->param[1].pz = param; } - + #if defined(_DEBUG_VIEW) // int64_t *tsList = (int64_t *)primaryColumnData; // int64_t s = tsList[0]; @@ -2496,8 +2495,8 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { pWindowResInfo->prevSKey = w.skey; } else { // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp - TSKEY start = blockInfo.window.ekey - pQuery->intervalTime; - getAlignQueryTimeWindow(pQuery, start, pQuery->window.ekey, blockInfo.window.ekey, &skey1, &ekey1, &w); + getAlignQueryTimeWindow(pQuery, blockInfo.window.ekey, pQuery->window.ekey, blockInfo.window.ekey, &skey1, + &ekey1, &w); pWindowResInfo->startTime = pQuery->window.skey; pWindowResInfo->prevSKey = w.skey; @@ -2531,7 +2530,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SDataStatis *pStatis = NULL; SArray * pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis); - + pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1; int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, &pRuntimeEnv->windowResInfo, pDataBlock); @@ -4059,13 +4058,13 @@ void vnodePrintQueryStatistics(SQInfo *pQInfo) { static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) { SQuery *pQuery = pRuntimeEnv->pQuery; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - + if (pQuery->limit.offset == pBlockInfo->rows) { // current block will ignore completed - pQuery->lastKey = QUERY_IS_ASC_QUERY(pQuery)? pBlockInfo->window.ekey + step : pBlockInfo->window.skey + step; + pQuery->lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->window.ekey + step : pBlockInfo->window.skey + step; pQuery->limit.offset = 0; return; } - + if (QUERY_IS_ASC_QUERY(pQuery)) { pQuery->pos = pQuery->limit.offset; } else { @@ -4074,7 +4073,7 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBloc assert(pQuery->pos >= 0 && pQuery->pos <= pBlockInfo->rows - 1); - SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); + SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); // update the pQuery->limit.offset value, and pQuery->pos value @@ -4083,10 +4082,10 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBloc // update the offset value pQuery->lastKey = keys[pQuery->pos]; pQuery->limit.offset = 0; - + int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, - &pRuntimeEnv->windowResInfo, pDataBlock); - + &pRuntimeEnv->windowResInfo, pDataBlock); + qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d", GET_QINFO_ADDR(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes); } @@ -4102,7 +4101,7 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle; - + while (tsdbNextDataBlock(pQueryHandle)) { if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { return; @@ -4114,147 +4113,117 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { pQuery->limit.offset -= blockInfo.rows; pQuery->lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.window.ekey : blockInfo.window.skey; pQuery->lastKey += step; - - qTrace("QInfo:%p skip rows:%d, offset:%" PRId64 "", GET_QINFO_ADDR(pRuntimeEnv), blockInfo.rows, pQuery->limit.offset); - } else { // find the appropriated start position in current block + + qTrace("QInfo:%p skip rows:%d, offset:%" PRId64 "", GET_QINFO_ADDR(pRuntimeEnv), blockInfo.rows, + pQuery->limit.offset); + } else { // find the appropriated start position in current block updateOffsetVal(pRuntimeEnv, &blockInfo); break; } } } -static UNUSED_FUNC bool forwardQueryStartPosIfNeeded(SQInfo *pQInfo) { - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery * pQuery = pRuntimeEnv->pQuery; +static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv) { + SQuery *pQuery = pRuntimeEnv->pQuery; // if queried with value filter, do NOT forward query start position - if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) { + if (pQuery->limit.offset <= 0 || pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) { return true; } - if (pQuery->limit.offset > 0 && (!isTopBottomQuery(pQuery)) && pQuery->interpoType == TSDB_INTERPO_NONE) { - /* - * 1. for top/bottom query, the offset applies to the final result, not here - * 2. for interval without interpolation query we forward pQuery->intervalTime at a time for - * pQuery->limit.offset times. Since hole exists, pQuery->intervalTime*pQuery->limit.offset value is - * not valid. otherwise, we only forward pQuery->limit.offset number of points - */ - if (isIntervalQuery(pQuery)) { - assert(pRuntimeEnv->windowResInfo.prevSKey == 0); + /* + * 1. for interval without interpolation query we forward pQuery->intervalTime at a time for + * pQuery->limit.offset times. Since hole exists, pQuery->intervalTime*pQuery->limit.offset value is + * not valid. otherwise, we only forward pQuery->limit.offset number of points + */ + assert(pRuntimeEnv->windowResInfo.prevSKey == 0); - TSKEY skey1, ekey1; - STimeWindow w = {0}; - SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + TSKEY skey1, ekey1; + STimeWindow w = {0}; + SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; - if (!tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) { - // todo handle no data situation - } + while (tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) { + SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle); - SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle); + if (QUERY_IS_ASC_QUERY(pQuery) && pWindowResInfo->prevSKey == 0) { + getAlignQueryTimeWindow(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey, &skey1, &ekey1, + &w); + pWindowResInfo->startTime = w.skey; + pWindowResInfo->prevSKey = w.skey; + } else { + // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp + getAlignQueryTimeWindow(pQuery, blockInfo.window.ekey, pQuery->window.ekey, blockInfo.window.ekey, &skey1, &ekey1, + &w); - if (QUERY_IS_ASC_QUERY(pQuery)) { - getAlignQueryTimeWindow(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey, &skey1, - &ekey1, &w); - pWindowResInfo->startTime = w.skey; - pWindowResInfo->prevSKey = w.skey; - } else { - // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp - TSKEY start = blockInfo.window.ekey - pQuery->intervalTime; - getAlignQueryTimeWindow(pQuery, start, pQuery->window.ekey, blockInfo.window.ekey, &skey1, &ekey1, &w); + pWindowResInfo->startTime = pQuery->window.skey; + pWindowResInfo->prevSKey = w.skey; + } - pWindowResInfo->startTime = pQuery->window.skey; - pWindowResInfo->prevSKey = w.skey; - } + // the first time window + STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQuery); - // the first time window - STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQuery); + while (pQuery->limit.offset > 0) { + if ((win.ekey <= blockInfo.window.ekey && QUERY_IS_ASC_QUERY(pQuery)) || + (win.ekey >= blockInfo.window.skey && !QUERY_IS_ASC_QUERY(pQuery))) { + pQuery->limit.offset -= 1; + pWindowResInfo->prevSKey = win.skey; + } - while (pQuery->limit.offset > 0) { - STimeWindow tw = win; - getNextTimeWindow(pQuery, &tw); + STimeWindow tw = win; + getNextTimeWindow(pQuery, &tw); - // next time window starts from current data block + if (pQuery->limit.offset == 0) { if ((tw.skey <= blockInfo.window.ekey && QUERY_IS_ASC_QUERY(pQuery)) || (tw.ekey >= blockInfo.window.skey && !QUERY_IS_ASC_QUERY(pQuery))) { - // query completed - if ((tw.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) || - (tw.ekey < pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery))) { - setQueryStatus(pQuery, QUERY_COMPLETED); - break; - } - - tw = win; + // load the data block SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); - int32_t startPos = getNextQualifiedWindow(pRuntimeEnv, &tw, pWindowResInfo, &blockInfo, pColInfoData->pData, - binarySearchForKey); + tw = win; + int32_t startPos = + getNextQualifiedWindow(pRuntimeEnv, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey); assert(startPos >= 0); - pQuery->limit.offset -= 1; // set the abort info pQuery->pos = startPos; pQuery->lastKey = ((TSKEY *)pColInfoData->pData)[startPos]; pWindowResInfo->prevSKey = tw.skey; - win = tw; - continue; - } else { - if (!tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) { - setQueryStatus(pQuery, QUERY_COMPLETED); - break; - } - - blockInfo = tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle); - if ((blockInfo.window.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) || - (blockInfo.window.ekey < pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery))) { - setQueryStatus(pQuery, QUERY_COMPLETED); - break; - } - - // set the window that start from the next data block - TSKEY key = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.window.skey : blockInfo.window.ekey; - STimeWindow n = getActiveTimeWindow(pWindowResInfo, key, pQuery); - - // next data block are still covered by current time window - if (n.skey == win.skey && n.ekey == win.ekey) { - // do nothing - } else { - pQuery->limit.offset -= 1; - - // query completed - if ((n.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) || - (n.ekey < pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery))) { - setQueryStatus(pQuery, QUERY_COMPLETED); - break; - } - // set the abort info - pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1; - pQuery->lastKey = QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.skey : blockInfo.window.ekey; - pWindowResInfo->prevSKey = n.skey; + int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, NULL, binarySearchForKey, + &pRuntimeEnv->windowResInfo, pDataBlock); - win = n; - } + qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d", + GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes); + return true; + } else { + // do nothing, + return true; } } - if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED) || pQuery->limit.offset > 0) { - setQueryStatus(pQuery, QUERY_COMPLETED); - return false; + // next time window starts from current data block + if ((tw.skey <= blockInfo.window.ekey && QUERY_IS_ASC_QUERY(pQuery)) || + (tw.ekey >= blockInfo.window.skey && !QUERY_IS_ASC_QUERY(pQuery))) { + // load the data block, note that only the primary timestamp column is required + SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); + SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); + + tw = win; + int32_t startPos = + getNextQualifiedWindow(pRuntimeEnv, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey); + assert(startPos >= 0); + + // set the abort info + pQuery->pos = startPos; + pQuery->lastKey = ((TSKEY *)pColInfoData->pData)[startPos]; + pWindowResInfo->prevSKey = tw.skey; + win = tw; } else { - assert(0); - // if (IS_DISK_DATA_BLOCK(pQuery)) { - // getTimestampInDiskBlock(pRuntimeEnv, 0); + break; // offset is not 0, and next time window locates in the next block. } } - } else { // forward the start position for projection query - skipBlocks(&pQInfo->runtimeEnv); - if (pQuery->limit.offset > 0) { - setQueryStatus(pQuery, QUERY_COMPLETED); - return false; - } } - + return true; } @@ -5006,7 +4975,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo) { if (!isTSCompQuery(pQuery)) { resetCtxOutputBuf(pRuntimeEnv); } - + // skip blocks without load the actual data block from file if no filter condition present skipBlocks(&pQInfo->runtimeEnv); if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols == 0) { @@ -5090,6 +5059,13 @@ static void tableIntervalProcess(SQInfo *pQInfo) { int32_t numOfInterpo = 0; + // skip blocks without load the actual data block from file if no filter condition present + skipTimeInterval(pRuntimeEnv); + if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols == 0) { + setQueryStatus(pQuery, QUERY_COMPLETED); + return; + } + while (1) { tableIntervalProcessImpl(pRuntimeEnv);