diff --git a/src/kit/CMakeLists.txt b/src/kit/CMakeLists.txt index 3bacd426f3fc92b18d50e06033f3ad913375540f..77db79e22003d04701bf7417cc9ebc06b202533e 100644 --- a/src/kit/CMakeLists.txt +++ b/src/kit/CMakeLists.txt @@ -3,5 +3,3 @@ PROJECT(TDengine) ADD_SUBDIRECTORY(shell) ADD_SUBDIRECTORY(taosdemo) -#ADD_SUBDIRECTORY(taosClusterTest) -ADD_SUBDIRECTORY(taosnetwork) diff --git a/src/kit/shell/inc/shell.h b/src/kit/shell/inc/shell.h index d47f87965b6494da79f9aab52f9d8b91d6169106..dd62df170a7c87f127eb7c52c1f580b7f460b445 100644 --- a/src/kit/shell/inc/shell.h +++ b/src/kit/shell/inc/shell.h @@ -50,6 +50,9 @@ typedef struct SShellArguments { char* commands; int abort; int port; + int endPort; + int pktLen; + char* netTestRole; } SShellArguments; /**************** Function declarations ****************/ diff --git a/src/kit/shell/src/shellLinux.c b/src/kit/shell/src/shellLinux.c index c74eeb7f59de5d1b813072ec5b7f952d3c2ce632..6c09d5c9d06c1182bdd45b04fec551d8ecf99e4d 100644 --- a/src/kit/shell/src/shellLinux.c +++ b/src/kit/shell/src/shellLinux.c @@ -46,6 +46,9 @@ static struct argp_option options[] = { {"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."}, {"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."}, {"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."}, + {"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is NULL, valid option: client | server."}, + {"endport", 'e', "ENDPORT", 0, "Net test end port, default is 6042."}, + {"pktlen", 'l', "PKTLEN", 0, "Packet length used for net test, default is 1000 bytes."}, {0}}; static error_t parse_opt(int key, char *arg, struct argp_state *state) { @@ -65,6 +68,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { case 'P': if (arg) { tsDnodeShellPort = atoi(arg); + arguments->port = atoi(arg); } else { fprintf(stderr, "Invalid port\n"); return -1; @@ -126,6 +130,29 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { case 'd': arguments->database = arg; break; + + case 'n': + arguments->netTestRole = arg; + break; + + case 'e': + if (arg) { + arguments->endPort = atoi(arg); + } else { + fprintf(stderr, "Invalid end port\n"); + return -1; + } + break; + + case 'l': + if (arg) { + arguments->pktLen = atoi(arg); + } else { + fprintf(stderr, "Invalid packet length\n"); + return -1; + } + break; + case OPT_ABORT: arguments->abort = 1; break; diff --git a/src/kit/shell/src/shellMain.c b/src/kit/shell/src/shellMain.c index 44de6641f62c67a47660bbca6099f9bcf96cac8e..6cb7c669cc7a08434b2558588067d007b51b3595 100644 --- a/src/kit/shell/src/shellMain.c +++ b/src/kit/shell/src/shellMain.c @@ -15,6 +15,7 @@ #include "os.h" #include "shell.h" +#include "tnettest.h" pthread_t pid; @@ -60,7 +61,10 @@ SShellArguments args = { .file = "\0", .dir = "\0", .threadNum = 5, - .commands = NULL + .commands = NULL, + .endPort = 6042, + .pktLen = 1000, + .netTestRole = NULL }; /* @@ -75,6 +79,11 @@ int main(int argc, char* argv[]) { shellParseArgument(argc, argv, &args); + if (args.netTestRole && args.netTestRole[0] != 0) { + taosNetTest(args.host, (uint16_t)args.port, (uint16_t)args.endPort, args.pktLen, args.netTestRole); + exit(0); + } + /* Initialize the shell */ TAOS* con = shellInit(&args); if (con == NULL) { diff --git a/src/kit/taosnetwork/CMakeLists.txt b/src/kit/taosnetwork/CMakeLists.txt deleted file mode 100644 index 9d2a5ba3f8622e9623083ee0c9788fe8716b3058..0000000000000000000000000000000000000000 --- a/src/kit/taosnetwork/CMakeLists.txt +++ /dev/null @@ -1,10 +0,0 @@ -CMAKE_MINIMUM_REQUIRED(VERSION 2.8) -PROJECT(TDengine) - -IF (TD_LINUX) - AUX_SOURCE_DIRECTORY(. SRC) - ADD_EXECUTABLE(taosClient client.c) - ADD_EXECUTABLE(taosServer server.c) - TARGET_LINK_LIBRARIES( taosServer -lpthread -lm -lrt ) - TARGET_LINK_LIBRARIES( taosClient -lpthread -lm -lrt ) -ENDIF () diff --git a/src/kit/taosnetwork/client.c b/src/kit/taosnetwork/client.c deleted file mode 100644 index b7db2ba0a2ec607de1dde010fc89be1cda1c9c9a..0000000000000000000000000000000000000000 --- a/src/kit/taosnetwork/client.c +++ /dev/null @@ -1,313 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#define MAX_PKG_LEN (64*1000) -#define BUFFER_SIZE (MAX_PKG_LEN + 1024) -#define TEST_FQDN_LEN 128 -#define TEST_IPv4ADDR_LEN 16 - -typedef struct { - uint16_t port; - uint32_t hostIp; - char fqdn[TEST_FQDN_LEN]; - uint16_t pktLen; -} info_s; - -typedef struct Arguments { - char host[TEST_IPv4ADDR_LEN]; - char fqdn[TEST_FQDN_LEN]; - uint16_t port; - uint16_t max_port; - uint16_t pktLen; -} SArguments; - -static struct argp_option options[] = { - {0, 'h', "host ip", 0, "The host ip to connect to TDEngine. Default is localhost.", 0}, - {0, 'p', "port", 0, "The TCP or UDP port number to use for the connection. Default is 6030.", 1}, - {0, 'm', "max port", 0, "The max TCP or UDP port number to use for the connection. Default is 6042.", 2}, - {0, 'f', "host fqdn", 0, "The host fqdn to connect to TDEngine.", 3}, - {0, 'l', "test pkg len", 0, "The len of pkg for test. Default is 1000 Bytes, max not greater than 64k Bytes.\nNotes: This parameter must be consistent between the client and the server.", 3}}; - -static error_t parse_opt(int key, char *arg, struct argp_state *state) { - wordexp_t full_path; - SArguments *arguments = state->input; - switch (key) { - case 'h': - if (wordexp(arg, &full_path, 0) != 0) { - fprintf(stderr, "Invalid host ip %s\n", arg); - return -1; - } - strcpy(arguments->host, full_path.we_wordv[0]); - wordfree(&full_path); - break; - case 'p': - arguments->port = atoi(arg); - break; - case 'm': - arguments->max_port = atoi(arg); - break; - case 'l': - arguments->pktLen = atoi(arg); - break; - case 'f': - if (wordexp(arg, &full_path, 0) != 0) { - fprintf(stderr, "Invalid host fqdn %s\n", arg); - return -1; - } - strcpy(arguments->fqdn, full_path.we_wordv[0]); - wordfree(&full_path); - break; - - default: - return ARGP_ERR_UNKNOWN; - } - return 0; -} - -static struct argp argp = {options, parse_opt, 0, 0}; - -int checkTcpPort(info_s *info) { - int clientSocket; - - struct sockaddr_in serverAddr; - char sendbuf[BUFFER_SIZE]; - char recvbuf[BUFFER_SIZE]; - int iDataNum = 0; - if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - printf("socket() fail: %s\n", strerror(errno)); - return -1; - } - - // set send and recv overtime - struct timeval timeout; - timeout.tv_sec = 2; //s - timeout.tv_usec = 0; //us - if (setsockopt(clientSocket, SOL_SOCKET,SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { - perror("setsockopt send timer failed:"); - } - if (setsockopt(clientSocket, SOL_SOCKET,SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { - perror("setsockopt recv timer failed:"); - } - - serverAddr.sin_family = AF_INET; - serverAddr.sin_port = htons(info->port); - - serverAddr.sin_addr.s_addr = info->hostIp; - - //printf("=================================\n"); - if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) { - printf("connect() fail: %s\t", strerror(errno)); - return -1; - } - //printf("Connect to: %s:%d...success\n", host, port); - memset(sendbuf, 0, BUFFER_SIZE); - memset(recvbuf, 0, BUFFER_SIZE); - - struct in_addr ipStr; - memcpy(&ipStr, &info->hostIp, 4); - sprintf(sendbuf, "client send tcp pkg to %s:%d, content: 1122334455", inet_ntoa(ipStr), info->port); - sprintf(sendbuf + info->pktLen - 16, "1122334455667788"); - - send(clientSocket, sendbuf, info->pktLen, 0); - - memset(recvbuf, 0, BUFFER_SIZE); - int nleft, nread; - char *ptr = recvbuf; - nleft = info->pktLen; - while (nleft > 0) { - nread = recv(clientSocket, ptr, BUFFER_SIZE, 0);; - - if (nread == 0) { - break; - } else if (nread < 0) { - if (errno == EINTR) { - continue; - } else { - printf("recv ack pkg from TCP port: %d fail:%s.\n", info->port, strerror(errno)); - close(clientSocket); - return -1; - } - } else { - nleft -= nread; - ptr += nread; - iDataNum += nread; - } - } - - if (iDataNum < info->pktLen) { - printf("recv ack pkg len: %d, less than req pkg len: %d from tcp port: %d\n", iDataNum, info->pktLen, info->port); - return -1; - } - //printf("Read ack pkg len:%d from tcp port: %d, buffer: %s %s\n", info->pktLen, port, recvbuf, recvbuf+iDataNum-8); - - close(clientSocket); - return 0; -} - -int checkUdpPort(info_s *info) { - int clientSocket; - - struct sockaddr_in serverAddr; - char sendbuf[BUFFER_SIZE]; - char recvbuf[BUFFER_SIZE]; - int iDataNum = 0; - if ((clientSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { - perror("socket"); - return -1; - } - - // set overtime - struct timeval timeout; - timeout.tv_sec = 2; //s - timeout.tv_usec = 0; //us - if (setsockopt(clientSocket, SOL_SOCKET,SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { - perror("setsockopt send timer failed:"); - } - if (setsockopt(clientSocket, SOL_SOCKET,SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { - perror("setsockopt recv timer failed:"); - } - - serverAddr.sin_family = AF_INET; - serverAddr.sin_port = htons(info->port); - serverAddr.sin_addr.s_addr = info->hostIp; - - memset(sendbuf, 0, BUFFER_SIZE); - memset(recvbuf, 0, BUFFER_SIZE); - - struct in_addr ipStr; - memcpy(&ipStr, &info->hostIp, 4); - sprintf(sendbuf, "client send udp pkg to %s:%d, content: 1122334455", inet_ntoa(ipStr), info->port); - sprintf(sendbuf + info->pktLen - 16, "1122334455667788"); - - socklen_t sin_size = sizeof(*(struct sockaddr *)&serverAddr); - - int code = sendto(clientSocket, sendbuf, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int)sin_size); - if (code < 0) { - perror("sendto"); - return -1; - } - - iDataNum = recvfrom(clientSocket, recvbuf, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size); - - if (iDataNum < info->pktLen) { - printf("Read ack pkg len: %d, less than req pkg len: %d from udp port: %d\t\t", iDataNum, info->pktLen, info->port); - return -1; - } - - //printf("Read ack pkg len:%d from udp port: %d, buffer: %s %s\n", info->pktLen, port, recvbuf, recvbuf+iDataNum-8); - close(clientSocket); - return 0; -} - -int32_t getIpFromFqdn(const char *fqdn, uint32_t* ip) { - struct addrinfo hints = {0}; - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - - struct addrinfo *result = NULL; - - int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result); - if (result) { - struct sockaddr *sa = result->ai_addr; - struct sockaddr_in *si = (struct sockaddr_in*)sa; - struct in_addr ia = si->sin_addr; - *ip = ia.s_addr; - freeaddrinfo(result); - return 0; - } else { - printf("Failed get the ip address from fqdn:%s, code:%d, reason:%s", fqdn, ret, gai_strerror(ret)); - return -1; - } -} - -void checkPort(uint32_t hostIp, uint16_t startPort, uint16_t maxPort, uint16_t pktLen) { - int ret; - info_s info; - memset(&info, 0, sizeof(info_s)); - info.hostIp = hostIp; - info.pktLen = pktLen; - - for (uint16_t port = startPort; port <= maxPort; port++) { - //printf("test: %s:%d\n", info.host, port); - printf("\n"); - - info.port = port; - ret = checkTcpPort(&info); - if (ret != 0) { - printf("tcp port:%d test fail.\t\n", port); - } else { - printf("tcp port:%d test ok.\t\t", port); - } - - ret = checkUdpPort(&info); - if (ret != 0) { - printf("udp port:%d test fail.\t\n", port); - } else { - printf("udp port:%d test ok.\t\t", port); - } - } - - printf("\n"); - return ; -} - -int main(int argc, char *argv[]) { - SArguments arguments = {"127.0.0.1", "", 6030, 6042, 1000}; - int ret; - - argp_parse(&argp, argc, argv, 0, 0, &arguments); - if (arguments.pktLen > MAX_PKG_LEN) { - printf("test pkg len overflow: %d, max len not greater than %d bytes\n", arguments.pktLen, MAX_PKG_LEN); - exit(0); - } - - printf("host ip: %s\thost fqdn: %s\tport: %d\tmax_port: %d\tpkgLen: %d\n", arguments.host, arguments.fqdn, arguments.port, arguments.max_port, arguments.pktLen); - - if (arguments.host[0] != 0) { - printf("\nstart connect to %s test:\n", arguments.host); - checkPort(inet_addr(arguments.host), arguments.port, arguments.max_port, arguments.pktLen); - printf("\n"); - } - - if (arguments.fqdn[0] != 0) { - uint32_t hostIp = 0; - ret = getIpFromFqdn(arguments.fqdn, &hostIp); - if (ret) { - printf("\n"); - return 0; - } - printf("\nstart connetc to %s test:\n", arguments.fqdn); - checkPort(hostIp, arguments.port, arguments.max_port, arguments.pktLen); - printf("\n"); - } - - return 0; -} diff --git a/src/kit/taosnetwork/server.c b/src/kit/taosnetwork/server.c deleted file mode 100644 index 97be1d3b63369b26dedb0707d3a096b044f982ef..0000000000000000000000000000000000000000 --- a/src/kit/taosnetwork/server.c +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#define MAX_PKG_LEN (64*1000) -#define BUFFER_SIZE (MAX_PKG_LEN + 1024) - -typedef struct { - int port; - uint16_t pktLen; -} info_s; - -typedef struct Arguments { - char * host; - uint16_t port; - uint16_t max_port; - uint16_t pktLen; -} SArguments; - -static struct argp_option options[] = { - {0, 'h', "host", 0, "The host to connect to TDEngine. Default is localhost.", 0}, - {0, 'p', "port", 0, "The TCP or UDP port number to use for the connection. Default is 6041.", 1}, - {0, 'm', "max port", 0, "The max TCP or UDP port number to use for the connection. Default is 6060.", 2}, - {0, 'l', "test pkg len", 0, "The len of pkg for test. Default is 1000 Bytes, max not greater than 64k Bytes.\nNotes: This parameter must be consistent between the client and the server.", 3}}; - -static error_t parse_opt(int key, char *arg, struct argp_state *state) { - - SArguments *arguments = state->input; - switch (key) { - case 'h': - arguments->host = arg; - break; - case 'p': - arguments->port = atoi(arg); - break; - case 'm': - arguments->max_port = atoi(arg); - break; - case 'l': - arguments->pktLen = atoi(arg); - break; - - default: - return ARGP_ERR_UNKNOWN; - } - return 0; -} - -static struct argp argp = {options, parse_opt, 0, 0}; - -static void *bindTcpPort(void *sarg) { - info_s *pinfo = (info_s *)sarg; - int port = pinfo->port; - int serverSocket; - - struct sockaddr_in server_addr; - struct sockaddr_in clientAddr; - int addr_len = sizeof(clientAddr); - int client; - char buffer[BUFFER_SIZE]; - int iDataNum = 0; - - if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { - printf("socket() fail: %s", strerror(errno)); - return NULL; - } - - bzero(&server_addr, sizeof(server_addr)); - server_addr.sin_family = AF_INET; - server_addr.sin_port = htons(port); - server_addr.sin_addr.s_addr = htonl(INADDR_ANY); - - if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { - printf("port:%d bind() fail: %s", port, strerror(errno)); - return NULL; - } - - if (listen(serverSocket, 5) < 0) { - printf("listen() fail: %s", strerror(errno)); - return NULL; - } - - //printf("Bind port: %d success\n", port); - while (1) { - client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len); - if (client < 0) { - printf("accept() fail: %s", strerror(errno)); - continue; - } - - memset(buffer, 0, BUFFER_SIZE); - int nleft, nread; - char *ptr = buffer; - nleft = pinfo->pktLen; - while (nleft > 0) { - nread = recv(client, ptr, BUFFER_SIZE, 0); - - if (nread == 0) { - break; - } else if (nread < 0) { - if (errno == EINTR) { - continue; - } else { - printf("recv Client: %s pkg from TCP port: %d fail:%s.\n", inet_ntoa(clientAddr.sin_addr), port, strerror(errno)); - close(serverSocket); - return NULL; - } - } else { - nleft -= nread; - ptr += nread; - iDataNum += nread; - } - } - - printf("recv Client: %s pkg from TCP port: %d, pkg len: %d\n", inet_ntoa(clientAddr.sin_addr), port, iDataNum); - if (iDataNum > 0) { - send(client, buffer, iDataNum, 0); - } - } - - close(serverSocket); - return NULL; -} - -static void *bindUdpPort(void *sarg) { - info_s *pinfo = (info_s *)sarg; - int port = pinfo->port; - int serverSocket; - - struct sockaddr_in server_addr; - struct sockaddr_in clientAddr; - char buffer[BUFFER_SIZE]; - int iDataNum; - - if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { - perror("socket"); - return NULL; - } - - bzero(&server_addr, sizeof(server_addr)); - server_addr.sin_family = AF_INET; - server_addr.sin_port = htons(port); - server_addr.sin_addr.s_addr = htonl(INADDR_ANY); - - if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { - perror("connect"); - return NULL; - } - - socklen_t sin_size; - - while (1) { - memset(buffer, 0, BUFFER_SIZE); - - sin_size = sizeof(*(struct sockaddr *)&server_addr); - - iDataNum = recvfrom(serverSocket, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&clientAddr, &sin_size); - - if (iDataNum < 0) { - perror("recvfrom null"); - continue; - } - if (iDataNum > 0) { - printf("recv Client: %s pkg from UDP port: %d, pkg len: %d\n", inet_ntoa(clientAddr.sin_addr), port, iDataNum); - //printf("Read msg from udp:%s ... %s\n", buffer, buffer+iDataNum-16); - - sendto(serverSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int)sin_size); - } - } - - close(serverSocket); - return NULL; -} - - -int main(int argc, char *argv[]) { - SArguments arguments = {"127.0.0.1", 6030, 6042, 1000}; - argp_parse(&argp, argc, argv, 0, 0, &arguments); - if (arguments.pktLen > MAX_PKG_LEN) { - printf("test pkg len overflow: %d, max len not greater than %d bytes\n", arguments.pktLen, MAX_PKG_LEN); - exit(0); - } - - int port = arguments.port; - - int num = arguments.max_port - arguments.port + 1; - - if (num < 0) { - num = 1; - } - pthread_t *pids = malloc(2 * num * sizeof(pthread_t)); - info_s * tinfos = malloc(num * sizeof(info_s)); - info_s * uinfos = malloc(num * sizeof(info_s)); - - for (size_t i = 0; i < num; i++) { - info_s *tcpInfo = tinfos + i; - tcpInfo->port = port + i; - tcpInfo->pktLen = arguments.pktLen; - - if (pthread_create(pids + i, NULL, bindTcpPort, tcpInfo) != 0) - { - printf("create thread fail, port:%d.\n", port); - exit(-1); - } - - info_s *udpInfo = uinfos + i; - udpInfo->port = port + i; - if (pthread_create(pids + num + i, NULL, bindUdpPort, udpInfo) != 0) - { - printf("create thread fail, port:%d.\n", port); - exit(-1); - } - } - - for (int i = 0; i < num; i++) { - pthread_join(pids[i], NULL); - pthread_join(pids[(num + i)], NULL); - } -} diff --git a/src/util/inc/tnettest.h b/src/util/inc/tnettest.h new file mode 100644 index 0000000000000000000000000000000000000000..3fe1dfa9204fbbf85f193078b17e0bb4f9643848 --- /dev/null +++ b/src/util/inc/tnettest.h @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TNETTEST_H +#define TDENGINE_TNETTEST_H + +#ifdef __cplusplus +extern "C" { +#endif + +void taosNetTest(const char* host, uint16_t port, uint16_t endPort, int pktLen, const char* netTestRole); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_TNETTEST_H diff --git a/src/util/src/tnettest.c b/src/util/src/tnettest.c new file mode 100644 index 0000000000000000000000000000000000000000..d3b404f6ded10a0ecd57329f60b1813ae7515598 --- /dev/null +++ b/src/util/src/tnettest.c @@ -0,0 +1,441 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "os.h" +#include "taosdef.h" +#include "taoserror.h" +#include "tulog.h" +#include "tconfig.h" +#include "tglobal.h" +#include "tsocket.h" + +#define MAX_PKG_LEN (64*1000) +#define BUFFER_SIZE (MAX_PKG_LEN + 1024) + +typedef struct { + uint32_t hostIp; + uint16_t port; + uint16_t pktLen; +} info_s; + +static char serverFqdn[TSDB_FQDN_LEN]; +static uint16_t g_startPort = 0; +static uint16_t g_endPort = 6042; + +static void *bindUdpPort(void *sarg) { + info_s *pinfo = (info_s *)sarg; + int port = pinfo->port; + int serverSocket; + + struct sockaddr_in server_addr; + struct sockaddr_in clientAddr; + char buffer[BUFFER_SIZE]; + int iDataNum; + + if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { + perror("socket"); + return NULL; + } + + bzero(&server_addr, sizeof(server_addr)); + server_addr.sin_family = AF_INET; + server_addr.sin_port = htons(port); + server_addr.sin_addr.s_addr = htonl(INADDR_ANY); + + if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { + perror("connect"); + return NULL; + } + + socklen_t sin_size; + + while (1) { + memset(buffer, 0, BUFFER_SIZE); + + sin_size = sizeof(*(struct sockaddr *)&server_addr); + + iDataNum = recvfrom(serverSocket, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&clientAddr, &sin_size); + + if (iDataNum < 0) { + perror("recvfrom null"); + continue; + } + if (iDataNum > 0) { + printf("recv Client: %s pkg from UDP port: %d, pkg len: %d\n", inet_ntoa(clientAddr.sin_addr), port, iDataNum); + //printf("Read msg from udp:%s ... %s\n", buffer, buffer+iDataNum-16); + + sendto(serverSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int)sin_size); + } + } + + close(serverSocket); + return NULL; +} + +static void *bindTcpPort(void *sarg) { + info_s *pinfo = (info_s *)sarg; + int port = pinfo->port; + int serverSocket; + + struct sockaddr_in server_addr; + struct sockaddr_in clientAddr; + int addr_len = sizeof(clientAddr); + int client; + char buffer[BUFFER_SIZE]; + int iDataNum = 0; + + if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { + printf("socket() fail: %s", strerror(errno)); + return NULL; + } + + bzero(&server_addr, sizeof(server_addr)); + server_addr.sin_family = AF_INET; + server_addr.sin_port = htons(port); + server_addr.sin_addr.s_addr = htonl(INADDR_ANY); + + if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { + printf("port:%d bind() fail: %s", port, strerror(errno)); + return NULL; + } + + if (listen(serverSocket, 5) < 0) { + printf("listen() fail: %s", strerror(errno)); + return NULL; + } + + //printf("Bind port: %d success\n", port); + while (1) { + client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len); + if (client < 0) { + printf("accept() fail: %s", strerror(errno)); + continue; + } + + iDataNum = 0; + memset(buffer, 0, BUFFER_SIZE); + int nleft, nread; + char *ptr = buffer; + nleft = pinfo->pktLen; + while (nleft > 0) { + nread = recv(client, ptr, BUFFER_SIZE, 0); + + if (nread == 0) { + break; + } else if (nread < 0) { + if (errno == EINTR) { + continue; + } else { + printf("recv Client: %s pkg from TCP port: %d fail:%s.\n", inet_ntoa(clientAddr.sin_addr), port, strerror(errno)); + close(serverSocket); + return NULL; + } + } else { + nleft -= nread; + ptr += nread; + iDataNum += nread; + } + } + + printf("recv Client: %s pkg from TCP port: %d, pkg len: %d\n", inet_ntoa(clientAddr.sin_addr), port, iDataNum); + if (iDataNum > 0) { + send(client, buffer, iDataNum, 0); + } + } + + close(serverSocket); + return NULL; +} + +static int checkTcpPort(info_s *info) { + int clientSocket; + + struct sockaddr_in serverAddr; + char sendbuf[BUFFER_SIZE]; + char recvbuf[BUFFER_SIZE]; + int iDataNum = 0; + if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + printf("socket() fail: %s\n", strerror(errno)); + return -1; + } + + // set send and recv overtime + struct timeval timeout; + timeout.tv_sec = 2; //s + timeout.tv_usec = 0; //us + if (setsockopt(clientSocket, SOL_SOCKET,SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { + perror("setsockopt send timer failed:"); + } + if (setsockopt(clientSocket, SOL_SOCKET,SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { + perror("setsockopt recv timer failed:"); + } + + serverAddr.sin_family = AF_INET; + serverAddr.sin_port = htons(info->port); + + serverAddr.sin_addr.s_addr = info->hostIp; + + //printf("=================================\n"); + if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) { + printf("connect() fail: %s\t", strerror(errno)); + return -1; + } + //printf("Connect to: %s:%d...success\n", host, port); + memset(sendbuf, 0, BUFFER_SIZE); + memset(recvbuf, 0, BUFFER_SIZE); + + struct in_addr ipStr; + memcpy(&ipStr, &info->hostIp, 4); + sprintf(sendbuf, "client send tcp pkg to %s:%d, content: 1122334455", inet_ntoa(ipStr), info->port); + sprintf(sendbuf + info->pktLen - 16, "1122334455667788"); + + send(clientSocket, sendbuf, info->pktLen, 0); + + memset(recvbuf, 0, BUFFER_SIZE); + int nleft, nread; + char *ptr = recvbuf; + nleft = info->pktLen; + while (nleft > 0) { + nread = recv(clientSocket, ptr, BUFFER_SIZE, 0);; + + if (nread == 0) { + break; + } else if (nread < 0) { + if (errno == EINTR) { + continue; + } else { + printf("recv ack pkg from TCP port: %d fail:%s.\n", info->port, strerror(errno)); + close(clientSocket); + return -1; + } + } else { + nleft -= nread; + ptr += nread; + iDataNum += nread; + } + } + + if (iDataNum < info->pktLen) { + printf("recv ack pkg len: %d, less than req pkg len: %d from tcp port: %d\n", iDataNum, info->pktLen, info->port); + return -1; + } + //printf("Read ack pkg len:%d from tcp port: %d, buffer: %s %s\n", info->pktLen, port, recvbuf, recvbuf+iDataNum-8); + + close(clientSocket); + return 0; +} + +static int checkUdpPort(info_s *info) { + int clientSocket; + + struct sockaddr_in serverAddr; + char sendbuf[BUFFER_SIZE]; + char recvbuf[BUFFER_SIZE]; + int iDataNum = 0; + if ((clientSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { + perror("socket"); + return -1; + } + + // set overtime + struct timeval timeout; + timeout.tv_sec = 2; //s + timeout.tv_usec = 0; //us + if (setsockopt(clientSocket, SOL_SOCKET,SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { + perror("setsockopt send timer failed:"); + } + if (setsockopt(clientSocket, SOL_SOCKET,SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { + perror("setsockopt recv timer failed:"); + } + + serverAddr.sin_family = AF_INET; + serverAddr.sin_port = htons(info->port); + serverAddr.sin_addr.s_addr = info->hostIp; + + memset(sendbuf, 0, BUFFER_SIZE); + memset(recvbuf, 0, BUFFER_SIZE); + + struct in_addr ipStr; + memcpy(&ipStr, &info->hostIp, 4); + sprintf(sendbuf, "client send udp pkg to %s:%d, content: 1122334455", inet_ntoa(ipStr), info->port); + sprintf(sendbuf + info->pktLen - 16, "1122334455667788"); + + socklen_t sin_size = sizeof(*(struct sockaddr *)&serverAddr); + + int code = sendto(clientSocket, sendbuf, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int)sin_size); + if (code < 0) { + perror("sendto"); + return -1; + } + + iDataNum = recvfrom(clientSocket, recvbuf, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size); + + if (iDataNum < info->pktLen) { + printf("Read ack pkg len: %d, less than req pkg len: %d from udp port: %d\t\t", iDataNum, info->pktLen, info->port); + return -1; + } + + //printf("Read ack pkg len:%d from udp port: %d, buffer: %s %s\n", info->pktLen, port, recvbuf, recvbuf+iDataNum-8); + close(clientSocket); + return 0; +} + +static void checkPort(uint32_t hostIp, uint16_t startPort, uint16_t maxPort, uint16_t pktLen) { + int ret; + info_s info; + memset(&info, 0, sizeof(info_s)); + info.hostIp = hostIp; + info.pktLen = pktLen; + + for (uint16_t port = startPort; port <= maxPort; port++) { + //printf("test: %s:%d\n", info.host, port); + printf("\n"); + + info.port = port; + ret = checkTcpPort(&info); + if (ret != 0) { + printf("tcp port:%d test fail.\t\n", port); + } else { + printf("tcp port:%d test ok.\t\t", port); + } + + ret = checkUdpPort(&info); + if (ret != 0) { + printf("udp port:%d test fail.\t\n", port); + } else { + printf("udp port:%d test ok.\t\t", port); + } + } + + printf("\n"); + return ; +} + +static void taosNetTestClient(const char* serverFqdn, uint16_t startPort, uint16_t endPort, int pktLen) { + uint32_t serverIp = taosGetIpFromFqdn(serverFqdn); + if (serverIp == 0xFFFFFFFF) { + printf("Failed to resolve FQDN:%s", serverFqdn); + exit(-1); + } + + checkPort(serverIp, startPort, endPort, pktLen); + + return; +} + + + +static void taosNetTestServer(uint16_t startPort, uint16_t endPort, int pktLen) { + + int port = startPort; + int num = endPort - startPort + 1; + + if (num < 0) { + num = 1; + } + + pthread_t *pids = malloc(2 * num * sizeof(pthread_t)); + info_s * tinfos = malloc(num * sizeof(info_s)); + info_s * uinfos = malloc(num * sizeof(info_s)); + + for (size_t i = 0; i < num; i++) { + info_s *tcpInfo = tinfos + i; + tcpInfo->port = port + i; + tcpInfo->pktLen = pktLen; + + if (pthread_create(pids + i, NULL, bindTcpPort, tcpInfo) != 0) + { + printf("create thread fail, port:%d.\n", port); + exit(-1); + } + + info_s *udpInfo = uinfos + i; + udpInfo->port = port + i; + if (pthread_create(pids + num + i, NULL, bindUdpPort, udpInfo) != 0) + { + printf("create thread fail, port:%d.\n", port); + exit(-1); + } + } + + for (int i = 0; i < num; i++) { + pthread_join(pids[i], NULL); + pthread_join(pids[(num + i)], NULL); + } +} + + +void taosNetTest(const char* host, uint16_t port, uint16_t endPort, int pktLen, const char* netTestRole) { + if (pktLen > MAX_PKG_LEN) { + printf("test packet len overflow: %d, max len not greater than %d bytes\n", pktLen, MAX_PKG_LEN); + exit(-1); + } + + if (port && endPort) { + if (port > endPort) { + printf("endPort[%d] must not lesss port[%d]\n", endPort, port); + exit(-1); + } + } + + if (host && host[0] != 0) { + if (strlen(host) >= TSDB_EP_LEN) { + printf("host invalid: %s\n", host); + exit(-1); + } + + taosGetFqdnPortFromEp(host, serverFqdn, &g_startPort); + } else { + tstrncpy(serverFqdn, "127.0.0.1", TSDB_IPv4ADDR_LEN); + g_startPort = tsServerPort; + } + + if (port) { + g_startPort = port; + } + + if (endPort) { + g_endPort = endPort; + } + + if (port > endPort) { + printf("endPort[%d] must not lesss port[%d]\n", g_endPort, g_startPort); + exit(-1); + } + + if (0 == strcmp("client", netTestRole)) { + printf("host: %s\tstart port: %d\tend port: %d\tpacket len: %d\n", serverFqdn, g_startPort, g_endPort, pktLen); + taosNetTestClient(serverFqdn, g_startPort, g_endPort, pktLen); + } else if (0 == strcmp("server", netTestRole)) { + taosNetTestServer(g_startPort, g_endPort, pktLen); + } +} +