提交 da1351e3 编写于 作者: S Shengliang Guan

client config

上级 955b89f3
...@@ -23,7 +23,7 @@ typedef struct SBlockOrderInfo { ...@@ -23,7 +23,7 @@ typedef struct SBlockOrderInfo {
// bool hasNull; // bool hasNull;
} SBlockOrderInfo; } SBlockOrderInfo;
int taosGetFqdnPortFromEp(const char *ep, uint16_t defaultPort, SEp *pEp); int taosGetFqdnPortFromEp(const char *ep, SEp *pEp);
void addEpIntoEpSet(SEpSet *pEpSet, const char *fqdn, uint16_t port); void addEpIntoEpSet(SEpSet *pEpSet, const char *fqdn, uint16_t port);
bool isEpsetEqual(const SEpSet *s1, const SEpSet *s2); bool isEpsetEqual(const SEpSet *s1, const SEpSet *s2);
......
...@@ -24,9 +24,14 @@ extern "C" { ...@@ -24,9 +24,14 @@ extern "C" {
#include "tdef.h" #include "tdef.h"
// cluster // cluster
extern int32_t tsVersion; extern char tsFirst[];
extern int32_t tsStatusInterval; extern char tsSecond[];
extern bool tsEnableTelemetryReporting; extern char tsLocalFqdn[];
extern char tsLocalEp[];
extern uint16_t tsServerPort;
extern int32_t tsVersion;
extern int32_t tsStatusInterval;
extern bool tsEnableTelemetryReporting;
// common // common
extern int32_t tsRpcTimer; extern int32_t tsRpcTimer;
......
...@@ -253,13 +253,6 @@ int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* v ...@@ -253,13 +253,6 @@ int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* v
// --- mq // --- mq
void hbMgrInitMqHbRspHandle(); void hbMgrInitMqHbRspHandle();
// config
int32_t tscInitLog(const char *cfgDir, const char *envFile, const char *apolloUrl);
int32_t tscInitCfg(const char *cfgDir, const char *envFile, const char *apolloUrl);
extern SConfig *tscCfg;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "clientInt.h"
#include "ulog.h"
int32_t tscCheckCfg(SConfig *pCfg) {
bool enableCore = cfgGetItem(pCfg, "enableCoreFile")->bval;
taosSetCoreDump(enableCore);
return 0;
}
SConfig *tscInitCfgImp(const char *cfgDir, const char *envFile, const char *apolloUrl) {
SConfig *pCfg = cfgInit();
if (pCfg == NULL) return NULL;
if (tscAddCfg(pCfg) != 0) {
uError("failed to init tsc cfg since %s", terrstr());
cfgCleanup(pCfg);
return NULL;
}
if (tscLoadCfg(pCfg, cfgDir, envFile, apolloUrl) != 0) {
printf("failed to load tsc cfg since %s\n", terrstr());
cfgCleanup(pCfg);
return NULL;
}
if (tscCheckCfg(pCfg) != 0) {
uError("failed to check cfg since %s", terrstr());
cfgCleanup(pCfg);
return NULL;
}
cfgDumpCfg(pCfg);
return pCfg;
}
int32_t tscInitCfg(const char *cfgDir, const char *envFile, const char *apolloUrl) {
tscCfg = tscInitCfgImp(cfgDir, envFile, apolloUrl);
if (tscCfg == NULL) return -1;
return 0;
}
\ No newline at end of file
...@@ -93,10 +93,10 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { ...@@ -93,10 +93,10 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.numOfThreads = numOfThread; rpcInit.numOfThreads = numOfThread;
rpcInit.cfp = processMsgFromServer; rpcInit.cfp = processMsgFromServer;
rpcInit.pfp = persistConnForSpecificMsg; rpcInit.pfp = persistConnForSpecificMsg;
rpcInit.sessions = cfgGetItem(tscCfg, "maxConnections")->i32; rpcInit.sessions = tsMaxConnections;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)user; rpcInit.user = (char *)user;
rpcInit.idleTime = cfgGetItem(tscCfg, "shellActivityTimer")->i32 * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.ckey = "key"; rpcInit.ckey = "key";
rpcInit.spi = 1; rpcInit.spi = 1;
rpcInit.secret = (char *)auth; rpcInit.secret = (char *)auth;
...@@ -212,12 +212,12 @@ void taos_init_imp(void) { ...@@ -212,12 +212,12 @@ void taos_init_imp(void) {
deltaToUtcInitOnce(); deltaToUtcInitOnce();
if (tscInitLog(configDir, NULL, NULL) != 0) { if (taosCreateLog("taoslog", 10, configDir, NULL, NULL, 1) != 0) {
tscInitRes = -1; tscInitRes = -1;
return; return;
} }
if (tscInitCfg(configDir, NULL, NULL) != 0) { if (taosInitCfg(configDir, NULL, NULL, 1) != 0) {
tscInitRes = -1; tscInitRes = -1;
return; return;
} }
......
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
#include "tpagedbuf.h" #include "tpagedbuf.h"
#include "tref.h" #include "tref.h"
static int32_t initEpSetFromCfg(const char* ip, uint16_t port, SCorEpSet* pEpSet); static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
static void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp); static void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
...@@ -80,7 +80,19 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, ...@@ -80,7 +80,19 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass,
} }
SCorEpSet epSet = {0}; SCorEpSet epSet = {0};
initEpSetFromCfg(ip, port, &epSet); if (ip) {
if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
return NULL;
}
if (port) {
epSet.epSet.eps[0].port = port;
}
} else {
if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
return NULL;
}
}
char* key = getClusterKey(user, secretEncrypt, ip, port); char* key = getClusterKey(user, secretEncrypt, ip, port);
SAppInstInfo** pInst = NULL; SAppInstInfo** pInst = NULL;
...@@ -267,40 +279,32 @@ _return: ...@@ -267,40 +279,32 @@ _return:
return pRequest; return pRequest;
} }
int initEpSetFromCfg(const char* ip, uint16_t port, SCorEpSet* pEpSet) { int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
SConfigItem* pFirst = cfgGetItem(tscCfg, "firstEp"); pEpSet->version = 0;
SConfigItem* pSecond = cfgGetItem(tscCfg, "secondEp");
SConfigItem* pPort = cfgGetItem(tscCfg, "serverPort");
// init mnode ip set // init mnode ip set
SEpSet* mgmtEpSet = &(pEpSet->epSet); SEpSet* mgmtEpSet = &(pEpSet->epSet);
mgmtEpSet->numOfEps = 0; mgmtEpSet->numOfEps = 0;
mgmtEpSet->inUse = 0; mgmtEpSet->inUse = 0;
pEpSet->version = 0;
if (ip != NULL) { if (firstEp && firstEp[0] != 0) {
taosGetFqdnPortFromEp(ip, (uint16_t)pPort->i32, &mgmtEpSet->eps[0]); if (strlen(firstEp) >= TSDB_EP_LEN) {
mgmtEpSet->numOfEps++; terrno = TSDB_CODE_TSC_INVALID_FQDN;
if (port) { return -1;
mgmtEpSet->eps[0].port = port;
}
} else {
if (pFirst->str[0] != 0) {
if (strlen(pFirst->str) >= TSDB_EP_LEN) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
taosGetFqdnPortFromEp(pFirst->str, (uint16_t)pPort->i32, &mgmtEpSet->eps[0]);
mgmtEpSet->numOfEps++;
} }
if (pSecond->str[0] != 0) {
if (strlen(pSecond->str) >= TSDB_EP_LEN) { taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
terrno = TSDB_CODE_TSC_INVALID_FQDN; mgmtEpSet->numOfEps++;
return -1; }
}
taosGetFqdnPortFromEp(pSecond->str, (uint16_t)pPort->i32, &mgmtEpSet->eps[1]); if (secondEp && secondEp[0] != 0) {
mgmtEpSet->numOfEps++; if (strlen(secondEp) >= TSDB_EP_LEN) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
} }
taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
mgmtEpSet->numOfEps++;
} }
if (mgmtEpSet->numOfEps == 0) { if (mgmtEpSet->numOfEps == 0) {
......
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
#include "tglobal.h" #include "tglobal.h"
#include "tlockfree.h" #include "tlockfree.h"
int taosGetFqdnPortFromEp(const char *ep, uint16_t defaultPort, SEp* pEp) { int taosGetFqdnPortFromEp(const char *ep, SEp* pEp) {
pEp->port = 0; pEp->port = 0;
strcpy(pEp->fqdn, ep); strcpy(pEp->fqdn, ep);
...@@ -15,7 +15,7 @@ int taosGetFqdnPortFromEp(const char *ep, uint16_t defaultPort, SEp* pEp) { ...@@ -15,7 +15,7 @@ int taosGetFqdnPortFromEp(const char *ep, uint16_t defaultPort, SEp* pEp) {
} }
if (pEp->port == 0) { if (pEp->port == 0) {
pEp->port = defaultPort; pEp->port = tsServerPort;
return -1; return -1;
} }
......
...@@ -29,9 +29,14 @@ ...@@ -29,9 +29,14 @@
SConfig *tsCfg = NULL; SConfig *tsCfg = NULL;
// cluster // cluster
int32_t tsVersion = 30000000; char tsFirst[TSDB_EP_LEN] = {0};
int32_t tsStatusInterval = 1; // second char tsSecond[TSDB_EP_LEN] = {0};
bool tsEnableTelemetryReporting = 0; char tsLocalFqdn[TSDB_FQDN_LEN] = {0};
char tsLocalEp[TSDB_EP_LEN] = {0}; // Local End Point, hostname:port
uint16_t tsServerPort = 6030;
int32_t tsVersion = 30000000;
int32_t tsStatusInterval = 1; // second
bool tsEnableTelemetryReporting = 0;
// common // common
int32_t tsRpcTimer = 300; int32_t tsRpcTimer = 300;
...@@ -321,11 +326,19 @@ static void taosSetServerLogCfg(SConfig *pCfg) { ...@@ -321,11 +326,19 @@ static void taosSetServerLogCfg(SConfig *pCfg) {
static void taosSetClientCfg(SConfig *pCfg) { static void taosSetClientCfg(SConfig *pCfg) {
osSetTempDir(cfgGetItem(pCfg, "tempDir")->str); osSetTempDir(cfgGetItem(pCfg, "tempDir")->str);
osSetDataReservedSpace(cfgGetItem(pCfg, "minimalTempDirGB")->fval); osSetDataReservedSpace(cfgGetItem(pCfg, "minimalTempDirGB")->fval);
tstrncpy(tsFirst, cfgGetItem(pCfg, "firstEp")->str, TSDB_EP_LEN);
tstrncpy(tsSecond, cfgGetItem(pCfg, "secondEp")->str, TSDB_EP_LEN);
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_EP_LEN);
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort);
taosGetSystemInfo(); taosGetSystemInfo();
if (tsNumOfCores <= 0) { if (tsNumOfCores <= 0) {
tsNumOfCores = 1; tsNumOfCores = 1;
} }
bool enableCore = cfgGetItem(pCfg, "enableCoreFile")->bval;
taosSetCoreDump(enableCore);
} }
static void taosSetServerCfg(SConfig *pCfg) { static void taosSetServerCfg(SConfig *pCfg) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dmnInt.h"
int32_t dmnAddLogCfg(SConfig *pCfg) {
if (cfgAddDir(pCfg, "logDir", osLogDir()) != 0) return -1;
if (cfgAddFloat(pCfg, "minimalLogDirGB", 1.0f, 0.001f, 10000000) != 0) return -1;
if (cfgAddBool(pCfg, "asyncLog", 1) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfLogLines", 10000000, 1000, 2000000000) != 0) return -1;
if (cfgAddInt32(pCfg, "logKeepDays", 0, -365000, 365000) != 0) return -1;
if (cfgAddInt32(pCfg, "debugFlag", 0, 0, 255) != 0) return -1;
if (cfgAddInt32(pCfg, "dDebugFlag", 0, 0, 255) != 0) return -1;
if (cfgAddInt32(pCfg, "vDebugFlag", 0, 0, 255) != 0) return -1;
if (cfgAddInt32(pCfg, "mDebugFlag", 0, 0, 255) != 0) return -1;
if (cfgAddInt32(pCfg, "cDebugFlag", 0, 0, 255) != 0) return -1;
if (cfgAddInt32(pCfg, "jniDebugFlag", 0, 0, 255) != 0) return -1;
if (cfgAddInt32(pCfg, "tmrDebugFlag", 0, 0, 255) != 0) return -1;
if (cfgAddInt32(pCfg, "uDebugFlag", 0, 0, 255) != 0) return -1;
if (cfgAddInt32(pCfg, "rpcDebugFlag", 0, 0, 255) != 0) return -1;
if (cfgAddInt32(pCfg, "qDebugFlag", 0, 0, 255) != 0) return -1;
if (cfgAddInt32(pCfg, "wDebugFlag", 0, 0, 255) != 0) return -1;
if (cfgAddInt32(pCfg, "sDebugFlag", 0, 0, 255) != 0) return -1;
if (cfgAddInt32(pCfg, "tsdbDebugFlag", 0, 0, 255) != 0) return -1;
if (cfgAddInt32(pCfg, "tqDebugFlag", 0, 0, 255) != 0) return -1;
if (cfgAddInt32(pCfg, "fsDebugFlag", 0, 0, 255) != 0) return -1;
return 0;
}
int32_t dmnSetLogCfg(SConfig *pCfg) {
osSetLogDir(cfgGetItem(pCfg, "logDir")->str);
osSetLogReservedSpace(cfgGetItem(pCfg, "minimalLogDirGB")->fval);
tsAsyncLog = cfgGetItem(pCfg, "asyncLog")->bval;
tsNumOfLogLines = cfgGetItem(pCfg, "numOfLogLines")->i32;
tsLogKeepDays = cfgGetItem(pCfg, "logKeepDays")->i32;
cDebugFlag = cfgGetItem(pCfg, "cDebugFlag")->i32;
jniDebugFlag = cfgGetItem(pCfg, "jniDebugFlag")->i32;
tmrDebugFlag = cfgGetItem(pCfg, "tmrDebugFlag")->i32;
uDebugFlag = cfgGetItem(pCfg, "uDebugFlag")->i32;
rpcDebugFlag = cfgGetItem(pCfg, "rpcDebugFlag")->i32;
qDebugFlag = cfgGetItem(pCfg, "qDebugFlag")->i32;
wDebugFlag = cfgGetItem(pCfg, "wDebugFlag")->i32;
sDebugFlag = cfgGetItem(pCfg, "sDebugFlag")->i32;
tsdbDebugFlag = cfgGetItem(pCfg, "tsdbDebugFlag")->i32;
tqDebugFlag = cfgGetItem(pCfg, "tqDebugFlag")->i32;
fsDebugFlag = cfgGetItem(pCfg, "fsDebugFlag")->i32;
int32_t debugFlag = cfgGetItem(pCfg, "debugFlag")->i32;
taosSetAllDebugFlag(debugFlag);
return 0;
}
int32_t dmnInitLog(const char *cfgDir, const char *envFile, const char *apolloUrl) {
SConfig *pCfg = cfgInit();
if (pCfg == NULL) return -1;
if (dmnAddLogCfg(pCfg) != 0) {
printf("failed to add log cfg since %s\n", terrstr());
cfgCleanup(pCfg);
return -1;
}
if (dmnLoadCfg(pCfg, cfgDir, envFile, apolloUrl) != 0) {
printf("failed to load log cfg since %s\n", terrstr());
cfgCleanup(pCfg);
return -1;
}
if (dmnSetLogCfg(pCfg) != 0) {
printf("failed to set log cfg since %s\n", terrstr());
cfgCleanup(pCfg);
return -1;
}
if (taosInitLog("taosdlog", 1) != 0) {
printf("failed to init log file since %s\n", terrstr());
cfgCleanup(pCfg);
return -1;
}
cfgCleanup(pCfg);
return 0;
}
int32_t dmnLoadCfg(SConfig *pConfig, const char *inputCfgDir, const char *envFile, const char *apolloUrl) {
char configDir[PATH_MAX] = {0};
char configFile[PATH_MAX + 100] = {0};
taosExpandDir(inputCfgDir, configDir, PATH_MAX);
snprintf(configFile, sizeof(configFile), "%s" TD_DIRSEP "taos.cfg", configDir);
if (cfgLoad(pConfig, CFG_STYPE_APOLLO_URL, apolloUrl) != 0) {
uError("failed to load from apollo url:%s since %s\n", apolloUrl, terrstr());
return -1;
}
if (cfgLoad(pConfig, CFG_STYPE_CFG_FILE, configFile) != 0) {
if (cfgLoad(pConfig, CFG_STYPE_CFG_FILE, configDir) != 0) {
uError("failed to load from config file:%s since %s\n", configFile, terrstr());
return -1;
}
}
if (cfgLoad(pConfig, CFG_STYPE_ENV_FILE, envFile) != 0) {
uError("failed to load from env file:%s since %s\n", envFile, terrstr());
return -1;
}
if (cfgLoad(pConfig, CFG_STYPE_ENV_VAR, NULL) != 0) {
uError("failed to load from global env variables since %s\n", terrstr());
return -1;
}
return 0;
}
...@@ -296,7 +296,7 @@ PRASE_DNODE_OVER: ...@@ -296,7 +296,7 @@ PRASE_DNODE_OVER:
if (taosArrayGetSize(pMgmt->pDnodeEps) == 0) { if (taosArrayGetSize(pMgmt->pDnodeEps) == 0) {
SDnodeEp dnodeEp = {0}; SDnodeEp dnodeEp = {0};
dnodeEp.isMnode = 1; dnodeEp.isMnode = 1;
taosGetFqdnPortFromEp(pDnode->cfg.firstEp, pDnode->cfg.serverPort, &dnodeEp.ep); taosGetFqdnPortFromEp(pDnode->cfg.firstEp, &dnodeEp.ep);
taosArrayPush(pMgmt->pDnodeEps, &dnodeEp); taosArrayPush(pMgmt->pDnodeEps, &dnodeEp);
} }
......
...@@ -132,8 +132,7 @@ do { \ ...@@ -132,8 +132,7 @@ do { \
} while (0) } while (0)
int32_t getMaximumIdleDurationSec() { int32_t getMaximumIdleDurationSec() {
// todo return tsShellActivityTimer * 2;
return 6; //tsShellActivityTimer * 2;
} }
static int32_t getExprFunctionId(SExprInfo *pExprInfo) { static int32_t getExprFunctionId(SExprInfo *pExprInfo) {
...@@ -5302,12 +5301,10 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* ...@@ -5302,12 +5301,10 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray*
rpcInit.label = "EX"; rpcInit.label = "EX";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = qProcessFetchRsp; rpcInit.cfp = qProcessFetchRsp;
// todo rpcInit.sessions = tsMaxConnections;
rpcInit.sessions = 50000; //tsMaxConnections;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)"root"; rpcInit.user = (char *)"root";
// todo rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.idleTime = 6; //tsShellActivityTimer * 1000;
rpcInit.ckey = "key"; rpcInit.ckey = "key";
rpcInit.spi = 1; rpcInit.spi = 1;
rpcInit.secret = (char *)"dcc5bed04851fec854c035b2e40263b6"; rpcInit.secret = (char *)"dcc5bed04851fec854c035b2e40263b6";
......
...@@ -85,11 +85,10 @@ static void* pTaskQueue = NULL; ...@@ -85,11 +85,10 @@ static void* pTaskQueue = NULL;
int32_t initTaskQueue() { int32_t initTaskQueue() {
double factor = 4.0; double factor = 4.0;
// todo
// int32_t numOfThreads = TMAX((int)(tsNumOfCores * tsNumOfThreadsPerCore / factor), 2);
int32_t numOfThreads = TMAX((int)(tsNumOfCores * 1.0f / factor), 2);
int32_t queueSize = 25000; //tsMaxConnections * 2; int32_t numOfThreads = TMAX((int)(tsNumOfCores * tsNumOfThreadsPerCore / factor), 2);
int32_t queueSize = tsMaxConnections * 2;
pTaskQueue = taosInitScheduler(queueSize, numOfThreads, "tsc"); pTaskQueue = taosInitScheduler(queueSize, numOfThreads, "tsc");
if (NULL == pTaskQueue) { if (NULL == pTaskQueue) {
qError("failed to init task queue"); qError("failed to init task queue");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册