From aceeed3b0104b5835fba346c85583217d8a50ca8 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Mon, 25 Nov 2019 16:21:05 +0800 Subject: [PATCH] [tbase-1007] --- src/system/detail/inc/vnodeQueryImpl.h | 3 +- src/system/detail/inc/vnodeRead.h | 56 ++-- src/system/detail/src/vnodeQueryImpl.c | 296 ++++++++++++---------- src/system/detail/src/vnodeQueryProcess.c | 16 +- 4 files changed, 199 insertions(+), 172 deletions(-) diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index 810105d638..57c4149f22 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -174,7 +174,7 @@ void queryOnBlock(SMeterQuerySupportObj* pSupporter, int64_t* primaryKeys, int32 SBlockInfo* pBlockBasicInfo, SMeterDataInfo* pDataHeadInfoEx, SField* pFields, __block_search_fn_t searchFn); -SMeterDataInfo** vnodeFilterQualifiedMeters(SQInfo* pQInfo, int32_t vid, SQueryFileInfo* pQueryFileInfo, +SMeterDataInfo** vnodeFilterQualifiedMeters(SQInfo* pQInfo, int32_t vid, int32_t fileIndex, tSidSet* pSidSet, SMeterDataInfo* pMeterDataInfo, int32_t* numOfMeters); int32_t vnodeGetVnodeHeaderFileIdx(int32_t* fid, SQueryRuntimeEnv* pRuntimeEnv, int32_t order); @@ -194,6 +194,7 @@ uint32_t getDataBlocksForMeters(SMeterQuerySupportObj* pSupporter, SQuery* pQuer int32_t numOfMeters, SQueryFileInfo* pQueryFileInfo, SMeterDataInfo** pMeterDataInfo); int32_t LoadDatablockOnDemand(SCompBlock* pBlock, SField** pFields, int8_t* blkStatus, SQueryRuntimeEnv* pRuntimeEnv, int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand); +char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex); /** * Create SMeterQueryInfo. diff --git a/src/system/detail/inc/vnodeRead.h b/src/system/detail/inc/vnodeRead.h index 1de9f97e82..b9bf684f79 100644 --- a/src/system/detail/inc/vnodeRead.h +++ b/src/system/detail/inc/vnodeRead.h @@ -57,9 +57,9 @@ typedef struct SQueryFileInfo { char lastFilePath[256]; int32_t defaultMappingSize; /* default mapping size */ - int32_t headerFd; /* file handler */ - char* pHeaderFileData; /* mmap header files */ - size_t headFileSize; + int32_t headerFd; /* file handler */ + char* pHeaderFileData; /* mmap header files */ + size_t headFileSize; int32_t dataFd; char* pDataFileData; size_t dataFileSize; @@ -107,44 +107,40 @@ typedef struct SOutputRes { } SOutputRes; typedef struct RuntimeEnvironment { - SPositionInfo startPos; /* the start position, used for secondary/third iteration */ - SPositionInfo endPos; /* the last access position in query, served as the start pos of reversed order query */ - SPositionInfo nextPos; /* start position of the next scan */ - SData* colDataBuffer[TSDB_MAX_COLUMNS]; - SResultInfo* resultInfo; - - // Indicate if data block is loaded, the block is first/last/internal block - int8_t blockStatus; - int32_t unzipBufSize; - SData* primaryColBuffer; - char* unzipBuffer; - char* secondaryUnzipBuffer; - SQuery* pQuery; - SMeterObj* pMeterObj; - SQLFunctionCtx* pCtx; - SQueryLoadBlockInfo loadBlockInfo; /* record current block load information */ + SPositionInfo startPos; /* the start position, used for secondary/third iteration */ + SPositionInfo endPos; /* the last access position in query, served as the start pos of reversed order query */ + SPositionInfo nextPos; /* start position of the next scan */ + SData* colDataBuffer[TSDB_MAX_COLUMNS]; + SResultInfo* resultInfo; + uint8_t blockStatus; // Indicate if data block is loaded, the block is first/last/internal block + int32_t unzipBufSize; + SData* primaryColBuffer; + char* unzipBuffer; + char* secondaryUnzipBuffer; + SQuery* pQuery; + SMeterObj* pMeterObj; + SQLFunctionCtx* pCtx; + SQueryLoadBlockInfo loadBlockInfo; /* record current block load information */ SQueryLoadCompBlockInfo loadCompBlockInfo; /* record current compblock information in SQuery */ /* * header files info, avoid to iterate the directory, the data is acquired * during in query preparation function */ - SQueryFileInfo* pHeaderFiles; - uint32_t numOfFiles; /* number of files of one vnode during query execution */ - - int16_t numOfRowsPerPage; - int16_t offset[TSDB_MAX_COLUMNS]; - - int16_t scanFlag; /* denotes reversed scan of data or not */ + SQueryFileInfo* pVnodeFiles; + uint32_t numOfFiles; // the total available number of files for this virtual node during query execution + int32_t mmapedHFileIndex; // the mmaped header file, NOTE: only one header file can be mmap. + int16_t numOfRowsPerPage; + int16_t offset[TSDB_MAX_COLUMNS]; + int16_t scanFlag; // denotes reversed scan of data or not SInterpolationInfo interpoInfo; SData** pInterpoBuf; SOutputRes* pResult; // reference to SQuerySupporter->pResult void* hashList; int32_t usedIndex; // assigned SOutputRes in list - - STSBuf* pTSBuf; - STSCursor cur; - SQueryCostSummary summary; + STSBuf* pTSBuf; + STSCursor cur; + SQueryCostSummary summary; } SQueryRuntimeEnv; /* intermediate result during multimeter query involves interval */ diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index f7f456ec53..5a6a558436 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -47,7 +47,7 @@ static int32_t readDataFromDiskFile(int fd, SQInfo *pQInfo, SQueryFileInfo *pQue __read_data_fn_t readDataFunctor[2] = {copyDataFromMMapBuffer, readDataFromDiskFile}; -static void vnodeInitLoadCompBlockInfo(SQueryLoadCompBlockInfo *pCompBlockLoadInfo); +static void vnodeInitLoadCompBlockInfo(SQueryLoadCompBlockInfo *pCompBlockLoadInfo); static int32_t moveToNextBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __block_search_fn_t searchFn, bool loadData); static int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery *pQuery, @@ -57,7 +57,7 @@ static int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, static TSKEY getTimestampInCacheBlock(SCacheBlock *pBlock, int32_t index); static TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index); -static void savePointPosition(SPositionInfo *position, int32_t fileId, int32_t slot, int32_t pos); +static void savePointPosition(SPositionInfo *position, int32_t fileId, int32_t slot, int32_t pos); static int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t step); static void setGroupOutputBuffer(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes *pResult); @@ -195,7 +195,7 @@ static bool vnodeIsCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj // if vnodeFreeFields is called, the pQuery->pFields is NULL if (pLoadCompBlockInfo->fileListIndex == fileIndex && pLoadCompBlockInfo->sid == pMeterObj->sid && pQuery->pFields != NULL && pQuery->fileId > 0) { - assert(pRuntimeEnv->pHeaderFiles[fileIndex].fileID == pLoadCompBlockInfo->fileId && pQuery->numOfBlocks > 0); + assert(pRuntimeEnv->pVnodeFiles[fileIndex].fileID == pLoadCompBlockInfo->fileId && pQuery->numOfBlocks > 0); return true; } @@ -207,7 +207,7 @@ static void vnodeSetCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, int32_t f pLoadCompBlockInfo->sid = sid; pLoadCompBlockInfo->fileListIndex = fileIndex; - pLoadCompBlockInfo->fileId = pRuntimeEnv->pHeaderFiles[fileIndex].fileID; + pLoadCompBlockInfo->fileId = pRuntimeEnv->pVnodeFiles[fileIndex].fileID; } static void vnodeInitLoadCompBlockInfo(SQueryLoadCompBlockInfo *pCompBlockLoadInfo) { @@ -247,6 +247,60 @@ static void vnodeInitDataBlockInfo(SQueryLoadBlockInfo *pBlockLoadInfo) { pBlockLoadInfo->fileListIndex = -1; } +static void doUnmapHeaderFileData(SQueryRuntimeEnv* pRuntimeEnv) { + if (pRuntimeEnv->mmapedHFileIndex >= 0) { + assert(pRuntimeEnv->mmapedHFileIndex < pRuntimeEnv->numOfFiles && pRuntimeEnv->mmapedHFileIndex >= 0); + + SQueryFileInfo *otherVnodeFiles = &pRuntimeEnv->pVnodeFiles[pRuntimeEnv->mmapedHFileIndex]; + munmap(otherVnodeFiles->pHeaderFileData, otherVnodeFiles->headFileSize); + + otherVnodeFiles->pHeaderFileData = NULL; + pRuntimeEnv->mmapedHFileIndex = -1; + } else { + assert(pRuntimeEnv->mmapedHFileIndex == -1); + } +} + +/** + * mmap the data file into memory. For each query, only one header file is allowed to mmap into memory, in order to + * avoid too many mmapped files at the save time to cause OS return the message of "Cannot allocate memory", + * during query processing. + * + * @param pRuntimeEnv + * @param fileIndex + * @return the return value may be null, so any invoker needs to check the returned value + */ +char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex) { + assert(fileIndex >= 0 && fileIndex < pRuntimeEnv->numOfFiles); + + SQuery *pQuery = pRuntimeEnv->pQuery; + SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); // only for log output + + SQueryFileInfo *pVnodeFiles = &pRuntimeEnv->pVnodeFiles[fileIndex]; + size_t size = pVnodeFiles->headFileSize; + + if (pVnodeFiles->pHeaderFileData == NULL) { + assert(pRuntimeEnv->mmapedHFileIndex != fileIndex); + doUnmapHeaderFileData(pRuntimeEnv); // do close the other mmaped header file + + pVnodeFiles->pHeaderFileData = mmap(NULL, size, PROT_READ, MAP_SHARED, pVnodeFiles->headerFd, 0); + if (pVnodeFiles->pHeaderFileData == MAP_FAILED) { + pVnodeFiles->pHeaderFileData = NULL; + dError("QInfo:%p failed to map header file:%s, size:%lld, %s", pQInfo, pVnodeFiles->headerFilePath, size, + strerror(errno)); + } else { + pRuntimeEnv->mmapedHFileIndex = fileIndex; // set the value in case of success mmap file + if (madvise(pVnodeFiles->pHeaderFileData, size, MADV_SEQUENTIAL) == -1) { + dError("QInfo:%p failed to advise kernel the usage of header files, reason:%s", pQInfo, strerror(errno)); + } + } + } else { + assert(pRuntimeEnv->mmapedHFileIndex == fileIndex); + } + + return pVnodeFiles->pHeaderFileData; +} + /* * read comp block info from header file * @@ -256,8 +310,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); SVnodeCfg * pCfg = &vnodeList[pMeterObj->vnode].cfg; - SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pHeaderFiles[fileIndex]; - int32_t fd = pQueryFileInfo->headerFd; + SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIndex]; int64_t st = taosGetTimestampUs(); @@ -273,8 +326,11 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim pSummary->numOfSeek++; #if 1 - char *data = pRuntimeEnv->pHeaderFiles[fileIndex].pHeaderFileData; - UNUSED(fd); + char *data = vnodeGetHeaderFileData(pRuntimeEnv, fileIndex); // failed to load the header file data into memory + if (data == NULL) { + return -1; + } + #else char *data = calloc(1, tmsize + TSDB_FILE_HEADER_LEN); read(fd, data, tmsize + TSDB_FILE_HEADER_LEN); @@ -353,7 +409,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim int64_t et = taosGetTimestampUs(); qTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, load compblock info, size:%d, elapsed:%f ms", pQInfo, - pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pRuntimeEnv->pHeaderFiles[fileIndex].fileID, + pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pRuntimeEnv->pVnodeFiles[fileIndex].fileID, compBlockSize, (et - st) / 1000.0); pSummary->totalCompInfoSize += compBlockSize; @@ -379,7 +435,7 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t StartQue char *primaryColumnData, int32_t size, int32_t functionId, SField *pField, bool hasNull, int32_t blockStatus, void *param, int32_t scanFlag); -void createGroupResultBuf(SQuery *pQuery, SOutputRes *pOneResult, bool isMetricQuery); +void createGroupResultBuf(SQuery *pQuery, SOutputRes *pOneResult, bool isMetricQuery); static void destroyGroupResultBuf(SOutputRes *pOneOutputRes, int32_t nOutputCols); static int32_t binarySearchForBlockImpl(SCompBlock *pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) { @@ -439,10 +495,10 @@ static int32_t moveMMapWindow(SQueryFileInfo *pQueryFileInfo, uint64_t offset) { } /* - * 1. there is import data that locate farther from the beginning, but with less timestamp, so we need to move the - * window backwards - * 2. otherwise, move the mmaping window forward - */ + * 1. there is import data that locate farther from the beginning, but with less timestamp, so we need to move the + * window backwards + * 2. otherwise, move the mmaping window forward + */ upperBnd = (offset / pQueryFileInfo->defaultMappingSize + 1) * pQueryFileInfo->defaultMappingSize - 1; /* unmap previous buffer */ @@ -636,7 +692,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj; SData ** sdata = pRuntimeEnv->colDataBuffer; - SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pHeaderFiles[fileIdx]; + SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIdx]; SData ** primaryTSBuf = &pRuntimeEnv->primaryColBuffer; void * tmpBuf = pRuntimeEnv->unzipBuffer; @@ -850,9 +906,11 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQuery *pQuery, int32_t slo } if (pMeterObj != pBlock->pMeterObj || pBlock->blockId > pQuery->blockId) { - dWarn("QInfo:%p vid:%d sid:%d id:%s, cache block is overwritten, slot:%d blockId:%d qBlockId:%d, meterObj:%p, blockMeterObj:%p", - GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pBlock->blockId, - pQuery->blockId, pMeterObj, pBlock->pMeterObj); + dWarn( + "QInfo:%p vid:%d sid:%d id:%s, cache block is overwritten, slot:%d blockId:%d qBlockId:%d, meterObj:%p, " + "blockMeterObj:%p", + GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pBlock->blockId, + pQuery->blockId, pMeterObj, pBlock->pMeterObj); return NULL; } @@ -938,7 +996,7 @@ static bool getQualifiedDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRunti break; } - dError("QInfo:%p fileId:%d total numOfBlks:%d blockId:%d into memory failed due to error in disk files", + dError("QInfo:%p fileId:%d total numOfBlks:%d blockId:%d load into memory failed due to error in disk files", GET_QINFO_ADDR(pQuery), pQuery->fileId, pQuery->numOfBlocks, blkIdx); blkIdx += step; } @@ -1110,9 +1168,9 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; -// if (!functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { -// continue; -// } + // if (!functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { + // continue; + // } SField dummyField = {0}; @@ -1135,8 +1193,8 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->skey : pQuery->ekey; - int64_t alignedTimestamp = taosGetIntervalStartTimestamp(ts, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, - pQuery->precision); + int64_t alignedTimestamp = + taosGetIntervalStartTimestamp(ts, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, pQuery->precision); setExecParams(pQuery, &pCtx[k], alignedTimestamp, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, tpField, hasNull, pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag); } @@ -1405,8 +1463,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * char *dataBlock = getDataBlocks(pRuntimeEnv, data, &sasArray[k], k, *forwardStep, isDiskFileBlock); TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->skey : pQuery->ekey; - int64_t alignedTimestamp = taosGetIntervalStartTimestamp(ts, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, - pQuery->precision); + int64_t alignedTimestamp = + taosGetIntervalStartTimestamp(ts, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, pQuery->precision); setExecParams(pQuery, &pCtx[k], alignedTimestamp, dataBlock, (char *)primaryKeyCol, (*forwardStep), functionId, pFields, hasNull, pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag); @@ -1655,18 +1713,18 @@ int32_t vnodeGetVnodeHeaderFileIdx(int32_t *fid, SQueryRuntimeEnv *pRuntimeEnv, } /* set the initial file for current query */ - if (order == TSQL_SO_ASC && *fid < pRuntimeEnv->pHeaderFiles[0].fileID) { - *fid = pRuntimeEnv->pHeaderFiles[0].fileID; + if (order == TSQL_SO_ASC && *fid < pRuntimeEnv->pVnodeFiles[0].fileID) { + *fid = pRuntimeEnv->pVnodeFiles[0].fileID; return 0; - } else if (order == TSQL_SO_DESC && *fid > pRuntimeEnv->pHeaderFiles[pRuntimeEnv->numOfFiles - 1].fileID) { - *fid = pRuntimeEnv->pHeaderFiles[pRuntimeEnv->numOfFiles - 1].fileID; + } else if (order == TSQL_SO_DESC && *fid > pRuntimeEnv->pVnodeFiles[pRuntimeEnv->numOfFiles - 1].fileID) { + *fid = pRuntimeEnv->pVnodeFiles[pRuntimeEnv->numOfFiles - 1].fileID; return pRuntimeEnv->numOfFiles - 1; } int32_t numOfFiles = pRuntimeEnv->numOfFiles; - if (order == TSQL_SO_DESC && *fid > pRuntimeEnv->pHeaderFiles[numOfFiles - 1].fileID) { - *fid = pRuntimeEnv->pHeaderFiles[numOfFiles - 1].fileID; + if (order == TSQL_SO_DESC && *fid > pRuntimeEnv->pVnodeFiles[numOfFiles - 1].fileID) { + *fid = pRuntimeEnv->pVnodeFiles[numOfFiles - 1].fileID; return numOfFiles - 1; } @@ -1674,12 +1732,12 @@ int32_t vnodeGetVnodeHeaderFileIdx(int32_t *fid, SQueryRuntimeEnv *pRuntimeEnv, int32_t i = 0; int32_t step = 1; - while (i pRuntimeEnv->pHeaderFiles[i].fileID) { + while (i pRuntimeEnv->pVnodeFiles[i].fileID) { i += step; } - if (i < numOfFiles && *fid <= pRuntimeEnv->pHeaderFiles[i].fileID) { - *fid = pRuntimeEnv->pHeaderFiles[i].fileID; + if (i < numOfFiles && *fid <= pRuntimeEnv->pVnodeFiles[i].fileID) { + *fid = pRuntimeEnv->pVnodeFiles[i].fileID; return i; } else { return -1; @@ -1688,12 +1746,12 @@ int32_t vnodeGetVnodeHeaderFileIdx(int32_t *fid, SQueryRuntimeEnv *pRuntimeEnv, int32_t i = numOfFiles - 1; int32_t step = -1; - while (i >= 0 && *fid < pRuntimeEnv->pHeaderFiles[i].fileID) { + while (i >= 0 && *fid < pRuntimeEnv->pVnodeFiles[i].fileID) { i += step; } - if (i >= 0 && *fid >= pRuntimeEnv->pHeaderFiles[i].fileID) { - *fid = pRuntimeEnv->pHeaderFiles[i].fileID; + if (i >= 0 && *fid >= pRuntimeEnv->pVnodeFiles[i].fileID) { + *fid = pRuntimeEnv->pVnodeFiles[i].fileID; return i; } else { return -1; @@ -1723,6 +1781,7 @@ int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeter break; } + // failed to mmap header file into memory will cause the retrieval of compblock info failed if (vnodeGetCompBlockInfo(pMeterObj, pRuntimeEnv, fid) > 0) { break; } @@ -1821,13 +1880,13 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t startQueryTimes } // set the output buffer for the selectivity + tag query -static void setCtxTagColumnInfo(SQuery* pQuery, SQueryRuntimeEnv* pRuntimeEnv) { +static void setCtxTagColumnInfo(SQuery *pQuery, SQueryRuntimeEnv *pRuntimeEnv) { if (isSelectivityWithTagsQuery(pQuery)) { int32_t num = 0; SQLFunctionCtx *pCtx = NULL; int16_t tagLen = 0; - SQLFunctionCtx ** pTagCtx = calloc(pQuery->numOfOutputCols, POINTER_BYTES); + SQLFunctionCtx **pTagCtx = calloc(pQuery->numOfOutputCols, POINTER_BYTES); for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { SSqlFuncExprMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].pBase; if (pSqlFuncMsg->functionId == TSDB_FUNC_TAG_DUMMY || pSqlFuncMsg->functionId == TSDB_FUNC_TS_DUMMY) { @@ -1973,7 +2032,7 @@ _error_clean: tfree(pRuntimeEnv->resultInfo); tfree(pRuntimeEnv->pCtx); - for(int32_t i = 0; i < pRuntimeEnv->pQuery->numOfCols; ++i) { + for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfCols; ++i) { tfree(pRuntimeEnv->colDataBuffer[i]); } @@ -1993,7 +2052,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { } dTrace("QInfo:%p teardown runtime env", GET_QINFO_ADDR(pRuntimeEnv->pQuery)); - for(int32_t i = 0; i < pRuntimeEnv->pQuery->numOfCols; ++i) { + for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfCols; ++i) { tfree(pRuntimeEnv->colDataBuffer[i]); } @@ -2024,7 +2083,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { } for (int32_t i = 0; i < pRuntimeEnv->numOfFiles; ++i) { - SQueryFileInfo *pQFileInfo = &(pRuntimeEnv->pHeaderFiles[i]); + SQueryFileInfo *pQFileInfo = &(pRuntimeEnv->pVnodeFiles[i]); if (pQFileInfo->pHeaderFileData != NULL && pQFileInfo->pHeaderFileData != MAP_FAILED) { munmap(pQFileInfo->pHeaderFileData, pQFileInfo->headFileSize); } @@ -2038,9 +2097,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { tclose(pQFileInfo->lastFd); } - if (pRuntimeEnv->pHeaderFiles != NULL) { + if (pRuntimeEnv->pVnodeFiles != NULL) { pRuntimeEnv->numOfFiles = 0; - free(pRuntimeEnv->pHeaderFiles); + free(pRuntimeEnv->pVnodeFiles); } if (pRuntimeEnv->pInterpoBuf != NULL) { @@ -2791,11 +2850,17 @@ int64_t loadRequiredBlockIntoMem(SQueryRuntimeEnv *pRuntimeEnv, SPositionInfo *p } /* - * NOTE: the compblock information may not be loaded yet, here loaded it firstly + * NOTE: + * The compblock information may not be loaded yet, here loaded it firstly. * If the compBlock info is loaded, it wont be loaded again. + * + * If failed to load comp block into memory due some how reasons, e.g., empty header file/not enough memory */ int32_t numOfBlocks = vnodeGetCompBlockInfo(pMeterObj, pRuntimeEnv, fileIdx); - assert(numOfBlocks > 0); + if (numOfBlocks <= 0) { + position->fileId = -1; + return -1; + } nextTimestamp = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos); } @@ -2865,71 +2930,28 @@ static int file_order_comparator(const void *p1, const void *p2) { */ static int32_t vnodeOpenVnodeDBFiles(SQInfo *pQInfo, SQueryFileInfo *pVnodeFiles, int32_t fid, int32_t vnodeId, char *fileName, char *prefix) { - __off_t size = 0; + // __off_t size = 0; pVnodeFiles->fileID = fid; pVnodeFiles->defaultMappingSize = DEFAULT_DATA_FILE_MMAP_WINDOW_SIZE; snprintf(pVnodeFiles->headerFilePath, 256, "%s%s", prefix, fileName); - -#if 1 pVnodeFiles->headerFd = open(pVnodeFiles->headerFilePath, O_RDONLY); -#else - int32_t *val = (int32_t *)taosGetStrHashData(fileHandleHashList, pVnodeFiles->headerFilePath); - if (val == NULL) { - pVnodeFiles->headerFd = open(pVnodeFiles->headerFilePath, O_RDONLY); - taosAddStrHash(fileHandleHashList, pVnodeFiles->headerFilePath, (char *)&pVnodeFiles->headerFd); - } else { - pVnodeFiles->headerFd = *val; - } -#endif if (!VALIDFD(pVnodeFiles->headerFd)) { dError("QInfo:%p failed open header file:%s reason:%s", pQInfo, pVnodeFiles->headerFilePath, strerror(errno)); goto _clean; } - struct stat fstat; + struct stat fstat = {0}; if (stat(pVnodeFiles->headerFilePath, &fstat) < 0) return -1; pVnodeFiles->headFileSize = fstat.st_size; - size = fstat.st_size; - - pVnodeFiles->pHeaderFileData = mmap(NULL, size, PROT_READ, MAP_SHARED, pVnodeFiles->headerFd, 0); - if (pVnodeFiles->pHeaderFileData == MAP_FAILED) { - dError("QInfo:%p failed to map header file:%s, %s", pQInfo, pVnodeFiles->headerFilePath, strerror(errno)); - goto _clean; - } - - /* even the advise failed, continue.. */ - if (madvise(pVnodeFiles->pHeaderFileData, size, MADV_SEQUENTIAL) == -1) { - dError("QInfo:%p failed to advise kernel the usage of header files, reason:%s", pQInfo, strerror(errno)); - } snprintf(pVnodeFiles->dataFilePath, 256, "%sv%df%d.data", prefix, vnodeId, fid); snprintf(pVnodeFiles->lastFilePath, 256, "%sv%df%d.last", prefix, vnodeId, fid); -#if 1 pVnodeFiles->dataFd = open(pVnodeFiles->dataFilePath, O_RDONLY); pVnodeFiles->lastFd = open(pVnodeFiles->lastFilePath, O_RDONLY); -#else - val = (int32_t *)taosGetStrHashData(fileHandleHashList, pVnodeFiles->dataFilePath); - if (val == NULL) { - pVnodeFiles->dataFd = open(pVnodeFiles->dataFilePath, O_RDONLY); - taosAddStrHash(fileHandleHashList, pVnodeFiles->dataFilePath, (char *)&pVnodeFiles->dataFd); - } else { - pVnodeFiles->dataFd = *val; - } -#endif - - if (!VALIDFD(pVnodeFiles->dataFd)) { - dError("QInfo:%p failed to open data file:%s, reason:%s", pQInfo, pVnodeFiles->dataFilePath, strerror(errno)); - goto _clean; - } - - if (!VALIDFD(pVnodeFiles->lastFd)) { - dError("QInfo:%p failed to open last file:%s, reason:%s", pQInfo, pVnodeFiles->lastFilePath, strerror(errno)); - goto _clean; - } if (stat(pVnodeFiles->dataFilePath, &fstat) < 0) return -1; pVnodeFiles->dataFileSize = fstat.st_size; @@ -2953,13 +2975,9 @@ static int32_t vnodeOpenVnodeDBFiles(SQInfo *pQInfo, SQueryFileInfo *pVnodeFiles } #endif - return 0; + return TSDB_CODE_SUCCESS; _clean: - if (pVnodeFiles->pHeaderFileData != MAP_FAILED && pVnodeFiles->pDataFileData != NULL) { - munmap(pVnodeFiles->pHeaderFileData, pVnodeFiles->headFileSize); - pVnodeFiles->pHeaderFileData = NULL; - } #if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP if (pVnodeFiles->pDataFileData != MAP_FAILED && pVnodeFiles->pDataFileData != NULL) { @@ -2987,10 +3005,10 @@ static void vnodeOpenAllFiles(SQInfo *pQInfo, int32_t vnodeId) { char suffix[] = ".head"; struct dirent *pEntry = NULL; - int32_t alloc = 4; // default allocated size + size_t alloc = 4; // default allocated size SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->pMeterQuerySupporter->runtimeEnv); - pRuntimeEnv->pHeaderFiles = calloc(1, sizeof(SQueryFileInfo) * alloc); + pRuntimeEnv->pVnodeFiles = calloc(1, sizeof(SQueryFileInfo) * alloc); SVnodeObj *pVnode = &vnodeList[vnodeId]; while ((pEntry = readdir(pDir)) != NULL) { @@ -3026,11 +3044,11 @@ static void vnodeOpenAllFiles(SQInfo *pQInfo, int32_t vnodeId) { if (++pRuntimeEnv->numOfFiles > alloc) { alloc = alloc << 1; - pRuntimeEnv->pHeaderFiles = realloc(pRuntimeEnv->pHeaderFiles, alloc * sizeof(SQueryFileInfo)); - memset(&pRuntimeEnv->pHeaderFiles[alloc >> 1], 0, (alloc >> 1) * sizeof(SQueryFileInfo)); + pRuntimeEnv->pVnodeFiles = realloc(pRuntimeEnv->pVnodeFiles, alloc * sizeof(SQueryFileInfo)); + memset(&pRuntimeEnv->pVnodeFiles[alloc >> 1], 0, (alloc >> 1) * sizeof(SQueryFileInfo)); } - SQueryFileInfo *pVnodeFiles = &pRuntimeEnv->pHeaderFiles[pRuntimeEnv->numOfFiles - 1]; + SQueryFileInfo *pVnodeFiles = &pRuntimeEnv->pVnodeFiles[pRuntimeEnv->numOfFiles - 1]; int32_t ret = vnodeOpenVnodeDBFiles(pQInfo, pVnodeFiles, fid, vnodeId, pEntry->d_name, dbFilePathPrefix); if (ret < 0) { memset(pVnodeFiles, 0, sizeof(SQueryFileInfo)); // reset information @@ -3043,7 +3061,7 @@ static void vnodeOpenAllFiles(SQInfo *pQInfo, int32_t vnodeId) { dTrace("QInfo:%p find %d data files in %s to be checked", pQInfo, pRuntimeEnv->numOfFiles, dbFilePathPrefix); /* order the files information according their names */ - qsort(pRuntimeEnv->pHeaderFiles, (size_t)pRuntimeEnv->numOfFiles, sizeof(SQueryFileInfo), file_order_comparator); + qsort(pRuntimeEnv->pVnodeFiles, (size_t)pRuntimeEnv->numOfFiles, sizeof(SQueryFileInfo), file_order_comparator); } static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo, void *pBlock) { @@ -3122,8 +3140,8 @@ static bool onlyOneQueryType(SQuery *pQuery, int32_t functId, int32_t functIdDst for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; - if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || - functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAG_DUMMY) { + if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG || + functionId == TSDB_FUNC_TAG_DUMMY) { continue; } @@ -3141,7 +3159,8 @@ static bool onlyLastQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSDB static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) { // in case of point-interpolation query, use asc order scan - char msg[] = "QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%lld-%lld, " + char msg[] = + "QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%lld-%lld, " "new qrange:%lld-%lld"; // descending order query @@ -3440,12 +3459,12 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI // only the function of interp needs the corresponding information if (pCtx->functionId != TSDB_FUNC_INTERP) { - continue; + continue; } pCtx->numOfParams = 4; - SInterpInfo * pInterpInfo = (SInterpInfo *)pRuntimeEnv->pCtx[i].aOutputBuf; + SInterpInfo *pInterpInfo = (SInterpInfo *)pRuntimeEnv->pCtx[i].aOutputBuf; pInterpInfo->pInterpDetail = calloc(1, sizeof(SInterpInfoDetail)); @@ -3624,6 +3643,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete // dataInCache requires lastKey value pQuery->lastKey = pQuery->skey; + pSupporter->runtimeEnv.mmapedHFileIndex = -1; // set the initial value vnodeInitDataBlockInfo(&pSupporter->runtimeEnv.loadBlockInfo); vnodeInitLoadCompBlockInfo(&pSupporter->runtimeEnv.loadCompBlockInfo); @@ -3793,6 +3813,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) changeExecuteScanOrder(pQuery, true); + pSupporter->runtimeEnv.mmapedHFileIndex = -1; // set the initial value vnodeInitDataBlockInfo(&pSupporter->runtimeEnv.loadBlockInfo); vnodeInitLoadCompBlockInfo(&pSupporter->runtimeEnv.loadCompBlockInfo); @@ -3917,13 +3938,13 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) { } // todo merge with doRevisedResultsByLimit -void UNUSED_FUNC truncateResultByLimit(SQInfo *pQInfo, int64_t * final, int32_t *interpo) { +void UNUSED_FUNC truncateResultByLimit(SQInfo *pQInfo, int64_t *final, int32_t *interpo) { SQuery *pQuery = &(pQInfo->query); - if (pQuery->limit.limit > 0 && ((* final) + pQInfo->pointsRead > pQuery->limit.limit)) { - int64_t num = (* final) + pQInfo->pointsRead - pQuery->limit.limit; + if (pQuery->limit.limit > 0 && ((*final) + pQInfo->pointsRead > pQuery->limit.limit)) { + int64_t num = (*final) + pQInfo->pointsRead - pQuery->limit.limit; (*interpo) -= num; - (* final) -= num; + (*final) -= num; setQueryStatus(pQuery, QUERY_COMPLETED); // query completed } @@ -3974,7 +3995,7 @@ TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index) { // the fields info is not loaded, load it into memory if (pQuery->pFields == NULL || pQuery->pFields[pQuery->slot] == NULL) { - loadDataBlockFieldsInfo(pRuntimeEnv, &pRuntimeEnv->pHeaderFiles[fileIndex], pBlock, &pQuery->pFields[pQuery->slot]); + loadDataBlockFieldsInfo(pRuntimeEnv, &pRuntimeEnv->pVnodeFiles[fileIndex], pBlock, &pQuery->pFields[pQuery->slot]); } SET_DATA_BLOCK_LOADED(pRuntimeEnv->blockStatus); @@ -4006,7 +4027,7 @@ static void getFirstDataBlockInCache(SQueryRuntimeEnv *pRuntimeEnv) { } } -//TODO handle case that the cache is allocated but not assign to SMeterObj +// TODO handle case that the cache is allocated but not assign to SMeterObj void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn) { SQuery * pQuery = pRuntimeEnv->pQuery; SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); @@ -4464,17 +4485,18 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, tFilePage pCtx[i].hasNull = true; pCtx[i].nStartQueryTimestamp = timestamp; - pCtx[i].aInputElemBuf = ((char *) inputSrc->data) + - ((int32_t) pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage) + pCtx[i].outputBytes * inputIdx; + pCtx[i].aInputElemBuf = ((char *)inputSrc->data) + + ((int32_t)pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage) + + pCtx[i].outputBytes * inputIdx; - //in case of tag column, the tag information should be extracted from input buffer + // in case of tag column, the tag information should be extracted from input buffer if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG) { tVariantDestroy(&pCtx[i].tag); tVariantCreateFromBinary(&pCtx[i].tag, pCtx[i].aInputElemBuf, pCtx[i].inputBytes, pCtx[i].inputType); } } - for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; if (functionId == TSDB_FUNC_TAG_DUMMY) { continue; @@ -4942,7 +4964,8 @@ void disableFunctForSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { } } } - } else { + } else { // TODO ERROR!! + // need to handle for each query result, not just the single runtime ctx. for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1; int32_t functId = pQuery->pSelectExpr[i].pBase.functionId; @@ -5460,15 +5483,18 @@ static int32_t offsetComparator(const void *pLeft, const void *pRight) { * @param pMeterHeadDataInfo * @return */ -SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, SQueryFileInfo *pQueryFileInfo, - tSidSet *pSidSet, SMeterDataInfo *pMeterDataInfo, int32_t *numOfMeters) { - SQuery * pQuery = &pQInfo->query; +SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t fileIndex, tSidSet *pSidSet, + SMeterDataInfo *pMeterDataInfo, int32_t *numOfMeters) { + SQuery *pQuery = &pQInfo->query; + SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; SMeterSidExtInfo ** pMeterSidExtInfo = pSupporter->pMeterSidExtInfo; + SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv; + SQueryFileInfo * pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIndex]; SVnodeObj *pVnode = &vnodeList[vid]; - char * pHeaderData = pQueryFileInfo->pHeaderFileData; + char * pHeaderData = vnodeGetHeaderFileData(pRuntimeEnv, fileIndex); int32_t tmsize = sizeof(SCompHeader) * (pVnode->cfg.maxSessions) + sizeof(TSCKSUM); // file is corrupted, abort query in current file @@ -6467,7 +6493,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, int8_t *blkS int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand) { SQuery * pQuery = pRuntimeEnv->pQuery; SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj; - SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pHeaderFiles[fileIdx]; + SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIdx]; TSKEY *primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data; @@ -6869,7 +6895,7 @@ bool vnodeHasRemainResults(void *handle) { // query has completed if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { - TSKEY ekey = taosGetRevisedEndKey(pSupporter->rawEKey, pQuery->order.order, pQuery->nAggTimeInterval, + TSKEY ekey = taosGetRevisedEndKey(pSupporter->rawEKey, pQuery->order.order, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, pQuery->precision); int32_t numOfTotal = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pRuntimeEnv->pInterpoBuf[0]->data, remain, pQuery->nAggTimeInterval, ekey, pQuery->pointsToRead); @@ -6914,18 +6940,18 @@ static int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, tFilePage **p return numOfRes; } -static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char* data, int32_t* size) { - SMeterObj* pObj = pQInfo->pObj; - SQuery* pQuery = &pQInfo->query; +static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data, int32_t *size) { + SMeterObj *pObj = pQInfo->pObj; + SQuery * pQuery = &pQInfo->query; - int tnumOfRows = vnodeList[pObj->vnode].cfg.rowsInFileBlock; + int tnumOfRows = vnodeList[pObj->vnode].cfg.rowsInFileBlock; int32_t dataSize = pQInfo->query.rowSize * numOfRows; if (dataSize >= tsCompressMsgSize && tsCompressMsgSize > 0) { - char* compBuf = malloc((size_t) dataSize); + char *compBuf = malloc((size_t)dataSize); // for metric query, bufIndex always be 0. - char* d = compBuf; + char *d = compBuf; for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { // pQInfo->bufIndex == 0 int32_t bytes = pQuery->pSelectExpr[col].resBytes; @@ -6937,7 +6963,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char* data dTrace("QInfo:%p compress rsp msg, before:%d, after:%d", pQInfo, dataSize, *size); free(compBuf); - } else { // for metric query, bufIndex always be 0. + } else { // for metric query, bufIndex always be 0. for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { // pQInfo->bufIndex == 0 int32_t bytes = pQuery->pSelectExpr[col].resBytes; @@ -6958,9 +6984,9 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char* data * @param numOfRows the number of rows that are not returned in current retrieve * @return */ -int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows, int32_t* size) { +int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows, int32_t *size) { SQInfo *pQInfo = (SQInfo *)handle; - SQuery * pQuery = &pQInfo->query; + SQuery *pQuery = &pQInfo->query; assert(pQuery->pSelectExpr != NULL && pQuery->numOfOutputCols > 0); @@ -6998,7 +7024,7 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage while (1) { numOfRows = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo); - TSKEY ekey = taosGetRevisedEndKey(pSupporter->rawEKey, pQuery->order.order, pQuery->nAggTimeInterval, + TSKEY ekey = taosGetRevisedEndKey(pSupporter->rawEKey, pQuery->order.order, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, pQuery->precision); int32_t numOfFinalRows = taosGetNumOfResultWithInterpo(&pRuntimeEnv->interpoInfo, (TSKEY *)pDataSrc[0]->data, numOfRows, diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index 2fae48b143..4f95f8096b 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -289,12 +289,11 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe pQuery->fileId = fid; pSummary->numOfFiles++; - SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pHeaderFiles[fileIdx]; - char * pHeaderData = pQueryFileInfo->pHeaderFileData; + SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIdx]; int32_t numOfQualifiedMeters = 0; - SMeterDataInfo **pReqMeterDataInfo = vnodeFilterQualifiedMeters( - pQInfo, vnodeId, pQueryFileInfo, pSupporter->pSidSet, pMeterDataInfo, &numOfQualifiedMeters); + SMeterDataInfo **pReqMeterDataInfo = vnodeFilterQualifiedMeters(pQInfo, vnodeId, fileIdx, pSupporter->pSidSet, + pMeterDataInfo, &numOfQualifiedMeters); dTrace("QInfo:%p file:%s, %d meters qualified", pQInfo, pQueryFileInfo->dataFilePath, numOfQualifiedMeters); if (pReqMeterDataInfo == NULL) { @@ -312,6 +311,11 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe continue; } + char *pHeaderData = vnodeGetHeaderFileData(pRuntimeEnv, fileIdx); + if (pHeaderData == NULL) { // failed to mmap header file into buffer + continue; + } + uint32_t numOfBlocks = getDataBlocksForMeters(pSupporter, pQuery, pHeaderData, numOfQualifiedMeters, pQueryFileInfo, pReqMeterDataInfo); @@ -500,7 +504,7 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start #if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP for (int32_t i = 0; i < pRuntimeEnv->numOfFiles; ++i) { - resetMMapWindow(&pRuntimeEnv->pHeaderFiles[i]); + resetMMapWindow(&pRuntimeEnv->pVnodeFiles[i]); } #endif @@ -670,7 +674,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { #if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP for (int32_t i = 0; i < pRuntimeEnv->numOfFiles; ++i) { - resetMMapWindow(&pRuntimeEnv->pHeaderFiles[i]); + resetMMapWindow(&pRuntimeEnv->pVnodeFiles[i]); } #endif -- GitLab