diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 227095fc033837000a3ceade0e0f1cea728523e4..62ef1adfa431c2c0a3fc3fb54e526262b57e6790 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -3191,7 +3191,7 @@ static void diff_function(SQLFunctionCtx *pCtx) { } else { \ *(type *)(ctx)->aOutputBuf = *(type *)(d) - (*(type *)(&(ctx)->param[1].i64Key)); \ *(type *)(&(ctx)->param[1].i64Key) = *(type *)(d); \ - *(int64_t *)(ctx)->ptsOutputBuf = *(int64_t *)((ctx)->ptsList + (TSDB_KEYSIZE)*index); \ + *(int64_t *)(ctx)->ptsOutputBuf = *(int64_t *)((ctx)->ptsList[index]); \ } \ } while (0); diff --git a/src/inc/tutil.h b/src/inc/tutil.h index 683927c816d5284ac5b30af851e4e1cde28b5f25..bdf9df63eeafd73f892be7ab50dddb35ed9989c0 100644 --- a/src/inc/tutil.h +++ b/src/inc/tutil.h @@ -26,12 +26,14 @@ extern "C" { #include "tsdb.h" #ifndef STDERR_FILENO - #define VALIDFD(x) ((x) > 2) -#else - #define VALIDFD(x) ((x) > STDERR_FILENO) +#define STDERR_FILENO (2) #endif +#define FD_VALID(x) ((x) > STDERR_FILENO) +#define FD_INITIALIZER ((int32_t)-1) + #define WCHAR wchar_t + #define tfree(x) \ { \ if (x) { \ diff --git a/src/os/darwin/inc/os.h b/src/os/darwin/inc/os.h index eabd5cd221ba2f72a2082273b7afc17825e40695..bf86103e8400725b991a9716de25b2018ec5a61d 100644 --- a/src/os/darwin/inc/os.h +++ b/src/os/darwin/inc/os.h @@ -47,7 +47,7 @@ #define taosCloseSocket(x) \ { \ - if (VALIDFD(x)) { \ + if (FD_VALID(x)) { \ close(x); \ x = -1; \ } \ diff --git a/src/os/linux/inc/os.h b/src/os/linux/inc/os.h index e9f1737cd4e9148ca948ee32c91ef9164cc26ba5..a3d50400c30874fd48cd3dc3d005682a23c58ea1 100644 --- a/src/os/linux/inc/os.h +++ b/src/os/linux/inc/os.h @@ -75,11 +75,12 @@ extern "C" { #define taosCloseSocket(x) \ { \ - if (VALIDFD(x)) { \ + if (FD_VALID(x)) { \ close(x); \ x = -1; \ } \ } + #define taosWriteSocket(fd, buf, len) write(fd, buf, len) #define taosReadSocket(fd, buf, len) read(fd, buf, len) diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index 57c4149f224b394838d66c683e528e392dc49ea4..9e9101bb132cf8655cbf7bc79c4b65889a52e506 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -192,9 +192,9 @@ int64_t getNextAccessedKeyInData(SQuery* pQuery, int64_t* pPrimaryCol, SBlockInf uint32_t getDataBlocksForMeters(SMeterQuerySupportObj* pSupporter, SQuery* pQuery, char* pHeaderData, int32_t numOfMeters, SQueryFileInfo* pQueryFileInfo, SMeterDataInfo** pMeterDataInfo); -int32_t LoadDatablockOnDemand(SCompBlock* pBlock, SField** pFields, int8_t* blkStatus, SQueryRuntimeEnv* pRuntimeEnv, +int32_t LoadDatablockOnDemand(SCompBlock* pBlock, SField** pFields, uint8_t* blkStatus, SQueryRuntimeEnv* pRuntimeEnv, int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand); -char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex); +char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int32_t fileIndex); /** * Create SMeterQueryInfo. diff --git a/src/system/detail/inc/vnodeRead.h b/src/system/detail/inc/vnodeRead.h index b9bf684f799d08b540df3113ba2d67ecb41a7b40..bc17eb569596dc09e815eb1b7ae586a73008b7e2 100644 --- a/src/system/detail/inc/vnodeRead.h +++ b/src/system/detail/inc/vnodeRead.h @@ -52,9 +52,9 @@ typedef struct SQueryLoadCompBlockInfo { */ typedef struct SQueryFileInfo { int32_t fileID; /* file id */ - char headerFilePath[256]; /* full file name */ - char dataFilePath[256]; - char lastFilePath[256]; + char headerFilePath[PATH_MAX]; /* full file name */ + char dataFilePath[PATH_MAX]; + char lastFilePath[PATH_MAX]; int32_t defaultMappingSize; /* default mapping size */ int32_t headerFd; /* file handler */ diff --git a/src/system/detail/src/vnodeCommit.c b/src/system/detail/src/vnodeCommit.c index 70c4cfe280c619431193dcd91bf33b1a0a917f6c..57bb52eb23d1a2d2604712709965c3b7bdd21f17 100644 --- a/src/system/detail/src/vnodeCommit.c +++ b/src/system/detail/src/vnodeCommit.c @@ -81,7 +81,7 @@ int vnodeRenewCommitLog(int vnode) { pthread_mutex_lock(&(pVnode->logMutex)); - if (VALIDFD(pVnode->logFd)) { + if (FD_VALID(pVnode->logFd)) { munmap(pVnode->pMem, pVnode->mappingSize); close(pVnode->logFd); rename(fileName, oldName); @@ -243,7 +243,7 @@ int vnodeInitCommit(int vnode) { void vnodeCleanUpCommit(int vnode) { SVnodeObj *pVnode = vnodeList + vnode; - if (VALIDFD(pVnode->logFd)) close(pVnode->logFd); + if (FD_VALID(pVnode->logFd)) close(pVnode->logFd); if (pVnode->cfg.commitLog && (pVnode->logFd > 0 && remove(pVnode->logFn) < 0)) { dError("vid:%d, failed to remove:%s", vnode, pVnode->logFn); diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 2d6332b05b0f282cb25ff2c88b3c2484e32e2c73..dfef6862d01579ca6d2aa89ca376bbd8444b0810 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -247,6 +247,47 @@ static void vnodeInitDataBlockInfo(SQueryLoadBlockInfo *pBlockLoadInfo) { pBlockLoadInfo->fileListIndex = -1; } +/** + * if the header is smaller than a threshold value(header size + initial offset value) + * + * @param vnodeId + * @param headerFileSize + * @return + */ +static bool isHeaderFileEmpty(int32_t vnodeId, size_t headerFileSize) { + SVnodeCfg* pVnodeCfg = &vnodeList[vnodeId].cfg; + return headerFileSize <= getCompHeaderStartPosition(pVnodeCfg); +} + +static void doCloseQueryFileInfoFD(SQueryFileInfo* pVnodeFiles) { + tclose(pVnodeFiles->headerFd); + tclose(pVnodeFiles->dataFd); + tclose(pVnodeFiles->lastFd); +} + +static int32_t doOpenQueryFileInfoDF(SQInfo* pQInfo, SQueryFileInfo* pVnodeFiles) { + // if the header is smaller than a threshold value, this file is empty, no need to + pVnodeFiles->headerFd = open(pVnodeFiles->headerFilePath, O_RDONLY); + if (!FD_VALID(pVnodeFiles->headerFd)) { + dError("QInfo:%p failed open header file:%s reason:%s", pQInfo, pVnodeFiles->headerFilePath, strerror(errno)); + return -1; + } + + pVnodeFiles->dataFd = open(pVnodeFiles->dataFilePath, O_RDONLY); + if (!FD_VALID(pVnodeFiles->headerFd)) { + dError("QInfo:%p failed open data file:%s reason:%s", pQInfo, pVnodeFiles->dataFilePath, strerror(errno)); + return -1; + } + + pVnodeFiles->lastFd = open(pVnodeFiles->lastFilePath, O_RDONLY); + if (!FD_VALID(pVnodeFiles->headerFd)) { + dError("QInfo:%p failed open last file:%s reason:%s", pQInfo, pVnodeFiles->lastFilePath, strerror(errno)); + return -1; + } + + return TSDB_CODE_SUCCESS; +} + static void doUnmapHeaderFileData(SQueryRuntimeEnv* pRuntimeEnv) { if (pRuntimeEnv->mmapedHFileIndex >= 0) { assert(pRuntimeEnv->mmapedHFileIndex < pRuntimeEnv->numOfFiles && pRuntimeEnv->mmapedHFileIndex >= 0); @@ -255,6 +296,8 @@ static void doUnmapHeaderFileData(SQueryRuntimeEnv* pRuntimeEnv) { munmap(otherVnodeFiles->pHeaderFileData, otherVnodeFiles->headFileSize); otherVnodeFiles->pHeaderFileData = NULL; + doCloseQueryFileInfoFD(otherVnodeFiles); + pRuntimeEnv->mmapedHFileIndex = -1; } @@ -263,34 +306,46 @@ static void doUnmapHeaderFileData(SQueryRuntimeEnv* pRuntimeEnv) { /** * mmap the data file into memory. For each query, only one header file is allowed to mmap into memory, in order to - * avoid too many mmapped files at the save time to cause OS return the message of "Cannot allocate memory", + * avoid too many memory mapped files at the save time to cause OS return the message of "Cannot allocate memory", * during query processing. * * @param pRuntimeEnv * @param fileIndex * @return the return value may be null, so any invoker needs to check the returned value */ -char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex) { +char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int32_t fileIndex) { assert(fileIndex >= 0 && fileIndex < pRuntimeEnv->numOfFiles); SQuery *pQuery = pRuntimeEnv->pQuery; SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); // only for log output SQueryFileInfo *pVnodeFiles = &pRuntimeEnv->pVnodeFiles[fileIndex]; - size_t size = pVnodeFiles->headFileSize; - + if (pVnodeFiles->pHeaderFileData == NULL) { assert(pRuntimeEnv->mmapedHFileIndex != fileIndex); - doUnmapHeaderFileData(pRuntimeEnv); // do close the other mmaped header file + doUnmapHeaderFileData(pRuntimeEnv); // do close the other memory mapped header file - pVnodeFiles->pHeaderFileData = mmap(NULL, size, PROT_READ, MAP_SHARED, pVnodeFiles->headerFd, 0); + assert(pVnodeFiles->pHeaderFileData == NULL); + + // current header file is empty or broken, return directly + if (isHeaderFileEmpty(vnodeId, pVnodeFiles->headFileSize)) { + return pVnodeFiles->pHeaderFileData; + } + + if (doOpenQueryFileInfoDF(pQInfo, pVnodeFiles) != TSDB_CODE_SUCCESS) { + return pVnodeFiles->pHeaderFileData; + } + + pVnodeFiles->pHeaderFileData = mmap(NULL, pVnodeFiles->headFileSize, PROT_READ, MAP_SHARED, pVnodeFiles->headerFd, 0); if (pVnodeFiles->pHeaderFileData == MAP_FAILED) { pVnodeFiles->pHeaderFileData = NULL; - dError("QInfo:%p failed to mmap header file:%s, size:%lld, %s", pQInfo, pVnodeFiles->headerFilePath, size, - strerror(errno)); + doCloseQueryFileInfoFD(pVnodeFiles); + + dError("QInfo:%p failed to mmap header file:%s, size:%lld, %s", pQInfo, pVnodeFiles->headerFilePath, + pVnodeFiles->headFileSize, strerror(errno)); } else { pRuntimeEnv->mmapedHFileIndex = fileIndex; // set the value in case of success mmap file - if (madvise(pVnodeFiles->pHeaderFileData, size, MADV_SEQUENTIAL) == -1) { + if (madvise(pVnodeFiles->pHeaderFileData, pVnodeFiles->headFileSize, MADV_SEQUENTIAL) == -1) { dError("QInfo:%p failed to advise kernel the usage of header file, reason:%s", pQInfo, strerror(errno)); } } @@ -326,7 +381,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim pSummary->numOfSeek++; #if 1 - char *data = vnodeGetHeaderFileData(pRuntimeEnv, fileIndex); + char *data = vnodeGetHeaderFileData(pRuntimeEnv, pMeterObj->vnode, fileIndex); if (data == NULL) { return -1; // failed to load the header file data into memory } @@ -2928,34 +2983,24 @@ static int file_order_comparator(const void *p1, const void *p2) { * @param prefix * @return */ -static int32_t vnodeOpenVnodeDBFiles(SQInfo *pQInfo, SQueryFileInfo *pVnodeFiles, int32_t fid, int32_t vnodeId, - char *fileName, char *prefix) { +static int32_t vnodeOpenVnodeDBFiles(SQueryFileInfo *pVnodeFiles, int32_t fid, int32_t vnodeId, char *fileName, + char *prefix) { + pVnodeFiles->fileID = fid; pVnodeFiles->defaultMappingSize = DEFAULT_DATA_FILE_MMAP_WINDOW_SIZE; - snprintf(pVnodeFiles->headerFilePath, 256, "%s%s", prefix, fileName); - pVnodeFiles->headerFd = open(pVnodeFiles->headerFilePath, O_RDONLY); - - if (!VALIDFD(pVnodeFiles->headerFd)) { - dError("QInfo:%p failed open header file:%s reason:%s", pQInfo, pVnodeFiles->headerFilePath, strerror(errno)); - goto _clean; - } + snprintf(pVnodeFiles->headerFilePath, PATH_MAX, "%s%s", prefix, fileName); struct stat fstat = {0}; if (stat(pVnodeFiles->headerFilePath, &fstat) < 0) return -1; pVnodeFiles->headFileSize = fstat.st_size; - snprintf(pVnodeFiles->dataFilePath, 256, "%sv%df%d.data", prefix, vnodeId, fid); - snprintf(pVnodeFiles->lastFilePath, 256, "%sv%df%d.last", prefix, vnodeId, fid); - - pVnodeFiles->dataFd = open(pVnodeFiles->dataFilePath, O_RDONLY); - pVnodeFiles->lastFd = open(pVnodeFiles->lastFilePath, O_RDONLY); - -// if (stat(pVnodeFiles->dataFilePath, &fstat) < 0) return -1; -// pVnodeFiles->dataFileSize = fstat.st_size; -// -// if (stat(pVnodeFiles->lastFilePath, &fstat) < 0) return -1; -// pVnodeFiles->lastFileSize = fstat.st_size; + snprintf(pVnodeFiles->dataFilePath, PATH_MAX, "%sv%df%d.data", prefix, vnodeId, fid); + snprintf(pVnodeFiles->lastFilePath, PATH_MAX, "%sv%df%d.last", prefix, vnodeId, fid); + + pVnodeFiles->headerFd = FD_INITIALIZER; + pVnodeFiles->dataFd = FD_INITIALIZER; + pVnodeFiles->lastFd = FD_INITIALIZER; #if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP /* enforce kernel to preload data when the file is mapping */ @@ -2975,19 +3020,19 @@ static int32_t vnodeOpenVnodeDBFiles(SQInfo *pQInfo, SQueryFileInfo *pVnodeFiles return TSDB_CODE_SUCCESS; -_clean: - #if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP +_clean: if (pVnodeFiles->pDataFileData != MAP_FAILED && pVnodeFiles->pDataFileData != NULL) { munmap(pVnodeFiles->pDataFileData, pVnodeFiles->defaultMappingSize); pVnodeFiles->pDataFileData = NULL; } -#endif - + tclose(pVnodeFiles->headerFd); tclose(pVnodeFiles->dataFd); tclose(pVnodeFiles->lastFd); return -1; + +#endif } static void vnodeOpenAllFiles(SQInfo *pQInfo, int32_t vnodeId) { @@ -3047,7 +3092,7 @@ static void vnodeOpenAllFiles(SQInfo *pQInfo, int32_t vnodeId) { } SQueryFileInfo *pVnodeFiles = &pRuntimeEnv->pVnodeFiles[pRuntimeEnv->numOfFiles - 1]; - int32_t ret = vnodeOpenVnodeDBFiles(pQInfo, pVnodeFiles, fid, vnodeId, pEntry->d_name, dbFilePathPrefix); + int32_t ret = vnodeOpenVnodeDBFiles(pVnodeFiles, fid, vnodeId, pEntry->d_name, dbFilePathPrefix); if (ret < 0) { memset(pVnodeFiles, 0, sizeof(SQueryFileInfo)); // reset information pRuntimeEnv->numOfFiles -= 1; @@ -3767,7 +3812,7 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) { } } - if (VALIDFD(pSupporter->meterOutputFd)) { + if (FD_VALID(pSupporter->meterOutputFd)) { assert(pSupporter->meterOutputMMapBuf != NULL); dTrace("QInfo:%p disk-based output buffer during query:%lld bytes", pQInfo, pSupporter->bufSize); munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize); @@ -3868,7 +3913,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) getTmpfilePath("tb_metric_mmap", pSupporter->extBufFile); pSupporter->meterOutputFd = open(pSupporter->extBufFile, O_CREAT | O_RDWR, 0666); - if (!VALIDFD(pSupporter->meterOutputFd)) { + if (!FD_VALID(pSupporter->meterOutputFd)) { dError("QInfo:%p failed to create file: %s on disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); return TSDB_CODE_SERV_OUT_OF_MEMORY; } @@ -5492,7 +5537,7 @@ SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t SVnodeObj *pVnode = &vnodeList[vid]; - char * pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, fileIndex); + char * pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, vid, fileIndex); if (pHeaderFileData == NULL) { // failed to load header file into buffer return 0; } @@ -6491,7 +6536,7 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SBlockInfo *pBlockInfo) { return loadPrimaryTS; } -int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, int8_t *blkStatus, SQueryRuntimeEnv *pRuntimeEnv, +int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blkStatus, SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand) { SQuery * pQuery = pRuntimeEnv->pQuery; SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj; @@ -6979,7 +7024,7 @@ int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows) { int32_t fd = open(pQuery->sdata[0]->data, O_RDONLY, 0666); // make sure file exist - if (VALIDFD(fd)) { + if (FD_VALID(fd)) { size_t s = lseek(fd, 0, SEEK_END); dTrace("QInfo:%p ts comp data return, file:%s, size:%lld", pQInfo, pQuery->sdata[0]->data, s); diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index 4703f7f8dbb7706bd0a4660a179476d1831c2167..b1398f7a4a0223c43f4b4848daeccfb65f61af50 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -290,8 +290,9 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe pSummary->numOfFiles++; SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIdx]; - char *pHeaderData = vnodeGetHeaderFileData(pRuntimeEnv, fileIdx); + char *pHeaderData = vnodeGetHeaderFileData(pRuntimeEnv, vnodeId, fileIdx); if (pHeaderData == NULL) { // failed to mmap header file into buffer, ignore current file, try next + fid += step; continue; }