/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tulog.h"
#include "tglobal.h"
#include "tsocket.h"
#include "trpc.h"
#include "rpcHead.h"
#include "tchecksum.h"
#include "syncMsg.h"

// Upper bound applied to the packet length for every role except "speed".
#define MAX_PKG_LEN (64 * 1000)
// Packet length and packet count limits applied to the "speed" role.
#define MAX_SPEED_PKG_LEN (1024 * 1024 * 1024)
#define MIN_SPEED_PKG_LEN 1024
#define MAX_SPEED_PKG_NUM 10000
#define MIN_SPEED_PKG_NUM 1
// Size of the stack buffers used by the raw TCP/UDP echo tests.
#define BUFFER_SIZE (MAX_PKG_LEN + 1024)

extern int32_t tsRpcMaxUdpSize;

// Parameters of one raw TCP/UDP port test, shared by the client-side checks
// and the server-side echo threads.
typedef struct {
  char *   hostFqdn;  // host name; only used in log messages in this file
  uint32_t hostIp;    // IPv4 address, stored as-is into sockaddr_in.sin_addr.s_addr
  int32_t  port;      // port to bind (server) or to connect/send to (client)
  int32_t  pktLen;    // number of bytes sent and expected back per test
} STestInfo;

static void *taosNetBindUdpPort(void *sarg) {
  STestInfo *pinfo = (STestInfo *)sarg;
  int32_t    port = pinfo->port;
48
  SOCKET     serverSocket;
49 50 51
  char       buffer[BUFFER_SIZE];
  int32_t    iDataNum;
  socklen_t  sin_size;
S
Shengliang Guan 已提交
52
  int32_t bufSize = 1024000;
H
Hui Li 已提交
53 54 55

  struct sockaddr_in server_addr;
  struct sockaddr_in clientAddr;
56

57 58 59
  setThreadName("netBindUdpPort");

  if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
S
Shengliang Guan 已提交
60
    uError("failed to create UDP socket since %s", strerror(errno));
H
Hui Li 已提交
61 62 63 64 65 66 67 68 69
    return NULL;
  }

  bzero(&server_addr, sizeof(server_addr));
  server_addr.sin_family = AF_INET;
  server_addr.sin_port = htons(port);
  server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

  if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
S
Shengliang Guan 已提交
70
    uError("failed to bind UDP port:%d since %s", port, strerror(errno));
H
Hui Li 已提交
71 72 73
    return NULL;
  }

S
Shengliang Guan 已提交
74 75 76 77 78 79 80 81 82 83 84 85 86
  if (taosSetSockOpt(serverSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    uError("failed to set the send buffer size for UDP socket\n");
    taosCloseSocket(serverSocket);
    return NULL;
  }

  if (taosSetSockOpt(serverSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    uError("failed to set the receive buffer size for UDP socket\n");
    taosCloseSocket(serverSocket);
    return NULL;
  }

  uInfo("UDP server at port:%d is listening", port);
H
Hui Li 已提交
87 88 89 90 91 92 93

  while (1) {
    memset(buffer, 0, BUFFER_SIZE);
    sin_size = sizeof(*(struct sockaddr *)&server_addr);
    iDataNum = recvfrom(serverSocket, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&clientAddr, &sin_size);

    if (iDataNum < 0) {
94
      uDebug("failed to perform recvfrom func at %d since %s", port, strerror(errno));
H
Hui Li 已提交
95 96 97
      continue;
    }

S
Shengliang Guan 已提交
98 99
    uInfo("UDP: recv:%d bytes from %s at %d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port);

100
    if (iDataNum > 0) {
S
Shengliang Guan 已提交
101
      iDataNum = taosSendto(serverSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int32_t)sin_size);
H
Hui Li 已提交
102
    }
S
Shengliang Guan 已提交
103 104

    uInfo("UDP: send:%d bytes to %s at %d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port);
H
Hui Li 已提交
105 106
  }

S
Shengliang Guan 已提交
107
  taosCloseSocket(serverSocket);
H
Hui Li 已提交
108 109 110
  return NULL;
}

static void *taosNetBindTcpPort(void *sarg) {
H
Hui Li 已提交
112 113
  struct sockaddr_in server_addr;
  struct sockaddr_in clientAddr;
114

115
  STestInfo *pinfo = sarg;
116
  int32_t    port = pinfo->port;
117
  SOCKET     serverSocket;
118
  int32_t    addr_len = sizeof(clientAddr);
119
  SOCKET     client;
120
  char       buffer[BUFFER_SIZE];
H
Hui Li 已提交
121

122 123
  setThreadName("netBindTcpPort");

124
  if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
S
Shengliang Guan 已提交
125
    uError("failed to create TCP socket since %s", strerror(errno));
H
Hui Li 已提交
126 127 128 129 130 131 132 133
    return NULL;
  }

  bzero(&server_addr, sizeof(server_addr));
  server_addr.sin_family = AF_INET;
  server_addr.sin_port = htons(port);
  server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

S
Shengliang Guan 已提交
134 135 136 137 138 139 140
  int32_t reuse = 1;
  if (taosSetSockOpt(serverSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
    uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
    taosCloseSocket(serverSocket);
    return NULL;
  }

H
Hui Li 已提交
141
  if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
S
Shengliang Guan 已提交
142
    uError("failed to bind TCP port:%d since %s", port, strerror(errno));
H
Hui Li 已提交
143 144 145
    return NULL;
  }

S
Shengliang Guan 已提交
146 147 148
  if (taosKeepTcpAlive(serverSocket) < 0) {
    uError("failed to set tcp server keep-alive option since %s", strerror(errno));
    taosCloseSocket(serverSocket);
H
Hui Li 已提交
149 150 151
    return NULL;
  }

S
Shengliang Guan 已提交
152 153 154 155 156 157
  if (listen(serverSocket, 10) < 0) {
    uError("failed to listen TCP port:%d since %s", port, strerror(errno));
    return NULL;
  }

  uInfo("TCP server at port:%d is listening", port);
158

H
Hui Li 已提交
159
  while (1) {
160
    client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len);
H
Hui Li 已提交
161
    if (client < 0) {
S
Shengliang Guan 已提交
162
      uDebug("TCP: failed to accept at port:%d since %s", port, strerror(errno));
H
Hui Li 已提交
163 164 165
      continue;
    }

S
Shengliang Guan 已提交
166 167
    int32_t ret = taosReadMsg(client, buffer, pinfo->pktLen);
    if (ret < 0 || ret != pinfo->pktLen) {
S
Shengliang Guan 已提交
168
      uError("TCP: failed to read %d bytes at port:%d since %s", pinfo->pktLen, port, strerror(errno));
S
Shengliang Guan 已提交
169 170
      taosCloseSocket(serverSocket);
      return NULL;
H
Hui Li 已提交
171
    }
172

S
Shengliang Guan 已提交
173 174 175 176
    uInfo("TCP: read:%d bytes from %s at %d", pinfo->pktLen, taosInetNtoa(clientAddr.sin_addr), port);

    ret = taosWriteMsg(client, buffer, pinfo->pktLen);
    if (ret < 0) {
S
Shengliang Guan 已提交
177
      uError("TCP: failed to write %d bytes at %d since %s", pinfo->pktLen, port, strerror(errno));
S
Shengliang Guan 已提交
178 179
      taosCloseSocket(serverSocket);
      return NULL;
H
Hui Li 已提交
180
    }
S
Shengliang Guan 已提交
181 182

    uInfo("TCP: write:%d bytes to %s at %d", pinfo->pktLen, taosInetNtoa(clientAddr.sin_addr), port);
H
Hui Li 已提交
183
  }
S
Shengliang Guan 已提交
184

S
Shengliang Guan 已提交
185
  taosCloseSocket(serverSocket);
H
Hui Li 已提交
186 187 188
  return NULL;
}

/*
 * Client-side TCP check of one port: connect to info->hostIp:info->port, send
 * info->pktLen bytes and read the same number of bytes back.
 *
 * The payload starts with a human-readable header and, when it is at least 16
 * bytes long, ends with the marker "1122334455667788".
 *
 * Returns 0 on success and -1 on any failure; the socket is closed on every path.
 */
static int32_t taosNetCheckTcpPort(STestInfo *info) {
  static const char tail[] = "1122334455667788";
  const int32_t     tailLen = (int32_t)(sizeof(tail) - 1);

  SOCKET  clientSocket;
  char    buffer[BUFFER_SIZE] = {0};

  // the payload is staged in the fixed-size stack buffer above
  if (info->pktLen <= 0 || info->pktLen > BUFFER_SIZE) {
    uError("TCP: invalid packet length:%d", info->pktLen);
    return -1;
  }

  if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
    uError("failed to create TCP client socket since %s", strerror(errno));
    return -1;
  }

  int32_t reuse = 1;
  if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
    uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
    taosCloseSocket(clientSocket);
    return -1;
  }

  struct sockaddr_in serverAddr;
  memset((char *)&serverAddr, 0, sizeof(serverAddr));
  serverAddr.sin_family = AF_INET;
  serverAddr.sin_port = (uint16_t)htons((uint16_t)info->port);
  serverAddr.sin_addr.s_addr = info->hostIp;

  if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) {
    uError("TCP: failed to connect port %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno));
    taosCloseSocket(clientSocket);
    return -1;
  }

  // best effort: the check still runs if keep-alive cannot be enabled
  taosKeepTcpAlive(clientSocket);

  snprintf(buffer, sizeof(buffer), "client send TCP pkg to %s:%d, content: 1122334455", taosIpStr(info->hostIp),
           info->port);
  // skip the marker for very short packets: buffer + pktLen - 16 would point before the buffer
  if (info->pktLen >= tailLen) {
    memcpy(buffer + info->pktLen - tailLen, tail, (size_t)tailLen);
  }

  int32_t ret = taosWriteMsg(clientSocket, buffer, info->pktLen);
  if (ret < 0) {
    uError("TCP: failed to write msg to %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno));
    taosCloseSocket(clientSocket);
    return -1;
  }

  ret = taosReadMsg(clientSocket, buffer, info->pktLen);
  // a short read means the echo did not come back completely
  if (ret < 0 || ret != info->pktLen) {
    uError("TCP: failed to read msg from %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno));
    taosCloseSocket(clientSocket);
    return -1;
  }

  taosCloseSocket(clientSocket);
  return 0;
}

/*
 * Client-side UDP check of one port: send one datagram of info->pktLen bytes
 * to info->hostIp:info->port and wait for a reply of the same length.
 *
 * The payload starts with a human-readable header and, when it is at least 16
 * bytes long, ends with the marker "1122334455667788".
 *
 * Returns 0 on success and -1 on any failure; the socket is closed on every path.
 */
static int32_t taosNetCheckUdpPort(STestInfo *info) {
  static const char tail[] = "1122334455667788";
  const int32_t     tailLen = (int32_t)(sizeof(tail) - 1);

  SOCKET  clientSocket;
  char    buffer[BUFFER_SIZE] = {0};
  int32_t iDataNum = 0;
  int32_t bufSize = 1024000;

  struct sockaddr_in serverAddr;

  // the payload is staged in the fixed-size stack buffer above
  if (info->pktLen <= 0 || info->pktLen > BUFFER_SIZE) {
    uError("UDP: invalid packet length:%d", info->pktLen);
    return -1;
  }

  if ((clientSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
    uError("failed to create udp client socket since %s", strerror(errno));
    return -1;
  }

  if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    uError("failed to set the send buffer size for UDP socket\n");
    taosCloseSocket(clientSocket);
    return -1;
  }

  if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    uError("failed to set the receive buffer size for UDP socket\n");
    taosCloseSocket(clientSocket);
    return -1;
  }

  // zero the whole address so no field is left uninitialised
  memset(&serverAddr, 0, sizeof(serverAddr));
  serverAddr.sin_family = AF_INET;
  serverAddr.sin_port = htons(info->port);
  serverAddr.sin_addr.s_addr = info->hostIp;

  struct in_addr ipStr;
  memcpy(&ipStr, &info->hostIp, 4);
  snprintf(buffer, sizeof(buffer), "client send udp pkg to %s:%d, content: 1122334455", taosInetNtoa(ipStr),
           info->port);
  // skip the marker for very short packets: buffer + pktLen - 16 would point before the buffer
  if (info->pktLen >= tailLen) {
    memcpy(buffer + info->pktLen - tailLen, tail, (size_t)tailLen);
  }

  socklen_t sin_size = sizeof(serverAddr);

  iDataNum = taosSendto(clientSocket, buffer, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int32_t)sin_size);
  if (iDataNum < 0 || iDataNum != info->pktLen) {
    uError("UDP: failed to perform sendto func since %s", strerror(errno));
    taosCloseSocket(clientSocket);
    return -1;
  }

  memset(buffer, 0, BUFFER_SIZE);
  sin_size = sizeof(serverAddr);
  iDataNum = recvfrom(clientSocket, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size);

  if (iDataNum < 0 || iDataNum != info->pktLen) {
    uError("UDP: received ack:%d bytes(expect:%d) from port:%d since %s", iDataNum, info->pktLen, info->port, strerror(errno));
    taosCloseSocket(clientSocket);
    return -1;
  }

  taosCloseSocket(clientSocket);
  return 0;
}

static void taosNetCheckPort(uint32_t hostIp, int32_t startPort, int32_t endPort, int32_t pktLen) {
  int32_t   ret;
  STestInfo info;
H
Hui Li 已提交
293

294 295 296
  memset(&info, 0, sizeof(STestInfo));
  info.hostIp = hostIp;
  info.pktLen = pktLen;
H
Hui Li 已提交
297

298
  for (int32_t port = startPort; port <= endPort; port++) {
H
Hui Li 已提交
299
    info.port = port;
300
    ret = taosNetCheckTcpPort(&info);
H
Hui Li 已提交
301
    if (ret != 0) {
302
      printf("failed to test TCP port:%d\n", port);
H
Hui Li 已提交
303
    } else {
304
      printf("successed to test TCP port:%d\n", port);
H
Hui Li 已提交
305
    }
306 307

    ret = taosNetCheckUdpPort(&info);
H
Hui Li 已提交
308
    if (ret != 0) {
309
      printf("failed to test UDP port:%d\n", port);
H
Hui Li 已提交
310
    } else {
311
      printf("successed to test UDP port:%d\n", port);
H
Hui Li 已提交
312 313 314 315
    }
  }
}

void *taosNetInitRpc(char *secretEncrypt, char spi) {
H
Hui Li 已提交
317
  SRpcInit rpcInit;
318 319 320 321 322
  void *   pRpcConn = NULL;

  char user[] = "nettestinternal";
  char pass[] = "nettestinternal";
  taosEncryptPass((uint8_t *)pass, strlen(pass), secretEncrypt);
H
Hui Li 已提交
323 324 325

  memset(&rpcInit, 0, sizeof(rpcInit));
  rpcInit.localPort = 0;
326
  rpcInit.label = "NT";
H
Hui Li 已提交
327 328 329 330
  rpcInit.numOfThreads = 1;  // every DB connection has only one thread
  rpcInit.cfp = NULL;
  rpcInit.sessions = 16;
  rpcInit.connType = TAOS_CONN_CLIENT;
331
  rpcInit.user = user;
H
Hui Li 已提交
332 333 334 335 336 337 338 339 340
  rpcInit.idleTime = 2000;
  rpcInit.ckey = "key";
  rpcInit.spi = spi;
  rpcInit.secret = secretEncrypt;

  pRpcConn = rpcOpen(&rpcInit);
  return pRpcConn;
}

/*
 * Send one TSDB_MSG_TYPE_NETWORK_TEST request of pktLen bytes to
 * serverFqdn:port over a freshly opened RPC connection and wait for the reply.
 *
 * pStep: optional output; when not NULL and the reply carries between 1 and
 *        sizeof(SStartupMsg) bytes of content, that content is copied into it.
 *
 * Returns:
 *   1  the reply content was copied into *pStep
 *   0  the exchange succeeded but nothing was copied
 *   TSDB_CODE_RPC_NETWORK_UNAVAIL  the RPC client could not be opened
 *   otherwise the code of the response when the response is not the expected
 *   network-test reply
 *
 * The RPC connection and the response content are released on every path.
 */
static int32_t taosNetCheckRpc(const char* serverFqdn, uint16_t port, uint16_t pktLen, char spi, SStartupMsg *pStep) {
  SRpcEpSet epSet;
  SRpcMsg   reqMsg;
  SRpcMsg   rspMsg;
  void *    pRpcConn;

  char secretEncrypt[32] = {0};

  pRpcConn = taosNetInitRpc(secretEncrypt, spi);
  if (NULL == pRpcConn) {
    uError("failed to init client rpc");
    return TSDB_CODE_RPC_NETWORK_UNAVAIL;
  }

  memset(&epSet, 0, sizeof(SRpcEpSet));
  epSet.inUse = 0;
  epSet.numOfEps = 1;
  epSet.port[0] = port;
  // bounded copy: the host name comes from the command line
  snprintf(epSet.fqdn[0], sizeof(epSet.fqdn[0]), "%s", serverFqdn);

  memset(&reqMsg, 0, sizeof(reqMsg));
  // start from a known state so the cleanup below never sees garbage
  memset(&rspMsg, 0, sizeof(rspMsg));

  reqMsg.msgType = TSDB_MSG_TYPE_NETWORK_TEST;
  reqMsg.pCont = rpcMallocCont(pktLen);
  reqMsg.contLen = pktLen;
  reqMsg.code = 0;
  reqMsg.handle = NULL;   // rpc handle returned to app
  reqMsg.ahandle = NULL;  // app handle set by client
  strcpy(reqMsg.pCont, "nettest");

  rpcSendRecv(pRpcConn, &epSet, &reqMsg, &rspMsg);

  int32_t code = 0;
  if ((rspMsg.code != 0) || (rspMsg.msgType != TSDB_MSG_TYPE_NETWORK_TEST + 1)) {
    uDebug("ret code 0x%x %s", rspMsg.code, tstrerror(rspMsg.code));
    code = rspMsg.code;
  } else if (pStep != NULL && rspMsg.pCont != NULL && rspMsg.contLen > 0 && rspMsg.contLen <= sizeof(SStartupMsg)) {
    memcpy(pStep, rspMsg.pCont, rspMsg.contLen);
    code = 1;
  }

  // the failure path used to return early and leak both of these
  rpcFreeCont(rspMsg.pCont);
  rpcClose(pRpcConn);
  return code;
}
/*
 * Log the startup step described by pCont.
 *
 * Returns 0 when the step is marked finished and 1 otherwise.
 */
static int32_t taosNetParseStartup(SStartupMsg *pCont) {
  SStartupMsg *pStep = pCont;
  uInfo("step:%s desc:%s", pStep->name, pStep->desc);

  if (pStep->finished) {
    uInfo("check startup finished");
  }

  return pStep->finished ? 0 : 1;
}

static void taosNetTestStartup(char *host, int32_t port) {
  uInfo("check startup, host:%s port:%d\n", host, port);
H
Hui Li 已提交
400

S
Shengliang Guan 已提交
401
  SStartupMsg *pStep = malloc(sizeof(SStartupMsg));
402 403 404 405 406
  while (1) {
    int32_t code = taosNetCheckRpc(host, port + TSDB_PORT_DNODEDNODE, 20, 0, pStep);
    if (code > 0) {
      code = taosNetParseStartup(pStep);
    }
H
Hui Li 已提交
407

408 409
    if (code > 0) {
      uDebug("continue check startup step");
H
Hui Li 已提交
410
    } else {
411 412 413 414
      if (code < 0) {
        uError("failed to check startup step, code:0x%x %s", code, tstrerror(code));
      }
      break;
H
Hui Li 已提交
415
    }
416
  }
H
Hui Li 已提交
417

418 419 420
  free(pStep);
}

/*
 * Check a sync port: resolve host, open a TCP connection to host:port, send
 * one TAOS_SMSG_TEST message and wait for a reply of the same size.
 *
 * Logs the outcome; the socket is closed on every path.
 */
static void taosNetCheckSync(char *host, int32_t port) {
  uint32_t ip = taosGetIpv4FromFqdn(host);
  if (ip == 0xffffffff) {
    uError("failed to get IP address from %s since %s", host, strerror(errno));
    return;
  }

  SOCKET connFd = taosOpenTcpClientSocket(ip, (uint16_t)port, 0);
  if (connFd < 0) {
    uError("failed to create socket while test port:%d since %s", port, strerror(errno));
    return;
  }

  SSyncMsg msg;
  memset(&msg, 0, sizeof(SSyncMsg));
  SSyncHead *pHead = &msg.head;
  pHead->type = TAOS_SMSG_TEST;
  pHead->protocol = SYNC_PROTOCOL_VERSION;
  pHead->signature = SYNC_SIGNATURE;
  pHead->code = 0;
  pHead->cId = 0;
  pHead->vgId = -1;
  pHead->len = sizeof(SSyncMsg) - sizeof(SSyncHead);
  taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SSyncHead));

  if (taosWriteMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
    uError("failed to test port:%d while send msg since %s", port, strerror(errno));
    taosCloseSocket(connFd);
    return;
  }

  if (taosReadMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
    uError("failed to test port:%d while recv msg since %s", port, strerror(errno));
    // do not fall through to the success message below
    taosCloseSocket(connFd);
    return;
  }

  uInfo("successed to test TCP port:%d", port);
  taosCloseSocket(connFd);
}

static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) {
S
TD-2861  
Shengliang Guan 已提交
460
  int32_t endPort = startPort + TSDB_PORT_SYNC;
461 462 463
  char    spi = 0;

  uInfo("check rpc, host:%s startPort:%d endPort:%d pkgLen:%d\n", host, startPort, endPort, pkgLen);
S
TD-2861  
Shengliang Guan 已提交
464 465

  for (uint16_t port = startPort; port < endPort; port++) {
466 467 468
    int32_t sendpkgLen;
    if (pkgLen <= tsRpcMaxUdpSize) {
      sendpkgLen = tsRpcMaxUdpSize + 1000;
H
Hui Li 已提交
469
    } else {
470
      sendpkgLen = pkgLen;
H
Hui Li 已提交
471 472
    }

wmmhello's avatar
wmmhello 已提交
473
    tsRpcForceTcp = 1;
474 475
    int32_t ret = taosNetCheckRpc(host, port, sendpkgLen, spi, NULL);
    if (ret < 0) {
476
      printf("failed to test TCP port:%d\n", port);
H
Hui Li 已提交
477
    } else {
478
      printf("successed to test TCP port:%d\n", port);
H
Hui Li 已提交
479
    }
480 481 482 483 484 485 486

    if (pkgLen >= tsRpcMaxUdpSize) {
      sendpkgLen = tsRpcMaxUdpSize - 1000;
    } else {
      sendpkgLen = pkgLen;
    }

wmmhello's avatar
wmmhello 已提交
487
    tsRpcForceTcp = 0;
488 489
    ret = taosNetCheckRpc(host, port, pkgLen, spi, NULL);
    if (ret < 0) {
490
      printf("failed to test UDP port:%d\n", port);
H
Hui Li 已提交
491
    } else {
492
      printf("successed to test UDP port:%d\n", port);
H
Hui Li 已提交
493 494
    }
  }
S
TD-2861  
Shengliang Guan 已提交
495

496 497
  taosNetCheckSync(host, startPort + TSDB_PORT_SYNC);
  taosNetCheckSync(host, startPort + TSDB_PORT_ARBITRATOR);
H
Hui Li 已提交
498 499
}

/*
 * Client role: resolve host and run the raw TCP/UDP checks against the ports
 * startPort .. startPort + 11 with packets of pkgLen bytes.
 *
 * Terminates the process with exit(-1) when the host cannot be resolved.
 */
static void taosNetTestClient(char *host, int32_t startPort, int32_t pkgLen) {
  int32_t endPort = startPort + 11;
  uInfo("work as client, host:%s startPort:%d endPort:%d pkgLen:%d\n", host, startPort, endPort, pkgLen);

  uint32_t serverIp = taosGetIpv4FromFqdn(host);
  if (serverIp == 0xFFFFFFFF) {
    uError("failed to resolve fqdn:%s", host);
    exit(-1);
  }

  uInfo("server ip:%s is resolved from host:%s", taosIpStr(serverIp), host);
  taosNetCheckPort(serverIp, startPort, endPort, pkgLen);
}
static void taosNetTestServer(char *host, int32_t startPort, int32_t pkgLen) {
  int32_t endPort = startPort + 11;
  uInfo("work as server, host:%s startPort:%d endPort:%d pkgLen:%d\n", host, startPort, endPort, pkgLen);
H
Hui Li 已提交
517

518 519 520
  int32_t port = startPort;
  int32_t num = endPort - startPort + 1;
  if (num < 0) num = 1;
H
Hui Li 已提交
521 522

  pthread_t *pids = malloc(2 * num * sizeof(pthread_t));
523 524
  STestInfo *tinfos = malloc(num * sizeof(STestInfo));
  STestInfo *uinfos = malloc(num * sizeof(STestInfo));
H
Hui Li 已提交
525

526 527 528 529
  for (int32_t i = 0; i < num; i++) {
    STestInfo *tcpInfo = tinfos + i;
    tcpInfo->port = port + i;
    tcpInfo->pktLen = pkgLen;
H
Hui Li 已提交
530

531
    if (pthread_create(pids + i, NULL, taosNetBindTcpPort, tcpInfo) != 0) {
S
Shengliang Guan 已提交
532
      uInfo("failed to create TCP test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port);
H
Hui Li 已提交
533 534 535
      exit(-1);
    }

536
    STestInfo *udpInfo = uinfos + i;
S
Shengliang Guan 已提交
537 538
    udpInfo->port = port + i;
    tcpInfo->pktLen = pkgLen;
539
    if (pthread_create(pids + num + i, NULL, taosNetBindUdpPort, udpInfo) != 0) {
S
Shengliang Guan 已提交
540
      uInfo("failed to create UDP test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port);
H
Hui Li 已提交
541 542 543
      exit(-1);
    }
  }
544 545

  for (int32_t i = 0; i < num; i++) {
H
Hui Li 已提交
546 547 548 549 550
    pthread_join(pids[i], NULL);
    pthread_join(pids[(num + i)], NULL);
  }
}

/*
 * Resolve host to an IPv4 address and print how long the lookup took, in
 * microseconds, together with a status of 0 (resolved) or -1 (failed).
 */
static void taosNetTestFqdn(char *host) {
  int code = 0;
  uint64_t startTime = taosGetTimestampUs();
  uint32_t ip = taosGetIpv4FromFqdn(host);
  if (ip == 0xffffffff) {
    uError("failed to get IP address from %s since %s", host, strerror(errno));
    code = -1;
  }
  uint64_t endTime = taosGetTimestampUs();
  uint64_t el = endTime - startTime;
  printf("check convert fqdn spend, status: %d\tcost: %" PRIu64 " us\n", code, el);
  return;
}
static void taosNetCheckSpeed(char *host, int32_t port, int32_t pkgLen,
                              int32_t pkgNum, char *pkgType) {
567 568 569
  // record config
  int32_t compressTmp = tsCompressMsgSize;
  int32_t maxUdpSize  = tsRpcMaxUdpSize;
570
  int32_t forceTcp  = tsRpcForceTcp;
571

572
  if (0 == strcmp("tcp", pkgType)){
573
    tsRpcForceTcp = 1;
574 575
    tsRpcMaxUdpSize = 0;            // force tcp
  } else {
576
    tsRpcForceTcp = 0;
577 578
    tsRpcMaxUdpSize = INT_MAX;
  }
579 580 581 582 583 584 585 586 587 588 589 590 591
  tsCompressMsgSize = -1;

  SRpcEpSet epSet;
  SRpcMsg   reqMsg;
  SRpcMsg   rspMsg;
  void *    pRpcConn;
  char secretEncrypt[32] = {0};
  char    spi = 0;
  pRpcConn = taosNetInitRpc(secretEncrypt, spi);
  if (NULL == pRpcConn) {
    uError("failed to init client rpc");
    return;
  }
592

593
  printf("check net spend, host:%s port:%d pkgLen:%d pkgNum:%d pkgType:%s\n\n", host, port, pkgLen, pkgNum, pkgType);
594
  int32_t totalSucc = 0;
595
  uint64_t startT = taosGetTimestampUs();
596
  for (int32_t i = 1; i <= pkgNum; i++) {
597
    uint64_t startTime = taosGetTimestampUs();
598 599 600 601 602 603 604 605 606 607 608 609 610 611 612

    memset(&epSet, 0, sizeof(SRpcEpSet));
    epSet.inUse = 0;
    epSet.numOfEps = 1;
    epSet.port[0] = port;
    strcpy(epSet.fqdn[0], host);

    reqMsg.msgType = TSDB_MSG_TYPE_NETWORK_TEST;
    reqMsg.pCont = rpcMallocCont(pkgLen);
    reqMsg.contLen = pkgLen;
    reqMsg.code = 0;
    reqMsg.handle = NULL;   // rpc handle returned to app
    reqMsg.ahandle = NULL;  // app handle set by client
    strcpy(reqMsg.pCont, "nettest speed");

613 614 615 616 617 618 619 620 621 622
    rpcSendRecv(pRpcConn, &epSet, &reqMsg, &rspMsg);

    int code = 0;
    if ((rspMsg.code != 0) || (rspMsg.msgType != TSDB_MSG_TYPE_NETWORK_TEST + 1)) {
      uError("ret code 0x%x %s", rspMsg.code, tstrerror(rspMsg.code));
      code = -1;
    }else{
      totalSucc ++;
    }

623 624
    rpcFreeCont(rspMsg.pCont);

625 626
    uint64_t endTime = taosGetTimestampUs();
    uint64_t el = endTime - startTime;
627
    printf("progress:%5d/%d\tstatus:%d\tcost:%8.2lf ms\tspeed:%8.2lf MB/s\n", i, pkgNum, code, el/1000.0, pkgLen/(el/1000000.0)/1024.0/1024.0);
628
  }
629
  int64_t endT = taosGetTimestampUs();
630
  uint64_t elT = endT - startT;
631
  printf("\ntotal succ:%5d/%d\tcost:%8.2lf ms\tspeed:%8.2lf MB/s\n", totalSucc, pkgNum, elT/1000.0, pkgLen/(elT/1000000.0)/1024.0/1024.0*totalSucc);
632

633 634 635 636 637
  rpcClose(pRpcConn);

  // return config
  tsCompressMsgSize = compressTmp;
  tsRpcMaxUdpSize = maxUdpSize;
638
  tsRpcForceTcp = forceTcp;
639 640 641
  return;
}

void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen,
                 int32_t pkgNum, char *pkgType) {
644
  tscEmbedded = 1;
645 646
  if (host == NULL) host = tsLocalFqdn;
  if (port == 0) port = tsServerPort;
647
  if (0 == strcmp("speed", role)){
648 649
    if (pkgLen <= MIN_SPEED_PKG_LEN) pkgLen = MIN_SPEED_PKG_LEN;
    if (pkgLen > MAX_SPEED_PKG_LEN) pkgLen = MAX_SPEED_PKG_LEN;
650 651
    if (pkgNum <= MIN_SPEED_PKG_NUM) pkgNum = MIN_SPEED_PKG_NUM;
    if (pkgNum > MAX_SPEED_PKG_NUM) pkgNum = MAX_SPEED_PKG_NUM;
652 653 654 655
  }else{
    if (pkgLen <= 10) pkgLen = 1000;
    if (pkgLen > MAX_PKG_LEN) pkgLen = MAX_PKG_LEN;
  }
656 657 658 659 660 661

  if (0 == strcmp("client", role)) {
    taosNetTestClient(host, port, pkgLen);
  } else if (0 == strcmp("server", role)) {
    taosNetTestServer(host, port, pkgLen);
  } else if (0 == strcmp("rpc", role)) {
662
    tscEmbedded = 0;
663
    taosNetTestRpc(host, port, pkgLen);
664 665
  } else if (0 == strcmp("sync", role)) {
    taosNetCheckSync(host, port);
666 667
  } else if (0 == strcmp("startup", role)) {
    taosNetTestStartup(host, port);
668 669
  } else if (0 == strcmp("speed", role)) {
    tscEmbedded = 0;
670
    char type[10] = {0};
671 672 673
    taosNetCheckSpeed(host, port, pkgLen, pkgNum, strtolower(type, pkgType));
  }else if (0 == strcmp("fqdn", role)) {
    taosNetTestFqdn(host);
H
Hui Li 已提交
674
  } else {
675
    taosNetTestStartup(host, port);
H
Hui Li 已提交
676 677
  }

678
  tscEmbedded = 0;
H
Hui Li 已提交
679
}