提交 70a07503 编写于 作者: H Hui Li

[fix bug]

上级 ef335b2c
......@@ -140,7 +140,7 @@ char tsMqttBrokerAddress[128] = {0};
char tsMqttBrokerClientId[128] = {0};
// monitor
int32_t tsEnableMonitorModule = 0;
int32_t tsEnableMonitorModule = 1;
char tsMonitorDbName[TSDB_DB_NAME_LEN] = "log";
char tsInternalPass[] = "secretkey";
int32_t tsMonitorInterval = 30; // seconds
......
......@@ -32,8 +32,8 @@
typedef struct {
int port;
char *host[15];
} info;
char *host;
} info_s;
typedef struct Arguments {
char * host;
......@@ -65,10 +65,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
static struct argp argp = {options, parse_opt, 0, 0};
void *checkPort(void *sarg) {
info *pinfo = (info *)sarg;
int port = pinfo->port;
char *host = *pinfo->host;
int checkTcpPort(info_s *info) {
int port = info->port;
char *host = info->host;
int clientSocket;
struct sockaddr_in serverAddr;
......@@ -77,38 +76,37 @@ void *checkPort(void *sarg) {
int iDataNum;
if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("socket");
return NULL;
return -1;
}
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(port);
serverAddr.sin_addr.s_addr = inet_addr(host);
printf("=================================\n");
//printf("=================================\n");
if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) {
perror("connect");
return NULL;
return -1;
}
printf("Connect to: %s:%d...success\n", host, port);
//printf("Connect to: %s:%d...success\n", host, port);
sprintf(sendbuf, "send port_%d", port);
send(clientSocket, sendbuf, strlen(sendbuf), 0);
printf("Send msg_%d: %s\n", port, sendbuf);
//printf("Send msg_%d: %s\n", port, sendbuf);
recvbuf[0] = '\0';
iDataNum = recv(clientSocket, recvbuf, BUFFER_SIZE, 0);
recvbuf[iDataNum] = '\0';
printf("Read ack msg_%d: %s\n", port, recvbuf);
//printf("Read ack msg_%d: %s\n", port, recvbuf);
printf("=================================\n");
//printf("=================================\n");
close(clientSocket);
return NULL;
return 0;
}
void *checkUPort(void *sarg) {
info *pinfo = (info *)sarg;
int port = pinfo->port;
char *host = *pinfo->host;
void *checkUdpPort(info_s *info) {
int port = info->port;
char *host = info->host;
int clientSocket;
struct sockaddr_in serverAddr;
......@@ -117,56 +115,62 @@ void *checkUPort(void *sarg) {
int iDataNum;
if ((clientSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
perror("socket");
return NULL;
return -1;
}
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(port);
serverAddr.sin_addr.s_addr = inet_addr(host);
printf("=================================\n");
sprintf(sendbuf, "send msg port_%d by udp", port);
socklen_t sin_size = sizeof(*(struct sockaddr *)&serverAddr);
sendto(clientSocket, sendbuf, strlen(sendbuf), 0, (struct sockaddr *)&serverAddr, (int)sin_size);
int code = sendto(clientSocket, sendbuf, strlen(sendbuf), 0, (struct sockaddr *)&serverAddr, (int)sin_size);
if (code < 0) {
perror("sendto");
return -1;
}
printf("Send msg_%d by udp: %s\n", port, sendbuf);
//printf("Send msg_%d by udp: %s\n", port, sendbuf);
recvbuf[0] = '\0';
iDataNum = recvfrom(clientSocket, recvbuf, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size);
recvbuf[iDataNum] = '\0';
printf("Read ack msg_%d from udp: %s\n", port, recvbuf);
//printf("Read ack msg_%d from udp: %s\n", port, recvbuf);
printf("=================================\n");
close(clientSocket);
return NULL;
return 0;
}
int main(int argc, char *argv[]) {
SArguments arguments = {"127.0.0.1", 6041, 6050};
SArguments arguments = {"127.0.0.1", 6030, 6060};
info_s info;
int ret;
argp_parse(&argp, argc, argv, 0, 0, &arguments);
printf("host: %s\tport: %d\tmax_port: %d\n", arguments.host, arguments.port, arguments.max_port);
printf("host: %s\tport: %d\tmax_port: %d\n\n", arguments.host, arguments.port, arguments.max_port);
int port = arguments.port;
char *host = arguments.host;
info *tinfo = malloc(sizeof(info));
info *uinfo = malloc(sizeof(info));
for (; port < arguments.max_port; port++) {
printf("For test: %s:%d\n", host, port);
info.host = arguments.host;
*tinfo->host = host;
tinfo->port = port;
checkPort(tinfo);
for (; port < arguments.max_port; port++) {
printf("test: %s:%d\n", info.host, port);
info.port = port;
ret = checkTcpPort(&info);
if (ret != 0) {
printf("tcp port:%d test fail.", port);
} else {
printf("tcp port:%d test ok.", port);
}
*uinfo->host = host;
uinfo->port = port;
checkUPort(uinfo);
checkUdpPort(&info);
if (ret != 0) {
printf("udp port:%d test fail.", port);
} else {
printf("udp port:%d test ok.", port);
}
}
free(tinfo);
free(uinfo);
}
......@@ -27,13 +27,13 @@
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#define BUFFER_SIZE 200
typedef struct {
int port;
int type; // 0: tcp, 1: udo, default: 0
} info;
} info_s;
typedef struct Arguments {
char * host;
......@@ -43,7 +43,7 @@ typedef struct Arguments {
static struct argp_option options[] = {
{0, 'h', "host", 0, "The host to connect to TDEngine. Default is localhost.", 0},
{0, 'p', "port", 0, "The TCP or UDP port number to use for the connection. Default is 6020.", 1},
{0, 'p', "port", 0, "The TCP or UDP port number to use for the connection. Default is 6041.", 1},
{0, 'm', "max port", 0, "The max TCP or UDP port number to use for the connection. Default is 6050.", 2}};
static error_t parse_opt(int key, char *arg, struct argp_state *state) {
......@@ -65,10 +65,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
static struct argp argp = {options, parse_opt, 0, 0};
static void *bindPort(void *sarg) {
info *pinfo = (info *)sarg;
static void *bindTcpPort(void *sarg) {
info_s *pinfo = (info_s *)sarg;
int port = pinfo->port;
int type = pinfo->type;
int serverSocket;
struct sockaddr_in server_addr;
......@@ -98,14 +97,14 @@ static void *bindPort(void *sarg) {
return NULL;
}
printf("Bind port: %d success\n", port);
//printf("Bind port: %d success\n", port);
while (1) {
client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len);
if (client < 0) {
perror("accept");
continue;
}
printf("=================================\n");
//printf("=================================\n");
printf("Client ip is %s, Server port is %d\n", inet_ntoa(clientAddr.sin_addr), port);
while (1) {
......@@ -118,33 +117,30 @@ static void *bindPort(void *sarg) {
}
if (iDataNum > 0) {
buffer[iDataNum] = '\0';
printf("read msg:%s\n", buffer);
//printf("read msg:%s\n", buffer);
if (strcmp(buffer, "quit") == 0) break;
buffer[0] = '\0';
sprintf(buffer, "ack port_%d", port);
printf("send ack msg:%s\n", buffer);
//printf("send ack msg:%s\n", buffer);
send(client, buffer, strlen(buffer), 0);
break;
}
}
printf("=================================\n");
//printf("=================================\n");
}
close(serverSocket);
return NULL;
}
static void *bindUPort(void *sarg) {
info *pinfo = (info *)sarg;
static void *bindUdpPort(void *sarg) {
info_s *pinfo = (info_s *)sarg;
int port = pinfo->port;
int type = pinfo->type;
int serverSocket;
struct sockaddr_in server_addr;
struct sockaddr_in clientAddr;
int addr_len = sizeof(clientAddr);
int client;
char buffer[BUFFER_SIZE];
int iDataNum;
......@@ -164,7 +160,7 @@ static void *bindUPort(void *sarg) {
}
socklen_t sin_size;
printf("Bind port: %d success\n", port);
//printf("Bind port: %d success\n", port);
while (1) {
buffer[0] = '\0';
......@@ -178,21 +174,19 @@ static void *bindUPort(void *sarg) {
continue;
}
if (iDataNum > 0) {
printf("=================================\n");
//printf("=================================\n");
printf("Client ip is %s, Server port is %d\n", inet_ntoa(clientAddr.sin_addr), port);
buffer[iDataNum] = '\0';
printf("Read msg from udp:%s\n", buffer);
//printf("Read msg from udp:%s\n", buffer);
if (strcmp(buffer, "quit") == 0) break;
buffer[0] = '\0';
sprintf(buffer, "ack port_%d by udp", port);
printf("Send ack msg by udp:%s\n", buffer);
//printf("Send ack msg by udp:%s\n", buffer);
sendto(serverSocket, buffer, strlen(buffer), 0, (struct sockaddr *)&clientAddr, (int)sin_size);
send(client, buffer, strlen(buffer), 0);
printf("=================================\n");
//printf("=================================\n");
}
}
......@@ -202,39 +196,38 @@ static void *bindUPort(void *sarg) {
int main(int argc, char *argv[]) {
SArguments arguments = {"127.0.0.1", 6020, 6050};
SArguments arguments = {"127.0.0.1", 6030, 6060};
argp_parse(&argp, argc, argv, 0, 0, &arguments);
int port = arguments.port;
int num = arguments.max_port - arguments.port;
int num = arguments.max_port - arguments.port + 1;
if (num < 0) {
num = 1;
}
pthread_t *pids = malloc(2 * num * sizeof(pthread_t));
info * infos = malloc(num * sizeof(info));
info * uinfos = malloc(num * sizeof(info));
info_s * tinfos = malloc(num * sizeof(info_s));
info_s * uinfos = malloc(num * sizeof(info_s));
for (size_t i = 0; i < num; i++) {
info *pinfo = infos++;
pinfo->port = port;
info_s *tcpInfo = tinfos + i;
tcpInfo->port = port + i;
if (pthread_create(pids + i, NULL, bindPort, pinfo) != 0) //创建线程
{ //创建线程失败
printf("创建线程失败: %d.\n", port);
exit(0);
if (pthread_create(pids + i, NULL, bindTcpPort, tcpInfo) != 0)
{
printf("create thread fail, port:%d.\n", port);
exit(-1);
}
info *uinfo = uinfos++;
uinfo->port = port;
uinfo->type = 1;
port++;
if (pthread_create(pids + num + i, NULL, bindUPort, uinfo) != 0) //创建线程
{ //创建线程失败
printf("创建线程失败: %d.\n", port);
exit(0);
info_s *udpInfo = uinfos + i;
udpInfo->port = port + i;
if (pthread_create(pids + num + i, NULL, bindUdpPort, udpInfo) != 0)
{
printf("create thread fail, port:%d.\n", port);
exit(-1);
}
}
for (int i = 0; i < num; i++) {
pthread_join(pids[i], NULL);
pthread_join(pids[(num + i)], NULL);
......
......@@ -75,6 +75,11 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
}
void mnodeProcessPeerRsp(SRpcMsg *pMsg) {
if (!sdbIsMaster()) {
mError("%p, msg:%s is not processed for it is not master", pMsg->ahandle, taosMsg[pMsg->msgType]);
return;
}
if (tsMnodeProcessPeerRspFp[pMsg->msgType]) {
(*tsMnodeProcessPeerRspFp[pMsg->msgType])(pMsg);
} else {
......
......@@ -164,9 +164,9 @@ static int32_t mnodeVgroupActionUpdate(SSdbOper *pOper) {
// reset vgid status on vgroup changed
mDebug("vgId:%d, reset sync status to unsynced", pVgroup->vgId);
mDebug("vgId:%d, reset sync status to offline", pVgroup->vgId);
for (int32_t v = 0; v < pVgroup->numOfVnodes; ++v) {
pVgroup->vnodeGid[v].role = TAOS_SYNC_ROLE_UNSYNCED;
pVgroup->vnodeGid[v].role = TAOS_SYNC_ROLE_OFFLINE;
}
mnodeDecVgroupRef(pVgroup);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册