osSocket.c 24.6 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
17
#define ALLOW_FORBID_FUNC
S
Shengliang Guan 已提交
18
#include "os.h"
S
Shengliang Guan 已提交
19

20
#if defined(WINDOWS)
wafwerar's avatar
wafwerar 已提交
21 22 23 24 25 26 27
#include <IPHlpApi.h>
#include <WS2tcpip.h>
#include <Winsock2.h>
#include <stdio.h>
#include <string.h>
#include <tchar.h>
#include <winbase.h>
S
Shengliang Guan 已提交
28
#else
wafwerar's avatar
wafwerar 已提交
29 30 31 32 33 34 35 36 37
#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <netinet/udp.h>
#include <sys/socket.h>
#include <unistd.h>
38 39 40 41 42 43 44

#if defined(DARWIN)
  #include <dispatch/dispatch.h>
  #include "osEok.h"
#else
  #include <sys/epoll.h>
#endif
S
Shengliang Guan 已提交
45 46
#endif

wafwerar's avatar
wafwerar 已提交
47 48
// Server-side (listening) socket handle.
struct TdSocketServer {
#if SOCKET_WITH_LOCK
  TdThreadRwlock rwlock;  // only present when SOCKET_WITH_LOCK is enabled
#endif
  int      refId;
  SocketFd fd;  // OS descriptor; taosCloseSocketServer() sets it to -1 before freeing
};
typedef struct TdSocketServer  TdSocketServer;
typedef struct TdSocketServer *TdSocketServerPtr;
S
Shengliang Guan 已提交
54

wafwerar's avatar
wafwerar 已提交
55 56
// Handle wrapping an epoll descriptor.
struct TdEpoll {
#if SOCKET_WITH_LOCK
  TdThreadRwlock rwlock;  // only present when SOCKET_WITH_LOCK is enabled
#endif
  int      refId;  // initialised to 0 by taosCreateEpoll()
  EpollFd  fd;     // epoll descriptor; taosCloseEpoll() sets it to -1 before freeing
};
typedef struct TdEpoll  TdEpoll;
typedef struct TdEpoll *TdEpollPtr;
S
Shengliang Guan 已提交
62

wafwerar's avatar
wafwerar 已提交
63 64 65 66
// Sends `len` bytes from `buf` to `dest_addr` through the socket.
// Returns the result of sendto(), or -1 when the handle is NULL or its
// descriptor is negative.
int32_t taosSendto(TdSocketPtr pSocket, void *buf, int len, unsigned int flags, const struct sockaddr *dest_addr,
                    int addrlen) {
  if (pSocket == NULL) {
    return -1;
  }
  if (pSocket->fd < 0) {
    return -1;
  }
  // The Windows and POSIX calls are spelled identically, so no platform split is needed.
  return sendto(pSocket->fd, buf, len, flags, dest_addr, addrlen);
}
// Writes up to `len` bytes from `buf` to the socket with a single call
// (send() on Windows, write() elsewhere).
// Returns that call's result, or -1 when the handle is NULL or closed.
int32_t taosWriteSocket(TdSocketPtr pSocket, void *buf, int len) {
  if (pSocket == NULL) {
    return -1;
  }
  if (pSocket->fd < 0) {
    return -1;
  }
#ifdef WINDOWS
  return send(pSocket->fd, buf, len, 0);
#else
  return write(pSocket->fd, buf, len);
#endif
}
// Reads up to `len` bytes into `buf` with a single call
// (recv() on Windows, read() elsewhere).
// Returns that call's result, or -1 when the handle is NULL or closed.
int32_t taosReadSocket(TdSocketPtr pSocket, void *buf, int len) {
  if (pSocket == NULL) {
    return -1;
  }
  if (pSocket->fd < 0) {
    return -1;
  }
#ifdef WINDOWS
  return recv(pSocket->fd, buf, len, 0);
#else
  return read(pSocket->fd, buf, len);
#endif
}
S
Shengliang Guan 已提交
96

wafwerar's avatar
wafwerar 已提交
97
// Thin wrapper over recvfrom(): receives up to `len` bytes into `buf` and
// reports the peer through `destAddr` / `addrLen`.
// Returns recvfrom()'s result, or -1 when the handle is NULL or closed.
int32_t taosReadFromSocket(TdSocketPtr pSocket, void *buf, int32_t len, int32_t flags, struct sockaddr *destAddr, int *addrLen) {
  if (pSocket == NULL) {
    return -1;
  }
  if (pSocket->fd < 0) {
    return -1;
  }
  return recvfrom(pSocket->fd, buf, len, flags, destAddr, addrLen);
}
// Closes a raw descriptor without validating it first
// (closesocket() on Windows, close() elsewhere) and returns that call's result.
int32_t taosCloseSocketNoCheck1(SocketFd fd) {
  int32_t rc;
#ifdef WINDOWS
  rc = closesocket(fd);
#else
  rc = close(fd);
#endif
  return rc;
}
// Closes the descriptor held by *ppSocket and frees the handle.
// *ppSocket is reset to NULL afterwards so the caller is not left holding a
// pointer to freed memory (a repeated call then fails the validity check
// instead of touching the freed handle).
// Returns the result of the close call, or -1 when ppSocket / *ppSocket is
// NULL or the descriptor is already negative.
int32_t taosCloseSocket(TdSocketPtr *ppSocket) {
  if (ppSocket == NULL || *ppSocket == NULL || (*ppSocket)->fd < 0) {
    return -1;
  }
  int32_t code = taosCloseSocketNoCheck1((*ppSocket)->fd);
  (*ppSocket)->fd = -1;
  taosMemoryFree(*ppSocket);
  *ppSocket = NULL;
  return code;
}
// Closes the descriptor held by *ppSocketServer and frees the handle.
// *ppSocketServer is reset to NULL afterwards so the caller is not left
// holding a pointer to freed memory.
// Returns the result of the close call, or -1 when the argument (or the
// handle it points to) is NULL or the descriptor is already negative.
int32_t taosCloseSocketServer(TdSocketServerPtr *ppSocketServer) {
  if (ppSocketServer == NULL || *ppSocketServer == NULL || (*ppSocketServer)->fd < 0) {
    return -1;
  }
  int32_t code = taosCloseSocketNoCheck1((*ppSocketServer)->fd);
  (*ppSocketServer)->fd = -1;
  taosMemoryFree(*ppSocketServer);
  *ppSocketServer = NULL;
  return code;
}
S
Shengliang Guan 已提交
130

wafwerar's avatar
wafwerar 已提交
131 132 133 134
// Stops receiving on the socket.
// On Windows and macOS the descriptor is closed outright; elsewhere only the
// read direction is shut down via shutdown(SHUT_RD).
// NOTE(review): the close branches do not reset pSocket->fd.
// Returns the underlying call's result, or -1 for a NULL / closed handle.
int32_t taosShutDownSocketRD(TdSocketPtr pSocket) {
  if (pSocket == NULL) {
    return -1;
  }
  if (pSocket->fd < 0) {
    return -1;
  }
  int32_t rc;
#ifdef WINDOWS
  rc = closesocket(pSocket->fd);
#elif __APPLE__
  rc = close(pSocket->fd);
#else
  rc = shutdown(pSocket->fd, SHUT_RD);
#endif
  return rc;
}
wafwerar's avatar
wafwerar 已提交
143 144 145 146
// Stops receiving on the server socket.
// On Windows and macOS the descriptor is closed outright; elsewhere only the
// read direction is shut down via shutdown(SHUT_RD).
// NOTE(review): the close branches do not reset pSocketServer->fd.
// Returns the underlying call's result, or -1 for a NULL / closed handle.
int32_t taosShutDownSocketServerRD(TdSocketServerPtr pSocketServer) {
  if (pSocketServer == NULL) {
    return -1;
  }
  if (pSocketServer->fd < 0) {
    return -1;
  }
  int32_t rc;
#ifdef WINDOWS
  rc = closesocket(pSocketServer->fd);
#elif __APPLE__
  rc = close(pSocketServer->fd);
#else
  rc = shutdown(pSocketServer->fd, SHUT_RD);
#endif
  return rc;
}
S
Shengliang Guan 已提交
155

wafwerar's avatar
wafwerar 已提交
156 157 158
// Stops sending on the socket.
// On Windows and macOS the descriptor is closed outright; elsewhere only the
// write direction is shut down via shutdown(SHUT_WR).
// NOTE(review): the close branches do not reset pSocket->fd.
// Returns the underlying call's result, or -1 for a NULL / closed handle.
int32_t taosShutDownSocketWR(TdSocketPtr pSocket) {
  if (pSocket == NULL) {
    return -1;
  }
  if (pSocket->fd < 0) {
    return -1;
  }
  int32_t rc;
#ifdef WINDOWS
  rc = closesocket(pSocket->fd);
#elif __APPLE__
  rc = close(pSocket->fd);
#else
  rc = shutdown(pSocket->fd, SHUT_WR);
#endif
  return rc;
}
wafwerar's avatar
wafwerar 已提交
168 169 170 171 172 173 174 175 176 177
// Stops sending on the server socket.
// On Windows and macOS the descriptor is closed outright; elsewhere only the
// write direction is shut down via shutdown(SHUT_WR).
// NOTE(review): the close branches do not reset pSocketServer->fd.
// Returns the underlying call's result, or -1 for a NULL / closed handle.
int32_t taosShutDownSocketServerWR(TdSocketServerPtr pSocketServer) {
  if (pSocketServer == NULL) {
    return -1;
  }
  if (pSocketServer->fd < 0) {
    return -1;
  }
  int32_t rc;
#ifdef WINDOWS
  rc = closesocket(pSocketServer->fd);
#elif __APPLE__
  rc = close(pSocketServer->fd);
#else
  rc = shutdown(pSocketServer->fd, SHUT_WR);
#endif
  return rc;
}
// Stops both directions of the socket.
// On Windows and macOS the descriptor is closed outright; elsewhere
// shutdown(SHUT_RDWR) is used.
// NOTE(review): the close branches do not reset pSocket->fd.
// Returns the underlying call's result, or -1 for a NULL / closed handle.
int32_t taosShutDownSocketRDWR(TdSocketPtr pSocket) {
  if (pSocket == NULL) {
    return -1;
  }
  if (pSocket->fd < 0) {
    return -1;
  }
  int32_t rc;
#ifdef WINDOWS
  rc = closesocket(pSocket->fd);
#elif __APPLE__
  rc = close(pSocket->fd);
#else
  rc = shutdown(pSocket->fd, SHUT_RDWR);
#endif
  return rc;
}
// Stops both directions of the server socket.
// On Windows and macOS the descriptor is closed outright; elsewhere
// shutdown(SHUT_RDWR) is used.
// NOTE(review): the close branches do not reset pSocketServer->fd.
// Returns the underlying call's result, or -1 for a NULL / closed handle.
int32_t taosShutDownSocketServerRDWR(TdSocketServerPtr pSocketServer) {
  if (pSocketServer == NULL) {
    return -1;
  }
  if (pSocketServer->fd < 0) {
    return -1;
  }
  int32_t rc;
#ifdef WINDOWS
  rc = closesocket(pSocketServer->fd);
#elif __APPLE__
  rc = close(pSocketServer->fd);
#else
  rc = shutdown(pSocketServer->fd, SHUT_RDWR);
#endif
  return rc;
}
wafwerar's avatar
wafwerar 已提交
204

wafwerar's avatar
wafwerar 已提交
205 206
// Initialises Winsock (version 1.1) once; a no-op on other platforms.
// A static flag remembers a successful WSAStartup() so later calls return
// immediately; a failed start-up is retried on the next call.
void taosWinSocketInit() {
#ifdef WINDOWS
  static char started = 0;
  if (started != 0) {
    return;
  }

  WSADATA wsaData;
  WORD    wVersionRequested = MAKEWORD(1, 1);
  if (WSAStartup(wVersionRequested, &wsaData) == 0) {
    started = 1;
  }
#else
#endif
}
wafwerar's avatar
wafwerar 已提交
219 220 221 222
// Switches the socket between non-blocking (`on` != 0) and blocking mode.
// Returns 0 on success, -1 for a NULL / closed handle, and 1 when fcntl()
// fails on POSIX. On Windows the result of ioctlsocket() is not checked.
int32_t taosSetNonblocking(TdSocketPtr pSocket, int32_t on) {
  if (pSocket == NULL) {
    return -1;
  }
  if (pSocket->fd < 0) {
    return -1;
  }
#ifdef WINDOWS
  u_long mode = on ? 1 : 0;
  ioctlsocket(pSocket->fd, FIONBIO, &mode);
#else
  int32_t current = fcntl(pSocket->fd, F_GETFL, 0);
  if (current < 0) {
    return 1;
  }

  int32_t wanted = on ? (current | O_NONBLOCK) : (current & ~O_NONBLOCK);
  if (fcntl(pSocket->fd, F_SETFL, wanted) < 0) {
    return 1;
  }
#endif
  return 0;
}
wafwerar's avatar
wafwerar 已提交
251 252 253 254
// Wrapper over setsockopt().
// On Windows the keep-alive tuning options (TCP_KEEPCNT at SOL_SOCKET or
// SOL_TCP level, TCP_KEEPIDLE and TCP_KEEPINTVL at SOL_TCP level) are
// skipped and reported as success.
// Returns setsockopt()'s result, or -1 for a NULL / closed handle.
int32_t taosSetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t optlen) {
  if (pSocket == NULL) {
    return -1;
  }
  if (pSocket->fd < 0) {
    return -1;
  }
#ifdef WINDOWS
  const int keepCntAtSocketLevel = (level == SOL_SOCKET) && (optname == TCP_KEEPCNT);
  const int keepAliveTuning =
      (level == SOL_TCP) && (optname == TCP_KEEPIDLE || optname == TCP_KEEPINTVL || optname == TCP_KEEPCNT);
  if (keepCntAtSocketLevel || keepAliveTuning) {
    return 0;
  }

  return setsockopt(pSocket->fd, level, optname, optval, optlen);
#else
  return setsockopt(pSocket->fd, level, optname, optval, (int)optlen);
#endif
}
// Wrapper over getsockopt().
// On Windows nothing is queried: the function returns 0 and leaves
// `optval` / `optlen` untouched.
// Returns getsockopt()'s result elsewhere, or -1 for a NULL / closed handle.
int32_t taosGetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t *optlen) {
  if (pSocket == NULL) {
    return -1;
  }
  if (pSocket->fd < 0) {
    return -1;
  }
#ifdef WINDOWS
  return 0;
#else
  return getsockopt(pSocket->fd, level, optname, optval, (int *)optlen);
#endif
}
S
config  
Shengliang Guan 已提交
287
// Converts a dotted IPv4 string into its 32-bit address value
// (inet_pton() on Windows, inet_addr() elsewhere).
// On Windows a failed conversion yields INADDR_NONE.
uint32_t taosInetAddr(const char *ipAddr) {
#ifdef WINDOWS
  uint32_t parsed;
  if (inet_pton(AF_INET, ipAddr, &parsed) > 0) {
    return parsed;
  }
  return INADDR_NONE;
#else
  return inet_addr(ipAddr);
#endif
}
S
Shengliang Guan 已提交
300
// Formats an IPv4 address as dotted-decimal text.
// The returned pointer refers to static storage (a local static buffer on
// Windows, inet_ntoa()'s buffer elsewhere), so the result is overwritten by
// the next call and the function is not thread safe.
const char *taosInetNtoa(struct in_addr ipInt) {
#ifdef WINDOWS
  // not thread safe, only for debug usage while print log
  static char tmpDstStr[16];
  // Pass the real buffer size; the previous INET6_ADDRSTRLEN overstated it.
  return inet_ntop(AF_INET, &ipInt, tmpDstStr, sizeof(tmpDstStr));
#else
  return inet_ntoa(ipInt);
#endif
}
309 310

#ifndef SIGPIPE
wafwerar's avatar
wafwerar 已提交
311
#define SIGPIPE EPIPE
312 313 314 315
#endif

#define TCP_CONN_TIMEOUT 3000  // conn timeout

wafwerar's avatar
wafwerar 已提交
316 317 318 319
// Writes exactly `nbytes` bytes from `buf`, looping over short writes.
// A write interrupted by a signal (EINTR) is retried; any other failure, or a
// write that makes no progress, returns -1.
// errno is consulted only right after a failing call: the earlier version
// also tested it after successful writes, where it holds a stale value, and
// compared it with the signal number SIGPIPE.
// Returns the number of bytes written (== nbytes) on success, -1 on error or
// for a NULL / closed handle.
int32_t taosWriteMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }

  int32_t nleft = nbytes;
  char   *ptr = (char *)buf;

  while (nleft > 0) {
    int32_t nwritten = taosWriteSocket(pSocket, ptr, nleft);
    if (nwritten < 0) {
      if (errno == EINTR) {
        continue;
      }
      return -1;
    }
    if (nwritten == 0) {
      // No progress and no error code to inspect: give up rather than spin.
      return -1;
    }

    nleft -= nwritten;
    ptr += nwritten;
  }

  return (nbytes - nleft);
}

wafwerar's avatar
wafwerar 已提交
345 346 347 348
// Reads up to `nbytes` bytes into `buf`, looping over short reads.
// The loop ends early when the read call returns 0 (end of stream). A read
// interrupted by a signal (EINTR) is retried; any other failure returns -1.
// errno is consulted only right after a failing call: the earlier version
// also tested it after successful reads, where it holds a stale value.
// Returns the number of bytes actually read (may be less than nbytes at end
// of stream), or -1 on error or for a NULL / closed handle.
int32_t taosReadMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }

  int32_t nleft = nbytes;
  char   *ptr = (char *)buf;

  while (nleft > 0) {
    int32_t nread = taosReadSocket(pSocket, ptr, nleft);
    if (nread == 0) {
      break;
    }
    if (nread < 0) {
      if (errno == EINTR) {
        continue;
      }
      return -1;
    }

    nleft -= nread;
    ptr += nread;
  }

  return (nbytes - nleft);
}

wafwerar's avatar
wafwerar 已提交
377 378 379 380 381
// Sends `nbytes` bytes from `ptr` with the socket temporarily switched to
// non-blocking mode, waiting up to 30 seconds with select() before each send.
// - On a select() timeout errno is set to ETIMEDOUT and the number of bytes
//   sent so far is returned.
// - EINTR from select(), and EAGAIN / EINTR from send(), are retried.
// - Any other failure returns -1.
// The socket is put back into blocking mode on every exit path (the earlier
// version left it non-blocking when returning -1); errno from the failing
// call is preserved across that restore.
int32_t taosNonblockwrite(TdSocketPtr pSocket, char *ptr, int32_t nbytes) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
  taosSetNonblocking(pSocket, 1);

  int32_t        nleft = nbytes;
  int32_t        nwritten, nready;
  int            failed = 0;
  fd_set         fset;
  struct timeval tv;

  while (nleft > 0) {
    tv.tv_sec = 30;
    tv.tv_usec = 0;
    FD_ZERO(&fset);
    FD_SET(pSocket->fd, &fset);

    nready = select((SocketFd)(pSocket->fd + 1), NULL, &fset, NULL, &tv);
    if (nready == 0) {
      errno = ETIMEDOUT;
      break;
    }
    if (nready < 0) {
      if (errno == EINTR) continue;
      failed = 1;
      break;
    }

    nwritten = (int32_t)send(pSocket->fd, ptr, (size_t)nleft, MSG_NOSIGNAL);
    if (nwritten <= 0) {
      if (errno == EAGAIN || errno == EINTR) continue;
      failed = 1;
      break;
    }

    nleft -= nwritten;
    ptr += nwritten;
  }

  // Restore blocking mode without disturbing the errno the caller will inspect.
  int savedErrno = errno;
  taosSetNonblocking(pSocket, 0);
  errno = savedErrno;

  return failed ? -1 : (nbytes - nleft);
}

wafwerar's avatar
wafwerar 已提交
422
TdSocketPtr taosOpenUdpSocket(uint32_t ip, uint16_t port) {
423
  struct sockaddr_in localAddr;
wafwerar's avatar
wafwerar 已提交
424 425
  SocketFd           fd;
  int32_t            bufSize = 1024000;
426

wafwerar's avatar
wafwerar 已提交
427
  // printf("open udp socket:0x%x:%hu", ip, port);
428 429 430 431 432 433

  memset((char *)&localAddr, 0, sizeof(localAddr));
  localAddr.sin_family = AF_INET;
  localAddr.sin_addr.s_addr = ip;
  localAddr.sin_port = (uint16_t)htons(port);

wafwerar's avatar
wafwerar 已提交
434 435 436 437
  if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) <= 2) {
    // printf("failed to open udp socket: %d (%s)", errno, strerror(errno));
    taosCloseSocketNoCheck1(fd);
    return NULL;
438 439
  }

wafwerar's avatar
wafwerar 已提交
440
  TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
wafwerar's avatar
wafwerar 已提交
441 442 443
  if (pSocket == NULL) {
    taosCloseSocketNoCheck1(fd);
    return NULL;
444
  }
wafwerar's avatar
wafwerar 已提交
445 446
  pSocket->fd = fd;
  pSocket->refId = 0;
447

wafwerar's avatar
wafwerar 已提交
448 449 450 451 452 453 454 455 456 457
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    // printf("failed to set the send buffer size for UDP socket\n");
    taosCloseSocket(&pSocket);
    return NULL;
  }

  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    // printf("failed to set the receive buffer size for UDP socket\n");
    taosCloseSocket(&pSocket);
    return NULL;
458 459 460
  }

  /* bind socket to local address */
wafwerar's avatar
wafwerar 已提交
461 462 463 464
  if (bind(pSocket->fd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
    // printf("failed to bind udp socket: %d (%s), 0x%x:%hu", errno, strerror(errno), ip, port);
    taosCloseSocket(&pSocket);
    return NULL;
465 466
  }

wafwerar's avatar
wafwerar 已提交
467
  return pSocket;
468 469
}

wafwerar's avatar
wafwerar 已提交
470 471 472
TdSocketPtr taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) {
  SocketFd           fd = -1;
  int32_t            ret;
473
  struct sockaddr_in serverAddr, clientAddr;
wafwerar's avatar
wafwerar 已提交
474
  int32_t            bufSize = 1024 * 1024;
475

wafwerar's avatar
wafwerar 已提交
476
  fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
477

wafwerar's avatar
wafwerar 已提交
478 479 480 481
  if (fd <= 2) {
    // printf("failed to open the socket: %d (%s)", errno, strerror(errno));
    if (fd >= 0) taosCloseSocketNoCheck1(fd);
    return NULL;
482 483
  }

wafwerar's avatar
wafwerar 已提交
484
  TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
wafwerar's avatar
wafwerar 已提交
485 486 487 488 489 490 491
  if (pSocket == NULL) {
    taosCloseSocketNoCheck1(fd);
    return NULL;
  }
  pSocket->fd = fd;
  pSocket->refId = 0;

492 493
  /* set REUSEADDR option, so the portnumber can be re-used */
  int32_t reuse = 1;
wafwerar's avatar
wafwerar 已提交
494 495 496 497
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
    // printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
498 499
  }

wafwerar's avatar
wafwerar 已提交
500 501 502 503
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    // printf("failed to set the send buffer size for TCP socket\n");
    taosCloseSocket(&pSocket);
    return NULL;
504 505
  }

wafwerar's avatar
wafwerar 已提交
506 507 508 509
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    // printf("failed to set the receive buffer size for TCP socket\n");
    taosCloseSocket(&pSocket);
    return NULL;
510 511 512 513 514 515 516 517 518
  }

  if (clientIp != 0) {
    memset((char *)&clientAddr, 0, sizeof(clientAddr));
    clientAddr.sin_family = AF_INET;
    clientAddr.sin_addr.s_addr = clientIp;
    clientAddr.sin_port = 0;

    /* bind socket to client address */
wafwerar's avatar
wafwerar 已提交
519 520 521 522 523
    if (bind(pSocket->fd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) {
      // printf("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)", clientIp, destIp, destPort,
      //        strerror(errno));
      taosCloseSocket(&pSocket);
      return NULL;
524 525 526 527 528 529 530 531
    }
  }

  memset((char *)&serverAddr, 0, sizeof(serverAddr));
  serverAddr.sin_family = AF_INET;
  serverAddr.sin_addr.s_addr = destIp;
  serverAddr.sin_port = (uint16_t)htons((uint16_t)destPort);

wafwerar's avatar
wafwerar 已提交
532 533 534
#ifdef _TD_LINUX
  taosSetNonblocking(pSocket, 1);
  ret = connect(pSocket->fd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
535 536
  if (ret == -1) {
    if (errno == EHOSTUNREACH) {
wafwerar's avatar
wafwerar 已提交
537 538 539
      // printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
      taosCloseSocket(&pSocket);
      return -1;
540
    } else if (errno == EINPROGRESS || errno == EAGAIN || errno == EWOULDBLOCK) {
wafwerar's avatar
wafwerar 已提交
541
      struct pollfd wfd[1];
542

wafwerar's avatar
wafwerar 已提交
543
      wfd[0].fd = pSocket->fd;
544
      wfd[0].events = POLLOUT;
wafwerar's avatar
wafwerar 已提交
545

546 547
      int res = poll(wfd, 1, TCP_CONN_TIMEOUT);
      if (res == -1 || res == 0) {
wafwerar's avatar
wafwerar 已提交
548 549
        // printf("failed to connect socket, ip:0x%x, port:%hu(poll error/conn timeout)", destIp, destPort);
        taosCloseSocket(&pSocket);  //
550 551
        return -1;
      }
wafwerar's avatar
wafwerar 已提交
552 553 554 555
      int optVal = -1, optLen = sizeof(int);
      if ((0 != taosGetSockOpt(pSocket, SOL_SOCKET, SO_ERROR, &optVal, &optLen)) || (optVal != 0)) {
        // printf("failed to connect socket, ip:0x%x, port:%hu(connect host error)", destIp, destPort);
        taosCloseSocket(&pSocket);  //
556 557 558
        return -1;
      }
      ret = 0;
wafwerar's avatar
wafwerar 已提交
559 560 561 562 563
    } else {  // Other error
      // printf("failed to connect socket, ip:0x%x, port:%hu(target host cannot be reached)", destIp, destPort);
      taosCloseSocket(&pSocket);  //
      return -1;
    }
564
  }
wafwerar's avatar
wafwerar 已提交
565
  taosSetNonblocking(pSocket, 0);
566 567

#else
wafwerar's avatar
wafwerar 已提交
568
  ret = connect(pSocket->fd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
569 570 571
#endif

  if (ret != 0) {
wafwerar's avatar
wafwerar 已提交
572 573 574
    // printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
575
  } else {
wafwerar's avatar
wafwerar 已提交
576
    taosKeepTcpAlive(pSocket);
577 578
  }

wafwerar's avatar
wafwerar 已提交
579
  return pSocket;
580 581
}

wafwerar's avatar
wafwerar 已提交
582 583 584 585
// Applies the standard TCP options to a socket: SO_KEEPALIVE, (except on
// macOS) 3 keep-alive probes, 10 s idle time and 3 s probe interval,
// TCP_NODELAY, and a 3 s SO_LINGER.
// Returns 0 on success, -1 for a NULL / closed handle or when any option
// cannot be set.
// IMPORTANT: when setting an option fails, the socket is closed and the
// handle freed via taosCloseSocket(); the caller's pointer is then invalid
// and must not be used or closed again.
int32_t taosKeepTcpAlive(TdSocketPtr pSocket) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }

  int32_t keepAliveOn = 1;
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepAliveOn, sizeof(keepAliveOn)) < 0) {
    goto failed;
  }

#ifndef __APPLE__
  // all fails on macosx
  int32_t probeCount = 3;
  if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPCNT, (void *)&probeCount, sizeof(probeCount)) < 0) {
    goto failed;
  }

  int32_t idleSeconds = 10;
  if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPIDLE, (void *)&idleSeconds, sizeof(idleSeconds)) < 0) {
    goto failed;
  }

  int32_t probeInterval = 3;
  if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPINTVL, (void *)&probeInterval, sizeof(probeInterval)) < 0) {
    goto failed;
  }
#endif  // __APPLE__

  int32_t noDelayOn = 1;
  if (taosSetSockOpt(pSocket, IPPROTO_TCP, TCP_NODELAY, (void *)&noDelayOn, sizeof(noDelayOn)) < 0) {
    goto failed;
  }

  struct linger lingerOpt = {0};
  lingerOpt.l_onoff = 1;
  lingerOpt.l_linger = 3;
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_LINGER, (void *)&lingerOpt, sizeof(lingerOpt)) < 0) {
    goto failed;
  }

  return 0;

failed:
  // Single failure exit: release the socket exactly as each branch used to.
  taosCloseSocket(&pSocket);
  return -1;
}

wafwerar's avatar
wafwerar 已提交
636
TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
637
  struct sockaddr_in serverAdd;
wafwerar's avatar
wafwerar 已提交
638
  SocketFd           fd;
639 640
  int32_t            reuse;

wafwerar's avatar
wafwerar 已提交
641
  // printf("open tcp server socket:0x%x:%hu", ip, port);
642 643 644 645 646 647

  bzero((char *)&serverAdd, sizeof(serverAdd));
  serverAdd.sin_family = AF_INET;
  serverAdd.sin_addr.s_addr = ip;
  serverAdd.sin_port = (uint16_t)htons(port);

wafwerar's avatar
wafwerar 已提交
648 649 650 651 652 653
  if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) {
    // printf("failed to open TCP socket: %d (%s)", errno, strerror(errno));
    taosCloseSocketNoCheck1(fd);
    return NULL;
  }

wafwerar's avatar
wafwerar 已提交
654
  TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
wafwerar's avatar
wafwerar 已提交
655 656 657
  if (pSocket == NULL) {
    taosCloseSocketNoCheck1(fd);
    return NULL;
658
  }
wafwerar's avatar
wafwerar 已提交
659 660
  pSocket->refId = 0;
  pSocket->fd = fd;
661 662 663

  /* set REUSEADDR option, so the portnumber can be re-used */
  reuse = 1;
wafwerar's avatar
wafwerar 已提交
664 665 666 667
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
    // printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
668 669 670
  }

  /* bind socket to server address */
wafwerar's avatar
wafwerar 已提交
671 672 673 674
  if (bind(pSocket->fd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) {
    // printf("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
675 676
  }

wafwerar's avatar
wafwerar 已提交
677 678 679 680
  if (taosKeepTcpAlive(pSocket) < 0) {
    // printf("failed to set tcp server keep-alive option, 0x%x:%hu(%s)", ip, port, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
681 682
  }

wafwerar's avatar
wafwerar 已提交
683 684 685 686
  if (listen(pSocket->fd, 1024) < 0) {
    // printf("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
687 688
  }

wafwerar's avatar
wafwerar 已提交
689
  return (TdSocketServerPtr)pSocket;
690 691
}

wafwerar's avatar
wafwerar 已提交
692
// Accepts one pending connection on the server socket and wraps the new
// descriptor in a freshly allocated handle (refId = 0).
// `destAddr` / `addrLen` are passed straight through to accept().
// Returns NULL when the server handle is NULL / closed, when accept() fails,
// or when the handle cannot be allocated (the accepted descriptor is closed
// in that case).
TdSocketPtr taosAcceptTcpConnectSocket(TdSocketServerPtr pServerSocket, struct sockaddr *destAddr,
                                       int *addrLen) {
  if (pServerSocket == NULL) {
    return NULL;
  }
  if (pServerSocket->fd < 0) {
    return NULL;
  }

  SocketFd accepted = accept(pServerSocket->fd, destAddr, addrLen);
  if (accepted == -1) {
    return NULL;
  }

  TdSocketPtr pAccepted = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
  if (pAccepted != NULL) {
    pAccepted->fd = accepted;
    pAccepted->refId = 0;
    return pAccepted;
  }

  taosCloseSocketNoCheck1(accepted);
  return NULL;
}
712 713 714
#define COPY_SIZE 32768
// sendfile shall be used

wafwerar's avatar
wafwerar 已提交
715 716 717 718
// Copies exactly `len` bytes from pSrcSocket to pDestSocket through a
// COPY_SIZE-byte stack buffer, one chunk at a time.
// Returns `len` on success; -1 when either handle is NULL / closed, or when
// a chunk could not be read or written in full.
int64_t taosCopyFds(TdSocketPtr pSrcSocket, TdSocketPtr pDestSocket, int64_t len) {
  if (pSrcSocket == NULL || pSrcSocket->fd < 0) {
    return -1;
  }
  if (pDestSocket == NULL || pDestSocket->fd < 0) {
    return -1;
  }

  char    chunk[COPY_SIZE];
  int64_t remaining = len;

  while (remaining > 0) {
    // Move at most one buffer's worth per iteration.
    int64_t want = (remaining < COPY_SIZE) ? remaining : COPY_SIZE;

    int64_t got = taosReadMsg(pSrcSocket, chunk, (int32_t)want);
    if (got != want) {
      return -1;
    }

    int64_t put = taosWriteMsg(pDestSocket, chunk, (int32_t)want);
    if (put != want) {
      return -1;
    }

    remaining -= want;
  }

  return len;
}
751 752

void taosBlockSIGPIPE() {
wafwerar's avatar
wafwerar 已提交
753
#ifdef WINDOWS
754
#else
755 756 757
  sigset_t signal_mask;
  sigemptyset(&signal_mask);
  sigaddset(&signal_mask, SIGPIPE);
wafwerar's avatar
wafwerar 已提交
758
  int32_t rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
759
  if (rc != 0) {
wafwerar's avatar
wafwerar 已提交
760
    // printf("failed to block SIGPIPE");
761
  }
762
#endif
763
}
764 765 766 767 768 769 770 771 772 773

// Resolves `fqdn` to an IPv4 address with getaddrinfo() (AF_INET,
// SOCK_STREAM) and returns the s_addr of the first result.
// Returns 0xFFFFFFFF when no result is produced.
uint32_t taosGetIpv4FromFqdn(const char *fqdn) {
  struct addrinfo  hints = {0};
  struct addrinfo *result = NULL;

  hints.ai_family = AF_INET;
  hints.ai_socktype = SOCK_STREAM;

  int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result);
  if (result == NULL) {
    // The error code is currently not reported anywhere.
    (void)ret;
    return 0xFFFFFFFF;
  }

  struct sockaddr_in *first = (struct sockaddr_in *)result->ai_addr;
  uint32_t            ip = first->sin_addr.s_addr;
  freeaddrinfo(result);
  return ip;
}

// Writes this host's fully qualified domain name into `fqdn`.
// The host name comes from gethostname(); except on macOS it is then
// canonicalised through getaddrinfo(AI_CANONNAME). On macOS the plain host
// name is copied instead (see the comment below).
// `fqdn` is filled with an unbounded strcpy(), so the caller's buffer must be
// large enough for the name.
// Returns 0 on success. On failure a message is printed and assert(0)
// fires; if assertions are compiled out, -1 is returned.
int32_t taosGetFqdn(char *fqdn) {
  char hostname[1024];
  hostname[sizeof(hostname) - 1] = '\0';
  if (gethostname(hostname, sizeof(hostname) - 1) == -1) {
    printf("failed to get hostname, reason:%s", strerror(errno));
    assert(0);
    return -1;
  }

  struct addrinfo *result = NULL;
  struct addrinfo  hints = {0};
#ifdef __APPLE__
  // on macosx, hostname -f has the form of xxx.local
  // which will block getaddrinfo for a few seconds if AI_CANONNAME is set
  // thus, we choose AF_INET (ipv4 for the moment) to make getaddrinfo return
  // immediately
  hints.ai_family = AF_INET;
#else   // __APPLE__
  hints.ai_flags = AI_CANONNAME;
#endif  // __APPLE__

  int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
  if (result == NULL) {
    printf("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
    assert(0);
    return -1;
  }

#ifdef __APPLE__
  // refer to comments above
  const char *resolved = hostname;
#else   // __APPLE__
  const char *resolved = result->ai_canonname;
#endif  // __APPLE__
  strcpy(fqdn, resolved);

  freeaddrinfo(result);
  return 0;
}

// Converts a dotted IPv4 string ("a.b.c.d") into a uint32_t whose bytes, in
// memory order, are the octets in the order written (the first octet occupies
// the lowest-addressed byte).
// Each component is parsed with atoi() and truncated to one byte. At most
// four components are consumed - extra ones are ignored instead of
// overflowing the scratch buffer - and missing components are zero.
// `ip_addr` must not be NULL.
uint32_t ip2uint(const char *const ip_addr) {
  unsigned char octets[sizeof(uint32_t)] = {0};
  const char   *cursor = ip_addr;
  size_t        count = 0;

  while (*cursor != '\0' && count < sizeof(octets)) {
    // atoi() stops at the first non-digit, i.e. at the '.' or the terminator.
    octets[count++] = (unsigned char)atoi(cursor);

    while (*cursor != '.' && *cursor != '\0') {
      cursor++;
    }
    if (*cursor == '.') {
      cursor++;
    }
  }

  // memcpy instead of a pointer cast: no alignment or aliasing assumptions.
  uint32_t value;
  memcpy(&value, octets, sizeof(value));
  return value;
}

// Formats `ip` as dotted-decimal text into `ipstr`, printing the
// least-significant byte first.
// `ipstr` must have room for at least 16 bytes ("255.255.255.255" plus the
// terminator); the size is not checked.
void tinet_ntoa(char *ipstr, uint32_t ip) {
  // %u with explicit unsigned arguments: the values are unsigned, so %d was a
  // format/argument mismatch.
  sprintf(ipstr, "%u.%u.%u.%u", (unsigned)(ip & 0xFF), (unsigned)((ip >> 8) & 0xFF), (unsigned)((ip >> 16) & 0xFF),
          (unsigned)(ip >> 24));
}

// Sets the disposition of SIGPIPE to "ignore". No-op on Windows.
void taosIgnSIGPIPE() {
#ifndef WINDOWS
  signal(SIGPIPE, SIG_IGN);
#endif
}

void taosSetMaskSIGPIPE() {
wafwerar's avatar
wafwerar 已提交
871
#ifdef WINDOWS
872 873 874 875
#else
  sigset_t signal_mask;
  sigemptyset(&signal_mask);
  sigaddset(&signal_mask, SIGPIPE);
wafwerar's avatar
wafwerar 已提交
876
  int32_t rc = pthread_sigmask(SIG_SETMASK, &signal_mask, NULL);
877
  if (rc != 0) {
wafwerar's avatar
wafwerar 已提交
878 879 880 881 882
    // printf("failed to setmask SIGPIPE");
  }
#endif
}

wafwerar's avatar
wafwerar 已提交
883
// Wrapper over getsockname(): stores the socket's local address in
// `destAddr` / `addrLen`.
// Returns getsockname()'s result, or -1 for a NULL / closed handle.
int32_t taosGetSocketName(TdSocketPtr pSocket, struct sockaddr *destAddr, int *addrLen) {
  if (pSocket == NULL) {
    return -1;
  }
  if (pSocket->fd < 0) {
    return -1;
  }
  return getsockname(pSocket->fd, destAddr, addrLen);
}


TdEpollPtr taosCreateEpoll(int32_t size) {
  EpollFd fd = -1;
wafwerar's avatar
wafwerar 已提交
893
#ifdef WINDOWS
wafwerar's avatar
wafwerar 已提交
894 895 896 897 898 899 900
#else
  fd = epoll_create(size);
#endif
  if (fd < 0) {
    return NULL;
  }

wafwerar's avatar
wafwerar 已提交
901
  TdEpollPtr pEpoll = (TdEpollPtr)taosMemoryMalloc(sizeof(TdEpoll));
wafwerar's avatar
wafwerar 已提交
902 903 904 905 906 907 908 909 910 911 912 913 914
  if (pEpoll == NULL) {
    taosCloseSocketNoCheck1(fd);
    return NULL;
  }
  pEpoll->fd = fd;
  pEpoll->refId = 0;
  return pEpoll;
}
// Wrapper over epoll_ctl(): applies `epollOperate` for the socket's
// descriptor on the epoll instance, with `event` passed through unchanged.
// Returns epoll_ctl()'s result, or -1 when either handle is NULL / closed
// (the socket handle is now validated like the epoll handle instead of being
// dereferenced unchecked). On Windows the call is not implemented and -1 is
// always returned.
int32_t taosCtlEpoll(TdEpollPtr pEpoll, int32_t epollOperate, TdSocketPtr pSocket, struct epoll_event *event) {
  int32_t code = -1;
  if (pEpoll == NULL || pEpoll->fd < 0) {
    return -1;
  }
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
#ifdef WINDOWS
#else
  code = epoll_ctl(pEpoll->fd, epollOperate, pSocket->fd, event);
#endif
  return code;
}
// Wrapper over epoll_wait(): `event`, `maxEvents` and `timeout` are passed
// through unchanged.
// Returns epoll_wait()'s result, or -1 for a NULL / closed handle.
// On Windows the call is not implemented and -1 is always returned.
int32_t taosWaitEpoll(TdEpollPtr pEpoll, struct epoll_event *event, int32_t maxEvents, int32_t timeout) {
  if (pEpoll == NULL) {
    return -1;
  }
  if (pEpoll->fd < 0) {
    return -1;
  }

  int32_t rc = -1;
#ifndef WINDOWS
  rc = epoll_wait(pEpoll->fd, event, maxEvents, timeout);
#endif
  return rc;
}
// Closes the epoll descriptor held by *ppEpoll and frees the handle.
// *ppEpoll is reset to NULL afterwards so the caller is not left holding a
// pointer to freed memory.
// Returns the result of the close call, or -1 when ppEpoll / *ppEpoll is NULL
// or the descriptor is already negative.
int32_t taosCloseEpoll(TdEpollPtr *ppEpoll) {
  if (ppEpoll == NULL || *ppEpoll == NULL || (*ppEpoll)->fd < 0) {
    return -1;
  }
  int32_t code = taosCloseSocketNoCheck1((*ppEpoll)->fd);
  (*ppEpoll)->fd = -1;
  taosMemoryFree(*ppEpoll);
  *ppEpoll = NULL;
  return code;
}