osSocket.c 25.6 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
17
#define ALLOW_FORBID_FUNC
S
Shengliang Guan 已提交
18
#include "os.h"
S
Shengliang Guan 已提交
19

S
Shengliang Guan 已提交
20
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
wafwerar's avatar
wafwerar 已提交
21 22 23 24 25 26 27 28 29 30
#include <IPHlpApi.h>
#include <WS2tcpip.h>
#include <Winsock2.h>
#include <stdio.h>
#include <string.h>
#include <tchar.h>
#include <winbase.h>
#include <winsock2.h>
#include <ws2def.h>
#include "winsock2.h"
S
Shengliang Guan 已提交
31
#else
wafwerar's avatar
wafwerar 已提交
32 33 34 35 36 37 38 39 40 41
#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <netinet/udp.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <unistd.h>
S
Shengliang Guan 已提交
42 43
#endif

wafwerar's avatar
wafwerar 已提交
44 45
// Native descriptor type wrapped by the socket handles in this file.
typedef int32_t SocketFd;
// Epoll descriptors share the representation of socket descriptors.
typedef SocketFd EpollFd;
S
Shengliang Guan 已提交
46

wafwerar's avatar
wafwerar 已提交
47 48 49 50 51 52 53
// Handle for a listening (server) socket.
typedef struct TdSocketServer {
#if SOCKET_WITH_LOCK
  pthread_rwlock_t rwlock;  // only present when SOCKET_WITH_LOCK is enabled
#endif
  int      refId;  // set to 0 by the open functions in this file
  SocketFd fd;     // underlying descriptor; -1 once closed
} TdSocketServer, *TdSocketServerPtr;
S
Shengliang Guan 已提交
54

wafwerar's avatar
wafwerar 已提交
55 56 57 58 59 60 61
// Handle for a connected or datagram socket.
typedef struct TdSocket {
#if SOCKET_WITH_LOCK
  pthread_rwlock_t rwlock;  // only present when SOCKET_WITH_LOCK is enabled
#endif
  int      refId;  // set to 0 by the open/accept functions in this file
  SocketFd fd;     // underlying descriptor; -1 once closed
} TdSocket, *TdSocketPtr;
S
Shengliang Guan 已提交
62

wafwerar's avatar
wafwerar 已提交
63 64 65 66 67 68 69
// Handle for an epoll instance.
typedef struct TdEpoll {
#if SOCKET_WITH_LOCK
  pthread_rwlock_t rwlock;  // only present when SOCKET_WITH_LOCK is enabled
#endif
  int     refId;  // set to 0 by taosCreateEpoll
  EpollFd fd;     // underlying epoll descriptor; -1 once closed
} TdEpoll, *TdEpollPtr;
S
Shengliang Guan 已提交
70

wafwerar's avatar
wafwerar 已提交
71 72 73 74
// Sends `len` bytes from `buf` to `dest_addr` through the socket.
// Returns the result of sendto(), or -1 when the handle is NULL or closed.
int32_t taosSendto(TdSocketPtr pSocket, void *buf, int len, unsigned int flags, const struct sockaddr *dest_addr,
                    int addrlen) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
  // The Windows and POSIX branches were identical, so a single call suffices.
  return sendto(pSocket->fd, buf, len, flags, dest_addr, addrlen);
}
// Writes up to `len` bytes from `buf` to the socket in a single call.
// Returns the result of send()/write(), or -1 when the handle is NULL or closed.
int32_t taosWriteSocket(TdSocketPtr pSocket, void *buf, int len) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
  return send(pSocket->fd, buf, len, 0);
#else
  return write(pSocket->fd, buf, len);
#endif
}
// Reads up to `len` bytes from the socket into `buf` in a single call.
// Returns the result of recv()/read(), or -1 when the handle is NULL or closed.
int32_t taosReadSocket(TdSocketPtr pSocket, void *buf, int len) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
  return recv(pSocket->fd, buf, len, 0);
#else
  return read(pSocket->fd, buf, len);
#endif
}
S
Shengliang Guan 已提交
104

wafwerar's avatar
wafwerar 已提交
105 106 107
// Receives a datagram/message via recvfrom(); the sender address is stored in
// `destAddr`/`addrLen`.
// Returns the result of recvfrom(), or -1 when the handle is NULL or closed.
int32_t taosReadFromSocket(TdSocketPtr pSocket, void *buf, int32_t len, int32_t flags, struct sockaddr *destAddr, socklen_t *addrLen) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
  return recvfrom(pSocket->fd, buf, len, flags, destAddr, addrLen);
}
// Closes a raw descriptor without validating it first.
// Returns the result of closesocket() on Windows or close() elsewhere.
int32_t taosCloseSocketNoCheck1(SocketFd fd) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
  return closesocket(fd);
#else
  return close(fd);
#endif
}
// Closes the descriptor held by *ppSocket and frees the handle.
// Returns the close result, or -1 (without freeing anything) when ppSocket or
// *ppSocket is NULL or the handle's fd is already negative.
int32_t taosCloseSocket(TdSocketPtr *ppSocket) {
  int32_t code;
  if (ppSocket == NULL || *ppSocket == NULL || (*ppSocket)->fd < 0) {
    return -1;
  }
  code = taosCloseSocketNoCheck1((*ppSocket)->fd);
  (*ppSocket)->fd = -1;
  free(*ppSocket);
  // Clear the caller's pointer so a repeated close becomes a harmless -1
  // instead of a use-after-free / double free.
  *ppSocket = NULL;
  return code;
}
// Closes the descriptor held by *ppSocketServer and frees the handle.
// Returns the close result, or -1 (without freeing anything) when the pointer
// or handle is NULL or the handle's fd is already negative.
int32_t taosCloseSocketServer(TdSocketServerPtr *ppSocketServer) {
  int32_t code;
  if (ppSocketServer == NULL || *ppSocketServer == NULL || (*ppSocketServer)->fd < 0) {
    return -1;
  }
  code = taosCloseSocketNoCheck1((*ppSocketServer)->fd);
  (*ppSocketServer)->fd = -1;
  free(*ppSocketServer);
  // Clear the caller's pointer so a repeated close cannot touch freed memory.
  *ppSocketServer = NULL;
  return code;
}
S
Shengliang Guan 已提交
138

wafwerar's avatar
wafwerar 已提交
139 140 141 142
// Shuts down the read side of the socket.
// Returns -1 when the handle is NULL or closed, otherwise the result of the
// platform call.
// NOTE(review): when WINDOWS or __APPLE__ is defined the descriptor is closed
// outright instead of half-shut, and pSocket->fd is left unchanged.
int32_t taosShutDownSocketRD(TdSocketPtr pSocket) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
#ifdef WINDOWS
  return closesocket(pSocket->fd);
#elif __APPLE__
  return close(pSocket->fd);
#else
  return shutdown(pSocket->fd, SHUT_RD);
#endif
}
wafwerar's avatar
wafwerar 已提交
151 152 153 154
// Shuts down the read side of a server socket.
// Returns -1 when the handle is NULL or closed, otherwise the result of the
// platform call.
// NOTE(review): when WINDOWS or __APPLE__ is defined the descriptor is closed
// outright instead of half-shut, and pSocketServer->fd is left unchanged.
int32_t taosShutDownSocketServerRD(TdSocketServerPtr pSocketServer) {
  if (pSocketServer == NULL || pSocketServer->fd < 0) {
    return -1;
  }
#ifdef WINDOWS
  return closesocket(pSocketServer->fd);
#elif __APPLE__
  return close(pSocketServer->fd);
#else
  return shutdown(pSocketServer->fd, SHUT_RD);
#endif
}
S
Shengliang Guan 已提交
163

wafwerar's avatar
wafwerar 已提交
164 165 166
// Shuts down the write side of the socket.
// Returns -1 when the handle is NULL or closed, otherwise the result of the
// platform call.
// NOTE(review): when WINDOWS or __APPLE__ is defined the descriptor is closed
// outright instead of half-shut, and pSocket->fd is left unchanged.
int32_t taosShutDownSocketWR(TdSocketPtr pSocket) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
#ifdef WINDOWS
  return closesocket(pSocket->fd);
#elif __APPLE__
  return close(pSocket->fd);
#else
  return shutdown(pSocket->fd, SHUT_WR);
#endif
}
wafwerar's avatar
wafwerar 已提交
176 177 178 179 180 181 182 183 184 185
// Shuts down the write side of a server socket.
// Returns -1 when the handle is NULL or closed, otherwise the result of the
// platform call.
// NOTE(review): when WINDOWS or __APPLE__ is defined the descriptor is closed
// outright instead of half-shut, and pSocketServer->fd is left unchanged.
int32_t taosShutDownSocketServerWR(TdSocketServerPtr pSocketServer) {
  if (pSocketServer == NULL || pSocketServer->fd < 0) {
    return -1;
  }
#ifdef WINDOWS
  return closesocket(pSocketServer->fd);
#elif __APPLE__
  return close(pSocketServer->fd);
#else
  return shutdown(pSocketServer->fd, SHUT_WR);
#endif
}
// Shuts down both directions of the socket.
// Returns -1 when the handle is NULL or closed, otherwise the result of the
// platform call.
// NOTE(review): when WINDOWS or __APPLE__ is defined the descriptor is closed
// outright instead of shut down, and pSocket->fd is left unchanged.
int32_t taosShutDownSocketRDWR(TdSocketPtr pSocket) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
#ifdef WINDOWS
  return closesocket(pSocket->fd);
#elif __APPLE__
  return close(pSocket->fd);
#else
  return shutdown(pSocket->fd, SHUT_RDWR);
#endif
}
// Shuts down both directions of a server socket.
// Returns -1 when the handle is NULL or closed, otherwise the result of the
// platform call.
// NOTE(review): when WINDOWS or __APPLE__ is defined the descriptor is closed
// outright instead of shut down, and pSocketServer->fd is left unchanged.
int32_t taosShutDownSocketServerRDWR(TdSocketServerPtr pSocketServer) {
  if (pSocketServer == NULL || pSocketServer->fd < 0) {
    return -1;
  }
#ifdef WINDOWS
  return closesocket(pSocketServer->fd);
#elif __APPLE__
  return close(pSocketServer->fd);
#else
  return shutdown(pSocketServer->fd, SHUT_RDWR);
#endif
}
wafwerar's avatar
wafwerar 已提交
212 213 214 215 216

#if (defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32))
  #if defined(_TD_GO_DLL_)
    // Converts a 64-bit value by applying htonl() to each 32-bit half and
    // swapping the halves. Only compiled for Windows builds with _TD_GO_DLL_.
    uint64_t htonll(uint64_t val) {
      return (((uint64_t)htonl((uint32_t)val)) << 32) + htonl((uint32_t)(val >> 32));
    }
  #endif
#endif
218

wafwerar's avatar
wafwerar 已提交
219
// Calls WSAStartup() (requesting Winsock 1.1) the first time it is invoked on
// Windows and remembers success in a static flag; does nothing on other
// platforms.
// NOTE(review): the static flag is read and written without synchronization.
void taosWinSocketInit1() {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
  static char flag = 0;
  if (flag == 0) {
    WORD    wVersionRequested;
    WSADATA wsaData;
    wVersionRequested = MAKEWORD(1, 1);
    if (WSAStartup(wVersionRequested, &wsaData) == 0) {
      flag = 1;
    }
  }
#endif
}
wafwerar's avatar
wafwerar 已提交
233 234 235 236 237
// Switches the socket into non-blocking mode (on != 0) or blocking mode
// (on == 0).
// Returns 0 on success, -1 when the handle is NULL or closed, and 1 when the
// underlying ioctlsocket()/fcntl() call fails.
int32_t taosSetNonblocking(TdSocketPtr pSocket, int32_t on) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
  u_long mode = on ? 1 : 0;
  // Report failure the same way the fcntl() path does instead of ignoring it.
  if (ioctlsocket(pSocket->fd, FIONBIO, &mode) != 0) {
    return 1;
  }
#else
  int32_t flags = fcntl(pSocket->fd, F_GETFL, 0);
  if (flags < 0) {
    // printf("fcntl(F_GETFL) error: %d (%s)\n", errno, strerror(errno));
    return 1;
  }

  if (on) {
    flags |= O_NONBLOCK;
  } else {
    flags &= ~O_NONBLOCK;
  }

  if (fcntl(pSocket->fd, F_SETFL, flags) < 0) {
    // printf("fcntl(F_SETFL) error: %d (%s)\n", errno, strerror(errno));
    return 1;
  }
#endif
  return 0;
}
wafwerar's avatar
wafwerar 已提交
265 266 267 268 269
// Sets a socket option via setsockopt().
// Returns -1 when the handle is NULL or closed, otherwise the setsockopt()
// result. On Windows the keep-alive tuning options listed below are accepted
// without being applied and 0 is returned.
int32_t taosSetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t optlen) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
  if (level == SOL_SOCKET && optname == TCP_KEEPCNT) {
    return 0;
  }

  if (level == SOL_TCP && optname == TCP_KEEPIDLE) {
    return 0;
  }

  if (level == SOL_TCP && optname == TCP_KEEPINTVL) {
    return 0;
  }

  if (level == SOL_TCP && optname == TCP_KEEPCNT) {
    return 0;
  }

  return setsockopt(pSocket->fd, level, optname, optval, optlen);
#else
  return setsockopt(pSocket->fd, level, optname, optval, (socklen_t)optlen);
#endif
}
// Reads a socket option via getsockopt().
// Returns -1 when the handle is NULL or closed, otherwise the getsockopt()
// result. On Windows nothing is queried: 0 is returned and `optval`/`optlen`
// are left untouched.
int32_t taosGetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t *optlen) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
  return 0;
#else
  return getsockopt(pSocket->fd, level, optname, optval, (socklen_t *)optlen);
#endif
}
S
config  
Shengliang Guan 已提交
301
// Converts a dotted IPv4 string to its network-byte-order 32-bit value.
// Returns INADDR_NONE when `ipAddr` is NULL or cannot be parsed.
uint32_t taosInetAddr(const char *ipAddr) {
  if (ipAddr == NULL) {
    return INADDR_NONE;
  }
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
  uint32_t value;
  if (inet_pton(AF_INET, ipAddr, &value) <= 0) {
    return INADDR_NONE;
  }
  return value;
#else
  return inet_addr(ipAddr);
#endif
}
S
Shengliang Guan 已提交
314
// Formats an IPv4 address as a dotted string.
// The returned pointer refers to static storage (this function's buffer on
// Windows, inet_ntoa()'s elsewhere), so it is overwritten by the next call.
const char *taosInetNtoa(struct in_addr ipInt) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
  // not thread safe, only for debug usage while print log
  static char tmpDstStr[16];
  // Pass the real buffer size; advertising INET6_ADDRSTRLEN for a 16-byte
  // buffer overstated the space available to inet_ntop().
  return inet_ntop(AF_INET, &ipInt, tmpDstStr, sizeof(tmpDstStr));
#else
  return inet_ntoa(ipInt);
#endif
}
323 324

// Fallback for platforms whose headers do not define SIGPIPE.
#ifndef SIGPIPE
#define SIGPIPE EPIPE
#endif

#define TCP_CONN_TIMEOUT 3000  // conn timeout, passed to poll() as its timeout argument

wafwerar's avatar
wafwerar 已提交
330 331 332 333
// Writes exactly `nbytes` bytes from `buf`, retrying on partial writes and on
// EINTR.
// Returns the number of bytes written (nbytes on success), or -1 when the
// handle is NULL/closed or a write fails for any reason other than EINTR.
int32_t taosWriteMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }

  char   *ptr = (char *)buf;
  int32_t nleft = nbytes;

  while (nleft > 0) {
    int32_t nwritten = taosWriteSocket(pSocket, ptr, nleft);
    if (nwritten < 0 && errno == EINTR) {
      continue;  // interrupted before anything was written: retry
    }
    if (nwritten <= 0) {
      // errno is only meaningful after a failing call, so EPIPE and every
      // other error is reported here rather than by inspecting errno after a
      // successful write.
      return -1;
    }
    nleft -= nwritten;
    ptr += nwritten;
  }

  return (nbytes - nleft);
}

wafwerar's avatar
wafwerar 已提交
359 360 361 362
// Reads up to `nbytes` bytes into `buf`, retrying on partial reads and on
// EINTR, and stopping early when the read returns 0.
// Returns the number of bytes read, or -1 when the handle is NULL/closed or a
// read fails for any reason other than EINTR.
int32_t taosReadMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }

  char   *ptr = (char *)buf;
  int32_t nleft = nbytes;

  while (nleft > 0) {
    int32_t nread = taosReadSocket(pSocket, ptr, nleft);
    if (nread == 0) {
      break;  // nothing more to read
    }
    if (nread < 0) {
      if (errno == EINTR) {
        continue;
      }
      return -1;
    }
    // errno is not inspected after a successful read: it may hold a stale
    // value from an unrelated earlier call.
    nleft -= nread;
    ptr += nread;
  }

  return (nbytes - nleft);
}

wafwerar's avatar
wafwerar 已提交
391 392 393 394 395
// Writes `nbytes` bytes from `ptr` with the socket temporarily switched to
// non-blocking mode, waiting up to 30 seconds per select() for writability.
// Returns the number of bytes written (possibly short, with errno set to
// ETIMEDOUT, when select() times out), or -1 on a select()/send() failure or
// an invalid handle. The socket is switched back to blocking mode on every
// exit path once it has been made non-blocking.
int32_t taosNonblockwrite(TdSocketPtr pSocket, char *ptr, int32_t nbytes) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
  taosSetNonblocking(pSocket, 1);

  int32_t        nleft = nbytes;
  int32_t        failed = 0;
  int32_t        nwritten, nready;
  fd_set         fset;
  struct timeval tv;

  while (nleft > 0) {
    tv.tv_sec = 30;
    tv.tv_usec = 0;
    FD_ZERO(&fset);
    FD_SET(pSocket->fd, &fset);
    nready = select((SocketFd)(pSocket->fd + 1), NULL, &fset, NULL, &tv);
    if (nready == 0) {
      errno = ETIMEDOUT;
      // printf("fd %d timeout, no enough space to write", fd);
      break;
    }
    if (nready < 0) {
      if (errno == EINTR) continue;
      // printf("select error, %d (%s)", errno, strerror(errno));
      failed = 1;
      break;
    }

    nwritten = (int32_t)send(pSocket->fd, ptr, (size_t)nleft, MSG_NOSIGNAL);
    if (nwritten <= 0) {
      if (errno == EAGAIN || errno == EINTR) continue;
      // printf("write error, %d (%s)", errno, strerror(errno));
      failed = 1;
      break;
    }

    nleft -= nwritten;
    ptr += nwritten;
  }

  // Restore blocking mode on error paths too, and keep the errno that
  // describes the outcome of the write loop.
  int savedErrno = errno;
  taosSetNonblocking(pSocket, 0);
  errno = savedErrno;

  return failed ? -1 : (nbytes - nleft);
}

wafwerar's avatar
wafwerar 已提交
436
TdSocketPtr taosOpenUdpSocket(uint32_t ip, uint16_t port) {
437
  struct sockaddr_in localAddr;
wafwerar's avatar
wafwerar 已提交
438 439
  SocketFd           fd;
  int32_t            bufSize = 1024000;
440

wafwerar's avatar
wafwerar 已提交
441
  // printf("open udp socket:0x%x:%hu", ip, port);
442 443 444 445 446 447

  memset((char *)&localAddr, 0, sizeof(localAddr));
  localAddr.sin_family = AF_INET;
  localAddr.sin_addr.s_addr = ip;
  localAddr.sin_port = (uint16_t)htons(port);

wafwerar's avatar
wafwerar 已提交
448 449 450 451
  if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) <= 2) {
    // printf("failed to open udp socket: %d (%s)", errno, strerror(errno));
    taosCloseSocketNoCheck1(fd);
    return NULL;
452 453
  }

wafwerar's avatar
wafwerar 已提交
454 455 456 457
  TdSocketPtr pSocket = (TdSocketPtr)malloc(sizeof(TdSocket));
  if (pSocket == NULL) {
    taosCloseSocketNoCheck1(fd);
    return NULL;
458
  }
wafwerar's avatar
wafwerar 已提交
459 460
  pSocket->fd = fd;
  pSocket->refId = 0;
461

wafwerar's avatar
wafwerar 已提交
462 463 464 465 466 467 468 469 470 471
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    // printf("failed to set the send buffer size for UDP socket\n");
    taosCloseSocket(&pSocket);
    return NULL;
  }

  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    // printf("failed to set the receive buffer size for UDP socket\n");
    taosCloseSocket(&pSocket);
    return NULL;
472 473 474
  }

  /* bind socket to local address */
wafwerar's avatar
wafwerar 已提交
475 476 477 478
  if (bind(pSocket->fd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
    // printf("failed to bind udp socket: %d (%s), 0x%x:%hu", errno, strerror(errno), ip, port);
    taosCloseSocket(&pSocket);
    return NULL;
479 480
  }

wafwerar's avatar
wafwerar 已提交
481
  return pSocket;
482 483
}

wafwerar's avatar
wafwerar 已提交
484 485 486
TdSocketPtr taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) {
  SocketFd           fd = -1;
  int32_t            ret;
487
  struct sockaddr_in serverAddr, clientAddr;
wafwerar's avatar
wafwerar 已提交
488
  int32_t            bufSize = 1024 * 1024;
489

wafwerar's avatar
wafwerar 已提交
490
  fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
491

wafwerar's avatar
wafwerar 已提交
492 493 494 495
  if (fd <= 2) {
    // printf("failed to open the socket: %d (%s)", errno, strerror(errno));
    if (fd >= 0) taosCloseSocketNoCheck1(fd);
    return NULL;
496 497
  }

wafwerar's avatar
wafwerar 已提交
498 499 500 501 502 503 504 505
  TdSocketPtr pSocket = (TdSocketPtr)malloc(sizeof(TdSocket));
  if (pSocket == NULL) {
    taosCloseSocketNoCheck1(fd);
    return NULL;
  }
  pSocket->fd = fd;
  pSocket->refId = 0;

506 507
  /* set REUSEADDR option, so the portnumber can be re-used */
  int32_t reuse = 1;
wafwerar's avatar
wafwerar 已提交
508 509 510 511
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
    // printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
512 513
  }

wafwerar's avatar
wafwerar 已提交
514 515 516 517
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    // printf("failed to set the send buffer size for TCP socket\n");
    taosCloseSocket(&pSocket);
    return NULL;
518 519
  }

wafwerar's avatar
wafwerar 已提交
520 521 522 523
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    // printf("failed to set the receive buffer size for TCP socket\n");
    taosCloseSocket(&pSocket);
    return NULL;
524 525 526 527 528 529 530 531 532
  }

  if (clientIp != 0) {
    memset((char *)&clientAddr, 0, sizeof(clientAddr));
    clientAddr.sin_family = AF_INET;
    clientAddr.sin_addr.s_addr = clientIp;
    clientAddr.sin_port = 0;

    /* bind socket to client address */
wafwerar's avatar
wafwerar 已提交
533 534 535 536 537
    if (bind(pSocket->fd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) {
      // printf("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)", clientIp, destIp, destPort,
      //        strerror(errno));
      taosCloseSocket(&pSocket);
      return NULL;
538 539 540 541 542 543 544 545
    }
  }

  memset((char *)&serverAddr, 0, sizeof(serverAddr));
  serverAddr.sin_family = AF_INET;
  serverAddr.sin_addr.s_addr = destIp;
  serverAddr.sin_port = (uint16_t)htons((uint16_t)destPort);

wafwerar's avatar
wafwerar 已提交
546 547 548
#ifdef _TD_LINUX
  taosSetNonblocking(pSocket, 1);
  ret = connect(pSocket->fd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
549 550
  if (ret == -1) {
    if (errno == EHOSTUNREACH) {
wafwerar's avatar
wafwerar 已提交
551 552 553
      // printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
      taosCloseSocket(&pSocket);
      return -1;
554
    } else if (errno == EINPROGRESS || errno == EAGAIN || errno == EWOULDBLOCK) {
wafwerar's avatar
wafwerar 已提交
555
      struct pollfd wfd[1];
556

wafwerar's avatar
wafwerar 已提交
557
      wfd[0].fd = pSocket->fd;
558
      wfd[0].events = POLLOUT;
wafwerar's avatar
wafwerar 已提交
559

560 561
      int res = poll(wfd, 1, TCP_CONN_TIMEOUT);
      if (res == -1 || res == 0) {
wafwerar's avatar
wafwerar 已提交
562 563
        // printf("failed to connect socket, ip:0x%x, port:%hu(poll error/conn timeout)", destIp, destPort);
        taosCloseSocket(&pSocket);  //
564 565
        return -1;
      }
wafwerar's avatar
wafwerar 已提交
566 567 568 569
      int optVal = -1, optLen = sizeof(int);
      if ((0 != taosGetSockOpt(pSocket, SOL_SOCKET, SO_ERROR, &optVal, &optLen)) || (optVal != 0)) {
        // printf("failed to connect socket, ip:0x%x, port:%hu(connect host error)", destIp, destPort);
        taosCloseSocket(&pSocket);  //
570 571 572
        return -1;
      }
      ret = 0;
wafwerar's avatar
wafwerar 已提交
573 574 575 576 577
    } else {  // Other error
      // printf("failed to connect socket, ip:0x%x, port:%hu(target host cannot be reached)", destIp, destPort);
      taosCloseSocket(&pSocket);  //
      return -1;
    }
578
  }
wafwerar's avatar
wafwerar 已提交
579
  taosSetNonblocking(pSocket, 0);
580 581

#else
wafwerar's avatar
wafwerar 已提交
582
  ret = connect(pSocket->fd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
583 584 585
#endif

  if (ret != 0) {
wafwerar's avatar
wafwerar 已提交
586 587 588
    // printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
589
  } else {
wafwerar's avatar
wafwerar 已提交
590
    taosKeepTcpAlive(pSocket);
591 592
  }

wafwerar's avatar
wafwerar 已提交
593
  return pSocket;
594 595
}

wafwerar's avatar
wafwerar 已提交
596 597 598 599
// Enables SO_KEEPALIVE, the TCP keep-alive tuning options (3 probes, 10 idle,
// 3 interval; skipped when __APPLE__ is defined), TCP_NODELAY and a linger
// setting of l_onoff=1 / l_linger=3 on the socket.
// Returns 0 on success and -1 on failure.
// Ownership: when any option fails, the socket is closed and its handle is
// freed via taosCloseSocket() before -1 is returned, so the caller must not
// use or close `pSocket` again after a -1 result (unless the handle was
// already invalid on entry, in which case nothing is freed).
int32_t taosKeepTcpAlive(TdSocketPtr pSocket) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }

  int32_t alive = 1;
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) {
    // printf("fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)", sockFd, errno, strerror(errno));
    taosCloseSocket(&pSocket);
    return -1;
  }

#ifndef __APPLE__
  // all fails on macosx
  int32_t probes = 3;
  if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) {
    // printf("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno));
    taosCloseSocket(&pSocket);
    return -1;
  }

  int32_t alivetime = 10;
  if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPIDLE, (void *)&alivetime, sizeof(alivetime)) < 0) {
    // printf("fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)", sockFd, errno, strerror(errno));
    taosCloseSocket(&pSocket);
    return -1;
  }

  int32_t interval = 3;
  if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval)) < 0) {
    // printf("fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)", sockFd, errno, strerror(errno));
    taosCloseSocket(&pSocket);
    return -1;
  }
#endif  // __APPLE__

  int32_t nodelay = 1;
  if (taosSetSockOpt(pSocket, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) {
    // printf("fd:%d setsockopt TCP_NODELAY failed %d (%s)", sockFd, errno, strerror(errno));
    taosCloseSocket(&pSocket);
    return -1;
  }

  struct linger linger = {0};
  linger.l_onoff = 1;
  linger.l_linger = 3;
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) {
    // printf("setsockopt SO_LINGER failed: %d (%s)", errno, strerror(errno));
    taosCloseSocket(&pSocket);
    return -1;
  }

  return 0;
}

wafwerar's avatar
wafwerar 已提交
650
TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
651
  struct sockaddr_in serverAdd;
wafwerar's avatar
wafwerar 已提交
652
  SocketFd           fd;
653 654
  int32_t            reuse;

wafwerar's avatar
wafwerar 已提交
655
  // printf("open tcp server socket:0x%x:%hu", ip, port);
656 657 658 659 660 661

  bzero((char *)&serverAdd, sizeof(serverAdd));
  serverAdd.sin_family = AF_INET;
  serverAdd.sin_addr.s_addr = ip;
  serverAdd.sin_port = (uint16_t)htons(port);

wafwerar's avatar
wafwerar 已提交
662 663 664 665 666 667 668 669 670 671
  if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) {
    // printf("failed to open TCP socket: %d (%s)", errno, strerror(errno));
    taosCloseSocketNoCheck1(fd);
    return NULL;
  }

  TdSocketPtr pSocket = (TdSocketPtr)malloc(sizeof(TdSocket));
  if (pSocket == NULL) {
    taosCloseSocketNoCheck1(fd);
    return NULL;
672
  }
wafwerar's avatar
wafwerar 已提交
673 674
  pSocket->refId = 0;
  pSocket->fd = fd;
675 676 677

  /* set REUSEADDR option, so the portnumber can be re-used */
  reuse = 1;
wafwerar's avatar
wafwerar 已提交
678 679 680 681
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
    // printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
682 683 684
  }

  /* bind socket to server address */
wafwerar's avatar
wafwerar 已提交
685 686 687 688
  if (bind(pSocket->fd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) {
    // printf("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
689 690
  }

wafwerar's avatar
wafwerar 已提交
691 692 693 694
  if (taosKeepTcpAlive(pSocket) < 0) {
    // printf("failed to set tcp server keep-alive option, 0x%x:%hu(%s)", ip, port, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
695 696
  }

wafwerar's avatar
wafwerar 已提交
697 698 699 700
  if (listen(pSocket->fd, 1024) < 0) {
    // printf("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
701 702
  }

wafwerar's avatar
wafwerar 已提交
703
  return (TdSocketServerPtr)pSocket;
704 705
}

wafwerar's avatar
wafwerar 已提交
706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725
// Accepts one pending connection on the server socket and wraps the new
// descriptor in a freshly allocated handle. The peer address is stored in
// `destAddr`/`addrLen` by accept().
// Returns NULL when the server handle is invalid, accept() fails, or the
// handle cannot be allocated (the accepted descriptor is closed in that case).
TdSocketPtr taosAcceptTcpConnectSocket(TdSocketServerPtr pServerSocket, struct sockaddr *destAddr,
                                       socklen_t *addrLen) {
  if (pServerSocket == NULL || pServerSocket->fd < 0) {
    return NULL;
  }

  SocketFd acceptedFd = accept(pServerSocket->fd, destAddr, addrLen);
  if (acceptedFd == -1) {
    // tError("TCP accept failure(%s)", strerror(errno));
    return NULL;
  }

  TdSocketPtr pAccepted = (TdSocketPtr)malloc(sizeof(TdSocket));
  if (pAccepted != NULL) {
    pAccepted->fd = acceptedFd;
    pAccepted->refId = 0;
    return pAccepted;
  }

  taosCloseSocketNoCheck1(acceptedFd);
  return NULL;
}
726 727 728
#define COPY_SIZE 32768
// sendfile shall be used

wafwerar's avatar
wafwerar 已提交
729 730 731 732
int64_t taosCopyFds(TdSocketPtr pSrcSocket, TdSocketPtr pDestSocket, int64_t len) {
  if (pSrcSocket == NULL || pSrcSocket->fd < 0 || pDestSocket == NULL || pDestSocket->fd < 0) {
    return -1;
  }
733 734 735 736 737 738 739 740 741 742 743 744
  int64_t leftLen;
  int64_t readLen, writeLen;
  char    temp[COPY_SIZE];

  leftLen = len;

  while (leftLen > 0) {
    if (leftLen < COPY_SIZE)
      readLen = leftLen;
    else
      readLen = COPY_SIZE;  // 4K

wafwerar's avatar
wafwerar 已提交
745
    int64_t retLen = taosReadMsg(pSrcSocket, temp, (int32_t)readLen);
746
    if (readLen != retLen) {
wafwerar's avatar
wafwerar 已提交
747 748
      // printf("read error, readLen:%" PRId64 " retLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s",
      //        readLen, retLen, len, leftLen, strerror(errno));
749 750 751
      return -1;
    }

wafwerar's avatar
wafwerar 已提交
752
    writeLen = taosWriteMsg(pDestSocket, temp, (int32_t)readLen);
753 754

    if (readLen != writeLen) {
wafwerar's avatar
wafwerar 已提交
755 756
      // printf("copy error, readLen:%" PRId64 " writeLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s",
      //        readLen, writeLen, len, leftLen, strerror(errno));
757 758 759 760 761 762 763 764
      return -1;
    }

    leftLen -= readLen;
  }

  return len;
}
765 766

void taosBlockSIGPIPE() {
767 768
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#else
769 770 771 772 773
  sigset_t signal_mask;
  sigemptyset(&signal_mask);
  sigaddset(&signal_mask, SIGPIPE);
  int32_t rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
  if (rc != 0) {
wafwerar's avatar
wafwerar 已提交
774
    // printf("failed to block SIGPIPE");
775
  }
776
#endif
777
}
778 779 780 781 782 783 784 785 786 787

// Resolves `fqdn` with getaddrinfo() (AF_INET, SOCK_STREAM) and returns the
// first result's IPv4 address as stored in sin_addr.s_addr.
// Returns 0xFFFFFFFF when the lookup yields no result.
uint32_t taosGetIpv4FromFqdn(const char *fqdn) {
  struct addrinfo hints = {0};
  hints.ai_family = AF_INET;
  hints.ai_socktype = SOCK_STREAM;

  struct addrinfo *result = NULL;

  int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result);
  if (result == NULL) {
    // printf("failed to get the ip address, fqdn:%s, since:%s", fqdn,
    //        (ret == EAI_SYSTEM) ? strerror(errno) : gai_strerror(ret));
    (void)ret;  // only needed by the disabled diagnostic above
    return 0xFFFFFFFF;
  }

  struct sockaddr_in *si = (struct sockaddr_in *)result->ai_addr;
  uint32_t            ip = si->sin_addr.s_addr;
  freeaddrinfo(result);
  return ip;
}

// Writes this host's name into `fqdn`: the canonical name reported by
// getaddrinfo() (AI_CANONNAME), or the plain gethostname() value when
// __APPLE__ is defined or no canonical name is reported.
// Returns 0 on success, -1 when `fqdn` is NULL or a lookup fails.
// NOTE(review): the copy is unbounded; the caller's buffer must be large
// enough for the name (up to 1023 characters for the hostname path).
int32_t taosGetFqdn(char *fqdn) {
  if (fqdn == NULL) {
    return -1;
  }

  char hostname[1024];
  hostname[1023] = '\0';
  if (gethostname(hostname, 1023) == -1) {
    // printf("failed to get hostname, reason:%s", strerror(errno));
    return -1;
  }

  struct addrinfo  hints = {0};
  struct addrinfo *result = NULL;
#ifdef __APPLE__
  // on macosx, hostname -f has the form of xxx.local
  // which will block getaddrinfo for a few seconds if AI_CANONNAME is set
  // thus, we choose AF_INET (ipv4 for the moment) to make getaddrinfo return
  // immediately
  hints.ai_family = AF_INET;
#else   // __APPLE__
  hints.ai_flags = AI_CANONNAME;
#endif  // __APPLE__
  int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
  if (ret != 0 || result == NULL) {
    // printf("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
    return -1;
  }

#ifdef __APPLE__
  // refer to comments above
  strcpy(fqdn, hostname);
#else   // __APPLE__
  // Guard against a missing canonical name instead of passing NULL to strcpy.
  strcpy(fqdn, result->ai_canonname != NULL ? result->ai_canonname : hostname);
#endif  // __APPLE__
  freeaddrinfo(result);
  return 0;
}

// Function converting an IP address string to an uint32_t.
// Each dot-separated field is parsed with atoi() and stored as one byte, in
// order of appearance, so the first field ends up in the lowest-addressed
// byte of the result. Only the first 19 characters and the first four fields
// of the input are considered; missing fields are treated as 0.
uint32_t ip2uint(const char *const ip_addr) {
  char          ip_addr_cpy[20];
  unsigned char ip[4] = {0};

  tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy));

  char *s_start = ip_addr_cpy;
  char *s_end = ip_addr_cpy;

  // Stop after four fields: the copy can hold more separators than the
  // 4-byte result has room for.
  for (int32_t k = 0; k < 4 && *s_start != '\0'; s_start = s_end) {
    for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) {
    }
    if (*s_end == '.') {
      *s_end = '\0';
      s_end++;
    }
    ip[k++] = (unsigned char)atoi(s_start);
  }

  // memcpy avoids reading a char buffer through a uint32_t pointer.
  uint32_t value;
  memcpy(&value, ip, sizeof(value));
  return value;
}

// Formats `ip` as "a.b.c.d" into `ipstr`, where a is the least-significant
// byte of `ip` and d the most-significant one.
// `ipstr` must have room for at least 16 bytes ("255.255.255.255" plus NUL).
void tinet_ntoa(char *ipstr, uint32_t ip) {
  // The octets are unsigned, so print them with %u rather than %d.
  unsigned b0 = (unsigned)(ip & 0xFF);
  unsigned b1 = (unsigned)((ip >> 8) & 0xFF);
  unsigned b2 = (unsigned)((ip >> 16) & 0xFF);
  unsigned b3 = (unsigned)(ip >> 24);
  sprintf(ipstr, "%u.%u.%u.%u", b0, b1, b2, b3);
}

// Installs SIG_IGN as the SIGPIPE disposition; does nothing on Windows.
void taosIgnSIGPIPE() {
#if !defined(_TD_WINDOWS_64) && !defined(_TD_WINDOWS_32)
  (void)signal(SIGPIPE, SIG_IGN);
#endif
}

// Replaces the calling thread's signal mask with one that contains only
// SIGPIPE, via pthread_sigmask(SIG_SETMASK). Does nothing on Windows.
// Failures are ignored.
void taosSetMaskSIGPIPE() {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#else
  sigset_t signal_mask;
  sigemptyset(&signal_mask);
  sigaddset(&signal_mask, SIGPIPE);
  int32_t rc = pthread_sigmask(SIG_SETMASK, &signal_mask, NULL);
  if (rc != 0) {
    // printf("failed to setmask SIGPIPE");
  }
#endif
}

// Retrieves the socket's local address through getsockname().
// Returns -1 when the handle is NULL or closed, otherwise the getsockname()
// result.
int32_t taosGetSocketName(TdSocketPtr pSocket, struct sockaddr *destAddr, socklen_t *addrLen) {
  const int usable = (pSocket != NULL) && (pSocket->fd >= 0);
  return usable ? getsockname(pSocket->fd, destAddr, addrLen) : -1;
}


// Creates an epoll instance with epoll_create(size) and wraps it in a heap
// allocated handle. On Windows no descriptor is created, so NULL is returned.
// Returns NULL when the descriptor cannot be created or the handle cannot be
// allocated (the descriptor is closed in the latter case).
TdEpollPtr taosCreateEpoll(int32_t size) {
  EpollFd epfd = -1;
#if !defined(_TD_WINDOWS_64) && !defined(_TD_WINDOWS_32)
  epfd = epoll_create(size);
#endif
  if (epfd < 0) {
    return NULL;
  }

  TdEpollPtr pHandle = (TdEpollPtr)malloc(sizeof(TdEpoll));
  if (pHandle != NULL) {
    pHandle->fd = epfd;
    pHandle->refId = 0;
    return pHandle;
  }

  taosCloseSocketNoCheck1(epfd);
  return NULL;
}
// Applies `epollOperate` for the given socket on the epoll instance through
// epoll_ctl().
// Returns the epoll_ctl() result, or -1 when either handle is NULL or closed.
// Always returns -1 on Windows, where no epoll call is made.
int32_t taosCtlEpoll(TdEpollPtr pEpoll, int32_t epollOperate, TdSocketPtr pSocket, struct epoll_event *event) {
  int32_t code = -1;
  if (pEpoll == NULL || pEpoll->fd < 0) {
    return -1;
  }
  // pSocket->fd is dereferenced below, so validate it like every other entry
  // point in this file does.
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#else
  code = epoll_ctl(pEpoll->fd, epollOperate, pSocket->fd, event);
#endif
  return code;
}
// Waits for events on the epoll instance through epoll_wait().
// Returns the epoll_wait() result, or -1 when the handle is NULL or closed.
// Always returns -1 on Windows, where no epoll call is made.
int32_t taosWaitEpoll(TdEpollPtr pEpoll, struct epoll_event *event, int32_t maxEvents, int32_t timeout) {
  int32_t code = -1;
  if (pEpoll == NULL || pEpoll->fd < 0) {
    return -1;
  }
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#else
  code = epoll_wait(pEpoll->fd, event, maxEvents, timeout);
#endif
  return code;
}
// Closes the epoll descriptor held by *ppEpoll and frees the handle.
// Returns the close result, or -1 (without freeing anything) when ppEpoll or
// *ppEpoll is NULL or the handle's fd is already negative.
int32_t taosCloseEpoll(TdEpollPtr *ppEpoll) {
  int32_t code;
  if (ppEpoll == NULL || *ppEpoll == NULL || (*ppEpoll)->fd < 0) {
    return -1;
  }
  code = taosCloseSocketNoCheck1((*ppEpoll)->fd);
  (*ppEpoll)->fd = -1;
  free(*ppEpoll);
  // Clear the caller's pointer so a repeated close cannot touch freed memory.
  *ppEpoll = NULL;
  return code;
}