From 0a0852ad466353151dc81bacb5b8af4214f6731d Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Wed, 30 Oct 2019 10:30:57 +0800 Subject: [PATCH] fix bug #660 --- src/os/linux/inc/os.h | 5 +++ src/system/src/vnodeQueryImpl.c | 10 ++--- src/system/src/vnodeQueryProcess.c | 64 ++++++++++++++++++++---------- src/util/src/tcache.c | 2 +- 4 files changed, 55 insertions(+), 26 deletions(-) diff --git a/src/os/linux/inc/os.h b/src/os/linux/inc/os.h index 47533a8544..35ef811a8f 100644 --- a/src/os/linux/inc/os.h +++ b/src/os/linux/inc/os.h @@ -60,8 +60,13 @@ #define __sync_val_compare_and_swap_64 __sync_val_compare_and_swap #define __sync_val_compare_and_swap_32 __sync_val_compare_and_swap + #define __sync_add_and_fetch_64 __sync_add_and_fetch #define __sync_add_and_fetch_32 __sync_add_and_fetch + +#define __sync_sub_and_fetch_64 __sync_sub_and_fetch +#define __sync_sub_and_fetch_32 __sync_sub_and_fetch + int32_t __sync_val_load_32(int32_t *ptr); void __sync_val_restore_32(int32_t *ptr, int32_t newval); diff --git a/src/system/src/vnodeQueryImpl.c b/src/system/src/vnodeQueryImpl.c index a8d286b90c..023939c55e 100644 --- a/src/system/src/vnodeQueryImpl.c +++ b/src/system/src/vnodeQueryImpl.c @@ -810,14 +810,14 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQuery *pQuery, int32_t slo SCacheBlock *pBlock = pCacheInfo->cacheBlocks[slot]; if (pBlock == NULL) { dError("QInfo:%p NULL Block In Cache, available block:%d, last block:%d, accessed null block:%d, pBlockId:%d", - GET_QINFO_ADDR(pQuery), pCacheInfo->numOfBlocks, pCacheInfo->currentSlot, slot, pQuery->blockId); + GET_QINFO_ADDR(pQuery), pCacheInfo->numOfBlocks, pCacheInfo->currentSlot, slot, pQuery->blockId); return NULL; } - if (pMeterObj != pBlock->pMeterObj || pBlock->blockId > pQuery->blockId || pBlock->numOfPoints <= 0) { - dWarn("QInfo:%p vid:%d sid:%d id:%s, cache block is overwritten, slot:%d blockId:%d qBlockId:%d", - GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pBlock->blockId, - pQuery->blockId); + if (pMeterObj != pBlock->pMeterObj || pBlock->blockId > pQuery->blockId) { + dWarn("QInfo:%p vid:%d sid:%d id:%s, cache block is overwritten, slot:%d blockId:%d qBlockId:%d, meterObj:%p, blockMeterObj:%p", + GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pBlock->blockId, + pQuery->blockId, pMeterObj, pBlock->pMeterObj); return NULL; } diff --git a/src/system/src/vnodeQueryProcess.c b/src/system/src/vnodeQueryProcess.c index b50606b933..f00f481f6d 100644 --- a/src/system/src/vnodeQueryProcess.c +++ b/src/system/src/vnodeQueryProcess.c @@ -27,6 +27,12 @@ #include "vnodeQueryImpl.h" +#define ALL_CACHE_BLOCKS_CHECKED(q) \ + ((q)->slot == (q)->currentSlot && QUERY_IS_ASC_QUERY(q) || (q)->slot == (q)->firstSlot && (!QUERY_IS_ASC_QUERY(q))) + +#define FORWARD_CACHE_BLOCK_CHECK_SLOT(slot, step, maxblocks) (slot) = ((slot) + (step) + (maxblocks)) % (maxblocks); + + static bool doCheckWithPrevQueryRange(SQInfo *pQInfo, TSKEY nextKey, SMeterDataInfo *pMeterInfo) { SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; SQuery * pQuery = &pQInfo->query; @@ -49,6 +55,22 @@ static bool doCheckWithPrevQueryRange(SQInfo *pQInfo, TSKEY nextKey, SMeterDataI return true; } +/** + * The start position of the first check cache block is located before starting the loop. + * And the start position for next cache blocks needs to be decided before checking each cache block. + */ +static void setStartPositionForCacheBlock(SQuery *pQuery, SCacheBlock *pBlock, bool *firstCheckSlot) { + if (!(*firstCheckSlot)) { + if (QUERY_IS_ASC_QUERY(pQuery)) { + pQuery->pos = 0; + } else { + pQuery->pos = pBlock->numOfPoints - 1; + } + } else { + (*firstCheckSlot) = false; + } +} + static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) { SQuery * pQuery = &pQInfo->query; SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; @@ -147,24 +169,39 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe continue; } + bool firstCheckSlot = true; SCacheInfo *pCacheInfo = (SCacheInfo *)pMeterObj->pCache; + for (int32_t i = 0; i < pCacheInfo->maxBlocks; ++i) { pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot); - // cache block may be flushed to disk, so it is not available, ignore it and try next - if (pBlock == NULL) { - pQuery->slot = (pQuery->slot + step + pCacheInfo->maxBlocks) % pCacheInfo->maxBlocks; + /* + * 1. pBlock == NULL. The cache block may be flushed to disk, so it is not available, skip and try next + * + * 2. pBlock->numOfPoints == 0. There is a empty block, which is caused by allocate-and-write data into cache + * procedure. The block has been allocated but data has not been put into yet. If the block is the last + * block(newly allocated block), abort query. Otherwise, skip it and go on. + */ + if ((pBlock == NULL) || (pBlock->numOfPoints == 0)) { + if (ALL_CACHE_BLOCKS_CHECKED(pQuery)) { + break; + } + + FORWARD_CACHE_BLOCK_CHECK_SLOT(pQuery->slot, step, pCacheInfo->maxBlocks); continue; } + setStartPositionForCacheBlock(pQuery, pBlock, &firstCheckSlot); + TSKEY *primaryKeys = (TSKEY *)pBlock->offset[0]; - // in handling file data block, this query condition is checked during fetching candidate file blocks + + // in handling file data block, the timestamp range validation is done during fetching candidate file blocks if ((primaryKeys[pQuery->pos] > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) || (primaryKeys[pQuery->pos] < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery))) { break; } - /* only record the key on last block */ + // only record the key on last block SET_CACHE_BLOCK_FLAG(pRuntimeEnv->blockStatus); SBlockInfo binfo = getBlockBasicInfo(pBlock, BLK_CACHE_BLOCK); @@ -176,24 +213,11 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe queryOnBlock(pSupporter, primaryKeys, pRuntimeEnv->blockStatus, (char *)pBlock, &binfo, &pMeterInfo[k], NULL, searchFn); - // todo refactor - if ((pQuery->slot == pQuery->currentSlot && QUERY_IS_ASC_QUERY(pQuery)) || - (pQuery->slot == pQuery->firstSlot && !QUERY_IS_ASC_QUERY(pQuery))) { + if (ALL_CACHE_BLOCKS_CHECKED(pQuery)) { break; } - // try next cache block - pQuery->slot = (pQuery->slot + step + pCacheInfo->maxBlocks) % pCacheInfo->maxBlocks; - if (QUERY_IS_ASC_QUERY(pQuery)) { - pQuery->pos = 0; - } else { // backwards traverse encounter the cache invalid, abort scan cache. - SCacheBlock *pNextBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot); - if (pNextBlock == NULL) { - break; // todo fix - } else { - pQuery->pos = pNextBlock->numOfPoints - 1; - } - } + FORWARD_CACHE_BLOCK_CHECK_SLOT(pQuery->slot, step, pCacheInfo->maxBlocks); } } } diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index ceb8bc059d..52d8638c09 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -614,7 +614,7 @@ static FORCE_INLINE void taosDecRef(SDataNode *pNode) { } if (pNode->refCount > 0) { - __sync_add_and_fetch_32(&pNode->refCount, -1); + __sync_sub_and_fetch_32(&pNode->refCount, 1); pTrace("key:%s is released by app.refcnt:%d", pNode->key, pNode->refCount); } else { /* -- GitLab