diff --git a/src/os/linux/inc/os.h b/src/os/linux/inc/os.h index acfd284737cadbe5232983958ef79546117d4ef8..68ae7c09eafe6c76e745e9c5333827294eb72ece 100644 --- a/src/os/linux/inc/os.h +++ b/src/os/linux/inc/os.h @@ -79,7 +79,7 @@ extern "C" { { \ if (FD_VALID(x)) { \ close(x); \ - x = -1; \ + x = FD_INITIALIZER; \ } \ } diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index 2002483f03136fcc65d0bb1727ac169d12473db7..a26e9b6285af39156edf20d331e0afaaab4bf578 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -180,8 +180,8 @@ void queryOnBlock(SMeterQuerySupportObj* pSupporter, int64_t* primaryKeys, int32 SBlockInfo* pBlockBasicInfo, SMeterDataInfo* pDataHeadInfoEx, SField* pFields, __block_search_fn_t searchFn); -SMeterDataInfo** vnodeFilterQualifiedMeters(SQInfo* pQInfo, int32_t vid, int32_t fileIndex, - tSidSet* pSidSet, SMeterDataInfo* pMeterDataInfo, int32_t* numOfMeters); +int32_t vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, tSidSet *pSidSet, SMeterDataInfo *pMeterDataInfo, + int32_t *numOfMeters, SMeterDataInfo ***pReqMeterDataInfo); int32_t vnodeGetVnodeHeaderFileIdx(int32_t* fid, SQueryRuntimeEnv* pRuntimeEnv, int32_t order); int32_t createDataBlocksInfoEx(SMeterDataInfo** pMeterDataInfo, int32_t numOfMeters, @@ -196,11 +196,11 @@ int32_t setIntervalQueryExecutionContext(SMeterQuerySupportObj* pSupporter, int3 int64_t getQueryStartPositionInCache(SQueryRuntimeEnv* pRuntimeEnv, int32_t* slot, int32_t* pos, bool ignoreQueryRange); int64_t getNextAccessedKeyInData(SQuery* pQuery, int64_t* pPrimaryCol, SBlockInfo* pBlockInfo, int32_t blockStatus); -uint32_t getDataBlocksForMeters(SMeterQuerySupportObj* pSupporter, SQuery* pQuery, char* pHeaderData, - int32_t numOfMeters, const char* filePath, SMeterDataInfo** pMeterDataInfo); +int32_t getDataBlocksForMeters(SMeterQuerySupportObj* pSupporter, SQuery* pQuery, int32_t numOfMeters, + const char* filePath, SMeterDataInfo** pMeterDataInfo, uint32_t* numOfBlocks); int32_t LoadDatablockOnDemand(SCompBlock* pBlock, SField** pFields, uint8_t* blkStatus, SQueryRuntimeEnv* pRuntimeEnv, int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand); -char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int32_t fileIndex); +int32_t vnodeGetHeaderFile(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex); /** * Create SMeterQueryInfo. diff --git a/src/system/detail/inc/vnodeRead.h b/src/system/detail/inc/vnodeRead.h index 8011595e4183915767e34d4fc9e3a98a2a36bcfb..0d749f60cc7ab3bd102ec812c1cc84ac42b65c11 100644 --- a/src/system/detail/inc/vnodeRead.h +++ b/src/system/detail/inc/vnodeRead.h @@ -100,12 +100,11 @@ typedef struct SQueryFilesInfo { uint32_t numOfFiles; // the total available number of files for this virtual node during query execution int32_t current; // the memory mapped header file, NOTE: only one header file can be mmap. int32_t vnodeId; - - int32_t headerFd; // header file fd - char* pHeaderFileData; // mmap header files - int64_t headFileSize; - int32_t dataFd; - int32_t lastFd; + + int32_t headerFd; // header file fd + int64_t headerFileSize; + int32_t dataFd; + int32_t lastFd; char headerFilePath[PATH_MAX]; // current opened header file name char dataFilePath[PATH_MAX]; // current opened data file name @@ -165,11 +164,10 @@ typedef struct SMeterDataInfo { uint64_t offsetInHeaderFile; int32_t numOfBlocks; int32_t start; // start block index - SCompBlock** pBlock; + SCompBlock* pBlock; int32_t meterOrderIdx; SMeterObj* pMeterObj; - int32_t groupIdx; // group id in meter list - + int32_t groupIdx; // group id in meter list SMeterQueryInfo* pMeterQInfo; } SMeterDataInfo; diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index ec1e5f4b6faeacbf7be3521e635da69c8b5692b6..d33afaae586d4f00266a4bd4a8e4b8d502dfb6f4 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -107,9 +107,9 @@ static FORCE_INLINE int32_t getCompHeaderStartPosition(SVnodeCfg *pCfg) { static FORCE_INLINE int32_t validateCompBlockOffset(SQInfo *pQInfo, SMeterObj *pMeterObj, SCompHeader *pCompHeader, SQueryFilesInfo *pQueryFileInfo, int32_t headerSize) { - if (pCompHeader->compInfoOffset < headerSize || pCompHeader->compInfoOffset > pQueryFileInfo->headFileSize) { + if (pCompHeader->compInfoOffset < headerSize || pCompHeader->compInfoOffset > pQueryFileInfo->headerFileSize) { dError("QInfo:%p vid:%d sid:%d id:%s, compInfoOffset:%d is not valid, size:%ld", pQInfo, pMeterObj->vnode, - pMeterObj->sid, pMeterObj->meterId, pCompHeader->compInfoOffset, pQueryFileInfo->headFileSize); + pMeterObj->sid, pMeterObj->meterId, pCompHeader->compInfoOffset, pQueryFileInfo->headerFileSize); return -1; } @@ -264,7 +264,7 @@ static void vnodeInitDataBlockInfo(SQueryLoadBlockInfo *pBlockLoadInfo) { pBlockLoadInfo->fileListIndex = -1; } -static void vnodeSetOpenedFileNames(SQueryFilesInfo *pVnodeFilesInfo) { +static void vnodeSetCurrentFileNames(SQueryFilesInfo *pVnodeFilesInfo) { assert(pVnodeFilesInfo->current >= 0 && pVnodeFilesInfo->current < pVnodeFilesInfo->numOfFiles); SHeaderFileInfo *pCurrentFileInfo = &pVnodeFilesInfo->pFileInfo[pVnodeFilesInfo->current]; @@ -308,26 +308,28 @@ static FORCE_INLINE bool isHeaderFileEmpty(int32_t vnodeId, size_t headerFileSiz return headerFileSize <= getCompHeaderStartPosition(pVnodeCfg); } -static bool checkIsHeaderFileEmpty(SQueryFilesInfo *pVnodeFilesInfo, int32_t vnodeId) { +static bool checkIsHeaderFileEmpty(SQueryFilesInfo *pVnodeFilesInfo) { struct stat fstat = {0}; if (stat(pVnodeFilesInfo->headerFilePath, &fstat) < 0) { return true; } - pVnodeFilesInfo->headFileSize = fstat.st_size; - - return isHeaderFileEmpty(vnodeId, pVnodeFilesInfo->headFileSize); + pVnodeFilesInfo->headerFileSize = fstat.st_size; + return isHeaderFileEmpty(pVnodeFilesInfo->vnodeId, pVnodeFilesInfo->headerFileSize); } static void doCloseQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) { tclose(pVnodeFilesInfo->headerFd); tclose(pVnodeFilesInfo->dataFd); tclose(pVnodeFilesInfo->lastFd); + + pVnodeFilesInfo->current = -1; + pVnodeFilesInfo->headerFileSize = -1; } static void doInitQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) { pVnodeFilesInfo->current = -1; - pVnodeFilesInfo->headFileSize = -1; + pVnodeFilesInfo->headerFileSize = -1; pVnodeFilesInfo->headerFd = FD_INITIALIZER; // set the initial value pVnodeFilesInfo->dataFd = FD_INITIALIZER; @@ -335,21 +337,20 @@ static void doInitQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) { } /* - * clean memory and other corresponding resources are delegated to invoker + * close the opened fd are delegated to invoker */ -static int32_t doOpenQueryFileData(SQInfo *pQInfo, SQueryFilesInfo *pVnodeFileInfo, int32_t vnodeId) { +static int32_t doOpenQueryFile(SQInfo *pQInfo, SQueryFilesInfo *pVnodeFileInfo) { SHeaderFileInfo *pHeaderFileInfo = &pVnodeFileInfo->pFileInfo[pVnodeFileInfo->current]; /* * current header file is empty or broken, return directly. * - * if the header is smaller than a threshold value, this file is empty, no need to open these files - * the header file only to be opened, then the check of file size is available. Otherwise, the file may be - * replaced by new header file when opening the header file and then cause the miss check of file size + * if the header is smaller than or equals to the minimum file size value, this file is empty. No need to open this + * file and the corresponding files. */ - if (checkIsHeaderFileEmpty(pVnodeFileInfo, vnodeId)) { + if (checkIsHeaderFileEmpty(pVnodeFileInfo)) { qTrace("QInfo:%p vid:%d, fileId:%d, index:%d, size:%d, ignore file, empty or broken", pQInfo, - pVnodeFileInfo->vnodeId, pHeaderFileInfo->fileID, pVnodeFileInfo->current, pVnodeFileInfo->headFileSize); + pVnodeFileInfo->vnodeId, pHeaderFileInfo->fileID, pVnodeFileInfo->current, pVnodeFileInfo->headerFileSize); return -1; } @@ -372,56 +373,30 @@ static int32_t doOpenQueryFileData(SQInfo *pQInfo, SQueryFilesInfo *pVnodeFileIn return -1; } -// pVnodeFileInfo->pHeaderFileData = -// mmap(NULL, pVnodeFileInfo->headFileSize, PROT_READ, MAP_SHARED, pVnodeFileInfo->headerFd, 0); -// -// if (pVnodeFileInfo->pHeaderFileData == MAP_FAILED) { -// pVnodeFileInfo->pHeaderFileData = NULL; -// -// doCloseQueryFileInfoFD(pVnodeFileInfo); -// doInitQueryFileInfoFD(pVnodeFileInfo); -// -// dError("QInfo:%p failed to mmap header file:%s, size:%lld, %s", pQInfo, pVnodeFileInfo->headerFilePath, -// pVnodeFileInfo->headFileSize, strerror(errno)); -// -// return -1; -// } else { -// if (madvise(pVnodeFileInfo->pHeaderFileData, pVnodeFileInfo->headFileSize, MADV_SEQUENTIAL) == -1) { -// dError("QInfo:%p failed to advise kernel the usage of header file, reason:%s", pQInfo, strerror(errno)); -// } -// } - return TSDB_CODE_SUCCESS; } -static void doUnmapHeaderFile(SQueryFilesInfo *pVnodeFileInfo) { - munmap(pVnodeFileInfo->pHeaderFileData, pVnodeFileInfo->headFileSize); - pVnodeFileInfo->pHeaderFileData = NULL; - pVnodeFileInfo->headFileSize = -1; -} - -static void doCloseOpenedFileData(SQueryFilesInfo *pVnodeFileInfo) { +static void doCloseQueryFiles(SQueryFilesInfo *pVnodeFileInfo) { if (pVnodeFileInfo->current >= 0) { assert(pVnodeFileInfo->current < pVnodeFileInfo->numOfFiles && pVnodeFileInfo->current >= 0); - doUnmapHeaderFile(pVnodeFileInfo); + pVnodeFileInfo->headerFileSize = -1; + doCloseQueryFileInfoFD(pVnodeFileInfo); - doInitQueryFileInfoFD(pVnodeFileInfo); } assert(pVnodeFileInfo->current == -1); } /** - * mmap the data file into memory. For each query, only one header file is allowed to mmap into memory, in order to - * avoid too many memory mapped files at the save time to cause OS return the message of "Cannot allocate memory", - * during query processing. + * For each query, only one header file along with corresponding files is opened, in order to + * avoid too many memory files opened at the same time. * * @param pRuntimeEnv * @param fileIndex - * @return the return value may be null, so any invoker needs to check the returned value + * @return -1 failed, 0 success */ -char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int32_t fileIndex) { +int32_t vnodeGetHeaderFile(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex) { assert(fileIndex >= 0 && fileIndex < pRuntimeEnv->vnodeFileInfo.numOfFiles); SQuery *pQuery = pRuntimeEnv->pQuery; @@ -429,28 +404,28 @@ char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int SQueryFilesInfo *pVnodeFileInfo = &pRuntimeEnv->vnodeFileInfo; - if (pVnodeFileInfo->current != fileIndex || pVnodeFileInfo->pHeaderFileData == NULL) { + if (pVnodeFileInfo->current != fileIndex) { if (pVnodeFileInfo->current >= 0) { -// assert(pVnodeFileInfo->pHeaderFileData != NULL); + assert(pVnodeFileInfo->headerFileSize > 0); } // do close the current memory mapped header file and corresponding fd - doCloseOpenedFileData(pVnodeFileInfo); - assert(pVnodeFileInfo->pHeaderFileData == NULL); + doCloseQueryFiles(pVnodeFileInfo); + assert(pVnodeFileInfo->headerFileSize == -1); // set current opened file Index pVnodeFileInfo->current = fileIndex; // set the current opened files(header, data, last) path - vnodeSetOpenedFileNames(pVnodeFileInfo); + vnodeSetCurrentFileNames(pVnodeFileInfo); - if (doOpenQueryFileData(pQInfo, pVnodeFileInfo, vnodeId) != TSDB_CODE_SUCCESS) { - doCloseOpenedFileData(pVnodeFileInfo); // all the fds may be partially opened, close them anyway. - return pVnodeFileInfo->pHeaderFileData; + if (doOpenQueryFile(pQInfo, pVnodeFileInfo) != TSDB_CODE_SUCCESS) { + doCloseQueryFiles(pVnodeFileInfo); // all the fds may be partially opened, close them anyway. + return -1; } } - return 1;//pVnodeFileInfo->pHeaderFileData; + return TSDB_CODE_SUCCESS; } /* @@ -477,38 +452,11 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim pSummary->readCompInfo++; pSummary->numOfSeek++; -#if 1 - char *data = vnodeGetHeaderFileData(pRuntimeEnv, pMeterObj->vnode, fileIndex); - if (data == NULL) { + int32_t ret = vnodeGetHeaderFile(pRuntimeEnv, fileIndex); + if (ret != TSDB_CODE_SUCCESS) { return -1; // failed to load the header file data into memory } - -#else - char *data = calloc(1, tmsize + TSDB_FILE_HEADER_LEN); - read(fd, data, tmsize + TSDB_FILE_HEADER_LEN); -#endif - int64_t offset = TSDB_FILE_HEADER_LEN + sizeof(SCompHeader) * pMeterObj->sid; - -#if 0 - // check the offset value integrity - if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, pMeterObj->vnode, data, - getCompHeaderSegSize(pCfg)) < 0) { - return -1; - } - - SCompHeader *compHeader = (SCompHeader *)(data + offset); - // no data in this file for specified meter, abort - if (compHeader->compInfoOffset == 0) { - return 0; - } - - // corrupted file may cause the invalid compInfoOffset, check needs - if (validateCompBlockOffset(pQInfo, pMeterObj, compHeader, &pRuntimeEnv->vnodeFileInfo, - getCompHeaderStartPosition(pCfg)) < 0) { - return -1; - } -#else char* buf = calloc(1, getCompHeaderSegSize(pCfg)); SQueryFilesInfo *pVnodeFileInfo = &pRuntimeEnv->vnodeFileInfo; @@ -518,13 +466,10 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim // check the offset value integrity if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, pMeterObj->vnode, buf - TSDB_FILE_HEADER_LEN, getCompHeaderSegSize(pCfg)) < 0) { + free(buf); return -1; } -#endif -#if 0 - SCompInfo *compInfo = (SCompInfo *)(data + compHeader->compInfoOffset); -#else SCompHeader *compHeader = (SCompHeader *)(buf + sizeof(SCompHeader) * pMeterObj->sid); // no data in this file for specified meter, abort @@ -533,33 +478,40 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim return 0; } + // corrupted file may cause the invalid compInfoOffset, check needs + if (validateCompBlockOffset(pQInfo, pMeterObj, compHeader, &pRuntimeEnv->vnodeFileInfo, + getCompHeaderStartPosition(pCfg)) < 0) { + free(buf); + return -1; + } + lseek(pVnodeFileInfo->headerFd, compHeader->compInfoOffset, SEEK_SET); - SCompInfo CompInfo = {0}; - SCompInfo *compInfo = &CompInfo; - read(pVnodeFileInfo->headerFd, compInfo, sizeof(SCompInfo)); -#endif + SCompInfo compInfo = {0}; + read(pVnodeFileInfo->headerFd, &compInfo, sizeof(SCompInfo)); // check compblock info integrity - if (validateCompBlockInfoSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, pMeterObj->vnode, compInfo, + if (validateCompBlockInfoSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, pMeterObj->vnode, &compInfo, compHeader->compInfoOffset) < 0) { + free(buf); return -1; } - if (compInfo->numOfBlocks <= 0 || compInfo->uid != pMeterObj->uid) { + if (compInfo.numOfBlocks <= 0 || compInfo.uid != pMeterObj->uid) { + free(buf); return 0; } // free allocated SField data vnodeFreeFieldsEx(pRuntimeEnv); - pQuery->numOfBlocks = (int32_t)compInfo->numOfBlocks; + pQuery->numOfBlocks = (int32_t)compInfo.numOfBlocks; /* * +-------------+-----------+----------------+ * | comp block | checksum | SField Pointer | * +-------------+-----------+----------------+ */ - int32_t compBlockSize = compInfo->numOfBlocks * sizeof(SCompBlock); + int32_t compBlockSize = compInfo.numOfBlocks * sizeof(SCompBlock); size_t bufferSize = compBlockSize + sizeof(TSCKSUM) + POINTER_BYTES * pQuery->numOfBlocks; // prepare buffer to hold compblock data @@ -569,21 +521,16 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim } memset(pQuery->pBlock, 0, bufferSize); - -#if 0 - memcpy(pQuery->pBlock, (char *)compInfo + sizeof(SCompInfo), (size_t)compBlockSize); - TSCKSUM checksum = *(TSCKSUM *)((char *)compInfo + sizeof(SCompInfo) + compBlockSize); -#else + // read data: comp block + checksum read(pVnodeFileInfo->headerFd, pQuery->pBlock, compBlockSize + sizeof(TSCKSUM)); TSCKSUM checksum = *(TSCKSUM*)((char*)pQuery->pBlock + compBlockSize); -// read(pVnodeFileInfo->headerFd, &checksum, sizeof(TSCKSUM)); -#endif // check comp block integrity - if (validateCompBlockSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, compInfo, (char *)pQuery->pBlock, + if (validateCompBlockSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, &compInfo, (char *)pQuery->pBlock, pMeterObj->vnode, checksum) < 0) { - return -1; //TODO free resource in error process + free(buf); + return -1; } pQuery->pFields = (SField **)((char *)pQuery->pBlock + compBlockSize + sizeof(TSCKSUM)); @@ -2306,7 +2253,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { tfree(pRuntimeEnv->primaryColBuffer); } - doCloseOpenedFileData(&pRuntimeEnv->vnodeFileInfo); + doCloseQueryFiles(&pRuntimeEnv->vnodeFileInfo); if (pRuntimeEnv->vnodeFileInfo.pFileInfo != NULL) { pRuntimeEnv->vnodeFileInfo.numOfFiles = 0; @@ -3066,8 +3013,7 @@ int64_t loadRequiredBlockIntoMem(SQueryRuntimeEnv *pRuntimeEnv, SPositionInfo *p * * If failed to load comp block into memory due some how reasons, e.g., empty header file/not enough memory */ - int32_t numOfBlocks = vnodeGetCompBlockInfo(pMeterObj, pRuntimeEnv, fileIdx); - if (numOfBlocks <= 0) { + if (vnodeGetCompBlockInfo(pMeterObj, pRuntimeEnv, fileIdx) <= 0) { position->fileId = -1; return -1; } @@ -5676,8 +5622,8 @@ static int32_t offsetComparator(const void *pLeft, const void *pRight) { * @param pMeterHeadDataInfo * @return */ -SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t fileIndex, tSidSet *pSidSet, - SMeterDataInfo *pMeterDataInfo, int32_t *numOfMeters) { +int32_t vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, tSidSet *pSidSet, SMeterDataInfo *pMeterDataInfo, + int32_t *numOfMeters, SMeterDataInfo ***pReqMeterDataInfo) { SQuery *pQuery = &pQInfo->query; SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; @@ -5686,22 +5632,35 @@ SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t SVnodeObj *pVnode = &vnodeList[vid]; - char *pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, vid, fileIndex); - if (pHeaderFileData == NULL) { // failed to load header file into buffer - return 0; + char* buf = calloc(1, getCompHeaderSegSize(&pVnode->cfg)); + if (buf == NULL) { + *numOfMeters = 0; + return TSDB_CODE_SERV_OUT_OF_MEMORY; } - - int32_t tmsize = sizeof(SCompHeader) * (pVnode->cfg.maxSessions) + sizeof(TSCKSUM); - - // file is corrupted, abort query in current file - if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, vid, pHeaderFileData, tmsize) < - 0) { + + SQueryFilesInfo *pVnodeFileInfo = &pRuntimeEnv->vnodeFileInfo; + + int32_t headerSize = getCompHeaderSegSize(&pVnode->cfg); + lseek(pVnodeFileInfo->headerFd, TSDB_FILE_HEADER_LEN, SEEK_SET); + read(pVnodeFileInfo->headerFd, buf, headerSize); + + // check the offset value integrity + if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, vid, buf - TSDB_FILE_HEADER_LEN, + headerSize) < 0) { + free(buf); *numOfMeters = 0; - return 0; + + return TSDB_CODE_FILE_CORRUPTED; } - int64_t oldestKey = getOldestKey(pVnode->numOfFiles, pVnode->fileId, &pVnode->cfg); - SMeterDataInfo **pReqMeterDataInfo = malloc(POINTER_BYTES * pSidSet->numOfSids); + int64_t oldestKey = getOldestKey(pVnode->numOfFiles, pVnode->fileId, &pVnode->cfg); + (*pReqMeterDataInfo) = malloc(POINTER_BYTES * pSidSet->numOfSids); + if (*pReqMeterDataInfo == NULL) { + free(buf); + *numOfMeters = 0; + + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } int32_t groupId = 0; TSKEY skey, ekey; @@ -5744,19 +5703,20 @@ SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t } } - int64_t headerOffset = TSDB_FILE_HEADER_LEN + sizeof(SCompHeader) * pMeterObj->sid; - - SCompHeader *compHeader = (SCompHeader *)(pHeaderFileData + headerOffset); - - if (compHeader->compInfoOffset == 0) { + int64_t headerOffset = sizeof(SCompHeader) * pMeterObj->sid; + SCompHeader *compHeader = (SCompHeader *)(buf + headerOffset); + if (compHeader->compInfoOffset == 0) { // current table is empty continue; } - - if (compHeader->compInfoOffset < sizeof(SCompHeader) * pVnode->cfg.maxSessions + TSDB_FILE_HEADER_LEN || - compHeader->compInfoOffset > pRuntimeEnv->vnodeFileInfo.headFileSize) { - dError("QInfo:%p vid:%d sid:%d id:%s, compInfoOffset:%d is not valid", pQuery, pMeterObj->vnode, pMeterObj->sid, - pMeterObj->meterId, compHeader->compInfoOffset); - continue; + + // corrupted file may cause the invalid compInfoOffset, check needs + int32_t compHeaderOffset = getCompHeaderStartPosition(&pVnode->cfg); + if (validateCompBlockOffset(pQInfo, pMeterObj, compHeader, &pRuntimeEnv->vnodeFileInfo, compHeaderOffset) != + TSDB_CODE_SUCCESS) { + free(buf); + *numOfMeters = 0; + + return TSDB_CODE_FILE_CORRUPTED; } pOneMeterDataInfo->offsetInHeaderFile = (uint64_t)compHeader->compInfoOffset; @@ -5765,18 +5725,20 @@ SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t pOneMeterDataInfo->pMeterQInfo = createMeterQueryInfo(pQuery, pSupporter->rawSKey, pSupporter->rawEKey); } - pReqMeterDataInfo[*numOfMeters] = pOneMeterDataInfo; + (*pReqMeterDataInfo)[*numOfMeters] = pOneMeterDataInfo; (*numOfMeters) += 1; } assert(*numOfMeters <= pSidSet->numOfSids); - /* enable access sequentially */ + /* enable sequentially access*/ if (*numOfMeters > 1) { - qsort(pReqMeterDataInfo, *numOfMeters, POINTER_BYTES, offsetComparator); + qsort((*pReqMeterDataInfo), *numOfMeters, POINTER_BYTES, offsetComparator); } - return pReqMeterDataInfo; + free(buf); + + return TSDB_CODE_SUCCESS; } SMeterQueryInfo *createMeterQueryInfo(SQuery *pQuery, TSKEY skey, TSKEY ekey) { @@ -5893,10 +5855,11 @@ void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, SMeterQueryInfo *p ((pQuery->lastKey <= pQuery->skey) && !QUERY_IS_ASC_QUERY(pQuery))); } -static void clearMeterDataBlockInfo(SMeterDataInfo *pMeterDataInfo) { - tfree(pMeterDataInfo->pBlock); - pMeterDataInfo->numOfBlocks = 0; - pMeterDataInfo->start = 0; +static void clearAllMeterDataBlockInfo(SMeterDataInfo** pMeterDataInfo, int32_t start, int32_t end) { + for(int32_t i = start; i < end; ++i) { + tfree(pMeterDataInfo[i]->pBlock); + pMeterDataInfo[i]->numOfBlocks = 0; + pMeterDataInfo[i]->start = 0; } } static bool getValidDataBlocksRangeIndex(SMeterDataInfo *pMeterDataInfo, SQuery *pQuery, SCompBlock *pCompBlock, @@ -5937,24 +5900,22 @@ static bool getValidDataBlocksRangeIndex(SMeterDataInfo *pMeterDataInfo, SQuery return true; } -static bool setValidDataBlocks(SMeterDataInfo *pMeterDataInfo, SCompBlock *pCompBlock, int32_t end) { +static bool setValidDataBlocks(SMeterDataInfo *pMeterDataInfo, int32_t end) { int32_t size = (end - pMeterDataInfo->start) + 1; assert(size > 0); if (size != pMeterDataInfo->numOfBlocks) { - char *tmp = realloc(pMeterDataInfo->pBlock, POINTER_BYTES * size); + memmove(pMeterDataInfo->pBlock, &pMeterDataInfo->pBlock[pMeterDataInfo->start], size * sizeof(SCompBlock)); + + char *tmp = realloc(pMeterDataInfo->pBlock, size * sizeof(SCompBlock)); if (tmp == NULL) { return false; } - pMeterDataInfo->pBlock = (SCompBlock **)tmp; + pMeterDataInfo->pBlock = (SCompBlock *)tmp; pMeterDataInfo->numOfBlocks = size; } - for (int32_t i = pMeterDataInfo->start, j = 0; i <= end; ++i, ++j) { - pMeterDataInfo->pBlock[j] = &pCompBlock[i]; - } - return true; } @@ -5984,49 +5945,63 @@ static bool setCurrentQueryRange(SMeterDataInfo *pMeterDataInfo, SQuery *pQuery, } /** - * + * @param pSupporter * @param pQuery - * @param pHeaderData * @param numOfMeters + * @param filePath * @param pMeterDataInfo * @return */ -uint32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuery, char *pHeaderData, - int32_t numOfMeters, const char *filePath, SMeterDataInfo **pMeterDataInfo) { - uint32_t numOfBlocks = 0; +int32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuery, int32_t numOfMeters, + const char *filePath, SMeterDataInfo **pMeterDataInfo, uint32_t *numOfBlocks) { SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); SQueryCostSummary *pSummary = &pSupporter->runtimeEnv.summary; TSKEY minval, maxval; - + + *numOfBlocks = 0; + SQueryFilesInfo *pVnodeFileInfo = &pSupporter->runtimeEnv.vnodeFileInfo; + // sequentially scan this header file to extract the compHeader info for (int32_t j = 0; j < numOfMeters; ++j) { SMeterObj *pMeterObj = pMeterDataInfo[j]->pMeterObj; - SCompInfo *compInfo = (SCompInfo *)(pHeaderData + pMeterDataInfo[j]->offsetInHeaderFile); - int32_t ret = validateCompBlockInfoSegment(pQInfo, filePath, pMeterObj->vnode, compInfo, + lseek(pVnodeFileInfo->headerFd, pMeterDataInfo[j]->offsetInHeaderFile, SEEK_SET); + + SCompInfo compInfo = {0}; + read(pVnodeFileInfo->headerFd, &compInfo, sizeof(SCompInfo)); + + int32_t ret = validateCompBlockInfoSegment(pQInfo, filePath, pMeterObj->vnode, &compInfo, pMeterDataInfo[j]->offsetInHeaderFile); - if (ret != 0) { - clearMeterDataBlockInfo(pMeterDataInfo[j]); - continue; + if (ret != TSDB_CODE_SUCCESS) { // file corrupted + clearAllMeterDataBlockInfo(pMeterDataInfo, 0, j); + return TSDB_CODE_FILE_CORRUPTED; } - if (compInfo->numOfBlocks <= 0 || compInfo->uid != pMeterDataInfo[j]->pMeterObj->uid) { - clearMeterDataBlockInfo(pMeterDataInfo[j]); + if (compInfo.numOfBlocks <= 0 || compInfo.uid != pMeterDataInfo[j]->pMeterObj->uid) { + clearAllMeterDataBlockInfo(pMeterDataInfo, 0, j); continue; } - - int32_t size = compInfo->numOfBlocks * sizeof(SCompBlock); - SCompBlock *pCompBlock = (SCompBlock *)((char *)compInfo + sizeof(SCompInfo)); + + int32_t size = compInfo.numOfBlocks * sizeof(SCompBlock); + size_t bufferSize = size + sizeof(TSCKSUM); + + pMeterDataInfo[j]->pBlock = calloc(1, bufferSize); + if (pMeterDataInfo[j]->pBlock == NULL) { + clearAllMeterDataBlockInfo(pMeterDataInfo, 0, j); + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } + + read(pVnodeFileInfo->headerFd, pMeterDataInfo[j]->pBlock, bufferSize); + TSCKSUM checksum = *(TSCKSUM*)((char*)pMeterDataInfo[j]->pBlock + size); int64_t st = taosGetTimestampUs(); // check compblock integrity - TSCKSUM checksum = *(TSCKSUM *)((char *)compInfo + sizeof(SCompInfo) + size); - ret = validateCompBlockSegment(pQInfo, filePath, compInfo, (char *)pCompBlock, pMeterObj->vnode, checksum); - if (ret < 0) { - clearMeterDataBlockInfo(pMeterDataInfo[j]); - continue; + ret = validateCompBlockSegment(pQInfo, filePath, &compInfo, (char*) pMeterDataInfo[j]->pBlock, pMeterObj->vnode, checksum); + if (ret != TSDB_CODE_SUCCESS) { + clearAllMeterDataBlockInfo(pMeterDataInfo, 0, j); + return TSDB_CODE_FILE_CORRUPTED; } int64_t et = taosGetTimestampUs(); @@ -6036,31 +6011,31 @@ uint32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuer pSummary->loadCompInfoUs += (et - st); if (!setCurrentQueryRange(pMeterDataInfo[j], pQuery, pSupporter->rawEKey, &minval, &maxval)) { - clearMeterDataBlockInfo(pMeterDataInfo[j]); + clearAllMeterDataBlockInfo(pMeterDataInfo, 0, j); continue; } int32_t end = 0; - if (!getValidDataBlocksRangeIndex(pMeterDataInfo[j], pQuery, pCompBlock, compInfo->numOfBlocks, minval, maxval, - &end)) { - clearMeterDataBlockInfo(pMeterDataInfo[j]); + if (!getValidDataBlocksRangeIndex(pMeterDataInfo[j], pQuery, pMeterDataInfo[j]->pBlock, compInfo.numOfBlocks, + minval, maxval, &end)) { + clearAllMeterDataBlockInfo(pMeterDataInfo, 0, j); continue; } - if (!setValidDataBlocks(pMeterDataInfo[j], pCompBlock, end)) { - clearMeterDataBlockInfo(pMeterDataInfo[j]); - pQInfo->killed = 1; // todo set query kill, abort current query since no - // memory available - return 0; + if (!setValidDataBlocks(pMeterDataInfo[j], end)) { + clearAllMeterDataBlockInfo(pMeterDataInfo, 0, j); + + pQInfo->killed = 1; // set query kill, abort current query since no memory available + return TSDB_CODE_SERV_OUT_OF_MEMORY; } qTrace("QInfo:%p vid:%d sid:%d id:%s, startIndex:%d, %d blocks qualified", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pMeterDataInfo[j]->start, pMeterDataInfo[j]->numOfBlocks); - numOfBlocks += pMeterDataInfo[j]->numOfBlocks; + (*numOfBlocks) += pMeterDataInfo[j]->numOfBlocks; } - return numOfBlocks; + return TSDB_CODE_SUCCESS; } static void freeDataBlockFieldInfo(SMeterDataBlockInfoEx *pDataBlockInfoEx, int32_t len) { @@ -6111,6 +6086,17 @@ static int32_t blockAccessOrderComparator(const void *pLeft, const void *pRight, return pLeftBlockInfoEx->pBlock.compBlock->offset > pRightBlockInfoEx->pBlock.compBlock->offset ? 1 : -1; } +void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) { + tfree(pSupporter->numOfBlocksPerMeter); + tfree(pSupporter->blockIndexArray); + + for (int32_t i = 0; i < numOfTables; ++i) { + tfree(pSupporter->pDataBlockInfoEx[i]); + } + + tfree(pSupporter->pDataBlockInfoEx); +} + int32_t createDataBlocksInfoEx(SMeterDataInfo **pMeterDataInfo, int32_t numOfMeters, SMeterDataBlockInfoEx **pDataBlockInfoEx, int32_t numOfCompBlocks, int32_t *nAllocBlocksInfoSize, int64_t addr) { @@ -6121,11 +6107,10 @@ int32_t createDataBlocksInfoEx(SMeterDataInfo **pMeterDataInfo, int32_t numOfMet char *tmp = realloc((*pDataBlockInfoEx), sizeof(SMeterDataBlockInfoEx) * numOfCompBlocks); if (tmp == NULL) { tfree(*pDataBlockInfoEx); - return -1; - } else { - *pDataBlockInfoEx = (SMeterDataBlockInfoEx *)tmp; + return TSDB_CODE_SERV_OUT_OF_MEMORY; } + *pDataBlockInfoEx = (SMeterDataBlockInfoEx *)tmp; memset((*pDataBlockInfoEx), 0, sizeof(SMeterDataBlockInfoEx) * numOfCompBlocks); *nAllocBlocksInfoSize = numOfCompBlocks; } @@ -6138,10 +6123,8 @@ int32_t createDataBlocksInfoEx(SMeterDataInfo **pMeterDataInfo, int32_t numOfMet if (supporter.numOfBlocksPerMeter == NULL || supporter.blockIndexArray == NULL || supporter.pDataBlockInfoEx == NULL) { - tfree(supporter.numOfBlocksPerMeter); - tfree(supporter.blockIndexArray); - tfree(supporter.pDataBlockInfoEx); - return -1; + cleanBlockOrderSupporter(&supporter, 0); + return TSDB_CODE_SERV_OUT_OF_MEMORY; } int32_t cnt = 0; @@ -6151,17 +6134,21 @@ int32_t createDataBlocksInfoEx(SMeterDataInfo **pMeterDataInfo, int32_t numOfMet continue; } - SCompBlock **pBlock = pMeterDataInfo[j]->pBlock; + SCompBlock *pBlock = pMeterDataInfo[j]->pBlock; supporter.numOfBlocksPerMeter[numOfQualMeters] = pMeterDataInfo[j]->numOfBlocks; - // TODO handle failed to allocate memory - supporter.pDataBlockInfoEx[numOfQualMeters] = - calloc(1, sizeof(SMeterDataBlockInfoEx) * pMeterDataInfo[j]->numOfBlocks); + char* buf = calloc(1, sizeof(SMeterDataBlockInfoEx) * pMeterDataInfo[j]->numOfBlocks); + if (buf == NULL) { + cleanBlockOrderSupporter(&supporter, numOfQualMeters); + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } + + supporter.pDataBlockInfoEx[numOfQualMeters] = (SMeterDataBlockInfoEx*) buf; for (int32_t k = 0; k < pMeterDataInfo[j]->numOfBlocks; ++k) { SMeterDataBlockInfoEx *pInfoEx = &supporter.pDataBlockInfoEx[numOfQualMeters][k]; - pInfoEx->pBlock.compBlock = pBlock[k]; + pInfoEx->pBlock.compBlock = &pBlock[k]; pInfoEx->pBlock.fields = NULL; pInfoEx->pMeterDataInfo = pMeterDataInfo[j]; @@ -6175,12 +6162,15 @@ int32_t createDataBlocksInfoEx(SMeterDataInfo **pMeterDataInfo, int32_t numOfMet dTrace("QInfo %p create data blocks info struct completed", addr); - assert(cnt <= numOfCompBlocks && numOfQualMeters <= numOfMeters); + assert(cnt == numOfCompBlocks && numOfQualMeters <= numOfMeters); // the pMeterDataInfo[j]->numOfBlocks may be 0 supporter.numOfMeters = numOfQualMeters; SLoserTreeInfo *pTree = NULL; uint8_t ret = tLoserTreeCreate(&pTree, supporter.numOfMeters, &supporter, blockAccessOrderComparator); - UNUSED(ret); + if (ret != TSDB_CODE_SUCCESS) { + cleanBlockOrderSupporter(&supporter, numOfMeters); + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } int32_t numOfTotal = 0; @@ -6191,10 +6181,11 @@ int32_t createDataBlocksInfoEx(SMeterDataInfo **pMeterDataInfo, int32_t numOfMet (*pDataBlockInfoEx)[numOfTotal++] = pBlocksInfoEx[index]; + // set data block index overflow, in order to disable the offset comparator if (supporter.blockIndexArray[pos] >= supporter.numOfBlocksPerMeter[pos]) { - /* set data block index overflow, in order to disable the offset comparator */ supporter.blockIndexArray[pos] = supporter.numOfBlocksPerMeter[pos] + 1; } + tLoserTreeAdjust(pTree, pos + supporter.numOfMeters); } @@ -6206,18 +6197,10 @@ int32_t createDataBlocksInfoEx(SMeterDataInfo **pMeterDataInfo, int32_t numOfMet */ dTrace("QInfo %p %d data blocks sort completed", addr, cnt); - - tfree(supporter.numOfBlocksPerMeter); - tfree(supporter.blockIndexArray); - - for (int32_t i = 0; i < numOfMeters; ++i) { - tfree(supporter.pDataBlockInfoEx[i]); - } - - tfree(supporter.pDataBlockInfoEx); + cleanBlockOrderSupporter(&supporter, numOfMeters); free(pTree); - return cnt; + return TSDB_CODE_SUCCESS; } /** diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index a545645a0967d5f2f12ebfe412cd884cc8fb24f2..c69b27537e82fc7a58aff764276bc8d932f18a1c 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -295,8 +295,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe pQuery->fileId = fid; pSummary->numOfFiles++; - char *pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, vnodeId, fileIdx); - if (pHeaderFileData == NULL) { // failed to mmap header file into buffer, ignore current file, try next + if (vnodeGetHeaderFile(pRuntimeEnv, fileIdx) != TSDB_CODE_SUCCESS) { fid += step; continue; } @@ -304,14 +303,16 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe int32_t numOfQualifiedMeters = 0; assert(fileIdx == pRuntimeEnv->vnodeFileInfo.current); - SMeterDataInfo **pReqMeterDataInfo = vnodeFilterQualifiedMeters(pQInfo, vnodeId, fileIdx, pSupporter->pSidSet, - pMeterDataInfo, &numOfQualifiedMeters); - - if (pReqMeterDataInfo == NULL) { - dError("QInfo:%p failed to allocate memory to perform query processing, abort", pQInfo); - - pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; + SMeterDataInfo **pReqMeterDataInfo = NULL; + int32_t ret = vnodeFilterQualifiedMeters(pQInfo, vnodeId, pSupporter->pSidSet, pMeterDataInfo, + &numOfQualifiedMeters, &pReqMeterDataInfo); + if (ret != TSDB_CODE_SUCCESS) { + dError("QInfo:%p failed to create meterdata struct to perform query processing, abort", pQInfo); + + tfree(pReqMeterDataInfo); + pQInfo->code = -ret; pQInfo->killed = 1; + return NULL; } @@ -324,8 +325,18 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe continue; } - uint32_t numOfBlocks = getDataBlocksForMeters(pSupporter, pQuery, pHeaderFileData, numOfQualifiedMeters, - pVnodeFileInfo->headerFilePath, pReqMeterDataInfo); + uint32_t numOfBlocks = 0; + ret = getDataBlocksForMeters(pSupporter, pQuery, numOfQualifiedMeters, pVnodeFileInfo->headerFilePath, + pReqMeterDataInfo, &numOfBlocks); + if (ret != TSDB_CODE_SUCCESS) { + dError("QInfo:%p failed to get data block before scan data blocks, abort", pQInfo); + + tfree(pReqMeterDataInfo); + pQInfo->code = -ret; + pQInfo->killed = 1; + + return NULL; + } dTrace("QInfo:%p file:%s, %d meters contains %d blocks to be checked", pQInfo, pVnodeFileInfo->dataFilePath, numOfQualifiedMeters, numOfBlocks); @@ -336,13 +347,13 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe continue; } - int32_t n = createDataBlocksInfoEx(pReqMeterDataInfo, numOfQualifiedMeters, &pDataBlockInfoEx, numOfBlocks, + ret = createDataBlocksInfoEx(pReqMeterDataInfo, numOfQualifiedMeters, &pDataBlockInfoEx, numOfBlocks, &nAllocBlocksInfoSize, (int64_t)pQInfo); - if (n < 0) { // failed to create data blocks - dError("QInfo:%p failed to allocate memory to perform query processing, abort", pQInfo); + if (ret != TSDB_CODE_SUCCESS) { // failed to create data blocks + dError("QInfo:%p build blockInfoEx failed, abort", pQInfo); tfree(pReqMeterDataInfo); - pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; + pQInfo->code = -ret; pQInfo->killed = 1; return NULL; } @@ -397,7 +408,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe setExecutionContext(pSupporter, pSupporter->pResult, pOneMeterDataInfo->meterOrderIdx, pOneMeterDataInfo->groupIdx, pMeterQueryInfo); } else { // interval query - int32_t ret = setIntervalQueryExecutionContext(pSupporter, pOneMeterDataInfo->meterOrderIdx, pMeterQueryInfo); + ret = setIntervalQueryExecutionContext(pSupporter, pOneMeterDataInfo->meterOrderIdx, pMeterQueryInfo); if (ret != TSDB_CODE_SUCCESS) { tfree(pReqMeterDataInfo); // error code has been set pQInfo->killed = 1; diff --git a/src/util/src/tlosertree.c b/src/util/src/tlosertree.c index 7da03347b10cc7f6ff02df712f892a4289ffc1f1..98d5e1cf352540998477e133436a52bc97e047bd 100644 --- a/src/util/src/tlosertree.c +++ b/src/util/src/tlosertree.c @@ -13,10 +13,7 @@ * along with this program. If not, see . */ -#include -#include -#include - +#include "os.h" #include "taosmsg.h" #include "tlog.h" #include "tlosertree.h"