提交 19ceadd6 编写于 作者: H hjxilinx

[jira none]

上级 4466964a
......@@ -3191,7 +3191,7 @@ static void diff_function(SQLFunctionCtx *pCtx) {
} else { \
*(type *)(ctx)->aOutputBuf = *(type *)(d) - (*(type *)(&(ctx)->param[1].i64Key)); \
*(type *)(&(ctx)->param[1].i64Key) = *(type *)(d); \
*(int64_t *)(ctx)->ptsOutputBuf = *(int64_t *)((ctx)->ptsList + (TSDB_KEYSIZE)*index); \
*(int64_t *)(ctx)->ptsOutputBuf = *(int64_t *)((ctx)->ptsList[index]); \
} \
} while (0);
......
......@@ -26,12 +26,14 @@ extern "C" {
#include "tsdb.h"
#ifndef STDERR_FILENO
#define VALIDFD(x) ((x) > 2)
#else
#define VALIDFD(x) ((x) > STDERR_FILENO)
#define STDERR_FILENO (2)
#endif
#define FD_VALID(x) ((x) > STDERR_FILENO)
#define FD_INITIALIZER ((int32_t)-1)
#define WCHAR wchar_t
#define tfree(x) \
{ \
if (x) { \
......
......@@ -47,7 +47,7 @@
#define taosCloseSocket(x) \
{ \
if (VALIDFD(x)) { \
if (FD_VALID(x)) { \
close(x); \
x = -1; \
} \
......
......@@ -75,11 +75,12 @@ extern "C" {
#define taosCloseSocket(x) \
{ \
if (VALIDFD(x)) { \
if (FD_VALID(x)) { \
close(x); \
x = -1; \
} \
}
#define taosWriteSocket(fd, buf, len) write(fd, buf, len)
#define taosReadSocket(fd, buf, len) read(fd, buf, len)
......
......@@ -192,9 +192,9 @@ int64_t getNextAccessedKeyInData(SQuery* pQuery, int64_t* pPrimaryCol, SBlockInf
uint32_t getDataBlocksForMeters(SMeterQuerySupportObj* pSupporter, SQuery* pQuery, char* pHeaderData,
int32_t numOfMeters, SQueryFileInfo* pQueryFileInfo, SMeterDataInfo** pMeterDataInfo);
int32_t LoadDatablockOnDemand(SCompBlock* pBlock, SField** pFields, int8_t* blkStatus, SQueryRuntimeEnv* pRuntimeEnv,
int32_t LoadDatablockOnDemand(SCompBlock* pBlock, SField** pFields, uint8_t* blkStatus, SQueryRuntimeEnv* pRuntimeEnv,
int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand);
char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex);
char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int32_t fileIndex);
/**
* Create SMeterQueryInfo.
......
......@@ -52,9 +52,9 @@ typedef struct SQueryLoadCompBlockInfo {
*/
typedef struct SQueryFileInfo {
int32_t fileID; /* file id */
char headerFilePath[256]; /* full file name */
char dataFilePath[256];
char lastFilePath[256];
char headerFilePath[PATH_MAX]; /* full file name */
char dataFilePath[PATH_MAX];
char lastFilePath[PATH_MAX];
int32_t defaultMappingSize; /* default mapping size */
int32_t headerFd; /* file handler */
......
......@@ -81,7 +81,7 @@ int vnodeRenewCommitLog(int vnode) {
pthread_mutex_lock(&(pVnode->logMutex));
if (VALIDFD(pVnode->logFd)) {
if (FD_VALID(pVnode->logFd)) {
munmap(pVnode->pMem, pVnode->mappingSize);
close(pVnode->logFd);
rename(fileName, oldName);
......@@ -243,7 +243,7 @@ int vnodeInitCommit(int vnode) {
void vnodeCleanUpCommit(int vnode) {
SVnodeObj *pVnode = vnodeList + vnode;
if (VALIDFD(pVnode->logFd)) close(pVnode->logFd);
if (FD_VALID(pVnode->logFd)) close(pVnode->logFd);
if (pVnode->cfg.commitLog && (pVnode->logFd > 0 && remove(pVnode->logFn) < 0)) {
dError("vid:%d, failed to remove:%s", vnode, pVnode->logFn);
......
......@@ -247,6 +247,47 @@ static void vnodeInitDataBlockInfo(SQueryLoadBlockInfo *pBlockLoadInfo) {
pBlockLoadInfo->fileListIndex = -1;
}
/**
* if the header is smaller than a threshold value(header size + initial offset value)
*
* @param vnodeId
* @param headerFileSize
* @return
*/
static bool isHeaderFileEmpty(int32_t vnodeId, size_t headerFileSize) {
SVnodeCfg* pVnodeCfg = &vnodeList[vnodeId].cfg;
return headerFileSize <= getCompHeaderStartPosition(pVnodeCfg);
}
static void doCloseQueryFileInfoFD(SQueryFileInfo* pVnodeFiles) {
tclose(pVnodeFiles->headerFd);
tclose(pVnodeFiles->dataFd);
tclose(pVnodeFiles->lastFd);
}
static int32_t doOpenQueryFileInfoDF(SQInfo* pQInfo, SQueryFileInfo* pVnodeFiles) {
// if the header is smaller than a threshold value, this file is empty, no need to
pVnodeFiles->headerFd = open(pVnodeFiles->headerFilePath, O_RDONLY);
if (!FD_VALID(pVnodeFiles->headerFd)) {
dError("QInfo:%p failed open header file:%s reason:%s", pQInfo, pVnodeFiles->headerFilePath, strerror(errno));
return -1;
}
pVnodeFiles->dataFd = open(pVnodeFiles->dataFilePath, O_RDONLY);
if (!FD_VALID(pVnodeFiles->headerFd)) {
dError("QInfo:%p failed open data file:%s reason:%s", pQInfo, pVnodeFiles->dataFilePath, strerror(errno));
return -1;
}
pVnodeFiles->lastFd = open(pVnodeFiles->lastFilePath, O_RDONLY);
if (!FD_VALID(pVnodeFiles->headerFd)) {
dError("QInfo:%p failed open last file:%s reason:%s", pQInfo, pVnodeFiles->lastFilePath, strerror(errno));
return -1;
}
return TSDB_CODE_SUCCESS;
}
static void doUnmapHeaderFileData(SQueryRuntimeEnv* pRuntimeEnv) {
if (pRuntimeEnv->mmapedHFileIndex >= 0) {
assert(pRuntimeEnv->mmapedHFileIndex < pRuntimeEnv->numOfFiles && pRuntimeEnv->mmapedHFileIndex >= 0);
......@@ -255,6 +296,8 @@ static void doUnmapHeaderFileData(SQueryRuntimeEnv* pRuntimeEnv) {
munmap(otherVnodeFiles->pHeaderFileData, otherVnodeFiles->headFileSize);
otherVnodeFiles->pHeaderFileData = NULL;
doCloseQueryFileInfoFD(otherVnodeFiles);
pRuntimeEnv->mmapedHFileIndex = -1;
}
......@@ -263,34 +306,46 @@ static void doUnmapHeaderFileData(SQueryRuntimeEnv* pRuntimeEnv) {
/**
* mmap the data file into memory. For each query, only one header file is allowed to mmap into memory, in order to
* avoid too many mmapped files at the save time to cause OS return the message of "Cannot allocate memory",
* avoid too many memory mapped files at the save time to cause OS return the message of "Cannot allocate memory",
* during query processing.
*
* @param pRuntimeEnv
* @param fileIndex
* @return the return value may be null, so any invoker needs to check the returned value
*/
char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex) {
char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int32_t fileIndex) {
assert(fileIndex >= 0 && fileIndex < pRuntimeEnv->numOfFiles);
SQuery *pQuery = pRuntimeEnv->pQuery;
SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); // only for log output
SQueryFileInfo *pVnodeFiles = &pRuntimeEnv->pVnodeFiles[fileIndex];
size_t size = pVnodeFiles->headFileSize;
if (pVnodeFiles->pHeaderFileData == NULL) {
assert(pRuntimeEnv->mmapedHFileIndex != fileIndex);
doUnmapHeaderFileData(pRuntimeEnv); // do close the other mmaped header file
doUnmapHeaderFileData(pRuntimeEnv); // do close the other memory mapped header file
pVnodeFiles->pHeaderFileData = mmap(NULL, size, PROT_READ, MAP_SHARED, pVnodeFiles->headerFd, 0);
assert(pVnodeFiles->pHeaderFileData == NULL);
// current header file is empty or broken, return directly
if (isHeaderFileEmpty(vnodeId, pVnodeFiles->headFileSize)) {
return pVnodeFiles->pHeaderFileData;
}
if (doOpenQueryFileInfoDF(pQInfo, pVnodeFiles) != TSDB_CODE_SUCCESS) {
return pVnodeFiles->pHeaderFileData;
}
pVnodeFiles->pHeaderFileData = mmap(NULL, pVnodeFiles->headFileSize, PROT_READ, MAP_SHARED, pVnodeFiles->headerFd, 0);
if (pVnodeFiles->pHeaderFileData == MAP_FAILED) {
pVnodeFiles->pHeaderFileData = NULL;
dError("QInfo:%p failed to mmap header file:%s, size:%lld, %s", pQInfo, pVnodeFiles->headerFilePath, size,
strerror(errno));
doCloseQueryFileInfoFD(pVnodeFiles);
dError("QInfo:%p failed to mmap header file:%s, size:%lld, %s", pQInfo, pVnodeFiles->headerFilePath,
pVnodeFiles->headFileSize, strerror(errno));
} else {
pRuntimeEnv->mmapedHFileIndex = fileIndex; // set the value in case of success mmap file
if (madvise(pVnodeFiles->pHeaderFileData, size, MADV_SEQUENTIAL) == -1) {
if (madvise(pVnodeFiles->pHeaderFileData, pVnodeFiles->headFileSize, MADV_SEQUENTIAL) == -1) {
dError("QInfo:%p failed to advise kernel the usage of header file, reason:%s", pQInfo, strerror(errno));
}
}
......@@ -326,7 +381,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
pSummary->numOfSeek++;
#if 1
char *data = vnodeGetHeaderFileData(pRuntimeEnv, fileIndex);
char *data = vnodeGetHeaderFileData(pRuntimeEnv, pMeterObj->vnode, fileIndex);
if (data == NULL) {
return -1; // failed to load the header file data into memory
}
......@@ -2928,34 +2983,24 @@ static int file_order_comparator(const void *p1, const void *p2) {
* @param prefix
* @return
*/
static int32_t vnodeOpenVnodeDBFiles(SQInfo *pQInfo, SQueryFileInfo *pVnodeFiles, int32_t fid, int32_t vnodeId,
char *fileName, char *prefix) {
static int32_t vnodeOpenVnodeDBFiles(SQueryFileInfo *pVnodeFiles, int32_t fid, int32_t vnodeId, char *fileName,
char *prefix) {
pVnodeFiles->fileID = fid;
pVnodeFiles->defaultMappingSize = DEFAULT_DATA_FILE_MMAP_WINDOW_SIZE;
snprintf(pVnodeFiles->headerFilePath, 256, "%s%s", prefix, fileName);
pVnodeFiles->headerFd = open(pVnodeFiles->headerFilePath, O_RDONLY);
if (!VALIDFD(pVnodeFiles->headerFd)) {
dError("QInfo:%p failed open header file:%s reason:%s", pQInfo, pVnodeFiles->headerFilePath, strerror(errno));
goto _clean;
}
snprintf(pVnodeFiles->headerFilePath, PATH_MAX, "%s%s", prefix, fileName);
struct stat fstat = {0};
if (stat(pVnodeFiles->headerFilePath, &fstat) < 0) return -1;
pVnodeFiles->headFileSize = fstat.st_size;
snprintf(pVnodeFiles->dataFilePath, 256, "%sv%df%d.data", prefix, vnodeId, fid);
snprintf(pVnodeFiles->lastFilePath, 256, "%sv%df%d.last", prefix, vnodeId, fid);
pVnodeFiles->dataFd = open(pVnodeFiles->dataFilePath, O_RDONLY);
pVnodeFiles->lastFd = open(pVnodeFiles->lastFilePath, O_RDONLY);
// if (stat(pVnodeFiles->dataFilePath, &fstat) < 0) return -1;
// pVnodeFiles->dataFileSize = fstat.st_size;
//
// if (stat(pVnodeFiles->lastFilePath, &fstat) < 0) return -1;
// pVnodeFiles->lastFileSize = fstat.st_size;
snprintf(pVnodeFiles->dataFilePath, PATH_MAX, "%sv%df%d.data", prefix, vnodeId, fid);
snprintf(pVnodeFiles->lastFilePath, PATH_MAX, "%sv%df%d.last", prefix, vnodeId, fid);
pVnodeFiles->headerFd = FD_INITIALIZER;
pVnodeFiles->dataFd = FD_INITIALIZER;
pVnodeFiles->lastFd = FD_INITIALIZER;
#if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP
/* enforce kernel to preload data when the file is mapping */
......@@ -2975,19 +3020,19 @@ static int32_t vnodeOpenVnodeDBFiles(SQInfo *pQInfo, SQueryFileInfo *pVnodeFiles
return TSDB_CODE_SUCCESS;
_clean:
#if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP
_clean:
if (pVnodeFiles->pDataFileData != MAP_FAILED && pVnodeFiles->pDataFileData != NULL) {
munmap(pVnodeFiles->pDataFileData, pVnodeFiles->defaultMappingSize);
pVnodeFiles->pDataFileData = NULL;
}
#endif
tclose(pVnodeFiles->headerFd);
tclose(pVnodeFiles->dataFd);
tclose(pVnodeFiles->lastFd);
return -1;
#endif
}
static void vnodeOpenAllFiles(SQInfo *pQInfo, int32_t vnodeId) {
......@@ -3047,7 +3092,7 @@ static void vnodeOpenAllFiles(SQInfo *pQInfo, int32_t vnodeId) {
}
SQueryFileInfo *pVnodeFiles = &pRuntimeEnv->pVnodeFiles[pRuntimeEnv->numOfFiles - 1];
int32_t ret = vnodeOpenVnodeDBFiles(pQInfo, pVnodeFiles, fid, vnodeId, pEntry->d_name, dbFilePathPrefix);
int32_t ret = vnodeOpenVnodeDBFiles(pVnodeFiles, fid, vnodeId, pEntry->d_name, dbFilePathPrefix);
if (ret < 0) {
memset(pVnodeFiles, 0, sizeof(SQueryFileInfo)); // reset information
pRuntimeEnv->numOfFiles -= 1;
......@@ -3767,7 +3812,7 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
}
}
if (VALIDFD(pSupporter->meterOutputFd)) {
if (FD_VALID(pSupporter->meterOutputFd)) {
assert(pSupporter->meterOutputMMapBuf != NULL);
dTrace("QInfo:%p disk-based output buffer during query:%lld bytes", pQInfo, pSupporter->bufSize);
munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize);
......@@ -3868,7 +3913,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
getTmpfilePath("tb_metric_mmap", pSupporter->extBufFile);
pSupporter->meterOutputFd = open(pSupporter->extBufFile, O_CREAT | O_RDWR, 0666);
if (!VALIDFD(pSupporter->meterOutputFd)) {
if (!FD_VALID(pSupporter->meterOutputFd)) {
dError("QInfo:%p failed to create file: %s on disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
......@@ -5492,7 +5537,7 @@ SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t
SVnodeObj *pVnode = &vnodeList[vid];
char * pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, fileIndex);
char * pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, vid, fileIndex);
if (pHeaderFileData == NULL) { // failed to load header file into buffer
return 0;
}
......@@ -6491,7 +6536,7 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SBlockInfo *pBlockInfo) {
return loadPrimaryTS;
}
int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, int8_t *blkStatus, SQueryRuntimeEnv *pRuntimeEnv,
int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blkStatus, SQueryRuntimeEnv *pRuntimeEnv,
int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj;
......@@ -6979,7 +7024,7 @@ int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows) {
int32_t fd = open(pQuery->sdata[0]->data, O_RDONLY, 0666);
// make sure file exist
if (VALIDFD(fd)) {
if (FD_VALID(fd)) {
size_t s = lseek(fd, 0, SEEK_END);
dTrace("QInfo:%p ts comp data return, file:%s, size:%lld", pQInfo, pQuery->sdata[0]->data, s);
......
......@@ -290,8 +290,9 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
pSummary->numOfFiles++;
SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIdx];
char *pHeaderData = vnodeGetHeaderFileData(pRuntimeEnv, fileIdx);
char *pHeaderData = vnodeGetHeaderFileData(pRuntimeEnv, vnodeId, fileIdx);
if (pHeaderData == NULL) { // failed to mmap header file into buffer, ignore current file, try next
fid += step;
continue;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册