diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 7aa5d496b0b79b16ddc2d856d527e728ddb4c89e..d66bfbffea59a4bb47f2e0f815090d7382deae94 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -2237,7 +2237,7 @@ static int32_t applyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo * // no qualified data in current block, do not update the lastKey value assert(pQuery->ekey < pPrimaryColumn[pQuery->pos]); } else { - pQuery->lastKey = pPrimaryColumn[pQuery->pos + (forwardStep - 1)] + step; + pQuery->lastKey = pQuery->ekey + step;//pPrimaryColumn[pQuery->pos + (forwardStep - 1)] + step; } } else { @@ -2255,7 +2255,7 @@ static int32_t applyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo * // no qualified data in current block, do not update the lastKey value assert(pQuery->ekey > pPrimaryColumn[pQuery->pos]); } else { - pQuery->lastKey = pPrimaryColumn[pQuery->pos - (forwardStep - 1)] + step; + pQuery->lastKey = pQuery->ekey + step;//pPrimaryColumn[pQuery->pos - (forwardStep - 1)] + step; } } else { forwardStep = pQuery->pos + 1; @@ -7175,6 +7175,112 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete } } +static void doApplyIntervalQueryOnBlock_rv(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pInfo, + SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, SField *pFields, + __block_search_fn_t searchFn) { + SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; + SQuery * pQuery = pRuntimeEnv->pQuery; + int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + + int64_t nextKey = -1; + bool completed = false; + + while (1) { + int32_t numOfRes = 0; + int32_t steps = applyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, pPrimaryCol, pFields, searchFn, &numOfRes); + assert(steps > 0); + + // NOTE: in case of stable query, only ONE(or ZERO) row of result generated for each query range + if (pInfo->lastResRows == 0) { + pInfo->lastResRows = numOfRes; + } else { + assert(pInfo->lastResRows == 1); + } + + int32_t pos = pQuery->pos + steps * factor; + + // query does not reach the end of current block + if ((pos < pBlockInfo->size && QUERY_IS_ASC_QUERY(pQuery)) || (pos >= 0 && !QUERY_IS_ASC_QUERY(pQuery))) { + nextKey = pPrimaryCol[pos]; + } else { + assert((pQuery->lastKey > pBlockInfo->keyLast && QUERY_IS_ASC_QUERY(pQuery)) || + (pQuery->lastKey < pBlockInfo->keyFirst && !QUERY_IS_ASC_QUERY(pQuery))); + } + + // all data satisfy current query are checked, query completed + if (QUERY_IS_ASC_QUERY(pQuery)) { + completed = (pQuery->lastKey > pQuery->ekey); + } else { + completed = (pQuery->lastKey < pQuery->ekey); + } + + /* + * 1. there may be more date that satisfy current query interval, other than + * current block, we need to try next data blocks + * 2. query completed, since reaches the upper bound of the main query range + */ + if (!completed) { + /* + * Data that satisfy current query range may locate in current block and blocks that are directly right + * next to current block. Therefore, we need to keep the query range(interval) unchanged until reaching + * the direct next data block, while only forwards the pQuery->lastKey. + * + * With the information of the directly next data block, whether locates in cache or disk, + * current interval query being completed or not can be decided. + */ + saveIntervalQueryRange(pRuntimeEnv, pInfo); + + if (QUERY_IS_ASC_QUERY(pQuery)) { + assert(pQuery->lastKey > pBlockInfo->keyLast && pQuery->lastKey <= pQuery->ekey); + } else { + assert(pQuery->lastKey < pBlockInfo->keyFirst && pQuery->lastKey >= pQuery->ekey); + } + + break; + } + + assert(completed); + + if (pQuery->ekey == pSupporter->rawEKey) { + /* whole query completed, save result and abort */ + saveResult(pSupporter, pInfo, pInfo->lastResRows); + + // save the pQuery->lastKey for retrieve data in cache, actually, there will be no qualified data in cache. + saveIntervalQueryRange(pRuntimeEnv, pInfo); + + return; + } else if ((QUERY_IS_ASC_QUERY(pQuery) && pQuery->ekey == pBlockInfo->keyLast) || + (QUERY_IS_ASC_QUERY(pQuery) && pQuery->ekey == pBlockInfo->keyFirst)) { +// /* current interval query is completed, set the next query range on other data blocks if exist */ +// int64_t prevEKey = pQuery->ekey; +// +// getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey); + saveIntervalQueryRange(pRuntimeEnv, pInfo); +// +// assert(prevEKey < pQuery->skey); +// if (pInfo->lastResRows > 0) { +// saveResult(pSupporter, pInfo, pInfo->lastResRows); +// } +// + return; + } + + saveResult(pSupporter, pInfo, pInfo->lastResRows); + + assert((nextKey >= pQuery->lastKey && QUERY_IS_ASC_QUERY(pQuery)) || + (nextKey <= pQuery->lastKey && !QUERY_IS_ASC_QUERY(pQuery))); + + /* still in the same block to query */ + getAlignedIntervalQueryRange(pRuntimeEnv, nextKey, pSupporter->rawSKey, pSupporter->rawEKey); + saveIntervalQueryRange(pRuntimeEnv, pInfo); + + int32_t newPos = searchFn((char *)pPrimaryCol, pBlockInfo->size, pQuery->skey, pQuery->order.order); + assert(newPos == pQuery->pos + steps * factor); + + pQuery->pos = newPos; + } + +} int64_t getNextAccessedKeyInData(SQuery *pQuery, int64_t *pPrimaryCol, SBlockInfo *pBlockInfo, int32_t blockStatus) { assert(pQuery->pos >= 0 && pQuery->pos <= pBlockInfo->size - 1);