diff --git a/src/dnode/inc/dnodeStep.h b/src/dnode/inc/dnodeStep.h new file mode 100644 index 0000000000000000000000000000000000000000..88789e1245b07b33123f4bc3cb2977971342ae32 --- /dev/null +++ b/src/dnode/inc/dnodeStep.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_DNODE_STEP_H +#define TDENGINE_DNODE_STEP_H + +#ifdef __cplusplus +extern "C" { +#endif + +void dnodeReportStep(char *name, char *desc, int8_t finished); +void dnodeSendStartupStep(SRpcMsg *pMsg); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index 6bf22cee4e2ef03348c9e07187047083cbef348e..19f5a36d8479dc4d04bd3c89df4e0b03a3e5f0eb 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -30,6 +30,7 @@ #include "dnodeVWrite.h" #include "dnodeMPeer.h" #include "dnodeMInfos.h" +#include "dnodeStep.h" static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *); @@ -56,6 +57,8 @@ int32_t dnodeInitServer() { dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = dnodeDispatchToMPeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = dnodeDispatchToMPeerQueue; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = dnodeDispatchToMPeerQueue; + + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep; SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 89f657f78986b8e57c0b5e1dedb841c451db00ba..b6a13062f07bdef5d122a8446f7c5fe4a759a4bf 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -29,6 +29,7 @@ #include "dnodeMRead.h" #include "dnodeMWrite.h" #include "dnodeShell.h" +#include "dnodeStep.h" static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *); @@ -74,6 +75,8 @@ int32_t dnodeInitShell() { dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep; + int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0); if (numOfThreads < 1) { @@ -142,7 +145,23 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { } } +static int32_t dnodeAuthNettestUser(char *user, char *spi, char *encrypt, char *secret, char *ckey) { + if (strcmp(user, "nettestinternal") == 0) { + char pass[32] = {0}; + taosEncryptPass((uint8_t *)user, strlen(user), pass); + *spi = 0; + *encrypt = 0; + *ckey = 0; + memcpy(secret, pass, TSDB_KEY_LEN); + dTrace("nettest user is authorized"); + return 0; + } + + return -1; +} + static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { + if (dnodeAuthNettestUser(user, spi, encrypt, secret, ckey) == 0) return 0; int code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey); if (code != TSDB_CODE_APP_NOT_READY) return code; diff --git a/src/dnode/src/dnodeStep.c b/src/dnode/src/dnodeStep.c new file mode 100644 index 0000000000000000000000000000000000000000..89d38b27948f7f6173046f9330525788d91123d1 --- /dev/null +++ b/src/dnode/src/dnodeStep.c @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taosmsg.h" +#include "dnodeInt.h" + +static SStartupStep tsStartupStep; + +void dnodeReportStep(char *name, char *desc, int8_t finished) { + tstrncpy(tsStartupStep.name, name, sizeof(tsStartupStep.name)); + tstrncpy(tsStartupStep.desc, desc, sizeof(tsStartupStep.desc)); + tsStartupStep.finished = finished; +} + +void dnodeSendStartupStep(SRpcMsg *pMsg) { + dInfo("nettest msg is received, cont:%s", (char *)pMsg->pCont); + + SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep)); +#if 1 + memcpy(pStep, &tsStartupStep, sizeof(SStartupStep)); +#else + static int32_t step = 0; + sprintf(pStep->name, "module:%d", step++); + sprintf(pStep->desc, "step:%d", step++); + if (step > 10) pStep->finished = 1; +#endif + + SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStep, .contLen = sizeof(SStartupStep)}; + rpcSendResponse(&rpcRsp); + rpcFreeCont(pMsg->pCont); +} diff --git a/src/inc/dnode.h b/src/inc/dnode.h index eef4490800a4191c2dee55c450cf99b8381bb64d..b2bf4d5e46bd612482f1aea14007dab9153f488c 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -71,6 +71,8 @@ void dnodeDelayReprocessMWriteMsg(void *pMsg); void dnodeSendStatusMsgToMnode(); +void dnodeReportStep(char *name, char *desc, int8_t finished); + #ifdef __cplusplus } #endif diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 89803724964e049dda1706b1438a7844dd956548..5530702840bc18e516a205adcb9a785229500f8c 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -286,6 +286,9 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf #define TSDB_SHOW_SQL_LEN 512 #define TSDB_SLOW_QUERY_SQL_LEN 512 +#define TSDB_STEP_NAME_LEN 32 +#define TSDB_STEP_DESC_LEN 128 + #define TSDB_MQTT_HOSTNAME_LEN 64 #define TSDB_MQTT_PORT_LEN 8 #define TSDB_MQTT_USER_LEN 24 diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index b77db69c46038419703944764cbb2837674d1c21..f1ee7e04149bdff0ebac844d0523ebddef5dd3c9 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -105,10 +105,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_AUTH, "auth" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY12, "dummy12" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" ) - - -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "network-test" ) - +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" ) #ifndef TAOS_MESSAGE_C TSDB_MSG_TYPE_MAX // 105 @@ -838,6 +835,14 @@ typedef struct { char ckey[TSDB_KEY_LEN]; } SAuthMsg, SAuthRsp; +typedef struct { + int8_t finished; + int8_t reserved1[7]; + char name[TSDB_STEP_NAME_LEN]; + char desc[TSDB_STEP_DESC_LEN]; + char reserved2[64]; +} SStartupStep; + #pragma pack(pop) #ifdef __cplusplus diff --git a/src/kit/shell/inc/shell.h b/src/kit/shell/inc/shell.h index d65c943e28d7d5a63aba4fc1839a4ba9cf744746..2c6e4a308ca04e825ea3b876a1b278ccb6feb279 100644 --- a/src/kit/shell/inc/shell.h +++ b/src/kit/shell/inc/shell.h @@ -51,7 +51,6 @@ typedef struct SShellArguments { char* commands; int abort; int port; - int endPort; int pktLen; char* netTestRole; } SShellArguments; diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index 22f01ac142c72603b9fa0595ce8c1fc89c626d7e..af8beb7987b2cf5003a59e623c6e76710490992b 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -32,14 +32,14 @@ /**************** Global variables ****************/ #ifdef _TD_POWER_ char CLIENT_VERSION[] = "Welcome to the PowerDB shell from %s, Client Version:%s\n" - "Copyright (c) 2017 by PowerDB, Inc. All rights reserved.\n\n"; + "Copyright (c) 2020 by PowerDB, Inc. All rights reserved.\n\n"; char PROMPT_HEADER[] = "power> "; char CONTINUE_PROMPT[] = " -> "; int prompt_size = 7; #else char CLIENT_VERSION[] = "Welcome to the TDengine shell from %s, Client Version:%s\n" - "Copyright (c) 2017 by TAOS Data, Inc. All rights reserved.\n\n"; + "Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n"; char PROMPT_HEADER[] = "taos> "; char CONTINUE_PROMPT[] = " -> "; diff --git a/src/kit/shell/src/shellLinux.c b/src/kit/shell/src/shellLinux.c index 3226ad830a1995eef563fd630f8ccb6769f5e16a..f896253fb4ea4fedca07ce82d0c99ef162146df5 100644 --- a/src/kit/shell/src/shellLinux.c +++ b/src/kit/shell/src/shellLinux.c @@ -46,8 +46,7 @@ static struct argp_option options[] = { {"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."}, {"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."}, {"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."}, - {"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is NULL, options: client|clients|server."}, - {"endport", 'e', "ENDPORT", 0, "Net test end port, default is 6042."}, + {"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is NULL, options: client|server|rpc|startup."}, {"pktlen", 'l', "PKTLEN", 0, "Packet length used for net test, default is 1000 bytes."}, {0}}; @@ -130,20 +129,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { case 'd': arguments->database = arg; break; - case 'n': arguments->netTestRole = arg; break; - - case 'e': - if (arg) { - arguments->endPort = atoi(arg); - } else { - fprintf(stderr, "Invalid end port\n"); - return -1; - } - break; - case 'l': if (arg) { arguments->pktLen = atoi(arg); @@ -152,7 +140,6 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { return -1; } break; - case OPT_ABORT: arguments->abort = 1; break; diff --git a/src/kit/shell/src/shellMain.c b/src/kit/shell/src/shellMain.c index 2083ad3e9b7d5a101f35af4fe20dcd38957d5be2..a2ce78d36f7b624985592c4cfc60ce0fe741d39e 100644 --- a/src/kit/shell/src/shellMain.c +++ b/src/kit/shell/src/shellMain.c @@ -61,8 +61,7 @@ SShellArguments args = { .file = "\0", .dir = "\0", .threadNum = 5, - .commands = NULL, - .endPort = 6042, + .commands = NULL, .pktLen = 1000, .netTestRole = NULL }; @@ -81,9 +80,7 @@ int main(int argc, char* argv[]) { if (args.netTestRole && args.netTestRole[0] != 0) { taos_init(); - CmdArguments cmdArgs; - memcpy(&cmdArgs, &args, sizeof(SShellArguments)); - taosNetTest(&cmdArgs); + taosNetTest(args.netTestRole, args.host, args.port, args.pktLen); exit(0); } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 00a97d7bc2474e31931e07edfdc4df617578e531..587c079fe6201ac804aca35ab15d54bff968688e 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -1086,13 +1086,6 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code); } } else { // msg is passed to app only parsing is ok - - if (pHead->msgType == TSDB_MSG_TYPE_NETWORK_TEST) { - rpcSendQuickRsp(pConn, TSDB_CODE_SUCCESS); - rpcFreeMsg(pRecv->msg); - return pConn; - } - rpcProcessIncomingMsg(pConn, pHead, pContext); } } diff --git a/src/util/inc/tnettest.h b/src/util/inc/tnettest.h index 426df5cbb28b9c0fcada049c7242730359d2a3fc..b7585bd7155421d1f22e5f989dc7d1ae6f8be491 100644 --- a/src/util/inc/tnettest.h +++ b/src/util/inc/tnettest.h @@ -20,27 +20,7 @@ extern "C" { #endif -typedef struct CmdArguments { - char* host; - char* password; - char* user; - char* auth; - char* database; - char* timezone; - bool is_raw_time; - bool is_use_passwd; - char file[TSDB_FILENAME_LEN]; - char dir[TSDB_FILENAME_LEN]; - int threadNum; - char* commands; - int abort; - int port; - int endPort; - int pktLen; - char* netTestRole; -} CmdArguments; - -void taosNetTest(CmdArguments* args); +void taosNetTest(char *role, char *host, int port, int pkgLen); #ifdef __cplusplus } diff --git a/src/util/src/tnettest.c b/src/util/src/tnettest.c index 6fd526598365f831addd1bacb0b7f748d9552fdb..7c8178e82741ad89a77fa26033ff22891fb2fab2 100644 --- a/src/util/src/tnettest.c +++ b/src/util/src/tnettest.c @@ -13,50 +13,42 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "os.h" #include "taosdef.h" #include "taosmsg.h" #include "taoserror.h" #include "tulog.h" -#include "tconfig.h" #include "tglobal.h" #include "tsocket.h" #include "trpc.h" #include "rpcHead.h" -#include "tutil.h" -#include "tnettest.h" -#define MAX_PKG_LEN (64*1000) -#define BUFFER_SIZE (MAX_PKG_LEN + 1024) +#define MAX_PKG_LEN (64 * 1000) +#define BUFFER_SIZE (MAX_PKG_LEN + 1024) + +extern int32_t tsRpcMaxUdpSize; typedef struct { + char * hostFqdn; uint32_t hostIp; - uint16_t port; - uint16_t pktLen; -} info_s; - -extern int tsRpcMaxUdpSize; - -static char g_user[TSDB_USER_LEN+1] = {0}; -static char g_pass[TSDB_PASSWORD_LEN+1] = {0}; -static char g_serverFqdn[TSDB_FQDN_LEN] = {0}; -static uint16_t g_startPort = 0; -static uint16_t g_endPort = 6042; -static uint32_t g_pktLen = 0; - - -static void *bindUdpPort(void *sarg) { - info_s *pinfo = (info_s *)sarg; - int port = pinfo->port; - SOCKET serverSocket; + int32_t port; + int32_t pktLen; +} STestInfo; + +static void *taosNetBindUdpPort(void *sarg) { + STestInfo *pinfo = (STestInfo *)sarg; + int32_t port = pinfo->port; + SOCKET serverSocket; + char buffer[BUFFER_SIZE]; + int32_t iDataNum; + socklen_t sin_size; struct sockaddr_in server_addr; struct sockaddr_in clientAddr; - char buffer[BUFFER_SIZE]; - int iDataNum; - + if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { - perror("socket"); + uError("failed to create udp socket since %s", strerror(errno)); return NULL; } @@ -66,28 +58,25 @@ static void *bindUdpPort(void *sarg) { server_addr.sin_addr.s_addr = htonl(INADDR_ANY); if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { - perror("connect"); + uError("failed to bind udp port:%d since %s", port, strerror(errno)); return NULL; } - socklen_t sin_size; + uInfo("udp server at port:%d is listening", port); while (1) { memset(buffer, 0, BUFFER_SIZE); - sin_size = sizeof(*(struct sockaddr *)&server_addr); - iDataNum = recvfrom(serverSocket, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&clientAddr, &sin_size); if (iDataNum < 0) { - perror("recvfrom null"); + uDebug("failed to perform recvfrom func at %d since %s", port, strerror(errno)); continue; } - if (iDataNum > 0) { - printf("recv Client: %s pkg from UDP port: %d, pkg len: %d\n", taosInetNtoa(clientAddr.sin_addr), port, iDataNum); - //printf("Read msg from udp:%s ... %s\n", buffer, buffer+iDataNum-16); - sendto(serverSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int)sin_size); + if (iDataNum > 0) { + uInfo("UDP: recv:%d bytes from %s:%d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port); + sendto(serverSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int32_t)sin_size); } } @@ -95,20 +84,20 @@ static void *bindUdpPort(void *sarg) { return NULL; } -static void *bindTcpPort(void *sarg) { - info_s *pinfo = (info_s *)sarg; - int port = pinfo->port; - SOCKET serverSocket; - +static void *taosNetBindTcpPort(void *sarg) { struct sockaddr_in server_addr; struct sockaddr_in clientAddr; - int addr_len = sizeof(clientAddr); - SOCKET client; - char buffer[BUFFER_SIZE]; - int iDataNum = 0; + + STestInfo *pinfo = sarg; + int32_t port = pinfo->port; + SOCKET serverSocket; + int32_t addr_len = sizeof(clientAddr); + SOCKET client; + char buffer[BUFFER_SIZE]; + int32_t iDataNum = 0; if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { - printf("socket() fail: %s", strerror(errno)); + uError("failed to create tcp socket since %s", strerror(errno)); return NULL; } @@ -118,28 +107,30 @@ static void *bindTcpPort(void *sarg) { server_addr.sin_addr.s_addr = htonl(INADDR_ANY); if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { - printf("port:%d bind() fail: %s", port, strerror(errno)); + uError("failed to bind tcp port:%d since %s", port, strerror(errno)); return NULL; } if (listen(serverSocket, 5) < 0) { - printf("listen() fail: %s", strerror(errno)); + uError("failed to listen tcp port:%d since %s", port, strerror(errno)); return NULL; } - //printf("Bind port: %d success\n", port); + uInfo("tcp server at port:%d is listening", port); + while (1) { client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len); if (client < 0) { - printf("accept() fail: %s", strerror(errno)); + uDebug("failed to accept from tcp port:%d since %s", port, strerror(errno)); continue; } iDataNum = 0; memset(buffer, 0, BUFFER_SIZE); - int nleft, nread; - char *ptr = buffer; + int32_t nleft, nread; + char * ptr = buffer; nleft = pinfo->pktLen; + while (nleft > 0) { nread = recv(client, ptr, BUFFER_SIZE, 0); @@ -149,7 +140,7 @@ static void *bindTcpPort(void *sarg) { if (errno == EINTR) { continue; } else { - printf("recv Client: %s pkg from TCP port: %d fail:%s.\n", taosInetNtoa(clientAddr.sin_addr), port, strerror(errno)); + uError("failed to perform recv func at %d since %s", port, strerror(errno)); taosCloseSocket(serverSocket); return NULL; } @@ -157,11 +148,11 @@ static void *bindTcpPort(void *sarg) { nleft -= nread; ptr += nread; iDataNum += nread; - } + } } - - printf("recv Client: %s pkg from TCP port: %d, pkg len: %d\n", taosInetNtoa(clientAddr.sin_addr), port, iDataNum); + if (iDataNum > 0) { + uInfo("TCP: recv:%d bytes from %s:%d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port); send(client, buffer, iDataNum, 0); } } @@ -170,39 +161,38 @@ static void *bindTcpPort(void *sarg) { return NULL; } -static int checkTcpPort(info_s *info) { +static int32_t taosNetCheckTcpPort(STestInfo *info) { + SOCKET clientSocket; + char sendbuf[BUFFER_SIZE]; + char recvbuf[BUFFER_SIZE]; + int32_t iDataNum = 0; + struct sockaddr_in serverAddr; - SOCKET clientSocket; - char sendbuf[BUFFER_SIZE]; - char recvbuf[BUFFER_SIZE]; - int iDataNum = 0; if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - printf("socket() fail: %s\n", strerror(errno)); + uError("failed to create tcp client socket since %s", strerror(errno)); return -1; } // set send and recv overtime - struct timeval timeout; - timeout.tv_sec = 2; //s - timeout.tv_usec = 0; //us - if (setsockopt(clientSocket, SOL_SOCKET,SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { - perror("setsockopt send timer failed:"); + struct timeval timeout; + timeout.tv_sec = 2; // s + timeout.tv_usec = 0; // us + if (setsockopt(clientSocket, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { + uError("failed to setsockopt send timer since %s", strerror(errno)); } - if (setsockopt(clientSocket, SOL_SOCKET,SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { - perror("setsockopt recv timer failed:"); + if (setsockopt(clientSocket, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { + uError("failed to setsockopt recv timer since %s", strerror(errno)); } serverAddr.sin_family = AF_INET; serverAddr.sin_port = htons(info->port); - serverAddr.sin_addr.s_addr = info->hostIp; - //printf("=================================\n"); if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) { - printf("connect() fail: %s\t", strerror(errno)); + uError("failed to connect port:%d since %s", info->port, strerror(errno)); return -1; } - //printf("Connect to: %s:%d...success\n", host, port); + memset(sendbuf, 0, BUFFER_SIZE); memset(recvbuf, 0, BUFFER_SIZE); @@ -214,9 +204,10 @@ static int checkTcpPort(info_s *info) { send(clientSocket, sendbuf, info->pktLen, 0); memset(recvbuf, 0, BUFFER_SIZE); - int nleft, nread; - char *ptr = recvbuf; + int32_t nleft, nread; + char * ptr = recvbuf; nleft = info->pktLen; + while (nleft > 0) { nread = recv(clientSocket, ptr, BUFFER_SIZE, 0);; @@ -226,7 +217,7 @@ static int checkTcpPort(info_s *info) { if (errno == EINTR) { continue; } else { - printf("recv ack pkg from TCP port: %d fail:%s.\n", info->port, strerror(errno)); + uError("faild to recv pkg from TCP port:%d since %s", info->port, strerror(errno)); taosCloseSocket(clientSocket); return -1; } @@ -234,45 +225,46 @@ static int checkTcpPort(info_s *info) { nleft -= nread; ptr += nread; iDataNum += nread; - } + } } if (iDataNum < info->pktLen) { - printf("recv ack pkg len: %d, less than req pkg len: %d from tcp port: %d\n", iDataNum, info->pktLen, info->port); + uError("TCP: received ack:%d bytes, less than send:%d bytes from port:%d", iDataNum, info->pktLen, info->port); return -1; } - //printf("Read ack pkg len:%d from tcp port: %d, buffer: %s %s\n", info->pktLen, port, recvbuf, recvbuf+iDataNum-8); taosCloseSocket(clientSocket); return 0; } -static int checkUdpPort(info_s *info) { +static int32_t taosNetCheckUdpPort(STestInfo *info) { + SOCKET clientSocket; + char sendbuf[BUFFER_SIZE]; + char recvbuf[BUFFER_SIZE]; + int32_t iDataNum = 0; + struct sockaddr_in serverAddr; - SOCKET clientSocket; - char sendbuf[BUFFER_SIZE]; - char recvbuf[BUFFER_SIZE]; - int iDataNum = 0; + if ((clientSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { - perror("socket"); + uError("failed to create udp client socket since %s", strerror(errno)); return -1; } - // set overtime + // set overtime struct timeval timeout; - timeout.tv_sec = 2; //s - timeout.tv_usec = 0; //us - if (setsockopt(clientSocket, SOL_SOCKET,SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { - perror("setsockopt send timer failed:"); + timeout.tv_sec = 2; // s + timeout.tv_usec = 0; // us + if (setsockopt(clientSocket, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { + uError("failed to setsockopt send timer since %s", strerror(errno)); } - if (setsockopt(clientSocket, SOL_SOCKET,SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { - perror("setsockopt recv timer failed:"); + if (setsockopt(clientSocket, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { + uError("failed to setsockopt recv timer since %s", strerror(errno)); } - + serverAddr.sin_family = AF_INET; serverAddr.sin_port = htons(info->port); serverAddr.sin_addr.s_addr = info->hostIp; - + memset(sendbuf, 0, BUFFER_SIZE); memset(recvbuf, 0, BUFFER_SIZE); @@ -283,69 +275,66 @@ static int checkUdpPort(info_s *info) { socklen_t sin_size = sizeof(*(struct sockaddr *)&serverAddr); - int code = sendto(clientSocket, sendbuf, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int)sin_size); + int32_t code = sendto(clientSocket, sendbuf, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int32_t)sin_size); if (code < 0) { - perror("sendto"); + uError("failed to perform sendto func since %s", strerror(errno)); return -1; } iDataNum = recvfrom(clientSocket, recvbuf, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size); if (iDataNum < info->pktLen) { - printf("Read ack pkg len: %d, less than req pkg len: %d from udp port: %d\t\t", iDataNum, info->pktLen, info->port); + uError("UDP: received ack:%d bytes, less than send:%d bytes from port:%d", iDataNum, info->pktLen, info->port); return -1; } - - //printf("Read ack pkg len:%d from udp port: %d, buffer: %s %s\n", info->pktLen, port, recvbuf, recvbuf+iDataNum-8); + taosCloseSocket(clientSocket); return 0; } -static void checkPort(uint32_t hostIp, uint16_t startPort, uint16_t maxPort, uint16_t pktLen) { - int ret; - info_s info; - memset(&info, 0, sizeof(info_s)); - info.hostIp = hostIp; - info.pktLen = pktLen; +static void taosNetCheckPort(uint32_t hostIp, int32_t startPort, int32_t endPort, int32_t pktLen) { + int32_t ret; + STestInfo info; - for (uint16_t port = startPort; port <= maxPort; port++) { - //printf("test: %s:%d\n", info.host, port); - printf("\n"); + memset(&info, 0, sizeof(STestInfo)); + info.hostIp = hostIp; + info.pktLen = pktLen; + for (int32_t port = startPort; port <= endPort; port++) { info.port = port; - ret = checkTcpPort(&info); + ret = taosNetCheckTcpPort(&info); if (ret != 0) { - printf("tcp port:%d test fail.\t\n", port); + uError("failed to test tcp port:%d", port); } else { - printf("tcp port:%d test ok.\t\t", port); + uInfo("successed to test tcp port:%d", port); } - - ret = checkUdpPort(&info); + + ret = taosNetCheckUdpPort(&info); if (ret != 0) { - printf("udp port:%d test fail.\t\n", port); + uError("failed to test udp port:%d", port); } else { - printf("udp port:%d test ok.\t\t", port); + uInfo("successed to test udp port:%d", port); } } - - printf("\n"); - return ; + return; } -void* tnetInitRpc(char* secretEncrypt, char spi) { +void *taosNetInitRpc(char *secretEncrypt, char spi) { SRpcInit rpcInit; - void* pRpcConn = NULL; + void * pRpcConn = NULL; + + char user[] = "nettestinternal"; + char pass[] = "nettestinternal"; + taosEncryptPass((uint8_t *)pass, strlen(pass), secretEncrypt); - taosEncryptPass((uint8_t *)g_pass, strlen(g_pass), secretEncrypt); - memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = 0; - rpcInit.label = "NET-TEST"; + rpcInit.label = "NT"; rpcInit.numOfThreads = 1; // every DB connection has only one thread rpcInit.cfp = NULL; rpcInit.sessions = 16; rpcInit.connType = TAOS_CONN_CLIENT; - rpcInit.user = g_user; + rpcInit.user = user; rpcInit.idleTime = 2000; rpcInit.ckey = "key"; rpcInit.spi = spi; @@ -355,16 +344,17 @@ void* tnetInitRpc(char* secretEncrypt, char spi) { return pRpcConn; } -static int rpcCheckPortImpl(const char* serverFqdn, uint16_t port, uint16_t pktLen, char spi) { +static int32_t taosNetCheckRpc(const char* serverFqdn, uint16_t port, uint16_t pktLen, char spi, SStartupStep *pStep) { SRpcEpSet epSet; SRpcMsg reqMsg; SRpcMsg rspMsg; - void* pRpcConn; + void * pRpcConn; char secretEncrypt[32] = {0}; - pRpcConn = tnetInitRpc(secretEncrypt, spi); + pRpcConn = taosNetInitRpc(secretEncrypt, spi); if (NULL == pRpcConn) { + uError("failed to init client rpc"); return -1; } @@ -373,205 +363,169 @@ static int rpcCheckPortImpl(const char* serverFqdn, uint16_t port, uint16_t pktL epSet.numOfEps = 1; epSet.port[0] = port; strcpy(epSet.fqdn[0], serverFqdn); - + reqMsg.msgType = TSDB_MSG_TYPE_NETWORK_TEST; reqMsg.pCont = rpcMallocCont(pktLen); reqMsg.contLen = pktLen; reqMsg.code = 0; reqMsg.handle = NULL; // rpc handle returned to app - reqMsg.ahandle = NULL; // app handle set by client + reqMsg.ahandle = NULL; // app handle set by client + strcpy(reqMsg.pCont, "nettest"); rpcSendRecv(pRpcConn, &epSet, &reqMsg, &rspMsg); - // handle response if ((rspMsg.code != 0) || (rspMsg.msgType != TSDB_MSG_TYPE_NETWORK_TEST + 1)) { - //printf("code:%d[%s]\n", rspMsg.code, tstrerror(rspMsg.code)); + uDebug("ret code 0x%x %s", rspMsg.code, tstrerror(rspMsg.code)); return -1; } - - rpcFreeCont(rspMsg.pCont); + int32_t code = 0; + if (pStep != NULL && rspMsg.pCont != NULL && rspMsg.contLen > 0 && rspMsg.contLen <= sizeof(SStartupStep)) { + memcpy(pStep, rspMsg.pCont, rspMsg.contLen); + code = 1; + } + + rpcFreeCont(rspMsg.pCont); rpcClose(pRpcConn); + return code; +} - return 0; +static int32_t taosNetParseStartup(SStartupStep *pCont) { + SStartupStep *pStep = pCont; + uInfo("step:%s desc:%s", pStep->name, pStep->desc); + + if (pStep->finished) { + uInfo("check startup finished"); + } + + return pStep->finished ? 0 : 1; } -static void rpcCheckPort(uint32_t hostIp) { - int ret; - char spi; +static void taosNetTestStartup(char *host, int32_t port) { + uInfo("check startup, host:%s port:%d\n", host, port); + + SStartupStep *pStep = malloc(sizeof(SStartupStep)); + while (1) { + int32_t code = taosNetCheckRpc(host, port, 20, 0, pStep); + if (code > 0) { + code = taosNetParseStartup(pStep); + } - for (uint16_t port = g_startPort; port <= g_endPort; port++) { - //printf("test: %s:%d\n", info.host, port); - printf("\n"); + if (code > 0) { + uDebug("continue check startup step"); + } else { + break; + } + taosMsleep(500); + } - //================ check tcp port ================ - int32_t pktLen; - if (g_pktLen <= tsRpcMaxUdpSize) { - pktLen = tsRpcMaxUdpSize + 1000; + free(pStep); +} + +static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) { + int32_t endPort = startPort + 9; + char spi = 0; + + uInfo("check rpc, host:%s startPort:%d endPort:%d pkgLen:%d\n", host, startPort, endPort, pkgLen); + + for (uint16_t port = startPort; port <= endPort; port++) { + int32_t sendpkgLen; + if (pkgLen <= tsRpcMaxUdpSize) { + sendpkgLen = tsRpcMaxUdpSize + 1000; } else { - pktLen = g_pktLen; + sendpkgLen = pkgLen; } - spi = 1; - ret = rpcCheckPortImpl(g_serverFqdn, port, pktLen, spi); - if (ret != 0) { - spi = 0; - ret = rpcCheckPortImpl(g_serverFqdn, port, pktLen, spi); - if (ret != 0) { - printf("TCP port:%d test fail.\t\t", port); - } else { - //printf("tcp port:%d test ok.\t\t", port); - printf("TCP port:\033[32m%d test OK\033[0m\t\t", port); - } + int32_t ret = taosNetCheckRpc(host, port, sendpkgLen, spi, NULL); + if (ret < 0) { + uError("failed to test tcp port:%d", port); } else { - //printf("tcp port:%d test ok.\t\t", port); - printf("TCP port:\033[32m%d test OK\033[0m\t\t", port); + uInfo("successed to test tcp port:%d", port); } - //================ check udp port ================ - if (g_pktLen >= tsRpcMaxUdpSize) { - pktLen = tsRpcMaxUdpSize - 1000; + if (pkgLen >= tsRpcMaxUdpSize) { + sendpkgLen = tsRpcMaxUdpSize - 1000; } else { - pktLen = g_pktLen; + sendpkgLen = pkgLen; } - - spi = 0; - ret = rpcCheckPortImpl(g_serverFqdn, port, pktLen, spi); - if (ret != 0) { - spi = 1; - ret = rpcCheckPortImpl(g_serverFqdn, port, pktLen, spi); - if (ret != 0) { - printf("udp port:%d test fail.\t\n", port); - } else { - //printf("udp port:%d test ok.\t\n", port); - printf("UDP port:\033[32m%d test OK\033[0m\t\n", port); - } + + ret = taosNetCheckRpc(host, port, pkgLen, spi, NULL); + if (ret < 0) { + uError("failed to test udp port:%d", port); } else { - //printf("udp port:%d test ok.\t\n", port); - printf("UDP port:\033[32m%d test OK\033[0m\t\n", port); + uInfo("successed to test udp port:%d", port); } } - - printf("\n"); - return ; } -static void taosNetTestClient(int flag) { - uint32_t serverIp = taosGetIpFromFqdn(g_serverFqdn); +static void taosNetTestClient(char *host, int32_t startPort, int32_t pkgLen) { + int32_t endPort = startPort + 11; + uInfo("work as client, host:%s startPort:%d endPort:%d pkgLen:%d\n", host, startPort, endPort, pkgLen); + + uint32_t serverIp = taosGetIpFromFqdn(host); if (serverIp == 0xFFFFFFFF) { - printf("Failed to resolve FQDN:%s", g_serverFqdn); + uError("failed to resolve fqdn:%s", host); exit(-1); } - if (0 == flag) { - checkPort(serverIp, g_startPort, g_endPort, g_pktLen); - } else { - rpcCheckPort(serverIp); - } - - return; + uInfo("server ip:%s is resolved from host:%s", taosIpStr(serverIp), host); + taosNetCheckPort(serverIp, startPort, endPort, pkgLen); } -static void taosNetTestServer(uint16_t startPort, uint16_t endPort, int pktLen) { +static void taosNetTestServer(char *host, int32_t startPort, int32_t pkgLen) { + int32_t endPort = startPort + 11; + uInfo("work as server, host:%s startPort:%d endPort:%d pkgLen:%d\n", host, startPort, endPort, pkgLen); - int port = startPort; - int num = endPort - startPort + 1; + int32_t port = startPort; + int32_t num = endPort - startPort + 1; + if (num < 0) num = 1; - if (num < 0) { - num = 1; - } - pthread_t *pids = malloc(2 * num * sizeof(pthread_t)); - info_s * tinfos = malloc(num * sizeof(info_s)); - info_s * uinfos = malloc(num * sizeof(info_s)); + STestInfo *tinfos = malloc(num * sizeof(STestInfo)); + STestInfo *uinfos = malloc(num * sizeof(STestInfo)); - for (size_t i = 0; i < num; i++) { - info_s *tcpInfo = tinfos + i; - tcpInfo->port = (uint16_t)(port + i); - tcpInfo->pktLen = pktLen; + for (int32_t i = 0; i < num; i++) { + STestInfo *tcpInfo = tinfos + i; + tcpInfo->port = port + i; + tcpInfo->pktLen = pkgLen; - if (pthread_create(pids + i, NULL, bindTcpPort, tcpInfo) != 0) - { - printf("create thread fail, port:%d.\n", port); + if (pthread_create(pids + i, NULL, taosNetBindTcpPort, tcpInfo) != 0) { + uInfo("failed to create tcp test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port); exit(-1); } - info_s *udpInfo = uinfos + i; + STestInfo *udpInfo = uinfos + i; udpInfo->port = (uint16_t)(port + i); - if (pthread_create(pids + num + i, NULL, bindUdpPort, udpInfo) != 0) - { - printf("create thread fail, port:%d.\n", port); + if (pthread_create(pids + num + i, NULL, taosNetBindUdpPort, udpInfo) != 0) { + uInfo("failed to create udp test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port); exit(-1); } } - - for (int i = 0; i < num; i++) { + + for (int32_t i = 0; i < num; i++) { pthread_join(pids[i], NULL); pthread_join(pids[(num + i)], NULL); } } - -void taosNetTest(CmdArguments *args) { - if (0 == args->pktLen) { - g_pktLen = 1000; +void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen) { + tscEmbedded = 1; + if (host == NULL) host = tsLocalFqdn; + if (port == 0) port = tsServerPort; + if (pkgLen <= 10) pkgLen = 1000; + if (pkgLen > MAX_PKG_LEN) pkgLen = MAX_PKG_LEN; + + if (0 == strcmp("client", role)) { + taosNetTestClient(host, port, pkgLen); + } else if (0 == strcmp("server", role)) { + taosNetTestServer(host, port, pkgLen); + } else if (0 == strcmp("rpc", role)) { + taosNetTestRpc(host, port, pkgLen); + } else if (0 == strcmp("startup", role)) { + taosNetTestStartup(host, port); } else { - g_pktLen = args->pktLen; + taosNetTestStartup(host, port); } - - if (args->port && args->endPort) { - if (args->port > args->endPort) { - printf("endPort[%d] must not lesss port[%d]\n", args->endPort, args->port); - exit(-1); - } - } - - if (args->host && args->host[0] != 0) { - if (strlen(args->host) >= TSDB_EP_LEN) { - printf("host invalid: %s\n", args->host); - exit(-1); - } - taosGetFqdnPortFromEp(args->host, g_serverFqdn, &g_startPort); - } else { - tstrncpy(g_serverFqdn, "127.0.0.1", TSDB_IPv4ADDR_LEN); - g_startPort = tsServerPort; - } - - if (args->port) { - g_startPort = args->port; - } - - if (args->endPort) { - g_endPort = args->endPort; - } - - if (g_startPort > g_endPort) { - printf("endPort[%d] must not lesss port[%d]\n", g_endPort, g_startPort); - exit(-1); - } - - - if (args->is_use_passwd) { - if (args->password == NULL) args->password = getpass("Enter password: "); - } else { - args->password = TSDB_DEFAULT_PASS; - } - tstrncpy(g_pass, args->password, TSDB_PASSWORD_LEN); - - if (args->user == NULL) { - args->user = TSDB_DEFAULT_USER; - } - tstrncpy(g_user, args->user, TSDB_USER_LEN); - - if (0 == strcmp("client", args->netTestRole)) { - printf("host: %s\tstart port: %d\tend port: %d\tpacket len: %d\n", g_serverFqdn, g_startPort, g_endPort, g_pktLen); - taosNetTestClient(0); - } else if (0 == strcmp("clients", args->netTestRole)) { - printf("host: %s\tstart port: %d\tend port: %d\tpacket len: %d\n", g_serverFqdn, g_startPort, g_endPort, g_pktLen); - taosNetTestClient(1); - } else if (0 == strcmp("server", args->netTestRole)) { - taosNetTestServer(g_startPort, g_endPort, g_pktLen); - } + tscEmbedded = 0; } -