osSocket.c 26.9 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
17
#define ALLOW_FORBID_FUNC
S
Shengliang Guan 已提交
18
#include "os.h"
S
Shengliang Guan 已提交
19

20
#if defined(WINDOWS)
wafwerar's avatar
wafwerar 已提交
21 22 23 24 25 26 27
#include <IPHlpApi.h>
#include <WS2tcpip.h>
#include <Winsock2.h>
#include <stdio.h>
#include <string.h>
#include <tchar.h>
#include <winbase.h>
S
Shengliang Guan 已提交
28
#else
wafwerar's avatar
wafwerar 已提交
29 30 31 32 33 34 35 36 37
#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <netinet/udp.h>
#include <sys/socket.h>
#include <unistd.h>
38 39

#if defined(DARWIN)
dengyihao's avatar
dengyihao 已提交
40 41
#include <dispatch/dispatch.h>
#include "osEok.h"
42
#else
dengyihao's avatar
dengyihao 已提交
43
#include <sys/epoll.h>
44
#endif
S
Shengliang Guan 已提交
45 46
#endif

dengyihao's avatar
dengyihao 已提交
47 48 49 50
#ifndef INVALID_SOCKET
#define INVALID_SOCKET -1
#endif

wafwerar's avatar
wafwerar 已提交
51 52
typedef struct TdSocketServer {
#if SOCKET_WITH_LOCK
wafwerar's avatar
wafwerar 已提交
53
  TdThreadRwlock rwlock;
wafwerar's avatar
wafwerar 已提交
54 55 56
#endif
  int      refId;
  SocketFd fd;
dengyihao's avatar
dengyihao 已提交
57
} * TdSocketServerPtr, TdSocketServer;
S
Shengliang Guan 已提交
58

wafwerar's avatar
wafwerar 已提交
59 60
typedef struct TdEpoll {
#if SOCKET_WITH_LOCK
wafwerar's avatar
wafwerar 已提交
61
  TdThreadRwlock rwlock;
wafwerar's avatar
wafwerar 已提交
62
#endif
dengyihao's avatar
dengyihao 已提交
63 64 65
  int     refId;
  EpollFd fd;
} * TdEpollPtr, TdEpoll;
S
Shengliang Guan 已提交
66

wafwerar's avatar
wafwerar 已提交
67
int32_t taosSendto(TdSocketPtr pSocket, void *buf, int len, unsigned int flags, const struct sockaddr *dest_addr,
dengyihao's avatar
dengyihao 已提交
68
                   int addrlen) {
wafwerar's avatar
wafwerar 已提交
69 70
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
S
Shengliang Guan 已提交
71
  }
wafwerar's avatar
wafwerar 已提交
72
#ifdef WINDOWS
wafwerar's avatar
wafwerar 已提交
73 74 75 76 77 78 79 80
  return sendto(pSocket->fd, buf, len, flags, dest_addr, addrlen);
#else
  return sendto(pSocket->fd, buf, len, flags, dest_addr, addrlen);
#endif
}
int32_t taosWriteSocket(TdSocketPtr pSocket, void *buf, int len) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
S
Shengliang Guan 已提交
81
  }
wafwerar's avatar
wafwerar 已提交
82
#ifdef WINDOWS
wafwerar's avatar
wafwerar 已提交
83 84 85 86 87 88 89 90 91
  return send(pSocket->fd, buf, len, 0);
  ;
#else
  return write(pSocket->fd, buf, len);
#endif
}
int32_t taosReadSocket(TdSocketPtr pSocket, void *buf, int len) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
S
Shengliang Guan 已提交
92
  }
wafwerar's avatar
wafwerar 已提交
93
#ifdef WINDOWS
wafwerar's avatar
wafwerar 已提交
94 95 96 97 98 99
  return recv(pSocket->fd, buf, len, 0);
  ;
#else
  return read(pSocket->fd, buf, len);
#endif
}
S
Shengliang Guan 已提交
100

dengyihao's avatar
dengyihao 已提交
101 102
int32_t taosReadFromSocket(TdSocketPtr pSocket, void *buf, int32_t len, int32_t flags, struct sockaddr *destAddr,
                           int *addrLen) {
wafwerar's avatar
wafwerar 已提交
103 104
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
S
Shengliang Guan 已提交
105
  }
wafwerar's avatar
wafwerar 已提交
106 107 108
  return recvfrom(pSocket->fd, buf, len, flags, destAddr, addrLen);
}
int32_t taosCloseSocketNoCheck1(SocketFd fd) {
wafwerar's avatar
wafwerar 已提交
109
#ifdef WINDOWS
wafwerar's avatar
wafwerar 已提交
110 111 112
  return closesocket(fd);
#else
  return close(fd);
S
Shengliang Guan 已提交
113
#endif
wafwerar's avatar
wafwerar 已提交
114 115 116 117 118 119 120 121
}
int32_t taosCloseSocket(TdSocketPtr *ppSocket) {
  int32_t code;
  if (ppSocket == NULL || *ppSocket == NULL || (*ppSocket)->fd < 0) {
    return -1;
  }
  code = taosCloseSocketNoCheck1((*ppSocket)->fd);
  (*ppSocket)->fd = -1;
wafwerar's avatar
wafwerar 已提交
122
  taosMemoryFree(*ppSocket);
wafwerar's avatar
wafwerar 已提交
123 124 125 126 127 128 129 130 131
  return code;
}
int32_t taosCloseSocketServer(TdSocketServerPtr *ppSocketServer) {
  int32_t code;
  if (ppSocketServer == NULL || *ppSocketServer == NULL || (*ppSocketServer)->fd < 0) {
    return -1;
  }
  code = taosCloseSocketNoCheck1((*ppSocketServer)->fd);
  (*ppSocketServer)->fd = -1;
wafwerar's avatar
wafwerar 已提交
132
  taosMemoryFree(*ppSocketServer);
wafwerar's avatar
wafwerar 已提交
133 134
  return code;
}
S
Shengliang Guan 已提交
135

wafwerar's avatar
wafwerar 已提交
136 137 138 139
int32_t taosShutDownSocketRD(TdSocketPtr pSocket) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
S
Shengliang Guan 已提交
140
#ifdef WINDOWS
wafwerar's avatar
wafwerar 已提交
141
  return closesocket(pSocket->fd);
S
Shengliang Guan 已提交
142
#elif __APPLE__
wafwerar's avatar
wafwerar 已提交
143
  return close(pSocket->fd);
S
Shengliang Guan 已提交
144
#else
wafwerar's avatar
wafwerar 已提交
145
  return shutdown(pSocket->fd, SHUT_RD);
S
Shengliang Guan 已提交
146 147
#endif
}
wafwerar's avatar
wafwerar 已提交
148 149 150 151
int32_t taosShutDownSocketServerRD(TdSocketServerPtr pSocketServer) {
  if (pSocketServer == NULL || pSocketServer->fd < 0) {
    return -1;
  }
S
Shengliang Guan 已提交
152
#ifdef WINDOWS
wafwerar's avatar
wafwerar 已提交
153
  return closesocket(pSocketServer->fd);
S
Shengliang Guan 已提交
154
#elif __APPLE__
wafwerar's avatar
wafwerar 已提交
155
  return close(pSocketServer->fd);
S
Shengliang Guan 已提交
156
#else
wafwerar's avatar
wafwerar 已提交
157
  return shutdown(pSocketServer->fd, SHUT_RD);
S
Shengliang Guan 已提交
158 159
#endif
}
S
Shengliang Guan 已提交
160

wafwerar's avatar
wafwerar 已提交
161 162 163
int32_t taosShutDownSocketWR(TdSocketPtr pSocket) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
S
Shengliang Guan 已提交
164
  }
wafwerar's avatar
wafwerar 已提交
165 166 167 168 169 170
#ifdef WINDOWS
  return closesocket(pSocket->fd);
#elif __APPLE__
  return close(pSocket->fd);
#else
  return shutdown(pSocket->fd, SHUT_WR);
S
slguan 已提交
171
#endif
S
Shengliang Guan 已提交
172
}
wafwerar's avatar
wafwerar 已提交
173 174 175 176 177 178 179 180 181 182
int32_t taosShutDownSocketServerWR(TdSocketServerPtr pSocketServer) {
  if (pSocketServer == NULL || pSocketServer->fd < 0) {
    return -1;
  }
#ifdef WINDOWS
  return closesocket(pSocketServer->fd);
#elif __APPLE__
  return close(pSocketServer->fd);
#else
  return shutdown(pSocketServer->fd, SHUT_WR);
S
Shengliang Guan 已提交
183
#endif
wafwerar's avatar
wafwerar 已提交
184 185 186 187
}
int32_t taosShutDownSocketRDWR(TdSocketPtr pSocket) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
S
Shengliang Guan 已提交
188
  }
wafwerar's avatar
wafwerar 已提交
189 190 191 192 193 194 195 196 197 198 199
#ifdef WINDOWS
  return closesocket(pSocket->fd);
#elif __APPLE__
  return close(pSocket->fd);
#else
  return shutdown(pSocket->fd, SHUT_RDWR);
#endif
}
int32_t taosShutDownSocketServerRDWR(TdSocketServerPtr pSocketServer) {
  if (pSocketServer == NULL || pSocketServer->fd < 0) {
    return -1;
S
Shengliang Guan 已提交
200
  }
wafwerar's avatar
wafwerar 已提交
201 202 203 204 205 206 207
#ifdef WINDOWS
  return closesocket(pSocketServer->fd);
#elif __APPLE__
  return close(pSocketServer->fd);
#else
  return shutdown(pSocketServer->fd, SHUT_RDWR);
#endif
208
}
wafwerar's avatar
wafwerar 已提交
209

wafwerar's avatar
wafwerar 已提交
210 211
void taosWinSocketInit() {
#ifdef WINDOWS
S
Shengliang Guan 已提交
212 213 214 215 216 217 218 219 220
  static char flag = 0;
  if (flag == 0) {
    WORD    wVersionRequested;
    WSADATA wsaData;
    wVersionRequested = MAKEWORD(1, 1);
    if (WSAStartup(wVersionRequested, &wsaData) == 0) {
      flag = 1;
    }
  }
wafwerar's avatar
wafwerar 已提交
221 222
#else
#endif
223
}
wafwerar's avatar
wafwerar 已提交
224 225 226 227
int32_t taosSetNonblocking(TdSocketPtr pSocket, int32_t on) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
wafwerar's avatar
wafwerar 已提交
228
#ifdef WINDOWS
S
Shengliang Guan 已提交
229 230 231
  u_long mode;
  if (on) {
    mode = 1;
wafwerar's avatar
wafwerar 已提交
232
    ioctlsocket(pSocket->fd, FIONBIO, &mode);
S
Shengliang Guan 已提交
233 234
  } else {
    mode = 0;
wafwerar's avatar
wafwerar 已提交
235 236 237 238 239 240 241
    ioctlsocket(pSocket->fd, FIONBIO, &mode);
  }
#else
  int32_t flags = 0;
  if ((flags = fcntl(pSocket->fd, F_GETFL, 0)) < 0) {
    // printf("fcntl(F_GETFL) error: %d (%s)\n", errno, strerror(errno));
    return 1;
S
Shengliang Guan 已提交
242
  }
wafwerar's avatar
wafwerar 已提交
243 244 245 246 247 248 249 250 251 252 253

  if (on)
    flags |= O_NONBLOCK;
  else
    flags &= ~O_NONBLOCK;

  if ((flags = fcntl(pSocket->fd, F_SETFL, flags)) < 0) {
    // printf("fcntl(F_SETFL) error: %d (%s)\n", errno, strerror(errno));
    return 1;
  }
#endif
S
Shengliang Guan 已提交
254 255
  return 0;
}
wafwerar's avatar
wafwerar 已提交
256 257 258 259
int32_t taosSetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t optlen) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
wafwerar's avatar
wafwerar 已提交
260
#ifdef WINDOWS
S
Shengliang Guan 已提交
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
  if (level == SOL_SOCKET && optname == TCP_KEEPCNT) {
    return 0;
  }

  if (level == SOL_TCP && optname == TCP_KEEPIDLE) {
    return 0;
  }

  if (level == SOL_TCP && optname == TCP_KEEPINTVL) {
    return 0;
  }

  if (level == SOL_TCP && optname == TCP_KEEPCNT) {
    return 0;
  }

wafwerar's avatar
wafwerar 已提交
277 278
  return setsockopt(pSocket->fd, level, optname, optval, optlen);
#else
wafwerar's avatar
wafwerar 已提交
279
  return setsockopt(pSocket->fd, level, optname, optval, (int)optlen);
wafwerar's avatar
wafwerar 已提交
280 281 282 283 284 285
#endif
}
int32_t taosGetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t *optlen) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
wafwerar's avatar
wafwerar 已提交
286
#ifdef WINDOWS
wafwerar's avatar
wafwerar 已提交
287 288
  return 0;
#else
wafwerar's avatar
wafwerar 已提交
289
  return getsockopt(pSocket->fd, level, optname, optval, (int *)optlen);
wafwerar's avatar
wafwerar 已提交
290
#endif
S
Shengliang Guan 已提交
291
}
S
config  
Shengliang Guan 已提交
292
uint32_t taosInetAddr(const char *ipAddr) {
wafwerar's avatar
wafwerar 已提交
293
#ifdef WINDOWS
S
Shengliang Guan 已提交
294 295 296 297 298 299 300
  uint32_t value;
  int32_t  ret = inet_pton(AF_INET, ipAddr, &value);
  if (ret <= 0) {
    return INADDR_NONE;
  } else {
    return value;
  }
wafwerar's avatar
wafwerar 已提交
301 302 303
#else
  return inet_addr(ipAddr);
#endif
S
Shengliang Guan 已提交
304
}
S
Shengliang Guan 已提交
305
const char *taosInetNtoa(struct in_addr ipInt) {
wafwerar's avatar
wafwerar 已提交
306
#ifdef WINDOWS
S
Shengliang Guan 已提交
307 308 309
  // not thread safe, only for debug usage while print log
  static char tmpDstStr[16];
  return inet_ntop(AF_INET, &ipInt, tmpDstStr, INET6_ADDRSTRLEN);
wafwerar's avatar
wafwerar 已提交
310 311
#else
  return inet_ntoa(ipInt);
312
#endif
wafwerar's avatar
wafwerar 已提交
313
}
314 315

#ifndef SIGPIPE
wafwerar's avatar
wafwerar 已提交
316
#define SIGPIPE EPIPE
317 318 319 320
#endif

#define TCP_CONN_TIMEOUT 3000  // conn timeout

wafwerar's avatar
wafwerar 已提交
321 322 323 324
int32_t taosWriteMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
325
  int32_t nleft, nwritten;
dengyihao's avatar
dengyihao 已提交
326
  char *  ptr = (char *)buf;
327 328 329 330

  nleft = nbytes;

  while (nleft > 0) {
wafwerar's avatar
wafwerar 已提交
331
    nwritten = taosWriteSocket(pSocket, (char *)ptr, (size_t)nleft);
332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349
    if (nwritten <= 0) {
      if (errno == EINTR /* || errno == EAGAIN || errno == EWOULDBLOCK */)
        continue;
      else
        return -1;
    } else {
      nleft -= nwritten;
      ptr += nwritten;
    }

    if (errno == SIGPIPE || errno == EPIPE) {
      return -1;
    }
  }

  return (nbytes - nleft);
}

wafwerar's avatar
wafwerar 已提交
350 351 352 353
int32_t taosReadMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
354
  int32_t nleft, nread;
dengyihao's avatar
dengyihao 已提交
355
  char *  ptr = (char *)buf;
356 357 358 359

  nleft = nbytes;

  while (nleft > 0) {
wafwerar's avatar
wafwerar 已提交
360
    nread = taosReadSocket(pSocket, ptr, (size_t)nleft);
361 362 363
    if (nread == 0) {
      break;
    } else if (nread < 0) {
wafwerar's avatar
wafwerar 已提交
364
      if (errno == EINTR /* || errno == EAGAIN || errno == EWOULDBLOCK*/) {
365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381
        continue;
      } else {
        return -1;
      }
    } else {
      nleft -= nread;
      ptr += nread;
    }

    if (errno == SIGPIPE || errno == EPIPE) {
      return -1;
    }
  }

  return (nbytes - nleft);
}

wafwerar's avatar
wafwerar 已提交
382 383 384 385 386
int32_t taosNonblockwrite(TdSocketPtr pSocket, char *ptr, int32_t nbytes) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
  taosSetNonblocking(pSocket, 1);
387

wafwerar's avatar
wafwerar 已提交
388 389
  int32_t        nleft, nwritten, nready;
  fd_set         fset;
390 391 392 393 394 395 396
  struct timeval tv;

  nleft = nbytes;
  while (nleft > 0) {
    tv.tv_sec = 30;
    tv.tv_usec = 0;
    FD_ZERO(&fset);
wafwerar's avatar
wafwerar 已提交
397 398
    FD_SET(pSocket->fd, &fset);
    if ((nready = select((SocketFd)(pSocket->fd + 1), NULL, &fset, NULL, &tv)) == 0) {
399
      errno = ETIMEDOUT;
wafwerar's avatar
wafwerar 已提交
400
      // printf("fd %d timeout, no enough space to write", fd);
401 402 403 404 405
      break;

    } else if (nready < 0) {
      if (errno == EINTR) continue;

wafwerar's avatar
wafwerar 已提交
406
      // printf("select error, %d (%s)", errno, strerror(errno));
407 408 409
      return -1;
    }

wafwerar's avatar
wafwerar 已提交
410
    nwritten = (int32_t)send(pSocket->fd, ptr, (size_t)nleft, MSG_NOSIGNAL);
411 412 413
    if (nwritten <= 0) {
      if (errno == EAGAIN || errno == EINTR) continue;

wafwerar's avatar
wafwerar 已提交
414
      // printf("write error, %d (%s)", errno, strerror(errno));
415 416 417 418 419 420 421
      return -1;
    }

    nleft -= nwritten;
    ptr += nwritten;
  }

wafwerar's avatar
wafwerar 已提交
422
  taosSetNonblocking(pSocket, 0);
423 424 425 426

  return (nbytes - nleft);
}

wafwerar's avatar
wafwerar 已提交
427
TdSocketPtr taosOpenUdpSocket(uint32_t ip, uint16_t port) {
428
  struct sockaddr_in localAddr;
wafwerar's avatar
wafwerar 已提交
429 430
  SocketFd           fd;
  int32_t            bufSize = 1024000;
431

wafwerar's avatar
wafwerar 已提交
432
  // printf("open udp socket:0x%x:%hu", ip, port);
433 434 435 436 437 438

  memset((char *)&localAddr, 0, sizeof(localAddr));
  localAddr.sin_family = AF_INET;
  localAddr.sin_addr.s_addr = ip;
  localAddr.sin_port = (uint16_t)htons(port);

wafwerar's avatar
wafwerar 已提交
439 440 441 442
  if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) <= 2) {
    // printf("failed to open udp socket: %d (%s)", errno, strerror(errno));
    taosCloseSocketNoCheck1(fd);
    return NULL;
443 444
  }

wafwerar's avatar
wafwerar 已提交
445
  TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
wafwerar's avatar
wafwerar 已提交
446 447 448
  if (pSocket == NULL) {
    taosCloseSocketNoCheck1(fd);
    return NULL;
449
  }
wafwerar's avatar
wafwerar 已提交
450 451
  pSocket->fd = fd;
  pSocket->refId = 0;
452

wafwerar's avatar
wafwerar 已提交
453 454 455 456 457 458 459 460 461 462
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    // printf("failed to set the send buffer size for UDP socket\n");
    taosCloseSocket(&pSocket);
    return NULL;
  }

  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    // printf("failed to set the receive buffer size for UDP socket\n");
    taosCloseSocket(&pSocket);
    return NULL;
463 464 465
  }

  /* bind socket to local address */
wafwerar's avatar
wafwerar 已提交
466 467 468 469
  if (bind(pSocket->fd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
    // printf("failed to bind udp socket: %d (%s), 0x%x:%hu", errno, strerror(errno), ip, port);
    taosCloseSocket(&pSocket);
    return NULL;
470 471
  }

wafwerar's avatar
wafwerar 已提交
472
  return pSocket;
473 474
}

wafwerar's avatar
wafwerar 已提交
475 476 477
TdSocketPtr taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) {
  SocketFd           fd = -1;
  int32_t            ret;
478
  struct sockaddr_in serverAddr, clientAddr;
wafwerar's avatar
wafwerar 已提交
479
  int32_t            bufSize = 1024 * 1024;
480

wafwerar's avatar
wafwerar 已提交
481
  fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
482

wafwerar's avatar
wafwerar 已提交
483 484 485 486
  if (fd <= 2) {
    // printf("failed to open the socket: %d (%s)", errno, strerror(errno));
    if (fd >= 0) taosCloseSocketNoCheck1(fd);
    return NULL;
487 488
  }

wafwerar's avatar
wafwerar 已提交
489
  TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
wafwerar's avatar
wafwerar 已提交
490 491 492 493 494 495 496
  if (pSocket == NULL) {
    taosCloseSocketNoCheck1(fd);
    return NULL;
  }
  pSocket->fd = fd;
  pSocket->refId = 0;

497 498
  /* set REUSEADDR option, so the portnumber can be re-used */
  int32_t reuse = 1;
wafwerar's avatar
wafwerar 已提交
499 500 501 502
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
    // printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
503 504
  }

wafwerar's avatar
wafwerar 已提交
505 506 507 508
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    // printf("failed to set the send buffer size for TCP socket\n");
    taosCloseSocket(&pSocket);
    return NULL;
509 510
  }

wafwerar's avatar
wafwerar 已提交
511 512 513 514
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    // printf("failed to set the receive buffer size for TCP socket\n");
    taosCloseSocket(&pSocket);
    return NULL;
515 516 517 518 519 520 521 522 523
  }

  if (clientIp != 0) {
    memset((char *)&clientAddr, 0, sizeof(clientAddr));
    clientAddr.sin_family = AF_INET;
    clientAddr.sin_addr.s_addr = clientIp;
    clientAddr.sin_port = 0;

    /* bind socket to client address */
wafwerar's avatar
wafwerar 已提交
524 525 526 527 528
    if (bind(pSocket->fd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) {
      // printf("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)", clientIp, destIp, destPort,
      //        strerror(errno));
      taosCloseSocket(&pSocket);
      return NULL;
529 530 531 532 533 534 535 536
    }
  }

  memset((char *)&serverAddr, 0, sizeof(serverAddr));
  serverAddr.sin_family = AF_INET;
  serverAddr.sin_addr.s_addr = destIp;
  serverAddr.sin_port = (uint16_t)htons((uint16_t)destPort);

wafwerar's avatar
wafwerar 已提交
537 538 539
#ifdef _TD_LINUX
  taosSetNonblocking(pSocket, 1);
  ret = connect(pSocket->fd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
540 541
  if (ret == -1) {
    if (errno == EHOSTUNREACH) {
wafwerar's avatar
wafwerar 已提交
542 543 544
      // printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
      taosCloseSocket(&pSocket);
      return -1;
545
    } else if (errno == EINPROGRESS || errno == EAGAIN || errno == EWOULDBLOCK) {
wafwerar's avatar
wafwerar 已提交
546
      struct pollfd wfd[1];
547

wafwerar's avatar
wafwerar 已提交
548
      wfd[0].fd = pSocket->fd;
549
      wfd[0].events = POLLOUT;
wafwerar's avatar
wafwerar 已提交
550

551 552
      int res = poll(wfd, 1, TCP_CONN_TIMEOUT);
      if (res == -1 || res == 0) {
wafwerar's avatar
wafwerar 已提交
553 554
        // printf("failed to connect socket, ip:0x%x, port:%hu(poll error/conn timeout)", destIp, destPort);
        taosCloseSocket(&pSocket);  //
555 556
        return -1;
      }
wafwerar's avatar
wafwerar 已提交
557 558 559 560
      int optVal = -1, optLen = sizeof(int);
      if ((0 != taosGetSockOpt(pSocket, SOL_SOCKET, SO_ERROR, &optVal, &optLen)) || (optVal != 0)) {
        // printf("failed to connect socket, ip:0x%x, port:%hu(connect host error)", destIp, destPort);
        taosCloseSocket(&pSocket);  //
561 562 563
        return -1;
      }
      ret = 0;
wafwerar's avatar
wafwerar 已提交
564 565 566 567 568
    } else {  // Other error
      // printf("failed to connect socket, ip:0x%x, port:%hu(target host cannot be reached)", destIp, destPort);
      taosCloseSocket(&pSocket);  //
      return -1;
    }
569
  }
wafwerar's avatar
wafwerar 已提交
570
  taosSetNonblocking(pSocket, 0);
571 572

#else
wafwerar's avatar
wafwerar 已提交
573
  ret = connect(pSocket->fd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
574 575 576
#endif

  if (ret != 0) {
wafwerar's avatar
wafwerar 已提交
577 578 579
    // printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
580
  } else {
wafwerar's avatar
wafwerar 已提交
581
    taosKeepTcpAlive(pSocket);
582 583
  }

wafwerar's avatar
wafwerar 已提交
584
  return pSocket;
585 586
}

wafwerar's avatar
wafwerar 已提交
587 588 589 590
int32_t taosKeepTcpAlive(TdSocketPtr pSocket) {
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
591
  int32_t alive = 1;
wafwerar's avatar
wafwerar 已提交
592 593 594
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) {
    // printf("fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)", sockFd, errno, strerror(errno));
    taosCloseSocket(&pSocket);
595 596 597 598 599 600
    return -1;
  }

#ifndef __APPLE__
  // all fails on macosx
  int32_t probes = 3;
wafwerar's avatar
wafwerar 已提交
601 602 603
  if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) {
    // printf("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno));
    taosCloseSocket(&pSocket);
604 605 606 607
    return -1;
  }

  int32_t alivetime = 10;
wafwerar's avatar
wafwerar 已提交
608 609 610
  if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPIDLE, (void *)&alivetime, sizeof(alivetime)) < 0) {
    // printf("fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)", sockFd, errno, strerror(errno));
    taosCloseSocket(&pSocket);
611 612 613 614
    return -1;
  }

  int32_t interval = 3;
wafwerar's avatar
wafwerar 已提交
615 616 617
  if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval)) < 0) {
    // printf("fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)", sockFd, errno, strerror(errno));
    taosCloseSocket(&pSocket);
618 619
    return -1;
  }
wafwerar's avatar
wafwerar 已提交
620
#endif  // __APPLE__
621 622

  int32_t nodelay = 1;
wafwerar's avatar
wafwerar 已提交
623 624 625
  if (taosSetSockOpt(pSocket, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) {
    // printf("fd:%d setsockopt TCP_NODELAY failed %d (%s)", sockFd, errno, strerror(errno));
    taosCloseSocket(&pSocket);
626 627 628 629 630 631
    return -1;
  }

  struct linger linger = {0};
  linger.l_onoff = 1;
  linger.l_linger = 3;
wafwerar's avatar
wafwerar 已提交
632 633 634
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) {
    // printf("setsockopt SO_LINGER failed: %d (%s)", errno, strerror(errno));
    taosCloseSocket(&pSocket);
635 636 637 638 639 640
    return -1;
  }

  return 0;
}

641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679
bool taosValidIpAndPort(uint32_t ip, uint16_t port) {
  struct sockaddr_in serverAdd;
  SocketFd           fd;
  int32_t            reuse;

  // printf("open tcp server socket:0x%x:%hu", ip, port);

  bzero((char *)&serverAdd, sizeof(serverAdd));
  serverAdd.sin_family = AF_INET;
  serverAdd.sin_addr.s_addr = ip;
  serverAdd.sin_port = (uint16_t)htons(port);

  if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) {
    // printf("failed to open TCP socket: %d (%s)", errno, strerror(errno));
    taosCloseSocketNoCheck1(fd);
    return false;
  }

  TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
  if (pSocket == NULL) {
    taosCloseSocketNoCheck1(fd);
    return false;
  }
  pSocket->refId = 0;
  pSocket->fd = fd;

  /* set REUSEADDR option, so the portnumber can be re-used */
  reuse = 1;
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
    // printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
  }
  /* bind socket to server address */
  if (bind(pSocket->fd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) {
    // printf("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
    taosCloseSocket(&pSocket);
    return false;
  }
dengyihao's avatar
dengyihao 已提交
680 681 682 683 684
  if (listen(pSocket->fd, 1024) < 0) {
    // printf("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
  }
685 686 687
  taosCloseSocket(&pSocket);
  return true;
}
wafwerar's avatar
wafwerar 已提交
688
TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
689
  struct sockaddr_in serverAdd;
wafwerar's avatar
wafwerar 已提交
690
  SocketFd           fd;
691 692
  int32_t            reuse;

wafwerar's avatar
wafwerar 已提交
693
  // printf("open tcp server socket:0x%x:%hu", ip, port);
694 695 696 697 698 699

  bzero((char *)&serverAdd, sizeof(serverAdd));
  serverAdd.sin_family = AF_INET;
  serverAdd.sin_addr.s_addr = ip;
  serverAdd.sin_port = (uint16_t)htons(port);

wafwerar's avatar
wafwerar 已提交
700 701 702 703 704 705
  if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) {
    // printf("failed to open TCP socket: %d (%s)", errno, strerror(errno));
    taosCloseSocketNoCheck1(fd);
    return NULL;
  }

wafwerar's avatar
wafwerar 已提交
706
  TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
wafwerar's avatar
wafwerar 已提交
707 708 709
  if (pSocket == NULL) {
    taosCloseSocketNoCheck1(fd);
    return NULL;
710
  }
wafwerar's avatar
wafwerar 已提交
711 712
  pSocket->refId = 0;
  pSocket->fd = fd;
713 714 715

  /* set REUSEADDR option, so the portnumber can be re-used */
  reuse = 1;
wafwerar's avatar
wafwerar 已提交
716 717 718 719
  if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
    // printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
720 721 722
  }

  /* bind socket to server address */
wafwerar's avatar
wafwerar 已提交
723 724 725 726
  if (bind(pSocket->fd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) {
    // printf("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
727 728
  }

wafwerar's avatar
wafwerar 已提交
729 730 731 732
  if (taosKeepTcpAlive(pSocket) < 0) {
    // printf("failed to set tcp server keep-alive option, 0x%x:%hu(%s)", ip, port, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
733 734
  }

wafwerar's avatar
wafwerar 已提交
735 736 737 738
  if (listen(pSocket->fd, 1024) < 0) {
    // printf("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
    taosCloseSocket(&pSocket);
    return NULL;
739 740
  }

wafwerar's avatar
wafwerar 已提交
741
  return (TdSocketServerPtr)pSocket;
742 743
}

dengyihao's avatar
dengyihao 已提交
744
TdSocketPtr taosAcceptTcpConnectSocket(TdSocketServerPtr pServerSocket, struct sockaddr *destAddr, int *addrLen) {
wafwerar's avatar
wafwerar 已提交
745 746 747 748 749 750 751 752 753
  if (pServerSocket == NULL || pServerSocket->fd < 0) {
    return NULL;
  }
  SocketFd fd = accept(pServerSocket->fd, destAddr, addrLen);
  if (fd == -1) {
    // tError("TCP accept failure(%s)", strerror(errno));
    return NULL;
  }

wafwerar's avatar
wafwerar 已提交
754
  TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
wafwerar's avatar
wafwerar 已提交
755 756 757 758 759 760 761 762
  if (pSocket == NULL) {
    taosCloseSocketNoCheck1(fd);
    return NULL;
  }
  pSocket->fd = fd;
  pSocket->refId = 0;
  return pSocket;
}
763 764 765
#define COPY_SIZE 32768
// sendfile shall be used

wafwerar's avatar
wafwerar 已提交
766 767 768 769
int64_t taosCopyFds(TdSocketPtr pSrcSocket, TdSocketPtr pDestSocket, int64_t len) {
  if (pSrcSocket == NULL || pSrcSocket->fd < 0 || pDestSocket == NULL || pDestSocket->fd < 0) {
    return -1;
  }
770 771 772 773 774 775 776 777 778 779 780 781
  int64_t leftLen;
  int64_t readLen, writeLen;
  char    temp[COPY_SIZE];

  leftLen = len;

  while (leftLen > 0) {
    if (leftLen < COPY_SIZE)
      readLen = leftLen;
    else
      readLen = COPY_SIZE;  // 4K

wafwerar's avatar
wafwerar 已提交
782
    int64_t retLen = taosReadMsg(pSrcSocket, temp, (int32_t)readLen);
783
    if (readLen != retLen) {
wafwerar's avatar
wafwerar 已提交
784 785
      // printf("read error, readLen:%" PRId64 " retLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s",
      //        readLen, retLen, len, leftLen, strerror(errno));
786 787 788
      return -1;
    }

wafwerar's avatar
wafwerar 已提交
789
    writeLen = taosWriteMsg(pDestSocket, temp, (int32_t)readLen);
790 791

    if (readLen != writeLen) {
wafwerar's avatar
wafwerar 已提交
792 793
      // printf("copy error, readLen:%" PRId64 " writeLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s",
      //        readLen, writeLen, len, leftLen, strerror(errno));
794 795 796 797 798 799 800 801
      return -1;
    }

    leftLen -= readLen;
  }

  return len;
}
802 803

void taosBlockSIGPIPE() {
wafwerar's avatar
wafwerar 已提交
804
#ifdef WINDOWS
805
#else
806 807 808
  sigset_t signal_mask;
  sigemptyset(&signal_mask);
  sigaddset(&signal_mask, SIGPIPE);
wafwerar's avatar
wafwerar 已提交
809
  int32_t rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
810
  if (rc != 0) {
wafwerar's avatar
wafwerar 已提交
811
    // printf("failed to block SIGPIPE");
812
  }
813
#endif
814
}
815 816 817 818 819 820 821 822 823 824

uint32_t taosGetIpv4FromFqdn(const char *fqdn) {
  struct addrinfo hints = {0};
  hints.ai_family = AF_INET;
  hints.ai_socktype = SOCK_STREAM;

  struct addrinfo *result = NULL;

  int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result);
  if (result) {
dengyihao's avatar
dengyihao 已提交
825
    struct sockaddr *   sa = result->ai_addr;
826 827 828 829 830 831 832 833
    struct sockaddr_in *si = (struct sockaddr_in *)sa;
    struct in_addr      ia = si->sin_addr;
    uint32_t            ip = ia.s_addr;
    freeaddrinfo(result);
    return ip;
  } else {
#ifdef EAI_SYSTEM
    if (ret == EAI_SYSTEM) {
wafwerar's avatar
wafwerar 已提交
834
      // printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, strerror(errno));
835
    } else {
wafwerar's avatar
wafwerar 已提交
836
      // printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret));
837
    }
838
#else
wafwerar's avatar
wafwerar 已提交
839
    // printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret));
840 841 842 843 844 845 846 847 848
#endif
    return 0xFFFFFFFF;
  }
}

int32_t taosGetFqdn(char *fqdn) {
  char hostname[1024];
  hostname[1023] = '\0';
  if (gethostname(hostname, 1023) == -1) {
wafwerar's avatar
wafwerar 已提交
849
    printf("failed to get hostname, reason:%s", strerror(errno));
wafwerar's avatar
wafwerar 已提交
850
    assert(0);
851 852 853 854 855 856 857 858 859 860 861
    return -1;
  }

  struct addrinfo  hints = {0};
  struct addrinfo *result = NULL;
#ifdef __APPLE__
  // on macosx, hostname -f has the form of xxx.local
  // which will block getaddrinfo for a few seconds if AI_CANONNAME is set
  // thus, we choose AF_INET (ipv4 for the moment) to make getaddrinfo return
  // immediately
  hints.ai_family = AF_INET;
wafwerar's avatar
wafwerar 已提交
862
#else   // __APPLE__
863
  hints.ai_flags = AI_CANONNAME;
wafwerar's avatar
wafwerar 已提交
864
#endif  // __APPLE__
865 866
  int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
  if (!result) {
wafwerar's avatar
wafwerar 已提交
867
    printf("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
wafwerar's avatar
wafwerar 已提交
868
    assert(0);
869 870 871 872 873 874
    return -1;
  }

#ifdef __APPLE__
  // refer to comments above
  strcpy(fqdn, hostname);
wafwerar's avatar
wafwerar 已提交
875
#else   // __APPLE__
876
  strcpy(fqdn, result->ai_canonname);
wafwerar's avatar
wafwerar 已提交
877
#endif  // __APPLE__
878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914
  freeaddrinfo(result);
  return 0;
}

// Function converting an IP address string to an uint32_t.
uint32_t ip2uint(const char *const ip_addr) {
  char ip_addr_cpy[20];
  char ip[5];

  tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy));

  char *s_start, *s_end;
  s_start = ip_addr_cpy;
  s_end = ip_addr_cpy;

  int32_t k;

  for (k = 0; *s_start != '\0'; s_start = s_end) {
    for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) {
    }
    if (*s_end == '.') {
      *s_end = '\0';
      s_end++;
    }
    ip[k++] = (char)atoi(s_start);
  }

  ip[k] = '\0';

  return *((uint32_t *)ip);
}

void tinet_ntoa(char *ipstr, uint32_t ip) {
  sprintf(ipstr, "%d.%d.%d.%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24);
}

void taosIgnSIGPIPE() {
wafwerar's avatar
wafwerar 已提交
915
#ifdef WINDOWS
916 917 918 919 920 921
#else
  signal(SIGPIPE, SIG_IGN);
#endif
}

void taosSetMaskSIGPIPE() {
wafwerar's avatar
wafwerar 已提交
922
#ifdef WINDOWS
923 924 925 926
#else
  sigset_t signal_mask;
  sigemptyset(&signal_mask);
  sigaddset(&signal_mask, SIGPIPE);
wafwerar's avatar
wafwerar 已提交
927
  int32_t rc = pthread_sigmask(SIG_SETMASK, &signal_mask, NULL);
928
  if (rc != 0) {
wafwerar's avatar
wafwerar 已提交
929 930 931 932 933
    // printf("failed to setmask SIGPIPE");
  }
#endif
}

wafwerar's avatar
wafwerar 已提交
934
int32_t taosGetSocketName(TdSocketPtr pSocket, struct sockaddr *destAddr, int *addrLen) {
wafwerar's avatar
wafwerar 已提交
935 936 937 938 939 940 941 942
  if (pSocket == NULL || pSocket->fd < 0) {
    return -1;
  }
  return getsockname(pSocket->fd, destAddr, addrLen);
}

TdEpollPtr taosCreateEpoll(int32_t size) {
  EpollFd fd = -1;
wafwerar's avatar
wafwerar 已提交
943
#ifdef WINDOWS
wafwerar's avatar
wafwerar 已提交
944 945 946 947 948 949 950
#else
  fd = epoll_create(size);
#endif
  if (fd < 0) {
    return NULL;
  }

wafwerar's avatar
wafwerar 已提交
951
  TdEpollPtr pEpoll = (TdEpollPtr)taosMemoryMalloc(sizeof(TdEpoll));
wafwerar's avatar
wafwerar 已提交
952 953 954 955 956 957 958 959 960 961 962 963 964
  if (pEpoll == NULL) {
    taosCloseSocketNoCheck1(fd);
    return NULL;
  }
  pEpoll->fd = fd;
  pEpoll->refId = 0;
  return pEpoll;
}
int32_t taosCtlEpoll(TdEpollPtr pEpoll, int32_t epollOperate, TdSocketPtr pSocket, struct epoll_event *event) {
  int32_t code = -1;
  if (pEpoll == NULL || pEpoll->fd < 0) {
    return -1;
  }
wafwerar's avatar
wafwerar 已提交
965
#ifdef WINDOWS
wafwerar's avatar
wafwerar 已提交
966 967 968 969 970 971 972 973 974
#else
  code = epoll_ctl(pEpoll->fd, epollOperate, pSocket->fd, event);
#endif
  return code;
}
int32_t taosWaitEpoll(TdEpollPtr pEpoll, struct epoll_event *event, int32_t maxEvents, int32_t timeout) {
  int32_t code = -1;
  if (pEpoll == NULL || pEpoll->fd < 0) {
    return -1;
975
  }
wafwerar's avatar
wafwerar 已提交
976
#ifdef WINDOWS
wafwerar's avatar
wafwerar 已提交
977 978
#else
  code = epoll_wait(pEpoll->fd, event, maxEvents, timeout);
979
#endif
wafwerar's avatar
wafwerar 已提交
980 981 982 983 984 985 986 987 988
  return code;
}
int32_t taosCloseEpoll(TdEpollPtr *ppEpoll) {
  int32_t code;
  if (ppEpoll == NULL || *ppEpoll == NULL || (*ppEpoll)->fd < 0) {
    return -1;
  }
  code = taosCloseSocketNoCheck1((*ppEpoll)->fd);
  (*ppEpoll)->fd = -1;
wafwerar's avatar
wafwerar 已提交
989
  taosMemoryFree(*ppEpoll);
wafwerar's avatar
wafwerar 已提交
990 991
  return code;
}
dengyihao's avatar
dengyihao 已提交
992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016
/*
 * Set TCP connection timeout per-socket level.
 * ref [https://github.com/libuv/help/issues/54]
 */
int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec) {
#if defined(WINDOWS)
  SOCKET fd;
#else
  int      fd;
#endif
  if ((fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == INVALID_SOCKET) {
    return -1;
  }
#if defined(WINDOWS)
  if (0 != setsockopt(fd, IPPROTO_TCP, TCP_MAXRT, (char *)&conn_timeout_sec, sizeof(conn_timeout_sec))) {
    return -1;
  }
#else  // Linux like systems
  uint32_t conn_timeout_ms = conn_timeout_sec * 1000;
  if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) {
    return -1;
  }
#endif
  return (int)fd;
}