diff --git a/src/kit/shell/src/shellWindows.c b/src/kit/shell/src/shellWindows.c index ce986813918249ebe501e92d3af307a67c296907..a92831de25cad262365e2a95163d83aa8fc8355f 100644 --- a/src/kit/shell/src/shellWindows.c +++ b/src/kit/shell/src/shellWindows.c @@ -45,6 +45,10 @@ void printHelp() { printf("%s%s%s\n", indent, indent, "Database to use when connecting to the server."); printf("%s%s\n", indent, "-t"); printf("%s%s%s\n", indent, indent, "Time zone of the shell, default is local."); + printf("%s%s\n", indent, "-n"); + printf("%s%s%s\n", indent, indent, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup."); + printf("%s%s\n", indent, "-l"); + printf("%s%s%s\n", indent, indent, "Packet length used for net test, default is 1000 bytes."); exit(EXIT_SUCCESS); } @@ -137,6 +141,24 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) { exit(EXIT_FAILURE); } } + // For time zone + else if (strcmp(argv[i], "-n") == 0) { + if (i < argc - 1) { + arguments->netTestRole = argv[++i]; + } else { + fprintf(stderr, "option -n requires an argument\n"); + exit(EXIT_FAILURE); + } + } + // For time zone + else if (strcmp(argv[i], "-l") == 0) { + if (i < argc - 1) { + arguments->pktLen = atoi(argv[++i]); + } else { + fprintf(stderr, "option -l requires an argument\n"); + exit(EXIT_FAILURE); + } + } // For temperory command TODO else if (strcmp(argv[i], "--help") == 0) { printHelp(); diff --git a/src/util/src/tnettest.c b/src/util/src/tnettest.c index c269d9a1ffe07c162123c44c397aba0da9593068..4619271599831b57353fd84fbc81b71d26eea95a 100644 --- a/src/util/src/tnettest.c +++ b/src/util/src/tnettest.c @@ -43,12 +43,13 @@ static void *taosNetBindUdpPort(void *sarg) { char buffer[BUFFER_SIZE]; int32_t iDataNum; socklen_t sin_size; + int32_t bufSize = 1024000; struct sockaddr_in server_addr; struct sockaddr_in clientAddr; if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { - uError("failed to create udp socket since %s", strerror(errno)); + uError("failed to create UDP socket since %s", strerror(errno)); return NULL; } @@ -58,11 +59,23 @@ static void *taosNetBindUdpPort(void *sarg) { server_addr.sin_addr.s_addr = htonl(INADDR_ANY); if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { - uError("failed to bind udp port:%d since %s", port, strerror(errno)); + uError("failed to bind UDP port:%d since %s", port, strerror(errno)); return NULL; } - uInfo("udp server at port:%d is listening", port); + if (taosSetSockOpt(serverSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { + uError("failed to set the send buffer size for UDP socket\n"); + taosCloseSocket(serverSocket); + return NULL; + } + + if (taosSetSockOpt(serverSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { + uError("failed to set the receive buffer size for UDP socket\n"); + taosCloseSocket(serverSocket); + return NULL; + } + + uInfo("UDP server at port:%d is listening", port); while (1) { memset(buffer, 0, BUFFER_SIZE); @@ -76,7 +89,7 @@ static void *taosNetBindUdpPort(void *sarg) { if (iDataNum > 0) { uInfo("UDP: recv:%d bytes from %s:%d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port); - sendto(serverSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int32_t)sin_size); + taosSendto(serverSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int32_t)sin_size); } } @@ -94,10 +107,9 @@ static void *taosNetBindTcpPort(void *sarg) { int32_t addr_len = sizeof(clientAddr); SOCKET client; char buffer[BUFFER_SIZE]; - int32_t iDataNum = 0; if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { - uError("failed to create tcp socket since %s", strerror(errno)); + uError("failed to create TCP socket since %s", strerror(errno)); return NULL; } @@ -106,130 +118,103 @@ static void *taosNetBindTcpPort(void *sarg) { server_addr.sin_port = htons(port); server_addr.sin_addr.s_addr = htonl(INADDR_ANY); + int32_t reuse = 1; + if (taosSetSockOpt(serverSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) { + uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno)); + taosCloseSocket(serverSocket); + return NULL; + } + if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { - uError("failed to bind tcp port:%d since %s", port, strerror(errno)); + uError("failed to bind TCP port:%d since %s", port, strerror(errno)); return NULL; } - if (listen(serverSocket, 5) < 0) { - uError("failed to listen tcp port:%d since %s", port, strerror(errno)); + + if (taosKeepTcpAlive(serverSocket) < 0) { + uError("failed to set tcp server keep-alive option since %s", strerror(errno)); + taosCloseSocket(serverSocket); return NULL; } - uInfo("tcp server at port:%d is listening", port); + if (listen(serverSocket, 10) < 0) { + uError("failed to listen TCP port:%d since %s", port, strerror(errno)); + return NULL; + } + + uInfo("TCP server at port:%d is listening", port); while (1) { client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len); if (client < 0) { - uDebug("failed to accept from tcp port:%d since %s", port, strerror(errno)); + uDebug("TCP: failed to accept at port:%d since %s", port, strerror(errno)); continue; } - iDataNum = 0; - memset(buffer, 0, BUFFER_SIZE); - int32_t nleft, nread; - char * ptr = buffer; - nleft = pinfo->pktLen; - - while (nleft > 0) { - nread = recv(client, ptr, BUFFER_SIZE, 0); - - if (nread == 0) { - break; - } else if (nread < 0) { - if (errno == EINTR) { - continue; - } else { - uError("failed to perform recv func at %d since %s", port, strerror(errno)); - taosCloseSocket(serverSocket); - return NULL; - } - } else { - nleft -= nread; - ptr += nread; - iDataNum += nread; - } + int32_t ret = taosReadMsg(client, buffer, pinfo->pktLen); + if (ret < 0 || ret != pinfo->pktLen) { + uError("TCP: failed to read %d bytes at port:%d since %s", port, strerror(errno)); + taosCloseSocket(serverSocket); + return NULL; } - if (iDataNum > 0) { - uInfo("TCP: recv:%d bytes from %s:%d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port); - send(client, buffer, iDataNum, 0); + uInfo("TCP: read:%d bytes from %s at %d", pinfo->pktLen, taosInetNtoa(clientAddr.sin_addr), port); + + ret = taosWriteMsg(client, buffer, pinfo->pktLen); + if (ret < 0) { + uError("TCP: failed to write %d bytes at %d since %s", pinfo->pktLen, strerror(errno), port); + taosCloseSocket(serverSocket); + return NULL; } + + uInfo("TCP: write:%d bytes to %s at %d", pinfo->pktLen, taosInetNtoa(clientAddr.sin_addr), port); } - + taosCloseSocket(serverSocket); return NULL; } static int32_t taosNetCheckTcpPort(STestInfo *info) { - SOCKET clientSocket; - char sendbuf[BUFFER_SIZE]; - char recvbuf[BUFFER_SIZE]; - int32_t iDataNum = 0; + SOCKET clientSocket; + char buffer[BUFFER_SIZE] = {0}; - struct sockaddr_in serverAddr; if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - uError("failed to create tcp client socket since %s", strerror(errno)); + uError("failed to create TCP client socket since %s", strerror(errno)); return -1; } - // set send and recv overtime - struct timeval timeout; - timeout.tv_sec = 2; // s - timeout.tv_usec = 0; // us - if (setsockopt(clientSocket, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { - uError("failed to setsockopt send timer since %s", strerror(errno)); - } - if (setsockopt(clientSocket, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { - uError("failed to setsockopt recv timer since %s", strerror(errno)); + int32_t reuse = 1; + if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) { + uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno)); + taosCloseSocket(clientSocket); + return -1; } + struct sockaddr_in serverAddr; + memset((char *)&serverAddr, 0, sizeof(serverAddr)); serverAddr.sin_family = AF_INET; - serverAddr.sin_port = htons(info->port); + serverAddr.sin_port = (uint16_t)htons((uint16_t)info->port); serverAddr.sin_addr.s_addr = info->hostIp; if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) { - uError("failed to connect port:%d since %s", info->port, strerror(errno)); + uError("TCP: failed to connect port %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno)); return -1; } - memset(sendbuf, 0, BUFFER_SIZE); - memset(recvbuf, 0, BUFFER_SIZE); + taosKeepTcpAlive(clientSocket); - struct in_addr ipStr; - memcpy(&ipStr, &info->hostIp, 4); - sprintf(sendbuf, "client send tcp pkg to %s:%d, content: 1122334455", taosInetNtoa(ipStr), info->port); - sprintf(sendbuf + info->pktLen - 16, "1122334455667788"); - - send(clientSocket, sendbuf, info->pktLen, 0); + sprintf(buffer, "client send TCP pkg to %s:%d, content: 1122334455", taosIpStr(info->hostIp), info->port); + sprintf(buffer + info->pktLen - 16, "1122334455667788"); - memset(recvbuf, 0, BUFFER_SIZE); - int32_t nleft, nread; - char * ptr = recvbuf; - nleft = info->pktLen; - - while (nleft > 0) { - nread = recv(clientSocket, ptr, BUFFER_SIZE, 0);; - - if (nread == 0) { - break; - } else if (nread < 0) { - if (errno == EINTR) { - continue; - } else { - uError("faild to recv pkg from TCP port:%d since %s", info->port, strerror(errno)); - taosCloseSocket(clientSocket); - return -1; - } - } else { - nleft -= nread; - ptr += nread; - iDataNum += nread; - } + int32_t ret = taosWriteMsg(clientSocket, buffer, info->pktLen); + if (ret < 0) { + uError("TCP: failed to write msg to %s:%d since %s", info->port, taosIpStr(info->hostIp), strerror(errno)); + return -1; } - if (iDataNum < info->pktLen) { - uError("TCP: received ack:%d bytes, less than send:%d bytes from port:%d", iDataNum, info->pktLen, info->port); + ret = taosReadMsg(clientSocket, buffer, info->pktLen); + if (ret < 0) { + uError("TCP: failed to read msg from %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno)); return -1; } @@ -237,11 +222,13 @@ static int32_t taosNetCheckTcpPort(STestInfo *info) { return 0; } + static int32_t taosNetCheckUdpPort(STestInfo *info) { SOCKET clientSocket; char sendbuf[BUFFER_SIZE]; char recvbuf[BUFFER_SIZE]; int32_t iDataNum = 0; + int32_t bufSize = 1024000; struct sockaddr_in serverAddr; @@ -254,13 +241,23 @@ static int32_t taosNetCheckUdpPort(STestInfo *info) { struct timeval timeout; timeout.tv_sec = 2; // s timeout.tv_usec = 0; // us - if (setsockopt(clientSocket, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { + if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { uError("failed to setsockopt send timer since %s", strerror(errno)); } - if (setsockopt(clientSocket, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { + if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { uError("failed to setsockopt recv timer since %s", strerror(errno)); } + if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { + uError("failed to set the send buffer size for UDP socket\n"); + return -1; + } + + if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { + uError("failed to set the receive buffer size for UDP socket\n"); + return -1; + } + serverAddr.sin_family = AF_INET; serverAddr.sin_port = htons(info->port); serverAddr.sin_addr.s_addr = info->hostIp; @@ -275,7 +272,7 @@ static int32_t taosNetCheckUdpPort(STestInfo *info) { socklen_t sin_size = sizeof(*(struct sockaddr *)&serverAddr); - int32_t code = sendto(clientSocket, sendbuf, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int32_t)sin_size); + int32_t code = taosSendto(clientSocket, sendbuf, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int32_t)sin_size); if (code < 0) { uError("failed to perform sendto func since %s", strerror(errno)); return -1; @@ -304,16 +301,16 @@ static void taosNetCheckPort(uint32_t hostIp, int32_t startPort, int32_t endPort info.port = port; ret = taosNetCheckTcpPort(&info); if (ret != 0) { - uError("failed to test tcp port:%d", port); + uError("failed to test TCP port:%d", port); } else { - uInfo("successed to test tcp port:%d", port); + uInfo("successed to test TCP port:%d", port); } ret = taosNetCheckUdpPort(&info); if (ret != 0) { - uError("failed to test udp port:%d", port); + uError("failed to test UDP port:%d", port); } else { - uInfo("successed to test udp port:%d", port); + uInfo("successed to test UDP port:%d", port); } } return; @@ -440,9 +437,9 @@ static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) { int32_t ret = taosNetCheckRpc(host, port, sendpkgLen, spi, NULL); if (ret < 0) { - uError("failed to test tcp port:%d", port); + uError("failed to test TCP port:%d", port); } else { - uInfo("successed to test tcp port:%d", port); + uInfo("successed to test TCP port:%d", port); } if (pkgLen >= tsRpcMaxUdpSize) { @@ -453,9 +450,9 @@ static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) { ret = taosNetCheckRpc(host, port, pkgLen, spi, NULL); if (ret < 0) { - uError("failed to test udp port:%d", port); + uError("failed to test UDP port:%d", port); } else { - uInfo("successed to test udp port:%d", port); + uInfo("successed to test UDP port:%d", port); } } } @@ -492,14 +489,15 @@ static void taosNetTestServer(char *host, int32_t startPort, int32_t pkgLen) { tcpInfo->pktLen = pkgLen; if (pthread_create(pids + i, NULL, taosNetBindTcpPort, tcpInfo) != 0) { - uInfo("failed to create tcp test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port); + uInfo("failed to create TCP test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port); exit(-1); } STestInfo *udpInfo = uinfos + i; - udpInfo->port = (uint16_t)(port + i); + udpInfo->port = port + i; + tcpInfo->pktLen = pkgLen; if (pthread_create(pids + num + i, NULL, taosNetBindUdpPort, udpInfo) != 0) { - uInfo("failed to create udp test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port); + uInfo("failed to create UDP test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port); exit(-1); } }