osSocket.c 21.2 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
17
#define ALLOW_FORBID_FUNC
S
Shengliang Guan 已提交
18
#include "os.h"
S
Shengliang Guan 已提交
19

S
Shengliang Guan 已提交
20
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
S
Shengliang Guan 已提交
21 22 23 24
  #include "winsock2.h"
  #include <WS2tcpip.h>
  #include <winbase.h>
  #include <Winsock2.h>
S
Shengliang Guan 已提交
25
#else
S
Shengliang Guan 已提交
26 27 28 29 30 31 32 33 34 35 36
  #include <arpa/inet.h>
  #include <fcntl.h>
  #include <netdb.h>
  #include <netinet/in.h>
  #include <netinet/ip.h>
  #include <netinet/tcp.h>
  #include <netinet/udp.h>
  #include <sys/socket.h>
  #include <unistd.h>
#endif

37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
#ifndef USE_UV

// typedef struct TdSocketServer {
// #if SOCKET_WITH_LOCK
//   pthread_rwlock_t rwlock;
// #endif
//   int        refId;
//   SocketFd   fd;
// } * TdSocketServerPtr, TdSocketServer;

// typedef struct TdSocketConnector {
// #if SOCKET_WITH_LOCK
//   pthread_rwlock_t rwlock;
// #endif
//   int        refId;
//   SocketFd   fd;
// } * TdSocketConnectorPtr, TdSocketConnector;

S
Shengliang Guan 已提交
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)

/* Windows: thin wrappers that forward the portable socket helpers to Winsock. */
#define taosSend(sockfd, buf, len, flags) send((SOCKET)(sockfd), (buf), (len), (flags))

/* Sends len bytes of buf to the address `to` (tolen bytes long); returns the sendto() result. */
int32_t taosSendto(SocketFd fd, void *buf, int len, unsigned int flags, const struct sockaddr *to, int tolen) {
  return sendto((SOCKET)fd, buf, len, flags, to, tolen);
}

/* Writes len bytes of buf with send() and no flags; returns the send() result. */
int32_t taosWriteSocket(SocketFd fd, void *buf, int len) { return send((SOCKET)fd, buf, len, 0); }

/* Reads up to len bytes into buf with recv() and no flags; returns the recv() result. */
int32_t taosReadSocket(SocketFd fd, void *buf, int len) { return recv((SOCKET)fd, buf, len, 0); }

/* Closes fd without validating it first; returns the closesocket() result. */
int32_t taosCloseSocketNoCheck(SocketFd fd) { return closesocket((SOCKET)fd); }

/* Closes fd; returns the closesocket() result. */
int32_t taosCloseSocket(SocketFd fd) { return closesocket((SOCKET)fd); }

#else

  /* POSIX: taosSend is a direct alias for send(). */
  #define taosSend(sockfd, buf, len, flags) send(sockfd, buf, len, flags)

  /* Sends len bytes of buf to dest_addr (addrlen bytes long); returns the sendto() result. */
  int32_t taosSendto(SocketFd fd, void * buf, int len, unsigned int flags, const struct sockaddr * dest_addr, int addrlen) {
    int32_t sent = (int32_t)sendto(fd, buf, len, flags, dest_addr, addrlen);
    return sent;
  }

  /* Writes len bytes of buf to fd with write(); returns the write() result. */
  int32_t taosWriteSocket(SocketFd fd, void *buf, int len) {
    int32_t written = (int32_t)write(fd, buf, len);
    return written;
  }

  /* Reads up to len bytes from fd into buf with read(); returns the read() result. */
  int32_t taosReadSocket(SocketFd fd, void *buf, int len) {
    int32_t got = (int32_t)read(fd, buf, len);
    return got;
  }

  /* Closes fd without validating it first; returns the close() result. */
  int32_t taosCloseSocketNoCheck(SocketFd fd) {
    int32_t rc = close(fd);
    return rc;
  }

  /*
   * Closes fd if it is a plausible descriptor (fd > -1).
   * Returns the close() result, or -1 when fd is negative and nothing was closed.
   * (The original fell off the end of a non-void function.)
   */
  int32_t taosCloseSocket(SocketFd fd) {
    if (fd > -1) {
      return close(fd);
    }
    return -1;
  }
S
Shengliang Guan 已提交
90 91
#endif

S
Shengliang Guan 已提交
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
/*
 * Stops the receiving side of fd.
 * Under WINDOWS and __APPLE__ the socket is closed outright; elsewhere only
 * the read half is shut down with shutdown(SHUT_RD).
 */
void taosShutDownSocketRD(SOCKET fd) {
#if defined(WINDOWS)
  closesocket(fd);
#elif defined(__APPLE__)
  close(fd);
#else
  shutdown(fd, SHUT_RD);
#endif
}

/*
 * Stops the sending side of fd.
 * Under WINDOWS and __APPLE__ the socket is closed outright; elsewhere only
 * the write half is shut down with shutdown(SHUT_WR).
 */
void taosShutDownSocketWR(SOCKET fd) {
#if defined(WINDOWS)
  closesocket(fd);
#elif defined(__APPLE__)
  close(fd);
#else
  shutdown(fd, SHUT_WR);
#endif
}
S
Shengliang Guan 已提交
111

S
TD-4088  
Shengliang Guan 已提交
112
#if !(defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32))
S
Shengliang Guan 已提交
113

S
TD-1912  
Shengliang Guan 已提交
114 115
/*
 * Switches O_NONBLOCK on (on != 0) or off (on == 0) for sock.
 * Returns 0 on success and 1 when either fcntl() call fails.
 */
int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
  int32_t flags = fcntl(sock, F_GETFL, 0);
  if (flags < 0) {
    return 1;
  }

  if (on) {
    flags |= O_NONBLOCK;
  } else {
    flags &= ~O_NONBLOCK;
  }

  if (fcntl(sock, F_SETFL, flags) < 0) {
    return 1;
  }

  return 0;
}

S
Shengliang Guan 已提交
134
/* Sets the disposition of SIGPIPE to SIG_IGN. */
void taosIgnSIGPIPE() {
  (void)signal(SIGPIPE, SIG_IGN);
}
S
TD-2524  
Shengliang Guan 已提交
135

S
Shengliang Guan 已提交
136

S
TD-1207  
Shengliang Guan 已提交
137 138 139 140 141 142
/*
 * Replaces the calling thread's signal mask with a set containing only SIGPIPE.
 * Because SIG_SETMASK is used, any other signal blocked before the call is no
 * longer blocked afterwards. A failure of pthread_sigmask() is ignored.
 */
void taosSetMaskSIGPIPE() {
  sigset_t signal_mask;
  sigemptyset(&signal_mask);
  sigaddset(&signal_mask, SIGPIPE);
  (void)pthread_sigmask(SIG_SETMASK, &signal_mask, NULL);
}

S
slguan 已提交
147 148
#endif

S
TD-4088  
Shengliang Guan 已提交
149
#if !(defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) || defined(_TD_DARWIN_32))
S
slguan 已提交
150

S
TD-1912  
Shengliang Guan 已提交
151
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) {
S
slguan 已提交
152 153 154
  return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen);
}

S
Shengliang Guan 已提交
155
int32_t taosGetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t *optlen) {
Y
yihaoDeng 已提交
156
  return getsockopt(socketfd, level, optname, optval, (socklen_t *)optlen);
S
Shengliang Guan 已提交
157
}
S
TD-4088  
Shengliang Guan 已提交
158

159 160
#endif

S
Shengliang Guan 已提交
161
#if !((defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)) && defined(_MSC_VER))
162

S
config  
Shengliang Guan 已提交
163
/* Converts a dotted-quad string with inet_addr() and returns its result. */
uint32_t taosInetAddr(const char *ipAddr) {
  uint32_t addr = inet_addr(ipAddr);
  return addr;
}
S
Shengliang Guan 已提交
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184

/* Formats ipInt with inet_ntoa() and returns the pointer that call yields. */
const char *taosInetNtoa(struct in_addr ipInt) {
  const char *text = inet_ntoa(ipInt);
  return text;
}

#endif

#if defined(_TD_DARWIN_64)

/*
 * darwin implementation
 */

/*
 * Darwin variant of taosSetSockOpt.
 * Requests for SOL_SOCKET/SO_SNDBUF and SOL_SOCKET/SO_RCVBUF are reported as
 * successful (0) without calling setsockopt(); everything else is forwarded.
 */
int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int optlen) {
  if (level == SOL_SOCKET && (optname == SO_SNDBUF || optname == SO_RCVBUF)) {
    return 0;
  }

  return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen);
}
S
Shengliang Guan 已提交
186
#endif
187

S
Shengliang Guan 已提交
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)

/*
 * windows implementation
 */

#include <IPHlpApi.h>
#include <WS2tcpip.h>
#include <stdio.h>
#include <string.h>
#include <tchar.h>
#include <winsock2.h>
#include <ws2def.h>

/*
 * Calls WSAStartup() requesting Winsock version 1.1.
 * A static flag records a successful start-up so later calls do nothing;
 * a failed WSAStartup() is retried on the next call.
 */
void taosWinSocketInit() {
  static char flag = 0;
  if (flag != 0) {
    return;
  }

  WSADATA wsaData;
  WORD    wVersionRequested = MAKEWORD(1, 1);
  if (WSAStartup(wVersionRequested, &wsaData) == 0) {
    flag = 1;
  }
}

S
Shengliang Guan 已提交
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251
/*
 * Windows variant: switches non-blocking mode on (on != 0) or off (on == 0)
 * through ioctlsocket(FIONBIO).
 * Returns 0 on success and 1 when ioctlsocket() fails, matching the POSIX
 * implementation in this file.
 */
int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
  u_long mode = on ? 1 : 0;
  if (ioctlsocket(sock, FIONBIO, &mode) != 0) {
    return 1;
  }
  return 0;
}

/* Windows build: intentionally empty. */
void taosIgnSIGPIPE() {
}

/* Windows build: intentionally empty. */
void taosSetMaskSIGPIPE() {
}

/*
 * Windows variant of taosSetSockOpt.
 * The keep-alive tuning requests listed below are reported as successful (0)
 * without calling setsockopt(); every other request is forwarded.
 * NOTE(review): the first pair combines SOL_SOCKET with TCP_KEEPCNT, which
 * looks unusual - confirm it is intended.
 */
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) {
  int32_t skipped = (level == SOL_SOCKET && optname == TCP_KEEPCNT) ||
                    (level == SOL_TCP &&
                     (optname == TCP_KEEPIDLE || optname == TCP_KEEPINTVL || optname == TCP_KEEPCNT));
  if (skipped) {
    return 0;
  }

  return setsockopt(socketfd, level, optname, optval, optlen);
}

#ifdef _MSC_VER
//#if _MSC_VER >= 1900

S
config  
Shengliang Guan 已提交
252
/*
 * MSVC variant: parses a dotted-quad IPv4 string with inet_pton().
 * Returns the address as stored by inet_pton(), or INADDR_NONE when the
 * string is rejected or inet_pton() fails.
 */
uint32_t taosInetAddr(const char *ipAddr) {
  uint32_t value = 0;
  if (inet_pton(AF_INET, ipAddr, &value) <= 0) {
    return INADDR_NONE;
  }
  return value;
}
S
Shengliang Guan 已提交
261 262 263 264 265 266 267

/*
 * MSVC variant: formats ipInt as dotted-quad text with inet_ntop().
 * The result lives in a static buffer, so this is not thread safe and each
 * call overwrites the previous result; only for debug usage while printing logs.
 * Returns whatever inet_ntop() returns (NULL on failure).
 */
const char *taosInetNtoa(struct in_addr ipInt) {
  static char tmpDstStr[INET_ADDRSTRLEN];
  // Pass the real buffer size; the original claimed INET6_ADDRSTRLEN bytes for a 16-byte buffer.
  return inet_ntop(AF_INET, &ipInt, tmpDstStr, sizeof(tmpDstStr));
}

S
Shengliang Guan 已提交
268
//#endif
F
eok  
freemine 已提交
269
#endif
S
Shengliang Guan 已提交
270 271 272 273 274

#if defined(_TD_GO_DLL_)

/*
 * Converts a 64-bit value from host to network (big-endian) byte order.
 * On a host where htonl() is the identity the value is returned unchanged;
 * otherwise each 32-bit half is converted with htonl() and the halves swapped.
 */
uint64_t htonll(uint64_t val) {
  if (htonl(1u) == 1u) {
    return val;  // host is already big-endian
  }

  uint32_t low = (uint32_t)(val & 0xFFFFFFFFu);
  uint32_t high = (uint32_t)(val >> 32);
  return (((uint64_t)htonl(low)) << 32) | (uint64_t)htonl(high);
}

S
Shengliang Guan 已提交
275 276
#endif

277 278 279 280 281 282 283 284 285 286 287 288
#endif

#ifndef SIGPIPE
  #define SIGPIPE EPIPE
#endif

#define TCP_CONN_TIMEOUT 3000  // conn timeout

/*
 * Copies this machine's fully qualified domain name into fqdn.
 *
 * The host name comes from gethostname() and is resolved with getaddrinfo().
 * On __APPLE__ the plain host name is copied; elsewhere the canonical name
 * returned by getaddrinfo() is copied, falling back to the host name when no
 * canonical name is provided.
 *
 * Returns 0 on success and -1 when fqdn is NULL or either lookup fails.
 * NOTE(review): the copy is an unbounded strcpy(); the caller has to supply a
 * buffer large enough for the name - the required size is not visible here.
 */
int32_t taosGetFqdn(char *fqdn) {
  if (fqdn == NULL) {
    return -1;
  }

  char hostname[1024];
  hostname[1023] = '\0';
  if (gethostname(hostname, 1023) == -1) {
    return -1;
  }

  struct addrinfo  hints = {0};
  struct addrinfo *result = NULL;
#ifdef __APPLE__
  // on macosx, hostname -f has the form of xxx.local
  // which will block getaddrinfo for a few seconds if AI_CANONNAME is set
  // thus, we choose AF_INET (ipv4 for the moment) to make getaddrinfo return
  // immediately
  hints.ai_family = AF_INET;
#else // __APPLE__
  hints.ai_flags = AI_CANONNAME;
#endif // __APPLE__
  int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
  if (ret != 0 || result == NULL) {
    return -1;
  }

#ifdef __APPLE__
  // refer to comments above
  strcpy(fqdn, hostname);
#else // __APPLE__
  strcpy(fqdn, (result->ai_canonname != NULL) ? result->ai_canonname : hostname);
#endif // __APPLE__
  freeaddrinfo(result);
  return 0;
}

/*
 * Resolves fqdn to an IPv4 address with getaddrinfo() (AF_INET, SOCK_STREAM).
 * Returns the s_addr of the first result, exactly as stored in the
 * sockaddr_in, or 0xFFFFFFFF when the lookup fails.
 */
uint32_t taosGetIpv4FromFqdn(const char *fqdn) {
  struct addrinfo hints = {0};
  hints.ai_family = AF_INET;
  hints.ai_socktype = SOCK_STREAM;

  struct addrinfo *result = NULL;

  int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result);
  if (ret != 0 || result == NULL) {
    return 0xFFFFFFFF;
  }

  struct sockaddr_in *si = (struct sockaddr_in *)result->ai_addr;
  uint32_t            ip = si->sin_addr.s_addr;
  freeaddrinfo(result);
  return ip;
}

// Function converting an IP address string to an uint32_t.
//
// Each '.'-separated component is parsed with atoi() and stored as one byte;
// the first component lands in the lowest-addressed byte of the result.
// At most four components are consumed (the original overflowed its buffer on
// longer input) and missing components are zero. NULL yields 0.
uint32_t ip2uint(const char *const ip_addr) {
  unsigned char octets[4] = {0, 0, 0, 0};
  uint32_t      value = 0;

  if (ip_addr == NULL) {
    return 0;
  }

  const char *cursor = ip_addr;
  for (size_t k = 0; k < sizeof(octets) && *cursor != '\0'; k++) {
    octets[k] = (unsigned char)atoi(cursor);

    // skip to the character after the next '.', or stop at the terminator
    while (*cursor != '.' && *cursor != '\0') {
      cursor++;
    }
    if (*cursor == '.') {
      cursor++;
    }
  }

  // memcpy instead of a pointer cast: no alignment or aliasing assumptions
  memcpy(&value, octets, sizeof(value));
  return value;
}

/*
 * Writes exactly nbytes from buf to fd, looping over short writes.
 * A write interrupted by a signal (EINTR) is retried.
 * Returns nbytes on success and -1 on any other failure.
 *
 * errno is consulted only after a failed write: after a successful call its
 * value is left over from something earlier, so the original post-write
 * "errno == SIGPIPE || errno == EPIPE" test could reject a good write, and a
 * zero return combined with a stale EINTR could loop forever.
 */
int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) {
  char   *ptr = (char *)buf;
  int32_t nleft = nbytes;

  while (nleft > 0) {
    int32_t nwritten = (int32_t)taosWriteSocket(fd, ptr, nleft);
    if (nwritten < 0 && errno == EINTR /* || errno == EAGAIN || errno == EWOULDBLOCK */) {
      continue;
    }
    if (nwritten <= 0) {
      return -1;
    }

    nleft -= nwritten;
    ptr += nwritten;
  }

  return (nbytes - nleft);
}

/*
 * Reads up to nbytes from fd into buf, looping over short reads.
 * A read interrupted by a signal (EINTR) is retried; end of stream (a read
 * returning 0) stops the loop early.
 * Returns the number of bytes actually read, or -1 when fd is negative or a
 * read fails for another reason.
 *
 * errno is consulted only after a failed read; after a successful one it is
 * stale, so the original post-read EPIPE/SIGPIPE test has been dropped.
 */
int32_t taosReadMsg(SOCKET fd, void *buf, int32_t nbytes) {
  char   *ptr = (char *)buf;
  int32_t nleft = nbytes;

  if (fd < 0) return -1;

  while (nleft > 0) {
    int32_t nread = (int32_t)taosReadSocket(fd, ptr, nleft);
    if (nread == 0) {
      break;  // EOF
    }
    if (nread < 0) {
      if (errno == EINTR /* || errno == EAGAIN || errno == EWOULDBLOCK*/) {
        continue;
      }
      return -1;
    }

    nleft -= nread;
    ptr += nread;
  }

  return (nbytes - nleft);
}

/*
 * Writes nbytes from ptr to fd in non-blocking mode, waiting up to 30 seconds
 * with select() for the socket to become writable before each send.
 *
 * Returns the number of bytes written (less than nbytes when a wait timed
 * out, in which case errno is ETIMEDOUT), or -1 when select() or the send
 * fails with anything other than EINTR/EAGAIN.
 * The socket is put back into blocking mode on every exit path, with errno
 * preserved across that call.
 */
int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes) {
  int32_t nleft = nbytes;
  int32_t failed = 0;

  taosSetNonblocking(fd, 1);

  while (nleft > 0) {
    fd_set         fset;
    struct timeval tv;

    tv.tv_sec = 30;
    tv.tv_usec = 0;
    FD_ZERO(&fset);
    FD_SET(fd, &fset);

    int32_t nready = select((int32_t)(fd + 1), NULL, &fset, NULL, &tv);
    if (nready == 0) {
      // timeout, no enough space to write
      errno = ETIMEDOUT;
      break;
    }
    if (nready < 0) {
      if (errno == EINTR) continue;
      failed = 1;
      break;
    }

    int32_t nwritten = (int32_t)taosSend(fd, ptr, (size_t)nleft, MSG_NOSIGNAL);
    if (nwritten <= 0) {
      if (errno == EAGAIN || errno == EINTR) continue;
      failed = 1;
      break;
    }

    nleft -= nwritten;
    ptr += nwritten;
  }

  int32_t savedErrno = errno;
  taosSetNonblocking(fd, 0);
  errno = savedErrno;

  return failed ? -1 : (nbytes - nleft);
}

/*
 * Reads up to nbytes from fd into ptr, waiting up to 30 seconds with select()
 * for data before each read.
 *
 * Returns the number of bytes read (less than nbytes on timeout, with errno
 * set to ETIMEDOUT, or on end of stream), or -1 when select() or the read
 * fails with anything other than EINTR.
 *
 * The descriptor is placed in select()'s read set; the original passed it as
 * the write set, i.e. waited for writability before reading.
 */
int32_t taosReadn(SOCKET fd, char *ptr, int32_t nbytes) {
  int32_t nleft = nbytes;

  while (nleft > 0) {
    fd_set         rset;
    struct timeval tv;

    tv.tv_sec = 30;
    tv.tv_usec = 0;
    FD_ZERO(&rset);
    FD_SET(fd, &rset);

    int32_t nready = select((int32_t)(fd + 1), &rset, NULL, NULL, &tv);
    if (nready == 0) {
      errno = ETIMEDOUT;
      break;
    }
    if (nready < 0) {
      if (errno == EINTR) continue;
      return -1;
    }

    int32_t nread = (int32_t)taosReadSocket(fd, ptr, nleft);
    if (nread < 0) {
      if (errno == EINTR) continue;
      return -1;
    }
    if (nread == 0) {
      break;  // EOF
    }

    nleft -= nread;
    ptr += nread;
  }

  return (nbytes - nleft);
}

/*
 * Creates a UDP socket bound to ip:port.
 * ip is stored into sin_addr.s_addr unchanged; port is converted with htons().
 * Send and receive buffers are both requested at 1024000 bytes.
 * Returns the socket, or -1 on failure (the socket is closed first).
 */
SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) {
  struct sockaddr_in localAddr;
  int32_t            bufSize = 1024000;

  memset((char *)&localAddr, 0, sizeof(localAddr));
  localAddr.sin_family = AF_INET;
  localAddr.sin_addr.s_addr = ip;
  localAddr.sin_port = (uint16_t)htons(port);

  // Descriptors 0..2 are rejected as well as real failures
  // (presumably to keep clear of stdin/stdout/stderr - confirm).
  SOCKET sockFd = socket(AF_INET, SOCK_DGRAM, 0);
  if (sockFd <= 2) {
    taosCloseSocketNoCheck(sockFd);
    return -1;
  }

  if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    taosCloseSocket(sockFd);
    return -1;
  }

  if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    taosCloseSocket(sockFd);
    return -1;
  }

  /* bind socket to local address */
  if (bind(sockFd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
    taosCloseSocket(sockFd);
    return -1;
  }

  return sockFd;
}

/*
 * Opens a TCP connection to destIp:destPort.
 *
 * destIp and clientIp are stored into sin_addr.s_addr unchanged; destPort is
 * converted with htons(). When clientIp is non-zero the socket is first bound
 * to that local address with port 0.
 * Under _TD_LINUX the connect is done in non-blocking mode and waited on with
 * poll() for at most TCP_CONN_TIMEOUT milliseconds; elsewhere a plain
 * blocking connect() is used.
 *
 * Returns the connected socket, or -1 on any failure. On failure the socket
 * has been closed; this includes a failing taosKeepTcpAlive(), which closes
 * the socket itself (the original then returned the closed descriptor).
 */
SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) {
  SOCKET             sockFd = 0;
  int32_t            ret;
  struct sockaddr_in serverAddr, clientAddr;
  int32_t            bufSize = 1024 * 1024;

  sockFd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
  if (sockFd <= 2) {
    taosCloseSocketNoCheck(sockFd);
    return -1;
  }

  /* set REUSEADDR option, so the portnumber can be re-used */
  int32_t reuse = 1;
  if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
    taosCloseSocket(sockFd);
    return -1;
  }

  if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    taosCloseSocket(sockFd);
    return -1;
  }

  if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
    taosCloseSocket(sockFd);
    return -1;
  }

  if (clientIp != 0) {
    memset((char *)&clientAddr, 0, sizeof(clientAddr));
    clientAddr.sin_family = AF_INET;
    clientAddr.sin_addr.s_addr = clientIp;
    clientAddr.sin_port = 0;

    /* bind socket to client address */
    if (bind(sockFd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) {
      taosCloseSocket(sockFd);
      return -1;
    }
  }

  memset((char *)&serverAddr, 0, sizeof(serverAddr));
  serverAddr.sin_family = AF_INET;
  serverAddr.sin_addr.s_addr = destIp;
  serverAddr.sin_port = (uint16_t)htons((uint16_t)destPort);

#ifdef _TD_LINUX
  taosSetNonblocking(sockFd, 1);
  ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
  if (ret == -1) {
    // Anything other than "connection in progress" (EHOSTUNREACH included) is final.
    if (errno != EINPROGRESS && errno != EAGAIN && errno != EWOULDBLOCK) {
      taosCloseSocket(sockFd);
      return -1;
    }

    struct pollfd wfd[1];
    wfd[0].fd = sockFd;
    wfd[0].events = POLLOUT;

    int res = poll(wfd, 1, TCP_CONN_TIMEOUT);
    if (res == -1 || res == 0) {
      // poll error or connect timeout
      taosCloseSocket(sockFd);
      return -1;
    }

    // The connect outcome is reported through SO_ERROR.
    int optVal = -1, optLen = sizeof(int);
    if ((0 != taosGetSockOpt(sockFd, SOL_SOCKET, SO_ERROR, &optVal, &optLen)) || (optVal != 0)) {
      taosCloseSocket(sockFd);
      return -1;
    }
    ret = 0;
  }
  taosSetNonblocking(sockFd, 0);

#else
  ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
#endif

  if (ret != 0) {
    taosCloseSocket(sockFd);
    return -1;
  }

  if (taosKeepTcpAlive(sockFd) < 0) {
    // taosKeepTcpAlive() has already closed the socket; do not hand it out.
    return -1;
  }

  return sockFd;
}

/*
 * Applies the connection options used for TCP sockets in this file:
 * SO_KEEPALIVE, then (except on __APPLE__) TCP_KEEPCNT = 3, TCP_KEEPIDLE = 10
 * and TCP_KEEPINTVL = 3, then TCP_NODELAY and SO_LINGER (on, 3 seconds).
 *
 * Returns 0 on success. As soon as one option fails, sockFd is CLOSED and -1
 * is returned - callers must not use or close the socket again.
 */
int32_t taosKeepTcpAlive(SOCKET sockFd) {
  int32_t       alive = 1;
  int32_t       nodelay = 1;
  struct linger linger = {0};
#ifndef __APPLE__
  // all fails on macosx
  int32_t probes = 3;
  int32_t alivetime = 10;
  int32_t interval = 3;
#endif // __APPLE__

  if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) {
    goto failed;
  }

#ifndef __APPLE__
  if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) {
    goto failed;
  }

  if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPIDLE, (void *)&alivetime, sizeof(alivetime)) < 0) {
    goto failed;
  }

  if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval)) < 0) {
    goto failed;
  }
#endif // __APPLE__

  if (taosSetSockOpt(sockFd, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) {
    goto failed;
  }

  linger.l_onoff = 1;
  linger.l_linger = 3;
  if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) {
    goto failed;
  }

  return 0;

failed:
  taosCloseSocket(sockFd);
  return -1;
}

/*
 * Creates a listening TCP socket bound to ip:port.
 * ip is stored into sin_addr.s_addr unchanged; port is converted with htons().
 * SO_REUSEADDR is set before bind(), the options from taosKeepTcpAlive() are
 * applied afterwards, and listen() is called with a backlog of 1024.
 * Returns the socket, or -1 on failure (the socket is closed first).
 */
SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
  struct sockaddr_in serverAdd;
  SOCKET             sockFd;
  int32_t            reuse;

  memset((char *)&serverAdd, 0, sizeof(serverAdd));
  serverAdd.sin_family = AF_INET;
  serverAdd.sin_addr.s_addr = ip;
  serverAdd.sin_port = (uint16_t)htons(port);

  if ((sockFd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) {
    taosCloseSocketNoCheck(sockFd);
    return -1;
  }

  /* set REUSEADDR option, so the portnumber can be re-used */
  reuse = 1;
  if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
    taosCloseSocket(sockFd);
    return -1;
  }

  /* bind socket to server address */
  if (bind(sockFd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) {
    taosCloseSocket(sockFd);
    return -1;
  }

  if (taosKeepTcpAlive(sockFd) < 0) {
    // taosKeepTcpAlive() closes the socket itself on failure; closing it
    // again here (as the original did) would be a double close.
    return -1;
  }

  if (listen(sockFd, 1024) < 0) {
    taosCloseSocket(sockFd);
    return -1;
  }

  return sockFd;
}

/*
 * Formats ip as dotted-quad text into ipstr, least-significant byte first
 * (ip & 0xFF is the first number printed).
 * ipstr must have room for the longest result, "255.255.255.255" plus the
 * terminator (16 bytes); no bound is enforced here.
 */
void tinet_ntoa(char *ipstr, uint32_t ip) {
  // %u with unsigned arguments: the operands are unsigned, so %d was a format mismatch
  sprintf(ipstr, "%u.%u.%u.%u", (unsigned)(ip & 0xFF), (unsigned)((ip >> 8) & 0xFF), (unsigned)((ip >> 16) & 0xFF),
          (unsigned)(ip >> 24));
}

#define COPY_SIZE 32768
// sendfile shall be used

/*
 * Copies exactly len bytes from sfd to dfd through a COPY_SIZE (32 KiB) stack
 * buffer, using taosReadMsg() and taosWriteMsg() for each chunk.
 * Returns len on success, or -1 as soon as a chunk is read or written short.
 */
int64_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len) {
  char    temp[COPY_SIZE];
  int64_t leftLen = len;

  while (leftLen > 0) {
    // one chunk: whatever is left, capped at the buffer size
    int64_t readLen = (leftLen < COPY_SIZE) ? leftLen : COPY_SIZE;

    int64_t retLen = taosReadMsg(sfd, temp, (int32_t)readLen);
    if (readLen != retLen) {
      return -1;
    }

    int64_t writeLen = taosWriteMsg(dfd, temp, (int32_t)readLen);
    if (readLen != writeLen) {
      return -1;
    }

    leftLen -= readLen;
  }

  return len;
}
797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814

#endif



#if !(defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32))
/*
 * Adds SIGPIPE to the calling thread's blocked-signal mask (SIG_BLOCK), leaving
 * the rest of the mask as it was. A failure of pthread_sigmask() is ignored.
 */
void taosBlockSIGPIPE() {
  sigset_t blocked;
  sigemptyset(&blocked);
  sigaddset(&blocked, SIGPIPE);

  int32_t status = pthread_sigmask(SIG_BLOCK, &blocked, NULL);
  (void)status;  // failed to block SIGPIPE: nothing is reported
}
#else
/* Windows build: intentionally empty. */
void taosBlockSIGPIPE() {}
#endif