提交 9e5ddfe4 编写于 作者: H hjxilinx

add the support of issue #1131. [tbase-901]

上级 ca136d47
......@@ -84,7 +84,7 @@ static int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuer
const SQueryRuntimeEnv *pRuntimeEnv);
static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t numOfIncrementRes);
static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid);
static void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn);
static TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn);
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
// check the offset value integrity
......@@ -1085,6 +1085,12 @@ bool isCacheBlockValid(SQuery* pQuery, SCacheBlock* pBlock, SMeterObj* pMeterObj
return false;
}
/*
* The check for empty block:
* pBlock->numOfPoints == 0. There is a empty block, which is caused by allocate-and-write data into cache
* procedure. The block has been allocated but data has not been put into yet. If the block is the last
* block(newly allocated block), abort query. Otherwise, skip it and go on.
*/
if (pBlock->numOfPoints == 0) {
dWarn("QInfo:%p vid:%d sid:%d id:%s, cache block is empty. slot:%d first:%d, last:%d, numOfBlocks:%d,"
"allocated but not write data yet.", GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid,
......@@ -1105,11 +1111,10 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv* pRuntimeE
return NULL;
}
assert(slot < pCacheInfo->maxBlocks);
getBasicCacheInfoSnapshot(pQuery, pCacheInfo, pMeterObj->vnode);
SCacheBlock *pBlock = pCacheInfo->cacheBlocks[slot];
if (pBlock == NULL) {
// the cache info snapshot must be existed.
if (pBlock == NULL) { // the cache info snapshot must be existed.
int32_t curNumOfBlocks = pCacheInfo->numOfBlocks;
int32_t curSlot = pCacheInfo->currentSlot;
......@@ -2555,6 +2560,8 @@ int64_t getQueryStartPositionInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t *slo
__block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm];
pQuery->slot = *slot;
// cache block has been flushed to disk, no required data block in cache.
SCacheBlock* pBlock = getCacheDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot);
if (pBlock == NULL) {
return -1;
......@@ -2669,7 +2676,11 @@ static void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t qualified
}
}
static void doGetAlignedIntervalQueryRange(SQuery *pQuery, TSKEY key, TSKEY skey, TSKEY ekey) {
static void getAlignedIntervalQueryRange(SQuery *pQuery, TSKEY key, TSKEY skey, TSKEY ekey) {
if (pQuery->nAggTimeInterval == 0) {
return;
}
TSKEY skey1, ekey1;
TSKEY skey2 = (skey < ekey) ? skey : ekey;
......@@ -2862,7 +2873,6 @@ bool normalizeUnBoundLastRowQuery(SMeterQuerySupportObj *pSupporter, SPointInter
TSKEY lastKey = -1;
// todo copy data into temp buffer to avoid the buffer expired
pQuery->fileId = -1;
vnodeFreeFieldsEx(pRuntimeEnv);
......@@ -2878,9 +2888,18 @@ bool normalizeUnBoundLastRowQuery(SMeterQuerySupportObj *pSupporter, SPointInter
pQuery->ekey = key;
pQuery->lastKey = pQuery->skey;
// todo cache block may have been flushed to disk, and no data in cache anymore.
// So, copy cache block to local buffer is required.
/*
* cache block may have been flushed to disk, and no data in cache anymore.
* So, copy cache block to local buffer is required.
*/
lastKey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false);
if (lastKey < 0) { // data has been flushed to disk, try again search in file
lastKey = getQueryPositionForCacheInvalid(pRuntimeEnv, searchFn);
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK|QUERY_COMPLETED)) {
return false;
}
}
} else { // no data in cache, try file
TSKEY key = pMeterObj->lastKeyOnFile;
......@@ -2976,7 +2995,6 @@ bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, SMeterQuerySup
return false;
}
// todo handle the mmap relative offset value assert problem
int64_t loadRequiredBlockIntoMem(SQueryRuntimeEnv *pRuntimeEnv, SPositionInfo *position) {
TSKEY nextTimestamp = -1;
......@@ -4127,19 +4145,6 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
}
}
// todo merge with doRevisedResultsByLimit
void UNUSED_FUNC truncateResultByLimit(SQInfo *pQInfo, int64_t *final, int32_t *interpo) {
SQuery *pQuery = &(pQInfo->query);
if (pQuery->limit.limit > 0 && ((*final) + pQInfo->pointsRead > pQuery->limit.limit)) {
int64_t num = (*final) + pQInfo->pointsRead - pQuery->limit.limit;
(*interpo) -= num;
(*final) -= num;
setQueryStatus(pQuery, QUERY_COMPLETED); // query completed
}
}
TSKEY getTimestampInCacheBlock(SQueryRuntimeEnv* pRuntimeEnv, SCacheBlock *pBlock, int32_t index) {
if (pBlock == NULL || index >= pBlock->numOfPoints || index < 0) {
return -1;
......@@ -4189,7 +4194,7 @@ TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index) {
}
// todo remove this function
static void getFirstDataBlockInCache(SQueryRuntimeEnv *pRuntimeEnv) {
static TSKEY getFirstDataBlockInCache(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
assert(pQuery->fileId == -1 && QUERY_IS_ASC_QUERY(pQuery));
......@@ -4208,10 +4213,11 @@ static void getFirstDataBlockInCache(SQueryRuntimeEnv *pRuntimeEnv) {
} else if (nextTimestamp > pQuery->ekey) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
return nextTimestamp;
}
// TODO handle case that the cache is allocated but not assign to SMeterObj
void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn) {
TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj;
......@@ -4239,10 +4245,13 @@ void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_sear
if (key < pQuery->ekey) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
return key;
} else {
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
return -1; // no data to check
}
} else {
} else {//asc query
bool ret = getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_GREATER_EQUAL, searchFn);
if (ret) {
dTrace("QInfo:%p vid:%d sid:%d id:%s find the possible position, fileId:%d, slot:%d, pos:%d", pQInfo,
......@@ -4254,15 +4263,18 @@ void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_sear
if (key > pQuery->ekey) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
return key;
} else {
/*
* all data in file is less than the pQuery->lastKey, try cache.
* all data in file is less than the pQuery->lastKey, try cache again.
* cache block status will be set in getFirstDataBlockInCache function
*/
getFirstDataBlockInCache(pRuntimeEnv);
TSKEY key = getFirstDataBlockInCache(pRuntimeEnv);
dTrace("QInfo:%p vid:%d sid:%d id:%s find the new position in cache, fileId:%d, slot:%d, pos:%d", pQInfo,
pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, pQuery->slot, pQuery->pos);
return key;
}
}
}
......@@ -4425,6 +4437,8 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl
pSummary->fileTimeUs += (taosGetTimestampUs() - start);
} else {
assert(vnodeIsDatablockLoaded(pRuntimeEnv, pRuntimeEnv->pMeterObj, -1, true));
SCacheBlock *pBlock = getCacheDataBlock(pRuntimeEnv->pMeterObj, pRuntimeEnv, pQuery->slot);
*pblockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_CACHE_BLOCK);
......@@ -5439,7 +5453,7 @@ static void doSingleMeterSupplementScan(SQueryRuntimeEnv *pRuntimeEnv) {
SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv);
// usually this load operation will incure load disk block operation
// usually this load operation will incur load disk block operation
TSKEY endKey = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->endPos);
assert((QUERY_IS_ASC_QUERY(pQuery) && endKey <= pQuery->ekey) ||
......@@ -7052,15 +7066,6 @@ void copyFromGroupBuf(SQInfo *pQInfo, SOutputRes *result) {
assert(pQuery->pointsRead <= pQuery->pointsToRead);
}
// todo refactor according to its called env!!
static void getAlignedIntervalQueryRange(SQuery *pQuery, TSKEY keyInData, TSKEY skey, TSKEY ekey) {
if (pQuery->nAggTimeInterval == 0) {
return;
}
doGetAlignedIntervalQueryRange(pQuery, keyInData, skey, ekey);
}
static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, int64_t *pPrimaryData,
SBlockInfo *pBlockInfo, int32_t blockStatus, SField *pFields,
__block_search_fn_t searchFn) {
......
......@@ -198,12 +198,9 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe
/*
* 1. pBlock == NULL. The cache block may be flushed to disk, so it is not available, skip and try next
*
* 2. pBlock->numOfPoints == 0. There is a empty block, which is caused by allocate-and-write data into cache
* procedure. The block has been allocated but data has not been put into yet. If the block is the last
* block(newly allocated block), abort query. Otherwise, skip it and go on.
* The check for empty block is refactor to getCacheDataBlock function
*/
if ((pBlock == NULL) || (pBlock->numOfPoints == 0)) {
if (pBlock == NULL) {
if (ALL_CACHE_BLOCKS_CHECKED(pQuery)) {
break;
}
......@@ -619,9 +616,6 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
pSupporter->rawEKey = key;
int64_t num = doCheckMetersInGroup(pQInfo, index, start);
if (num == 0) {
int32_t k = 1;
}
assert(num >= 0);
} else {
dTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, pOneMeter->vnode,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册