diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index dc0d3febcee17408bbd7a327331d4e9d67d3416b..f64bc86bf7904fc6d119ec0bcc9b85eae0ce2fb2 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -1157,24 +1157,22 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntimeE offset = pQuery->commitPoint; numOfPoints = pNewBlock->numOfPoints - offset; - - if (offset != 0) { - dTrace( - "%p ignore the data in cache block that are commit already, numOfblock:%d slot:%d ignore points:%d. " - "first:%d last:%d", - GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, pQuery->slot, pQuery->commitPoint, pQuery->firstSlot, - pQuery->currentSlot); - } - pNewBlock->numOfPoints = numOfPoints; - // current block are all commit already, ignore it - if (pNewBlock->numOfPoints == 0) { - dTrace( - "%p ignore current in cache block that are all commit already, numOfblock:%d slot:%d" - "first:%d last:%d", - GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, pQuery->slot, pQuery->firstSlot, pQuery->currentSlot); - return NULL; + if (offset != 0) { + if (pNewBlock->numOfPoints > 0) { + dTrace("QInfo:%p ignore the data in cache block that are commit already, numOfBlock:%d slot:%d ignore points:%d " + "remain:%d first:%d last:%d", + GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, pQuery->slot, pQuery->commitPoint, pNewBlock->numOfPoints, + pQuery->firstSlot, pQuery->currentSlot); + } else { + // current block are all commit already, ignore it + assert (pNewBlock->numOfPoints == 0); + dTrace("QInfo:%p ignore points in cache block that are all commit already, numOfBlock:%d slot:%d ignore points:%d " + "first:%d last:%d", + GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, slot, offset, pQuery->firstSlot, pQuery->currentSlot); + return NULL; + } } } @@ -3113,19 +3111,19 @@ static bool cacheBoundaryCheck(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterO // earliest key in cache TSKEY keyFirst = 0; TSKEY keyLast = pMeterObj->lastKey; + + // keep the value in local variable, since it may be changed by other thread any time + int32_t numOfBlocks = pCacheInfo->numOfBlocks; + int32_t currentSlot = pCacheInfo->currentSlot; + + // no data in cache, return false directly + if (numOfBlocks == 0) { + return false; + } + + int32_t first = getFirstCacheSlot(numOfBlocks, currentSlot, pCacheInfo); while (1) { - // keep the value in local variable, since it may be changed by other thread any time - int32_t numOfBlocks = pCacheInfo->numOfBlocks; - int32_t currentSlot = pCacheInfo->currentSlot; - - // no data in cache, return false directly - if (numOfBlocks == 0) { - return false; - } - - int32_t first = getFirstCacheSlot(numOfBlocks, currentSlot, pCacheInfo); - /* * pBlock may be null value since this block is flushed to disk, and re-distributes to * other meter, so go on until we get the first not flushed cache block. @@ -3137,9 +3135,12 @@ static bool cacheBoundaryCheck(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterO /* * there may be only one empty cache block existed caused by import. */ - if (numOfBlocks == 1) { + if (first == currentSlot || numOfBlocks == 1) { return false; } + + // todo use defined macro + first = (first + 1 + pCacheInfo->maxBlocks) % pCacheInfo->maxBlocks; } }