osSocket.c 28.1 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
17
#define ALLOW_FORBID_FUNC
S
Shengliang Guan 已提交
18
#include "os.h"
S
Shengliang Guan 已提交
19

20
#if defined(WINDOWS)
wafwerar's avatar
wafwerar 已提交
21 22 23 24 25 26 27
#include <IPHlpApi.h>
#include <WS2tcpip.h>
#include <Winsock2.h>
#include <stdio.h>
#include <string.h>
#include <tchar.h>
#include <winbase.h>
S
Shengliang Guan 已提交
28
#else
wafwerar's avatar
wafwerar 已提交
29 30
#include <arpa/inet.h>
#include <fcntl.h>
dengyihao's avatar
dengyihao 已提交
31
#include <net/if.h>
wafwerar's avatar
wafwerar 已提交
32 33 34 35 36 37 38
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <netinet/udp.h>
#include <sys/socket.h>
#include <unistd.h>
39 40

#if defined(DARWIN)
dengyihao's avatar
dengyihao 已提交
41 42
#include <dispatch/dispatch.h>
#include "osEok.h"
43
#else
dengyihao's avatar
dengyihao 已提交
44
#include <sys/epoll.h>
45
#endif
S
Shengliang Guan 已提交
46 47
#endif

dengyihao's avatar
dengyihao 已提交
48 49 50 51
#ifndef INVALID_SOCKET
#define INVALID_SOCKET -1
#endif

wafwerar's avatar
wafwerar 已提交
52 53
/*
 * Listening-socket handle.
 *
 * rwlock : only present when the build enables SOCKET_WITH_LOCK.
 * refId  : reference id (NOTE(review): semantics are not visible in this file).
 * fd     : OS socket descriptor; the helpers in this file treat fd < 0 as invalid.
 */
struct TdSocketServer {
#if SOCKET_WITH_LOCK
  TdThreadRwlock rwlock;
#endif
  int      refId;
  SocketFd fd;
};
typedef struct TdSocketServer  TdSocketServer;
typedef struct TdSocketServer *TdSocketServerPtr;
S
Shengliang Guan 已提交
59

wafwerar's avatar
wafwerar 已提交
60 61
typedef struct TdEpoll {
#if SOCKET_WITH_LOCK
wafwerar's avatar
wafwerar 已提交
62
  TdThreadRwlock rwlock;
wafwerar's avatar
wafwerar 已提交
63
#endif
dengyihao's avatar
dengyihao 已提交
64 65 66
  int     refId;
  EpollFd fd;
} * TdEpollPtr, TdEpoll;
S
Shengliang Guan 已提交
67

wafwerar's avatar
wafwerar 已提交
68
/*
 * Send a datagram to dest_addr through pSocket.
 *
 * Returns the result of sendto(), or -1 when pSocket is NULL or holds an
 * invalid descriptor. The call is the same on every platform.
 */
int32_t taosSendto(TdSocketPtr pSocket, void *buf, int len, unsigned int flags, const struct sockaddr *dest_addr,
                   int addrlen) {
  if (!pSocket || pSocket->fd < 0) return -1;

  int32_t sent = sendto(pSocket->fd, buf, len, flags, dest_addr, addrlen);
  return sent;
}
/*
 * Write up to len bytes from buf to the socket with a single system call.
 *
 * Uses send() on Windows and write() elsewhere. Returns the call's result
 * (which may be a short count), or -1 for a NULL/invalid socket.
 */
int32_t taosWriteSocket(TdSocketPtr pSocket, void *buf, int len) {
  if (!pSocket || pSocket->fd < 0) return -1;

#ifndef WINDOWS
  return write(pSocket->fd, buf, len);
#else
  return send(pSocket->fd, buf, len, 0);
#endif
}
/*
 * Read up to len bytes from the socket into buf with a single system call.
 *
 * Uses recv() on Windows and read() elsewhere. Returns the call's result
 * (which may be a short count), or -1 for a NULL/invalid socket.
 */
int32_t taosReadSocket(TdSocketPtr pSocket, void *buf, int len) {
  if (!pSocket || pSocket->fd < 0) return -1;

#ifndef WINDOWS
  return read(pSocket->fd, buf, len);
#else
  return recv(pSocket->fd, buf, len, 0);
#endif
}
S
Shengliang Guan 已提交
101

dengyihao's avatar
dengyihao 已提交
102 103
/*
 * Receive a message and the sender's address (thin wrapper over recvfrom()).
 *
 * Returns the recvfrom() result, or -1 for a NULL/invalid socket.
 */
int32_t taosReadFromSocket(TdSocketPtr pSocket, void *buf, int32_t len, int32_t flags, struct sockaddr *destAddr,
                           int *addrLen) {
  if (!pSocket || pSocket->fd < 0) return -1;

  int32_t received = recvfrom(pSocket->fd, buf, len, flags, destAddr, addrLen);
  return received;
}
/*
 * Close a raw socket descriptor without validating it first.
 *
 * Uses closesocket() on Windows and close() elsewhere; returns that call's result.
 */
int32_t taosCloseSocketNoCheck1(SocketFd fd) {
#ifndef WINDOWS
  int32_t rc = close(fd);
#else
  int32_t rc = closesocket(fd);
#endif
  return rc;
}
/*
 * Close the socket and release its handle.
 *
 * ppSocket : address of the caller's handle. On success the descriptor is
 *            closed, the handle is freed and *ppSocket is reset to NULL so the
 *            caller cannot reuse the freed handle by accident.
 *
 * Returns the result of the close call, or -1 when ppSocket/*ppSocket is NULL
 * or the descriptor is already invalid (in which case nothing is freed).
 */
int32_t taosCloseSocket(TdSocketPtr *ppSocket) {
  if (ppSocket == NULL || *ppSocket == NULL || (*ppSocket)->fd < 0) {
    return -1;
  }

  int32_t code = taosCloseSocketNoCheck1((*ppSocket)->fd);
  (*ppSocket)->fd = -1;
  taosMemoryFree(*ppSocket);
  *ppSocket = NULL;  // the handle is gone; do not leave a dangling pointer behind
  return code;
}
/*
 * Close a listening socket and release its handle.
 *
 * ppSocketServer : address of the caller's handle. On success the descriptor
 *                  is closed, the handle is freed and *ppSocketServer is reset
 *                  to NULL so the freed handle cannot be reused by accident.
 *
 * Returns the result of the close call, or -1 when the argument or the handle
 * is NULL or the descriptor is already invalid (in which case nothing is freed).
 */
int32_t taosCloseSocketServer(TdSocketServerPtr *ppSocketServer) {
  if (ppSocketServer == NULL || *ppSocketServer == NULL || (*ppSocketServer)->fd < 0) {
    return -1;
  }

  int32_t code = taosCloseSocketNoCheck1((*ppSocketServer)->fd);
  (*ppSocketServer)->fd = -1;
  taosMemoryFree(*ppSocketServer);
  *ppSocketServer = NULL;  // the handle is gone; do not leave a dangling pointer behind
  return code;
}
S
Shengliang Guan 已提交
136

wafwerar's avatar
wafwerar 已提交
137 138 139 140
/*
 * Stop further receives on the socket.
 *
 * On Windows and macOS the descriptor is closed outright instead of being
 * half-shut; elsewhere shutdown(SHUT_RD) is used.
 * Returns the underlying call's result, or -1 for a NULL/invalid socket.
 */
int32_t taosShutDownSocketRD(TdSocketPtr pSocket) {
  if (!pSocket || pSocket->fd < 0) return -1;

#if !defined(WINDOWS) && !defined(__APPLE__)
  return shutdown(pSocket->fd, SHUT_RD);
#elif defined(WINDOWS)
  return closesocket(pSocket->fd);
#else
  return close(pSocket->fd);
#endif
}
wafwerar's avatar
wafwerar 已提交
149 150 151 152
/*
 * Stop further receives on a listening socket.
 *
 * On Windows and macOS the descriptor is closed outright instead of being
 * half-shut; elsewhere shutdown(SHUT_RD) is used.
 * Returns the underlying call's result, or -1 for a NULL/invalid socket.
 */
int32_t taosShutDownSocketServerRD(TdSocketServerPtr pSocketServer) {
  if (!pSocketServer || pSocketServer->fd < 0) return -1;

#if !defined(WINDOWS) && !defined(__APPLE__)
  return shutdown(pSocketServer->fd, SHUT_RD);
#elif defined(WINDOWS)
  return closesocket(pSocketServer->fd);
#else
  return close(pSocketServer->fd);
#endif
}
S
Shengliang Guan 已提交
161

wafwerar's avatar
wafwerar 已提交
162 163 164
/*
 * Stop further sends on the socket.
 *
 * On Windows and macOS the descriptor is closed outright instead of being
 * half-shut; elsewhere shutdown(SHUT_WR) is used.
 * Returns the underlying call's result, or -1 for a NULL/invalid socket.
 */
int32_t taosShutDownSocketWR(TdSocketPtr pSocket) {
  if (!pSocket || pSocket->fd < 0) return -1;

#if !defined(WINDOWS) && !defined(__APPLE__)
  return shutdown(pSocket->fd, SHUT_WR);
#elif defined(WINDOWS)
  return closesocket(pSocket->fd);
#else
  return close(pSocket->fd);
#endif
}
wafwerar's avatar
wafwerar 已提交
174 175 176 177 178 179 180 181 182 183
/*
 * Stop further sends on a listening socket.
 *
 * On Windows and macOS the descriptor is closed outright instead of being
 * half-shut; elsewhere shutdown(SHUT_WR) is used.
 * Returns the underlying call's result, or -1 for a NULL/invalid socket.
 */
int32_t taosShutDownSocketServerWR(TdSocketServerPtr pSocketServer) {
  if (!pSocketServer || pSocketServer->fd < 0) return -1;

#if !defined(WINDOWS) && !defined(__APPLE__)
  return shutdown(pSocketServer->fd, SHUT_WR);
#elif defined(WINDOWS)
  return closesocket(pSocketServer->fd);
#else
  return close(pSocketServer->fd);
#endif
}
/*
 * Stop both sends and receives on the socket.
 *
 * On Windows and macOS the descriptor is closed outright; elsewhere
 * shutdown(SHUT_RDWR) is used.
 * Returns the underlying call's result, or -1 for a NULL/invalid socket.
 */
int32_t taosShutDownSocketRDWR(TdSocketPtr pSocket) {
  if (!pSocket || pSocket->fd < 0) return -1;

#if !defined(WINDOWS) && !defined(__APPLE__)
  return shutdown(pSocket->fd, SHUT_RDWR);
#elif defined(WINDOWS)
  return closesocket(pSocket->fd);
#else
  return close(pSocket->fd);
#endif
}
/*
 * Stop both sends and receives on a listening socket.
 *
 * On Windows and macOS the descriptor is closed outright; elsewhere
 * shutdown(SHUT_RDWR) is used.
 * Returns the underlying call's result, or -1 for a NULL/invalid socket.
 */
int32_t taosShutDownSocketServerRDWR(TdSocketServerPtr pSocketServer) {
  if (!pSocketServer || pSocketServer->fd < 0) return -1;

#if !defined(WINDOWS) && !defined(__APPLE__)
  return shutdown(pSocketServer->fd, SHUT_RDWR);
#elif defined(WINDOWS)
  return closesocket(pSocketServer->fd);
#else
  return close(pSocketServer->fd);
#endif
}
wafwerar's avatar
wafwerar 已提交
210

wafwerar's avatar
wafwerar 已提交
211 212
/*
 * Initialise Winsock (version 1.1) once per process; a no-op on other platforms.
 *
 * A function-local static records a successful WSAStartup() so later calls
 * return immediately. A failed start-up is retried on the next call.
 */
void taosWinSocketInit() {
#ifdef WINDOWS
  static char initialized = 0;
  if (initialized != 0) {
    return;
  }

  WSADATA wsaData;
  WORD    requested = MAKEWORD(1, 1);
  if (WSAStartup(requested, &wsaData) == 0) {
    initialized = 1;
  }
#endif
}
wafwerar's avatar
wafwerar 已提交
225 226 227 228
/*
 * Switch the socket between blocking and non-blocking mode.
 *
 * on : non-zero selects non-blocking mode, zero selects blocking mode.
 *
 * Returns 0 on success, -1 for a NULL/invalid socket, and 1 when fcntl()
 * fails (non-Windows only; the Windows ioctlsocket() result is not checked).
 */
int32_t taosSetNonblocking(TdSocketPtr pSocket, int32_t on) {
  if (!pSocket || pSocket->fd < 0) return -1;

#ifdef WINDOWS
  u_long mode = on ? 1 : 0;
  ioctlsocket(pSocket->fd, FIONBIO, &mode);
#else
  int32_t current = fcntl(pSocket->fd, F_GETFL, 0);
  if (current < 0) return 1;

  int32_t wanted = on ? (current | O_NONBLOCK) : (current & ~O_NONBLOCK);
  if (fcntl(pSocket->fd, F_SETFL, wanted) < 0) return 1;
#endif

  return 0;
}
wafwerar's avatar
wafwerar 已提交
257 258 259 260
/*
 * Set a socket option (wrapper over setsockopt()).
 *
 * On Windows the keep-alive tuning options (TCP_KEEPIDLE, TCP_KEEPINTVL and
 * TCP_KEEPCNT at SOL_TCP, plus TCP_KEEPCNT at SOL_SOCKET) are accepted without
 * being applied: the function reports success and does not call setsockopt().
 *
 * Returns the setsockopt() result, or -1 for a NULL/invalid socket.
 */
int32_t taosSetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t optlen) {
  if (!pSocket || pSocket->fd < 0) return -1;

#ifdef WINDOWS
  int skipped = (level == SOL_SOCKET && optname == TCP_KEEPCNT) ||
                (level == SOL_TCP &&
                 (optname == TCP_KEEPIDLE || optname == TCP_KEEPINTVL || optname == TCP_KEEPCNT));
  if (skipped) return 0;

  return setsockopt(pSocket->fd, level, optname, optval, optlen);
#else
  return setsockopt(pSocket->fd, level, optname, optval, (int)optlen);
#endif
}
/*
 * Read a socket option (wrapper over getsockopt()).
 *
 * On Windows nothing is queried: the function returns 0 and leaves optval and
 * optlen untouched. Elsewhere it returns the getsockopt() result.
 * Returns -1 for a NULL/invalid socket.
 */
int32_t taosGetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t *optlen) {
  if (!pSocket || pSocket->fd < 0) return -1;

#ifndef WINDOWS
  return getsockopt(pSocket->fd, level, optname, optval, (int *)optlen);
#else
  return 0;
#endif
}
S
config  
Shengliang Guan 已提交
293
/*
 * Convert a dotted IPv4 string to an address in network byte order.
 *
 * Uses inet_pton() on Windows and inet_addr() elsewhere.
 * Returns INADDR_NONE when the string cannot be parsed.
 */
uint32_t taosInetAddr(const char *ipAddr) {
#ifndef WINDOWS
  return inet_addr(ipAddr);
#else
  uint32_t parsed;
  return (inet_pton(AF_INET, ipAddr, &parsed) > 0) ? parsed : INADDR_NONE;
#endif
}
S
Shengliang Guan 已提交
306
/*
 * Format an IPv4 address as a dotted string.
 *
 * The returned pointer refers to static storage (a local static buffer on
 * Windows, inet_ntoa()'s internal buffer elsewhere), so the function is not
 * thread safe and the result is overwritten by the next call. Intended for
 * log/debug output only.
 */
const char *taosInetNtoa(struct in_addr ipInt) {
#ifdef WINDOWS
  // The size passed to inet_ntop() must be the real size of the buffer.
  static char tmpDstStr[INET_ADDRSTRLEN];
  return inet_ntop(AF_INET, &ipInt, tmpDstStr, sizeof(tmpDstStr));
#else
  return inet_ntoa(ipInt);
#endif
}
315 316

/* Fallback for platforms whose headers do not define SIGPIPE. */
#if !defined(SIGPIPE)
#define SIGPIPE EPIPE
#endif

/* Connect timeout; passed to poll() as its timeout, i.e. milliseconds. */
#define TCP_CONN_TIMEOUT 3000  // conn timeout

wafwerar's avatar
wafwerar 已提交
322 323 324 325
/*
 * Write exactly nbytes from buf to the socket, retrying short writes and
 * writes interrupted by a signal (EINTR).
 *
 * Returns the number of bytes written (nbytes on success), or -1 when the
 * socket is NULL/invalid, a write fails for a reason other than EINTR, or a
 * write makes no progress.
 *
 * errno is consulted only right after a failed write: its value after a
 * successful call is stale and must not influence the result.
 */
int32_t taosWriteMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }

  char   *ptr = (char *)buf;
  int32_t nleft = nbytes;

  while (nleft > 0) {
    int32_t nwritten = taosWriteSocket(pSocket, ptr, nleft);
    if (nwritten < 0) {
      if (errno == EINTR /* || errno == EAGAIN || errno == EWOULDBLOCK */) {
        continue;
      }
      return -1;  // includes EPIPE: the peer has closed the connection
    }
    if (nwritten == 0) {
      // No progress and no error reported: give up instead of spinning.
      return -1;
    }

    nleft -= nwritten;
    ptr += nwritten;
  }

  return (nbytes - nleft);
}

wafwerar's avatar
wafwerar 已提交
351 352 353 354
/*
 * Read up to nbytes from the socket into buf, retrying short reads and reads
 * interrupted by a signal (EINTR).
 *
 * Returns the number of bytes read. This is less than nbytes only when the
 * peer closed the connection (a read returned 0) before all data arrived.
 * Returns -1 when the socket is NULL/invalid or a read fails for a reason
 * other than EINTR.
 *
 * errno is consulted only right after a failed read: its value after a
 * successful call is stale and must not influence the result.
 */
int32_t taosReadMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }

  char   *ptr = (char *)buf;
  int32_t nleft = nbytes;

  while (nleft > 0) {
    int32_t nread = taosReadSocket(pSocket, ptr, nleft);
    if (nread == 0) {
      break;  // end of stream
    }
    if (nread < 0) {
      if (errno == EINTR /* || errno == EAGAIN || errno == EWOULDBLOCK*/) {
        continue;
      }
      return -1;
    }

    nleft -= nread;
    ptr += nread;
  }

  return (nbytes - nleft);
}

wafwerar's avatar
wafwerar 已提交
383 384 385 386 387
/*
 * Write nbytes from ptr with the socket temporarily in non-blocking mode,
 * waiting up to 30 seconds (via select()) for the socket to become writable
 * before each send.
 *
 * Returns the number of bytes written. This is less than nbytes when a wait
 * timed out, in which case errno is set to ETIMEDOUT. Returns -1 when the
 * socket is NULL/invalid, the descriptor cannot be used with select(), or
 * select()/send() fails with an error other than EINTR/EAGAIN.
 *
 * The socket is switched back to blocking mode on every exit path taken after
 * it was made non-blocking, and errno from the failing call is preserved.
 */
int32_t taosNonblockwrite(TdSocketPtr pSocket, char *ptr, int32_t nbytes) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
#ifndef WINDOWS
  // FD_SET() on a descriptor >= FD_SETSIZE writes outside the fd_set.
  if (pSocket->fd >= FD_SETSIZE) {
    errno = EINVAL;
    return -1;
  }
#endif
  taosSetNonblocking(pSocket, 1);

  int32_t nleft = nbytes;
  int32_t failed = 0;

  while (nleft > 0) {
    fd_set         fset;
    struct timeval tv;

    tv.tv_sec = 30;
    tv.tv_usec = 0;
    FD_ZERO(&fset);
    FD_SET(pSocket->fd, &fset);

    int32_t nready = select((SocketFd)(pSocket->fd + 1), NULL, &fset, NULL, &tv);
    if (nready == 0) {
      errno = ETIMEDOUT;
      break;
    }
    if (nready < 0) {
      if (errno == EINTR) continue;
      failed = 1;
      break;
    }

    int32_t nwritten = (int32_t)send(pSocket->fd, ptr, (size_t)nleft, MSG_NOSIGNAL);
    if (nwritten <= 0) {
      if (errno == EAGAIN || errno == EINTR) continue;
      failed = 1;
      break;
    }

    nleft -= nwritten;
    ptr += nwritten;
  }

  // Restore blocking mode without clobbering the errno the caller will inspect.
  int savedErrno = errno;
  taosSetNonblocking(pSocket, 0);
  errno = savedErrno;

  return failed ? -1 : (nbytes - nleft);
}

wafwerar's avatar
wafwerar 已提交
428
TdSocketPtr taosOpenUdpSocket(uint32_t ip, uint16_t port) {
429
  struct sockaddr_in localAddr;
wafwerar's avatar
wafwerar 已提交
430 431
  SocketFd           fd;
  int32_t            bufSize = 1024000;
432

wafwerar's avatar
wafwerar 已提交
433
  // printf("open udp socket:0x%x:%hu", ip, port);
434 435 436 437 438 439

  memset((char *)&localAddr, 0, sizeof(localAddr));
  localAddr.sin_family = AF_INET;
  localAddr.sin_addr.s_addr = ip;
  localAddr.sin_port = (uint16_t)htons(port);

wafwerar's avatar
wafwerar 已提交
440 441 442 443
  if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) <= 2) {
    // printf("failed to open udp socket: %d (%s)", errno, strerror(errno));
    taosCloseSocketNoCheck1(fd);
    return NULL;
444 445
  }

wafwerar's avatar
wafwerar 已提交
446
  TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
wafwerar's avatar
wafwerar 已提交
447 448 449
  if (pSocket == NULL) {
    taosCloseSocketNoCheck1(fd);
    return NULL;
450
  }
wafwerar's avatar
wafwerar 已提交
451 452
  pSocket->fd = fd;
  pSocket->refId = 0;
453

wafwerar's avatar
wafwerar 已提交
454 455 456 457 458 459 460 461 462 463
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    // printf("failed to set the send buffer size for UDP socket\n");
    taosCloseSocket(&pSocket);
    return NULL;
  }

  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    // printf("failed to set the receive buffer size for UDP socket\n");
    taosCloseSocket(&pSocket);
    return NULL;
464 465 466
  }

  /* bind socket to local address */
wafwerar's avatar
wafwerar 已提交
467 468 469 470
  if (bind(pSocket->fd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
    // printf("failed to bind udp socket: %d (%s), 0x%x:%hu", errno, strerror(errno), ip, port);
    taosCloseSocket(&pSocket);
    return NULL;
471 472
  }

wafwerar's avatar
wafwerar 已提交
473
  return pSocket;
474 475
}

wafwerar's avatar
wafwerar 已提交
476 477 478
TdSocketPtr taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) {
  SocketFd           fd = -1;
  int32_t            ret;
479
  struct sockaddr_in serverAddr, clientAddr;
wafwerar's avatar
wafwerar 已提交
480
  int32_t            bufSize = 1024 * 1024;
481

wafwerar's avatar
wafwerar 已提交
482
  fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
483

wafwerar's avatar
wafwerar 已提交
484 485 486 487
  if (fd <= 2) {
    // printf("failed to open the socket: %d (%s)", errno, strerror(errno));
    if (fd >= 0) taosCloseSocketNoCheck1(fd);
    return NULL;
488 489
  }

wafwerar's avatar
wafwerar 已提交
490
  TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
wafwerar's avatar
wafwerar 已提交
491 492 493 494 495 496 497
  if (pSocket == NULL) {
    taosCloseSocketNoCheck1(fd);
    return NULL;
  }
  pSocket->fd = fd;
  pSocket->refId = 0;

498 499
  /* set REUSEADDR option, so the portnumber can be re-used */
  int32_t reuse = 1;
wafwerar's avatar
wafwerar 已提交
500 501 502 503
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
    // printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
504 505
  }

wafwerar's avatar
wafwerar 已提交
506 507 508 509
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    // printf("failed to set the send buffer size for TCP socket\n");
    taosCloseSocket(&pSocket);
    return NULL;
510 511
  }

wafwerar's avatar
wafwerar 已提交
512 513 514 515
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    // printf("failed to set the receive buffer size for TCP socket\n");
    taosCloseSocket(&pSocket);
    return NULL;
516 517 518 519 520 521 522 523 524
  }

  if (clientIp != 0) {
    memset((char *)&clientAddr, 0, sizeof(clientAddr));
    clientAddr.sin_family = AF_INET;
    clientAddr.sin_addr.s_addr = clientIp;
    clientAddr.sin_port = 0;

    /* bind socket to client address */
wafwerar's avatar
wafwerar 已提交
525 526 527 528 529
    if (bind(pSocket->fd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) {
      // printf("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)", clientIp, destIp, destPort,
      //        strerror(errno));
      taosCloseSocket(&pSocket);
      return NULL;
530 531 532 533 534 535 536 537
    }
  }

  memset((char *)&serverAddr, 0, sizeof(serverAddr));
  serverAddr.sin_family = AF_INET;
  serverAddr.sin_addr.s_addr = destIp;
  serverAddr.sin_port = (uint16_t)htons((uint16_t)destPort);

wafwerar's avatar
wafwerar 已提交
538 539 540
#ifdef _TD_LINUX
  taosSetNonblocking(pSocket, 1);
  ret = connect(pSocket->fd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
541 542
  if (ret == -1) {
    if (errno == EHOSTUNREACH) {
wafwerar's avatar
wafwerar 已提交
543 544 545
      // printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
      taosCloseSocket(&pSocket);
      return -1;
546
    } else if (errno == EINPROGRESS || errno == EAGAIN || errno == EWOULDBLOCK) {
wafwerar's avatar
wafwerar 已提交
547
      struct pollfd wfd[1];
548

wafwerar's avatar
wafwerar 已提交
549
      wfd[0].fd = pSocket->fd;
550
      wfd[0].events = POLLOUT;
wafwerar's avatar
wafwerar 已提交
551

552 553
      int res = poll(wfd, 1, TCP_CONN_TIMEOUT);
      if (res == -1 || res == 0) {
wafwerar's avatar
wafwerar 已提交
554 555
        // printf("failed to connect socket, ip:0x%x, port:%hu(poll error/conn timeout)", destIp, destPort);
        taosCloseSocket(&pSocket);  //
556 557
        return -1;
      }
wafwerar's avatar
wafwerar 已提交
558 559 560 561
      int optVal = -1, optLen = sizeof(int);
      if ((0 != taosGetSockOpt(pSocket, SOL_SOCKET, SO_ERROR, &optVal, &optLen)) || (optVal != 0)) {
        // printf("failed to connect socket, ip:0x%x, port:%hu(connect host error)", destIp, destPort);
        taosCloseSocket(&pSocket);  //
562 563 564
        return -1;
      }
      ret = 0;
wafwerar's avatar
wafwerar 已提交
565 566 567 568 569
    } else {  // Other error
      // printf("failed to connect socket, ip:0x%x, port:%hu(target host cannot be reached)", destIp, destPort);
      taosCloseSocket(&pSocket);  //
      return -1;
    }
570
  }
wafwerar's avatar
wafwerar 已提交
571
  taosSetNonblocking(pSocket, 0);
572 573

#else
wafwerar's avatar
wafwerar 已提交
574
  ret = connect(pSocket->fd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
575 576 577
#endif

  if (ret != 0) {
wafwerar's avatar
wafwerar 已提交
578 579 580
    // printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
581
  } else {
wafwerar's avatar
wafwerar 已提交
582
    taosKeepTcpAlive(pSocket);
583 584
  }

wafwerar's avatar
wafwerar 已提交
585
  return pSocket;
586 587
}

wafwerar's avatar
wafwerar 已提交
588 589 590 591
/*
 * Configure keep-alive, TCP_NODELAY and SO_LINGER on a TCP socket.
 *
 * Options are applied in order: SO_KEEPALIVE; then (except on macOS)
 * TCP_KEEPCNT = 3, TCP_KEEPIDLE = 10, TCP_KEEPINTVL = 3; then TCP_NODELAY;
 * then SO_LINGER with a linger value of 3.
 *
 * Returns 0 on success. Returns -1 for a NULL/invalid socket, or as soon as
 * one option cannot be set. In the latter case the socket has been closed and
 * its handle freed through taosCloseSocket(), so the caller must not use or
 * close it again.
 */
int32_t taosKeepTcpAlive(TdSocketPtr pSocket) {
  if (!pSocket || pSocket->fd < 0) return -1;

  int32_t       alive = 1;
  int32_t       nodelay = 1;
  struct linger linger = {0};

  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) goto _failed;

#ifndef __APPLE__
  // all fails on macosx
  int32_t probes = 3;
  int32_t alivetime = 10;
  int32_t interval = 3;

  if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) goto _failed;
  if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPIDLE, (void *)&alivetime, sizeof(alivetime)) < 0) goto _failed;
  if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval)) < 0) goto _failed;
#endif  // __APPLE__

  if (taosSetSockOpt(pSocket, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) goto _failed;

  linger.l_onoff = 1;
  linger.l_linger = 3;
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) goto _failed;

  return 0;

_failed:
  taosCloseSocket(&pSocket);
  return -1;
}

dengyihao's avatar
dengyihao 已提交
642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707
/*
 * Look up the IPv4 address of a network interface.
 *
 * eth : interface name; copied into a buffer of IFNAMSIZ bytes and truncated
 *       if longer.
 * ip  : output buffer receiving the dotted address; at most 64 bytes are
 *       written, so it must be at least that large.
 *
 * Returns 0 on success and -1 when the helper socket cannot be created or the
 * SIOCGIFADDR query fails. On Windows nothing is done: 0 is returned and ip
 * is left untouched.
 */
int taosGetLocalIp(const char *eth, char *ip) {
#if defined(WINDOWS)
  // DO NOTHAING
  return 0;
#else
  struct ifreq ifr;

  int fd = socket(AF_INET, SOCK_DGRAM, 0);
  if (fd == -1) return -1;

  strncpy(ifr.ifr_name, eth, IFNAMSIZ);
  ifr.ifr_name[IFNAMSIZ - 1] = 0;

  int queried = ioctl(fd, SIOCGIFADDR, &ifr);
  if (queried >= 0) {
    struct sockaddr_in sin;
    memcpy(&sin, &ifr.ifr_addr, sizeof(sin));
    snprintf(ip, 64, "%s", inet_ntoa(sin.sin_addr));
  }

  taosCloseSocketNoCheck1(fd);
  return (queried < 0) ? -1 : 0;
#endif
}
/*
 * Check whether ip is the IPv4 address of one of the local interfaces.
 *
 * ip : IPv4 address in network byte order (it is compared against the value
 *      produced by taosInetAddr()).
 *
 * Returns 0 when an interface with that address is found, and -1 when none
 * matches or the interface list cannot be obtained. On Windows no check is
 * done and 0 is always returned.
 *
 * The interface list is read with SIOCGIFCONF into a fixed 512-byte buffer,
 * so only as many entries as fit in that buffer are examined.
 */
int taosValidIp(uint32_t ip) {
#if defined(WINDOWS)
  // DO NOTHAING
  return 0;
#else
  int           ret = -1;
  char          buf[512] = {0};
  struct ifconf ifconf;

  ifconf.ifc_len = sizeof(buf);
  ifconf.ifc_buf = buf;

  int fd = socket(AF_INET, SOCK_DGRAM, 0);
  if (fd < 0) {
    return -1;
  }

  if (ioctl(fd, SIOCGIFCONF, &ifconf) < 0) {
    taosCloseSocketNoCheck1(fd);
    return -1;
  }

  struct ifreq *ifreq = (struct ifreq *)ifconf.ifc_buf;
  int           count = (int)(ifconf.ifc_len / sizeof(struct ifreq));

  // Advance on every iteration and report success only on an actual match;
  // a successful address lookup alone says nothing about `ip`.
  for (int i = 0; i < count; i++, ifreq++) {
    if (ifreq->ifr_addr.sa_family != AF_INET) {
      continue;
    }

    char ip_str[64] = {0};
    if (taosGetLocalIp(ifreq->ifr_name, ip_str) != 0) {
      break;
    }
    if (ip == (uint32_t)taosInetAddr(ip_str)) {
      ret = 0;
      break;
    }
  }

  taosCloseSocketNoCheck1(fd);
  return ret;
#endif
}

708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738
/*
 * Check that a TCP server could be started on ip:port.
 *
 * A temporary TCP socket with SO_REUSEADDR is bound to the address and closed
 * again; if that works, the result of taosValidIp(ip) decides the outcome.
 *
 * ip   : IPv4 address, stored into sin_addr.s_addr as given.
 * port : port in host byte order.
 *
 * Returns true only when the bind succeeds and taosValidIp(ip) returns 0.
 */
bool taosValidIpAndPort(uint32_t ip, uint16_t port) {
  struct sockaddr_in serverAdd;
  int32_t            reuse = 1;

  bzero((char *)&serverAdd, sizeof(serverAdd));
  serverAdd.sin_family = AF_INET;
  serverAdd.sin_addr.s_addr = ip;
  serverAdd.sin_port = (uint16_t)htons(port);

  SocketFd fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  if (fd <= 2) {
    // Only close what was actually opened; socket() may have failed outright.
    if (fd >= 0) taosCloseSocketNoCheck1(fd);
    return false;
  }

  TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
  if (pSocket == NULL) {
    taosCloseSocketNoCheck1(fd);
    return false;
  }
  pSocket->refId = 0;
  pSocket->fd = fd;

  /* set REUSEADDR option, so the portnumber can be re-used */
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
    taosCloseSocket(&pSocket);
    return false;
  }

  /* bind socket to server address */
  if (bind(pSocket->fd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) {
    taosCloseSocket(&pSocket);
    return false;
  }

  taosCloseSocket(&pSocket);
  return taosValidIp(ip) == 0;
}
wafwerar's avatar
wafwerar 已提交
750
TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
751
  struct sockaddr_in serverAdd;
wafwerar's avatar
wafwerar 已提交
752
  SocketFd           fd;
753 754
  int32_t            reuse;

wafwerar's avatar
wafwerar 已提交
755
  // printf("open tcp server socket:0x%x:%hu", ip, port);
756 757 758 759 760 761

  bzero((char *)&serverAdd, sizeof(serverAdd));
  serverAdd.sin_family = AF_INET;
  serverAdd.sin_addr.s_addr = ip;
  serverAdd.sin_port = (uint16_t)htons(port);

wafwerar's avatar
wafwerar 已提交
762 763 764 765 766 767
  if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) {
    // printf("failed to open TCP socket: %d (%s)", errno, strerror(errno));
    taosCloseSocketNoCheck1(fd);
    return NULL;
  }

wafwerar's avatar
wafwerar 已提交
768
  TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
wafwerar's avatar
wafwerar 已提交
769 770 771
  if (pSocket == NULL) {
    taosCloseSocketNoCheck1(fd);
    return NULL;
772
  }
wafwerar's avatar
wafwerar 已提交
773 774
  pSocket->refId = 0;
  pSocket->fd = fd;
775 776 777

  /* set REUSEADDR option, so the portnumber can be re-used */
  reuse = 1;
wafwerar's avatar
wafwerar 已提交
778 779 780 781
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
    // printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
782 783 784
  }

  /* bind socket to server address */
wafwerar's avatar
wafwerar 已提交
785 786 787 788
  if (bind(pSocket->fd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) {
    // printf("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
789 790
  }

wafwerar's avatar
wafwerar 已提交
791 792 793 794
  if (taosKeepTcpAlive(pSocket) < 0) {
    // printf("failed to set tcp server keep-alive option, 0x%x:%hu(%s)", ip, port, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
795 796
  }

wafwerar's avatar
wafwerar 已提交
797 798 799 800
  if (listen(pSocket->fd, 1024) < 0) {
    // printf("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
801 802
  }

wafwerar's avatar
wafwerar 已提交
803
  return (TdSocketServerPtr)pSocket;
804 805
}

dengyihao's avatar
dengyihao 已提交
806
/*
 * Accept one pending connection on a listening socket.
 *
 * destAddr / addrLen are passed straight to accept() and receive the peer
 * address.
 *
 * Returns a newly allocated socket handle for the connection (release it with
 * taosCloseSocket()), or NULL when the server socket is NULL/invalid, accept()
 * fails, or the handle cannot be allocated (the accepted descriptor is closed
 * in that case).
 */
TdSocketPtr taosAcceptTcpConnectSocket(TdSocketServerPtr pServerSocket, struct sockaddr *destAddr, int *addrLen) {
  if (!pServerSocket || pServerSocket->fd < 0) return NULL;

  SocketFd accepted = accept(pServerSocket->fd, destAddr, addrLen);
  if (accepted == -1) {
    // tError("TCP accept failure(%s)", strerror(errno));
    return NULL;
  }

  TdSocketPtr pConn = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
  if (!pConn) {
    taosCloseSocketNoCheck1(accepted);
    return NULL;
  }

  pConn->fd = accepted;
  pConn->refId = 0;
  return pConn;
}
825 826 827
#define COPY_SIZE 32768
// sendfile shall be used

wafwerar's avatar
wafwerar 已提交
828 829 830 831
int64_t taosCopyFds(TdSocketPtr pSrcSocket, TdSocketPtr pDestSocket, int64_t len) {
  if (pSrcSocket == NULL || pSrcSocket->fd < 0 || pDestSocket == NULL || pDestSocket->fd < 0) {
    return -1;
  }
832 833 834 835 836 837 838 839 840 841 842 843
  int64_t leftLen;
  int64_t readLen, writeLen;
  char    temp[COPY_SIZE];

  leftLen = len;

  while (leftLen > 0) {
    if (leftLen < COPY_SIZE)
      readLen = leftLen;
    else
      readLen = COPY_SIZE;  // 4K

wafwerar's avatar
wafwerar 已提交
844
    int64_t retLen = taosReadMsg(pSrcSocket, temp, (int32_t)readLen);
845
    if (readLen != retLen) {
wafwerar's avatar
wafwerar 已提交
846 847
      // printf("read error, readLen:%" PRId64 " retLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s",
      //        readLen, retLen, len, leftLen, strerror(errno));
848 849 850
      return -1;
    }

wafwerar's avatar
wafwerar 已提交
851
    writeLen = taosWriteMsg(pDestSocket, temp, (int32_t)readLen);
852 853

    if (readLen != writeLen) {
wafwerar's avatar
wafwerar 已提交
854 855
      // printf("copy error, readLen:%" PRId64 " writeLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s",
      //        readLen, writeLen, len, leftLen, strerror(errno));
856 857 858 859 860 861 862 863
      return -1;
    }

    leftLen -= readLen;
  }

  return len;
}
864 865

void taosBlockSIGPIPE() {
wafwerar's avatar
wafwerar 已提交
866
#ifdef WINDOWS
867
#else
868 869 870
  sigset_t signal_mask;
  sigemptyset(&signal_mask);
  sigaddset(&signal_mask, SIGPIPE);
wafwerar's avatar
wafwerar 已提交
871
  int32_t rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
872
  if (rc != 0) {
wafwerar's avatar
wafwerar 已提交
873
    // printf("failed to block SIGPIPE");
874
  }
875
#endif
876
}
877 878 879 880 881 882 883 884 885 886

/*
 * Resolve `fqdn` with getaddrinfo(), restricted to AF_INET / SOCK_STREAM.
 *
 * Returns the sin_addr.s_addr value of the first result exactly as stored in
 * its sockaddr_in, or 0xFFFFFFFF when the lookup produces no result.
 */
uint32_t taosGetIpv4FromFqdn(const char *fqdn) {
  struct addrinfo *addrList = NULL;
  struct addrinfo  hints = {0};
  hints.ai_family = AF_INET;
  hints.ai_socktype = SOCK_STREAM;

  int32_t ret = getaddrinfo(fqdn, NULL, &hints, &addrList);
  if (addrList == NULL) {
    // Lookup failed. `ret` carries the getaddrinfo() code (where EAI_SYSTEM is
    // defined and returned, errno holds the cause); nothing is logged here.
    (void)ret;
    return 0xFFFFFFFF;
  }

  uint32_t ip = ((struct sockaddr_in *)addrList->ai_addr)->sin_addr.s_addr;
  freeaddrinfo(addrList);
  return ip;
}

/*
 * Copy this host's name into `fqdn`.
 *
 * The local host name is obtained with gethostname() and then passed through
 * getaddrinfo(). On __APPLE__ the plain host name is copied; elsewhere the
 * canonical name of the first result is copied, falling back to the plain host
 * name if the resolver supplied no canonical name.
 *
 * `fqdn` is written with strcpy(), so the caller must supply a buffer large
 * enough for the name (the local host name alone may be up to 1023 bytes).
 *
 * Returns 0 on success, -1 if `fqdn` is NULL or either lookup fails. Lookup
 * failures are reported on stdout and returned to the caller rather than
 * aborting the process.
 */
int32_t taosGetFqdn(char *fqdn) {
  if (fqdn == NULL) {
    return -1;
  }

  char hostname[1024];
  hostname[1023] = '\0';  // gethostname() may not terminate a truncated name
  if (gethostname(hostname, 1023) == -1) {
    printf("failed to get hostname, reason:%s", strerror(errno));
    return -1;
  }

  struct addrinfo  hints = {0};
  struct addrinfo *result = NULL;
#ifdef __APPLE__
  // on macosx, hostname -f has the form of xxx.local
  // which will block getaddrinfo for a few seconds if AI_CANONNAME is set
  // thus, we choose AF_INET (ipv4 for the moment) to make getaddrinfo return
  // immediately
  hints.ai_family = AF_INET;
#else   // __APPLE__
  hints.ai_flags = AI_CANONNAME;
#endif  // __APPLE__
  int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
  if (!result) {
    printf("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
    return -1;
  }

#ifdef __APPLE__
  // refer to comments above
  strcpy(fqdn, hostname);
#else   // __APPLE__
  // Guard against a resolver that returns a result without a canonical name.
  strcpy(fqdn, result->ai_canonname != NULL ? result->ai_canonname : hostname);
#endif  // __APPLE__
  freeaddrinfo(result);
  return 0;
}

/*
 * Convert a dotted-decimal IPv4 string to a uint32_t.
 *
 * Up to four dot-separated fields are parsed as decimal integers and stored,
 * in order, as the successive bytes of the result in memory (the first field
 * becomes the first byte, so "a.b.c.d" yields the bytes a, b, c, d).
 *
 * Each field is reduced to a single byte. Fields that are missing or contain
 * no digits contribute 0, and anything after the fourth field is ignored.
 * A NULL or empty string yields 0. The input string is not modified.
 */
uint32_t ip2uint(const char *const ip_addr) {
  uint8_t  octets[sizeof(uint32_t)] = {0};
  uint32_t packed = 0;

  if (ip_addr == NULL) {
    return 0;
  }

  const char *field = ip_addr;
  for (int32_t k = 0; k < (int32_t)sizeof(octets) && *field != '\0'; k++) {
    // Locate the end of the current field: the next '.' or the terminator.
    const char *fieldEnd = field;
    while (*fieldEnd != '.' && *fieldEnd != '\0') {
      fieldEnd++;
    }

    // strtol() stops at the first non-digit, so it never reads past fieldEnd.
    octets[k] = (uint8_t)strtol(field, NULL, 10);

    field = (*fieldEnd == '.') ? fieldEnd + 1 : fieldEnd;
  }

  // memcpy instead of a pointer cast: no alignment or aliasing assumptions.
  memcpy(&packed, octets, sizeof(packed));
  return packed;
}

/*
 * Format `ip` as dotted-decimal text into `ipstr`, least significant byte
 * first (bits 0-7, then 8-15, 16-23 and 24-31).
 *
 * `ipstr` must have room for at least 16 bytes ("255.255.255.255" plus the
 * terminating NUL); no bounds checking is performed.
 */
void tinet_ntoa(char *ipstr, uint32_t ip) {
  // The operands are unsigned, so they are formatted with %u.
  sprintf(ipstr, "%u.%u.%u.%u", (unsigned)(ip & 0xFF), (unsigned)((ip >> 8) & 0xFF), (unsigned)((ip >> 16) & 0xFF),
          (unsigned)(ip >> 24));
}

/*
 * Set the disposition of SIGPIPE to SIG_IGN.
 * Does nothing when WINDOWS is defined.
 */
void taosIgnSIGPIPE() {
#ifndef WINDOWS
  signal(SIGPIPE, SIG_IGN);
#endif
}

void taosSetMaskSIGPIPE() {
wafwerar's avatar
wafwerar 已提交
984
#ifdef WINDOWS
985 986 987 988
#else
  sigset_t signal_mask;
  sigemptyset(&signal_mask);
  sigaddset(&signal_mask, SIGPIPE);
wafwerar's avatar
wafwerar 已提交
989
  int32_t rc = pthread_sigmask(SIG_SETMASK, &signal_mask, NULL);
990
  if (rc != 0) {
wafwerar's avatar
wafwerar 已提交
991 992 993 994 995
    // printf("failed to setmask SIGPIPE");
  }
#endif
}

wafwerar's avatar
wafwerar 已提交
996
/*
 * Thin wrapper around getsockname() for a TdSocket handle.
 *
 * Returns -1 without calling getsockname() when `pSocket` is NULL or its fd is
 * negative; otherwise returns whatever getsockname() returns, with `destAddr`
 * and `addrLen` passed straight through.
 */
int32_t taosGetSocketName(TdSocketPtr pSocket, struct sockaddr *destAddr, int *addrLen) {
  const int usable = (pSocket != NULL) && !(pSocket->fd < 0);
  return usable ? getsockname(pSocket->fd, destAddr, addrLen) : -1;
}

TdEpollPtr taosCreateEpoll(int32_t size) {
  EpollFd fd = -1;
wafwerar's avatar
wafwerar 已提交
1005
#ifdef WINDOWS
wafwerar's avatar
wafwerar 已提交
1006 1007 1008 1009 1010 1011 1012
#else
  fd = epoll_create(size);
#endif
  if (fd < 0) {
    return NULL;
  }

wafwerar's avatar
wafwerar 已提交
1013
  TdEpollPtr pEpoll = (TdEpollPtr)taosMemoryMalloc(sizeof(TdEpoll));
wafwerar's avatar
wafwerar 已提交
1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026
  if (pEpoll == NULL) {
    taosCloseSocketNoCheck1(fd);
    return NULL;
  }
  pEpoll->fd = fd;
  pEpoll->refId = 0;
  return pEpoll;
}
/*
 * Apply `epollOperate` (passed unchanged to epoll_ctl()) for `pSocket` on the
 * epoll instance `pEpoll`, with `event` as the event description.
 *
 * Returns -1 without calling epoll_ctl() when either handle is NULL or has a
 * negative fd, and always returns -1 when WINDOWS is defined. Otherwise
 * returns the result of epoll_ctl().
 */
int32_t taosCtlEpoll(TdEpollPtr pEpoll, int32_t epollOperate, TdSocketPtr pSocket, struct epoll_event *event) {
  int32_t code = -1;
  if (pEpoll == NULL || pEpoll->fd < 0) {
    return -1;
  }
  // pSocket->fd is read below, so the socket handle needs the same guard.
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
#ifdef WINDOWS
#else
  code = epoll_ctl(pEpoll->fd, epollOperate, pSocket->fd, event);
#endif
  return code;
}
/*
 * Wait for events on `pEpoll` by forwarding `event`, `maxEvents` and `timeout`
 * to epoll_wait().
 *
 * Returns -1 when `pEpoll` is NULL or has a negative fd, and always returns -1
 * when WINDOWS is defined. Otherwise returns the result of epoll_wait().
 */
int32_t taosWaitEpoll(TdEpollPtr pEpoll, struct epoll_event *event, int32_t maxEvents, int32_t timeout) {
  if (pEpoll == NULL || pEpoll->fd < 0) {
    return -1;
  }
#ifdef WINDOWS
  return -1;
#else
  return epoll_wait(pEpoll->fd, event, maxEvents, timeout);
#endif
}
/*
 * Close the epoll descriptor held by `*ppEpoll`, free the wrapper and reset
 * the caller's handle to NULL.
 *
 * Returns -1 (and touches nothing) when `ppEpoll` or `*ppEpoll` is NULL or the
 * wrapped fd is negative; otherwise returns the result of closing the
 * descriptor. Because the handle is cleared, a repeated call on the same
 * handle returns -1 instead of reading freed memory.
 */
int32_t taosCloseEpoll(TdEpollPtr *ppEpoll) {
  int32_t code;
  if (ppEpoll == NULL || *ppEpoll == NULL || (*ppEpoll)->fd < 0) {
    return -1;
  }
  code = taosCloseSocketNoCheck1((*ppEpoll)->fd);
  (*ppEpoll)->fd = -1;
  taosMemoryFree(*ppEpoll);
  *ppEpoll = NULL;  // do not leave the caller holding a dangling pointer
  return code;
}
dengyihao's avatar
dengyihao 已提交
1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078
/*
 * Set TCP connection timeout per-socket level.
 * ref [https://github.com/libuv/help/issues/54]
 *
 * Creates a PF_INET / SOCK_STREAM / IPPROTO_TCP socket and applies
 * `conn_timeout_sec` to it: as TCP_MAXRT (seconds) when WINDOWS is defined,
 * otherwise as TCP_USER_TIMEOUT after conversion to milliseconds.
 *
 * Returns the socket descriptor on success, or -1 when the socket cannot be
 * created or the option cannot be set. In the latter case the socket is closed
 * before returning, so no descriptor is leaked.
 */
int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec) {
#if defined(WINDOWS)
  SOCKET fd;
#else
  int      fd;
#endif
  if ((fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == INVALID_SOCKET) {
    return -1;
  }
#if defined(WINDOWS)
  if (0 != setsockopt(fd, IPPROTO_TCP, TCP_MAXRT, (char *)&conn_timeout_sec, sizeof(conn_timeout_sec))) {
    closesocket(fd);
    return -1;
  }
#else  // Linux like systems
  uint32_t conn_timeout_ms = conn_timeout_sec * 1000;
  if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) {
    close(fd);
    return -1;
  }
#endif
  return (int)fd;
}