提交 575c5e89 编写于 作者: M Minghao Li

refactor(sync): add config item in sync

上级 cc8e1071
...@@ -64,6 +64,11 @@ extern int32_t tsNumOfSnodeStreamThreads; ...@@ -64,6 +64,11 @@ extern int32_t tsNumOfSnodeStreamThreads;
extern int32_t tsNumOfSnodeWriteThreads; extern int32_t tsNumOfSnodeWriteThreads;
extern int64_t tsRpcQueueMemoryAllowed; extern int64_t tsRpcQueueMemoryAllowed;
// sync raft
extern int32_t tsElectInterval;
extern int32_t tsHeartbeatInterval;
extern int32_t tsHeartbeatTimeout;
// monitor // monitor
extern bool tsEnableMonitor; extern bool tsEnableMonitor;
extern int32_t tsMonitorInterval; extern int32_t tsMonitorInterval;
...@@ -126,9 +131,9 @@ extern char tsUdfdResFuncs[]; ...@@ -126,9 +131,9 @@ extern char tsUdfdResFuncs[];
extern char tsUdfdLdLibPath[]; extern char tsUdfdLdLibPath[];
// schemaless // schemaless
extern char tsSmlChildTableName[]; extern char tsSmlChildTableName[];
extern char tsSmlTagName[]; extern char tsSmlTagName[];
extern bool tsSmlDataFormat; extern bool tsSmlDataFormat;
extern int32_t tsSmlBatchSize; extern int32_t tsSmlBatchSize;
// wal // wal
...@@ -146,7 +151,7 @@ extern int32_t tsUptimeInterval; ...@@ -146,7 +151,7 @@ extern int32_t tsUptimeInterval;
extern int32_t tsRpcRetryLimit; extern int32_t tsRpcRetryLimit;
extern int32_t tsRpcRetryInterval; extern int32_t tsRpcRetryInterval;
//#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) // #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc); const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc);
......
...@@ -43,7 +43,7 @@ extern "C" { ...@@ -43,7 +43,7 @@ extern "C" {
#define SYNC_MAX_RETRY_BACKOFF 5 #define SYNC_MAX_RETRY_BACKOFF 5
#define SYNC_LOG_REPL_RETRY_WAIT_MS 100 #define SYNC_LOG_REPL_RETRY_WAIT_MS 100
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000 #define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
#define SYNC_HEART_TIMEOUT_MS 1000 * 8 #define SYNC_HEART_TIMEOUT_MS 1000 * 15
#define SYNC_HEARTBEAT_SLOW_MS 1500 #define SYNC_HEARTBEAT_SLOW_MS 1500
#define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500 #define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500
......
...@@ -55,6 +55,11 @@ int32_t tsNumOfQnodeFetchThreads = 1; ...@@ -55,6 +55,11 @@ int32_t tsNumOfQnodeFetchThreads = 1;
int32_t tsNumOfSnodeStreamThreads = 4; int32_t tsNumOfSnodeStreamThreads = 4;
int32_t tsNumOfSnodeWriteThreads = 1; int32_t tsNumOfSnodeWriteThreads = 1;
// sync raft
int32_t tsElectInterval = 25 * 1000;
int32_t tsHeartbeatInterval = 1000;
int32_t tsHeartbeatTimeout = 20 * 1000;
// monitor // monitor
bool tsEnableMonitor = true; bool tsEnableMonitor = true;
int32_t tsMonitorInterval = 30; int32_t tsMonitorInterval = 30;
...@@ -74,8 +79,8 @@ char tsSmlTagName[TSDB_COL_NAME_LEN] = "_tag_null"; ...@@ -74,8 +79,8 @@ char tsSmlTagName[TSDB_COL_NAME_LEN] = "_tag_null";
char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table name can be specified in tag value. char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table name can be specified in tag value.
// If set to empty system will generate table name using MD5 hash. // If set to empty system will generate table name using MD5 hash.
// true means that the name and order of cols in each line are the same(only for influx protocol) // true means that the name and order of cols in each line are the same(only for influx protocol)
bool tsSmlDataFormat = false; bool tsSmlDataFormat = false;
int32_t tsSmlBatchSize = 10000; int32_t tsSmlBatchSize = 10000;
// query // query
int32_t tsQueryPolicy = 1; int32_t tsQueryPolicy = 1;
...@@ -198,9 +203,7 @@ int32_t taosSetTfsCfg(SConfig *pCfg) { ...@@ -198,9 +203,7 @@ int32_t taosSetTfsCfg(SConfig *pCfg) {
int32_t taosSetTfsCfg(SConfig *pCfg); int32_t taosSetTfsCfg(SConfig *pCfg);
#endif #endif
struct SConfig *taosGetCfg() { struct SConfig *taosGetCfg() { return tsCfg; }
return tsCfg;
}
static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile, static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile,
char *apolloUrl) { char *apolloUrl) {
...@@ -423,6 +426,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -423,6 +426,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, 0) != 0) if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, 0) != 0)
return -1; return -1;
if (cfgAddInt32(pCfg, "syncElectInterval", tsElectInterval, 10, 1000 * 60 * 24 * 2, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, 0) != 0) return -1;
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1; if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, 0) != 0) return -1;
if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1; if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1;
...@@ -728,6 +735,10 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -728,6 +735,10 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN); tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN);
tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32; tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32;
tsElectInterval = cfgGetItem(pCfg, "syncElectInterval")->i32;
tsHeartbeatInterval = cfgGetItem(pCfg, "syncHeartbeatInterval")->i32;
tsHeartbeatTimeout = cfgGetItem(pCfg, "syncHeartbeatTimeout")->i32;
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32; tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32; tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32;
tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32; tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32;
...@@ -737,6 +748,10 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -737,6 +748,10 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsWalFsyncDataSizeLimit = cfgGetItem(pCfg, "walFsyncDataSizeLimit")->i64; tsWalFsyncDataSizeLimit = cfgGetItem(pCfg, "walFsyncDataSizeLimit")->i64;
tsElectInterval = cfgGetItem(pCfg, "syncElectInterval")->i32;
tsHeartbeatInterval = cfgGetItem(pCfg, "syncHeartbeatInterval")->i32;
tsHeartbeatTimeout = cfgGetItem(pCfg, "syncHeartbeatTimeout")->i32;
tsStartUdfd = cfgGetItem(pCfg, "udf")->bval; tsStartUdfd = cfgGetItem(pCfg, "udf")->bval;
tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs)); tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs));
tstrncpy(tsUdfdLdLibPath, cfgGetItem(pCfg, "udfdLdLibPath")->str, sizeof(tsUdfdLdLibPath)); tstrncpy(tsUdfdLdLibPath, cfgGetItem(pCfg, "udfdLdLibPath")->str, sizeof(tsUdfdLdLibPath));
......
...@@ -22,8 +22,8 @@ ...@@ -22,8 +22,8 @@
#include "syncEnv.h" #include "syncEnv.h"
#include "syncIndexMgr.h" #include "syncIndexMgr.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncPipeline.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "syncPipeline.h"
#include "syncRaftCfg.h" #include "syncRaftCfg.h"
#include "syncRaftLog.h" #include "syncRaftLog.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
...@@ -35,6 +35,7 @@ ...@@ -35,6 +35,7 @@
#include "syncTimeout.h" #include "syncTimeout.h"
#include "syncUtil.h" #include "syncUtil.h"
#include "syncVoteMgr.h" #include "syncVoteMgr.h"
#include "tglobal.h"
#include "tref.h" #include "tref.h"
static void syncNodeEqPingTimer(void* param, void* tmrId); static void syncNodeEqPingTimer(void* param, void* tmrId);
...@@ -1115,7 +1116,9 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { ...@@ -1115,7 +1116,9 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
pSyncNode->hbrSlowNum = 0; pSyncNode->hbrSlowNum = 0;
pSyncNode->tmrRoutineNum = 0; pSyncNode->tmrRoutineNum = 0;
sNTrace(pSyncNode, "sync open, node:%p", pSyncNode); sNInfo(pSyncNode, "sync open, node:%p", pSyncNode);
sTrace("vgId:%d, tsElectInterval:%d, tsHeartbeatInterval:%d, tsHeartbeatTimeout:%d", pSyncNode->vgId, tsElectInterval,
tsHeartbeatInterval, tsHeartbeatTimeout);
return pSyncNode; return pSyncNode;
...@@ -1229,7 +1232,7 @@ void syncHbTimerDataFree(SSyncHbTimerData* pData) { taosMemoryFree(pData); } ...@@ -1229,7 +1232,7 @@ void syncHbTimerDataFree(SSyncHbTimerData* pData) { taosMemoryFree(pData); }
void syncNodeClose(SSyncNode* pSyncNode) { void syncNodeClose(SSyncNode* pSyncNode) {
if (pSyncNode == NULL) return; if (pSyncNode == NULL) return;
sNTrace(pSyncNode, "sync close, data:%p", pSyncNode); sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);
int32_t ret = raftStoreClose(pSyncNode->pRaftStore); int32_t ret = raftStoreClose(pSyncNode->pRaftStore);
ASSERT(ret == 0); ASSERT(ret == 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册