tnettest.c 21.3 KB
Newer Older
H
Hui Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
17
#define ALLOW_FORBID_FUNC
H
Hui Li 已提交
18 19
#include "os.h"
#include "taosdef.h"
H
Hongze Cheng 已提交
20
#include "tmsg.h"
H
Hui Li 已提交
21
#include "taoserror.h"
S
log  
Shengliang Guan 已提交
22
#include "tlog.h"
H
Hui Li 已提交
23
#include "tglobal.h"
H
Hui Li 已提交
24 25
#include "trpc.h"
#include "rpcHead.h"
S
TD-2861  
Shengliang Guan 已提交
26 27
#include "tchecksum.h"
#include "syncMsg.h"
H
Hui Li 已提交
28

29 30
#include "osSocket.h"

31
#define MAX_PKG_LEN (64 * 1000)
32
#define MAX_SPEED_PKG_LEN (1024 * 1024 * 1024)
33 34
#define MIN_SPEED_PKG_LEN 1024
#define MAX_SPEED_PKG_NUM 10000
35
#define MIN_SPEED_PKG_NUM 1
36 37
#define BUFFER_SIZE (MAX_PKG_LEN + 1024)

38
extern int tsRpcMaxUdpSize;
H
Hui Li 已提交
39 40

typedef struct {
41
  char *   hostFqdn;
H
Hui Li 已提交
42
  uint32_t hostIp;
43 44 45 46 47 48 49
  int32_t  port;
  int32_t  pktLen;
} STestInfo;

static void *taosNetBindUdpPort(void *sarg) {
  STestInfo *pinfo = (STestInfo *)sarg;
  int32_t    port = pinfo->port;
50
  SOCKET     serverSocket;
51 52 53
  char       buffer[BUFFER_SIZE];
  int32_t    iDataNum;
  socklen_t  sin_size;
S
Shengliang Guan 已提交
54
  int32_t bufSize = 1024000;
H
Hui Li 已提交
55 56 57

  struct sockaddr_in server_addr;
  struct sockaddr_in clientAddr;
58

59 60 61
  setThreadName("netBindUdpPort");

  if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
S
Shengliang Guan 已提交
62
    uError("failed to create UDP socket since %s", strerror(errno));
H
Hui Li 已提交
63 64 65 66 67 68 69 70 71
    return NULL;
  }

  bzero(&server_addr, sizeof(server_addr));
  server_addr.sin_family = AF_INET;
  server_addr.sin_port = htons(port);
  server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

  if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
S
Shengliang Guan 已提交
72
    uError("failed to bind UDP port:%d since %s", port, strerror(errno));
H
Hui Li 已提交
73 74 75
    return NULL;
  }

wafwerar's avatar
wafwerar 已提交
76
  TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
77 78 79 80 81 82 83 84
  if (pSocket == NULL) {
    taosCloseSocketNoCheck1(serverSocket);
    return NULL;
  }
  pSocket->fd = serverSocket;
  pSocket->refId = 0;

  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
S
Shengliang Guan 已提交
85
    uError("failed to set the send buffer size for UDP socket\n");
86
    taosCloseSocket(&pSocket);
S
Shengliang Guan 已提交
87 88 89
    return NULL;
  }

90
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
S
Shengliang Guan 已提交
91
    uError("failed to set the receive buffer size for UDP socket\n");
92
    taosCloseSocket(&pSocket);
S
Shengliang Guan 已提交
93 94 95 96
    return NULL;
  }

  uInfo("UDP server at port:%d is listening", port);
H
Hui Li 已提交
97 98 99 100 101 102 103

  while (1) {
    memset(buffer, 0, BUFFER_SIZE);
    sin_size = sizeof(*(struct sockaddr *)&server_addr);
    iDataNum = recvfrom(serverSocket, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&clientAddr, &sin_size);

    if (iDataNum < 0) {
104
      uDebug("failed to perform recvfrom func at %d since %s", port, strerror(errno));
H
Hui Li 已提交
105 106 107
      continue;
    }

S
Shengliang Guan 已提交
108 109
    uInfo("UDP: recv:%d bytes from %s at %d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port);

110
    if (iDataNum > 0) {
111
      iDataNum = taosSendto(pSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int32_t)sin_size);
H
Hui Li 已提交
112
    }
S
Shengliang Guan 已提交
113 114

    uInfo("UDP: send:%d bytes to %s at %d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port);
H
Hui Li 已提交
115 116
  }

117
  taosCloseSocket(&pSocket);
H
Hui Li 已提交
118 119 120
  return NULL;
}

121
static void *taosNetBindTcpPort(void *sarg) {
H
Hui Li 已提交
122 123
  struct sockaddr_in server_addr;
  struct sockaddr_in clientAddr;
124

125
  STestInfo *pinfo = sarg;
126
  int32_t    port = pinfo->port;
127
  SOCKET     serverSocket;
128
  int32_t    addr_len = sizeof(clientAddr);
129
  SOCKET     client;
130
  char       buffer[BUFFER_SIZE];
H
Hui Li 已提交
131

132 133
  setThreadName("netBindTcpPort");

134
  if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
S
Shengliang Guan 已提交
135
    uError("failed to create TCP socket since %s", strerror(errno));
H
Hui Li 已提交
136 137 138 139 140 141 142 143
    return NULL;
  }

  bzero(&server_addr, sizeof(server_addr));
  server_addr.sin_family = AF_INET;
  server_addr.sin_port = htons(port);
  server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

S
Shengliang Guan 已提交
144
  int32_t reuse = 1;
wafwerar's avatar
wafwerar 已提交
145
  TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
146 147 148 149 150 151 152 153
  if (pSocket == NULL) {
    taosCloseSocketNoCheck1(serverSocket);
    return NULL;
  }
  pSocket->fd = serverSocket;
  pSocket->refId = 0;

  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
S
Shengliang Guan 已提交
154
    uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
155
    taosCloseSocket(&pSocket);
S
Shengliang Guan 已提交
156 157 158
    return NULL;
  }

H
Hui Li 已提交
159
  if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
S
Shengliang Guan 已提交
160
    uError("failed to bind TCP port:%d since %s", port, strerror(errno));
161
    taosCloseSocket(&pSocket);
H
Hui Li 已提交
162 163 164
    return NULL;
  }

165
  if (taosKeepTcpAlive(pSocket) < 0) {
S
Shengliang Guan 已提交
166
    uError("failed to set tcp server keep-alive option since %s", strerror(errno));
167
    taosCloseSocket(&pSocket);
H
Hui Li 已提交
168 169 170
    return NULL;
  }

S
Shengliang Guan 已提交
171 172
  if (listen(serverSocket, 10) < 0) {
    uError("failed to listen TCP port:%d since %s", port, strerror(errno));
173
    taosCloseSocket(&pSocket);
S
Shengliang Guan 已提交
174 175 176 177
    return NULL;
  }

  uInfo("TCP server at port:%d is listening", port);
178

H
Hui Li 已提交
179
  while (1) {
180
    client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len);
H
Hui Li 已提交
181
    if (client < 0) {
S
Shengliang Guan 已提交
182
      uDebug("TCP: failed to accept at port:%d since %s", port, strerror(errno));
H
Hui Li 已提交
183 184 185
      continue;
    }

186
    int32_t ret = taosReadMsg(pSocket, buffer, pinfo->pktLen);
S
Shengliang Guan 已提交
187
    if (ret < 0 || ret != pinfo->pktLen) {
S
Shengliang Guan 已提交
188
      uError("TCP: failed to read %d bytes at port:%d since %s", pinfo->pktLen, port, strerror(errno));
189
      taosCloseSocket(&pSocket);
S
Shengliang Guan 已提交
190
      return NULL;
H
Hui Li 已提交
191
    }
192

S
Shengliang Guan 已提交
193 194
    uInfo("TCP: read:%d bytes from %s at %d", pinfo->pktLen, taosInetNtoa(clientAddr.sin_addr), port);

195
    ret = taosWriteMsg(pSocket, buffer, pinfo->pktLen);
S
Shengliang Guan 已提交
196
    if (ret < 0) {
S
Shengliang Guan 已提交
197
      uError("TCP: failed to write %d bytes at %d since %s", pinfo->pktLen, port, strerror(errno));
198
      taosCloseSocket(&pSocket);
S
Shengliang Guan 已提交
199
      return NULL;
H
Hui Li 已提交
200
    }
S
Shengliang Guan 已提交
201 202

    uInfo("TCP: write:%d bytes to %s at %d", pinfo->pktLen, taosInetNtoa(clientAddr.sin_addr), port);
H
Hui Li 已提交
203
  }
S
Shengliang Guan 已提交
204

205
  taosCloseSocket(&pSocket);
H
Hui Li 已提交
206 207 208
  return NULL;
}

209
static int32_t taosNetCheckTcpPort(STestInfo *info) {
210
  SOCKET  clientSocket;
S
Shengliang Guan 已提交
211
  char    buffer[BUFFER_SIZE] = {0};
212

213
  if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
S
Shengliang Guan 已提交
214
    uError("failed to create TCP client socket since %s", strerror(errno));
H
Hui Li 已提交
215 216 217
    return -1;
  }

S
Shengliang Guan 已提交
218
  int32_t reuse = 1;
wafwerar's avatar
wafwerar 已提交
219
  TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
220 221 222 223 224 225 226 227
  if (pSocket == NULL) {
    taosCloseSocketNoCheck1(clientSocket);
    return -1;
  }
  pSocket->fd = clientSocket;
  pSocket->refId = 0;

  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
S
Shengliang Guan 已提交
228
    uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
229
    taosCloseSocket(&pSocket);
S
Shengliang Guan 已提交
230
    return -1;
H
Hui Li 已提交
231 232
  }

S
Shengliang Guan 已提交
233 234
  struct sockaddr_in serverAddr;
  memset((char *)&serverAddr, 0, sizeof(serverAddr));
H
Hui Li 已提交
235
  serverAddr.sin_family = AF_INET;
S
Shengliang Guan 已提交
236
  serverAddr.sin_port = (uint16_t)htons((uint16_t)info->port);
H
Hui Li 已提交
237 238 239
  serverAddr.sin_addr.s_addr = info->hostIp;

  if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) {
S
Shengliang Guan 已提交
240
    uError("TCP: failed to connect port %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno));
241
    taosCloseSocket(&pSocket);
H
Hui Li 已提交
242 243
    return -1;
  }
244

245
  taosKeepTcpAlive(pSocket);
H
Hui Li 已提交
246

S
Shengliang Guan 已提交
247 248
  sprintf(buffer, "client send TCP pkg to %s:%d, content: 1122334455", taosIpStr(info->hostIp), info->port);
  sprintf(buffer + info->pktLen - 16, "1122334455667788");
H
Hui Li 已提交
249

250
  int32_t ret = taosWriteMsg(pSocket, buffer, info->pktLen);
S
Shengliang Guan 已提交
251
  if (ret < 0) {
S
Shengliang Guan 已提交
252
    uError("TCP: failed to write msg to %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno));
253
    taosCloseSocket(&pSocket);
S
Shengliang Guan 已提交
254
    return -1;
H
Hui Li 已提交
255 256
  }

257
  ret = taosReadMsg(pSocket, buffer, info->pktLen);
S
Shengliang Guan 已提交
258 259
  if (ret < 0) {
    uError("TCP: failed to read msg from %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno));
260
    taosCloseSocket(&pSocket);
H
Hui Li 已提交
261 262 263
    return -1;
  }

264
  taosCloseSocket(&pSocket);
H
Hui Li 已提交
265 266 267
  return 0;
}

268
static int32_t taosNetCheckUdpPort(STestInfo *info) {
269
  SOCKET  clientSocket;
S
Shengliang Guan 已提交
270
  char    buffer[BUFFER_SIZE] = {0};
271
  int32_t iDataNum = 0;
S
Shengliang Guan 已提交
272
  int32_t bufSize = 1024000;
273

H
Hui Li 已提交
274
  struct sockaddr_in serverAddr;
275

276
  if ((clientSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
277
    uError("failed to create udp client socket since %s", strerror(errno));
H
Hui Li 已提交
278 279 280
    return -1;
  }

wafwerar's avatar
wafwerar 已提交
281
  TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
282 283 284 285 286 287 288 289
  if (pSocket == NULL) {
    taosCloseSocketNoCheck1(clientSocket);
    return -1;
  }
  pSocket->fd = clientSocket;
  pSocket->refId = 0;

  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
S
Shengliang Guan 已提交
290
    uError("failed to set the send buffer size for UDP socket\n");
291
    taosCloseSocket(&pSocket);
S
Shengliang Guan 已提交
292 293 294
    return -1;
  }

295
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
S
Shengliang Guan 已提交
296
    uError("failed to set the receive buffer size for UDP socket\n");
297
    taosCloseSocket(&pSocket);
S
Shengliang Guan 已提交
298 299 300
    return -1;
  }

H
Hui Li 已提交
301 302 303
  serverAddr.sin_family = AF_INET;
  serverAddr.sin_port = htons(info->port);
  serverAddr.sin_addr.s_addr = info->hostIp;
304

H
Hui Li 已提交
305 306
  struct in_addr ipStr;
  memcpy(&ipStr, &info->hostIp, 4);
S
Shengliang Guan 已提交
307 308
  sprintf(buffer, "client send udp pkg to %s:%d, content: 1122334455", taosInetNtoa(ipStr), info->port);
  sprintf(buffer + info->pktLen - 16, "1122334455667788");
H
Hui Li 已提交
309 310 311

  socklen_t sin_size = sizeof(*(struct sockaddr *)&serverAddr);

312
  iDataNum = taosSendto(pSocket, buffer, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int32_t)sin_size);
S
Shengliang Guan 已提交
313 314
  if (iDataNum < 0 || iDataNum != info->pktLen) {
    uError("UDP: failed to perform sendto func since %s", strerror(errno));
315
    taosCloseSocket(&pSocket);
H
Hui Li 已提交
316 317 318
    return -1;
  }

S
Shengliang Guan 已提交
319 320 321
  memset(buffer, 0, BUFFER_SIZE);
  sin_size = sizeof(*(struct sockaddr *)&serverAddr);
  iDataNum = recvfrom(clientSocket, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size);
H
Hui Li 已提交
322

S
Shengliang Guan 已提交
323 324
  if (iDataNum < 0 || iDataNum != info->pktLen) {
    uError("UDP: received ack:%d bytes(expect:%d) from port:%d since %s", iDataNum, info->pktLen, info->port, strerror(errno));
325
    taosCloseSocket(&pSocket);
H
Hui Li 已提交
326 327
    return -1;
  }
328

329
  taosCloseSocket(&pSocket);
H
Hui Li 已提交
330 331 332
  return 0;
}

333 334 335
static void taosNetCheckPort(uint32_t hostIp, int32_t startPort, int32_t endPort, int32_t pktLen) {
  int32_t   ret;
  STestInfo info;
H
Hui Li 已提交
336

337 338 339
  memset(&info, 0, sizeof(STestInfo));
  info.hostIp = hostIp;
  info.pktLen = pktLen;
H
Hui Li 已提交
340

341
  for (int32_t port = startPort; port <= endPort; port++) {
H
Hui Li 已提交
342
    info.port = port;
343
    ret = taosNetCheckTcpPort(&info);
H
Hui Li 已提交
344
    if (ret != 0) {
345
      printf("failed to test TCP port:%d\n", port);
H
Hui Li 已提交
346
    } else {
347
      printf("successed to test TCP port:%d\n", port);
H
Hui Li 已提交
348
    }
349 350

    ret = taosNetCheckUdpPort(&info);
H
Hui Li 已提交
351
    if (ret != 0) {
352
      printf("failed to test UDP port:%d\n", port);
H
Hui Li 已提交
353
    } else {
354
      printf("successed to test UDP port:%d\n", port);
H
Hui Li 已提交
355 356 357 358
    }
  }
}

359
void *taosNetInitRpc(char *secretEncrypt, char spi) {
H
Hui Li 已提交
360
  SRpcInit rpcInit;
361 362 363 364
  void *   pRpcConn = NULL;

  char user[] = "nettestinternal";
  char pass[] = "nettestinternal";
S
Shengliang Guan 已提交
365
  taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
H
Hui Li 已提交
366 367 368

  memset(&rpcInit, 0, sizeof(rpcInit));
  rpcInit.localPort = 0;
369
  rpcInit.label = "NT";
H
Hui Li 已提交
370 371 372 373
  rpcInit.numOfThreads = 1;  // every DB connection has only one thread
  rpcInit.cfp = NULL;
  rpcInit.sessions = 16;
  rpcInit.connType = TAOS_CONN_CLIENT;
374
  rpcInit.user = user;
H
Hui Li 已提交
375 376 377 378 379 380 381 382 383
  rpcInit.idleTime = 2000;
  rpcInit.ckey = "key";
  rpcInit.spi = spi;
  rpcInit.secret = secretEncrypt;

  pRpcConn = rpcOpen(&rpcInit);
  return pRpcConn;
}

S
Shengliang Guan 已提交
384
static int32_t taosNetCheckRpc(const char* serverFqdn, uint16_t port, uint16_t pktLen, char spi, SStartupReq *pStep) {
385
  SEpSet epSet;
H
Hui Li 已提交
386 387
  SRpcMsg   reqMsg;
  SRpcMsg   rspMsg;
388
  void *    pRpcConn;
H
Hui Li 已提交
389

S
Shengliang Guan 已提交
390
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
H
Hui Li 已提交
391

392
  pRpcConn = taosNetInitRpc(secretEncrypt, spi);
H
Hui Li 已提交
393
  if (NULL == pRpcConn) {
394 395
    uError("failed to init client rpc");
    return TSDB_CODE_RPC_NETWORK_UNAVAIL;
H
Hui Li 已提交
396 397
  }

398 399 400
  memset(&epSet, 0, sizeof(SEpSet));
  strcpy(epSet.eps[0].fqdn, serverFqdn);
  epSet.eps[0].port = port;
H
Hui Li 已提交
401
  epSet.numOfEps = 1;
402

H
Hongze Cheng 已提交
403
  reqMsg.msgType = TDMT_DND_NETWORK_TEST;
H
Hui Li 已提交
404 405 406 407
  reqMsg.pCont = rpcMallocCont(pktLen);
  reqMsg.contLen = pktLen;
  reqMsg.code = 0;
  reqMsg.handle = NULL;   // rpc handle returned to app
408
  reqMsg.ahandle = NULL;  // app handle set by client
409
  strcpy(reqMsg.pCont, "dnode-nettest");
H
Hui Li 已提交
410 411 412

  rpcSendRecv(pRpcConn, &epSet, &reqMsg, &rspMsg);

H
Hongze Cheng 已提交
413
  if ((rspMsg.code != 0) || (rspMsg.msgType != TDMT_DND_NETWORK_TEST + 1)) {
414 415
    uDebug("ret code 0x%x %s", rspMsg.code, tstrerror(rspMsg.code));
    return rspMsg.code;
H
Hui Li 已提交
416
  }
H
Hui Li 已提交
417

418
  int32_t code = 0;
S
Shengliang Guan 已提交
419
  if (pStep != NULL && rspMsg.pCont != NULL && rspMsg.contLen > 0 && rspMsg.contLen <= sizeof(SStartupReq)) {
420 421 422 423 424
    memcpy(pStep, rspMsg.pCont, rspMsg.contLen);
    code = 1;
  }

  rpcFreeCont(rspMsg.pCont);
H
Hui Li 已提交
425
  rpcClose(pRpcConn);
426 427
  return code;
}
H
Hui Li 已提交
428

S
Shengliang Guan 已提交
429 430
static int32_t taosNetParseStartup(SStartupReq *pCont) {
  SStartupReq *pStep = pCont;
431 432 433 434 435 436 437
  uInfo("step:%s desc:%s", pStep->name, pStep->desc);

  if (pStep->finished) {
    uInfo("check startup finished");
  }

  return pStep->finished ? 0 : 1;
H
Hui Li 已提交
438 439
}

440 441
static void taosNetTestStartup(char *host, int32_t port) {
  uInfo("check startup, host:%s port:%d\n", host, port);
H
Hui Li 已提交
442

wafwerar's avatar
wafwerar 已提交
443
  SStartupReq *pStep = taosMemoryMalloc(sizeof(SStartupReq));
444
  while (1) {
445
    int32_t code = taosNetCheckRpc(host, port, 20, 0, pStep);
446 447 448
    if (code > 0) {
      code = taosNetParseStartup(pStep);
    }
H
Hui Li 已提交
449

450 451
    if (code > 0) {
      uDebug("continue check startup step");
H
Hui Li 已提交
452
    } else {
453 454 455 456
      if (code < 0) {
        uError("failed to check startup step, code:0x%x %s", code, tstrerror(code));
      }
      break;
H
Hui Li 已提交
457
    }
458
  }
H
Hui Li 已提交
459

wafwerar's avatar
wafwerar 已提交
460
  taosMemoryFree(pStep);
461 462
}

S
TD-2861  
Shengliang Guan 已提交
463 464 465 466 467 468 469
static void taosNetCheckSync(char *host, int32_t port) {
  uint32_t ip = taosGetIpv4FromFqdn(host);
  if (ip == 0xffffffff) {
    uError("failed to get IP address from %s since %s", host, strerror(errno));
    return;
  }

470 471
  TdSocketPtr pSocket = taosOpenTcpClientSocket(ip, (uint16_t)port, 0);
  if (pSocket == NULL) {
S
TD-2861  
Shengliang Guan 已提交
472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487
    uError("failed to create socket while test port:%d since %s", port, strerror(errno));
    return;
  }

  SSyncMsg msg;
  memset(&msg, 0, sizeof(SSyncMsg));
  SSyncHead *pHead = &msg.head;
  pHead->type = TAOS_SMSG_TEST;
  pHead->protocol = SYNC_PROTOCOL_VERSION;
  pHead->signature = SYNC_SIGNATURE;
  pHead->code = 0;
  pHead->cId = 0;
  pHead->vgId = -1;
  pHead->len = sizeof(SSyncMsg) - sizeof(SSyncHead);
  taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SSyncHead));

488
  if (taosWriteMsg(pSocket, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
S
TD-2861  
Shengliang Guan 已提交
489 490 491 492
    uError("failed to test port:%d while send msg since %s", port, strerror(errno));
    return;
  }

493
  if (taosReadMsg(pSocket, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
S
TD-2861  
Shengliang Guan 已提交
494 495 496 497
    uError("failed to test port:%d while recv msg since %s", port, strerror(errno));
  }

  uInfo("successed to test TCP port:%d", port);
498
  taosCloseSocket(&pSocket);
S
TD-2861  
Shengliang Guan 已提交
499 500
}

501 502 503
static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) {
  char    spi = 0;

504
  uInfo("check rpc, host:%s Port:%d pkgLen:%d\n", host, startPort, pkgLen);
S
TD-2861  
Shengliang Guan 已提交
505

506 507 508
  uint16_t port = startPort;
  int32_t sendpkgLen;
  if (pkgLen <= tsRpcMaxUdpSize) {
509
      sendpkgLen = tsRpcMaxUdpSize + 1000;
510
  } else {
511
      sendpkgLen = pkgLen;
512
  }
H
Hui Li 已提交
513

514 515 516
  tsRpcForceTcp = 1;
  int32_t ret = taosNetCheckRpc(host, port, sendpkgLen, spi, NULL);
  if (ret < 0) {
517
      printf("failed to test TCP port:%d\n", port);
518
  } else {
519
      printf("successed to test TCP port:%d\n", port);
520
  }
521

522
  if (pkgLen >= tsRpcMaxUdpSize) {
523
      sendpkgLen = tsRpcMaxUdpSize - 1000;
524
  } else {
525
      sendpkgLen = pkgLen;
526 527 528 529 530
  }
/*
  tsRpcForceTcp = 0;
  ret = taosNetCheckRpc(host, port, pkgLen, spi, NULL);
  if (ret < 0) {
531
      printf("failed to test UDP port:%d\n", port);
532
  } else {
533
      printf("successed to test UDP port:%d\n", port);
H
Hui Li 已提交
534
  }
535
  */
S
TD-2861  
Shengliang Guan 已提交
536

537
  taosNetCheckSync(host, startPort);
H
Hui Li 已提交
538 539
}

540
static void taosNetTestClient(char *host, int32_t startPort, int32_t pkgLen) {
541
  uInfo("work as client, host:%s Port:%d pkgLen:%d\n", host, startPort, pkgLen);
542

543
  uint32_t serverIp = taosGetIpv4FromFqdn(host);
H
Hui Li 已提交
544
  if (serverIp == 0xFFFFFFFF) {
545
    uError("failed to resolve fqdn:%s", host);
H
Hui Li 已提交
546 547
    exit(-1);
  }
H
Hui Li 已提交
548

549
  uInfo("server ip:%s is resolved from host:%s", taosIpStr(serverIp), host);
550
  taosNetCheckPort(serverIp, startPort, startPort, pkgLen);
H
Hui Li 已提交
551
}
H
Hui Li 已提交
552

553
static void taosNetTestServer(char *host, int32_t startPort, int32_t pkgLen) {
554
  uInfo("work as server, host:%s Port:%d pkgLen:%d\n", host, startPort, pkgLen);
H
Hui Li 已提交
555

556
  int32_t port = startPort;
557
  int32_t num = 1;
558
  if (num < 0) num = 1;
H
Hui Li 已提交
559

wafwerar's avatar
wafwerar 已提交
560 561 562
  TdThread *pids = taosMemoryMalloc(2 * num * sizeof(TdThread));
  STestInfo *tinfos = taosMemoryMalloc(num * sizeof(STestInfo));
  STestInfo *uinfos = taosMemoryMalloc(num * sizeof(STestInfo));
H
Hui Li 已提交
563

564 565 566 567
  for (int32_t i = 0; i < num; i++) {
    STestInfo *tcpInfo = tinfos + i;
    tcpInfo->port = port + i;
    tcpInfo->pktLen = pkgLen;
H
Hui Li 已提交
568

wafwerar's avatar
wafwerar 已提交
569
    if (taosThreadCreate(pids + i, NULL, taosNetBindTcpPort, tcpInfo) != 0) {
S
Shengliang Guan 已提交
570
      uInfo("failed to create TCP test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port);
H
Hui Li 已提交
571 572 573
      exit(-1);
    }

574
    STestInfo *udpInfo = uinfos + i;
S
Shengliang Guan 已提交
575 576
    udpInfo->port = port + i;
    tcpInfo->pktLen = pkgLen;
wafwerar's avatar
wafwerar 已提交
577
    if (taosThreadCreate(pids + num + i, NULL, taosNetBindUdpPort, udpInfo) != 0) {
S
Shengliang Guan 已提交
578
      uInfo("failed to create UDP test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port);
H
Hui Li 已提交
579 580 581
      exit(-1);
    }
  }
582 583

  for (int32_t i = 0; i < num; i++) {
wafwerar's avatar
wafwerar 已提交
584 585
    taosThreadJoin(pids[i], NULL);
    taosThreadJoin(pids[(num + i)], NULL);
H
Hui Li 已提交
586 587 588
  }
}

589 590 591 592 593 594 595
static void taosNetTestFqdn(char *host) {
  int code = 0;
  uint64_t startTime = taosGetTimestampUs();
  uint32_t ip = taosGetIpv4FromFqdn(host);
  if (ip == 0xffffffff) {
    uError("failed to get IP address from %s since %s", host, strerror(errno));
    code = -1;
596
  }
597 598
  uint64_t endTime = taosGetTimestampUs();
  uint64_t el = endTime - startTime;
599
  printf("check convert fqdn spend, status: %d\tcost: %" PRIu64 " us\n", code, el);
600 601
  return;
}
602

603 604
static void taosNetCheckSpeed(char *host, int32_t port, int32_t pkgLen,
                              int32_t pkgNum, char *pkgType) {
605 606 607
  // record config
  int32_t compressTmp = tsCompressMsgSize;
  int32_t maxUdpSize  = tsRpcMaxUdpSize;
608
  int32_t forceTcp  = tsRpcForceTcp;
609

610
  if (0 == strcmp("tcp", pkgType)){
611
    tsRpcForceTcp = 1;
612 613
    tsRpcMaxUdpSize = 0;            // force tcp
  } else {
614
    tsRpcForceTcp = 0;
615 616
    tsRpcMaxUdpSize = INT_MAX;
  }
617 618
  tsCompressMsgSize = -1;

619
  SEpSet epSet;
620 621 622 623 624 625 626 627 628 629
  SRpcMsg   reqMsg;
  SRpcMsg   rspMsg;
  void *    pRpcConn;
  char secretEncrypt[32] = {0};
  char    spi = 0;
  pRpcConn = taosNetInitRpc(secretEncrypt, spi);
  if (NULL == pRpcConn) {
    uError("failed to init client rpc");
    return;
  }
630

631
  printf("check net spend, host:%s port:%d pkgLen:%d pkgNum:%d pkgType:%s\n\n", host, port, pkgLen, pkgNum, pkgType);
632
  int32_t totalSucc = 0;
633
  uint64_t startT = taosGetTimestampUs();
634
  for (int32_t i = 1; i <= pkgNum; i++) {
635
    uint64_t startTime = taosGetTimestampUs();
636

637 638 639
    memset(&epSet, 0, sizeof(SEpSet));
    strcpy(epSet.eps[0].fqdn, host);
    epSet.eps[0].port = port;
640 641
    epSet.numOfEps = 1;

H
Hongze Cheng 已提交
642
    reqMsg.msgType = TDMT_DND_NETWORK_TEST;
643 644 645 646 647 648 649
    reqMsg.pCont = rpcMallocCont(pkgLen);
    reqMsg.contLen = pkgLen;
    reqMsg.code = 0;
    reqMsg.handle = NULL;   // rpc handle returned to app
    reqMsg.ahandle = NULL;  // app handle set by client
    strcpy(reqMsg.pCont, "nettest speed");

650 651 652
    rpcSendRecv(pRpcConn, &epSet, &reqMsg, &rspMsg);

    int code = 0;
H
Hongze Cheng 已提交
653
    if ((rspMsg.code != 0) || (rspMsg.msgType != TDMT_DND_NETWORK_TEST + 1)) {
654 655 656 657 658 659
      uError("ret code 0x%x %s", rspMsg.code, tstrerror(rspMsg.code));
      code = -1;
    }else{
      totalSucc ++;
    }

660 661
    rpcFreeCont(rspMsg.pCont);

662 663
    uint64_t endTime = taosGetTimestampUs();
    uint64_t el = endTime - startTime;
664
    printf("progress:%5d/%d\tstatus:%d\tcost:%8.2lf ms\tspeed:%8.2lf MB/s\n", i, pkgNum, code, el/1000.0, pkgLen/(el/1000000.0)/1024.0/1024.0);
665
  }
666
  int64_t endT = taosGetTimestampUs();
667
  uint64_t elT = endT - startT;
668
  printf("\ntotal succ:%5d/%d\tcost:%8.2lf ms\tspeed:%8.2lf MB/s\n", totalSucc, pkgNum, elT/1000.0, pkgLen/(elT/1000000.0)/1024.0/1024.0*totalSucc);
669

670 671 672 673 674
  rpcClose(pRpcConn);

  // return config
  tsCompressMsgSize = compressTmp;
  tsRpcMaxUdpSize = maxUdpSize;
675
  tsRpcForceTcp = forceTcp;
676 677 678
  return;
}

679 680
void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen,
                 int32_t pkgNum, char *pkgType) {
681
  tsLogEmbedded = 1;
682 683
  if (host == NULL) host = tsLocalFqdn;
  if (port == 0) port = tsServerPort;
684
  if (0 == strcmp("speed", role)){
685 686
    if (pkgLen <= MIN_SPEED_PKG_LEN) pkgLen = MIN_SPEED_PKG_LEN;
    if (pkgLen > MAX_SPEED_PKG_LEN) pkgLen = MAX_SPEED_PKG_LEN;
687 688
    if (pkgNum <= MIN_SPEED_PKG_NUM) pkgNum = MIN_SPEED_PKG_NUM;
    if (pkgNum > MAX_SPEED_PKG_NUM) pkgNum = MAX_SPEED_PKG_NUM;
689 690 691 692
  }else{
    if (pkgLen <= 10) pkgLen = 1000;
    if (pkgLen > MAX_PKG_LEN) pkgLen = MAX_PKG_LEN;
  }
693 694 695 696 697 698

  if (0 == strcmp("client", role)) {
    taosNetTestClient(host, port, pkgLen);
  } else if (0 == strcmp("server", role)) {
    taosNetTestServer(host, port, pkgLen);
  } else if (0 == strcmp("rpc", role)) {
699
    tsLogEmbedded = 0;
700
    taosNetTestRpc(host, port, pkgLen);
701 702
  } else if (0 == strcmp("sync", role)) {
    taosNetCheckSync(host, port);
703 704
  } else if (0 == strcmp("startup", role)) {
    taosNetTestStartup(host, port);
705
  } else if (0 == strcmp("speed", role)) {
706
    tsLogEmbedded = 0;
707
    char type[10] = {0};
708 709 710
    taosNetCheckSpeed(host, port, pkgLen, pkgNum, strtolower(type, pkgType));
  }else if (0 == strcmp("fqdn", role)) {
    taosNetTestFqdn(host);
H
Hui Li 已提交
711
  } else {
712
    taosNetTestStartup(host, port);
H
Hui Li 已提交
713 714
  }

715
  tsLogEmbedded = 0;
H
Hui Li 已提交
716
}