提交 15b467ab 编写于 作者: H hjxilinx

[tbase-1007]

上级 96ae0ca2
......@@ -53,7 +53,6 @@ typedef struct SQueryLoadCompBlockInfo {
*/
typedef struct SHeaderFileInfo {
int32_t fileID; // file id
size_t headFileSize; // header file size
} SHeaderFileInfo;
typedef struct SQueryCostSummary {
......@@ -103,9 +102,10 @@ typedef struct SQueryFilesInfo {
int32_t headerFd; // header file fd
char* pHeaderFileData; // mmap header files
int64_t headFileSize;
int32_t dataFd;
int32_t lastFd;
char headerFilePath[PATH_MAX]; // current opened header file name
char dataFilePath[PATH_MAX]; // current opened data file name
char lastFilePath[PATH_MAX]; // current opened last file path
......
......@@ -160,7 +160,7 @@ int vnodeProcessAlterStreamRequest(char *pMsg, int msgLen, SMgmtObj *pObj) {
SMeterObj *pMeterObj = vnodeList[vid].meterList[sid];
if (pMeterObj == NULL || sid != pMeterObj->sid || vid != pMeterObj->vnode) {
dError("vid:%d sid:%d, no active table", vid, sid);
dError("vid:%d sid:%d, not active table", vid, sid);
code = TSDB_CODE_NOT_ACTIVE_TABLE;
goto _over;
}
......
......@@ -99,7 +99,7 @@ static FORCE_INLINE int32_t getCompHeaderStartPosition(SVnodeCfg *pCfg) {
}
static FORCE_INLINE int32_t validateCompBlockOffset(SQInfo *pQInfo, SMeterObj *pMeterObj, SCompHeader *pCompHeader,
SHeaderFileInfo *pQueryFileInfo, int32_t headerSize) {
SQueryFilesInfo *pQueryFileInfo, int32_t headerSize) {
if (pCompHeader->compInfoOffset < headerSize || pCompHeader->compInfoOffset > pQueryFileInfo->headFileSize) {
dError("QInfo:%p vid:%d sid:%d id:%s, compInfoOffset:%d is not valid, size:%ld", pQInfo, pMeterObj->vnode,
pMeterObj->sid, pMeterObj->meterId, pCompHeader->compInfoOffset, pQueryFileInfo->headFileSize);
......@@ -111,7 +111,7 @@ static FORCE_INLINE int32_t validateCompBlockOffset(SQInfo *pQInfo, SMeterObj *p
}
// check compinfo integrity
static FORCE_INLINE int32_t validateCompBlockInfoSegment(SQInfo *pQInfo, char *filePath, int32_t vid,
static FORCE_INLINE int32_t validateCompBlockInfoSegment(SQInfo *pQInfo, const char *filePath, int32_t vid,
SCompInfo *compInfo, int64_t offset) {
if (!taosCheckChecksumWhole((uint8_t *)compInfo, sizeof(SCompInfo))) {
dLError("QInfo:%p vid:%d, failed to read header file:%s, file compInfo broken, offset:%lld", pQInfo, vid, filePath,
......@@ -121,7 +121,7 @@ static FORCE_INLINE int32_t validateCompBlockInfoSegment(SQInfo *pQInfo, char *f
return 0;
}
static FORCE_INLINE int32_t validateCompBlockSegment(SQInfo *pQInfo, char *filePath, SCompInfo *compInfo, char *pBlock,
static FORCE_INLINE int32_t validateCompBlockSegment(SQInfo *pQInfo, const char *filePath, SCompInfo *compInfo, char *pBlock,
int32_t vid, TSCKSUM checksum) {
uint32_t size = compInfo->numOfBlocks * sizeof(SCompBlock);
......@@ -247,6 +247,22 @@ static void vnodeInitDataBlockInfo(SQueryLoadBlockInfo *pBlockLoadInfo) {
pBlockLoadInfo->fileListIndex = -1;
}
static void vnodeSetOpenedFileNames(SQueryFilesInfo* pVnodeFilesInfo) {
assert(pVnodeFilesInfo->current >= 0 && pVnodeFilesInfo->current < pVnodeFilesInfo->numOfFiles);
SHeaderFileInfo* pCurrentFileInfo = &pVnodeFilesInfo->pFileInfo[pVnodeFilesInfo->current];
// set the full file path for current opened files
snprintf(pVnodeFilesInfo->headerFilePath, PATH_MAX, "%sv%df%d.head", pVnodeFilesInfo->dbFilePathPrefix,
pVnodeFilesInfo->vnodeId, pCurrentFileInfo->fileID);
snprintf(pVnodeFilesInfo->dataFilePath, PATH_MAX, "%sv%df%d.data", pVnodeFilesInfo->dbFilePathPrefix,
pVnodeFilesInfo->vnodeId, pCurrentFileInfo->fileID);
snprintf(pVnodeFilesInfo->lastFilePath, PATH_MAX, "%sv%df%d.last", pVnodeFilesInfo->dbFilePathPrefix,
pVnodeFilesInfo->vnodeId, pCurrentFileInfo->fileID);
}
/**
* if the header is smaller than a threshold value(header size + initial offset value)
*
......@@ -254,11 +270,22 @@ static void vnodeInitDataBlockInfo(SQueryLoadBlockInfo *pBlockLoadInfo) {
* @param headerFileSize
* @return
*/
static bool isHeaderFileEmpty(int32_t vnodeId, size_t headerFileSize) {
static FORCE_INLINE bool isHeaderFileEmpty(int32_t vnodeId, size_t headerFileSize) {
SVnodeCfg* pVnodeCfg = &vnodeList[vnodeId].cfg;
return headerFileSize <= getCompHeaderStartPosition(pVnodeCfg);
}
static bool checkIsHeaderFileEmpty(SQueryFilesInfo* pVnodeFilesInfo, int32_t vnodeId) {
struct stat fstat = {0};
if (stat(pVnodeFilesInfo->headerFilePath, &fstat) < 0) {
return true;
}
pVnodeFilesInfo->headFileSize = fstat.st_size;
return isHeaderFileEmpty(vnodeId, pVnodeFilesInfo->headFileSize);
}
static void doCloseQueryFileInfoFD(SQueryFilesInfo* pVnodeFilesInfo) {
tclose(pVnodeFilesInfo->headerFd);
tclose(pVnodeFilesInfo->dataFd);
......@@ -267,59 +294,66 @@ static void doCloseQueryFileInfoFD(SQueryFilesInfo* pVnodeFilesInfo) {
static void doInitQueryFileInfoFD(SQueryFilesInfo* pVnodeFilesInfo) {
pVnodeFilesInfo->current = -1;
pVnodeFilesInfo->headFileSize = -1;
pVnodeFilesInfo->headerFd = FD_INITIALIZER; // set the initial value
pVnodeFilesInfo->dataFd = FD_INITIALIZER;
pVnodeFilesInfo->lastFd = FD_INITIALIZER;
}
static int32_t doOpenQueryFileData(SQInfo* pQInfo, SQueryFilesInfo* pVnodeFiles) {
// if the header is smaller than a threshold value, this file is empty, no need to
SHeaderFileInfo* pCurrentFileInfo = &pVnodeFiles->pFileInfo[pVnodeFiles->current];
// set the full file path for current opened files
snprintf(pVnodeFiles->headerFilePath, PATH_MAX, "%sv%df%d.head", pVnodeFiles->dbFilePathPrefix,
pVnodeFiles->vnodeId, pCurrentFileInfo->fileID);
/*
* clean memory and other corresponding resources are delegated to invoker
*/
static int32_t doOpenQueryFileData(SQInfo* pQInfo, SQueryFilesInfo* pVnodeFileInfo, int32_t vnodeId) {
SHeaderFileInfo* pHeaderFileInfo = &pVnodeFileInfo->pFileInfo[pVnodeFileInfo->current];
pVnodeFiles->headerFd = open(pVnodeFiles->headerFilePath, O_RDONLY);
if (!FD_VALID(pVnodeFiles->headerFd)) {
dError("QInfo:%p failed open header file:%s reason:%s", pQInfo, pVnodeFiles->headerFilePath, strerror(errno));
pVnodeFileInfo->headerFd = open(pVnodeFileInfo->headerFilePath, O_RDONLY);
if (!FD_VALID(pVnodeFileInfo->headerFd)) {
dError("QInfo:%p failed open head file:%s reason:%s", pQInfo, pVnodeFileInfo->headerFilePath, strerror(errno));
return -1;
}
snprintf(pVnodeFiles->dataFilePath, PATH_MAX, "%sv%df%d.data", pVnodeFiles->dbFilePathPrefix,
pVnodeFiles->vnodeId, pCurrentFileInfo->fileID);
pVnodeFiles->dataFd = open(pVnodeFiles->dataFilePath, O_RDONLY);
if (!FD_VALID(pVnodeFiles->dataFd)) {
dError("QInfo:%p failed open data file:%s reason:%s", pQInfo, pVnodeFiles->dataFilePath, strerror(errno));
/*
* current header file is empty or broken, return directly.
*
* if the header is smaller than a threshold value, this file is empty, no need to open these files
* the header file only to be opened, then the check of file size is available. Otherwise, the file may be
* replaced by new header file when opening the header file and then cause the miss check of file size
*/
if (checkIsHeaderFileEmpty(pVnodeFileInfo, vnodeId)) {
qTrace("QInfo:%p vid:%d, fileId:%d, index:%d, size:%d, ignore file, empty or broken", pQInfo,
pVnodeFileInfo->vnodeId, pHeaderFileInfo->fileID, pVnodeFileInfo->current, pVnodeFileInfo->headFileSize);
return -1;
}
snprintf(pVnodeFiles->lastFilePath, PATH_MAX, "%sv%df%d.last", pVnodeFiles->dbFilePathPrefix,
pVnodeFiles->vnodeId, pCurrentFileInfo->fileID);
pVnodeFileInfo->dataFd = open(pVnodeFileInfo->dataFilePath, O_RDONLY);
if (!FD_VALID(pVnodeFileInfo->dataFd)) {
dError("QInfo:%p failed open data file:%s reason:%s", pQInfo, pVnodeFileInfo->dataFilePath, strerror(errno));
return -1;
}
pVnodeFiles->lastFd = open(pVnodeFiles->lastFilePath, O_RDONLY);
if (!FD_VALID(pVnodeFiles->lastFd)) {
dError("QInfo:%p failed open last file:%s reason:%s", pQInfo, pVnodeFiles->lastFilePath, strerror(errno));
pVnodeFileInfo->lastFd = open(pVnodeFileInfo->lastFilePath, O_RDONLY);
if (!FD_VALID(pVnodeFileInfo->lastFd)) {
dError("QInfo:%p failed open last file:%s reason:%s", pQInfo, pVnodeFileInfo->lastFilePath, strerror(errno));
return -1;
}
pVnodeFiles->pHeaderFileData = mmap(NULL, pCurrentFileInfo->headFileSize, PROT_READ, MAP_SHARED,
pVnodeFiles->headerFd, 0);
pVnodeFileInfo->pHeaderFileData = mmap(NULL, pVnodeFileInfo->headFileSize, PROT_READ, MAP_SHARED,
pVnodeFileInfo->headerFd, 0);
if (pVnodeFiles->pHeaderFileData == MAP_FAILED) {
pVnodeFiles->pHeaderFileData = NULL;
if (pVnodeFileInfo->pHeaderFileData == MAP_FAILED) {
pVnodeFileInfo->pHeaderFileData = NULL;
doCloseQueryFileInfoFD(pVnodeFiles);
doInitQueryFileInfoFD(pVnodeFiles);
doCloseQueryFileInfoFD(pVnodeFileInfo);
doInitQueryFileInfoFD(pVnodeFileInfo);
dError("QInfo:%p failed to mmap header file:%s, size:%lld, %s", pQInfo, pVnodeFiles->headerFilePath,
pCurrentFileInfo->headFileSize, strerror(errno));
dError("QInfo:%p failed to mmap header file:%s, size:%lld, %s", pQInfo, pVnodeFileInfo->headerFilePath,
pVnodeFileInfo->headFileSize, strerror(errno));
return -1;
} else {
if (madvise(pVnodeFiles->pHeaderFileData, pCurrentFileInfo->headFileSize, MADV_SEQUENTIAL) == -1) {
if (madvise(pVnodeFileInfo->pHeaderFileData, pVnodeFileInfo->headFileSize, MADV_SEQUENTIAL) == -1) {
dError("QInfo:%p failed to advise kernel the usage of header file, reason:%s", pQInfo, strerror(errno));
}
}
......@@ -327,17 +361,18 @@ static int32_t doOpenQueryFileData(SQInfo* pQInfo, SQueryFilesInfo* pVnodeFiles)
return TSDB_CODE_SUCCESS;
}
static void doUnmapHeaderFile(SQueryFilesInfo* pVnodeFileInfo) {
munmap(pVnodeFileInfo->pHeaderFileData, pVnodeFileInfo->headFileSize);
pVnodeFileInfo->pHeaderFileData = NULL;
pVnodeFileInfo->headFileSize = -1;
}
static void doCloseOpenedFileData(SQueryFilesInfo* pVnodeFileInfo) {
if (pVnodeFileInfo->current >= 0) {
assert(pVnodeFileInfo->current < pVnodeFileInfo->numOfFiles && pVnodeFileInfo->current >= 0 &&
pVnodeFileInfo->pHeaderFileData != NULL);
SHeaderFileInfo *pCurrentFile = &pVnodeFileInfo->pFileInfo[pVnodeFileInfo->current];
munmap(pVnodeFileInfo->pHeaderFileData, pCurrentFile->headFileSize);
pVnodeFileInfo->pHeaderFileData = NULL;
assert(pVnodeFileInfo->current < pVnodeFileInfo->numOfFiles && pVnodeFileInfo->current >= 0);
doUnmapHeaderFile(pVnodeFileInfo);
doCloseQueryFileInfoFD(pVnodeFileInfo);
doInitQueryFileInfoFD(pVnodeFileInfo);
}
......@@ -361,28 +396,23 @@ char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int
SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); // only for log output
SQueryFilesInfo *pVnodeFileInfo = &pRuntimeEnv->vnodeFileInfo;
SHeaderFileInfo *pHeaderFileInfo = &pVnodeFileInfo->pFileInfo[fileIndex];
if (pVnodeFileInfo->current != fileIndex || pVnodeFileInfo->pHeaderFileData == NULL) {
if (pVnodeFileInfo->current >= 0) {
assert(pVnodeFileInfo->pHeaderFileData != NULL);
}
doCloseOpenedFileData(pVnodeFileInfo); // do close the other memory mapped header file
// do close the current memory mapped header file and corresponding fd
doCloseOpenedFileData(pVnodeFileInfo);
assert(pVnodeFileInfo->pHeaderFileData == NULL);
// current header file is empty or broken, return directly
if (isHeaderFileEmpty(vnodeId, pHeaderFileInfo->headFileSize)) {
qTrace("QInfo:%p vid:%d, fileId:%d, index:%d, size:%d, ignore file, empty or broken", pQInfo,
pVnodeFileInfo->vnodeId, pHeaderFileInfo->fileID, fileIndex, pHeaderFileInfo->headFileSize);
return pVnodeFileInfo->pHeaderFileData;
}
// set current opened file Index
pVnodeFileInfo->current = fileIndex;
if (doOpenQueryFileData(pQInfo, pVnodeFileInfo) != TSDB_CODE_SUCCESS) {
// set the current opened files(header, data, last) path
vnodeSetOpenedFileNames(pVnodeFileInfo);
if (doOpenQueryFileData(pQInfo, pVnodeFileInfo, vnodeId) != TSDB_CODE_SUCCESS) {
doCloseOpenedFileData(pVnodeFileInfo); // there may be partially open fd, close it anyway.
return pVnodeFileInfo->pHeaderFileData;
}
......@@ -400,13 +430,13 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
SVnodeCfg * pCfg = &vnodeList[pMeterObj->vnode].cfg;
SHeaderFileInfo *pQueryFileInfo = &pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex];
SHeaderFileInfo *pHeadeFileInfo = &pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex];
int64_t st = taosGetTimestampUs();
if (vnodeIsCompBlockInfoLoaded(pRuntimeEnv, pMeterObj, fileIndex)) {
dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d compBlock info is loaded, not reload", GET_QINFO_ADDR(pQuery),
pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQueryFileInfo->fileID);
pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pHeadeFileInfo->fileID);
return pQuery->numOfBlocks;
}
......@@ -441,7 +471,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
}
// corrupted file may cause the invalid compInfoOffset, check needs
if (validateCompBlockOffset(pQInfo, pMeterObj, compHeader, pQueryFileInfo, getCompHeaderStartPosition(pCfg)) < 0) {
if (validateCompBlockOffset(pQInfo, pMeterObj, compHeader, &pRuntimeEnv->vnodeFileInfo, getCompHeaderStartPosition(pCfg)) < 0) {
return -1;
}
......@@ -3002,57 +3032,12 @@ static int file_order_comparator(const void *p1, const void *p2) {
/**
* open a data files and header file for metric meta query
*
* @param pQInfo
* @param pVnodeFiles
* @param fid
* @param vnodeId
* @param fileName
* @param prefix
* @return
* @param index
*/
static int32_t vnodeOpenVnodeDBFiles(SQueryFilesInfo *pVnodeFiles, int32_t fid, int32_t index, char *fileName) {
SHeaderFileInfo *pFileInfo = &pVnodeFiles->pFileInfo[index];
pFileInfo->fileID = fid;
char buf[PATH_MAX] = {0};
snprintf(buf, PATH_MAX, "%s%s", pVnodeFiles->dbFilePathPrefix, fileName);
struct stat fstat = {0};
if (stat(buf, &fstat) < 0) return -1;
pFileInfo->headFileSize = fstat.st_size;
#if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP
/* enforce kernel to preload data when the file is mapping */
pVnodeFiles->pDataFileData = mmap(NULL, pVnodeFiles->defaultMappingSize, PROT_READ, MAP_PRIVATE | MAP_POPULATE,
pVnodeFiles->dataFd, pVnodeFiles->dtFileMappingOffset);
if (pVnodeFiles->pDataFileData == MAP_FAILED) {
dError("QInfo:%p failed to map data file:%s, %s", pQInfo, pVnodeFiles->dataFilePath, strerror(errno));
goto _clean;
}
/* advise kernel the usage of mmaped data */
if (madvise(pVnodeFiles->pDataFileData, pVnodeFiles->defaultMappingSize, MADV_SEQUENTIAL) == -1) {
dError("QInfo:%p failed to advise kernel the usage of data file:%s, reason:%s", pQInfo, pVnodeFiles->dataFilePath,
strerror(errno));
}
#endif
return TSDB_CODE_SUCCESS;
#if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP
_clean:
if (pVnodeFiles->pDataFileData != MAP_FAILED && pVnodeFiles->pDataFileData != NULL) {
munmap(pVnodeFiles->pDataFileData, pVnodeFiles->defaultMappingSize);
pVnodeFiles->pDataFileData = NULL;
}
tclose(pVnodeFiles->headerFd);
tclose(pVnodeFiles->dataFd);
tclose(pVnodeFiles->lastFd);
return -1;
#endif
static FORCE_INLINE void vnodeStoreFileId(SQueryFilesInfo *pVnodeFiles, int32_t fid, int32_t index) {
pVnodeFiles->pFileInfo[index].fileID = fid;
}
static void vnodeRecordAllFiles(SQInfo *pQInfo, int32_t vnodeId) {
......@@ -3111,13 +3096,8 @@ static void vnodeRecordAllFiles(SQInfo *pQInfo, int32_t vnodeId) {
memset(&pVnodeFilesInfo->pFileInfo[alloc >> 1U], 0, (alloc >> 1U) * sizeof(SHeaderFileInfo));
}
SHeaderFileInfo *pVnodeFiles = &pVnodeFilesInfo->pFileInfo[pVnodeFilesInfo->numOfFiles - 1];
int32_t ret = vnodeOpenVnodeDBFiles(pVnodeFilesInfo, fid, pVnodeFilesInfo->numOfFiles - 1, pEntry->d_name);
if (ret < 0) {
memset(pVnodeFiles, 0, sizeof(SHeaderFileInfo)); // reset information
pVnodeFilesInfo->numOfFiles -= 1;
}
int32_t index = pVnodeFilesInfo->numOfFiles - 1;
vnodeStoreFileId(pVnodeFilesInfo, fid, index);
}
closedir(pDir);
......@@ -5557,8 +5537,6 @@ SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t
SMeterSidExtInfo ** pMeterSidExtInfo = pSupporter->pMeterSidExtInfo;
SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv;
SHeaderFileInfo * pQueryFileInfo = &pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex];
SVnodeObj *pVnode = &vnodeList[vid];
char * pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, vid, fileIndex);
......@@ -5627,7 +5605,7 @@ SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t
}
if (compHeader->compInfoOffset < sizeof(SCompHeader) * pVnode->cfg.maxSessions + TSDB_FILE_HEADER_LEN ||
compHeader->compInfoOffset > pQueryFileInfo->headFileSize) {
compHeader->compInfoOffset > pRuntimeEnv->vnodeFileInfo.headFileSize) {
dError("QInfo:%p vid:%d sid:%d id:%s, compInfoOffset:%d is not valid", pQuery, pMeterObj->vnode, pMeterObj->sid,
pMeterObj->meterId, compHeader->compInfoOffset);
continue;
......@@ -6563,7 +6541,6 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk
int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj;
SHeaderFileInfo *pQueryFileInfo = &pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIdx];
TSKEY *primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
......
......@@ -297,7 +297,9 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
continue;
}
int32_t numOfQualifiedMeters = 0;
int32_t numOfQualifiedMeters = 0;
assert(fileIdx == pRuntimeEnv->vnodeFileInfo.current);
SMeterDataInfo **pReqMeterDataInfo = vnodeFilterQualifiedMeters(pQInfo, vnodeId, fileIdx, pSupporter->pSidSet,
pMeterDataInfo, &numOfQualifiedMeters);
......
......@@ -551,7 +551,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
SMeterObj *pMeterObj = vnodeList[vnode].meterList[sid];
if (pMeterObj == NULL) {
dError("vid:%d sid:%d, no active table", vnode, sid);
dError("vid:%d sid:%d, not active table", vnode, sid);
vnodeSendMeterCfgMsg(vnode, sid);
code = TSDB_CODE_NOT_ACTIVE_TABLE;
goto _submit_over;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册