提交 43e4cdc7 编写于 作者: wafwerar's avatar wafwerar

[TD-13760]<fix>: redefine socket api.

上级 0e11e657
......@@ -21,7 +21,10 @@
#define socket SOCKET_FUNC_TAOS_FORBID
#define bind BIND_FUNC_TAOS_FORBID
#define listen LISTEN_FUNC_TAOS_FORBID
// #define accept ACCEPT_FUNC_TAOS_FORBID
#define accept ACCEPT_FUNC_TAOS_FORBID
#define epoll_create EPOLL_CREATE_FUNC_TAOS_FORBID
#define epoll_ctl EPOLL_CTL_FUNC_TAOS_FORBID
#define epoll_wait EPOLL_WAIT_FUNC_TAOS_FORBID
#endif
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
......@@ -38,31 +41,6 @@
extern "C" {
#endif
#define TAOS_EPOLL_WAIT_TIME 500
typedef int32_t SOCKET;
typedef SOCKET EpollFd;
#define EpollClose(pollFd) taosCloseSocket(pollFd)
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
typedef SOCKET SocketFd;
#else
typedef int32_t SocketFd;
#endif
int32_t taosSendto(SocketFd fd, void * msg, int len, unsigned int flags, const struct sockaddr * to, int tolen);
int32_t taosWriteSocket(SocketFd fd, void *msg, int len);
int32_t taosReadSocket(SocketFd fd, void *msg, int len);
int32_t taosCloseSocketNoCheck(SocketFd fd);
int32_t taosCloseSocket(SocketFd fd);
void taosShutDownSocketRD(SOCKET fd);
void taosShutDownSocketWR(SOCKET fd);
int32_t taosSetNonblocking(SOCKET sock, int32_t on);
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen);
int32_t taosGetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t *optlen);
uint32_t taosInetAddr(const char *ipAddr);
const char *taosInetNtoa(struct in_addr ipInt);
#if (defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32))
#define htobe64 htonll
#if defined(_TD_GO_DLL_)
......@@ -74,17 +52,39 @@ const char *taosInetNtoa(struct in_addr ipInt);
#define htobe64 htonll
#endif
int32_t taosReadn(SOCKET sock, char *buffer, int32_t len);
int32_t taosWriteMsg(SOCKET fd, void *ptr, int32_t nbytes);
int32_t taosReadMsg(SOCKET fd, void *ptr, int32_t nbytes);
int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes);
int64_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len);
int32_t taosSetNonblocking(SOCKET sock, int32_t on);
#define TAOS_EPOLL_WAIT_TIME 500
typedef struct TdSocketServer *TdSocketServerPtr;
typedef struct TdSocket *TdSocketPtr;
typedef struct TdEpoll *TdEpollPtr;
int32_t taosSendto(TdSocketPtr pSocket, void * msg, int len, unsigned int flags, const struct sockaddr * to, int tolen);
int32_t taosWriteSocket(TdSocketPtr pSocket, void *msg, int len);
int32_t taosReadSocket(TdSocketPtr pSocket, void *msg, int len);
int32_t taosReadFromSocket(TdSocketPtr pSocket, void *buf, int32_t len, int32_t flags, struct sockaddr *destAddr, socklen_t *addrLen);
int32_t taosCloseSocket(TdSocketPtr *ppSocket);
int32_t taosCloseSocketServer(TdSocketServerPtr *ppSocketServer);
int32_t taosShutDownSocketRD(TdSocketPtr pSocket);
int32_t taosShutDownSocketServerRD(TdSocketServerPtr pSocketServer);
int32_t taosShutDownSocketWR(TdSocketPtr pSocket);
int32_t taosShutDownSocketServerWR(TdSocketServerPtr pSocketServer);
int32_t taosShutDownSocketRDWR(TdSocketPtr pSocket);
int32_t taosShutDownSocketServerRDWR(TdSocketServerPtr pSocketServer);
int32_t taosSetNonblocking(TdSocketPtr pSocket, int32_t on);
int32_t taosSetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t optlen);
int32_t taosGetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t *optlen);
int32_t taosWriteMsg(TdSocketPtr pSocket, void *ptr, int32_t nbytes);
int32_t taosReadMsg(TdSocketPtr pSocket, void *ptr, int32_t nbytes);
int32_t taosNonblockwrite(TdSocketPtr pSocket, char *ptr, int32_t nbytes);
int64_t taosCopyFds(TdSocketPtr pSrcSocket, TdSocketPtr pDestSocket, int64_t len);
SOCKET taosOpenUdpSocket(uint32_t localIp, uint16_t localPort);
SOCKET taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
int32_t taosKeepTcpAlive(SOCKET sockFd);
TdSocketPtr taosOpenUdpSocket(uint32_t localIp, uint16_t localPort);
TdSocketPtr taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
int32_t taosKeepTcpAlive(TdSocketPtr pSocket);
TdSocketPtr taosAcceptTcpConnectSocket(TdSocketServerPtr pServerSocket, struct sockaddr *destAddr, socklen_t *addrLen);
int32_t taosGetSocketName(TdSocketPtr pSocket,struct sockaddr *destAddr, socklen_t *addrLen);
void taosBlockSIGPIPE();
uint32_t taosGetIpv4FromFqdn(const char *);
......@@ -93,6 +93,13 @@ void tinet_ntoa(char *ipstr, uint32_t ip);
uint32_t ip2uint(const char *const ip_addr);
void taosIgnSIGPIPE();
void taosSetMaskSIGPIPE();
uint32_t taosInetAddr(const char *ipAddr);
const char *taosInetNtoa(struct in_addr ipInt);
TdEpollPtr taosCreateEpoll(int32_t size);
int32_t taosCtlEpoll(TdEpollPtr pEpoll, int32_t epollOperate, TdSocketPtr pSocket, struct epoll_event *event);
int32_t taosWaitEpoll(TdEpollPtr pEpoll, struct epoll_event *event, int32_t maxEvents, int32_t timeout);
int32_t taosCloseEpoll(TdEpollPtr *ppEpoll);
#ifdef __cplusplus
}
......
......@@ -24,7 +24,7 @@
#ifndef USE_UV
typedef struct SFdObj {
void * signature;
SOCKET fd; // TCP socket FD
TdSocketPtr pSocket; // TCP socket FD
void * thandle; // handle from upper layer, like TAOS
uint32_t ip;
uint16_t port;
......@@ -40,7 +40,7 @@ typedef struct SThreadObj {
pthread_mutex_t mutex;
uint32_t ip;
bool stop;
EpollFd pollFd;
TdEpollPtr pEpoll;
int numOfFds;
int threadId;
char label[TSDB_LABEL_LEN];
......@@ -56,7 +56,7 @@ typedef struct {
} SClientObj;
typedef struct {
SOCKET fd;
TdSocketServerPtr pSocketServer;
uint32_t ip;
uint16_t port;
int8_t stop;
......@@ -69,7 +69,7 @@ typedef struct {
} SServerObj;
static void * taosProcessTcpData(void *param);
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd);
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, TdSocketPtr pSocket);
static void taosFreeFdObj(SFdObj *pFdObj);
static void taosReportBrokenLink(SFdObj *pFdObj);
static void * taosAcceptTcpConnection(void *arg);
......@@ -85,7 +85,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
return NULL;
}
pServerObj->fd = -1;
pServerObj->pSocketServer = NULL;
taosResetPthread(&pServerObj->thread);
pServerObj->ip = ip;
pServerObj->port = port;
......@@ -118,7 +118,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
}
pServerObj->pThreadObj[i] = pThreadObj;
pThreadObj->pollFd = -1;
pThreadObj->pEpoll = NULL;
taosResetPthread(&pThreadObj->thread);
pThreadObj->processData = fp;
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
......@@ -135,8 +135,8 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
break;
}
pThreadObj->pollFd = (EpollFd)epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) {
pThreadObj->pEpoll = taosCreateEpoll(10); // size does not matter
if (pThreadObj->pEpoll == NULL) {
tError("%s failed to create TCP epoll", label);
code = -1;
break;
......@@ -151,8 +151,8 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pThreadObj->threadId = i;
}
pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
if (pServerObj->fd < 0) code = -1;
pServerObj->pSocketServer = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
if (pServerObj->pSocketServer == NULL) code = -1;
if (code == 0) {
code = pthread_create(&pServerObj->thread, &thattr, taosAcceptTcpConnection, (void *)pServerObj);
......@@ -196,8 +196,8 @@ void taosStopTcpServer(void *handle) {
if (pServerObj == NULL) return;
pServerObj->stop = 1;
if (pServerObj->fd >= 0) {
taosShutDownSocketRD(pServerObj->fd);
if (pServerObj->pSocketServer != NULL) {
taosShutDownSocketServerRD(pServerObj->pSocketServer);
}
if (taosCheckPthreadValid(pServerObj->thread)) {
if (taosComparePthread(pServerObj->thread, pthread_self())) {
......@@ -227,7 +227,7 @@ void taosCleanUpTcpServer(void *handle) {
}
static void *taosAcceptTcpConnection(void *arg) {
SOCKET connFd = -1;
TdSocketPtr pSocket = NULL;
struct sockaddr_in caddr;
int threadId = 0;
SThreadObj * pThreadObj;
......@@ -239,13 +239,13 @@ static void *taosAcceptTcpConnection(void *arg) {
while (1) {
socklen_t addrlen = sizeof(caddr);
connFd = accept(pServerObj->fd, (struct sockaddr *)&caddr, &addrlen);
pSocket = taosAcceptTcpConnectSocket(pServerObj->pSocketServer, (struct sockaddr *)&caddr, &addrlen);
if (pServerObj->stop) {
tDebug("%s TCP server stop accepting new connections", pServerObj->label);
break;
}
if (connFd == -1) {
if (pSocket == NULL) {
if (errno == EINVAL) {
tDebug("%s TCP server stop accepting new connections, exiting", pServerObj->label);
break;
......@@ -255,11 +255,11 @@ static void *taosAcceptTcpConnection(void *arg) {
continue;
}
taosKeepTcpAlive(connFd);
taosKeepTcpAlive(pSocket);
struct timeval to = {5, 0};
int32_t ret = taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to));
int32_t ret = taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to));
if (ret != 0) {
taosCloseSocket(connFd);
taosCloseSocket(&pSocket);
tError("%s failed to set recv timeout fd(%s)for connection from:%s:%hu", pServerObj->label, strerror(errno),
taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port));
continue;
......@@ -268,14 +268,14 @@ static void *taosAcceptTcpConnection(void *arg) {
// pick up the thread to handle this connection
pThreadObj = pServerObj->pThreadObj[threadId];
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd);
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, pSocket);
if (pFdObj) {
pFdObj->ip = caddr.sin_addr.s_addr;
pFdObj->port = htons(caddr.sin_port);
tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label,
taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds);
tDebug("%s new TCP connection from %s:%hu, FD:%p numOfFds:%d", pServerObj->label,
taosInetNtoa(caddr.sin_addr), pFdObj->port, pFdObj, pThreadObj->numOfFds);
} else {
taosCloseSocket(connFd);
taosCloseSocket(&pSocket);
tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno),
taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port));
}
......@@ -285,7 +285,7 @@ static void *taosAcceptTcpConnection(void *arg) {
threadId = threadId % pServerObj->numOfThreads;
}
taosCloseSocket(pServerObj->fd);
taosCloseSocketServer(&pServerObj->pSocketServer);
return NULL;
}
......@@ -339,8 +339,8 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThread
break;
}
pThreadObj->pollFd = (int64_t)epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) {
pThreadObj->pEpoll = taosCreateEpoll(10); // size does not matter
if (pThreadObj->pEpoll == NULL) {
tError("%s failed to create TCP epoll", label);
code = -1;
break;
......@@ -388,21 +388,17 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
atomic_store_32(&pClientObj->index, index + 1);
SThreadObj *pThreadObj = pClientObj->pThreadObj[index];
SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip);
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
if (fd == (SOCKET)-1) return NULL;
#else
if (fd <= 0) return NULL;
#endif
TdSocketPtr pSocket = taosOpenTcpClientSocket(ip, port, pThreadObj->ip);
if (pSocket == NULL) return NULL;
struct sockaddr_in sin;
uint16_t localPort = 0;
unsigned int addrlen = sizeof(sin);
if (getsockname(fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET && addrlen == sizeof(sin)) {
if (taosGetSocketName(pSocket, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET && addrlen == sizeof(sin)) {
localPort = (uint16_t)ntohs(sin.sin_port);
}
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd);
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, pSocket);
if (pFdObj) {
pFdObj->thandle = thandle;
......@@ -415,7 +411,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
ipport, localPort, pFdObj, pThreadObj->numOfFds);
} else {
tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno));
taosCloseSocket(fd);
taosCloseSocket(&pSocket);
}
return pFdObj;
......@@ -430,7 +426,7 @@ void taosCloseTcpConnection(void *chandle) {
// pFdObj->thandle = NULL;
pFdObj->closedByApp = 1;
taosShutDownSocketWR(pFdObj->fd);
taosShutDownSocketWR(pFdObj->pSocket);
}
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
......@@ -438,8 +434,8 @@ int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chand
if (pFdObj == NULL || pFdObj->signature != pFdObj) return -1;
SThreadObj *pThreadObj = pFdObj->pThreadObj;
int ret = taosWriteMsg(pFdObj->fd, data, len);
tTrace("%s %p TCP data is sent, FD:%p fd:%d bytes:%d", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, ret);
int ret = taosWriteMsg(pFdObj->pSocket, data, len);
tTrace("%s %p TCP data is sent, FD:%p bytes:%d", pThreadObj->label, pFdObj->thandle, pFdObj, ret);
return ret;
}
......@@ -449,7 +445,7 @@ static void taosReportBrokenLink(SFdObj *pFdObj) {
// notify the upper layer, so it will clean the associated context
if (pFdObj->closedByApp == 0) {
taosShutDownSocketWR(pFdObj->fd);
taosShutDownSocketWR(pFdObj->pSocket);
SRecvInfo recvInfo;
recvInfo.msg = NULL;
......@@ -473,7 +469,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
SThreadObj *pThreadObj = pFdObj->pThreadObj;
headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
headLen = taosReadMsg(pFdObj->pSocket, &rpcHead, sizeof(SRpcHead));
if (headLen != sizeof(SRpcHead)) {
tDebug("%s %p read error, FD:%p headLen:%d", pThreadObj->label, pFdObj->thandle, pFdObj, headLen);
return -1;
......@@ -486,13 +482,12 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
return -1;
} else {
tTrace("%s %p read data, FD:%p fd:%d TCP malloc mem:%p", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd,
buffer);
tTrace("%s %p read data, FD:%p TCP malloc mem:%p", pThreadObj->label, pFdObj->thandle, pFdObj, buffer);
}
msg = buffer + tsRpcOverhead;
leftLen = msgLen - headLen;
retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
retLen = taosReadMsg(pFdObj->pSocket, msg + headLen, leftLen);
if (leftLen != retLen) {
tError("%s %p read error, leftLen:%d retLen:%d FD:%p", pThreadObj->label, pFdObj->thandle, leftLen, retLen, pFdObj);
......@@ -532,7 +527,7 @@ static void *taosProcessTcpData(void *param) {
setThreadName(name);
while (1) {
int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME);
int fdNum = taosWaitEpoll(pThreadObj->pEpoll, events, maxEvents, TAOS_EPOLL_WAIT_TIME);
if (pThreadObj->stop) {
tDebug("%s TCP thread get stop event, exiting...", pThreadObj->label);
break;
......@@ -561,7 +556,7 @@ static void *taosProcessTcpData(void *param) {
}
if (taosReadTcpData(pFdObj, &recvInfo) < 0) {
shutdown(pFdObj->fd, SHUT_WR);
taosShutDownSocketWR(pFdObj->pSocket);
continue;
}
......@@ -572,9 +567,9 @@ static void *taosProcessTcpData(void *param) {
if (pThreadObj->stop) break;
}
if (pThreadObj->pollFd >= 0) {
EpollClose(pThreadObj->pollFd);
pThreadObj->pollFd = -1;
if (pThreadObj->pEpoll != NULL) {
taosCloseEpoll(&pThreadObj->pEpoll);
pThreadObj->pEpoll = NULL;
}
while (pThreadObj->pHead) {
......@@ -590,7 +585,7 @@ static void *taosProcessTcpData(void *param) {
return NULL;
}
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) {
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, TdSocketPtr pSocket) {
struct epoll_event event;
SFdObj *pFdObj = (SFdObj *)calloc(sizeof(SFdObj), 1);
......@@ -599,13 +594,13 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) {
}
pFdObj->closedByApp = 0;
pFdObj->fd = fd;
pFdObj->pSocket = pSocket;
pFdObj->pThreadObj = pThreadObj;
pFdObj->signature = pFdObj;
event.events = EPOLLIN | EPOLLRDHUP;
event.data.ptr = pFdObj;
if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
if (taosCtlEpoll(pThreadObj->pEpoll, EPOLL_CTL_ADD, pSocket, &event) < 0) {
tfree(pFdObj);
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
......@@ -635,8 +630,8 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
}
pFdObj->signature = NULL;
epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
taosCloseSocket(pFdObj->fd);
taosCtlEpoll(pThreadObj->pEpoll, EPOLL_CTL_DEL, pFdObj->pSocket, NULL);
taosCloseSocket(&pFdObj->pSocket);
pThreadObj->numOfFds--;
if (pThreadObj->numOfFds < 0)
......@@ -655,8 +650,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
pthread_mutex_unlock(&pThreadObj->mutex);
tDebug("%s %p TCP connection is closed, FD:%p fd:%d numOfFds:%d", pThreadObj->label, pFdObj->thandle, pFdObj,
pFdObj->fd, pThreadObj->numOfFds);
tDebug("%s %p TCP connection is closed, FD:%p numOfFds:%d", pThreadObj->label, pFdObj->thandle, pFdObj, pThreadObj->numOfFds);
tfree(pFdObj);
}
......
......@@ -31,14 +31,14 @@
typedef struct {
int index;
SOCKET fd;
TdSocketPtr pSocket;
uint16_t port; // peer port
uint16_t localPort; // local port
char label[TSDB_LABEL_LEN]; // copy from udpConnSet;
pthread_t thread;
void * hash;
void * shandle; // handle passed by upper layer during server initialization
void * pSet;
void *hash;
void *shandle; // handle passed by upper layer during server initialization
void *pSet;
void *(*processData)(SRecvInfo *pRecv);
char *buffer; // buffer to receive data
} SUdpConn;
......@@ -86,8 +86,8 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
for (i = 0; i < threads; ++i) {
pConn = pSet->udpConn + i;
ownPort = (port ? port + i : 0);
pConn->fd = taosOpenUdpSocket(ip, ownPort);
if (pConn->fd < 0) {
pConn->pSocket = taosOpenUdpSocket(ip, ownPort);
if (pConn->pSocket == NULL) {
tError("%s failed to open UDP socket %x:%hu", label, ip, port);
break;
}
......@@ -100,7 +100,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
struct sockaddr_in sin;
unsigned int addrlen = sizeof(sin);
if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET &&
if (taosGetSocketName(pConn->pSocket, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET &&
addrlen == sizeof(sin)) {
pConn->localPort = (uint16_t)ntohs(sin.sin_port);
}
......@@ -138,9 +138,9 @@ void taosStopUdpConnection(void *handle) {
for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i;
if (pConn->fd >= 0) shutdown(pConn->fd, SHUT_RDWR);
if (pConn->fd >= 0) taosCloseSocket(pConn->fd);
pConn->fd = -1;
if (pConn->pSocket != NULL) taosShutDownSocketRDWR(pConn->pSocket);
if (pConn->pSocket != NULL) taosCloseSocket(&pConn->pSocket);
pConn->pSocket = NULL;
}
for (int i = 0; i < pSet->threads; ++i) {
......@@ -163,7 +163,7 @@ void taosCleanUpUdpConnection(void *handle) {
for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i;
if (pConn->fd >= 0) taosCloseSocket(pConn->fd);
if (pConn->pSocket != NULL) taosCloseSocket(&pConn->pSocket);
}
tDebug("%s UDP is cleaned up", pSet->label);
......@@ -199,13 +199,12 @@ static void *taosRecvUdpData(void *param) {
setThreadName("recvUdpData");
while (1) {
dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
dataLen = taosReadFromSocket(pConn->pSocket, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
if (dataLen <= 0) {
tDebug("%s UDP socket was closed, exiting(%s), dataLen:%d fd:%d", pConn->label, strerror(errno), (int32_t)dataLen,
pConn->fd);
tDebug("%s UDP socket was closed, exiting(%s), dataLen:%d", pConn->label, strerror(errno), (int32_t)dataLen);
// for windows usage, remote shutdown also returns - 1 in windows client
if (pConn->fd == -1) {
if (pConn->pSocket == NULL) {
break;
} else {
continue;
......@@ -255,7 +254,7 @@ int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *c
destAdd.sin_addr.s_addr = ip;
destAdd.sin_port = htons(port);
int ret = (int)taosSendto(pConn->fd, data, (size_t)dataLen, 0, (struct sockaddr *)&destAdd, sizeof(destAdd));
int ret = taosSendto(pConn->pSocket, data, (size_t)dataLen, 0, (struct sockaddr *)&destAdd, sizeof(destAdd));
return ret;
}
......
......@@ -173,7 +173,7 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
#else
int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) {
int32_t code = -1;
SOCKET fd = 0;
TdSocketPtr pSocket = NULL;
uint32_t ip = taosGetIpv4FromFqdn(server);
if (ip == 0xffffffff) {
......@@ -182,8 +182,8 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
goto SEND_OVER;
}
fd = taosOpenTcpClientSocket(ip, port, 0);
if (fd < 0) {
pSocket = taosOpenTcpClientSocket(ip, port, 0);
if (pSocket == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to create http socket to %s:%u since %s", server, port, terrstr());
goto SEND_OVER;
......@@ -200,21 +200,20 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
char header[1024] = {0};
int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag);
if (taosWriteSocket(fd, header, headLen) < 0) {
if (taosWriteMsg(pSocket, header, headLen) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to send http header to %s:%u since %s", server, port, terrstr());
goto SEND_OVER;
}
if (taosWriteSocket(fd, (void*)pCont, contLen) < 0) {
if (taosWriteMsg(pSocket, (void*)pCont, contLen) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to send http content to %s:%u since %s", server, port, terrstr());
goto SEND_OVER;
}
// read something to avoid nginx error 499
if (taosReadSocket(fd, header, 10) < 0) {
if (taosWriteMsg(pSocket, header, 10) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to receive response from %s:%u since %s", server, port, terrstr());
goto SEND_OVER;
......@@ -223,8 +222,8 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
code = 0;
SEND_OVER:
if (fd != 0) {
taosCloseSocket(fd);
if (pSocket != NULL) {
taosCloseSocket(&pSocket);
}
return code;
......
......@@ -18,176 +18,206 @@
#include "os.h"
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#include "winsock2.h"
#include <WS2tcpip.h>
#include <winbase.h>
#include <Winsock2.h>
#include <IPHlpApi.h>
#include <WS2tcpip.h>
#include <Winsock2.h>
#include <stdio.h>
#include <string.h>
#include <tchar.h>
#include <winbase.h>
#include <winsock2.h>
#include <ws2def.h>
#include "winsock2.h"
#else
#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <netinet/udp.h>
#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <netinet/udp.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <unistd.h>
#endif
// typedef struct TdSocketServer {
// #if SOCKET_WITH_LOCK
// pthread_rwlock_t rwlock;
// #endif
// int refId;
// SocketFd fd;
// } * TdSocketServerPtr, TdSocketServer;
// typedef struct TdSocketConnector {
// #if SOCKET_WITH_LOCK
// pthread_rwlock_t rwlock;
// #endif
// int refId;
// SocketFd fd;
// } * TdSocketConnectorPtr, TdSocketConnector;
typedef int32_t SocketFd;
typedef SocketFd EpollFd;
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
typedef struct TdSocketServer {
#if SOCKET_WITH_LOCK
pthread_rwlock_t rwlock;
#endif
int refId;
SocketFd fd;
} *TdSocketServerPtr, TdSocketServer;
#define taosSend(sockfd, buf, len, flags) send((SOCKET)sockfd, buf, len, flags)
int32_t taosSendto(SocketFd fd, void *buf, int len, unsigned int flags, const struct sockaddr *to, int tolen) {
return sendto((SOCKET)sockfd, buf, len, flags, dest_addr, addrlen);
}
int32_t taosWriteSocket(SocketFd fd, void *buf, int len) { return send((SOCKET)fd, buf, len, 0); }
int32_t taosReadSocket(SocketFd fd, void *buf, int len) { return recv((SOCKET)fd, buf, len, 0)(); }
int32_t taosCloseSocketNoCheck(SocketFd fd) { return closesocket((SOCKET)fd); }
int32_t taosCloseSocket(SocketFd fd) { closesocket((SOCKET)fd) }
typedef struct TdSocket {
#if SOCKET_WITH_LOCK
pthread_rwlock_t rwlock;
#endif
int refId;
SocketFd fd;
} *TdSocketPtr, TdSocket;
#else
typedef struct TdEpoll {
#if SOCKET_WITH_LOCK
pthread_rwlock_t rwlock;
#endif
int refId;
EpollFd fd;
} *TdEpollPtr, TdEpoll;
#define taosSend(sockfd, buf, len, flags) send(sockfd, buf, len, flags)
int32_t taosSendto(SocketFd fd, void * buf, int len, unsigned int flags, const struct sockaddr * dest_addr, int addrlen) {
return sendto(fd, buf, len, flags, dest_addr, addrlen);
int32_t taosSendto(TdSocketPtr pSocket, void *buf, int len, unsigned int flags, const struct sockaddr *dest_addr,
int addrlen) {
if (pSocket == NULL || pSocket->fd < 0) {
return -1;
}
int32_t taosWriteSocket(SocketFd fd, void *buf, int len) {
return write(fd, buf, len);
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return sendto(pSocket->fd, buf, len, flags, dest_addr, addrlen);
#else
return sendto(pSocket->fd, buf, len, flags, dest_addr, addrlen);
#endif
}
int32_t taosWriteSocket(TdSocketPtr pSocket, void *buf, int len) {
if (pSocket == NULL || pSocket->fd < 0) {
return -1;
}
int32_t taosReadSocket(SocketFd fd, void *buf, int len) {
return read(fd, buf, len);
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return send(pSocket->fd, buf, len, 0);
;
#else
return write(pSocket->fd, buf, len);
#endif
}
int32_t taosReadSocket(TdSocketPtr pSocket, void *buf, int len) {
if (pSocket == NULL || pSocket->fd < 0) {
return -1;
}
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return recv(pSocket->fd, buf, len, 0);
;
#else
return read(pSocket->fd, buf, len);
#endif
}
int32_t taosCloseSocketNoCheck(SocketFd fd) {
return close(fd);
int32_t taosReadFromSocket(TdSocketPtr pSocket, void *buf, int32_t len, int32_t flags, struct sockaddr *destAddr, socklen_t *addrLen) {
if (pSocket == NULL || pSocket->fd < 0) {
return -1;
}
int32_t taosCloseSocket(SocketFd fd) {
if (fd > -1) {
close(fd);
return recvfrom(pSocket->fd, buf, len, flags, destAddr, addrLen);
}
int32_t taosCloseSocketNoCheck1(SocketFd fd) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return closesocket(fd);
#else
return close(fd);
#endif
}
int32_t taosCloseSocket(TdSocketPtr *ppSocket) {
int32_t code;
if (ppSocket == NULL || *ppSocket == NULL || (*ppSocket)->fd < 0) {
return -1;
}
code = taosCloseSocketNoCheck1((*ppSocket)->fd);
(*ppSocket)->fd = -1;
free(*ppSocket);
return code;
}
int32_t taosCloseSocketServer(TdSocketServerPtr *ppSocketServer) {
int32_t code;
if (ppSocketServer == NULL || *ppSocketServer == NULL || (*ppSocketServer)->fd < 0) {
return -1;
}
#endif
code = taosCloseSocketNoCheck1((*ppSocketServer)->fd);
(*ppSocketServer)->fd = -1;
free(*ppSocketServer);
return code;
}
void taosShutDownSocketRD(SOCKET fd) {
int32_t taosShutDownSocketRD(TdSocketPtr pSocket) {
if (pSocket == NULL || pSocket->fd < 0) {
return -1;
}
#ifdef WINDOWS
closesocket(fd);
return closesocket(pSocket->fd);
#elif __APPLE__
close(fd);
return close(pSocket->fd);
#else
shutdown(fd, SHUT_RD);
return shutdown(pSocket->fd, SHUT_RD);
#endif
}
void taosShutDownSocketWR(SOCKET fd) {
int32_t taosShutDownSocketServerRD(TdSocketServerPtr pSocketServer) {
if (pSocketServer == NULL || pSocketServer->fd < 0) {
return -1;
}
#ifdef WINDOWS
closesocket(fd);
return closesocket(pSocketServer->fd);
#elif __APPLE__
close(fd);
return close(pSocketServer->fd);
#else
shutdown(fd, SHUT_WR);
return shutdown(pSocketServer->fd, SHUT_RD);
#endif
}
#if !(defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32))
int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
int32_t flags = 0;
if ((flags = fcntl(sock, F_GETFL, 0)) < 0) {
//printf("fcntl(F_GETFL) error: %d (%s)\n", errno, strerror(errno));
return 1;
}
if (on)
flags |= O_NONBLOCK;
else
flags &= ~O_NONBLOCK;
if ((flags = fcntl(sock, F_SETFL, flags)) < 0) {
//printf("fcntl(F_SETFL) error: %d (%s)\n", errno, strerror(errno));
return 1;
int32_t taosShutDownSocketWR(TdSocketPtr pSocket) {
if (pSocket == NULL || pSocket->fd < 0) {
return -1;
}
return 0;
}
#ifdef WINDOWS
return closesocket(pSocket->fd);
#elif __APPLE__
return close(pSocket->fd);
#else
return shutdown(pSocket->fd, SHUT_WR);
#endif
#if !(defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) || defined(_TD_DARWIN_32))
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) {
return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen);
}
int32_t taosGetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t *optlen) {
return getsockopt(socketfd, level, optname, optval, (socklen_t *)optlen);
}
#endif
#if !((defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)) && defined(_MSC_VER))
uint32_t taosInetAddr(const char *ipAddr) { return inet_addr(ipAddr); }
const char *taosInetNtoa(struct in_addr ipInt) { return inet_ntoa(ipInt); }
int32_t taosShutDownSocketServerWR(TdSocketServerPtr pSocketServer) {
if (pSocketServer == NULL || pSocketServer->fd < 0) {
return -1;
}
#ifdef WINDOWS
return closesocket(pSocketServer->fd);
#elif __APPLE__
return close(pSocketServer->fd);
#else
return shutdown(pSocketServer->fd, SHUT_WR);
#endif
#if defined(_TD_DARWIN_64)
/*
* darwin implementation
*/
int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int optlen) {
if (level == SOL_SOCKET && optname == SO_SNDBUF) {
return 0;
}
int32_t taosShutDownSocketRDWR(TdSocketPtr pSocket) {
if (pSocket == NULL || pSocket->fd < 0) {
return -1;
}
if (level == SOL_SOCKET && optname == SO_RCVBUF) {
return 0;
#ifdef WINDOWS
return closesocket(pSocket->fd);
#elif __APPLE__
return close(pSocket->fd);
#else
return shutdown(pSocket->fd, SHUT_RDWR);
#endif
}
int32_t taosShutDownSocketServerRDWR(TdSocketServerPtr pSocketServer) {
if (pSocketServer == NULL || pSocketServer->fd < 0) {
return -1;
}
return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen);
#ifdef WINDOWS
return closesocket(pSocketServer->fd);
#elif __APPLE__
return close(pSocketServer->fd);
#else
return shutdown(pSocketServer->fd, SHUT_RDWR);
#endif
}
#if (defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32))
#if defined(_TD_GO_DLL_)
uint64_t htonll(uint64_t val) { return (((uint64_t)htonl(val)) << 32) + htonl(val >> 32); }
#endif
#endif
void taosWinSocketInit1() {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
/*
* windows implementation
*/
#include <IPHlpApi.h>
#include <WS2tcpip.h>
#include <stdio.h>
#include <string.h>
#include <tchar.h>
#include <winsock2.h>
#include <ws2def.h>
void taosWinSocketInit() {
static char flag = 0;
if (flag == 0) {
WORD wVersionRequested;
......@@ -197,21 +227,46 @@ void taosWinSocketInit() {
flag = 1;
}
}
#else
#endif
}
int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
int32_t taosSetNonblocking(TdSocketPtr pSocket, int32_t on) {
if (pSocket == NULL || pSocket->fd < 0) {
return -1;
}
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
u_long mode;
if (on) {
mode = 1;
ioctlsocket(sock, FIONBIO, &mode);
ioctlsocket(pSocket->fd, FIONBIO, &mode);
} else {
mode = 0;
ioctlsocket(sock, FIONBIO, &mode);
ioctlsocket(pSocket->fd, FIONBIO, &mode);
}
#else
int32_t flags = 0;
if ((flags = fcntl(pSocket->fd, F_GETFL, 0)) < 0) {
// printf("fcntl(F_GETFL) error: %d (%s)\n", errno, strerror(errno));
return 1;
}
if (on)
flags |= O_NONBLOCK;
else
flags &= ~O_NONBLOCK;
if ((flags = fcntl(pSocket->fd, F_SETFL, flags)) < 0) {
// printf("fcntl(F_SETFL) error: %d (%s)\n", errno, strerror(errno));
return 1;
}
#endif
return 0;
}
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) {
int32_t taosSetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t optlen) {
if (pSocket == NULL || pSocket->fd < 0) {
return -1;
}
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
if (level == SOL_SOCKET && optname == TCP_KEEPCNT) {
return 0;
}
......@@ -228,13 +283,23 @@ int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *op
return 0;
}
return setsockopt(socketfd, level, optname, optval, optlen);
return setsockopt(pSocket->fd, level, optname, optval, optlen);
#else
return setsockopt(pSocket->fd, level, optname, optval, (socklen_t)optlen);
#endif
}
int32_t taosGetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t *optlen) {
if (pSocket == NULL || pSocket->fd < 0) {
return -1;
}
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return 0;
#else
return getsockopt(pSocket->fd, level, optname, optval, (socklen_t *)optlen);
#endif
}
#ifdef _MSC_VER
//#if _MSC_VER >= 1900
uint32_t taosInetAddr(const char *ipAddr) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
uint32_t value;
int32_t ret = inet_pton(AF_INET, ipAddr, &value);
if (ret <= 0) {
......@@ -242,39 +307,37 @@ uint32_t taosInetAddr(const char *ipAddr) {
} else {
return value;
}
#else
return inet_addr(ipAddr);
#endif
}
const char *taosInetNtoa(struct in_addr ipInt) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
// not thread safe, only for debug usage while print log
static char tmpDstStr[16];
return inet_ntop(AF_INET, &ipInt, tmpDstStr, INET6_ADDRSTRLEN);
}
//#endif
#endif
#if defined(_TD_GO_DLL_)
uint64_t htonll(uint64_t val) { return (((uint64_t)htonl(val)) << 32) + htonl(val >> 32); }
#endif
#else
return inet_ntoa(ipInt);
#endif
}
#ifndef SIGPIPE
#define SIGPIPE EPIPE
#define SIGPIPE EPIPE
#endif
#define TCP_CONN_TIMEOUT 3000 // conn timeout
int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) {
int32_t taosWriteMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) {
if (pSocket == NULL || pSocket->fd < 0) {
return -1;
}
int32_t nleft, nwritten;
char * ptr = (char *)buf;
char *ptr = (char *)buf;
nleft = nbytes;
while (nleft > 0) {
nwritten = (int32_t)taosWriteSocket(fd, (char *)ptr, (size_t)nleft);
nwritten = taosWriteSocket(pSocket, (char *)ptr, (size_t)nleft);
if (nwritten <= 0) {
if (errno == EINTR /* || errno == EAGAIN || errno == EWOULDBLOCK */)
continue;
......@@ -293,20 +356,21 @@ int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) {
return (nbytes - nleft);
}
int32_t taosReadMsg(SOCKET fd, void *buf, int32_t nbytes) {
int32_t taosReadMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) {
if (pSocket == NULL || pSocket->fd < 0) {
return -1;
}
int32_t nleft, nread;
char * ptr = (char *)buf;
char *ptr = (char *)buf;
nleft = nbytes;
if (fd < 0) return -1;
while (nleft > 0) {
nread = (int32_t)taosReadSocket(fd, ptr, (size_t)nleft);
nread = taosReadSocket(pSocket, ptr, (size_t)nleft);
if (nread == 0) {
break;
} else if (nread < 0) {
if (errno == EINTR/* || errno == EAGAIN || errno == EWOULDBLOCK*/) {
if (errno == EINTR /* || errno == EAGAIN || errno == EWOULDBLOCK*/) {
continue;
} else {
return -1;
......@@ -324,8 +388,11 @@ int32_t taosReadMsg(SOCKET fd, void *buf, int32_t nbytes) {
return (nbytes - nleft);
}
int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes) {
taosSetNonblocking(fd, 1);
int32_t taosNonblockwrite(TdSocketPtr pSocket, char *ptr, int32_t nbytes) {
if (pSocket == NULL || pSocket->fd < 0) {
return -1;
}
taosSetNonblocking(pSocket, 1);
int32_t nleft, nwritten, nready;
fd_set fset;
......@@ -336,24 +403,24 @@ int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes) {
tv.tv_sec = 30;
tv.tv_usec = 0;
FD_ZERO(&fset);
FD_SET(fd, &fset);
if ((nready = select((int32_t)(fd + 1), NULL, &fset, NULL, &tv)) == 0) {
FD_SET(pSocket->fd, &fset);
if ((nready = select((SocketFd)(pSocket->fd + 1), NULL, &fset, NULL, &tv)) == 0) {
errno = ETIMEDOUT;
//printf("fd %d timeout, no enough space to write", fd);
// printf("fd %d timeout, no enough space to write", fd);
break;
} else if (nready < 0) {
if (errno == EINTR) continue;
//printf("select error, %d (%s)", errno, strerror(errno));
// printf("select error, %d (%s)", errno, strerror(errno));
return -1;
}
nwritten = (int32_t)taosSend(fd, ptr, (size_t)nleft, MSG_NOSIGNAL);
nwritten = (int32_t)send(pSocket->fd, ptr, (size_t)nleft, MSG_NOSIGNAL);
if (nwritten <= 0) {
if (errno == EAGAIN || errno == EINTR) continue;
//printf("write error, %d (%s)", errno, strerror(errno));
// printf("write error, %d (%s)", errno, strerror(errno));
return -1;
}
......@@ -361,121 +428,99 @@ int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes) {
ptr += nwritten;
}
taosSetNonblocking(fd, 0);
return (nbytes - nleft);
}
int32_t taosReadn(SOCKET fd, char *ptr, int32_t nbytes) {
int32_t nread, nready, nleft = nbytes;
fd_set fset;
struct timeval tv;
while (nleft > 0) {
tv.tv_sec = 30;
tv.tv_usec = 0;
FD_ZERO(&fset);
FD_SET(fd, &fset);
if ((nready = select((int32_t)(fd + 1), NULL, &fset, NULL, &tv)) == 0) {
errno = ETIMEDOUT;
//printf("fd %d timeout\n", fd);
break;
} else if (nready < 0) {
if (errno == EINTR) continue;
//printf("select error, %d (%s)", errno, strerror(errno));
return -1;
}
if ((nread = (int32_t)taosReadSocket(fd, ptr, (size_t)nleft)) < 0) {
if (errno == EINTR) continue;
//printf("read error, %d (%s)", errno, strerror(errno));
return -1;
} else if (nread == 0) {
//printf("fd %d EOF", fd);
break; // EOF
}
nleft -= nread;
ptr += nread;
}
taosSetNonblocking(pSocket, 0);
return (nbytes - nleft);
}
SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) {
TdSocketPtr taosOpenUdpSocket(uint32_t ip, uint16_t port) {
struct sockaddr_in localAddr;
SOCKET sockFd;
SocketFd fd;
int32_t bufSize = 1024000;
//printf("open udp socket:0x%x:%hu", ip, port);
// printf("open udp socket:0x%x:%hu", ip, port);
memset((char *)&localAddr, 0, sizeof(localAddr));
localAddr.sin_family = AF_INET;
localAddr.sin_addr.s_addr = ip;
localAddr.sin_port = (uint16_t)htons(port);
if ((sockFd = socket(AF_INET, SOCK_DGRAM, 0)) <= 2) {
//printf("failed to open udp socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck(sockFd);
return -1;
if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) <= 2) {
// printf("failed to open udp socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck1(fd);
return NULL;
}
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
//printf("failed to set the send buffer size for UDP socket\n");
taosCloseSocket(sockFd);
return -1;
TdSocketPtr pSocket = (TdSocketPtr)malloc(sizeof(TdSocket));
if (pSocket == NULL) {
taosCloseSocketNoCheck1(fd);
return NULL;
}
pSocket->fd = fd;
pSocket->refId = 0;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
//printf("failed to set the receive buffer size for UDP socket\n");
taosCloseSocket(sockFd);
return -1;
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
// printf("failed to set the send buffer size for UDP socket\n");
taosCloseSocket(&pSocket);
return NULL;
}
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
// printf("failed to set the receive buffer size for UDP socket\n");
taosCloseSocket(&pSocket);
return NULL;
}
/* bind socket to local address */
if (bind(sockFd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
//printf("failed to bind udp socket: %d (%s), 0x%x:%hu", errno, strerror(errno), ip, port);
taosCloseSocket(sockFd);
return -1;
if (bind(pSocket->fd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
// printf("failed to bind udp socket: %d (%s), 0x%x:%hu", errno, strerror(errno), ip, port);
taosCloseSocket(&pSocket);
return NULL;
}
return sockFd;
return pSocket;
}
SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) {
SOCKET sockFd = 0;
TdSocketPtr taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) {
SocketFd fd = -1;
int32_t ret;
struct sockaddr_in serverAddr, clientAddr;
int32_t bufSize = 1024 * 1024;
sockFd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sockFd <= 2) {
//printf("failed to open the socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck(sockFd);
return -1;
if (fd <= 2) {
// printf("failed to open the socket: %d (%s)", errno, strerror(errno));
if (fd >= 0) taosCloseSocketNoCheck1(fd);
return NULL;
}
TdSocketPtr pSocket = (TdSocketPtr)malloc(sizeof(TdSocket));
if (pSocket == NULL) {
taosCloseSocketNoCheck1(fd);
return NULL;
}
pSocket->fd = fd;
pSocket->refId = 0;
/* set REUSEADDR option, so the portnumber can be re-used */
int32_t reuse = 1;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
//printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
// printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(&pSocket);
return NULL;
}
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
//printf("failed to set the send buffer size for TCP socket\n");
taosCloseSocket(sockFd);
return -1;
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
// printf("failed to set the send buffer size for TCP socket\n");
taosCloseSocket(&pSocket);
return NULL;
}
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
//printf("failed to set the receive buffer size for TCP socket\n");
taosCloseSocket(sockFd);
return -1;
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
// printf("failed to set the receive buffer size for TCP socket\n");
taosCloseSocket(&pSocket);
return NULL;
}
if (clientIp != 0) {
......@@ -485,11 +530,11 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
clientAddr.sin_port = 0;
/* bind socket to client address */
if (bind(sockFd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) {
//printf("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)", clientIp, destIp, destPort,
if (bind(pSocket->fd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) {
// printf("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)", clientIp, destIp, destPort,
// strerror(errno));
taosCloseSocket(sockFd);
return -1;
taosCloseSocket(&pSocket);
return NULL;
}
}
......@@ -499,158 +544,192 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
serverAddr.sin_port = (uint16_t)htons((uint16_t)destPort);
#ifdef _TD_LINUX
taosSetNonblocking(sockFd, 1);
ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
taosSetNonblocking(pSocket, 1);
ret = connect(pSocket->fd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
if (ret == -1) {
if (errno == EHOSTUNREACH) {
//printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
taosCloseSocket(sockFd);
// printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
taosCloseSocket(&pSocket);
return -1;
} else if (errno == EINPROGRESS || errno == EAGAIN || errno == EWOULDBLOCK) {
struct pollfd wfd[1];
wfd[0].fd = sockFd;
wfd[0].fd = pSocket->fd;
wfd[0].events = POLLOUT;
int res = poll(wfd, 1, TCP_CONN_TIMEOUT);
if (res == -1 || res == 0) {
//printf("failed to connect socket, ip:0x%x, port:%hu(poll error/conn timeout)", destIp, destPort);
taosCloseSocket(sockFd); //
// printf("failed to connect socket, ip:0x%x, port:%hu(poll error/conn timeout)", destIp, destPort);
taosCloseSocket(&pSocket); //
return -1;
}
int optVal = -1, optLen = sizeof(int);
if ((0 != taosGetSockOpt(sockFd, SOL_SOCKET, SO_ERROR, &optVal, &optLen)) || (optVal != 0)) {
//printf("failed to connect socket, ip:0x%x, port:%hu(connect host error)", destIp, destPort);
taosCloseSocket(sockFd); //
if ((0 != taosGetSockOpt(pSocket, SOL_SOCKET, SO_ERROR, &optVal, &optLen)) || (optVal != 0)) {
// printf("failed to connect socket, ip:0x%x, port:%hu(connect host error)", destIp, destPort);
taosCloseSocket(&pSocket); //
return -1;
}
ret = 0;
} else { // Other error
//printf("failed to connect socket, ip:0x%x, port:%hu(target host cannot be reached)", destIp, destPort);
taosCloseSocket(sockFd); //
// printf("failed to connect socket, ip:0x%x, port:%hu(target host cannot be reached)", destIp, destPort);
taosCloseSocket(&pSocket); //
return -1;
}
}
taosSetNonblocking(sockFd, 0);
taosSetNonblocking(pSocket, 0);
#else
ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
ret = connect(pSocket->fd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
#endif
if (ret != 0) {
//printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
taosCloseSocket(sockFd);
sockFd = -1;
// printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
taosCloseSocket(&pSocket);
return NULL;
} else {
taosKeepTcpAlive(sockFd);
taosKeepTcpAlive(pSocket);
}
return sockFd;
return pSocket;
}
int32_t taosKeepTcpAlive(SOCKET sockFd) {
int32_t taosKeepTcpAlive(TdSocketPtr pSocket) {
if (pSocket == NULL || pSocket->fd < 0) {
return -1;
}
int32_t alive = 1;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) {
//printf("fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) {
// printf("fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(&pSocket);
return -1;
}
#ifndef __APPLE__
// all fails on macosx
int32_t probes = 3;
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) {
//printf("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) {
// printf("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(&pSocket);
return -1;
}
int32_t alivetime = 10;
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPIDLE, (void *)&alivetime, sizeof(alivetime)) < 0) {
//printf("fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPIDLE, (void *)&alivetime, sizeof(alivetime)) < 0) {
// printf("fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(&pSocket);
return -1;
}
int32_t interval = 3;
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval)) < 0) {
//printf("fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval)) < 0) {
// printf("fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(&pSocket);
return -1;
}
#endif // __APPLE__
int32_t nodelay = 1;
if (taosSetSockOpt(sockFd, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) {
//printf("fd:%d setsockopt TCP_NODELAY failed %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
if (taosSetSockOpt(pSocket, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) {
// printf("fd:%d setsockopt TCP_NODELAY failed %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(&pSocket);
return -1;
}
struct linger linger = {0};
linger.l_onoff = 1;
linger.l_linger = 3;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) {
//printf("setsockopt SO_LINGER failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(sockFd);
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) {
// printf("setsockopt SO_LINGER failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(&pSocket);
return -1;
}
return 0;
}
SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
struct sockaddr_in serverAdd;
SOCKET sockFd;
SocketFd fd;
int32_t reuse;
//printf("open tcp server socket:0x%x:%hu", ip, port);
// printf("open tcp server socket:0x%x:%hu", ip, port);
bzero((char *)&serverAdd, sizeof(serverAdd));
serverAdd.sin_family = AF_INET;
serverAdd.sin_addr.s_addr = ip;
serverAdd.sin_port = (uint16_t)htons(port);
if ((sockFd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) {
//printf("failed to open TCP socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck(sockFd);
return -1;
if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) {
// printf("failed to open TCP socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck1(fd);
return NULL;
}
TdSocketPtr pSocket = (TdSocketPtr)malloc(sizeof(TdSocket));
if (pSocket == NULL) {
taosCloseSocketNoCheck1(fd);
return NULL;
}
pSocket->refId = 0;
pSocket->fd = fd;
/* set REUSEADDR option, so the portnumber can be re-used */
reuse = 1;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
//printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
// printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(&pSocket);
return NULL;
}
/* bind socket to server address */
if (bind(sockFd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) {
//printf("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
taosCloseSocket(sockFd);
return -1;
if (bind(pSocket->fd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) {
// printf("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
taosCloseSocket(&pSocket);
return NULL;
}
if (taosKeepTcpAlive(sockFd) < 0) {
//printf("failed to set tcp server keep-alive option, 0x%x:%hu(%s)", ip, port, strerror(errno));
taosCloseSocket(sockFd);
return -1;
if (taosKeepTcpAlive(pSocket) < 0) {
// printf("failed to set tcp server keep-alive option, 0x%x:%hu(%s)", ip, port, strerror(errno));
taosCloseSocket(&pSocket);
return NULL;
}
if (listen(sockFd, 1024) < 0) {
//printf("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
taosCloseSocket(sockFd);
return -1;
if (listen(pSocket->fd, 1024) < 0) {
// printf("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
taosCloseSocket(&pSocket);
return NULL;
}
return sockFd;
return (TdSocketServerPtr)pSocket;
}
TdSocketPtr taosAcceptTcpConnectSocket(TdSocketServerPtr pServerSocket, struct sockaddr *destAddr,
socklen_t *addrLen) {
if (pServerSocket == NULL || pServerSocket->fd < 0) {
return NULL;
}
SocketFd fd = accept(pServerSocket->fd, destAddr, addrLen);
if (fd == -1) {
// tError("TCP accept failure(%s)", strerror(errno));
return NULL;
}
TdSocketPtr pSocket = (TdSocketPtr)malloc(sizeof(TdSocket));
if (pSocket == NULL) {
taosCloseSocketNoCheck1(fd);
return NULL;
}
pSocket->fd = fd;
pSocket->refId = 0;
return pSocket;
}
#define COPY_SIZE 32768
// sendfile shall be used
int64_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len) {
int64_t taosCopyFds(TdSocketPtr pSrcSocket, TdSocketPtr pDestSocket, int64_t len) {
if (pSrcSocket == NULL || pSrcSocket->fd < 0 || pDestSocket == NULL || pDestSocket->fd < 0) {
return -1;
}
int64_t leftLen;
int64_t readLen, writeLen;
char temp[COPY_SIZE];
......@@ -663,17 +742,17 @@ int64_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len) {
else
readLen = COPY_SIZE; // 4K
int64_t retLen = taosReadMsg(sfd, temp, (int32_t)readLen);
int64_t retLen = taosReadMsg(pSrcSocket, temp, (int32_t)readLen);
if (readLen != retLen) {
//printf("read error, readLen:%" PRId64 " retLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s",
// printf("read error, readLen:%" PRId64 " retLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s",
// readLen, retLen, len, leftLen, strerror(errno));
return -1;
}
writeLen = taosWriteMsg(dfd, temp, (int32_t)readLen);
writeLen = taosWriteMsg(pDestSocket, temp, (int32_t)readLen);
if (readLen != writeLen) {
//printf("copy error, readLen:%" PRId64 " writeLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s",
// printf("copy error, readLen:%" PRId64 " writeLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s",
// readLen, writeLen, len, leftLen, strerror(errno));
return -1;
}
......@@ -692,7 +771,7 @@ void taosBlockSIGPIPE() {
sigaddset(&signal_mask, SIGPIPE);
int32_t rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
if (rc != 0) {
//printf("failed to block SIGPIPE");
// printf("failed to block SIGPIPE");
}
#endif
}
......@@ -706,7 +785,7 @@ uint32_t taosGetIpv4FromFqdn(const char *fqdn) {
int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result);
if (result) {
struct sockaddr * sa = result->ai_addr;
struct sockaddr *sa = result->ai_addr;
struct sockaddr_in *si = (struct sockaddr_in *)sa;
struct in_addr ia = si->sin_addr;
uint32_t ip = ia.s_addr;
......@@ -715,12 +794,12 @@ uint32_t taosGetIpv4FromFqdn(const char *fqdn) {
} else {
#ifdef EAI_SYSTEM
if (ret == EAI_SYSTEM) {
//printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, strerror(errno));
// printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, strerror(errno));
} else {
//printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret));
// printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret));
}
#else
//printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret));
// printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret));
#endif
return 0xFFFFFFFF;
}
......@@ -730,7 +809,7 @@ int32_t taosGetFqdn(char *fqdn) {
char hostname[1024];
hostname[1023] = '\0';
if (gethostname(hostname, 1023) == -1) {
//printf("failed to get hostname, reason:%s", strerror(errno));
// printf("failed to get hostname, reason:%s", strerror(errno));
return -1;
}
......@@ -747,7 +826,7 @@ int32_t taosGetFqdn(char *fqdn) {
#endif // __APPLE__
int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
if (!result) {
//printf("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
// printf("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
return -1;
}
......@@ -793,7 +872,6 @@ void tinet_ntoa(char *ipstr, uint32_t ip) {
sprintf(ipstr, "%d.%d.%d.%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24);
}
void taosIgnSIGPIPE() {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#else
......@@ -809,7 +887,67 @@ void taosSetMaskSIGPIPE() {
sigaddset(&signal_mask, SIGPIPE);
int32_t rc = pthread_sigmask(SIG_SETMASK, &signal_mask, NULL);
if (rc != 0) {
//printf("failed to setmask SIGPIPE");
// printf("failed to setmask SIGPIPE");
}
#endif
}
int32_t taosGetSocketName(TdSocketPtr pSocket, struct sockaddr *destAddr, socklen_t *addrLen) {
if (pSocket == NULL || pSocket->fd < 0) {
return -1;
}
return getsockname(pSocket->fd, destAddr, addrLen);
}
TdEpollPtr taosCreateEpoll(int32_t size) {
EpollFd fd = -1;
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#else
fd = epoll_create(size);
#endif
if (fd < 0) {
return NULL;
}
TdEpollPtr pEpoll = (TdEpollPtr)malloc(sizeof(TdEpoll));
if (pEpoll == NULL) {
taosCloseSocketNoCheck1(fd);
return NULL;
}
pEpoll->fd = fd;
pEpoll->refId = 0;
return pEpoll;
}
int32_t taosCtlEpoll(TdEpollPtr pEpoll, int32_t epollOperate, TdSocketPtr pSocket, struct epoll_event *event) {
int32_t code = -1;
if (pEpoll == NULL || pEpoll->fd < 0) {
return -1;
}
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#else
code = epoll_ctl(pEpoll->fd, epollOperate, pSocket->fd, event);
#endif
return code;
}
int32_t taosWaitEpoll(TdEpollPtr pEpoll, struct epoll_event *event, int32_t maxEvents, int32_t timeout) {
int32_t code = -1;
if (pEpoll == NULL || pEpoll->fd < 0) {
return -1;
}
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#else
code = epoll_wait(pEpoll->fd, event, maxEvents, timeout);
#endif
return code;
}
int32_t taosCloseEpoll(TdEpollPtr *ppEpoll) {
int32_t code;
if (ppEpoll == NULL || *ppEpoll == NULL || (*ppEpoll)->fd < 0) {
return -1;
}
code = taosCloseSocketNoCheck1((*ppEpoll)->fd);
(*ppEpoll)->fd = -1;
free(*ppEpoll);
return code;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册