提交 f06e25fe 编写于 作者: H Hongze Cheng

Merge branch 'develop' into feature/mergeimport

...@@ -3191,7 +3191,7 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -3191,7 +3191,7 @@ static void diff_function(SQLFunctionCtx *pCtx) {
} else { \ } else { \
*(type *)(ctx)->aOutputBuf = *(type *)(d) - (*(type *)(&(ctx)->param[1].i64Key)); \ *(type *)(ctx)->aOutputBuf = *(type *)(d) - (*(type *)(&(ctx)->param[1].i64Key)); \
*(type *)(&(ctx)->param[1].i64Key) = *(type *)(d); \ *(type *)(&(ctx)->param[1].i64Key) = *(type *)(d); \
*(int64_t *)(ctx)->ptsOutputBuf = *(int64_t *)((ctx)->ptsList + (TSDB_KEYSIZE)*index); \ *(int64_t *)(ctx)->ptsOutputBuf = (ctx)->ptsList[index]; \
} \ } \
} while (0); } while (0);
......
...@@ -1203,16 +1203,20 @@ bool tsBufNextPos(STSBuf* pTSBuf) { ...@@ -1203,16 +1203,20 @@ bool tsBufNextPos(STSBuf* pTSBuf) {
if (pCur->vnodeIndex == -1) { if (pCur->vnodeIndex == -1) {
if (pCur->order == TSQL_SO_ASC) { if (pCur->order == TSQL_SO_ASC) {
tsBufGetBlock(pTSBuf, 0, 0); tsBufGetBlock(pTSBuf, 0, 0);
// list is empty
if (pTSBuf->block.numOfElem == 0) { if (pTSBuf->block.numOfElem == 0) { // the whole list is empty, return
tsBufResetPos(pTSBuf); tsBufResetPos(pTSBuf);
return false; return false;
} else { } else {
return true; return true;
} }
} else {
} else { // get the last timestamp record in the last block of the last vnode
assert(pTSBuf->numOfVnodes > 0);
int32_t vnodeIndex = pTSBuf->numOfVnodes - 1; int32_t vnodeIndex = pTSBuf->numOfVnodes - 1;
pCur->vnodeIndex = vnodeIndex;
int32_t vnodeId = pTSBuf->pData[pCur->vnodeIndex].info.vnode; int32_t vnodeId = pTSBuf->pData[pCur->vnodeIndex].info.vnode;
STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId); STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId);
int32_t blockIndex = pBlockInfo->numOfBlocks - 1; int32_t blockIndex = pBlockInfo->numOfBlocks - 1;
......
...@@ -155,7 +155,8 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload, ...@@ -155,7 +155,8 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
int64_t iv; int64_t iv;
int32_t numType; int32_t numType;
char * endptr = NULL; char * endptr = NULL;
errno = 0; // clear the previous existed error information
switch (pSchema->type) { switch (pSchema->type) {
case TSDB_DATA_TYPE_BOOL: { // bool case TSDB_DATA_TYPE_BOOL: { // bool
if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) { if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
......
...@@ -64,8 +64,8 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const ...@@ -64,8 +64,8 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
#ifdef CLUSTER #ifdef CLUSTER
if (ip && ip[0]) { if (ip && ip[0]) {
strcpy(tscMgmtIpList.ipstr[0], ip); strcpy(tscMgmtIpList.ipstr[1], ip);
tscMgmtIpList.ip[0] = inet_addr(ip); tscMgmtIpList.ip[1] = inet_addr(ip);
} }
#else #else
if (ip && ip[0]) { if (ip && ip[0]) {
......
...@@ -26,12 +26,14 @@ extern "C" { ...@@ -26,12 +26,14 @@ extern "C" {
#include "tsdb.h" #include "tsdb.h"
#ifndef STDERR_FILENO #ifndef STDERR_FILENO
#define VALIDFD(x) ((x) > 2) #define STDERR_FILENO (2)
#else
#define VALIDFD(x) ((x) > STDERR_FILENO)
#endif #endif
#define FD_VALID(x) ((x) > STDERR_FILENO)
#define FD_INITIALIZER ((int32_t)-1)
#define WCHAR wchar_t #define WCHAR wchar_t
#define tfree(x) \ #define tfree(x) \
{ \ { \
if (x) { \ if (x) { \
......
...@@ -68,6 +68,8 @@ ...@@ -68,6 +68,8 @@
#define HTTP_COMPRESS_IDENTITY 0 #define HTTP_COMPRESS_IDENTITY 0
#define HTTP_COMPRESS_GZIP 2 #define HTTP_COMPRESS_GZIP 2
#define HTTP_SESSION_ID_LEN (TSDB_USER_LEN * 2 + 1)
typedef enum { typedef enum {
HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY,
HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_HANDLING,
...@@ -83,7 +85,7 @@ typedef struct { ...@@ -83,7 +85,7 @@ typedef struct {
int expire; int expire;
int access; int access;
void *taos; void *taos;
char id[TSDB_USER_LEN]; char id[HTTP_SESSION_ID_LEN + 1];
} HttpSession; } HttpSession;
typedef enum { typedef enum {
......
...@@ -50,6 +50,7 @@ bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len) { ...@@ -50,6 +50,7 @@ bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len) {
return false; return false;
} }
strncpy(pContext->user, base64, (size_t)user_len); strncpy(pContext->user, base64, (size_t)user_len);
pContext->user[user_len] = 0;
char *password = user + 1; char *password = user + 1;
int pass_len = (int)((base64 + outlen) - password); int pass_len = (int)((base64 + outlen) - password);
...@@ -60,6 +61,7 @@ bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len) { ...@@ -60,6 +61,7 @@ bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len) {
return false; return false;
} }
strncpy(pContext->pass, password, (size_t)pass_len); strncpy(pContext->pass, password, (size_t)pass_len);
pContext->pass[pass_len] = 0;
free(base64); free(base64);
httpTrace("context:%p, fd:%d, ip:%s, basic token parsed success, user:%s", pContext, pContext->fd, pContext->ipstr, httpTrace("context:%p, fd:%d, ip:%s, basic token parsed success, user:%s", pContext, pContext->fd, pContext->ipstr,
......
...@@ -41,8 +41,8 @@ void httpCreateSession(HttpContext *pContext, void *taos) { ...@@ -41,8 +41,8 @@ void httpCreateSession(HttpContext *pContext, void *taos) {
pthread_mutex_lock(&server->serverMutex); pthread_mutex_lock(&server->serverMutex);
if (pContext->session != NULL && pContext->session == pContext->session->signature) { if (pContext->session != NULL && pContext->session == pContext->session->signature) {
httpTrace("context:%p, fd:%d, ip:%s, user:%s, set exist session:%p:%s:%p expired", pContext, pContext->fd, httpTrace("context:%p, fd:%d, ip:%s, user:%s, set exist session:%p:%p expired", pContext, pContext->fd,
pContext->ipstr, pContext->user, pContext->session, pContext->session->id, pContext->session->taos); pContext->ipstr, pContext->user, pContext->session, pContext->session->taos);
pContext->session->expire = 0; pContext->session->expire = 0;
pContext->session->access--; pContext->session->access--;
} }
...@@ -51,7 +51,7 @@ void httpCreateSession(HttpContext *pContext, void *taos) { ...@@ -51,7 +51,7 @@ void httpCreateSession(HttpContext *pContext, void *taos) {
session.taos = taos; session.taos = taos;
session.expire = (int)taosGetTimestampSec() + server->sessionExpire; session.expire = (int)taosGetTimestampSec() + server->sessionExpire;
session.access = 1; session.access = 1;
strcpy(session.id, pContext->user); snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
pContext->session = (HttpSession *)taosAddStrHash(server->pSessionHash, session.id, (char *)(&session)); pContext->session = (HttpSession *)taosAddStrHash(server->pSessionHash, session.id, (char *)(&session));
if (pContext->session == NULL) { if (pContext->session == NULL) {
httpError("context:%p, fd:%d, ip:%s, user:%s, error:%s", pContext, pContext->fd, pContext->ipstr, pContext->user, httpError("context:%p, fd:%d, ip:%s, user:%s, error:%s", pContext, pContext->fd, pContext->ipstr, pContext->user,
...@@ -62,20 +62,23 @@ void httpCreateSession(HttpContext *pContext, void *taos) { ...@@ -62,20 +62,23 @@ void httpCreateSession(HttpContext *pContext, void *taos) {
} }
pContext->session->signature = pContext->session; pContext->session->signature = pContext->session;
httpTrace("context:%p, fd:%d, ip:%s, user:%s, create a new session:%p:%s:%p", pContext, pContext->fd, pContext->ipstr, httpTrace("context:%p, fd:%d, ip:%s, user:%s, create a new session:%p:%p", pContext, pContext->fd, pContext->ipstr,
pContext->user, pContext->session, pContext->session->id, pContext->session->taos); pContext->user, pContext->session, pContext->session->taos);
pthread_mutex_unlock(&server->serverMutex); pthread_mutex_unlock(&server->serverMutex);
} }
void httpFetchSession(HttpContext *pContext) { void httpFetchSessionImp(HttpContext *pContext) {
HttpServer *server = pContext->pThread->pServer; HttpServer *server = pContext->pThread->pServer;
pthread_mutex_lock(&server->serverMutex); pthread_mutex_lock(&server->serverMutex);
pContext->session = (HttpSession *)taosGetStrHashData(server->pSessionHash, pContext->user); char sessionId[HTTP_SESSION_ID_LEN];
snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
pContext->session = (HttpSession *)taosGetStrHashData(server->pSessionHash, sessionId);
if (pContext->session != NULL && pContext->session == pContext->session->signature) { if (pContext->session != NULL && pContext->session == pContext->session->signature) {
pContext->session->access++; pContext->session->access++;
httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%s:%p, access:%d, expire:%d", httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, access:%d, expire:%d",
pContext, pContext->fd, pContext->ipstr, pContext->user, pContext->session, pContext->session->id, pContext, pContext->fd, pContext->ipstr, pContext->user, pContext->session,
pContext->session->taos, pContext->session->access, pContext->session->expire); pContext->session->taos, pContext->session->access, pContext->session->expire);
pContext->session->expire = (int)taosGetTimestampSec() + server->sessionExpire; pContext->session->expire = (int)taosGetTimestampSec() + server->sessionExpire;
} else { } else {
...@@ -86,6 +89,20 @@ void httpFetchSession(HttpContext *pContext) { ...@@ -86,6 +89,20 @@ void httpFetchSession(HttpContext *pContext) {
pthread_mutex_unlock(&server->serverMutex); pthread_mutex_unlock(&server->serverMutex);
} }
void httpFetchSession(HttpContext *pContext) {
if (pContext->session == NULL) {
httpFetchSessionImp(pContext);
} else {
char sessionId[HTTP_SESSION_ID_LEN];
snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
if (strcmp(pContext->session->id, sessionId) != 0) {
httpError("context:%p, fd:%d, ip:%s, user:%s, password may be changed", pContext, pContext->fd, pContext->ipstr, pContext->user);
httpRestoreSession(pContext);
httpFetchSessionImp(pContext);
}
}
}
void httpRestoreSession(HttpContext *pContext) { void httpRestoreSession(HttpContext *pContext) {
HttpServer * server = pContext->pThread->pServer; HttpServer * server = pContext->pThread->pServer;
...@@ -97,15 +114,16 @@ void httpRestoreSession(HttpContext *pContext) { ...@@ -97,15 +114,16 @@ void httpRestoreSession(HttpContext *pContext) {
return; return;
} }
session->access--; session->access--;
httpTrace("context:%p, ip:%s, user:%s, restore session:%p:%s:%p, access:%d, expire:%d", httpTrace("context:%p, ip:%s, user:%s, restore session:%p:%p, access:%d, expire:%d",
pContext, pContext->ipstr, pContext->user, session, session->id, session->taos, pContext, pContext->ipstr, pContext->user, session, session->taos,
session->access, pContext->session->expire); session->access, pContext->session->expire);
pContext->session = NULL;
pthread_mutex_unlock(&server->serverMutex); pthread_mutex_unlock(&server->serverMutex);
} }
void httpResetSession(char *session) { void httpResetSession(char *session) {
HttpSession *pSession = (HttpSession *)session; HttpSession *pSession = (HttpSession *)session;
httpTrace("close session:%p:%s:%p", pSession, pSession->id, pSession->taos); httpTrace("close session:%p:%p", pSession, pSession->taos);
if (pSession->taos != NULL) { if (pSession->taos != NULL) {
taos_close(pSession->taos); taos_close(pSession->taos);
pSession->taos = NULL; pSession->taos = NULL;
...@@ -144,12 +162,12 @@ int httpSessionExpired(char *session) { ...@@ -144,12 +162,12 @@ int httpSessionExpired(char *session) {
return 0; // un-expired, so return false return 0; // un-expired, so return false
} }
if (pSession->access > 0) { if (pSession->access > 0) {
httpTrace("session:%p:%s:%p is expired, but still access:%d", pSession, pSession->id, pSession->taos, httpTrace("session:%p:%p is expired, but still access:%d", pSession, pSession->taos,
pSession->access); pSession->access);
return 0; // still used, so return false return 0; // still used, so return false
} }
httpTrace("need close session:%p:%s:%p for it expired, cur:%d, expire:%d, invertal:%d", httpTrace("need close session:%p:%p for it expired, cur:%d, expire:%d, invertal:%d",
pSession, pSession->id, pSession->taos, cur, pSession->expire, cur - pSession->expire); pSession, pSession->taos, cur, pSession->expire, cur - pSession->expire);
} }
return 1; return 1;
......
...@@ -378,9 +378,7 @@ void httpProcessRequestCb(void *param, TAOS_RES *result, int code) { ...@@ -378,9 +378,7 @@ void httpProcessRequestCb(void *param, TAOS_RES *result, int code) {
} }
void httpProcessRequest(HttpContext *pContext) { void httpProcessRequest(HttpContext *pContext) {
if (pContext->session == NULL) { httpFetchSession(pContext);
httpFetchSession(pContext);
}
if (pContext->session == NULL || pContext->session != pContext->session->signature || if (pContext->session == NULL || pContext->session != pContext->session->signature ||
pContext->reqType == HTTP_REQTYPE_LOGIN) { pContext->reqType == HTTP_REQTYPE_LOGIN) {
......
...@@ -47,7 +47,7 @@ ...@@ -47,7 +47,7 @@
#define taosCloseSocket(x) \ #define taosCloseSocket(x) \
{ \ { \
if (VALIDFD(x)) { \ if (FD_VALID(x)) { \
close(x); \ close(x); \
x = -1; \ x = -1; \
} \ } \
......
...@@ -75,11 +75,12 @@ extern "C" { ...@@ -75,11 +75,12 @@ extern "C" {
#define taosCloseSocket(x) \ #define taosCloseSocket(x) \
{ \ { \
if (VALIDFD(x)) { \ if (FD_VALID(x)) { \
close(x); \ close(x); \
x = -1; \ x = -1; \
} \ } \
} }
#define taosWriteSocket(fd, buf, len) write(fd, buf, len) #define taosWriteSocket(fd, buf, len) write(fd, buf, len)
#define taosReadSocket(fd, buf, len) read(fd, buf, len) #define taosReadSocket(fd, buf, len) read(fd, buf, len)
......
...@@ -111,7 +111,7 @@ typedef enum { ...@@ -111,7 +111,7 @@ typedef enum {
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN) #define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)
typedef int (*__block_search_fn_t)(char* data, int num, int64_t key, int order); typedef int (*__block_search_fn_t)(char* data, int num, int64_t key, int order);
typedef int32_t (*__read_data_fn_t)(int fd, SQInfo* pQInfo, SQueryFileInfo* pQueryFile, char* buf, uint64_t offset, typedef int32_t (*__read_data_fn_t)(int fd, SQInfo* pQInfo, SQueryFilesInfo* pQueryFile, char* buf, uint64_t offset,
int32_t size); int32_t size);
static FORCE_INLINE SMeterObj* getMeterObj(void* hashHandle, int32_t sid) { static FORCE_INLINE SMeterObj* getMeterObj(void* hashHandle, int32_t sid) {
...@@ -191,10 +191,10 @@ int64_t getQueryStartPositionInCache(SQueryRuntimeEnv* pRuntimeEnv, int32_t* slo ...@@ -191,10 +191,10 @@ int64_t getQueryStartPositionInCache(SQueryRuntimeEnv* pRuntimeEnv, int32_t* slo
int64_t getNextAccessedKeyInData(SQuery* pQuery, int64_t* pPrimaryCol, SBlockInfo* pBlockInfo, int32_t blockStatus); int64_t getNextAccessedKeyInData(SQuery* pQuery, int64_t* pPrimaryCol, SBlockInfo* pBlockInfo, int32_t blockStatus);
uint32_t getDataBlocksForMeters(SMeterQuerySupportObj* pSupporter, SQuery* pQuery, char* pHeaderData, uint32_t getDataBlocksForMeters(SMeterQuerySupportObj* pSupporter, SQuery* pQuery, char* pHeaderData,
int32_t numOfMeters, SQueryFileInfo* pQueryFileInfo, SMeterDataInfo** pMeterDataInfo); int32_t numOfMeters, const char* filePath, SMeterDataInfo** pMeterDataInfo);
int32_t LoadDatablockOnDemand(SCompBlock* pBlock, SField** pFields, int8_t* blkStatus, SQueryRuntimeEnv* pRuntimeEnv, int32_t LoadDatablockOnDemand(SCompBlock* pBlock, SField** pFields, uint8_t* blkStatus, SQueryRuntimeEnv* pRuntimeEnv,
int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand); int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand);
char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex); char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int32_t fileIndex);
/** /**
* Create SMeterQueryInfo. * Create SMeterQueryInfo.
......
...@@ -47,29 +47,14 @@ typedef struct SQueryLoadCompBlockInfo { ...@@ -47,29 +47,14 @@ typedef struct SQueryLoadCompBlockInfo {
int32_t fileId; int32_t fileId;
int32_t fileListIndex; int32_t fileListIndex;
} SQueryLoadCompBlockInfo; } SQueryLoadCompBlockInfo;
/* /*
* the header file info for one vnode * the header file info for one vnode
*/ */
typedef struct SQueryFileInfo { typedef struct SHeaderFileInfo {
int32_t fileID; /* file id */ int32_t fileID; // file id
char headerFilePath[256]; /* full file name */ size_t headFileSize; // header file size
char dataFilePath[256]; } SHeaderFileInfo;
char lastFilePath[256];
int32_t defaultMappingSize; /* default mapping size */
int32_t headerFd; /* file handler */
char* pHeaderFileData; /* mmap header files */
size_t headFileSize;
int32_t dataFd;
char* pDataFileData;
size_t dataFileSize;
uint64_t dtFileMappingOffset;
int32_t lastFd;
size_t lastFileSize;
uint64_t lastFileMappingOffset;
} SQueryFileInfo;
typedef struct SQueryCostSummary { typedef struct SQueryCostSummary {
double cacheTimeUs; double cacheTimeUs;
...@@ -106,6 +91,27 @@ typedef struct SOutputRes { ...@@ -106,6 +91,27 @@ typedef struct SOutputRes {
SResultInfo* resultInfo; SResultInfo* resultInfo;
} SOutputRes; } SOutputRes;
/*
* header files info, avoid to iterate the directory, the data is acquired
* during in query preparation function
*/
typedef struct SQueryFilesInfo {
SHeaderFileInfo* pFileInfo;
uint32_t numOfFiles; // the total available number of files for this virtual node during query execution
int32_t current; // the memory mapped header file, NOTE: only one header file can be mmap.
int32_t vnodeId;
int32_t headerFd; // header file fd
char* pHeaderFileData; // mmap header files
int32_t dataFd;
int32_t lastFd;
char headerFilePath[PATH_MAX]; // current opened header file name
char dataFilePath[PATH_MAX]; // current opened data file name
char lastFilePath[PATH_MAX]; // current opened last file path
char dbFilePathPrefix[PATH_MAX];
} SQueryFilesInfo;
typedef struct RuntimeEnvironment { typedef struct RuntimeEnvironment {
SPositionInfo startPos; /* the start position, used for secondary/third iteration */ SPositionInfo startPos; /* the start position, used for secondary/third iteration */
SPositionInfo endPos; /* the last access position in query, served as the start pos of reversed order query */ SPositionInfo endPos; /* the last access position in query, served as the start pos of reversed order query */
...@@ -122,17 +128,10 @@ typedef struct RuntimeEnvironment { ...@@ -122,17 +128,10 @@ typedef struct RuntimeEnvironment {
SQLFunctionCtx* pCtx; SQLFunctionCtx* pCtx;
SQueryLoadBlockInfo loadBlockInfo; /* record current block load information */ SQueryLoadBlockInfo loadBlockInfo; /* record current block load information */
SQueryLoadCompBlockInfo loadCompBlockInfo; /* record current compblock information in SQuery */ SQueryLoadCompBlockInfo loadCompBlockInfo; /* record current compblock information in SQuery */
SQueryFilesInfo vnodeFileInfo;
/*
* header files info, avoid to iterate the directory, the data is acquired
* during in query preparation function
*/
SQueryFileInfo* pVnodeFiles;
uint32_t numOfFiles; // the total available number of files for this virtual node during query execution
int32_t mmapedHFileIndex; // the mmaped header file, NOTE: only one header file can be mmap.
int16_t numOfRowsPerPage; int16_t numOfRowsPerPage;
int16_t offset[TSDB_MAX_COLUMNS]; int16_t offset[TSDB_MAX_COLUMNS];
int16_t scanFlag; // denotes reversed scan of data or not int16_t scanFlag; // denotes reversed scan of data or not
SInterpolationInfo interpoInfo; SInterpolationInfo interpoInfo;
SData** pInterpoBuf; SData** pInterpoBuf;
SOutputRes* pResult; // reference to SQuerySupporter->pResult SOutputRes* pResult; // reference to SQuerySupporter->pResult
......
...@@ -81,7 +81,7 @@ int vnodeRenewCommitLog(int vnode) { ...@@ -81,7 +81,7 @@ int vnodeRenewCommitLog(int vnode) {
pthread_mutex_lock(&(pVnode->logMutex)); pthread_mutex_lock(&(pVnode->logMutex));
if (VALIDFD(pVnode->logFd)) { if (FD_VALID(pVnode->logFd)) {
munmap(pVnode->pMem, pVnode->mappingSize); munmap(pVnode->pMem, pVnode->mappingSize);
close(pVnode->logFd); close(pVnode->logFd);
rename(fileName, oldName); rename(fileName, oldName);
...@@ -243,7 +243,7 @@ int vnodeInitCommit(int vnode) { ...@@ -243,7 +243,7 @@ int vnodeInitCommit(int vnode) {
void vnodeCleanUpCommit(int vnode) { void vnodeCleanUpCommit(int vnode) {
SVnodeObj *pVnode = vnodeList + vnode; SVnodeObj *pVnode = vnodeList + vnode;
if (VALIDFD(pVnode->logFd)) close(pVnode->logFd); if (FD_VALID(pVnode->logFd)) close(pVnode->logFd);
if (pVnode->cfg.commitLog && (pVnode->logFd > 0 && remove(pVnode->logFn) < 0)) { if (pVnode->cfg.commitLog && (pVnode->logFd > 0 && remove(pVnode->logFn) < 0)) {
dError("vid:%d, failed to remove:%s", vnode, pVnode->logFn); dError("vid:%d, failed to remove:%s", vnode, pVnode->logFn);
......
...@@ -1830,7 +1830,15 @@ int vnodeInitFile(int vnode) { ...@@ -1830,7 +1830,15 @@ int vnodeInitFile(int vnode) {
pVnode->fmagic = (uint64_t *)calloc(pVnode->maxFiles + 1, sizeof(uint64_t)); pVnode->fmagic = (uint64_t *)calloc(pVnode->maxFiles + 1, sizeof(uint64_t));
int fileId = pVnode->fileId; int fileId = pVnode->fileId;
for (int i = 0; i < pVnode->numOfFiles; ++i) { /*
* The actual files will far exceed the files that need to exist
*/
if (pVnode->numOfFiles > pVnode->maxFiles) {
dError("vid:%d numOfFiles:%d should not larger than maxFiles:%d", vnode, pVnode->numOfFiles, pVnode->maxFiles);
}
int numOfFiles = MIN(pVnode->numOfFiles, pVnode->maxFiles);
for (int i = 0; i < numOfFiles; ++i) {
if (vnodeUpdateFileMagic(vnode, fileId) < 0) { if (vnodeUpdateFileMagic(vnode, fileId) < 0) {
if (pVnode->cfg.replications > 1) { if (pVnode->cfg.replications > 1) {
pVnode->badFileId = fileId; pVnode->badFileId = fileId;
......
...@@ -266,7 +266,9 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe ...@@ -266,7 +266,9 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
__block_search_fn_t searchFn = vnodeSearchKeyFunc[pTempMeter->searchAlgorithm]; __block_search_fn_t searchFn = vnodeSearchKeyFunc[pTempMeter->searchAlgorithm];
int32_t vnodeId = pTempMeter->vnode; int32_t vnodeId = pTempMeter->vnode;
dTrace("QInfo:%p start to check data blocks in %d files", pQInfo, pRuntimeEnv->numOfFiles); SQueryFilesInfo* pVnodeFileInfo = &pRuntimeEnv->vnodeFileInfo;
dTrace("QInfo:%p start to check data blocks in %d files", pQInfo, pVnodeFileInfo->numOfFiles);
int32_t fid = QUERY_IS_ASC_QUERY(pQuery) ? -1 : INT32_MAX; int32_t fid = QUERY_IS_ASC_QUERY(pQuery) ? -1 : INT32_MAX;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
...@@ -289,9 +291,9 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe ...@@ -289,9 +291,9 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
pQuery->fileId = fid; pQuery->fileId = fid;
pSummary->numOfFiles++; pSummary->numOfFiles++;
SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIdx]; char *pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, vnodeId, fileIdx);
char *pHeaderData = vnodeGetHeaderFileData(pRuntimeEnv, fileIdx); if (pHeaderFileData == NULL) { // failed to mmap header file into buffer, ignore current file, try next
if (pHeaderData == NULL) { // failed to mmap header file into buffer, ignore current file, try next fid += step;
continue; continue;
} }
...@@ -307,20 +309,21 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe ...@@ -307,20 +309,21 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
return NULL; return NULL;
} }
dTrace("QInfo:%p file:%s, %d meters qualified", pQInfo, pQueryFileInfo->dataFilePath, numOfQualifiedMeters); dTrace("QInfo:%p file:%s, %d meters qualified", pQInfo, pVnodeFileInfo->dataFilePath, numOfQualifiedMeters);
// none of meters in query set have pHeaderData in this file, try next file // none of meters in query set have pHeaderFileData in this file, try next file
if (numOfQualifiedMeters == 0) { if (numOfQualifiedMeters == 0) {
fid += step; fid += step;
tfree(pReqMeterDataInfo); tfree(pReqMeterDataInfo);
continue; continue;
} }
uint32_t numOfBlocks = getDataBlocksForMeters(pSupporter, pQuery, pHeaderData, numOfQualifiedMeters, pQueryFileInfo, uint32_t numOfBlocks = getDataBlocksForMeters(pSupporter, pQuery, pHeaderFileData, numOfQualifiedMeters,
pReqMeterDataInfo); pVnodeFileInfo->headerFilePath, pReqMeterDataInfo);
dTrace("QInfo:%p file:%s, %d meters contains %d blocks to be checked", pQInfo, pQueryFileInfo->dataFilePath, dTrace("QInfo:%p file:%s, %d meters contains %d blocks to be checked", pQInfo, pVnodeFileInfo->dataFilePath,
numOfQualifiedMeters, numOfBlocks); numOfQualifiedMeters, numOfBlocks);
if (numOfBlocks == 0) { if (numOfBlocks == 0) {
fid += step; fid += step;
tfree(pReqMeterDataInfo); tfree(pReqMeterDataInfo);
...@@ -345,7 +348,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe ...@@ -345,7 +348,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
totalBlocks += numOfBlocks; totalBlocks += numOfBlocks;
// sequentially scan the pHeaderData file // sequentially scan the pHeaderFileData file
int32_t j = QUERY_IS_ASC_QUERY(pQuery) ? 0 : numOfBlocks - 1; int32_t j = QUERY_IS_ASC_QUERY(pQuery) ? 0 : numOfBlocks - 1;
for (; j < numOfBlocks && j >= 0; j += step) { for (; j < numOfBlocks && j >= 0; j += step) {
...@@ -427,7 +430,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe ...@@ -427,7 +430,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
} }
int64_t time = taosGetTimestampUs() - st; int64_t time = taosGetTimestampUs() - st;
dTrace("QInfo:%p complete check %d files, %d blocks, elapsed time:%.3fms", pQInfo, pRuntimeEnv->numOfFiles, dTrace("QInfo:%p complete check %d files, %d blocks, elapsed time:%.3fms", pQInfo, pVnodeFileInfo->numOfFiles,
totalBlocks, time / 1000.0); totalBlocks, time / 1000.0);
pSummary->fileTimeUs += time; pSummary->fileTimeUs += time;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册