提交 599760c6 编写于 作者: H Hui Li

[TD-1256]

上级 03721879
...@@ -50,6 +50,9 @@ typedef struct SShellArguments { ...@@ -50,6 +50,9 @@ typedef struct SShellArguments {
char* commands; char* commands;
int abort; int abort;
int port; int port;
int endPort;
int pktLen;
char* netTestRole;
} SShellArguments; } SShellArguments;
/**************** Function declarations ****************/ /**************** Function declarations ****************/
......
...@@ -46,6 +46,9 @@ static struct argp_option options[] = { ...@@ -46,6 +46,9 @@ static struct argp_option options[] = {
{"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."}, {"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."},
{"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."}, {"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."},
{"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."}, {"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."},
{"netrole", 'n', "NETROLE", 0, "Net role when testing, default is NULL, valid option: client | server."},
{"endport", 'e', "ENDPORT", 0, "Net test end port, default is 6042."},
{"pktlen", 'l', "PKTLEN", 0, "Packet length used for net test, default is 1000 bytes."},
{0}}; {0}};
static error_t parse_opt(int key, char *arg, struct argp_state *state) { static error_t parse_opt(int key, char *arg, struct argp_state *state) {
...@@ -65,6 +68,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { ...@@ -65,6 +68,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case 'P': case 'P':
if (arg) { if (arg) {
tsDnodeShellPort = atoi(arg); tsDnodeShellPort = atoi(arg);
arguments->port = atoi(arg);
} else { } else {
fprintf(stderr, "Invalid port\n"); fprintf(stderr, "Invalid port\n");
return -1; return -1;
...@@ -126,6 +130,29 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { ...@@ -126,6 +130,29 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case 'd': case 'd':
arguments->database = arg; arguments->database = arg;
break; break;
case 'n':
arguments->netTestRole = arg;
break;
case 'e':
if (arg) {
arguments->endPort = atoi(arg);
} else {
fprintf(stderr, "Invalid end port\n");
return -1;
}
break;
case 'l':
if (arg) {
arguments->pktLen = atoi(arg);
} else {
fprintf(stderr, "Invalid packet length\n");
return -1;
}
break;
case OPT_ABORT: case OPT_ABORT:
arguments->abort = 1; arguments->abort = 1;
break; break;
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "os.h" #include "os.h"
#include "shell.h" #include "shell.h"
#include "tnettest.h"
pthread_t pid; pthread_t pid;
...@@ -60,7 +61,10 @@ SShellArguments args = { ...@@ -60,7 +61,10 @@ SShellArguments args = {
.file = "\0", .file = "\0",
.dir = "\0", .dir = "\0",
.threadNum = 5, .threadNum = 5,
.commands = NULL .commands = NULL,
.endPort = 6042,
.pktLen = 1000,
.netTestRole = NULL
}; };
/* /*
...@@ -75,6 +79,11 @@ int main(int argc, char* argv[]) { ...@@ -75,6 +79,11 @@ int main(int argc, char* argv[]) {
shellParseArgument(argc, argv, &args); shellParseArgument(argc, argv, &args);
if (args.netTestRole && args.netTestRole[0] != 0) {
taosNetTest(args.host, (uint16_t)args.port, (uint16_t)args.endPort, args.pktLen, args.netTestRole);
exit(0);
}
/* Initialize the shell */ /* Initialize the shell */
TAOS* con = shellInit(&args); TAOS* con = shellInit(&args);
if (con == NULL) { if (con == NULL) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TNETTEST_H
#define TDENGINE_TNETTEST_H
#ifdef __cplusplus
extern "C" {
#endif
void taosNetTest(const char* host, uint16_t port, uint16_t endPort, int pktLen, const char* netTestRole);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TNETTEST_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <argp.h>
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <wordexp.h>
#include "os.h"
#include "taosdef.h"
#include "taoserror.h"
#include "tulog.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tsocket.h"
#define MAX_PKG_LEN (64*1000)
#define BUFFER_SIZE (MAX_PKG_LEN + 1024)
typedef struct {
uint32_t hostIp;
uint16_t port;
uint16_t pktLen;
} info_s;
static char serverFqdn[TSDB_FQDN_LEN];
static uint16_t g_startPort = 0;
static uint16_t g_endPort = 6042;
static void *bindUdpPort(void *sarg) {
info_s *pinfo = (info_s *)sarg;
int port = pinfo->port;
int serverSocket;
struct sockaddr_in server_addr;
struct sockaddr_in clientAddr;
char buffer[BUFFER_SIZE];
int iDataNum;
if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
perror("socket");
return NULL;
}
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
perror("connect");
return NULL;
}
socklen_t sin_size;
while (1) {
memset(buffer, 0, BUFFER_SIZE);
sin_size = sizeof(*(struct sockaddr *)&server_addr);
iDataNum = recvfrom(serverSocket, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&clientAddr, &sin_size);
if (iDataNum < 0) {
perror("recvfrom null");
continue;
}
if (iDataNum > 0) {
printf("recv Client: %s pkg from UDP port: %d, pkg len: %d\n", inet_ntoa(clientAddr.sin_addr), port, iDataNum);
//printf("Read msg from udp:%s ... %s\n", buffer, buffer+iDataNum-16);
sendto(serverSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int)sin_size);
}
}
close(serverSocket);
return NULL;
}
static void *bindTcpPort(void *sarg) {
info_s *pinfo = (info_s *)sarg;
int port = pinfo->port;
int serverSocket;
struct sockaddr_in server_addr;
struct sockaddr_in clientAddr;
int addr_len = sizeof(clientAddr);
int client;
char buffer[BUFFER_SIZE];
int iDataNum = 0;
if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
printf("socket() fail: %s", strerror(errno));
return NULL;
}
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
printf("port:%d bind() fail: %s", port, strerror(errno));
return NULL;
}
if (listen(serverSocket, 5) < 0) {
printf("listen() fail: %s", strerror(errno));
return NULL;
}
//printf("Bind port: %d success\n", port);
while (1) {
client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len);
if (client < 0) {
printf("accept() fail: %s", strerror(errno));
continue;
}
iDataNum = 0;
memset(buffer, 0, BUFFER_SIZE);
int nleft, nread;
char *ptr = buffer;
nleft = pinfo->pktLen;
while (nleft > 0) {
nread = recv(client, ptr, BUFFER_SIZE, 0);
if (nread == 0) {
break;
} else if (nread < 0) {
if (errno == EINTR) {
continue;
} else {
printf("recv Client: %s pkg from TCP port: %d fail:%s.\n", inet_ntoa(clientAddr.sin_addr), port, strerror(errno));
close(serverSocket);
return NULL;
}
} else {
nleft -= nread;
ptr += nread;
iDataNum += nread;
}
}
printf("recv Client: %s pkg from TCP port: %d, pkg len: %d\n", inet_ntoa(clientAddr.sin_addr), port, iDataNum);
if (iDataNum > 0) {
send(client, buffer, iDataNum, 0);
}
}
close(serverSocket);
return NULL;
}
static int checkTcpPort(info_s *info) {
int clientSocket;
struct sockaddr_in serverAddr;
char sendbuf[BUFFER_SIZE];
char recvbuf[BUFFER_SIZE];
int iDataNum = 0;
if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
printf("socket() fail: %s\n", strerror(errno));
return -1;
}
// set send and recv overtime
struct timeval timeout;
timeout.tv_sec = 2; //s
timeout.tv_usec = 0; //us
if (setsockopt(clientSocket, SOL_SOCKET,SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
perror("setsockopt send timer failed:");
}
if (setsockopt(clientSocket, SOL_SOCKET,SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
perror("setsockopt recv timer failed:");
}
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(info->port);
serverAddr.sin_addr.s_addr = info->hostIp;
//printf("=================================\n");
if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) {
printf("connect() fail: %s\t", strerror(errno));
return -1;
}
//printf("Connect to: %s:%d...success\n", host, port);
memset(sendbuf, 0, BUFFER_SIZE);
memset(recvbuf, 0, BUFFER_SIZE);
struct in_addr ipStr;
memcpy(&ipStr, &info->hostIp, 4);
sprintf(sendbuf, "client send tcp pkg to %s:%d, content: 1122334455", inet_ntoa(ipStr), info->port);
sprintf(sendbuf + info->pktLen - 16, "1122334455667788");
send(clientSocket, sendbuf, info->pktLen, 0);
memset(recvbuf, 0, BUFFER_SIZE);
int nleft, nread;
char *ptr = recvbuf;
nleft = info->pktLen;
while (nleft > 0) {
nread = recv(clientSocket, ptr, BUFFER_SIZE, 0);;
if (nread == 0) {
break;
} else if (nread < 0) {
if (errno == EINTR) {
continue;
} else {
printf("recv ack pkg from TCP port: %d fail:%s.\n", info->port, strerror(errno));
close(clientSocket);
return -1;
}
} else {
nleft -= nread;
ptr += nread;
iDataNum += nread;
}
}
if (iDataNum < info->pktLen) {
printf("recv ack pkg len: %d, less than req pkg len: %d from tcp port: %d\n", iDataNum, info->pktLen, info->port);
return -1;
}
//printf("Read ack pkg len:%d from tcp port: %d, buffer: %s %s\n", info->pktLen, port, recvbuf, recvbuf+iDataNum-8);
close(clientSocket);
return 0;
}
static int checkUdpPort(info_s *info) {
int clientSocket;
struct sockaddr_in serverAddr;
char sendbuf[BUFFER_SIZE];
char recvbuf[BUFFER_SIZE];
int iDataNum = 0;
if ((clientSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
perror("socket");
return -1;
}
// set overtime
struct timeval timeout;
timeout.tv_sec = 2; //s
timeout.tv_usec = 0; //us
if (setsockopt(clientSocket, SOL_SOCKET,SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
perror("setsockopt send timer failed:");
}
if (setsockopt(clientSocket, SOL_SOCKET,SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
perror("setsockopt recv timer failed:");
}
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(info->port);
serverAddr.sin_addr.s_addr = info->hostIp;
memset(sendbuf, 0, BUFFER_SIZE);
memset(recvbuf, 0, BUFFER_SIZE);
struct in_addr ipStr;
memcpy(&ipStr, &info->hostIp, 4);
sprintf(sendbuf, "client send udp pkg to %s:%d, content: 1122334455", inet_ntoa(ipStr), info->port);
sprintf(sendbuf + info->pktLen - 16, "1122334455667788");
socklen_t sin_size = sizeof(*(struct sockaddr *)&serverAddr);
int code = sendto(clientSocket, sendbuf, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int)sin_size);
if (code < 0) {
perror("sendto");
return -1;
}
iDataNum = recvfrom(clientSocket, recvbuf, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size);
if (iDataNum < info->pktLen) {
printf("Read ack pkg len: %d, less than req pkg len: %d from udp port: %d\t\t", iDataNum, info->pktLen, info->port);
return -1;
}
//printf("Read ack pkg len:%d from udp port: %d, buffer: %s %s\n", info->pktLen, port, recvbuf, recvbuf+iDataNum-8);
close(clientSocket);
return 0;
}
static void checkPort(uint32_t hostIp, uint16_t startPort, uint16_t maxPort, uint16_t pktLen) {
int ret;
info_s info;
memset(&info, 0, sizeof(info_s));
info.hostIp = hostIp;
info.pktLen = pktLen;
for (uint16_t port = startPort; port <= maxPort; port++) {
//printf("test: %s:%d\n", info.host, port);
printf("\n");
info.port = port;
ret = checkTcpPort(&info);
if (ret != 0) {
printf("tcp port:%d test fail.\t\n", port);
} else {
printf("tcp port:%d test ok.\t\t", port);
}
ret = checkUdpPort(&info);
if (ret != 0) {
printf("udp port:%d test fail.\t\n", port);
} else {
printf("udp port:%d test ok.\t\t", port);
}
}
printf("\n");
return ;
}
static void taosNetTestClient(const char* serverFqdn, uint16_t startPort, uint16_t endPort, int pktLen) {
uint32_t serverIp = taosGetIpFromFqdn(serverFqdn);
if (serverIp == 0xFFFFFFFF) {
printf("Failed to resolve FQDN:%s", serverFqdn);
exit(-1);
}
checkPort(serverIp, startPort, endPort, pktLen);
return;
}
static void taosNetTestServer(uint16_t startPort, uint16_t endPort, int pktLen) {
int port = startPort;
int num = endPort - startPort + 1;
if (num < 0) {
num = 1;
}
pthread_t *pids = malloc(2 * num * sizeof(pthread_t));
info_s * tinfos = malloc(num * sizeof(info_s));
info_s * uinfos = malloc(num * sizeof(info_s));
for (size_t i = 0; i < num; i++) {
info_s *tcpInfo = tinfos + i;
tcpInfo->port = port + i;
tcpInfo->pktLen = pktLen;
if (pthread_create(pids + i, NULL, bindTcpPort, tcpInfo) != 0)
{
printf("create thread fail, port:%d.\n", port);
exit(-1);
}
info_s *udpInfo = uinfos + i;
udpInfo->port = port + i;
if (pthread_create(pids + num + i, NULL, bindUdpPort, udpInfo) != 0)
{
printf("create thread fail, port:%d.\n", port);
exit(-1);
}
}
for (int i = 0; i < num; i++) {
pthread_join(pids[i], NULL);
pthread_join(pids[(num + i)], NULL);
}
}
void taosNetTest(const char* host, uint16_t port, uint16_t endPort, int pktLen, const char* netTestRole) {
if (pktLen > MAX_PKG_LEN) {
printf("test packet len overflow: %d, max len not greater than %d bytes\n", pktLen, MAX_PKG_LEN);
exit(-1);
}
if (port && endPort) {
if (port > endPort) {
printf("endPort[%d] must not lesss port[%d]\n", endPort, port);
exit(-1);
}
}
if (host && host[0] != 0) {
if (strlen(host) >= TSDB_EP_LEN) {
printf("host invalid: %s\n", host);
exit(-1);
}
taosGetFqdnPortFromEp(host, serverFqdn, &g_startPort);
} else {
tstrncpy(serverFqdn, "127.0.0.1", TSDB_IPv4ADDR_LEN);
g_startPort = tsServerPort;
}
if (port) {
g_startPort = port;
}
if (endPort) {
g_endPort = endPort;
}
if (port > endPort) {
printf("endPort[%d] must not lesss port[%d]\n", g_endPort, g_startPort);
exit(-1);
}
if (0 == strcmp("client", netTestRole)) {
printf("host: %s\tstart port: %d\tend port: %d\tpacket len: %d\n", serverFqdn, g_startPort, g_endPort, pktLen);
taosNetTestClient(serverFqdn, g_startPort, g_endPort, pktLen);
} else if (0 == strcmp("server", netTestRole)) {
taosNetTestServer(g_startPort, g_endPort, pktLen);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册