diff --git a/include/os/osSocket.h b/include/os/osSocket.h index da1a9651e924388b3a144d9717a898e8ee104809..6544c895486f74570783c53e86509c57d312ae4d 100644 --- a/include/os/osSocket.h +++ b/include/os/osSocket.h @@ -21,7 +21,10 @@ #define socket SOCKET_FUNC_TAOS_FORBID #define bind BIND_FUNC_TAOS_FORBID #define listen LISTEN_FUNC_TAOS_FORBID - // #define accept ACCEPT_FUNC_TAOS_FORBID + #define accept ACCEPT_FUNC_TAOS_FORBID + #define epoll_create EPOLL_CREATE_FUNC_TAOS_FORBID + #define epoll_ctl EPOLL_CTL_FUNC_TAOS_FORBID + #define epoll_wait EPOLL_WAIT_FUNC_TAOS_FORBID #endif #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) @@ -38,31 +41,6 @@ extern "C" { #endif -#define TAOS_EPOLL_WAIT_TIME 500 -typedef int32_t SOCKET; -typedef SOCKET EpollFd; -#define EpollClose(pollFd) taosCloseSocket(pollFd) - -#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) -typedef SOCKET SocketFd; -#else -typedef int32_t SocketFd; -#endif - -int32_t taosSendto(SocketFd fd, void * msg, int len, unsigned int flags, const struct sockaddr * to, int tolen); -int32_t taosWriteSocket(SocketFd fd, void *msg, int len); -int32_t taosReadSocket(SocketFd fd, void *msg, int len); -int32_t taosCloseSocketNoCheck(SocketFd fd); -int32_t taosCloseSocket(SocketFd fd); -void taosShutDownSocketRD(SOCKET fd); -void taosShutDownSocketWR(SOCKET fd); -int32_t taosSetNonblocking(SOCKET sock, int32_t on); -int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen); -int32_t taosGetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t *optlen); - -uint32_t taosInetAddr(const char *ipAddr); -const char *taosInetNtoa(struct in_addr ipInt); - #if (defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)) #define htobe64 htonll #if defined(_TD_GO_DLL_) @@ -74,25 +52,54 @@ const char *taosInetNtoa(struct in_addr ipInt); #define htobe64 htonll #endif -int32_t taosReadn(SOCKET sock, char *buffer, int32_t len); -int32_t taosWriteMsg(SOCKET fd, void *ptr, int32_t nbytes); -int32_t taosReadMsg(SOCKET fd, void *ptr, int32_t nbytes); -int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes); -int64_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len); -int32_t taosSetNonblocking(SOCKET sock, int32_t on); +#define TAOS_EPOLL_WAIT_TIME 500 + +typedef struct TdSocketServer *TdSocketServerPtr; +typedef struct TdSocket *TdSocketPtr; +typedef struct TdEpoll *TdEpollPtr; + +int32_t taosSendto(TdSocketPtr pSocket, void * msg, int len, unsigned int flags, const struct sockaddr * to, int tolen); +int32_t taosWriteSocket(TdSocketPtr pSocket, void *msg, int len); +int32_t taosReadSocket(TdSocketPtr pSocket, void *msg, int len); +int32_t taosReadFromSocket(TdSocketPtr pSocket, void *buf, int32_t len, int32_t flags, struct sockaddr *destAddr, socklen_t *addrLen); +int32_t taosCloseSocket(TdSocketPtr *ppSocket); +int32_t taosCloseSocketServer(TdSocketServerPtr *ppSocketServer); +int32_t taosShutDownSocketRD(TdSocketPtr pSocket); +int32_t taosShutDownSocketServerRD(TdSocketServerPtr pSocketServer); +int32_t taosShutDownSocketWR(TdSocketPtr pSocket); +int32_t taosShutDownSocketServerWR(TdSocketServerPtr pSocketServer); +int32_t taosShutDownSocketRDWR(TdSocketPtr pSocket); +int32_t taosShutDownSocketServerRDWR(TdSocketServerPtr pSocketServer); +int32_t taosSetNonblocking(TdSocketPtr pSocket, int32_t on); +int32_t taosSetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t optlen); +int32_t taosGetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t *optlen); +int32_t taosWriteMsg(TdSocketPtr pSocket, void *ptr, int32_t nbytes); +int32_t taosReadMsg(TdSocketPtr pSocket, void *ptr, int32_t nbytes); +int32_t taosNonblockwrite(TdSocketPtr pSocket, char *ptr, int32_t nbytes); +int64_t taosCopyFds(TdSocketPtr pSrcSocket, TdSocketPtr pDestSocket, int64_t len); -SOCKET taosOpenUdpSocket(uint32_t localIp, uint16_t localPort); -SOCKET taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp); -SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port); -int32_t taosKeepTcpAlive(SOCKET sockFd); +TdSocketPtr taosOpenUdpSocket(uint32_t localIp, uint16_t localPort); +TdSocketPtr taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp); +TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port); +int32_t taosKeepTcpAlive(TdSocketPtr pSocket); +TdSocketPtr taosAcceptTcpConnectSocket(TdSocketServerPtr pServerSocket, struct sockaddr *destAddr, socklen_t *addrLen); -void taosBlockSIGPIPE(); +int32_t taosGetSocketName(TdSocketPtr pSocket,struct sockaddr *destAddr, socklen_t *addrLen); + +void taosBlockSIGPIPE(); uint32_t taosGetIpv4FromFqdn(const char *); int32_t taosGetFqdn(char *); void tinet_ntoa(char *ipstr, uint32_t ip); uint32_t ip2uint(const char *const ip_addr); -void taosIgnSIGPIPE(); -void taosSetMaskSIGPIPE(); +void taosIgnSIGPIPE(); +void taosSetMaskSIGPIPE(); +uint32_t taosInetAddr(const char *ipAddr); +const char *taosInetNtoa(struct in_addr ipInt); + +TdEpollPtr taosCreateEpoll(int32_t size); +int32_t taosCtlEpoll(TdEpollPtr pEpoll, int32_t epollOperate, TdSocketPtr pSocket, struct epoll_event *event); +int32_t taosWaitEpoll(TdEpollPtr pEpoll, struct epoll_event *event, int32_t maxEvents, int32_t timeout); +int32_t taosCloseEpoll(TdEpollPtr *ppEpoll); #ifdef __cplusplus } diff --git a/source/libs/transport/src/rpcTcp.c b/source/libs/transport/src/rpcTcp.c index d95ac3d36d5d8b233771a5953f0996cf52dd1ba3..aac38b21e8fdf570dfe76414586e6cca83961dd7 100644 --- a/source/libs/transport/src/rpcTcp.c +++ b/source/libs/transport/src/rpcTcp.c @@ -24,7 +24,7 @@ #ifndef USE_UV typedef struct SFdObj { void * signature; - SOCKET fd; // TCP socket FD + TdSocketPtr pSocket; // TCP socket FD void * thandle; // handle from upper layer, like TAOS uint32_t ip; uint16_t port; @@ -40,7 +40,7 @@ typedef struct SThreadObj { pthread_mutex_t mutex; uint32_t ip; bool stop; - EpollFd pollFd; + TdEpollPtr pEpoll; int numOfFds; int threadId; char label[TSDB_LABEL_LEN]; @@ -56,20 +56,20 @@ typedef struct { } SClientObj; typedef struct { - SOCKET fd; - uint32_t ip; - uint16_t port; - int8_t stop; - int8_t reserve; - char label[TSDB_LABEL_LEN]; - int numOfThreads; - void * shandle; - SThreadObj **pThreadObj; - pthread_t thread; + TdSocketServerPtr pSocketServer; + uint32_t ip; + uint16_t port; + int8_t stop; + int8_t reserve; + char label[TSDB_LABEL_LEN]; + int numOfThreads; + void * shandle; + SThreadObj **pThreadObj; + pthread_t thread; } SServerObj; static void * taosProcessTcpData(void *param); -static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd); +static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, TdSocketPtr pSocket); static void taosFreeFdObj(SFdObj *pFdObj); static void taosReportBrokenLink(SFdObj *pFdObj); static void * taosAcceptTcpConnection(void *arg); @@ -85,7 +85,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread return NULL; } - pServerObj->fd = -1; + pServerObj->pSocketServer = NULL; taosResetPthread(&pServerObj->thread); pServerObj->ip = ip; pServerObj->port = port; @@ -118,7 +118,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread } pServerObj->pThreadObj[i] = pThreadObj; - pThreadObj->pollFd = -1; + pThreadObj->pEpoll = NULL; taosResetPthread(&pThreadObj->thread); pThreadObj->processData = fp; tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); @@ -135,8 +135,8 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread break; } - pThreadObj->pollFd = (EpollFd)epoll_create(10); // size does not matter - if (pThreadObj->pollFd < 0) { + pThreadObj->pEpoll = taosCreateEpoll(10); // size does not matter + if (pThreadObj->pEpoll == NULL) { tError("%s failed to create TCP epoll", label); code = -1; break; @@ -151,8 +151,8 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread pThreadObj->threadId = i; } - pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); - if (pServerObj->fd < 0) code = -1; + pServerObj->pSocketServer = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); + if (pServerObj->pSocketServer == NULL) code = -1; if (code == 0) { code = pthread_create(&pServerObj->thread, &thattr, taosAcceptTcpConnection, (void *)pServerObj); @@ -196,8 +196,8 @@ void taosStopTcpServer(void *handle) { if (pServerObj == NULL) return; pServerObj->stop = 1; - if (pServerObj->fd >= 0) { - taosShutDownSocketRD(pServerObj->fd); + if (pServerObj->pSocketServer != NULL) { + taosShutDownSocketServerRD(pServerObj->pSocketServer); } if (taosCheckPthreadValid(pServerObj->thread)) { if (taosComparePthread(pServerObj->thread, pthread_self())) { @@ -227,7 +227,7 @@ void taosCleanUpTcpServer(void *handle) { } static void *taosAcceptTcpConnection(void *arg) { - SOCKET connFd = -1; + TdSocketPtr pSocket = NULL; struct sockaddr_in caddr; int threadId = 0; SThreadObj * pThreadObj; @@ -239,13 +239,13 @@ static void *taosAcceptTcpConnection(void *arg) { while (1) { socklen_t addrlen = sizeof(caddr); - connFd = accept(pServerObj->fd, (struct sockaddr *)&caddr, &addrlen); + pSocket = taosAcceptTcpConnectSocket(pServerObj->pSocketServer, (struct sockaddr *)&caddr, &addrlen); if (pServerObj->stop) { tDebug("%s TCP server stop accepting new connections", pServerObj->label); break; } - if (connFd == -1) { + if (pSocket == NULL) { if (errno == EINVAL) { tDebug("%s TCP server stop accepting new connections, exiting", pServerObj->label); break; @@ -255,11 +255,11 @@ static void *taosAcceptTcpConnection(void *arg) { continue; } - taosKeepTcpAlive(connFd); + taosKeepTcpAlive(pSocket); struct timeval to = {5, 0}; - int32_t ret = taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to)); + int32_t ret = taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to)); if (ret != 0) { - taosCloseSocket(connFd); + taosCloseSocket(&pSocket); tError("%s failed to set recv timeout fd(%s)for connection from:%s:%hu", pServerObj->label, strerror(errno), taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port)); continue; @@ -268,14 +268,14 @@ static void *taosAcceptTcpConnection(void *arg) { // pick up the thread to handle this connection pThreadObj = pServerObj->pThreadObj[threadId]; - SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd); + SFdObj *pFdObj = taosMallocFdObj(pThreadObj, pSocket); if (pFdObj) { pFdObj->ip = caddr.sin_addr.s_addr; pFdObj->port = htons(caddr.sin_port); - tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label, - taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds); + tDebug("%s new TCP connection from %s:%hu, FD:%p numOfFds:%d", pServerObj->label, + taosInetNtoa(caddr.sin_addr), pFdObj->port, pFdObj, pThreadObj->numOfFds); } else { - taosCloseSocket(connFd); + taosCloseSocket(&pSocket); tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno), taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port)); } @@ -285,7 +285,7 @@ static void *taosAcceptTcpConnection(void *arg) { threadId = threadId % pServerObj->numOfThreads; } - taosCloseSocket(pServerObj->fd); + taosCloseSocketServer(&pServerObj->pSocketServer); return NULL; } @@ -339,8 +339,8 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThread break; } - pThreadObj->pollFd = (int64_t)epoll_create(10); // size does not matter - if (pThreadObj->pollFd < 0) { + pThreadObj->pEpoll = taosCreateEpoll(10); // size does not matter + if (pThreadObj->pEpoll == NULL) { tError("%s failed to create TCP epoll", label); code = -1; break; @@ -388,21 +388,17 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin atomic_store_32(&pClientObj->index, index + 1); SThreadObj *pThreadObj = pClientObj->pThreadObj[index]; - SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip); -#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) - if (fd == (SOCKET)-1) return NULL; -#else - if (fd <= 0) return NULL; -#endif + TdSocketPtr pSocket = taosOpenTcpClientSocket(ip, port, pThreadObj->ip); + if (pSocket == NULL) return NULL; struct sockaddr_in sin; uint16_t localPort = 0; unsigned int addrlen = sizeof(sin); - if (getsockname(fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET && addrlen == sizeof(sin)) { + if (taosGetSocketName(pSocket, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET && addrlen == sizeof(sin)) { localPort = (uint16_t)ntohs(sin.sin_port); } - SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd); + SFdObj *pFdObj = taosMallocFdObj(pThreadObj, pSocket); if (pFdObj) { pFdObj->thandle = thandle; @@ -415,7 +411,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin ipport, localPort, pFdObj, pThreadObj->numOfFds); } else { tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno)); - taosCloseSocket(fd); + taosCloseSocket(&pSocket); } return pFdObj; @@ -430,7 +426,7 @@ void taosCloseTcpConnection(void *chandle) { // pFdObj->thandle = NULL; pFdObj->closedByApp = 1; - taosShutDownSocketWR(pFdObj->fd); + taosShutDownSocketWR(pFdObj->pSocket); } int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) { @@ -438,8 +434,8 @@ int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chand if (pFdObj == NULL || pFdObj->signature != pFdObj) return -1; SThreadObj *pThreadObj = pFdObj->pThreadObj; - int ret = taosWriteMsg(pFdObj->fd, data, len); - tTrace("%s %p TCP data is sent, FD:%p fd:%d bytes:%d", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, ret); + int ret = taosWriteMsg(pFdObj->pSocket, data, len); + tTrace("%s %p TCP data is sent, FD:%p bytes:%d", pThreadObj->label, pFdObj->thandle, pFdObj, ret); return ret; } @@ -449,7 +445,7 @@ static void taosReportBrokenLink(SFdObj *pFdObj) { // notify the upper layer, so it will clean the associated context if (pFdObj->closedByApp == 0) { - taosShutDownSocketWR(pFdObj->fd); + taosShutDownSocketWR(pFdObj->pSocket); SRecvInfo recvInfo; recvInfo.msg = NULL; @@ -473,7 +469,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { SThreadObj *pThreadObj = pFdObj->pThreadObj; - headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead)); + headLen = taosReadMsg(pFdObj->pSocket, &rpcHead, sizeof(SRpcHead)); if (headLen != sizeof(SRpcHead)) { tDebug("%s %p read error, FD:%p headLen:%d", pThreadObj->label, pFdObj->thandle, pFdObj, headLen); return -1; @@ -486,13 +482,12 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen); return -1; } else { - tTrace("%s %p read data, FD:%p fd:%d TCP malloc mem:%p", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, - buffer); + tTrace("%s %p read data, FD:%p TCP malloc mem:%p", pThreadObj->label, pFdObj->thandle, pFdObj, buffer); } msg = buffer + tsRpcOverhead; leftLen = msgLen - headLen; - retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen); + retLen = taosReadMsg(pFdObj->pSocket, msg + headLen, leftLen); if (leftLen != retLen) { tError("%s %p read error, leftLen:%d retLen:%d FD:%p", pThreadObj->label, pFdObj->thandle, leftLen, retLen, pFdObj); @@ -532,7 +527,7 @@ static void *taosProcessTcpData(void *param) { setThreadName(name); while (1) { - int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME); + int fdNum = taosWaitEpoll(pThreadObj->pEpoll, events, maxEvents, TAOS_EPOLL_WAIT_TIME); if (pThreadObj->stop) { tDebug("%s TCP thread get stop event, exiting...", pThreadObj->label); break; @@ -561,7 +556,7 @@ static void *taosProcessTcpData(void *param) { } if (taosReadTcpData(pFdObj, &recvInfo) < 0) { - shutdown(pFdObj->fd, SHUT_WR); + taosShutDownSocketWR(pFdObj->pSocket); continue; } @@ -572,9 +567,9 @@ static void *taosProcessTcpData(void *param) { if (pThreadObj->stop) break; } - if (pThreadObj->pollFd >= 0) { - EpollClose(pThreadObj->pollFd); - pThreadObj->pollFd = -1; + if (pThreadObj->pEpoll != NULL) { + taosCloseEpoll(&pThreadObj->pEpoll); + pThreadObj->pEpoll = NULL; } while (pThreadObj->pHead) { @@ -590,7 +585,7 @@ static void *taosProcessTcpData(void *param) { return NULL; } -static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) { +static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, TdSocketPtr pSocket) { struct epoll_event event; SFdObj *pFdObj = (SFdObj *)calloc(sizeof(SFdObj), 1); @@ -599,13 +594,13 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) { } pFdObj->closedByApp = 0; - pFdObj->fd = fd; + pFdObj->pSocket = pSocket; pFdObj->pThreadObj = pThreadObj; pFdObj->signature = pFdObj; event.events = EPOLLIN | EPOLLRDHUP; event.data.ptr = pFdObj; - if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { + if (taosCtlEpoll(pThreadObj->pEpoll, EPOLL_CTL_ADD, pSocket, &event) < 0) { tfree(pFdObj); terrno = TAOS_SYSTEM_ERROR(errno); return NULL; @@ -635,8 +630,8 @@ static void taosFreeFdObj(SFdObj *pFdObj) { } pFdObj->signature = NULL; - epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL); - taosCloseSocket(pFdObj->fd); + taosCtlEpoll(pThreadObj->pEpoll, EPOLL_CTL_DEL, pFdObj->pSocket, NULL); + taosCloseSocket(&pFdObj->pSocket); pThreadObj->numOfFds--; if (pThreadObj->numOfFds < 0) @@ -655,8 +650,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) { pthread_mutex_unlock(&pThreadObj->mutex); - tDebug("%s %p TCP connection is closed, FD:%p fd:%d numOfFds:%d", pThreadObj->label, pFdObj->thandle, pFdObj, - pFdObj->fd, pThreadObj->numOfFds); + tDebug("%s %p TCP connection is closed, FD:%p numOfFds:%d", pThreadObj->label, pFdObj->thandle, pFdObj, pThreadObj->numOfFds); tfree(pFdObj); } diff --git a/source/libs/transport/src/rpcUdp.c b/source/libs/transport/src/rpcUdp.c index 3640414a4c4ec8c75cebd9175d5ec477ac1ea644..81c8d0af7616598c886ce9dad9fb5be16c83c117 100644 --- a/source/libs/transport/src/rpcUdp.c +++ b/source/libs/transport/src/rpcUdp.c @@ -30,17 +30,17 @@ #define RPC_MAX_UDP_SIZE 65480 typedef struct { - int index; - SOCKET fd; - uint16_t port; // peer port - uint16_t localPort; // local port - char label[TSDB_LABEL_LEN]; // copy from udpConnSet; - pthread_t thread; - void * hash; - void * shandle; // handle passed by upper layer during server initialization - void * pSet; - void *(*processData)(SRecvInfo *pRecv); - char *buffer; // buffer to receive data + int index; + TdSocketPtr pSocket; + uint16_t port; // peer port + uint16_t localPort; // local port + char label[TSDB_LABEL_LEN]; // copy from udpConnSet; + pthread_t thread; + void *hash; + void *shandle; // handle passed by upper layer during server initialization + void *pSet; + void *(*processData)(SRecvInfo *pRecv); + char *buffer; // buffer to receive data } SUdpConn; typedef struct { @@ -86,8 +86,8 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads for (i = 0; i < threads; ++i) { pConn = pSet->udpConn + i; ownPort = (port ? port + i : 0); - pConn->fd = taosOpenUdpSocket(ip, ownPort); - if (pConn->fd < 0) { + pConn->pSocket = taosOpenUdpSocket(ip, ownPort); + if (pConn->pSocket == NULL) { tError("%s failed to open UDP socket %x:%hu", label, ip, port); break; } @@ -100,7 +100,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads struct sockaddr_in sin; unsigned int addrlen = sizeof(sin); - if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET && + if (taosGetSocketName(pConn->pSocket, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET && addrlen == sizeof(sin)) { pConn->localPort = (uint16_t)ntohs(sin.sin_port); } @@ -138,9 +138,9 @@ void taosStopUdpConnection(void *handle) { for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; - if (pConn->fd >= 0) shutdown(pConn->fd, SHUT_RDWR); - if (pConn->fd >= 0) taosCloseSocket(pConn->fd); - pConn->fd = -1; + if (pConn->pSocket != NULL) taosShutDownSocketRDWR(pConn->pSocket); + if (pConn->pSocket != NULL) taosCloseSocket(&pConn->pSocket); + pConn->pSocket = NULL; } for (int i = 0; i < pSet->threads; ++i) { @@ -163,7 +163,7 @@ void taosCleanUpUdpConnection(void *handle) { for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; - if (pConn->fd >= 0) taosCloseSocket(pConn->fd); + if (pConn->pSocket != NULL) taosCloseSocket(&pConn->pSocket); } tDebug("%s UDP is cleaned up", pSet->label); @@ -199,13 +199,12 @@ static void *taosRecvUdpData(void *param) { setThreadName("recvUdpData"); while (1) { - dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen); + dataLen = taosReadFromSocket(pConn->pSocket, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen); if (dataLen <= 0) { - tDebug("%s UDP socket was closed, exiting(%s), dataLen:%d fd:%d", pConn->label, strerror(errno), (int32_t)dataLen, - pConn->fd); + tDebug("%s UDP socket was closed, exiting(%s), dataLen:%d", pConn->label, strerror(errno), (int32_t)dataLen); // for windows usage, remote shutdown also returns - 1 in windows client - if (pConn->fd == -1) { + if (pConn->pSocket == NULL) { break; } else { continue; @@ -255,7 +254,7 @@ int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *c destAdd.sin_addr.s_addr = ip; destAdd.sin_port = htons(port); - int ret = (int)taosSendto(pConn->fd, data, (size_t)dataLen, 0, (struct sockaddr *)&destAdd, sizeof(destAdd)); + int ret = taosSendto(pConn->pSocket, data, (size_t)dataLen, 0, (struct sockaddr *)&destAdd, sizeof(destAdd)); return ret; } diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 95e67290acb085c5412d270aea8562ac26463749..6d1c691b9ba17ac001bfcdf35212cc225f11b0d6 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -173,7 +173,7 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 #else int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { int32_t code = -1; - SOCKET fd = 0; + TdSocketPtr pSocket = NULL; uint32_t ip = taosGetIpv4FromFqdn(server); if (ip == 0xffffffff) { @@ -182,8 +182,8 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 goto SEND_OVER; } - fd = taosOpenTcpClientSocket(ip, port, 0); - if (fd < 0) { + pSocket = taosOpenTcpClientSocket(ip, port, 0); + if (pSocket == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); uError("failed to create http socket to %s:%u since %s", server, port, terrstr()); goto SEND_OVER; @@ -200,21 +200,20 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 char header[1024] = {0}; int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); - - if (taosWriteSocket(fd, header, headLen) < 0) { + if (taosWriteMsg(pSocket, header, headLen) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); uError("failed to send http header to %s:%u since %s", server, port, terrstr()); goto SEND_OVER; } - if (taosWriteSocket(fd, (void*)pCont, contLen) < 0) { + if (taosWriteMsg(pSocket, (void*)pCont, contLen) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); uError("failed to send http content to %s:%u since %s", server, port, terrstr()); goto SEND_OVER; } // read something to avoid nginx error 499 - if (taosReadSocket(fd, header, 10) < 0) { + if (taosWriteMsg(pSocket, header, 10) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); uError("failed to receive response from %s:%u since %s", server, port, terrstr()); goto SEND_OVER; @@ -223,8 +222,8 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 code = 0; SEND_OVER: - if (fd != 0) { - taosCloseSocket(fd); + if (pSocket != NULL) { + taosCloseSocket(&pSocket); } return code; diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index 096516a98daa8137a1427565cbabfdbf9545c118..9330cb7b32c1b3404df1d9f450dba9a9ccc430b6 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -18,176 +18,206 @@ #include "os.h" #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) - #include "winsock2.h" - #include - #include - #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "winsock2.h" #else - #include - #include - #include - #include - #include - #include - #include - #include - #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #endif -// typedef struct TdSocketServer { -// #if SOCKET_WITH_LOCK -// pthread_rwlock_t rwlock; -// #endif -// int refId; -// SocketFd fd; -// } * TdSocketServerPtr, TdSocketServer; - -// typedef struct TdSocketConnector { -// #if SOCKET_WITH_LOCK -// pthread_rwlock_t rwlock; -// #endif -// int refId; -// SocketFd fd; -// } * TdSocketConnectorPtr, TdSocketConnector; - -#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) +typedef int32_t SocketFd; +typedef SocketFd EpollFd; -#define taosSend(sockfd, buf, len, flags) send((SOCKET)sockfd, buf, len, flags) -int32_t taosSendto(SocketFd fd, void *buf, int len, unsigned int flags, const struct sockaddr *to, int tolen) { - return sendto((SOCKET)sockfd, buf, len, flags, dest_addr, addrlen); -} -int32_t taosWriteSocket(SocketFd fd, void *buf, int len) { return send((SOCKET)fd, buf, len, 0); } -int32_t taosReadSocket(SocketFd fd, void *buf, int len) { return recv((SOCKET)fd, buf, len, 0)(); } -int32_t taosCloseSocketNoCheck(SocketFd fd) { return closesocket((SOCKET)fd); } -int32_t taosCloseSocket(SocketFd fd) { closesocket((SOCKET)fd) } +typedef struct TdSocketServer { +#if SOCKET_WITH_LOCK + pthread_rwlock_t rwlock; +#endif + int refId; + SocketFd fd; +} *TdSocketServerPtr, TdSocketServer; -#else +typedef struct TdSocket { +#if SOCKET_WITH_LOCK + pthread_rwlock_t rwlock; +#endif + int refId; + SocketFd fd; +} *TdSocketPtr, TdSocket; - #define taosSend(sockfd, buf, len, flags) send(sockfd, buf, len, flags) - int32_t taosSendto(SocketFd fd, void * buf, int len, unsigned int flags, const struct sockaddr * dest_addr, int addrlen) { - return sendto(fd, buf, len, flags, dest_addr, addrlen); - } +typedef struct TdEpoll { +#if SOCKET_WITH_LOCK + pthread_rwlock_t rwlock; +#endif + int refId; + EpollFd fd; +} *TdEpollPtr, TdEpoll; - int32_t taosWriteSocket(SocketFd fd, void *buf, int len) { - return write(fd, buf, len); +int32_t taosSendto(TdSocketPtr pSocket, void *buf, int len, unsigned int flags, const struct sockaddr *dest_addr, + int addrlen) { + if (pSocket == NULL || pSocket->fd < 0) { + return -1; } - - int32_t taosReadSocket(SocketFd fd, void *buf, int len) { - return read(fd, buf, len); +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) + return sendto(pSocket->fd, buf, len, flags, dest_addr, addrlen); +#else + return sendto(pSocket->fd, buf, len, flags, dest_addr, addrlen); +#endif +} +int32_t taosWriteSocket(TdSocketPtr pSocket, void *buf, int len) { + if (pSocket == NULL || pSocket->fd < 0) { + return -1; } - - int32_t taosCloseSocketNoCheck(SocketFd fd) { - return close(fd); +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) + return send(pSocket->fd, buf, len, 0); + ; +#else + return write(pSocket->fd, buf, len); +#endif +} +int32_t taosReadSocket(TdSocketPtr pSocket, void *buf, int len) { + if (pSocket == NULL || pSocket->fd < 0) { + return -1; } +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) + return recv(pSocket->fd, buf, len, 0); + ; +#else + return read(pSocket->fd, buf, len); +#endif +} - int32_t taosCloseSocket(SocketFd fd) { - if (fd > -1) { - close(fd); - } +int32_t taosReadFromSocket(TdSocketPtr pSocket, void *buf, int32_t len, int32_t flags, struct sockaddr *destAddr, socklen_t *addrLen) { + if (pSocket == NULL || pSocket->fd < 0) { + return -1; } + return recvfrom(pSocket->fd, buf, len, flags, destAddr, addrLen); +} +int32_t taosCloseSocketNoCheck1(SocketFd fd) { +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) + return closesocket(fd); +#else + return close(fd); #endif +} +int32_t taosCloseSocket(TdSocketPtr *ppSocket) { + int32_t code; + if (ppSocket == NULL || *ppSocket == NULL || (*ppSocket)->fd < 0) { + return -1; + } + code = taosCloseSocketNoCheck1((*ppSocket)->fd); + (*ppSocket)->fd = -1; + free(*ppSocket); + return code; +} +int32_t taosCloseSocketServer(TdSocketServerPtr *ppSocketServer) { + int32_t code; + if (ppSocketServer == NULL || *ppSocketServer == NULL || (*ppSocketServer)->fd < 0) { + return -1; + } + code = taosCloseSocketNoCheck1((*ppSocketServer)->fd); + (*ppSocketServer)->fd = -1; + free(*ppSocketServer); + return code; +} -void taosShutDownSocketRD(SOCKET fd) { +int32_t taosShutDownSocketRD(TdSocketPtr pSocket) { + if (pSocket == NULL || pSocket->fd < 0) { + return -1; + } #ifdef WINDOWS - closesocket(fd); + return closesocket(pSocket->fd); #elif __APPLE__ - close(fd); + return close(pSocket->fd); #else - shutdown(fd, SHUT_RD); + return shutdown(pSocket->fd, SHUT_RD); #endif } - -void taosShutDownSocketWR(SOCKET fd) { +int32_t taosShutDownSocketServerRD(TdSocketServerPtr pSocketServer) { + if (pSocketServer == NULL || pSocketServer->fd < 0) { + return -1; + } #ifdef WINDOWS - closesocket(fd); + return closesocket(pSocketServer->fd); #elif __APPLE__ - close(fd); + return close(pSocketServer->fd); #else - shutdown(fd, SHUT_WR); + return shutdown(pSocketServer->fd, SHUT_RD); #endif } -#if !(defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)) - -int32_t taosSetNonblocking(SOCKET sock, int32_t on) { - int32_t flags = 0; - if ((flags = fcntl(sock, F_GETFL, 0)) < 0) { - //printf("fcntl(F_GETFL) error: %d (%s)\n", errno, strerror(errno)); - return 1; - } - - if (on) - flags |= O_NONBLOCK; - else - flags &= ~O_NONBLOCK; - - if ((flags = fcntl(sock, F_SETFL, flags)) < 0) { - //printf("fcntl(F_SETFL) error: %d (%s)\n", errno, strerror(errno)); - return 1; +int32_t taosShutDownSocketWR(TdSocketPtr pSocket) { + if (pSocket == NULL || pSocket->fd < 0) { + return -1; } - - return 0; -} - - - - +#ifdef WINDOWS + return closesocket(pSocket->fd); +#elif __APPLE__ + return close(pSocket->fd); +#else + return shutdown(pSocket->fd, SHUT_WR); #endif - -#if !(defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) || defined(_TD_DARWIN_32)) - -int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) { - return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen); -} - -int32_t taosGetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t *optlen) { - return getsockopt(socketfd, level, optname, optval, (socklen_t *)optlen); } - -#endif - -#if !((defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)) && defined(_MSC_VER)) - -uint32_t taosInetAddr(const char *ipAddr) { return inet_addr(ipAddr); } - -const char *taosInetNtoa(struct in_addr ipInt) { return inet_ntoa(ipInt); } - +int32_t taosShutDownSocketServerWR(TdSocketServerPtr pSocketServer) { + if (pSocketServer == NULL || pSocketServer->fd < 0) { + return -1; + } +#ifdef WINDOWS + return closesocket(pSocketServer->fd); +#elif __APPLE__ + return close(pSocketServer->fd); +#else + return shutdown(pSocketServer->fd, SHUT_WR); #endif - -#if defined(_TD_DARWIN_64) - -/* - * darwin implementation - */ - -int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int optlen) { - if (level == SOL_SOCKET && optname == SO_SNDBUF) { - return 0; +} +int32_t taosShutDownSocketRDWR(TdSocketPtr pSocket) { + if (pSocket == NULL || pSocket->fd < 0) { + return -1; } - - if (level == SOL_SOCKET && optname == SO_RCVBUF) { - return 0; +#ifdef WINDOWS + return closesocket(pSocket->fd); +#elif __APPLE__ + return close(pSocket->fd); +#else + return shutdown(pSocket->fd, SHUT_RDWR); +#endif +} +int32_t taosShutDownSocketServerRDWR(TdSocketServerPtr pSocketServer) { + if (pSocketServer == NULL || pSocketServer->fd < 0) { + return -1; } - - return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen); +#ifdef WINDOWS + return closesocket(pSocketServer->fd); +#elif __APPLE__ + return close(pSocketServer->fd); +#else + return shutdown(pSocketServer->fd, SHUT_RDWR); +#endif } + +#if (defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)) + #if defined(_TD_GO_DLL_) + uint64_t htonll(uint64_t val) { return (((uint64_t)htonl(val)) << 32) + htonl(val >> 32); } + #endif #endif +void taosWinSocketInit1() { #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) - -/* - * windows implementation - */ - -#include -#include -#include -#include -#include -#include -#include - -void taosWinSocketInit() { static char flag = 0; if (flag == 0) { WORD wVersionRequested; @@ -197,21 +227,46 @@ void taosWinSocketInit() { flag = 1; } } +#else +#endif } - -int32_t taosSetNonblocking(SOCKET sock, int32_t on) { +int32_t taosSetNonblocking(TdSocketPtr pSocket, int32_t on) { + if (pSocket == NULL || pSocket->fd < 0) { + return -1; + } +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) u_long mode; if (on) { mode = 1; - ioctlsocket(sock, FIONBIO, &mode); + ioctlsocket(pSocket->fd, FIONBIO, &mode); } else { mode = 0; - ioctlsocket(sock, FIONBIO, &mode); + ioctlsocket(pSocket->fd, FIONBIO, &mode); + } +#else + int32_t flags = 0; + if ((flags = fcntl(pSocket->fd, F_GETFL, 0)) < 0) { + // printf("fcntl(F_GETFL) error: %d (%s)\n", errno, strerror(errno)); + return 1; } + + if (on) + flags |= O_NONBLOCK; + else + flags &= ~O_NONBLOCK; + + if ((flags = fcntl(pSocket->fd, F_SETFL, flags)) < 0) { + // printf("fcntl(F_SETFL) error: %d (%s)\n", errno, strerror(errno)); + return 1; + } +#endif return 0; } - -int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) { +int32_t taosSetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t optlen) { + if (pSocket == NULL || pSocket->fd < 0) { + return -1; + } +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) if (level == SOL_SOCKET && optname == TCP_KEEPCNT) { return 0; } @@ -228,13 +283,23 @@ int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *op return 0; } - return setsockopt(socketfd, level, optname, optval, optlen); + return setsockopt(pSocket->fd, level, optname, optval, optlen); +#else + return setsockopt(pSocket->fd, level, optname, optval, (socklen_t)optlen); +#endif +} +int32_t taosGetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t *optlen) { + if (pSocket == NULL || pSocket->fd < 0) { + return -1; + } +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) + return 0; +#else + return getsockopt(pSocket->fd, level, optname, optval, (socklen_t *)optlen); +#endif } - -#ifdef _MSC_VER -//#if _MSC_VER >= 1900 - uint32_t taosInetAddr(const char *ipAddr) { +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) uint32_t value; int32_t ret = inet_pton(AF_INET, ipAddr, &value); if (ret <= 0) { @@ -242,39 +307,37 @@ uint32_t taosInetAddr(const char *ipAddr) { } else { return value; } +#else + return inet_addr(ipAddr); +#endif } - const char *taosInetNtoa(struct in_addr ipInt) { +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) // not thread safe, only for debug usage while print log static char tmpDstStr[16]; return inet_ntop(AF_INET, &ipInt, tmpDstStr, INET6_ADDRSTRLEN); -} - -//#endif -#endif - -#if defined(_TD_GO_DLL_) - -uint64_t htonll(uint64_t val) { return (((uint64_t)htonl(val)) << 32) + htonl(val >> 32); } - -#endif - +#else + return inet_ntoa(ipInt); #endif +} #ifndef SIGPIPE - #define SIGPIPE EPIPE +#define SIGPIPE EPIPE #endif #define TCP_CONN_TIMEOUT 3000 // conn timeout -int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) { +int32_t taosWriteMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) { + if (pSocket == NULL || pSocket->fd < 0) { + return -1; + } int32_t nleft, nwritten; - char * ptr = (char *)buf; + char *ptr = (char *)buf; nleft = nbytes; while (nleft > 0) { - nwritten = (int32_t)taosWriteSocket(fd, (char *)ptr, (size_t)nleft); + nwritten = taosWriteSocket(pSocket, (char *)ptr, (size_t)nleft); if (nwritten <= 0) { if (errno == EINTR /* || errno == EAGAIN || errno == EWOULDBLOCK */) continue; @@ -293,20 +356,21 @@ int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) { return (nbytes - nleft); } -int32_t taosReadMsg(SOCKET fd, void *buf, int32_t nbytes) { +int32_t taosReadMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) { + if (pSocket == NULL || pSocket->fd < 0) { + return -1; + } int32_t nleft, nread; - char * ptr = (char *)buf; + char *ptr = (char *)buf; nleft = nbytes; - if (fd < 0) return -1; - while (nleft > 0) { - nread = (int32_t)taosReadSocket(fd, ptr, (size_t)nleft); + nread = taosReadSocket(pSocket, ptr, (size_t)nleft); if (nread == 0) { break; } else if (nread < 0) { - if (errno == EINTR/* || errno == EAGAIN || errno == EWOULDBLOCK*/) { + if (errno == EINTR /* || errno == EAGAIN || errno == EWOULDBLOCK*/) { continue; } else { return -1; @@ -324,11 +388,14 @@ int32_t taosReadMsg(SOCKET fd, void *buf, int32_t nbytes) { return (nbytes - nleft); } -int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes) { - taosSetNonblocking(fd, 1); +int32_t taosNonblockwrite(TdSocketPtr pSocket, char *ptr, int32_t nbytes) { + if (pSocket == NULL || pSocket->fd < 0) { + return -1; + } + taosSetNonblocking(pSocket, 1); - int32_t nleft, nwritten, nready; - fd_set fset; + int32_t nleft, nwritten, nready; + fd_set fset; struct timeval tv; nleft = nbytes; @@ -336,24 +403,24 @@ int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes) { tv.tv_sec = 30; tv.tv_usec = 0; FD_ZERO(&fset); - FD_SET(fd, &fset); - if ((nready = select((int32_t)(fd + 1), NULL, &fset, NULL, &tv)) == 0) { + FD_SET(pSocket->fd, &fset); + if ((nready = select((SocketFd)(pSocket->fd + 1), NULL, &fset, NULL, &tv)) == 0) { errno = ETIMEDOUT; - //printf("fd %d timeout, no enough space to write", fd); + // printf("fd %d timeout, no enough space to write", fd); break; } else if (nready < 0) { if (errno == EINTR) continue; - //printf("select error, %d (%s)", errno, strerror(errno)); + // printf("select error, %d (%s)", errno, strerror(errno)); return -1; } - nwritten = (int32_t)taosSend(fd, ptr, (size_t)nleft, MSG_NOSIGNAL); + nwritten = (int32_t)send(pSocket->fd, ptr, (size_t)nleft, MSG_NOSIGNAL); if (nwritten <= 0) { if (errno == EAGAIN || errno == EINTR) continue; - //printf("write error, %d (%s)", errno, strerror(errno)); + // printf("write error, %d (%s)", errno, strerror(errno)); return -1; } @@ -361,121 +428,99 @@ int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes) { ptr += nwritten; } - taosSetNonblocking(fd, 0); + taosSetNonblocking(pSocket, 0); return (nbytes - nleft); } -int32_t taosReadn(SOCKET fd, char *ptr, int32_t nbytes) { - int32_t nread, nready, nleft = nbytes; - - fd_set fset; - struct timeval tv; - - while (nleft > 0) { - tv.tv_sec = 30; - tv.tv_usec = 0; - FD_ZERO(&fset); - FD_SET(fd, &fset); - if ((nready = select((int32_t)(fd + 1), NULL, &fset, NULL, &tv)) == 0) { - errno = ETIMEDOUT; - //printf("fd %d timeout\n", fd); - break; - } else if (nready < 0) { - if (errno == EINTR) continue; - //printf("select error, %d (%s)", errno, strerror(errno)); - return -1; - } - - if ((nread = (int32_t)taosReadSocket(fd, ptr, (size_t)nleft)) < 0) { - if (errno == EINTR) continue; - //printf("read error, %d (%s)", errno, strerror(errno)); - return -1; - - } else if (nread == 0) { - //printf("fd %d EOF", fd); - break; // EOF - } - - nleft -= nread; - ptr += nread; - } - - return (nbytes - nleft); -} - -SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) { +TdSocketPtr taosOpenUdpSocket(uint32_t ip, uint16_t port) { struct sockaddr_in localAddr; - SOCKET sockFd; - int32_t bufSize = 1024000; + SocketFd fd; + int32_t bufSize = 1024000; - //printf("open udp socket:0x%x:%hu", ip, port); + // printf("open udp socket:0x%x:%hu", ip, port); memset((char *)&localAddr, 0, sizeof(localAddr)); localAddr.sin_family = AF_INET; localAddr.sin_addr.s_addr = ip; localAddr.sin_port = (uint16_t)htons(port); - if ((sockFd = socket(AF_INET, SOCK_DGRAM, 0)) <= 2) { - //printf("failed to open udp socket: %d (%s)", errno, strerror(errno)); - taosCloseSocketNoCheck(sockFd); - return -1; + if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) <= 2) { + // printf("failed to open udp socket: %d (%s)", errno, strerror(errno)); + taosCloseSocketNoCheck1(fd); + return NULL; } - if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { - //printf("failed to set the send buffer size for UDP socket\n"); - taosCloseSocket(sockFd); - return -1; + TdSocketPtr pSocket = (TdSocketPtr)malloc(sizeof(TdSocket)); + if (pSocket == NULL) { + taosCloseSocketNoCheck1(fd); + return NULL; } + pSocket->fd = fd; + pSocket->refId = 0; - if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { - //printf("failed to set the receive buffer size for UDP socket\n"); - taosCloseSocket(sockFd); - return -1; + if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { + // printf("failed to set the send buffer size for UDP socket\n"); + taosCloseSocket(&pSocket); + return NULL; + } + + if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { + // printf("failed to set the receive buffer size for UDP socket\n"); + taosCloseSocket(&pSocket); + return NULL; } /* bind socket to local address */ - if (bind(sockFd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) { - //printf("failed to bind udp socket: %d (%s), 0x%x:%hu", errno, strerror(errno), ip, port); - taosCloseSocket(sockFd); - return -1; + if (bind(pSocket->fd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) { + // printf("failed to bind udp socket: %d (%s), 0x%x:%hu", errno, strerror(errno), ip, port); + taosCloseSocket(&pSocket); + return NULL; } - return sockFd; + return pSocket; } -SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) { - SOCKET sockFd = 0; - int32_t ret; +TdSocketPtr taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) { + SocketFd fd = -1; + int32_t ret; struct sockaddr_in serverAddr, clientAddr; - int32_t bufSize = 1024 * 1024; + int32_t bufSize = 1024 * 1024; - sockFd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); - if (sockFd <= 2) { - //printf("failed to open the socket: %d (%s)", errno, strerror(errno)); - taosCloseSocketNoCheck(sockFd); - return -1; + if (fd <= 2) { + // printf("failed to open the socket: %d (%s)", errno, strerror(errno)); + if (fd >= 0) taosCloseSocketNoCheck1(fd); + return NULL; } + TdSocketPtr pSocket = (TdSocketPtr)malloc(sizeof(TdSocket)); + if (pSocket == NULL) { + taosCloseSocketNoCheck1(fd); + return NULL; + } + pSocket->fd = fd; + pSocket->refId = 0; + /* set REUSEADDR option, so the portnumber can be re-used */ int32_t reuse = 1; - if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) { - //printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno)); - taosCloseSocket(sockFd); - return -1; + if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) { + // printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno)); + taosCloseSocket(&pSocket); + return NULL; } - if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { - //printf("failed to set the send buffer size for TCP socket\n"); - taosCloseSocket(sockFd); - return -1; + if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { + // printf("failed to set the send buffer size for TCP socket\n"); + taosCloseSocket(&pSocket); + return NULL; } - if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { - //printf("failed to set the receive buffer size for TCP socket\n"); - taosCloseSocket(sockFd); - return -1; + if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { + // printf("failed to set the receive buffer size for TCP socket\n"); + taosCloseSocket(&pSocket); + return NULL; } if (clientIp != 0) { @@ -485,11 +530,11 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie clientAddr.sin_port = 0; /* bind socket to client address */ - if (bind(sockFd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) { - //printf("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)", clientIp, destIp, destPort, - // strerror(errno)); - taosCloseSocket(sockFd); - return -1; + if (bind(pSocket->fd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) { + // printf("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)", clientIp, destIp, destPort, + // strerror(errno)); + taosCloseSocket(&pSocket); + return NULL; } } @@ -498,159 +543,193 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie serverAddr.sin_addr.s_addr = destIp; serverAddr.sin_port = (uint16_t)htons((uint16_t)destPort); -#ifdef _TD_LINUX - taosSetNonblocking(sockFd, 1); - ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)); +#ifdef _TD_LINUX + taosSetNonblocking(pSocket, 1); + ret = connect(pSocket->fd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)); if (ret == -1) { if (errno == EHOSTUNREACH) { - //printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno)); - taosCloseSocket(sockFd); - return -1; + // printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno)); + taosCloseSocket(&pSocket); + return -1; } else if (errno == EINPROGRESS || errno == EAGAIN || errno == EWOULDBLOCK) { - struct pollfd wfd[1]; + struct pollfd wfd[1]; - wfd[0].fd = sockFd; + wfd[0].fd = pSocket->fd; wfd[0].events = POLLOUT; - + int res = poll(wfd, 1, TCP_CONN_TIMEOUT); if (res == -1 || res == 0) { - //printf("failed to connect socket, ip:0x%x, port:%hu(poll error/conn timeout)", destIp, destPort); - taosCloseSocket(sockFd); // + // printf("failed to connect socket, ip:0x%x, port:%hu(poll error/conn timeout)", destIp, destPort); + taosCloseSocket(&pSocket); // return -1; } - int optVal = -1, optLen = sizeof(int); - if ((0 != taosGetSockOpt(sockFd, SOL_SOCKET, SO_ERROR, &optVal, &optLen)) || (optVal != 0)) { - //printf("failed to connect socket, ip:0x%x, port:%hu(connect host error)", destIp, destPort); - taosCloseSocket(sockFd); // + int optVal = -1, optLen = sizeof(int); + if ((0 != taosGetSockOpt(pSocket, SOL_SOCKET, SO_ERROR, &optVal, &optLen)) || (optVal != 0)) { + // printf("failed to connect socket, ip:0x%x, port:%hu(connect host error)", destIp, destPort); + taosCloseSocket(&pSocket); // return -1; } ret = 0; - } else { // Other error - //printf("failed to connect socket, ip:0x%x, port:%hu(target host cannot be reached)", destIp, destPort); - taosCloseSocket(sockFd); // - return -1; - } + } else { // Other error + // printf("failed to connect socket, ip:0x%x, port:%hu(target host cannot be reached)", destIp, destPort); + taosCloseSocket(&pSocket); // + return -1; + } } - taosSetNonblocking(sockFd, 0); + taosSetNonblocking(pSocket, 0); #else - ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)); + ret = connect(pSocket->fd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)); #endif if (ret != 0) { - //printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno)); - taosCloseSocket(sockFd); - sockFd = -1; + // printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno)); + taosCloseSocket(&pSocket); + return NULL; } else { - taosKeepTcpAlive(sockFd); + taosKeepTcpAlive(pSocket); } - return sockFd; + return pSocket; } -int32_t taosKeepTcpAlive(SOCKET sockFd) { +int32_t taosKeepTcpAlive(TdSocketPtr pSocket) { + if (pSocket == NULL || pSocket->fd < 0) { + return -1; + } int32_t alive = 1; - if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) { - //printf("fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)", sockFd, errno, strerror(errno)); - taosCloseSocket(sockFd); + if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) { + // printf("fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)", sockFd, errno, strerror(errno)); + taosCloseSocket(&pSocket); return -1; } #ifndef __APPLE__ // all fails on macosx int32_t probes = 3; - if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) { - //printf("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno)); - taosCloseSocket(sockFd); + if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) { + // printf("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno)); + taosCloseSocket(&pSocket); return -1; } int32_t alivetime = 10; - if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPIDLE, (void *)&alivetime, sizeof(alivetime)) < 0) { - //printf("fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)", sockFd, errno, strerror(errno)); - taosCloseSocket(sockFd); + if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPIDLE, (void *)&alivetime, sizeof(alivetime)) < 0) { + // printf("fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)", sockFd, errno, strerror(errno)); + taosCloseSocket(&pSocket); return -1; } int32_t interval = 3; - if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval)) < 0) { - //printf("fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)", sockFd, errno, strerror(errno)); - taosCloseSocket(sockFd); + if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval)) < 0) { + // printf("fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)", sockFd, errno, strerror(errno)); + taosCloseSocket(&pSocket); return -1; } -#endif // __APPLE__ +#endif // __APPLE__ int32_t nodelay = 1; - if (taosSetSockOpt(sockFd, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) { - //printf("fd:%d setsockopt TCP_NODELAY failed %d (%s)", sockFd, errno, strerror(errno)); - taosCloseSocket(sockFd); + if (taosSetSockOpt(pSocket, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) { + // printf("fd:%d setsockopt TCP_NODELAY failed %d (%s)", sockFd, errno, strerror(errno)); + taosCloseSocket(&pSocket); return -1; } struct linger linger = {0}; linger.l_onoff = 1; linger.l_linger = 3; - if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) { - //printf("setsockopt SO_LINGER failed: %d (%s)", errno, strerror(errno)); - taosCloseSocket(sockFd); + if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) { + // printf("setsockopt SO_LINGER failed: %d (%s)", errno, strerror(errno)); + taosCloseSocket(&pSocket); return -1; } return 0; } -SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) { +TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port) { struct sockaddr_in serverAdd; - SOCKET sockFd; + SocketFd fd; int32_t reuse; - //printf("open tcp server socket:0x%x:%hu", ip, port); + // printf("open tcp server socket:0x%x:%hu", ip, port); bzero((char *)&serverAdd, sizeof(serverAdd)); serverAdd.sin_family = AF_INET; serverAdd.sin_addr.s_addr = ip; serverAdd.sin_port = (uint16_t)htons(port); - if ((sockFd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) { - //printf("failed to open TCP socket: %d (%s)", errno, strerror(errno)); - taosCloseSocketNoCheck(sockFd); - return -1; + if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) { + // printf("failed to open TCP socket: %d (%s)", errno, strerror(errno)); + taosCloseSocketNoCheck1(fd); + return NULL; + } + + TdSocketPtr pSocket = (TdSocketPtr)malloc(sizeof(TdSocket)); + if (pSocket == NULL) { + taosCloseSocketNoCheck1(fd); + return NULL; } + pSocket->refId = 0; + pSocket->fd = fd; /* set REUSEADDR option, so the portnumber can be re-used */ reuse = 1; - if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) { - //printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno)); - taosCloseSocket(sockFd); - return -1; + if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) { + // printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno)); + taosCloseSocket(&pSocket); + return NULL; } /* bind socket to server address */ - if (bind(sockFd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) { - //printf("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno)); - taosCloseSocket(sockFd); - return -1; + if (bind(pSocket->fd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) { + // printf("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno)); + taosCloseSocket(&pSocket); + return NULL; } - if (taosKeepTcpAlive(sockFd) < 0) { - //printf("failed to set tcp server keep-alive option, 0x%x:%hu(%s)", ip, port, strerror(errno)); - taosCloseSocket(sockFd); - return -1; + if (taosKeepTcpAlive(pSocket) < 0) { + // printf("failed to set tcp server keep-alive option, 0x%x:%hu(%s)", ip, port, strerror(errno)); + taosCloseSocket(&pSocket); + return NULL; } - if (listen(sockFd, 1024) < 0) { - //printf("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno)); - taosCloseSocket(sockFd); - return -1; + if (listen(pSocket->fd, 1024) < 0) { + // printf("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno)); + taosCloseSocket(&pSocket); + return NULL; } - return sockFd; + return (TdSocketServerPtr)pSocket; } +TdSocketPtr taosAcceptTcpConnectSocket(TdSocketServerPtr pServerSocket, struct sockaddr *destAddr, + socklen_t *addrLen) { + if (pServerSocket == NULL || pServerSocket->fd < 0) { + return NULL; + } + SocketFd fd = accept(pServerSocket->fd, destAddr, addrLen); + if (fd == -1) { + // tError("TCP accept failure(%s)", strerror(errno)); + return NULL; + } + + TdSocketPtr pSocket = (TdSocketPtr)malloc(sizeof(TdSocket)); + if (pSocket == NULL) { + taosCloseSocketNoCheck1(fd); + return NULL; + } + pSocket->fd = fd; + pSocket->refId = 0; + return pSocket; +} #define COPY_SIZE 32768 // sendfile shall be used -int64_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len) { +int64_t taosCopyFds(TdSocketPtr pSrcSocket, TdSocketPtr pDestSocket, int64_t len) { + if (pSrcSocket == NULL || pSrcSocket->fd < 0 || pDestSocket == NULL || pDestSocket->fd < 0) { + return -1; + } int64_t leftLen; int64_t readLen, writeLen; char temp[COPY_SIZE]; @@ -663,18 +742,18 @@ int64_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len) { else readLen = COPY_SIZE; // 4K - int64_t retLen = taosReadMsg(sfd, temp, (int32_t)readLen); + int64_t retLen = taosReadMsg(pSrcSocket, temp, (int32_t)readLen); if (readLen != retLen) { - //printf("read error, readLen:%" PRId64 " retLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s", - // readLen, retLen, len, leftLen, strerror(errno)); + // printf("read error, readLen:%" PRId64 " retLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s", + // readLen, retLen, len, leftLen, strerror(errno)); return -1; } - writeLen = taosWriteMsg(dfd, temp, (int32_t)readLen); + writeLen = taosWriteMsg(pDestSocket, temp, (int32_t)readLen); if (readLen != writeLen) { - //printf("copy error, readLen:%" PRId64 " writeLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s", - // readLen, writeLen, len, leftLen, strerror(errno)); + // printf("copy error, readLen:%" PRId64 " writeLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s", + // readLen, writeLen, len, leftLen, strerror(errno)); return -1; } @@ -692,7 +771,7 @@ void taosBlockSIGPIPE() { sigaddset(&signal_mask, SIGPIPE); int32_t rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); if (rc != 0) { - //printf("failed to block SIGPIPE"); + // printf("failed to block SIGPIPE"); } #endif } @@ -706,7 +785,7 @@ uint32_t taosGetIpv4FromFqdn(const char *fqdn) { int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result); if (result) { - struct sockaddr * sa = result->ai_addr; + struct sockaddr *sa = result->ai_addr; struct sockaddr_in *si = (struct sockaddr_in *)sa; struct in_addr ia = si->sin_addr; uint32_t ip = ia.s_addr; @@ -715,12 +794,12 @@ uint32_t taosGetIpv4FromFqdn(const char *fqdn) { } else { #ifdef EAI_SYSTEM if (ret == EAI_SYSTEM) { - //printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, strerror(errno)); + // printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, strerror(errno)); } else { - //printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret)); + // printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret)); } #else - //printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret)); + // printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret)); #endif return 0xFFFFFFFF; } @@ -730,7 +809,7 @@ int32_t taosGetFqdn(char *fqdn) { char hostname[1024]; hostname[1023] = '\0'; if (gethostname(hostname, 1023) == -1) { - //printf("failed to get hostname, reason:%s", strerror(errno)); + // printf("failed to get hostname, reason:%s", strerror(errno)); return -1; } @@ -742,21 +821,21 @@ int32_t taosGetFqdn(char *fqdn) { // thus, we choose AF_INET (ipv4 for the moment) to make getaddrinfo return // immediately hints.ai_family = AF_INET; -#else // __APPLE__ +#else // __APPLE__ hints.ai_flags = AI_CANONNAME; -#endif // __APPLE__ +#endif // __APPLE__ int32_t ret = getaddrinfo(hostname, NULL, &hints, &result); if (!result) { - //printf("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret)); + // printf("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret)); return -1; } #ifdef __APPLE__ // refer to comments above strcpy(fqdn, hostname); -#else // __APPLE__ +#else // __APPLE__ strcpy(fqdn, result->ai_canonname); -#endif // __APPLE__ +#endif // __APPLE__ freeaddrinfo(result); return 0; } @@ -793,7 +872,6 @@ void tinet_ntoa(char *ipstr, uint32_t ip) { sprintf(ipstr, "%d.%d.%d.%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24); } - void taosIgnSIGPIPE() { #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #else @@ -809,7 +887,67 @@ void taosSetMaskSIGPIPE() { sigaddset(&signal_mask, SIGPIPE); int32_t rc = pthread_sigmask(SIG_SETMASK, &signal_mask, NULL); if (rc != 0) { - //printf("failed to setmask SIGPIPE"); + // printf("failed to setmask SIGPIPE"); + } +#endif +} + +int32_t taosGetSocketName(TdSocketPtr pSocket, struct sockaddr *destAddr, socklen_t *addrLen) { + if (pSocket == NULL || pSocket->fd < 0) { + return -1; + } + return getsockname(pSocket->fd, destAddr, addrLen); +} + + +TdEpollPtr taosCreateEpoll(int32_t size) { + EpollFd fd = -1; +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) +#else + fd = epoll_create(size); +#endif + if (fd < 0) { + return NULL; + } + + TdEpollPtr pEpoll = (TdEpollPtr)malloc(sizeof(TdEpoll)); + if (pEpoll == NULL) { + taosCloseSocketNoCheck1(fd); + return NULL; + } + pEpoll->fd = fd; + pEpoll->refId = 0; + return pEpoll; +} +int32_t taosCtlEpoll(TdEpollPtr pEpoll, int32_t epollOperate, TdSocketPtr pSocket, struct epoll_event *event) { + int32_t code = -1; + if (pEpoll == NULL || pEpoll->fd < 0) { + return -1; + } +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) +#else + code = epoll_ctl(pEpoll->fd, epollOperate, pSocket->fd, event); +#endif + return code; +} +int32_t taosWaitEpoll(TdEpollPtr pEpoll, struct epoll_event *event, int32_t maxEvents, int32_t timeout) { + int32_t code = -1; + if (pEpoll == NULL || pEpoll->fd < 0) { + return -1; } +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) +#else + code = epoll_wait(pEpoll->fd, event, maxEvents, timeout); #endif -} \ No newline at end of file + return code; +} +int32_t taosCloseEpoll(TdEpollPtr *ppEpoll) { + int32_t code; + if (ppEpoll == NULL || *ppEpoll == NULL || (*ppEpoll)->fd < 0) { + return -1; + } + code = taosCloseSocketNoCheck1((*ppEpoll)->fd); + (*ppEpoll)->fd = -1; + free(*ppEpoll); + return code; +}