diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 09030b14a8c1bb5c09d0c7566b830f151188f3b3..3dc520ad9de97b50ec42a420eb085a9415ab0e2e 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -84,7 +84,7 @@ static int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuer const SQueryRuntimeEnv *pRuntimeEnv); static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t numOfIncrementRes); static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid); -static void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn); +static TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn); static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); // check the offset value integrity @@ -1085,6 +1085,12 @@ bool isCacheBlockValid(SQuery* pQuery, SCacheBlock* pBlock, SMeterObj* pMeterObj return false; } + /* + * The check for empty block: + * pBlock->numOfPoints == 0. There is a empty block, which is caused by allocate-and-write data into cache + * procedure. The block has been allocated but data has not been put into yet. If the block is the last + * block(newly allocated block), abort query. Otherwise, skip it and go on. + */ if (pBlock->numOfPoints == 0) { dWarn("QInfo:%p vid:%d sid:%d id:%s, cache block is empty. slot:%d first:%d, last:%d, numOfBlocks:%d," "allocated but not write data yet.", GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, @@ -1105,11 +1111,10 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv* pRuntimeE return NULL; } - assert(slot < pCacheInfo->maxBlocks); - + getBasicCacheInfoSnapshot(pQuery, pCacheInfo, pMeterObj->vnode); + SCacheBlock *pBlock = pCacheInfo->cacheBlocks[slot]; - if (pBlock == NULL) { - // the cache info snapshot must be existed. + if (pBlock == NULL) { // the cache info snapshot must be existed. int32_t curNumOfBlocks = pCacheInfo->numOfBlocks; int32_t curSlot = pCacheInfo->currentSlot; @@ -2555,6 +2560,8 @@ int64_t getQueryStartPositionInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t *slo __block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm]; pQuery->slot = *slot; + + // cache block has been flushed to disk, no required data block in cache. SCacheBlock* pBlock = getCacheDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); if (pBlock == NULL) { return -1; @@ -2669,7 +2676,11 @@ static void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t qualified } } -static void doGetAlignedIntervalQueryRange(SQuery *pQuery, TSKEY key, TSKEY skey, TSKEY ekey) { +static void getAlignedIntervalQueryRange(SQuery *pQuery, TSKEY key, TSKEY skey, TSKEY ekey) { + if (pQuery->nAggTimeInterval == 0) { + return; + } + TSKEY skey1, ekey1; TSKEY skey2 = (skey < ekey) ? skey : ekey; @@ -2862,7 +2873,6 @@ bool normalizeUnBoundLastRowQuery(SMeterQuerySupportObj *pSupporter, SPointInter TSKEY lastKey = -1; - // todo copy data into temp buffer to avoid the buffer expired pQuery->fileId = -1; vnodeFreeFieldsEx(pRuntimeEnv); @@ -2878,9 +2888,18 @@ bool normalizeUnBoundLastRowQuery(SMeterQuerySupportObj *pSupporter, SPointInter pQuery->ekey = key; pQuery->lastKey = pQuery->skey; - // todo cache block may have been flushed to disk, and no data in cache anymore. - // So, copy cache block to local buffer is required. + /* + * cache block may have been flushed to disk, and no data in cache anymore. + * So, copy cache block to local buffer is required. + */ lastKey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false); + if (lastKey < 0) { // data has been flushed to disk, try again search in file + lastKey = getQueryPositionForCacheInvalid(pRuntimeEnv, searchFn); + + if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK|QUERY_COMPLETED)) { + return false; + } + } } else { // no data in cache, try file TSKEY key = pMeterObj->lastKeyOnFile; @@ -2976,7 +2995,6 @@ bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, SMeterQuerySup return false; } -// todo handle the mmap relative offset value assert problem int64_t loadRequiredBlockIntoMem(SQueryRuntimeEnv *pRuntimeEnv, SPositionInfo *position) { TSKEY nextTimestamp = -1; @@ -4127,19 +4145,6 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) { } } -// todo merge with doRevisedResultsByLimit -void UNUSED_FUNC truncateResultByLimit(SQInfo *pQInfo, int64_t *final, int32_t *interpo) { - SQuery *pQuery = &(pQInfo->query); - - if (pQuery->limit.limit > 0 && ((*final) + pQInfo->pointsRead > pQuery->limit.limit)) { - int64_t num = (*final) + pQInfo->pointsRead - pQuery->limit.limit; - (*interpo) -= num; - (*final) -= num; - - setQueryStatus(pQuery, QUERY_COMPLETED); // query completed - } -} - TSKEY getTimestampInCacheBlock(SQueryRuntimeEnv* pRuntimeEnv, SCacheBlock *pBlock, int32_t index) { if (pBlock == NULL || index >= pBlock->numOfPoints || index < 0) { return -1; @@ -4189,7 +4194,7 @@ TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index) { } // todo remove this function -static void getFirstDataBlockInCache(SQueryRuntimeEnv *pRuntimeEnv) { +static TSKEY getFirstDataBlockInCache(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; assert(pQuery->fileId == -1 && QUERY_IS_ASC_QUERY(pQuery)); @@ -4208,10 +4213,11 @@ static void getFirstDataBlockInCache(SQueryRuntimeEnv *pRuntimeEnv) { } else if (nextTimestamp > pQuery->ekey) { setQueryStatus(pQuery, QUERY_COMPLETED); } + + return nextTimestamp; } -// TODO handle case that the cache is allocated but not assign to SMeterObj -void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn) { +TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn) { SQuery * pQuery = pRuntimeEnv->pQuery; SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj; @@ -4239,10 +4245,13 @@ void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_sear if (key < pQuery->ekey) { setQueryStatus(pQuery, QUERY_COMPLETED); } + + return key; } else { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); + return -1; // no data to check } - } else { + } else {//asc query bool ret = getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_GREATER_EQUAL, searchFn); if (ret) { dTrace("QInfo:%p vid:%d sid:%d id:%s find the possible position, fileId:%d, slot:%d, pos:%d", pQInfo, @@ -4254,15 +4263,18 @@ void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_sear if (key > pQuery->ekey) { setQueryStatus(pQuery, QUERY_COMPLETED); } + + return key; } else { /* - * all data in file is less than the pQuery->lastKey, try cache. + * all data in file is less than the pQuery->lastKey, try cache again. * cache block status will be set in getFirstDataBlockInCache function */ - getFirstDataBlockInCache(pRuntimeEnv); + TSKEY key = getFirstDataBlockInCache(pRuntimeEnv); dTrace("QInfo:%p vid:%d sid:%d id:%s find the new position in cache, fileId:%d, slot:%d, pos:%d", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, pQuery->slot, pQuery->pos); + return key; } } } @@ -4425,6 +4437,8 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl pSummary->fileTimeUs += (taosGetTimestampUs() - start); } else { + assert(vnodeIsDatablockLoaded(pRuntimeEnv, pRuntimeEnv->pMeterObj, -1, true)); + SCacheBlock *pBlock = getCacheDataBlock(pRuntimeEnv->pMeterObj, pRuntimeEnv, pQuery->slot); *pblockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_CACHE_BLOCK); @@ -5439,7 +5453,7 @@ static void doSingleMeterSupplementScan(SQueryRuntimeEnv *pRuntimeEnv) { SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv); - // usually this load operation will incure load disk block operation + // usually this load operation will incur load disk block operation TSKEY endKey = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->endPos); assert((QUERY_IS_ASC_QUERY(pQuery) && endKey <= pQuery->ekey) || @@ -7052,15 +7066,6 @@ void copyFromGroupBuf(SQInfo *pQInfo, SOutputRes *result) { assert(pQuery->pointsRead <= pQuery->pointsToRead); } -// todo refactor according to its called env!! -static void getAlignedIntervalQueryRange(SQuery *pQuery, TSKEY keyInData, TSKEY skey, TSKEY ekey) { - if (pQuery->nAggTimeInterval == 0) { - return; - } - - doGetAlignedIntervalQueryRange(pQuery, keyInData, skey, ekey); -} - static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, int64_t *pPrimaryData, SBlockInfo *pBlockInfo, int32_t blockStatus, SField *pFields, __block_search_fn_t searchFn) { diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index e9635c4ee6b726f94ae29384accf2df0faa70555..76a358cfc2eb2efad2eaf5646b9989dac2466bd7 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -198,12 +198,9 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe /* * 1. pBlock == NULL. The cache block may be flushed to disk, so it is not available, skip and try next - * - * 2. pBlock->numOfPoints == 0. There is a empty block, which is caused by allocate-and-write data into cache - * procedure. The block has been allocated but data has not been put into yet. If the block is the last - * block(newly allocated block), abort query. Otherwise, skip it and go on. + * The check for empty block is refactor to getCacheDataBlock function */ - if ((pBlock == NULL) || (pBlock->numOfPoints == 0)) { + if (pBlock == NULL) { if (ALL_CACHE_BLOCKS_CHECKED(pQuery)) { break; } @@ -619,9 +616,6 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { pSupporter->rawEKey = key; int64_t num = doCheckMetersInGroup(pQInfo, index, start); - if (num == 0) { - int32_t k = 1; - } assert(num >= 0); } else { dTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, pOneMeter->vnode,