提交 f8528407 编写于 作者: S slguan

Merge remote-tracking branch 'origin/develop' into feature/cluster

...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "tutil.h" #include "tutil.h"
#include "tlocale.h" #include "tlocale.h"
#include "ttimezone.h" #include "ttimezone.h"
#include "tsync.h"
char configDir[TSDB_FILENAME_LEN] = "/etc/taos"; char configDir[TSDB_FILENAME_LEN] = "/etc/taos";
char tsVnodeDir[TSDB_FILENAME_LEN] = {0}; char tsVnodeDir[TSDB_FILENAME_LEN] = {0};
...@@ -62,6 +63,7 @@ int64_t tsMsPerDay[] = {86400000L, 86400000000L}; ...@@ -62,6 +63,7 @@ int64_t tsMsPerDay[] = {86400000L, 86400000000L};
char tsMaster[TSDB_FQDN_LEN] = {0}; char tsMaster[TSDB_FQDN_LEN] = {0};
char tsSecond[TSDB_FQDN_LEN] = {0}; char tsSecond[TSDB_FQDN_LEN] = {0};
char tsArbitrator[TSDB_FQDN_LEN] = {0};
char tsLocalEp[TSDB_FQDN_LEN] = {0}; // Local End Point, hostname:port char tsLocalEp[TSDB_FQDN_LEN] = {0}; // Local End Point, hostname:port
uint16_t tsServerPort = 6030; uint16_t tsServerPort = 6030;
uint16_t tsMnodeShellPort = 6030; // udp[6030-6034] tcp[6030] uint16_t tsMnodeShellPort = 6030; // udp[6030-6034] tcp[6030]
...@@ -141,6 +143,7 @@ int32_t qdebugFlag = 131; ...@@ -141,6 +143,7 @@ int32_t qdebugFlag = 131;
int32_t rpcDebugFlag = 131; int32_t rpcDebugFlag = 131;
int32_t uDebugFlag = 131; int32_t uDebugFlag = 131;
int32_t debugFlag = 131; int32_t debugFlag = 131;
int32_t sDebugFlag = 131;
// the maximum number of results for projection query on super table that are returned from // the maximum number of results for projection query on super table that are returned from
// one virtual node, to order according to timestamp // one virtual node, to order according to timestamp
...@@ -341,6 +344,16 @@ static void doInitGlobalConfig() { ...@@ -341,6 +344,16 @@ static void doInitGlobalConfig() {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "arbitrator";
cfg.ptr = tsArbitrator;
cfg.valType = TAOS_CFG_VTYPE_IPSTR;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = TSDB_FQDN_LEN;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// dnode configs // dnode configs
cfg.option = "numOfThreadsPerCore"; cfg.option = "numOfThreadsPerCore";
cfg.ptr = &tsNumOfThreadsPerCore; cfg.ptr = &tsNumOfThreadsPerCore;
...@@ -1012,6 +1025,16 @@ static void doInitGlobalConfig() { ...@@ -1012,6 +1025,16 @@ static void doInitGlobalConfig() {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "sDebugFlag";
cfg.ptr = &sDebugFlag;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG;
cfg.minValue = 0;
cfg.maxValue = 255;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "sdbDebugFlag"; cfg.option = "sdbDebugFlag";
cfg.ptr = &sdbDebugFlag; cfg.ptr = &sdbDebugFlag;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
......
...@@ -45,8 +45,6 @@ typedef struct { ...@@ -45,8 +45,6 @@ typedef struct {
typedef struct { typedef struct {
int8_t quorum; // number of confirms required, >=1 int8_t quorum; // number of confirms required, >=1
int8_t replica; // number of replications, >=1 int8_t replica; // number of replications, >=1
uint16_t arbitratorPort; // arbitrator port
char arbitratorFqdn[TSDB_FQDN_LEN]; // arbitrator IP address
SNodeInfo nodeInfo[TAOS_SYNC_MAX_REPLICA]; SNodeInfo nodeInfo[TAOS_SYNC_MAX_REPLICA];
} SSyncCfg; } SSyncCfg;
...@@ -108,6 +106,7 @@ extern int tsMaxWatchFiles; ...@@ -108,6 +106,7 @@ extern int tsMaxWatchFiles;
extern int tsSyncTimer; extern int tsSyncTimer;
extern int tsMaxFwdInfo; extern int tsMaxFwdInfo;
extern int sDebugFlag; extern int sDebugFlag;
extern char tsArbitrator[];
extern uint16_t tsSyncPort; extern uint16_t tsSyncPort;
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -253,13 +253,7 @@ void sdbUpdateSync() { ...@@ -253,13 +253,7 @@ void sdbUpdateSync() {
} }
syncCfg.replica = index; syncCfg.replica = index;
syncCfg.arbitratorPort = syncCfg.nodeInfo[0].nodePort; syncCfg.quorum = (syncCfg.replica == 1) ? 1:2;
strcpy(syncCfg.arbitratorFqdn, syncCfg.nodeInfo[0].nodeFqdn);
if (syncCfg.replica == 1) {
syncCfg.quorum = 1;
} else {
syncCfg.quorum = 2;
}
bool hasThisDnode = false; bool hasThisDnode = false;
for (int32_t i = 0; i < syncCfg.replica; ++i) { for (int32_t i = 0; i < syncCfg.replica; ++i) {
...@@ -272,9 +266,9 @@ void sdbUpdateSync() { ...@@ -272,9 +266,9 @@ void sdbUpdateSync() {
if (!hasThisDnode) return; if (!hasThisDnode) return;
if (memcmp(&syncCfg, &tsSdbObj.cfg, sizeof(SSyncCfg)) == 0) return; if (memcmp(&syncCfg, &tsSdbObj.cfg, sizeof(SSyncCfg)) == 0) return;
sdbPrint("work as mnode, replica:%d arbitrator:%s", syncCfg.replica, syncCfg.arbitratorFqdn); sdbPrint("work as mnode, replica:%d", syncCfg.replica);
for (int32_t i = 0; i < syncCfg.replica; ++i) { for (int32_t i = 0; i < syncCfg.replica; ++i) {
sdbPrint("mnode:%d, ip:%s", syncCfg.nodeInfo[i].nodeId, syncCfg.nodeInfo[i].nodeFqdn); sdbPrint("mnode:%d, %s:%d", syncCfg.nodeInfo[i].nodeId, syncCfg.nodeInfo[i].nodeFqdn, syncCfg.nodeInfo[i].nodePort);
} }
SSyncInfo syncInfo; SSyncInfo syncInfo;
......
...@@ -431,8 +431,8 @@ static int32_t mgmtSuperTableActionUpdate(SSdbOper *pOper) { ...@@ -431,8 +431,8 @@ static int32_t mgmtSuperTableActionUpdate(SSdbOper *pOper) {
void *oldSchema = pTable->schema; void *oldSchema = pTable->schema;
memcpy(pTable, pNew, pOper->rowSize); memcpy(pTable, pNew, pOper->rowSize);
pTable->schema = pNew->schema; pTable->schema = pNew->schema;
free(pNew);
free(pNew->vgList); free(pNew->vgList);
free(pNew);
free(oldSchema); free(oldSchema);
} }
......
...@@ -151,67 +151,6 @@ int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optle ...@@ -151,67 +151,6 @@ int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optle
return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen); return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen);
} }
int taosOpenUDClientSocket(char *ip, uint16_t port) {
int sockFd = 0;
struct sockaddr_un serverAddr;
int ret;
char name[128];
sprintf(name, "%s.%hu", ip, port);
sockFd = socket(AF_UNIX, SOCK_STREAM, 0);
if (sockFd < 0) {
uError("failed to open the UD socket:%s, reason:%s", name, strerror(errno));
return -1;
}
memset((char *)&serverAddr, 0, sizeof(serverAddr));
serverAddr.sun_family = AF_UNIX;
strcpy(serverAddr.sun_path + 1, name);
ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
if (ret != 0) {
uError("failed to connect UD socket, name:%d, reason: %s", name, strerror(errno));
sockFd = -1;
}
return sockFd;
}
int taosOpenUDServerSocket(char *ip, uint16_t port) {
struct sockaddr_un serverAdd;
int sockFd;
char name[128];
uTrace("open ud socket:%s", name);
sprintf(name, "%s.%hu", ip, port);
bzero((char *)&serverAdd, sizeof(serverAdd));
serverAdd.sun_family = AF_UNIX;
strcpy(serverAdd.sun_path + 1, name);
unlink(name);
if ((sockFd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
uError("failed to open UD socket:%s, reason:%s", name, strerror(errno));
return -1;
}
/* bind socket to server address */
if (bind(sockFd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) {
uError("bind socket:%s failed, reason:%s", name, strerror(errno));
tclose(sockFd);
return -1;
}
if (listen(sockFd, 10) < 0) {
uError("listen socket:%s failed, reason:%s", name, strerror(errno));
return -1;
}
return sockFd;
}
int taosInitTimer(void (*callback)(int), int ms) { int taosInitTimer(void (*callback)(int), int ms) {
signal(SIGALRM, callback); signal(SIGALRM, callback);
......
...@@ -143,68 +143,6 @@ int taosSetNonblocking(int sock, int on) { ...@@ -143,68 +143,6 @@ int taosSetNonblocking(int sock, int on) {
int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optlen) { int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optlen) {
return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen); return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen);
} }
int taosOpenUDClientSocket(char *ip, uint16_t port) {
int sockFd = 0;
struct sockaddr_un serverAddr;
int ret;
char name[128];
sprintf(name, "%s.%hu", ip, port);
sockFd = socket(AF_UNIX, SOCK_STREAM, 0);
if (sockFd < 0) {
uError("failed to open the UD socket:%s, reason:%s", name, strerror(errno));
return -1;
}
memset((char *)&serverAddr, 0, sizeof(serverAddr));
serverAddr.sun_family = AF_UNIX;
strcpy(serverAddr.sun_path + 1, name);
ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
if (ret != 0) {
uError("failed to connect UD socket, name:%d, reason: %s", name, strerror(errno));
sockFd = -1;
}
return sockFd;
}
int taosOpenUDServerSocket(char *ip, uint16_t port) {
struct sockaddr_un serverAdd;
int sockFd;
char name[128];
uTrace("open ud socket:%s", name);
sprintf(name, "%s.%hu", ip, port);
bzero((char *)&serverAdd, sizeof(serverAdd));
serverAdd.sun_family = AF_UNIX;
strcpy(serverAdd.sun_path + 1, name);
unlink(name);
if ((sockFd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
uError("failed to open UD socket:%s, reason:%s", name, strerror(errno));
return -1;
}
/* bind socket to server address */
if (bind(sockFd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) {
uError("bind socket:%s failed, reason:%s", name, strerror(errno));
tclose(sockFd);
return -1;
}
if (listen(sockFd, 10) < 0) {
uError("listen socket:%s failed, reason:%s", name, strerror(errno));
return -1;
}
return sockFd;
}
static void taosDeleteTimer(void *tharg) { static void taosDeleteTimer(void *tharg) {
timer_t *pTimer = tharg; timer_t *pTimer = tharg;
timer_delete(*pTimer); timer_delete(*pTimer);
......
...@@ -33,10 +33,6 @@ int taosOpenTcpServerSocket(uint32_t ip, uint16_t port); ...@@ -33,10 +33,6 @@ int taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
int taosKeepTcpAlive(int sockFd); int taosKeepTcpAlive(int sockFd);
void taosCloseTcpSocket(int sockFd); void taosCloseTcpSocket(int sockFd);
int taosOpenUDServerSocket(uint32_t ip, uint16_t port);
int taosOpenUDClientSocket(uint32_t ip, uint16_t port);
int taosOpenRawSocket(uint32_t ip);
int taosGetFqdn(char *); int taosGetFqdn(char *);
uint32_t taosGetIpFromFqdn(const char *); uint32_t taosGetIpFromFqdn(const char *);
void tinet_ntoa(char *ipstr, unsigned int ip); void tinet_ntoa(char *ipstr, unsigned int ip);
......
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
*/ */
#include "os.h" #include "os.h"
#include "tglobal.h"
#include "tulog.h" #include "tulog.h"
#include "tsocket.h" #include "tsocket.h"
#include "tutil.h" #include "tutil.h"
...@@ -394,38 +393,6 @@ int taosOpenTcpServerSocket(uint32_t ip, uint16_t port) { ...@@ -394,38 +393,6 @@ int taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
return sockFd; return sockFd;
} }
int taosOpenRawSocket(uint32_t ip) {
int fd, hold;
struct sockaddr_in rawAdd;
uTrace("open udp raw socket:%s", ip);
fd = (int)socket(AF_INET, SOCK_RAW, IPPROTO_UDP);
if (fd < 0) {
uError("failed to open raw socket: %d (%s)", errno, strerror(errno));
return -1;
}
hold = 1;
if (taosSetSockOpt(fd, IPPROTO_IP, IP_HDRINCL, (void *)&hold, sizeof(hold)) < 0) {
uError("failed to set hold option: %d (%s)", errno, strerror(errno));
close(fd);
return -1;
}
bzero((char *)&rawAdd, sizeof(rawAdd));
rawAdd.sin_family = AF_INET;
rawAdd.sin_addr.s_addr = ip;
if (bind(fd, (struct sockaddr *)&rawAdd, sizeof(rawAdd)) < 0) {
uError("failed to bind RAW socket:(%s)", strerror(errno));
close(fd);
return -1;
}
return fd;
}
void tinet_ntoa(char *ipstr, unsigned int ip) { void tinet_ntoa(char *ipstr, unsigned int ip) {
sprintf(ipstr, "%d.%d.%d.%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24); sprintf(ipstr, "%d.%d.%d.%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册