tsocket.c 11.2 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
S
slguan 已提交
17
#include "tulog.h"
H
hzcheng 已提交
18 19 20
#include "tsocket.h"
#include "tutil.h"

J
jtao1735 已提交
21
int taosGetFqdn(char *fqdn) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
22
  int  code = 0;
J
jtao1735 已提交
23 24 25 26
  char hostname[1024];
  hostname[1023] = '\0';
  gethostname(hostname, 1023);

27 28 29 30 31
  struct addrinfo hints = {0};
  struct addrinfo *result = NULL;

  hints.ai_flags = AI_CANONNAME;

H
Haojun Liao 已提交
32
  int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
33 34 35
  if (result) {
    strcpy(fqdn, result->ai_canonname);
    freeaddrinfo(result);
S
slguan 已提交
36
  } else {
H
Haojun Liao 已提交
37
    uError("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
38
    code = -1;
S
slguan 已提交
39
  }
J
jtao1735 已提交
40

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
41
  return code;
H
hzcheng 已提交
42 43
}

J
jtao1735 已提交
44
uint32_t taosGetIpFromFqdn(const char *fqdn) {
45
  struct addrinfo hints = {0};
H
Haojun Liao 已提交
46 47 48
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;

49 50
  struct addrinfo *result = NULL;

51
  int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result);
52 53 54 55 56 57 58 59
  if (result) {
    struct sockaddr *sa = result->ai_addr;
    struct sockaddr_in *si = (struct sockaddr_in*)sa;
    struct in_addr ia = si->sin_addr;
    uint32_t ip = ia.s_addr;
    freeaddrinfo(result);
    return ip;
  } else {
H
Haojun Liao 已提交
60
    uError("failed get the ip address, fqdn:%s, code:%d, reason:%s", fqdn, ret, gai_strerror(ret));
61
    return 0xFFFFFFFF;
62
  }
J
jtao1735 已提交
63 64
}

H
hzcheng 已提交
65
// Function converting an IP address string to an unsigned int.
S
slguan 已提交
66
uint32_t ip2uint(const char *const ip_addr) {
H
hzcheng 已提交
67 68 69
  char ip_addr_cpy[20];
  char ip[5];

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
70
  tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy));
H
hzcheng 已提交
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156

  char *s_start, *s_end;
  s_start = ip_addr_cpy;
  s_end = ip_addr_cpy;

  int k;

  for (k = 0; *s_start != '\0'; s_start = s_end) {
    for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) {
    }
    if (*s_end == '.') {
      *s_end = '\0';
      s_end++;
    }
    ip[k++] = (char)atoi(s_start);
  }

  ip[k] = '\0';

  return *((unsigned int *)ip);
}

int taosWriteMsg(int fd, void *buf, int nbytes) {
  int   nleft, nwritten;
  char *ptr = (char *)buf;

  nleft = nbytes;

  while (nleft > 0) {
    nwritten = (int)taosWriteSocket(fd, (char *)ptr, (size_t)nleft);
    if (nwritten <= 0) {
      if (errno == EINTR)
        continue;
      else
        return -1;
    } else {
      nleft -= nwritten;
      ptr += nwritten;
    }
  }

  return (nbytes - nleft);
}

int taosReadMsg(int fd, void *buf, int nbytes) {
  int   nleft, nread;
  char *ptr = (char *)buf;

  nleft = nbytes;

  if (fd < 0) return -1;

  while (nleft > 0) {
    nread = (int)taosReadSocket(fd, ptr, (size_t)nleft);
    if (nread == 0) {
      break;
    } else if (nread < 0) {
      if (errno == EINTR) {
        continue;
      } else {
        return -1;
      }
    } else {
      nleft -= nread;
      ptr += nread;
    }
  }

  return (nbytes - nleft);
}

int taosNonblockwrite(int fd, char *ptr, int nbytes) {
  taosSetNonblocking(fd, 1);

  int            nleft, nwritten, nready;
  fd_set         fset;
  struct timeval tv;

  nleft = nbytes;
  while (nleft > 0) {
    tv.tv_sec = 30;
    tv.tv_usec = 0;
    FD_ZERO(&fset);
    FD_SET(fd, &fset);
    if ((nready = select(fd + 1, NULL, &fset, NULL, &tv)) == 0) {
      errno = ETIMEDOUT;
S
slguan 已提交
157
      uError("fd %d timeout, no enough space to write", fd);
H
hzcheng 已提交
158 159 160 161 162
      break;

    } else if (nready < 0) {
      if (errno == EINTR) continue;

S
slguan 已提交
163
      uError("select error, %d (%s)", errno, strerror(errno));
H
hzcheng 已提交
164 165 166 167 168 169 170
      return -1;
    }

    nwritten = (int)send(fd, ptr, (size_t)nleft, MSG_NOSIGNAL);
    if (nwritten <= 0) {
      if (errno == EAGAIN || errno == EINTR) continue;

S
slguan 已提交
171
      uError("write error, %d (%s)", errno, strerror(errno));
H
hzcheng 已提交
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
      return -1;
    }

    nleft -= nwritten;
    ptr += nwritten;
  }

  taosSetNonblocking(fd, 0);

  return (nbytes - nleft);
}

int taosReadn(int fd, char *ptr, int nbytes) {
  int nread, nready, nleft = nbytes;

  fd_set         fset;
  struct timeval tv;

  while (nleft > 0) {
    tv.tv_sec = 30;
    tv.tv_usec = 0;
    FD_ZERO(&fset);
    FD_SET(fd, &fset);
    if ((nready = select(fd + 1, NULL, &fset, NULL, &tv)) == 0) {
      errno = ETIMEDOUT;
S
slguan 已提交
197
      uError("fd %d timeout\n", fd);
H
hzcheng 已提交
198 199 200
      break;
    } else if (nready < 0) {
      if (errno == EINTR) continue;
S
slguan 已提交
201
      uError("select error, %d (%s)", errno, strerror(errno));
H
hzcheng 已提交
202 203 204 205 206
      return -1;
    }

    if ((nread = (int)taosReadSocket(fd, ptr, (size_t)nleft)) < 0) {
      if (errno == EINTR) continue;
S
slguan 已提交
207
      uError("read error, %d (%s)", errno, strerror(errno));
H
hzcheng 已提交
208 209 210
      return -1;

    } else if (nread == 0) {
S
slguan 已提交
211
      uError("fd %d EOF", fd);
H
hzcheng 已提交
212 213 214 215 216 217 218 219 220 221
      break;  // EOF
    }

    nleft -= nread;
    ptr += nread;
  }

  return (nbytes - nleft);
}

J
jtao1735 已提交
222
int taosOpenUdpSocket(uint32_t ip, uint16_t port) {
H
hzcheng 已提交
223 224
  struct sockaddr_in localAddr;
  int                sockFd;
225
  int                bufSize = 1024000;
H
hzcheng 已提交
226

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
227
  uTrace("open udp socket:0x%x:%hu", ip, port);
H
hzcheng 已提交
228 229 230

  memset((char *)&localAddr, 0, sizeof(localAddr));
  localAddr.sin_family = AF_INET;
J
jtao1735 已提交
231
  localAddr.sin_addr.s_addr = ip;
L
lihui 已提交
232
  localAddr.sin_port = (uint16_t)htons(port);
H
hzcheng 已提交
233 234

  if ((sockFd = (int)socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
S
slguan 已提交
235
    uError("failed to open udp socket: %d (%s)", errno, strerror(errno));
H
hzcheng 已提交
236 237 238 239
    return -1;
  }

  if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
S
slguan 已提交
240
    uError("failed to set the send buffer size for UDP socket\n");
H
hzcheng 已提交
241 242 243 244 245
    close(sockFd);
    return -1;
  }

  if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
S
slguan 已提交
246
    uError("failed to set the receive buffer size for UDP socket\n");
H
hzcheng 已提交
247 248 249 250 251 252
    close(sockFd);
    return -1;
  }

  /* bind socket to local address */
  if (bind(sockFd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
253
    uError("failed to bind udp socket: %d (%s), 0x%x:%hu", errno, strerror(errno), ip, port);
H
Hui Li 已提交
254
    close(sockFd);
H
hzcheng 已提交
255 256 257 258 259 260
    return -1;
  }

  return sockFd;
}

J
jtao1735 已提交
261
int taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) {
H
hzcheng 已提交
262 263 264 265 266 267 268
  int                sockFd = 0;
  struct sockaddr_in serverAddr, clientAddr;
  int                ret;

  sockFd = (int)socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);

  if (sockFd < 0) {
S
slguan 已提交
269
    uError("failed to open the socket: %d (%s)", errno, strerror(errno));
H
hzcheng 已提交
270 271 272
    return -1;
  }

J
jtao1735 已提交
273
  if ( clientIp != 0) {
H
hzcheng 已提交
274 275
    memset((char *)&clientAddr, 0, sizeof(clientAddr));
    clientAddr.sin_family = AF_INET;
J
jtao1735 已提交
276
    clientAddr.sin_addr.s_addr = clientIp;
H
hzcheng 已提交
277 278 279 280
    clientAddr.sin_port = 0;

    /* bind socket to client address */
    if (bind(sockFd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) {
J
jtao1735 已提交
281 282
      uError("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)",
             clientIp, destIp, destPort, strerror(errno));
H
hzcheng 已提交
283 284 285 286 287 288 289
      close(sockFd);
      return -1;
    }
  }

  memset((char *)&serverAddr, 0, sizeof(serverAddr));
  serverAddr.sin_family = AF_INET;
J
jtao1735 已提交
290
  serverAddr.sin_addr.s_addr = destIp;
H
hzcheng 已提交
291 292 293 294 295
  serverAddr.sin_port = (uint16_t)htons((uint16_t)destPort);

  ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));

  if (ret != 0) {
J
jtao1735 已提交
296
    //uError("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
H
Hui Li 已提交
297
    close(sockFd);
H
hzcheng 已提交
298 299 300
    sockFd = -1;
  }

301
  // taosKeepTcpAlive(sockFd);
H
hzcheng 已提交
302

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
303
  return sockFd;
H
hzcheng 已提交
304 305 306 307 308
}

int taosKeepTcpAlive(int sockFd) {
  int alive = 1;
  if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) {
S
slguan 已提交
309
    uError("fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)", sockFd, errno, strerror(errno));
H
hzcheng 已提交
310 311 312 313 314 315
    close(sockFd);
    return -1;
  }

  int probes = 3;
  if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) {
S
slguan 已提交
316
    uError("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno));
H
hzcheng 已提交
317 318 319 320 321 322
    close(sockFd);
    return -1;
  }

  int alivetime = 10;
  if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPIDLE, (void *)&alivetime, sizeof(alivetime)) < 0) {
S
slguan 已提交
323
    uError("fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)", sockFd, errno, strerror(errno));
H
hzcheng 已提交
324 325 326 327 328 329
    close(sockFd);
    return -1;
  }

  int interval = 3;
  if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval)) < 0) {
S
slguan 已提交
330
    uError("fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)", sockFd, errno, strerror(errno));
H
hzcheng 已提交
331 332 333 334 335 336
    close(sockFd);
    return -1;
  }

  int nodelay = 1;
  if (taosSetSockOpt(sockFd, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) {
S
slguan 已提交
337
    uError("fd:%d setsockopt TCP_NODELAY failed %d (%s)", sockFd, errno, strerror(errno));
H
hzcheng 已提交
338 339 340 341
    close(sockFd);
    return -1;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
342 343 344 345 346 347 348 349 350
  struct linger linger = {0};
  linger.l_onoff = 1;
  //linger.l_linger = 0;
  if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) {
    uError("setsockopt SO_LINGER failed: %d (%s)", errno, strerror(errno));
    close(sockFd);
    return -1;
  }

H
hzcheng 已提交
351 352 353
  return 0;
}

J
jtao1735 已提交
354
int taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
H
hzcheng 已提交
355 356 357 358
  struct sockaddr_in serverAdd;
  int                sockFd;
  int                reuse;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
359
  uTrace("open tcp server socket:0x%x:%hu", ip, port);
H
hzcheng 已提交
360 361 362

  bzero((char *)&serverAdd, sizeof(serverAdd));
  serverAdd.sin_family = AF_INET;
J
jtao1735 已提交
363
  serverAdd.sin_addr.s_addr = ip;
L
lihui 已提交
364
  serverAdd.sin_port = (uint16_t)htons(port);
H
hzcheng 已提交
365 366

  if ((sockFd = (int)socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
S
slguan 已提交
367
    uError("failed to open TCP socket: %d (%s)", errno, strerror(errno));
H
hzcheng 已提交
368 369 370 371 372 373
    return -1;
  }

  /* set REUSEADDR option, so the portnumber can be re-used */
  reuse = 1;
  if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
S
slguan 已提交
374
    uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
H
hzcheng 已提交
375 376 377 378 379 380
    close(sockFd);
    return -1;
  };

  /* bind socket to server address */
  if (bind(sockFd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) {
J
jtao1735 已提交
381
    uError("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
H
hzcheng 已提交
382 383 384 385 386 387 388
    close(sockFd);
    return -1;
  }

  if (taosKeepTcpAlive(sockFd) < 0) return -1;

  if (listen(sockFd, 10) < 0) {
J
jtao1735 已提交
389
    uError("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
H
hzcheng 已提交
390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417
    return -1;
  }

  return sockFd;
}

void tinet_ntoa(char *ipstr, unsigned int ip) {
  sprintf(ipstr, "%d.%d.%d.%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24);
}

#define COPY_SIZE 32768
// sendfile shall be used

int taosCopyFds(int sfd, int dfd, int64_t len) {
  int64_t leftLen;
  int     readLen, writeLen;
  char    temp[COPY_SIZE];

  leftLen = len;

  while (leftLen > 0) {
    if (leftLen < COPY_SIZE)
      readLen = (int)leftLen;
    else
      readLen = COPY_SIZE;  // 4K

    int retLen = taosReadMsg(sfd, temp, (int)readLen);
    if (readLen != retLen) {
S
slguan 已提交
418
      uError("read error, readLen:%d retLen:%d len:%" PRId64 " leftLen:%" PRId64 ", reason:%s", readLen, retLen, len, leftLen,
H
hzcheng 已提交
419 420 421 422 423 424 425
             strerror(errno));
      return -1;
    }

    writeLen = taosWriteMsg(dfd, temp, readLen);

    if (readLen != writeLen) {
S
slguan 已提交
426
      uError("copy error, readLen:%d writeLen:%d len:%" PRId64 " leftLen:%" PRId64 ", reason:%s", readLen, writeLen, len, leftLen,
H
hzcheng 已提交
427 428 429 430 431 432 433 434 435
             strerror(errno));
      return -1;
    }

    leftLen -= readLen;
  }

  return 0;
}