提交 9f185225 编写于 作者: S slguan

Merge branch 'develop' into feature/slguan

...@@ -59,6 +59,22 @@ void tscPrintMgmtIp() { ...@@ -59,6 +59,22 @@ void tscPrintMgmtIp() {
} }
#endif #endif
/*
* For each management node, try twice at least in case of poor network situation.
* If the client start to connect to a non-management node from the client, and the first retry may fail due to
* the poor network quality. And then, the second retry get the response with redirection command.
* The retry will not be executed since only *two* retry is allowed in case of single management node in the cluster.
* Therefore, we need to multiply the retry times by factor of 2 to fix this problem.
*/
static int32_t tscGetMgmtConnMaxRetryTimes() {
int32_t factor = 2;
#ifdef CLUSTER
return tscMgmtIpList.numOfIps * factor;
#else
return 1*factor;
#endif
}
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
STscObj *pObj = (STscObj *)param; STscObj *pObj = (STscObj *)param;
if (pObj == NULL) return; if (pObj == NULL) return;
...@@ -134,18 +150,17 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { ...@@ -134,18 +150,17 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
tscProcessSql(pObj->pHb); tscProcessSql(pObj->pHb);
} }
//TODO HANDLE error from mgmt
void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) { void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) {
STscObj *pTscObj = pSql->pTscObj; STscObj *pTscObj = pSql->pTscObj;
#ifdef CLUSTER #ifdef CLUSTER
if (pSql->retry < tscMgmtIpList.numOfIps) { if (pSql->retry < tscGetMgmtConnMaxRetryTimes()) {
*pCode = 0; *pCode = 0;
pSql->retry++; pSql->retry++;
pSql->index = pSql->index % tscMgmtIpList.numOfIps; pSql->index = pSql->index % tscMgmtIpList.numOfIps;
if (pSql->cmd.command > TSDB_SQL_READ && pSql->index == 0) pSql->index = 1; if (pSql->cmd.command > TSDB_SQL_READ && pSql->index == 0) pSql->index = 1;
void *thandle = taosGetConnFromCache(tscConnCache, tscMgmtIpList.ip[pSql->index], TSC_MGMT_VNODE, pTscObj->user); void *thandle = taosGetConnFromCache(tscConnCache, tscMgmtIpList.ip[pSql->index], TSC_MGMT_VNODE, pTscObj->user);
#else #else
if (pSql->retry < 1) { if (pSql->retry < tscGetMgmtConnMaxRetryTimes()) {
*pCode = 0; *pCode = 0;
pSql->retry++; pSql->retry++;
void *thandle = taosGetConnFromCache(tscConnCache, tsServerIp, TSC_MGMT_VNODE, pTscObj->user); void *thandle = taosGetConnFromCache(tscConnCache, tsServerIp, TSC_MGMT_VNODE, pTscObj->user);
...@@ -444,16 +459,13 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { ...@@ -444,16 +459,13 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
} }
} else { } else {
uint16_t rspCode = pMsg->content[0]; uint16_t rspCode = pMsg->content[0];
#ifdef CLUSTER
#ifdef CLUSTER
if (rspCode == TSDB_CODE_REDIRECT) { if (rspCode == TSDB_CODE_REDIRECT) {
tscTrace("%p it shall be redirected!", pSql); tscTrace("%p it shall be redirected!", pSql);
taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user); taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user);
pSql->thandle = NULL; pSql->thandle = NULL;
// reset the retry times for a new mgmt node
pSql->retry = 0;
if (pCmd->command > TSDB_SQL_MGMT) { if (pCmd->command > TSDB_SQL_MGMT) {
tscProcessMgmtRedirect(pSql, pMsg->content + 1); tscProcessMgmtRedirect(pSql, pMsg->content + 1);
......
...@@ -174,7 +174,7 @@ void queryOnBlock(SMeterQuerySupportObj* pSupporter, int64_t* primaryKeys, int32 ...@@ -174,7 +174,7 @@ void queryOnBlock(SMeterQuerySupportObj* pSupporter, int64_t* primaryKeys, int32
SBlockInfo* pBlockBasicInfo, SMeterDataInfo* pDataHeadInfoEx, SField* pFields, SBlockInfo* pBlockBasicInfo, SMeterDataInfo* pDataHeadInfoEx, SField* pFields,
__block_search_fn_t searchFn); __block_search_fn_t searchFn);
SMeterDataInfo** vnodeFilterQualifiedMeters(SQInfo* pQInfo, int32_t vid, SQueryFileInfo* pQueryFileInfo, SMeterDataInfo** vnodeFilterQualifiedMeters(SQInfo* pQInfo, int32_t vid, int32_t fileIndex,
tSidSet* pSidSet, SMeterDataInfo* pMeterDataInfo, int32_t* numOfMeters); tSidSet* pSidSet, SMeterDataInfo* pMeterDataInfo, int32_t* numOfMeters);
int32_t vnodeGetVnodeHeaderFileIdx(int32_t* fid, SQueryRuntimeEnv* pRuntimeEnv, int32_t order); int32_t vnodeGetVnodeHeaderFileIdx(int32_t* fid, SQueryRuntimeEnv* pRuntimeEnv, int32_t order);
...@@ -194,6 +194,7 @@ uint32_t getDataBlocksForMeters(SMeterQuerySupportObj* pSupporter, SQuery* pQuer ...@@ -194,6 +194,7 @@ uint32_t getDataBlocksForMeters(SMeterQuerySupportObj* pSupporter, SQuery* pQuer
int32_t numOfMeters, SQueryFileInfo* pQueryFileInfo, SMeterDataInfo** pMeterDataInfo); int32_t numOfMeters, SQueryFileInfo* pQueryFileInfo, SMeterDataInfo** pMeterDataInfo);
int32_t LoadDatablockOnDemand(SCompBlock* pBlock, SField** pFields, int8_t* blkStatus, SQueryRuntimeEnv* pRuntimeEnv, int32_t LoadDatablockOnDemand(SCompBlock* pBlock, SField** pFields, int8_t* blkStatus, SQueryRuntimeEnv* pRuntimeEnv,
int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand); int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand);
char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex);
/** /**
* Create SMeterQueryInfo. * Create SMeterQueryInfo.
......
...@@ -57,9 +57,9 @@ typedef struct SQueryFileInfo { ...@@ -57,9 +57,9 @@ typedef struct SQueryFileInfo {
char lastFilePath[256]; char lastFilePath[256];
int32_t defaultMappingSize; /* default mapping size */ int32_t defaultMappingSize; /* default mapping size */
int32_t headerFd; /* file handler */ int32_t headerFd; /* file handler */
char* pHeaderFileData; /* mmap header files */ char* pHeaderFileData; /* mmap header files */
size_t headFileSize; size_t headFileSize;
int32_t dataFd; int32_t dataFd;
char* pDataFileData; char* pDataFileData;
size_t dataFileSize; size_t dataFileSize;
...@@ -107,44 +107,40 @@ typedef struct SOutputRes { ...@@ -107,44 +107,40 @@ typedef struct SOutputRes {
} SOutputRes; } SOutputRes;
typedef struct RuntimeEnvironment { typedef struct RuntimeEnvironment {
SPositionInfo startPos; /* the start position, used for secondary/third iteration */ SPositionInfo startPos; /* the start position, used for secondary/third iteration */
SPositionInfo endPos; /* the last access position in query, served as the start pos of reversed order query */ SPositionInfo endPos; /* the last access position in query, served as the start pos of reversed order query */
SPositionInfo nextPos; /* start position of the next scan */ SPositionInfo nextPos; /* start position of the next scan */
SData* colDataBuffer[TSDB_MAX_COLUMNS]; SData* colDataBuffer[TSDB_MAX_COLUMNS];
SResultInfo* resultInfo; SResultInfo* resultInfo;
uint8_t blockStatus; // Indicate if data block is loaded, the block is first/last/internal block
// Indicate if data block is loaded, the block is first/last/internal block int32_t unzipBufSize;
int8_t blockStatus; SData* primaryColBuffer;
int32_t unzipBufSize; char* unzipBuffer;
SData* primaryColBuffer; char* secondaryUnzipBuffer;
char* unzipBuffer; SQuery* pQuery;
char* secondaryUnzipBuffer; SMeterObj* pMeterObj;
SQuery* pQuery; SQLFunctionCtx* pCtx;
SMeterObj* pMeterObj; SQueryLoadBlockInfo loadBlockInfo; /* record current block load information */
SQLFunctionCtx* pCtx;
SQueryLoadBlockInfo loadBlockInfo; /* record current block load information */
SQueryLoadCompBlockInfo loadCompBlockInfo; /* record current compblock information in SQuery */ SQueryLoadCompBlockInfo loadCompBlockInfo; /* record current compblock information in SQuery */
/* /*
* header files info, avoid to iterate the directory, the data is acquired * header files info, avoid to iterate the directory, the data is acquired
* during in query preparation function * during in query preparation function
*/ */
SQueryFileInfo* pHeaderFiles; SQueryFileInfo* pVnodeFiles;
uint32_t numOfFiles; /* number of files of one vnode during query execution */ uint32_t numOfFiles; // the total available number of files for this virtual node during query execution
int32_t mmapedHFileIndex; // the mmaped header file, NOTE: only one header file can be mmap.
int16_t numOfRowsPerPage; int16_t numOfRowsPerPage;
int16_t offset[TSDB_MAX_COLUMNS]; int16_t offset[TSDB_MAX_COLUMNS];
int16_t scanFlag; // denotes reversed scan of data or not
int16_t scanFlag; /* denotes reversed scan of data or not */
SInterpolationInfo interpoInfo; SInterpolationInfo interpoInfo;
SData** pInterpoBuf; SData** pInterpoBuf;
SOutputRes* pResult; // reference to SQuerySupporter->pResult SOutputRes* pResult; // reference to SQuerySupporter->pResult
void* hashList; void* hashList;
int32_t usedIndex; // assigned SOutputRes in list int32_t usedIndex; // assigned SOutputRes in list
STSBuf* pTSBuf;
STSBuf* pTSBuf; STSCursor cur;
STSCursor cur; SQueryCostSummary summary;
SQueryCostSummary summary;
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
/* intermediate result during multimeter query involves interval */ /* intermediate result during multimeter query involves interval */
......
...@@ -289,13 +289,15 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe ...@@ -289,13 +289,15 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
pQuery->fileId = fid; pQuery->fileId = fid;
pSummary->numOfFiles++; pSummary->numOfFiles++;
SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pHeaderFiles[fileIdx]; SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIdx];
char * pHeaderData = pQueryFileInfo->pHeaderFileData; char *pHeaderData = vnodeGetHeaderFileData(pRuntimeEnv, fileIdx);
if (pHeaderData == NULL) { // failed to mmap header file into buffer, ignore current file, try next
continue;
}
int32_t numOfQualifiedMeters = 0; int32_t numOfQualifiedMeters = 0;
SMeterDataInfo **pReqMeterDataInfo = vnodeFilterQualifiedMeters( SMeterDataInfo **pReqMeterDataInfo = vnodeFilterQualifiedMeters(pQInfo, vnodeId, fileIdx, pSupporter->pSidSet,
pQInfo, vnodeId, pQueryFileInfo, pSupporter->pSidSet, pMeterDataInfo, &numOfQualifiedMeters); pMeterDataInfo, &numOfQualifiedMeters);
dTrace("QInfo:%p file:%s, %d meters qualified", pQInfo, pQueryFileInfo->dataFilePath, numOfQualifiedMeters);
if (pReqMeterDataInfo == NULL) { if (pReqMeterDataInfo == NULL) {
dError("QInfo:%p failed to allocate memory to perform query processing, abort", pQInfo); dError("QInfo:%p failed to allocate memory to perform query processing, abort", pQInfo);
...@@ -305,6 +307,8 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe ...@@ -305,6 +307,8 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
return NULL; return NULL;
} }
dTrace("QInfo:%p file:%s, %d meters qualified", pQInfo, pQueryFileInfo->dataFilePath, numOfQualifiedMeters);
// none of meters in query set have pHeaderData in this file, try next file // none of meters in query set have pHeaderData in this file, try next file
if (numOfQualifiedMeters == 0) { if (numOfQualifiedMeters == 0) {
fid += step; fid += step;
...@@ -500,7 +504,7 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start ...@@ -500,7 +504,7 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start
#if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP #if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP
for (int32_t i = 0; i < pRuntimeEnv->numOfFiles; ++i) { for (int32_t i = 0; i < pRuntimeEnv->numOfFiles; ++i) {
resetMMapWindow(&pRuntimeEnv->pHeaderFiles[i]); resetMMapWindow(&pRuntimeEnv->pVnodeFiles[i]);
} }
#endif #endif
...@@ -670,7 +674,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { ...@@ -670,7 +674,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
#if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP #if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP
for (int32_t i = 0; i < pRuntimeEnv->numOfFiles; ++i) { for (int32_t i = 0; i < pRuntimeEnv->numOfFiles; ++i) {
resetMMapWindow(&pRuntimeEnv->pHeaderFiles[i]); resetMMapWindow(&pRuntimeEnv->pVnodeFiles[i]);
} }
#endif #endif
......
...@@ -510,7 +510,7 @@ static void doInitGlobalConfig() { ...@@ -510,7 +510,7 @@ static void doInitGlobalConfig() {
0, TSDB_MAX_VNODES, 0, TSDB_CFG_UTYPE_NONE); 0, TSDB_MAX_VNODES, 0, TSDB_CFG_UTYPE_NONE);
tsInitConfigOption(cfg++, "tables", &tsSessionsPerVnode, TSDB_CFG_VTYPE_INT, tsInitConfigOption(cfg++, "tables", &tsSessionsPerVnode, TSDB_CFG_VTYPE_INT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW,
4, 220000, 0, TSDB_CFG_UTYPE_NONE); TSDB_MIN_TABLES_PER_VNODE, TSDB_MAX_TABLES_PER_VNODE, 0, TSDB_CFG_UTYPE_NONE);
tsInitConfigOption(cfg++, "cache", &tsCacheBlockSize, TSDB_CFG_VTYPE_INT, tsInitConfigOption(cfg++, "cache", &tsCacheBlockSize, TSDB_CFG_VTYPE_INT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW,
100, 1048576, 0, TSDB_CFG_UTYPE_BYTE); 100, 1048576, 0, TSDB_CFG_UTYPE_BYTE);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册