diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 227095fc033837000a3ceade0e0f1cea728523e4..be3dce9c0e8efb0608d4c5a50f8a091987ef1326 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -3191,7 +3191,7 @@ static void diff_function(SQLFunctionCtx *pCtx) { } else { \ *(type *)(ctx)->aOutputBuf = *(type *)(d) - (*(type *)(&(ctx)->param[1].i64Key)); \ *(type *)(&(ctx)->param[1].i64Key) = *(type *)(d); \ - *(int64_t *)(ctx)->ptsOutputBuf = *(int64_t *)((ctx)->ptsList + (TSDB_KEYSIZE)*index); \ + *(int64_t *)(ctx)->ptsOutputBuf = (ctx)->ptsList[index]; \ } \ } while (0); diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index 24d8ad902acb683f3b3ae05c10a5ff92181695df..17ea5cf8862f3fdb42f95cc80b84acf9394d26ae 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -1203,16 +1203,20 @@ bool tsBufNextPos(STSBuf* pTSBuf) { if (pCur->vnodeIndex == -1) { if (pCur->order == TSQL_SO_ASC) { tsBufGetBlock(pTSBuf, 0, 0); - // list is empty - if (pTSBuf->block.numOfElem == 0) { + + if (pTSBuf->block.numOfElem == 0) { // the whole list is empty, return tsBufResetPos(pTSBuf); return false; } else { return true; } - } else { + + } else { // get the last timestamp record in the last block of the last vnode + assert(pTSBuf->numOfVnodes > 0); + int32_t vnodeIndex = pTSBuf->numOfVnodes - 1; - + pCur->vnodeIndex = vnodeIndex; + int32_t vnodeId = pTSBuf->pData[pCur->vnodeIndex].info.vnode; STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId); int32_t blockIndex = pBlockInfo->numOfBlocks - 1; diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 11bd33f17226bbf026f1296f4633ff3500b840f0..58cfcda17e27e42aae22c0f7a65658e6ce279476 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -155,7 +155,8 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload, int64_t iv; int32_t numType; char * endptr = NULL; - + errno = 0; // clear the previous existed error information + switch (pSchema->type) { case TSDB_DATA_TYPE_BOOL: { // bool if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) { diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index d423b69e4d814705b0789081171bf64f2e5fc9c6..233a37dc5930f8b5d16d608882ced99148493c4f 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -64,8 +64,8 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const #ifdef CLUSTER if (ip && ip[0]) { - strcpy(tscMgmtIpList.ipstr[0], ip); - tscMgmtIpList.ip[0] = inet_addr(ip); + strcpy(tscMgmtIpList.ipstr[1], ip); + tscMgmtIpList.ip[1] = inet_addr(ip); } #else if (ip && ip[0]) { diff --git a/src/inc/tutil.h b/src/inc/tutil.h index 683927c816d5284ac5b30af851e4e1cde28b5f25..bdf9df63eeafd73f892be7ab50dddb35ed9989c0 100644 --- a/src/inc/tutil.h +++ b/src/inc/tutil.h @@ -26,12 +26,14 @@ extern "C" { #include "tsdb.h" #ifndef STDERR_FILENO - #define VALIDFD(x) ((x) > 2) -#else - #define VALIDFD(x) ((x) > STDERR_FILENO) +#define STDERR_FILENO (2) #endif +#define FD_VALID(x) ((x) > STDERR_FILENO) +#define FD_INITIALIZER ((int32_t)-1) + #define WCHAR wchar_t + #define tfree(x) \ { \ if (x) { \ diff --git a/src/modules/http/inc/httpHandle.h b/src/modules/http/inc/httpHandle.h index f6ca8aee53c46fbf8e18897f9cc4bc40150f6041..1b746e15200e1dfa7f0b5dcfc0054120c94aee56 100644 --- a/src/modules/http/inc/httpHandle.h +++ b/src/modules/http/inc/httpHandle.h @@ -68,6 +68,8 @@ #define HTTP_COMPRESS_IDENTITY 0 #define HTTP_COMPRESS_GZIP 2 +#define HTTP_SESSION_ID_LEN (TSDB_USER_LEN * 2 + 1) + typedef enum { HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_HANDLING, @@ -83,7 +85,7 @@ typedef struct { int expire; int access; void *taos; - char id[TSDB_USER_LEN]; + char id[HTTP_SESSION_ID_LEN + 1]; } HttpSession; typedef enum { diff --git a/src/modules/http/src/httpAuth.c b/src/modules/http/src/httpAuth.c index 9d9ead73246837c78c3b534785a1ddc1cbc99055..4503accc0acdc74f1035b91bb2b85a344eb143fe 100644 --- a/src/modules/http/src/httpAuth.c +++ b/src/modules/http/src/httpAuth.c @@ -50,6 +50,7 @@ bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len) { return false; } strncpy(pContext->user, base64, (size_t)user_len); + pContext->user[user_len] = 0; char *password = user + 1; int pass_len = (int)((base64 + outlen) - password); @@ -60,6 +61,7 @@ bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len) { return false; } strncpy(pContext->pass, password, (size_t)pass_len); + pContext->pass[pass_len] = 0; free(base64); httpTrace("context:%p, fd:%d, ip:%s, basic token parsed success, user:%s", pContext, pContext->fd, pContext->ipstr, diff --git a/src/modules/http/src/httpSession.c b/src/modules/http/src/httpSession.c index 8e8e39c8b07947445256b94178eba52e40724883..568936ede64c24250c16f590afd5a54378979ed3 100644 --- a/src/modules/http/src/httpSession.c +++ b/src/modules/http/src/httpSession.c @@ -41,8 +41,8 @@ void httpCreateSession(HttpContext *pContext, void *taos) { pthread_mutex_lock(&server->serverMutex); if (pContext->session != NULL && pContext->session == pContext->session->signature) { - httpTrace("context:%p, fd:%d, ip:%s, user:%s, set exist session:%p:%s:%p expired", pContext, pContext->fd, - pContext->ipstr, pContext->user, pContext->session, pContext->session->id, pContext->session->taos); + httpTrace("context:%p, fd:%d, ip:%s, user:%s, set exist session:%p:%p expired", pContext, pContext->fd, + pContext->ipstr, pContext->user, pContext->session, pContext->session->taos); pContext->session->expire = 0; pContext->session->access--; } @@ -51,7 +51,7 @@ void httpCreateSession(HttpContext *pContext, void *taos) { session.taos = taos; session.expire = (int)taosGetTimestampSec() + server->sessionExpire; session.access = 1; - strcpy(session.id, pContext->user); + snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass); pContext->session = (HttpSession *)taosAddStrHash(server->pSessionHash, session.id, (char *)(&session)); if (pContext->session == NULL) { httpError("context:%p, fd:%d, ip:%s, user:%s, error:%s", pContext, pContext->fd, pContext->ipstr, pContext->user, @@ -62,20 +62,23 @@ void httpCreateSession(HttpContext *pContext, void *taos) { } pContext->session->signature = pContext->session; - httpTrace("context:%p, fd:%d, ip:%s, user:%s, create a new session:%p:%s:%p", pContext, pContext->fd, pContext->ipstr, - pContext->user, pContext->session, pContext->session->id, pContext->session->taos); + httpTrace("context:%p, fd:%d, ip:%s, user:%s, create a new session:%p:%p", pContext, pContext->fd, pContext->ipstr, + pContext->user, pContext->session, pContext->session->taos); pthread_mutex_unlock(&server->serverMutex); } -void httpFetchSession(HttpContext *pContext) { +void httpFetchSessionImp(HttpContext *pContext) { HttpServer *server = pContext->pThread->pServer; pthread_mutex_lock(&server->serverMutex); - pContext->session = (HttpSession *)taosGetStrHashData(server->pSessionHash, pContext->user); + char sessionId[HTTP_SESSION_ID_LEN]; + snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass); + + pContext->session = (HttpSession *)taosGetStrHashData(server->pSessionHash, sessionId); if (pContext->session != NULL && pContext->session == pContext->session->signature) { pContext->session->access++; - httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%s:%p, access:%d, expire:%d", - pContext, pContext->fd, pContext->ipstr, pContext->user, pContext->session, pContext->session->id, + httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, access:%d, expire:%d", + pContext, pContext->fd, pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->access, pContext->session->expire); pContext->session->expire = (int)taosGetTimestampSec() + server->sessionExpire; } else { @@ -86,6 +89,20 @@ void httpFetchSession(HttpContext *pContext) { pthread_mutex_unlock(&server->serverMutex); } +void httpFetchSession(HttpContext *pContext) { + if (pContext->session == NULL) { + httpFetchSessionImp(pContext); + } else { + char sessionId[HTTP_SESSION_ID_LEN]; + snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass); + if (strcmp(pContext->session->id, sessionId) != 0) { + httpError("context:%p, fd:%d, ip:%s, user:%s, password may be changed", pContext, pContext->fd, pContext->ipstr, pContext->user); + httpRestoreSession(pContext); + httpFetchSessionImp(pContext); + } + } +} + void httpRestoreSession(HttpContext *pContext) { HttpServer * server = pContext->pThread->pServer; @@ -97,15 +114,16 @@ void httpRestoreSession(HttpContext *pContext) { return; } session->access--; - httpTrace("context:%p, ip:%s, user:%s, restore session:%p:%s:%p, access:%d, expire:%d", - pContext, pContext->ipstr, pContext->user, session, session->id, session->taos, + httpTrace("context:%p, ip:%s, user:%s, restore session:%p:%p, access:%d, expire:%d", + pContext, pContext->ipstr, pContext->user, session, session->taos, session->access, pContext->session->expire); + pContext->session = NULL; pthread_mutex_unlock(&server->serverMutex); } void httpResetSession(char *session) { HttpSession *pSession = (HttpSession *)session; - httpTrace("close session:%p:%s:%p", pSession, pSession->id, pSession->taos); + httpTrace("close session:%p:%p", pSession, pSession->taos); if (pSession->taos != NULL) { taos_close(pSession->taos); pSession->taos = NULL; @@ -144,12 +162,12 @@ int httpSessionExpired(char *session) { return 0; // un-expired, so return false } if (pSession->access > 0) { - httpTrace("session:%p:%s:%p is expired, but still access:%d", pSession, pSession->id, pSession->taos, + httpTrace("session:%p:%p is expired, but still access:%d", pSession, pSession->taos, pSession->access); return 0; // still used, so return false } - httpTrace("need close session:%p:%s:%p for it expired, cur:%d, expire:%d, invertal:%d", - pSession, pSession->id, pSession->taos, cur, pSession->expire, cur - pSession->expire); + httpTrace("need close session:%p:%p for it expired, cur:%d, expire:%d, invertal:%d", + pSession, pSession->taos, cur, pSession->expire, cur - pSession->expire); } return 1; diff --git a/src/modules/http/src/httpSql.c b/src/modules/http/src/httpSql.c index 732d0179ff02fd695741ea3a13f8c53fb7e80627..4696e80dc785112a8345e6a74f42dd86e0295fa7 100644 --- a/src/modules/http/src/httpSql.c +++ b/src/modules/http/src/httpSql.c @@ -378,9 +378,7 @@ void httpProcessRequestCb(void *param, TAOS_RES *result, int code) { } void httpProcessRequest(HttpContext *pContext) { - if (pContext->session == NULL) { - httpFetchSession(pContext); - } + httpFetchSession(pContext); if (pContext->session == NULL || pContext->session != pContext->session->signature || pContext->reqType == HTTP_REQTYPE_LOGIN) { diff --git a/src/os/darwin/inc/os.h b/src/os/darwin/inc/os.h index eabd5cd221ba2f72a2082273b7afc17825e40695..bf86103e8400725b991a9716de25b2018ec5a61d 100644 --- a/src/os/darwin/inc/os.h +++ b/src/os/darwin/inc/os.h @@ -47,7 +47,7 @@ #define taosCloseSocket(x) \ { \ - if (VALIDFD(x)) { \ + if (FD_VALID(x)) { \ close(x); \ x = -1; \ } \ diff --git a/src/os/linux/inc/os.h b/src/os/linux/inc/os.h index e9f1737cd4e9148ca948ee32c91ef9164cc26ba5..a3d50400c30874fd48cd3dc3d005682a23c58ea1 100644 --- a/src/os/linux/inc/os.h +++ b/src/os/linux/inc/os.h @@ -75,11 +75,12 @@ extern "C" { #define taosCloseSocket(x) \ { \ - if (VALIDFD(x)) { \ + if (FD_VALID(x)) { \ close(x); \ x = -1; \ } \ } + #define taosWriteSocket(fd, buf, len) write(fd, buf, len) #define taosReadSocket(fd, buf, len) read(fd, buf, len) diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index 57c4149f224b394838d66c683e528e392dc49ea4..35279ca011f1a8aeec012e1d2311862dde2f95ca 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -111,7 +111,7 @@ typedef enum { #define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN) typedef int (*__block_search_fn_t)(char* data, int num, int64_t key, int order); -typedef int32_t (*__read_data_fn_t)(int fd, SQInfo* pQInfo, SQueryFileInfo* pQueryFile, char* buf, uint64_t offset, +typedef int32_t (*__read_data_fn_t)(int fd, SQInfo* pQInfo, SQueryFilesInfo* pQueryFile, char* buf, uint64_t offset, int32_t size); static FORCE_INLINE SMeterObj* getMeterObj(void* hashHandle, int32_t sid) { @@ -191,10 +191,10 @@ int64_t getQueryStartPositionInCache(SQueryRuntimeEnv* pRuntimeEnv, int32_t* slo int64_t getNextAccessedKeyInData(SQuery* pQuery, int64_t* pPrimaryCol, SBlockInfo* pBlockInfo, int32_t blockStatus); uint32_t getDataBlocksForMeters(SMeterQuerySupportObj* pSupporter, SQuery* pQuery, char* pHeaderData, - int32_t numOfMeters, SQueryFileInfo* pQueryFileInfo, SMeterDataInfo** pMeterDataInfo); -int32_t LoadDatablockOnDemand(SCompBlock* pBlock, SField** pFields, int8_t* blkStatus, SQueryRuntimeEnv* pRuntimeEnv, + int32_t numOfMeters, const char* filePath, SMeterDataInfo** pMeterDataInfo); +int32_t LoadDatablockOnDemand(SCompBlock* pBlock, SField** pFields, uint8_t* blkStatus, SQueryRuntimeEnv* pRuntimeEnv, int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand); -char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex); +char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int32_t fileIndex); /** * Create SMeterQueryInfo. diff --git a/src/system/detail/inc/vnodeRead.h b/src/system/detail/inc/vnodeRead.h index b9bf684f799d08b540df3113ba2d67ecb41a7b40..f37030d97eb0ccd1a1c07fdb29e72e61356d3fa5 100644 --- a/src/system/detail/inc/vnodeRead.h +++ b/src/system/detail/inc/vnodeRead.h @@ -47,29 +47,14 @@ typedef struct SQueryLoadCompBlockInfo { int32_t fileId; int32_t fileListIndex; } SQueryLoadCompBlockInfo; + /* * the header file info for one vnode */ -typedef struct SQueryFileInfo { - int32_t fileID; /* file id */ - char headerFilePath[256]; /* full file name */ - char dataFilePath[256]; - char lastFilePath[256]; - int32_t defaultMappingSize; /* default mapping size */ - - int32_t headerFd; /* file handler */ - char* pHeaderFileData; /* mmap header files */ - size_t headFileSize; - int32_t dataFd; - char* pDataFileData; - size_t dataFileSize; - uint64_t dtFileMappingOffset; - - int32_t lastFd; - size_t lastFileSize; - uint64_t lastFileMappingOffset; - -} SQueryFileInfo; +typedef struct SHeaderFileInfo { + int32_t fileID; // file id + size_t headFileSize; // header file size +} SHeaderFileInfo; typedef struct SQueryCostSummary { double cacheTimeUs; @@ -106,6 +91,27 @@ typedef struct SOutputRes { SResultInfo* resultInfo; } SOutputRes; +/* + * header files info, avoid to iterate the directory, the data is acquired + * during in query preparation function + */ +typedef struct SQueryFilesInfo { + SHeaderFileInfo* pFileInfo; + uint32_t numOfFiles; // the total available number of files for this virtual node during query execution + int32_t current; // the memory mapped header file, NOTE: only one header file can be mmap. + int32_t vnodeId; + + int32_t headerFd; // header file fd + char* pHeaderFileData; // mmap header files + + int32_t dataFd; + int32_t lastFd; + char headerFilePath[PATH_MAX]; // current opened header file name + char dataFilePath[PATH_MAX]; // current opened data file name + char lastFilePath[PATH_MAX]; // current opened last file path + char dbFilePathPrefix[PATH_MAX]; +} SQueryFilesInfo; + typedef struct RuntimeEnvironment { SPositionInfo startPos; /* the start position, used for secondary/third iteration */ SPositionInfo endPos; /* the last access position in query, served as the start pos of reversed order query */ @@ -122,17 +128,10 @@ typedef struct RuntimeEnvironment { SQLFunctionCtx* pCtx; SQueryLoadBlockInfo loadBlockInfo; /* record current block load information */ SQueryLoadCompBlockInfo loadCompBlockInfo; /* record current compblock information in SQuery */ - - /* - * header files info, avoid to iterate the directory, the data is acquired - * during in query preparation function - */ - SQueryFileInfo* pVnodeFiles; - uint32_t numOfFiles; // the total available number of files for this virtual node during query execution - int32_t mmapedHFileIndex; // the mmaped header file, NOTE: only one header file can be mmap. + SQueryFilesInfo vnodeFileInfo; int16_t numOfRowsPerPage; int16_t offset[TSDB_MAX_COLUMNS]; - int16_t scanFlag; // denotes reversed scan of data or not + int16_t scanFlag; // denotes reversed scan of data or not SInterpolationInfo interpoInfo; SData** pInterpoBuf; SOutputRes* pResult; // reference to SQuerySupporter->pResult diff --git a/src/system/detail/src/vnodeCommit.c b/src/system/detail/src/vnodeCommit.c index 70c4cfe280c619431193dcd91bf33b1a0a917f6c..57bb52eb23d1a2d2604712709965c3b7bdd21f17 100644 --- a/src/system/detail/src/vnodeCommit.c +++ b/src/system/detail/src/vnodeCommit.c @@ -81,7 +81,7 @@ int vnodeRenewCommitLog(int vnode) { pthread_mutex_lock(&(pVnode->logMutex)); - if (VALIDFD(pVnode->logFd)) { + if (FD_VALID(pVnode->logFd)) { munmap(pVnode->pMem, pVnode->mappingSize); close(pVnode->logFd); rename(fileName, oldName); @@ -243,7 +243,7 @@ int vnodeInitCommit(int vnode) { void vnodeCleanUpCommit(int vnode) { SVnodeObj *pVnode = vnodeList + vnode; - if (VALIDFD(pVnode->logFd)) close(pVnode->logFd); + if (FD_VALID(pVnode->logFd)) close(pVnode->logFd); if (pVnode->cfg.commitLog && (pVnode->logFd > 0 && remove(pVnode->logFn) < 0)) { dError("vid:%d, failed to remove:%s", vnode, pVnode->logFn); diff --git a/src/system/detail/src/vnodeFile.c b/src/system/detail/src/vnodeFile.c index fc4b4351697a103f25eea6ed1b02de514fac3b76..f145f4a35a03768db4889c4432edca717b405fc2 100644 --- a/src/system/detail/src/vnodeFile.c +++ b/src/system/detail/src/vnodeFile.c @@ -1830,7 +1830,15 @@ int vnodeInitFile(int vnode) { pVnode->fmagic = (uint64_t *)calloc(pVnode->maxFiles + 1, sizeof(uint64_t)); int fileId = pVnode->fileId; - for (int i = 0; i < pVnode->numOfFiles; ++i) { + /* + * The actual files will far exceed the files that need to exist + */ + if (pVnode->numOfFiles > pVnode->maxFiles) { + dError("vid:%d numOfFiles:%d should not larger than maxFiles:%d", vnode, pVnode->numOfFiles, pVnode->maxFiles); + } + + int numOfFiles = MIN(pVnode->numOfFiles, pVnode->maxFiles); + for (int i = 0; i < numOfFiles; ++i) { if (vnodeUpdateFileMagic(vnode, fileId) < 0) { if (pVnode->cfg.replications > 1) { pVnode->badFileId = fileId; diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index db0ec24395cbb15fd2bf619e6fe509a4a1b3652d..04c5dcb24ed2df51a5a5043365aca98a30f797ff 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -40,12 +40,12 @@ enum { #define IS_DISK_DATA_BLOCK(q) ((q)->fileId >= 0) -static int32_t copyDataFromMMapBuffer(int fd, SQInfo *pQInfo, SQueryFileInfo *pQueryFile, char *buf, uint64_t offset, - int32_t size); -static int32_t readDataFromDiskFile(int fd, SQInfo *pQInfo, SQueryFileInfo *pQueryFile, char *buf, uint64_t offset, +//static int32_t copyDataFromMMapBuffer(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t offset, +// int32_t size); +static int32_t readDataFromDiskFile(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t offset, int32_t size); -__read_data_fn_t readDataFunctor[2] = {copyDataFromMMapBuffer, readDataFromDiskFile}; +//__read_data_fn_t readDataFunctor[2] = {copyDataFromMMapBuffer, readDataFromDiskFile}; static void vnodeInitLoadCompBlockInfo(SQueryLoadCompBlockInfo *pCompBlockLoadInfo); static int32_t moveToNextBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __block_search_fn_t searchFn, @@ -99,7 +99,7 @@ static FORCE_INLINE int32_t getCompHeaderStartPosition(SVnodeCfg *pCfg) { } static FORCE_INLINE int32_t validateCompBlockOffset(SQInfo *pQInfo, SMeterObj *pMeterObj, SCompHeader *pCompHeader, - SQueryFileInfo *pQueryFileInfo, int32_t headerSize) { + SHeaderFileInfo *pQueryFileInfo, int32_t headerSize) { if (pCompHeader->compInfoOffset < headerSize || pCompHeader->compInfoOffset > pQueryFileInfo->headFileSize) { dError("QInfo:%p vid:%d sid:%d id:%s, compInfoOffset:%d is not valid, size:%ld", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pCompHeader->compInfoOffset, pQueryFileInfo->headFileSize); @@ -195,7 +195,7 @@ static bool vnodeIsCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj // if vnodeFreeFields is called, the pQuery->pFields is NULL if (pLoadCompBlockInfo->fileListIndex == fileIndex && pLoadCompBlockInfo->sid == pMeterObj->sid && pQuery->pFields != NULL && pQuery->fileId > 0) { - assert(pRuntimeEnv->pVnodeFiles[fileIndex].fileID == pLoadCompBlockInfo->fileId && pQuery->numOfBlocks > 0); + assert(pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex].fileID == pLoadCompBlockInfo->fileId && pQuery->numOfBlocks > 0); return true; } @@ -207,7 +207,7 @@ static void vnodeSetCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, int32_t f pLoadCompBlockInfo->sid = sid; pLoadCompBlockInfo->fileListIndex = fileIndex; - pLoadCompBlockInfo->fileId = pRuntimeEnv->pVnodeFiles[fileIndex].fileID; + pLoadCompBlockInfo->fileId = pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex].fileID; } static void vnodeInitLoadCompBlockInfo(SQueryLoadCompBlockInfo *pCompBlockLoadInfo) { @@ -247,58 +247,148 @@ static void vnodeInitDataBlockInfo(SQueryLoadBlockInfo *pBlockLoadInfo) { pBlockLoadInfo->fileListIndex = -1; } -static void doUnmapHeaderFileData(SQueryRuntimeEnv* pRuntimeEnv) { - if (pRuntimeEnv->mmapedHFileIndex >= 0) { - assert(pRuntimeEnv->mmapedHFileIndex < pRuntimeEnv->numOfFiles && pRuntimeEnv->mmapedHFileIndex >= 0); +/** + * if the header is smaller than a threshold value(header size + initial offset value) + * + * @param vnodeId + * @param headerFileSize + * @return + */ +static bool isHeaderFileEmpty(int32_t vnodeId, size_t headerFileSize) { + SVnodeCfg* pVnodeCfg = &vnodeList[vnodeId].cfg; + return headerFileSize <= getCompHeaderStartPosition(pVnodeCfg); +} + +static void doCloseQueryFileInfoFD(SQueryFilesInfo* pVnodeFilesInfo) { + tclose(pVnodeFilesInfo->headerFd); + tclose(pVnodeFilesInfo->dataFd); + tclose(pVnodeFilesInfo->lastFd); +} + +static void doInitQueryFileInfoFD(SQueryFilesInfo* pVnodeFilesInfo) { + pVnodeFilesInfo->current = -1; + + pVnodeFilesInfo->headerFd = FD_INITIALIZER; // set the initial value + pVnodeFilesInfo->dataFd = FD_INITIALIZER; + pVnodeFilesInfo->lastFd = FD_INITIALIZER; +} + +static int32_t doOpenQueryFileData(SQInfo* pQInfo, SQueryFilesInfo* pVnodeFiles) { + // if the header is smaller than a threshold value, this file is empty, no need to + SHeaderFileInfo* pCurrentFileInfo = &pVnodeFiles->pFileInfo[pVnodeFiles->current]; + + // set the full file path for current opened files + snprintf(pVnodeFiles->headerFilePath, PATH_MAX, "%sv%df%d.head", pVnodeFiles->dbFilePathPrefix, + pVnodeFiles->vnodeId, pCurrentFileInfo->fileID); + + pVnodeFiles->headerFd = open(pVnodeFiles->headerFilePath, O_RDONLY); + if (!FD_VALID(pVnodeFiles->headerFd)) { + dError("QInfo:%p failed open header file:%s reason:%s", pQInfo, pVnodeFiles->headerFilePath, strerror(errno)); + return -1; + } + + snprintf(pVnodeFiles->dataFilePath, PATH_MAX, "%sv%df%d.data", pVnodeFiles->dbFilePathPrefix, + pVnodeFiles->vnodeId, pCurrentFileInfo->fileID); + + pVnodeFiles->dataFd = open(pVnodeFiles->dataFilePath, O_RDONLY); + if (!FD_VALID(pVnodeFiles->dataFd)) { + dError("QInfo:%p failed open data file:%s reason:%s", pQInfo, pVnodeFiles->dataFilePath, strerror(errno)); + return -1; + } + + snprintf(pVnodeFiles->lastFilePath, PATH_MAX, "%sv%df%d.last", pVnodeFiles->dbFilePathPrefix, + pVnodeFiles->vnodeId, pCurrentFileInfo->fileID); + + pVnodeFiles->lastFd = open(pVnodeFiles->lastFilePath, O_RDONLY); + if (!FD_VALID(pVnodeFiles->lastFd)) { + dError("QInfo:%p failed open last file:%s reason:%s", pQInfo, pVnodeFiles->lastFilePath, strerror(errno)); + return -1; + } + + pVnodeFiles->pHeaderFileData = mmap(NULL, pCurrentFileInfo->headFileSize, PROT_READ, MAP_SHARED, + pVnodeFiles->headerFd, 0); + + if (pVnodeFiles->pHeaderFileData == MAP_FAILED) { + pVnodeFiles->pHeaderFileData = NULL; + + doCloseQueryFileInfoFD(pVnodeFiles); + doInitQueryFileInfoFD(pVnodeFiles); + + dError("QInfo:%p failed to mmap header file:%s, size:%lld, %s", pQInfo, pVnodeFiles->headerFilePath, + pCurrentFileInfo->headFileSize, strerror(errno)); + + return -1; + } else { + if (madvise(pVnodeFiles->pHeaderFileData, pCurrentFileInfo->headFileSize, MADV_SEQUENTIAL) == -1) { + dError("QInfo:%p failed to advise kernel the usage of header file, reason:%s", pQInfo, strerror(errno)); + } + } + + return TSDB_CODE_SUCCESS; +} + +static void doCloseOpenedFileData(SQueryFilesInfo* pVnodeFileInfo) { + if (pVnodeFileInfo->current >= 0) { + + assert(pVnodeFileInfo->current < pVnodeFileInfo->numOfFiles && pVnodeFileInfo->current >= 0 && + pVnodeFileInfo->pHeaderFileData != NULL); + + SHeaderFileInfo *pCurrentFile = &pVnodeFileInfo->pFileInfo[pVnodeFileInfo->current]; - SQueryFileInfo *otherVnodeFiles = &pRuntimeEnv->pVnodeFiles[pRuntimeEnv->mmapedHFileIndex]; - munmap(otherVnodeFiles->pHeaderFileData, otherVnodeFiles->headFileSize); + munmap(pVnodeFileInfo->pHeaderFileData, pCurrentFile->headFileSize); + pVnodeFileInfo->pHeaderFileData = NULL; - otherVnodeFiles->pHeaderFileData = NULL; - pRuntimeEnv->mmapedHFileIndex = -1; + doCloseQueryFileInfoFD(pVnodeFileInfo); + doInitQueryFileInfoFD(pVnodeFileInfo); } - assert(pRuntimeEnv->mmapedHFileIndex == -1); + assert(pVnodeFileInfo->current == -1); } /** * mmap the data file into memory. For each query, only one header file is allowed to mmap into memory, in order to - * avoid too many mmapped files at the save time to cause OS return the message of "Cannot allocate memory", + * avoid too many memory mapped files at the save time to cause OS return the message of "Cannot allocate memory", * during query processing. * * @param pRuntimeEnv * @param fileIndex * @return the return value may be null, so any invoker needs to check the returned value */ -char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex) { - assert(fileIndex >= 0 && fileIndex < pRuntimeEnv->numOfFiles); +char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int32_t fileIndex) { + assert(fileIndex >= 0 && fileIndex < pRuntimeEnv->vnodeFileInfo.numOfFiles); SQuery *pQuery = pRuntimeEnv->pQuery; SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); // only for log output - SQueryFileInfo *pVnodeFiles = &pRuntimeEnv->pVnodeFiles[fileIndex]; - size_t size = pVnodeFiles->headFileSize; - - if (pVnodeFiles->pHeaderFileData == NULL) { - assert(pRuntimeEnv->mmapedHFileIndex != fileIndex); - doUnmapHeaderFileData(pRuntimeEnv); // do close the other mmaped header file + SQueryFilesInfo *pVnodeFileInfo = &pRuntimeEnv->vnodeFileInfo; + SHeaderFileInfo *pHeaderFileInfo = &pVnodeFileInfo->pFileInfo[fileIndex]; - pVnodeFiles->pHeaderFileData = mmap(NULL, size, PROT_READ, MAP_SHARED, pVnodeFiles->headerFd, 0); - if (pVnodeFiles->pHeaderFileData == MAP_FAILED) { - pVnodeFiles->pHeaderFileData = NULL; - dError("QInfo:%p failed to mmap header file:%s, size:%lld, %s", pQInfo, pVnodeFiles->headerFilePath, size, - strerror(errno)); - } else { - pRuntimeEnv->mmapedHFileIndex = fileIndex; // set the value in case of success mmap file - if (madvise(pVnodeFiles->pHeaderFileData, size, MADV_SEQUENTIAL) == -1) { - dError("QInfo:%p failed to advise kernel the usage of header file, reason:%s", pQInfo, strerror(errno)); - } + if (pVnodeFileInfo->current != fileIndex || pVnodeFileInfo->pHeaderFileData == NULL) { + if (pVnodeFileInfo->current >= 0) { + assert(pVnodeFileInfo->pHeaderFileData != NULL); + } + + doCloseOpenedFileData(pVnodeFileInfo); // do close the other memory mapped header file + assert(pVnodeFileInfo->pHeaderFileData == NULL); + + // current header file is empty or broken, return directly + if (isHeaderFileEmpty(vnodeId, pHeaderFileInfo->headFileSize)) { + qTrace("QInfo:%p vid:%d, fileId:%d, index:%d, size:%d, ignore file, empty or broken", pQInfo, + pVnodeFileInfo->vnodeId, pHeaderFileInfo->fileID, fileIndex, pHeaderFileInfo->headFileSize); + + return pVnodeFileInfo->pHeaderFileData; + } + + // set current opened file Index + pVnodeFileInfo->current = fileIndex; + + if (doOpenQueryFileData(pQInfo, pVnodeFileInfo) != TSDB_CODE_SUCCESS) { + doCloseOpenedFileData(pVnodeFileInfo); // there may be partially open fd, close it anyway. + return pVnodeFileInfo->pHeaderFileData; } - } else { - assert(pRuntimeEnv->mmapedHFileIndex == fileIndex); } - return pVnodeFiles->pHeaderFileData; + return pVnodeFileInfo->pHeaderFileData; } /* @@ -310,7 +400,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); SVnodeCfg * pCfg = &vnodeList[pMeterObj->vnode].cfg; - SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIndex]; + SHeaderFileInfo *pQueryFileInfo = &pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex]; int64_t st = taosGetTimestampUs(); @@ -326,7 +416,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim pSummary->numOfSeek++; #if 1 - char *data = vnodeGetHeaderFileData(pRuntimeEnv, fileIndex); + char *data = vnodeGetHeaderFileData(pRuntimeEnv, pMeterObj->vnode, fileIndex); if (data == NULL) { return -1; // failed to load the header file data into memory } @@ -337,7 +427,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim #endif // check the offset value integrity - if (validateHeaderOffsetSegment(pQInfo, pQueryFileInfo->headerFilePath, pMeterObj->vnode, data, + if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, pMeterObj->vnode, data, getCompHeaderSegSize(pCfg)) < 0) { return -1; } @@ -365,7 +455,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim #endif // check compblock info integrity - if (validateCompBlockInfoSegment(pQInfo, pQueryFileInfo->headerFilePath, pMeterObj->vnode, compInfo, + if (validateCompBlockInfoSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, pMeterObj->vnode, compInfo, compHeader->compInfoOffset) < 0) { return -1; } @@ -399,7 +489,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim #endif // check comp block integrity - if (validateCompBlockSegment(pQInfo, pQueryFileInfo->headerFilePath, compInfo, (char *)pQuery->pBlock, + if (validateCompBlockSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, compInfo, (char *)pQuery->pBlock, pMeterObj->vnode, checksum) < 0) { return -1; } @@ -409,7 +499,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim int64_t et = taosGetTimestampUs(); qTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, load compblock info, size:%d, elapsed:%f ms", pQInfo, - pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pRuntimeEnv->pVnodeFiles[fileIndex].fileID, + pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex].fileID, compBlockSize, (et - st) / 1000.0); pSummary->totalCompInfoSize += compBlockSize; @@ -469,8 +559,9 @@ static int32_t binarySearchForBlock(SQuery *pQuery, int64_t key) { return binarySearchForBlockImpl(pQuery->pBlock, pQuery->numOfBlocks, key, pQuery->order.order); } +#if 0 /* unmap previous buffer */ -static UNUSED_FUNC int32_t resetMMapWindow(SQueryFileInfo *pQueryFileInfo) { +static UNUSED_FUNC int32_t resetMMapWindow(SHeaderFileInfo *pQueryFileInfo) { munmap(pQueryFileInfo->pDataFileData, pQueryFileInfo->defaultMappingSize); pQueryFileInfo->dtFileMappingOffset = 0; @@ -484,7 +575,7 @@ static UNUSED_FUNC int32_t resetMMapWindow(SQueryFileInfo *pQueryFileInfo) { return 0; } -static int32_t moveMMapWindow(SQueryFileInfo *pQueryFileInfo, uint64_t offset) { +static int32_t moveMMapWindow(SHeaderFileInfo *pQueryFileInfo, uint64_t offset) { uint64_t upperBnd = (pQueryFileInfo->dtFileMappingOffset + pQueryFileInfo->defaultMappingSize - 1); /* data that are located in current mmapping window */ @@ -531,7 +622,7 @@ static int32_t moveMMapWindow(SQueryFileInfo *pQueryFileInfo, uint64_t offset) { return 0; } -static int32_t copyDataFromMMapBuffer(int fd, SQInfo *pQInfo, SQueryFileInfo *pQueryFile, char *buf, uint64_t offset, +static int32_t copyDataFromMMapBuffer(int fd, SQInfo *pQInfo, SHeaderFileInfo *pQueryFile, char *buf, uint64_t offset, int32_t size) { assert(size >= 0); @@ -582,7 +673,9 @@ static int32_t copyDataFromMMapBuffer(int fd, SQInfo *pQInfo, SQueryFileInfo *pQ return 0; } -static int32_t readDataFromDiskFile(int fd, SQInfo *pQInfo, SQueryFileInfo *pQueryFile, char *buf, uint64_t offset, +#endif + +static int32_t readDataFromDiskFile(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t offset, int32_t size) { assert(size >= 0); @@ -597,7 +690,7 @@ static int32_t readDataFromDiskFile(int fd, SQInfo *pQInfo, SQueryFileInfo *pQue return 0; } -static int32_t loadColumnIntoMem(SQuery *pQuery, SQueryFileInfo *pQueryFileInfo, SCompBlock *pBlock, SField *pFields, +static int32_t loadColumnIntoMem(SQuery *pQuery, SQueryFilesInfo *pQueryFileInfo, SCompBlock *pBlock, SField *pFields, int32_t col, SData *sdata, void *tmpBuf, char *buffer, int32_t buffersize) { char *dst = (pBlock->algorithm) ? tmpBuf : sdata->data; @@ -605,14 +698,14 @@ static int32_t loadColumnIntoMem(SQuery *pQuery, SQueryFileInfo *pQueryFileInfo, SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); int fd = pBlock->last ? pQueryFileInfo->lastFd : pQueryFileInfo->dataFd; - int32_t ret = (*readDataFunctor[DEFAULT_IO_ENGINE])(fd, pQInfo, pQueryFileInfo, dst, offset, pFields[col].len); + int32_t ret = readDataFromDiskFile(fd, pQInfo, pQueryFileInfo, dst, offset, pFields[col].len); if (ret != 0) { return ret; } // load checksum TSCKSUM checksum = 0; - ret = (*readDataFunctor[DEFAULT_IO_ENGINE])(fd, pQInfo, pQueryFileInfo, (char *)&checksum, offset + pFields[col].len, + ret = readDataFromDiskFile(fd, pQInfo, pQueryFileInfo, (char *)&checksum, offset + pFields[col].len, sizeof(TSCKSUM)); if (ret != 0) { return ret; @@ -634,12 +727,12 @@ static int32_t loadColumnIntoMem(SQuery *pQuery, SQueryFileInfo *pQueryFileInfo, return 0; } -static int32_t loadDataBlockFieldsInfo(SQueryRuntimeEnv *pRuntimeEnv, SQueryFileInfo *pQueryFileInfo, - SCompBlock *pBlock, SField **pField) { +static int32_t loadDataBlockFieldsInfo(SQueryRuntimeEnv *pRuntimeEnv, SCompBlock *pBlock, SField **pField) { SQuery * pQuery = pRuntimeEnv->pQuery; SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj; - + SQueryFilesInfo *pVnodeFilesInfo = &pRuntimeEnv->vnodeFileInfo; + size_t size = sizeof(SField) * (pBlock->numOfCols) + sizeof(TSCKSUM); // if *pField != NULL, this block is loaded once, in current query do nothing @@ -654,9 +747,8 @@ static int32_t loadDataBlockFieldsInfo(SQueryRuntimeEnv *pRuntimeEnv, SQueryFile int64_t st = taosGetTimestampUs(); - int fd = pBlock->last ? pQueryFileInfo->lastFd : pQueryFileInfo->dataFd; - int32_t ret = - (*readDataFunctor[DEFAULT_IO_ENGINE])(fd, pQInfo, pQueryFileInfo, (char *)(*pField), pBlock->offset, size); + int fd = pBlock->last ? pVnodeFilesInfo->lastFd : pVnodeFilesInfo->dataFd; + int32_t ret = readDataFromDiskFile(fd, pQInfo, pVnodeFilesInfo, (char *)(*pField), pBlock->offset, size); if (ret != 0) { return ret; } @@ -664,7 +756,7 @@ static int32_t loadDataBlockFieldsInfo(SQueryRuntimeEnv *pRuntimeEnv, SQueryFile // check fields integrity if (!taosCheckChecksumWhole((uint8_t *)(*pField), size)) { dLError("QInfo:%p vid:%d sid:%d id:%s, slot:%d, failed to read sfields, file:%s, sfields area broken:%lld", pQInfo, - pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pQueryFileInfo->dataFilePath, + pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pVnodeFilesInfo->dataFilePath, pBlock->offset); return -1; } @@ -692,7 +784,8 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj; SData ** sdata = pRuntimeEnv->colDataBuffer; - SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIdx]; + assert(fileIdx == pRuntimeEnv->vnodeFileInfo.current); + SData ** primaryTSBuf = &pRuntimeEnv->primaryColBuffer; void * tmpBuf = pRuntimeEnv->unzipBuffer; @@ -705,7 +798,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR } /* failed to load fields info, return with error info */ - if (loadSField && (loadDataBlockFieldsInfo(pRuntimeEnv, pQueryFileInfo, pBlock, pField) != 0)) { + if (loadSField && (loadDataBlockFieldsInfo(pRuntimeEnv, pBlock, pField) != 0)) { return -1; } @@ -720,7 +813,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR } else { columnBytes += (*pField)[PRIMARYKEY_TIMESTAMP_COL_INDEX].len + sizeof(TSCKSUM); int32_t ret = - loadColumnIntoMem(pQuery, pQueryFileInfo, pBlock, *pField, PRIMARYKEY_TIMESTAMP_COL_INDEX, *primaryTSBuf, + loadColumnIntoMem(pQuery, &pRuntimeEnv->vnodeFileInfo, pBlock, *pField, PRIMARYKEY_TIMESTAMP_COL_INDEX, *primaryTSBuf, tmpBuf, pRuntimeEnv->secondaryUnzipBuffer, pRuntimeEnv->unzipBufSize); if (ret != 0) { return -1; @@ -758,7 +851,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR fillWithNull(pQuery, sdata[i]->data, i, pBlock->numOfPoints); } else { columnBytes += (*pField)[j].len + sizeof(TSCKSUM); - ret = loadColumnIntoMem(pQuery, pQueryFileInfo, pBlock, *pField, j, sdata[i], tmpBuf, + ret = loadColumnIntoMem(pQuery, &pRuntimeEnv->vnodeFileInfo, pBlock, *pField, j, sdata[i], tmpBuf, pRuntimeEnv->secondaryUnzipBuffer, pRuntimeEnv->unzipBufSize); pSummary->numOfSeek++; @@ -1395,7 +1488,7 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) { TSKEY key = *(TSKEY *)(pCtx[0].aInputElemBuf + TSDB_KEYSIZE * offset); -#if 1 +#if defined(_DEBUG_VIEW) printf("elem in comp ts file:%lld, key:%lld, tag:%d, id:%s, query order:%d, ts order:%d, traverse:%d, index:%d\n", elem.ts, key, elem.tag, pRuntimeEnv->pMeterObj->meterId, pQuery->order.order, pRuntimeEnv->pTSBuf->tsOrder, pRuntimeEnv->pTSBuf->cur.order, pRuntimeEnv->pTSBuf->cur.tsIndex); @@ -1708,23 +1801,25 @@ static int32_t applyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo * } int32_t vnodeGetVnodeHeaderFileIdx(int32_t *fid, SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { - if (pRuntimeEnv->numOfFiles == 0) { + if (pRuntimeEnv->vnodeFileInfo.numOfFiles == 0) { return -1; } + SQueryFilesInfo* pVnodeFiles = &pRuntimeEnv->vnodeFileInfo; + /* set the initial file for current query */ - if (order == TSQL_SO_ASC && *fid < pRuntimeEnv->pVnodeFiles[0].fileID) { - *fid = pRuntimeEnv->pVnodeFiles[0].fileID; + if (order == TSQL_SO_ASC && *fid < pVnodeFiles->pFileInfo[0].fileID) { + *fid = pVnodeFiles->pFileInfo[0].fileID; return 0; - } else if (order == TSQL_SO_DESC && *fid > pRuntimeEnv->pVnodeFiles[pRuntimeEnv->numOfFiles - 1].fileID) { - *fid = pRuntimeEnv->pVnodeFiles[pRuntimeEnv->numOfFiles - 1].fileID; - return pRuntimeEnv->numOfFiles - 1; + } else if (order == TSQL_SO_DESC && *fid > pVnodeFiles->pFileInfo[pVnodeFiles->numOfFiles - 1].fileID) { + *fid = pVnodeFiles->pFileInfo[pVnodeFiles->numOfFiles - 1].fileID; + return pVnodeFiles->numOfFiles - 1; } - int32_t numOfFiles = pRuntimeEnv->numOfFiles; + int32_t numOfFiles = pVnodeFiles->numOfFiles; - if (order == TSQL_SO_DESC && *fid > pRuntimeEnv->pVnodeFiles[numOfFiles - 1].fileID) { - *fid = pRuntimeEnv->pVnodeFiles[numOfFiles - 1].fileID; + if (order == TSQL_SO_DESC && *fid > pVnodeFiles->pFileInfo[numOfFiles - 1].fileID) { + *fid = pVnodeFiles->pFileInfo[numOfFiles - 1].fileID; return numOfFiles - 1; } @@ -1732,12 +1827,12 @@ int32_t vnodeGetVnodeHeaderFileIdx(int32_t *fid, SQueryRuntimeEnv *pRuntimeEnv, int32_t i = 0; int32_t step = 1; - while (i pRuntimeEnv->pVnodeFiles[i].fileID) { + while (i pVnodeFiles->pFileInfo[i].fileID) { i += step; } - if (i < numOfFiles && *fid <= pRuntimeEnv->pVnodeFiles[i].fileID) { - *fid = pRuntimeEnv->pVnodeFiles[i].fileID; + if (i < numOfFiles && *fid <= pVnodeFiles->pFileInfo[i].fileID) { + *fid = pVnodeFiles->pFileInfo[i].fileID; return i; } else { return -1; @@ -1746,12 +1841,12 @@ int32_t vnodeGetVnodeHeaderFileIdx(int32_t *fid, SQueryRuntimeEnv *pRuntimeEnv, int32_t i = numOfFiles - 1; int32_t step = -1; - while (i >= 0 && *fid < pRuntimeEnv->pVnodeFiles[i].fileID) { + while (i >= 0 && *fid < pVnodeFiles->pFileInfo[i].fileID) { i += step; } - if (i >= 0 && *fid >= pRuntimeEnv->pVnodeFiles[i].fileID) { - *fid = pRuntimeEnv->pVnodeFiles[i].fileID; + if (i >= 0 && *fid >= pVnodeFiles->pFileInfo[i].fileID) { + *fid = pVnodeFiles->pFileInfo[i].fileID; return i; } else { return -1; @@ -2082,24 +2177,11 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { tfree(pRuntimeEnv->primaryColBuffer); } - for (int32_t i = 0; i < pRuntimeEnv->numOfFiles; ++i) { - SQueryFileInfo *pQFileInfo = &(pRuntimeEnv->pVnodeFiles[i]); - if (pQFileInfo->pHeaderFileData != NULL && pQFileInfo->pHeaderFileData != MAP_FAILED) { - munmap(pQFileInfo->pHeaderFileData, pQFileInfo->headFileSize); - } - tclose(pQFileInfo->headerFd); - - if (pQFileInfo->pDataFileData != NULL && pQFileInfo->pDataFileData != MAP_FAILED) { - munmap(pQFileInfo->pDataFileData, pQFileInfo->defaultMappingSize); - } - - tclose(pQFileInfo->dataFd); - tclose(pQFileInfo->lastFd); - } + doCloseOpenedFileData(&pRuntimeEnv->vnodeFileInfo); - if (pRuntimeEnv->pVnodeFiles != NULL) { - pRuntimeEnv->numOfFiles = 0; - free(pRuntimeEnv->pVnodeFiles); + if (pRuntimeEnv->vnodeFileInfo.pFileInfo != NULL) { + pRuntimeEnv->vnodeFileInfo.numOfFiles = 0; + free(pRuntimeEnv->vnodeFileInfo.pFileInfo); } if (pRuntimeEnv->pInterpoBuf != NULL) { @@ -2907,8 +2989,8 @@ bool vnodeParametersSafetyCheck(SQuery *pQuery) { } static int file_order_comparator(const void *p1, const void *p2) { - SQueryFileInfo *pInfo1 = (SQueryFileInfo *)p1; - SQueryFileInfo *pInfo2 = (SQueryFileInfo *)p2; + SHeaderFileInfo *pInfo1 = (SHeaderFileInfo *)p1; + SHeaderFileInfo *pInfo2 = (SHeaderFileInfo *)p2; if (pInfo1->fileID == pInfo2->fileID) { return 0; @@ -2919,6 +3001,7 @@ static int file_order_comparator(const void *p1, const void *p2) { /** * open a data files and header file for metric meta query + * * @param pQInfo * @param pVnodeFiles * @param fid @@ -2927,36 +3010,17 @@ static int file_order_comparator(const void *p1, const void *p2) { * @param prefix * @return */ -static int32_t vnodeOpenVnodeDBFiles(SQInfo *pQInfo, SQueryFileInfo *pVnodeFiles, int32_t fid, int32_t vnodeId, - char *fileName, char *prefix) { - // __off_t size = 0; - - pVnodeFiles->fileID = fid; - pVnodeFiles->defaultMappingSize = DEFAULT_DATA_FILE_MMAP_WINDOW_SIZE; - - snprintf(pVnodeFiles->headerFilePath, 256, "%s%s", prefix, fileName); - pVnodeFiles->headerFd = open(pVnodeFiles->headerFilePath, O_RDONLY); +static int32_t vnodeOpenVnodeDBFiles(SQueryFilesInfo *pVnodeFiles, int32_t fid, int32_t index, char *fileName) { + SHeaderFileInfo *pFileInfo = &pVnodeFiles->pFileInfo[index]; + + pFileInfo->fileID = fid; - if (!VALIDFD(pVnodeFiles->headerFd)) { - dError("QInfo:%p failed open header file:%s reason:%s", pQInfo, pVnodeFiles->headerFilePath, strerror(errno)); - goto _clean; - } + char buf[PATH_MAX] = {0}; + snprintf(buf, PATH_MAX, "%s%s", pVnodeFiles->dbFilePathPrefix, fileName); struct stat fstat = {0}; - if (stat(pVnodeFiles->headerFilePath, &fstat) < 0) return -1; - pVnodeFiles->headFileSize = fstat.st_size; - - snprintf(pVnodeFiles->dataFilePath, 256, "%sv%df%d.data", prefix, vnodeId, fid); - snprintf(pVnodeFiles->lastFilePath, 256, "%sv%df%d.last", prefix, vnodeId, fid); - - pVnodeFiles->dataFd = open(pVnodeFiles->dataFilePath, O_RDONLY); - pVnodeFiles->lastFd = open(pVnodeFiles->lastFilePath, O_RDONLY); - -// if (stat(pVnodeFiles->dataFilePath, &fstat) < 0) return -1; -// pVnodeFiles->dataFileSize = fstat.st_size; -// -// if (stat(pVnodeFiles->lastFilePath, &fstat) < 0) return -1; -// pVnodeFiles->lastFileSize = fstat.st_size; + if (stat(buf, &fstat) < 0) return -1; + pFileInfo->headFileSize = fstat.st_size; #if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP /* enforce kernel to preload data when the file is mapping */ @@ -2976,38 +3040,38 @@ static int32_t vnodeOpenVnodeDBFiles(SQInfo *pQInfo, SQueryFileInfo *pVnodeFiles return TSDB_CODE_SUCCESS; -_clean: - #if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP +_clean: if (pVnodeFiles->pDataFileData != MAP_FAILED && pVnodeFiles->pDataFileData != NULL) { munmap(pVnodeFiles->pDataFileData, pVnodeFiles->defaultMappingSize); pVnodeFiles->pDataFileData = NULL; } -#endif - + tclose(pVnodeFiles->headerFd); tclose(pVnodeFiles->dataFd); tclose(pVnodeFiles->lastFd); return -1; + +#endif } -static void vnodeOpenAllFiles(SQInfo *pQInfo, int32_t vnodeId) { - char dbFilePathPrefix[TSDB_FILENAME_LEN] = {0}; - - sprintf(dbFilePathPrefix, "%s/vnode%d/db/", tsDirectory, vnodeId); - DIR *pDir = opendir(dbFilePathPrefix); - if (pDir == NULL) { - dError("QInfo:%p failed to open directory:%s", pQInfo, dbFilePathPrefix); - return; - } - +static void vnodeRecordAllFiles(SQInfo *pQInfo, int32_t vnodeId) { char suffix[] = ".head"; struct dirent *pEntry = NULL; size_t alloc = 4; // default allocated size - SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->pMeterQuerySupporter->runtimeEnv); - pRuntimeEnv->pVnodeFiles = calloc(1, sizeof(SQueryFileInfo) * alloc); + SQueryFilesInfo *pVnodeFilesInfo = &(pQInfo->pMeterQuerySupporter->runtimeEnv.vnodeFileInfo); + pVnodeFilesInfo->vnodeId = vnodeId; + + sprintf(pVnodeFilesInfo->dbFilePathPrefix, "%s/vnode%d/db/", tsDirectory, vnodeId); + DIR *pDir = opendir(pVnodeFilesInfo->dbFilePathPrefix); + if (pDir == NULL) { + dError("QInfo:%p failed to open directory:%s", pQInfo, pVnodeFilesInfo->dbFilePathPrefix); + return; + } + + pVnodeFilesInfo->pFileInfo = calloc(1, sizeof(SHeaderFileInfo) * alloc); SVnodeObj *pVnode = &vnodeList[vnodeId]; while ((pEntry = readdir(pDir)) != NULL) { @@ -3041,26 +3105,28 @@ static void vnodeOpenAllFiles(SQInfo *pQInfo, int32_t vnodeId) { assert(fid >= 0 && vid >= 0); - if (++pRuntimeEnv->numOfFiles > alloc) { - alloc = alloc << 1; - pRuntimeEnv->pVnodeFiles = realloc(pRuntimeEnv->pVnodeFiles, alloc * sizeof(SQueryFileInfo)); - memset(&pRuntimeEnv->pVnodeFiles[alloc >> 1], 0, (alloc >> 1) * sizeof(SQueryFileInfo)); + if (++pVnodeFilesInfo->numOfFiles > alloc) { + alloc = alloc << 1U; + pVnodeFilesInfo->pFileInfo = realloc(pVnodeFilesInfo->pFileInfo, alloc * sizeof(SHeaderFileInfo)); + memset(&pVnodeFilesInfo->pFileInfo[alloc >> 1U], 0, (alloc >> 1U) * sizeof(SHeaderFileInfo)); } - SQueryFileInfo *pVnodeFiles = &pRuntimeEnv->pVnodeFiles[pRuntimeEnv->numOfFiles - 1]; - int32_t ret = vnodeOpenVnodeDBFiles(pQInfo, pVnodeFiles, fid, vnodeId, pEntry->d_name, dbFilePathPrefix); + SHeaderFileInfo *pVnodeFiles = &pVnodeFilesInfo->pFileInfo[pVnodeFilesInfo->numOfFiles - 1]; + + int32_t ret = vnodeOpenVnodeDBFiles(pVnodeFilesInfo, fid, pVnodeFilesInfo->numOfFiles - 1, pEntry->d_name); if (ret < 0) { - memset(pVnodeFiles, 0, sizeof(SQueryFileInfo)); // reset information - pRuntimeEnv->numOfFiles -= 1; + memset(pVnodeFiles, 0, sizeof(SHeaderFileInfo)); // reset information + pVnodeFilesInfo->numOfFiles -= 1; } } closedir(pDir); - dTrace("QInfo:%p find %d data files in %s to be checked", pQInfo, pRuntimeEnv->numOfFiles, dbFilePathPrefix); + dTrace("QInfo:%p find %d data files in %s to be checked", pQInfo, pVnodeFilesInfo->numOfFiles, + pVnodeFilesInfo->dbFilePathPrefix); /* order the files information according their names */ - qsort(pRuntimeEnv->pVnodeFiles, (size_t)pRuntimeEnv->numOfFiles, sizeof(SQueryFileInfo), file_order_comparator); + qsort(pVnodeFilesInfo->pFileInfo, (size_t)pVnodeFilesInfo->numOfFiles, sizeof(SHeaderFileInfo), file_order_comparator); } static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo, void *pBlock) { @@ -3642,7 +3708,8 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete // dataInCache requires lastKey value pQuery->lastKey = pQuery->skey; - pSupporter->runtimeEnv.mmapedHFileIndex = -1; // set the initial value + doInitQueryFileInfoFD(&pSupporter->runtimeEnv.vnodeFileInfo); + vnodeInitDataBlockInfo(&pSupporter->runtimeEnv.loadBlockInfo); vnodeInitLoadCompBlockInfo(&pSupporter->runtimeEnv.loadCompBlockInfo); @@ -3675,7 +3742,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete return ret; } - vnodeOpenAllFiles(pQInfo, pMeterObj->vnode); + vnodeRecordAllFiles(pQInfo, pMeterObj->vnode); if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { if ((ret = allocateOutputBufForGroup(pSupporter, pQuery, false)) != TSDB_CODE_SUCCESS) { @@ -3768,7 +3835,7 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) { } } - if (VALIDFD(pSupporter->meterOutputFd)) { + if (FD_VALID(pSupporter->meterOutputFd)) { assert(pSupporter->meterOutputMMapBuf != NULL); dTrace("QInfo:%p disk-based output buffer during query:%lld bytes", pQInfo, pSupporter->bufSize); munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize); @@ -3811,8 +3878,8 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) pQuery->pointsRead = 0; changeExecuteScanOrder(pQuery, true); - - pSupporter->runtimeEnv.mmapedHFileIndex = -1; // set the initial value + + doInitQueryFileInfoFD(&pSupporter->runtimeEnv.vnodeFileInfo); vnodeInitDataBlockInfo(&pSupporter->runtimeEnv.loadBlockInfo); vnodeInitLoadCompBlockInfo(&pSupporter->runtimeEnv.loadCompBlockInfo); @@ -3853,7 +3920,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) } tSidSetSort(pSupporter->pSidSet); - vnodeOpenAllFiles(pQInfo, pMeter->vnode); + vnodeRecordAllFiles(pQInfo, pMeter->vnode); if ((ret = allocateOutputBufForGroup(pSupporter, pQuery, true)) != TSDB_CODE_SUCCESS) { return ret; @@ -3869,7 +3936,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) getTmpfilePath("tb_metric_mmap", pSupporter->extBufFile); pSupporter->meterOutputFd = open(pSupporter->extBufFile, O_CREAT | O_RDWR, 0666); - if (!VALIDFD(pSupporter->meterOutputFd)) { + if (!FD_VALID(pSupporter->meterOutputFd)) { dError("QInfo:%p failed to create file: %s on disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); return TSDB_CODE_SERV_OUT_OF_MEMORY; } @@ -3994,7 +4061,7 @@ TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index) { // the fields info is not loaded, load it into memory if (pQuery->pFields == NULL || pQuery->pFields[pQuery->slot] == NULL) { - loadDataBlockFieldsInfo(pRuntimeEnv, &pRuntimeEnv->pVnodeFiles[fileIndex], pBlock, &pQuery->pFields[pQuery->slot]); + loadDataBlockFieldsInfo(pRuntimeEnv, pBlock, &pQuery->pFields[pQuery->slot]); } SET_DATA_BLOCK_LOADED(pRuntimeEnv->blockStatus); @@ -5489,11 +5556,12 @@ SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; SMeterSidExtInfo ** pMeterSidExtInfo = pSupporter->pMeterSidExtInfo; SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv; - SQueryFileInfo * pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIndex]; + + SHeaderFileInfo * pQueryFileInfo = &pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex]; SVnodeObj *pVnode = &vnodeList[vid]; - char * pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, fileIndex); + char * pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, vid, fileIndex); if (pHeaderFileData == NULL) { // failed to load header file into buffer return 0; } @@ -5501,7 +5569,7 @@ SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t int32_t tmsize = sizeof(SCompHeader) * (pVnode->cfg.maxSessions) + sizeof(TSCKSUM); // file is corrupted, abort query in current file - if (validateHeaderOffsetSegment(pQInfo, pQueryFileInfo->headerFilePath, vid, pHeaderFileData, tmsize) < 0) { + if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, vid, pHeaderFileData, tmsize) < 0) { *numOfMeters = 0; return 0; } @@ -5791,7 +5859,7 @@ static bool setCurrentQueryRange(SMeterDataInfo *pMeterDataInfo, SQuery *pQuery, * @return */ uint32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuery, char *pHeaderData, - int32_t numOfMeters, SQueryFileInfo *pQueryFileInfo, SMeterDataInfo **pMeterDataInfo) { + int32_t numOfMeters, const char* filePath, SMeterDataInfo **pMeterDataInfo) { uint32_t numOfBlocks = 0; SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); SQueryCostSummary *pSummary = &pSupporter->runtimeEnv.summary; @@ -5803,7 +5871,7 @@ uint32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuer SMeterObj *pMeterObj = pMeterDataInfo[j]->pMeterObj; SCompInfo *compInfo = (SCompInfo *)(pHeaderData + pMeterDataInfo[j]->offsetInHeaderFile); - int32_t ret = validateCompBlockInfoSegment(pQInfo, pQueryFileInfo->headerFilePath, pMeterObj->vnode, compInfo, + int32_t ret = validateCompBlockInfoSegment(pQInfo, filePath, pMeterObj->vnode, compInfo, pMeterDataInfo[j]->offsetInHeaderFile); if (ret != 0) { clearMeterDataBlockInfo(pMeterDataInfo[j]); @@ -5822,8 +5890,7 @@ uint32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuer // check compblock integrity TSCKSUM checksum = *(TSCKSUM *)((char *)compInfo + sizeof(SCompInfo) + size); - ret = validateCompBlockSegment(pQInfo, pQueryFileInfo->headerFilePath, compInfo, (char *)pCompBlock, - pMeterObj->vnode, checksum); + ret = validateCompBlockSegment(pQInfo, filePath, compInfo, (char *)pCompBlock, pMeterObj->vnode, checksum); if (ret < 0) { clearMeterDataBlockInfo(pMeterDataInfo[j]); continue; @@ -6492,11 +6559,11 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SBlockInfo *pBlockInfo) { return loadPrimaryTS; } -int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, int8_t *blkStatus, SQueryRuntimeEnv *pRuntimeEnv, +int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blkStatus, SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand) { SQuery * pQuery = pRuntimeEnv->pQuery; SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj; - SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIdx]; + SHeaderFileInfo *pQueryFileInfo = &pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIdx]; TSKEY *primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data; @@ -6531,7 +6598,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, int8_t *blkS setTimestampRange(pRuntimeEnv, pBlock->keyFirst, pBlock->keyLast); } else if (req == BLK_DATA_FILEDS_NEEDED) { - if (loadDataBlockFieldsInfo(pRuntimeEnv, pQueryFileInfo, pBlock, pFields) < 0) { + if (loadDataBlockFieldsInfo(pRuntimeEnv, pBlock, pFields) < 0) { return DISK_DATA_LOAD_FAILED; } } else { @@ -6540,7 +6607,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, int8_t *blkS } } else { _load_all: - if (loadDataBlockFieldsInfo(pRuntimeEnv, pQueryFileInfo, pBlock, pFields) < 0) { + if (loadDataBlockFieldsInfo(pRuntimeEnv, pBlock, pFields) < 0) { return DISK_DATA_LOAD_FAILED; } @@ -6980,7 +7047,7 @@ int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows) { int32_t fd = open(pQuery->sdata[0]->data, O_RDONLY, 0666); // make sure file exist - if (VALIDFD(fd)) { + if (FD_VALID(fd)) { size_t s = lseek(fd, 0, SEEK_END); dTrace("QInfo:%p ts comp data return, file:%s, size:%lld", pQInfo, pQuery->sdata[0]->data, s); diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index 4703f7f8dbb7706bd0a4660a179476d1831c2167..e73c96d04c73cdac622dcd83889a7b09f086865a 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -266,7 +266,9 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe __block_search_fn_t searchFn = vnodeSearchKeyFunc[pTempMeter->searchAlgorithm]; int32_t vnodeId = pTempMeter->vnode; - dTrace("QInfo:%p start to check data blocks in %d files", pQInfo, pRuntimeEnv->numOfFiles); + SQueryFilesInfo* pVnodeFileInfo = &pRuntimeEnv->vnodeFileInfo; + + dTrace("QInfo:%p start to check data blocks in %d files", pQInfo, pVnodeFileInfo->numOfFiles); int32_t fid = QUERY_IS_ASC_QUERY(pQuery) ? -1 : INT32_MAX; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); @@ -289,9 +291,9 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe pQuery->fileId = fid; pSummary->numOfFiles++; - SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIdx]; - char *pHeaderData = vnodeGetHeaderFileData(pRuntimeEnv, fileIdx); - if (pHeaderData == NULL) { // failed to mmap header file into buffer, ignore current file, try next + char *pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, vnodeId, fileIdx); + if (pHeaderFileData == NULL) { // failed to mmap header file into buffer, ignore current file, try next + fid += step; continue; } @@ -307,20 +309,21 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe return NULL; } - dTrace("QInfo:%p file:%s, %d meters qualified", pQInfo, pQueryFileInfo->dataFilePath, numOfQualifiedMeters); + dTrace("QInfo:%p file:%s, %d meters qualified", pQInfo, pVnodeFileInfo->dataFilePath, numOfQualifiedMeters); - // none of meters in query set have pHeaderData in this file, try next file + // none of meters in query set have pHeaderFileData in this file, try next file if (numOfQualifiedMeters == 0) { fid += step; tfree(pReqMeterDataInfo); continue; } - uint32_t numOfBlocks = getDataBlocksForMeters(pSupporter, pQuery, pHeaderData, numOfQualifiedMeters, pQueryFileInfo, - pReqMeterDataInfo); + uint32_t numOfBlocks = getDataBlocksForMeters(pSupporter, pQuery, pHeaderFileData, numOfQualifiedMeters, + pVnodeFileInfo->headerFilePath, pReqMeterDataInfo); - dTrace("QInfo:%p file:%s, %d meters contains %d blocks to be checked", pQInfo, pQueryFileInfo->dataFilePath, + dTrace("QInfo:%p file:%s, %d meters contains %d blocks to be checked", pQInfo, pVnodeFileInfo->dataFilePath, numOfQualifiedMeters, numOfBlocks); + if (numOfBlocks == 0) { fid += step; tfree(pReqMeterDataInfo); @@ -345,7 +348,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe totalBlocks += numOfBlocks; - // sequentially scan the pHeaderData file + // sequentially scan the pHeaderFileData file int32_t j = QUERY_IS_ASC_QUERY(pQuery) ? 0 : numOfBlocks - 1; for (; j < numOfBlocks && j >= 0; j += step) { @@ -427,7 +430,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe } int64_t time = taosGetTimestampUs() - st; - dTrace("QInfo:%p complete check %d files, %d blocks, elapsed time:%.3fms", pQInfo, pRuntimeEnv->numOfFiles, + dTrace("QInfo:%p complete check %d files, %d blocks, elapsed time:%.3fms", pQInfo, pVnodeFileInfo->numOfFiles, totalBlocks, time / 1000.0); pSummary->fileTimeUs += time;