未验证 提交 9369a7a5 编写于 作者: wafwerar's avatar wafwerar 提交者: GitHub

Merge pull request #10635 from taosdata/fix/ZhiqiangWang/TD-13760-redefine-socket-api

[TD-13760]<fix>: redefine socket api.
......@@ -21,7 +21,10 @@
#define socket SOCKET_FUNC_TAOS_FORBID
#define bind BIND_FUNC_TAOS_FORBID
#define listen LISTEN_FUNC_TAOS_FORBID
// #define accept ACCEPT_FUNC_TAOS_FORBID
#define accept ACCEPT_FUNC_TAOS_FORBID
#define epoll_create EPOLL_CREATE_FUNC_TAOS_FORBID
#define epoll_ctl EPOLL_CTL_FUNC_TAOS_FORBID
#define epoll_wait EPOLL_WAIT_FUNC_TAOS_FORBID
#endif
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
......@@ -38,31 +41,6 @@
extern "C" {
#endif
#define TAOS_EPOLL_WAIT_TIME 500
typedef int32_t SOCKET;
typedef SOCKET EpollFd;
#define EpollClose(pollFd) taosCloseSocket(pollFd)
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
typedef SOCKET SocketFd;
#else
typedef int32_t SocketFd;
#endif
int32_t taosSendto(SocketFd fd, void * msg, int len, unsigned int flags, const struct sockaddr * to, int tolen);
int32_t taosWriteSocket(SocketFd fd, void *msg, int len);
int32_t taosReadSocket(SocketFd fd, void *msg, int len);
int32_t taosCloseSocketNoCheck(SocketFd fd);
int32_t taosCloseSocket(SocketFd fd);
void taosShutDownSocketRD(SOCKET fd);
void taosShutDownSocketWR(SOCKET fd);
int32_t taosSetNonblocking(SOCKET sock, int32_t on);
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen);
int32_t taosGetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t *optlen);
uint32_t taosInetAddr(const char *ipAddr);
const char *taosInetNtoa(struct in_addr ipInt);
#if (defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32))
#define htobe64 htonll
#if defined(_TD_GO_DLL_)
......@@ -74,25 +52,54 @@ const char *taosInetNtoa(struct in_addr ipInt);
#define htobe64 htonll
#endif
int32_t taosReadn(SOCKET sock, char *buffer, int32_t len);
int32_t taosWriteMsg(SOCKET fd, void *ptr, int32_t nbytes);
int32_t taosReadMsg(SOCKET fd, void *ptr, int32_t nbytes);
int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes);
int64_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len);
int32_t taosSetNonblocking(SOCKET sock, int32_t on);
#define TAOS_EPOLL_WAIT_TIME 500
typedef struct TdSocketServer *TdSocketServerPtr;
typedef struct TdSocket *TdSocketPtr;
typedef struct TdEpoll *TdEpollPtr;
int32_t taosSendto(TdSocketPtr pSocket, void * msg, int len, unsigned int flags, const struct sockaddr * to, int tolen);
int32_t taosWriteSocket(TdSocketPtr pSocket, void *msg, int len);
int32_t taosReadSocket(TdSocketPtr pSocket, void *msg, int len);
int32_t taosReadFromSocket(TdSocketPtr pSocket, void *buf, int32_t len, int32_t flags, struct sockaddr *destAddr, socklen_t *addrLen);
int32_t taosCloseSocket(TdSocketPtr *ppSocket);
int32_t taosCloseSocketServer(TdSocketServerPtr *ppSocketServer);
int32_t taosShutDownSocketRD(TdSocketPtr pSocket);
int32_t taosShutDownSocketServerRD(TdSocketServerPtr pSocketServer);
int32_t taosShutDownSocketWR(TdSocketPtr pSocket);
int32_t taosShutDownSocketServerWR(TdSocketServerPtr pSocketServer);
int32_t taosShutDownSocketRDWR(TdSocketPtr pSocket);
int32_t taosShutDownSocketServerRDWR(TdSocketServerPtr pSocketServer);
int32_t taosSetNonblocking(TdSocketPtr pSocket, int32_t on);
int32_t taosSetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t optlen);
int32_t taosGetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t *optlen);
int32_t taosWriteMsg(TdSocketPtr pSocket, void *ptr, int32_t nbytes);
int32_t taosReadMsg(TdSocketPtr pSocket, void *ptr, int32_t nbytes);
int32_t taosNonblockwrite(TdSocketPtr pSocket, char *ptr, int32_t nbytes);
int64_t taosCopyFds(TdSocketPtr pSrcSocket, TdSocketPtr pDestSocket, int64_t len);
SOCKET taosOpenUdpSocket(uint32_t localIp, uint16_t localPort);
SOCKET taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
int32_t taosKeepTcpAlive(SOCKET sockFd);
TdSocketPtr taosOpenUdpSocket(uint32_t localIp, uint16_t localPort);
TdSocketPtr taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
int32_t taosKeepTcpAlive(TdSocketPtr pSocket);
TdSocketPtr taosAcceptTcpConnectSocket(TdSocketServerPtr pServerSocket, struct sockaddr *destAddr, socklen_t *addrLen);
void taosBlockSIGPIPE();
int32_t taosGetSocketName(TdSocketPtr pSocket,struct sockaddr *destAddr, socklen_t *addrLen);
void taosBlockSIGPIPE();
uint32_t taosGetIpv4FromFqdn(const char *);
int32_t taosGetFqdn(char *);
void tinet_ntoa(char *ipstr, uint32_t ip);
uint32_t ip2uint(const char *const ip_addr);
void taosIgnSIGPIPE();
void taosSetMaskSIGPIPE();
void taosIgnSIGPIPE();
void taosSetMaskSIGPIPE();
uint32_t taosInetAddr(const char *ipAddr);
const char *taosInetNtoa(struct in_addr ipInt);
TdEpollPtr taosCreateEpoll(int32_t size);
int32_t taosCtlEpoll(TdEpollPtr pEpoll, int32_t epollOperate, TdSocketPtr pSocket, struct epoll_event *event);
int32_t taosWaitEpoll(TdEpollPtr pEpoll, struct epoll_event *event, int32_t maxEvents, int32_t timeout);
int32_t taosCloseEpoll(TdEpollPtr *ppEpoll);
#ifdef __cplusplus
}
......
......@@ -24,7 +24,7 @@
#ifndef USE_UV
typedef struct SFdObj {
void * signature;
SOCKET fd; // TCP socket FD
TdSocketPtr pSocket; // TCP socket FD
void * thandle; // handle from upper layer, like TAOS
uint32_t ip;
uint16_t port;
......@@ -40,7 +40,7 @@ typedef struct SThreadObj {
pthread_mutex_t mutex;
uint32_t ip;
bool stop;
EpollFd pollFd;
TdEpollPtr pEpoll;
int numOfFds;
int threadId;
char label[TSDB_LABEL_LEN];
......@@ -56,20 +56,20 @@ typedef struct {
} SClientObj;
typedef struct {
SOCKET fd;
uint32_t ip;
uint16_t port;
int8_t stop;
int8_t reserve;
char label[TSDB_LABEL_LEN];
int numOfThreads;
void * shandle;
SThreadObj **pThreadObj;
pthread_t thread;
TdSocketServerPtr pSocketServer;
uint32_t ip;
uint16_t port;
int8_t stop;
int8_t reserve;
char label[TSDB_LABEL_LEN];
int numOfThreads;
void * shandle;
SThreadObj **pThreadObj;
pthread_t thread;
} SServerObj;
static void * taosProcessTcpData(void *param);
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd);
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, TdSocketPtr pSocket);
static void taosFreeFdObj(SFdObj *pFdObj);
static void taosReportBrokenLink(SFdObj *pFdObj);
static void * taosAcceptTcpConnection(void *arg);
......@@ -85,7 +85,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
return NULL;
}
pServerObj->fd = -1;
pServerObj->pSocketServer = NULL;
taosResetPthread(&pServerObj->thread);
pServerObj->ip = ip;
pServerObj->port = port;
......@@ -118,7 +118,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
}
pServerObj->pThreadObj[i] = pThreadObj;
pThreadObj->pollFd = -1;
pThreadObj->pEpoll = NULL;
taosResetPthread(&pThreadObj->thread);
pThreadObj->processData = fp;
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
......@@ -135,8 +135,8 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
break;
}
pThreadObj->pollFd = (EpollFd)epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) {
pThreadObj->pEpoll = taosCreateEpoll(10); // size does not matter
if (pThreadObj->pEpoll == NULL) {
tError("%s failed to create TCP epoll", label);
code = -1;
break;
......@@ -151,8 +151,8 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pThreadObj->threadId = i;
}
pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
if (pServerObj->fd < 0) code = -1;
pServerObj->pSocketServer = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
if (pServerObj->pSocketServer == NULL) code = -1;
if (code == 0) {
code = pthread_create(&pServerObj->thread, &thattr, taosAcceptTcpConnection, (void *)pServerObj);
......@@ -196,8 +196,8 @@ void taosStopTcpServer(void *handle) {
if (pServerObj == NULL) return;
pServerObj->stop = 1;
if (pServerObj->fd >= 0) {
taosShutDownSocketRD(pServerObj->fd);
if (pServerObj->pSocketServer != NULL) {
taosShutDownSocketServerRD(pServerObj->pSocketServer);
}
if (taosCheckPthreadValid(pServerObj->thread)) {
if (taosComparePthread(pServerObj->thread, pthread_self())) {
......@@ -227,7 +227,7 @@ void taosCleanUpTcpServer(void *handle) {
}
static void *taosAcceptTcpConnection(void *arg) {
SOCKET connFd = -1;
TdSocketPtr pSocket = NULL;
struct sockaddr_in caddr;
int threadId = 0;
SThreadObj * pThreadObj;
......@@ -239,13 +239,13 @@ static void *taosAcceptTcpConnection(void *arg) {
while (1) {
socklen_t addrlen = sizeof(caddr);
connFd = accept(pServerObj->fd, (struct sockaddr *)&caddr, &addrlen);
pSocket = taosAcceptTcpConnectSocket(pServerObj->pSocketServer, (struct sockaddr *)&caddr, &addrlen);
if (pServerObj->stop) {
tDebug("%s TCP server stop accepting new connections", pServerObj->label);
break;
}
if (connFd == -1) {
if (pSocket == NULL) {
if (errno == EINVAL) {
tDebug("%s TCP server stop accepting new connections, exiting", pServerObj->label);
break;
......@@ -255,11 +255,11 @@ static void *taosAcceptTcpConnection(void *arg) {
continue;
}
taosKeepTcpAlive(connFd);
taosKeepTcpAlive(pSocket);
struct timeval to = {5, 0};
int32_t ret = taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to));
int32_t ret = taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to));
if (ret != 0) {
taosCloseSocket(connFd);
taosCloseSocket(&pSocket);
tError("%s failed to set recv timeout fd(%s)for connection from:%s:%hu", pServerObj->label, strerror(errno),
taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port));
continue;
......@@ -268,14 +268,14 @@ static void *taosAcceptTcpConnection(void *arg) {
// pick up the thread to handle this connection
pThreadObj = pServerObj->pThreadObj[threadId];
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd);
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, pSocket);
if (pFdObj) {
pFdObj->ip = caddr.sin_addr.s_addr;
pFdObj->port = htons(caddr.sin_port);
tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label,
taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds);
tDebug("%s new TCP connection from %s:%hu, FD:%p numOfFds:%d", pServerObj->label,
taosInetNtoa(caddr.sin_addr), pFdObj->port, pFdObj, pThreadObj->numOfFds);
} else {
taosCloseSocket(connFd);
taosCloseSocket(&pSocket);
tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno),
taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port));
}
......@@ -285,7 +285,7 @@ static void *taosAcceptTcpConnection(void *arg) {
threadId = threadId % pServerObj->numOfThreads;
}
taosCloseSocket(pServerObj->fd);
taosCloseSocketServer(&pServerObj->pSocketServer);
return NULL;
}
......@@ -339,8 +339,8 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThread
break;
}
pThreadObj->pollFd = (int64_t)epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) {
pThreadObj->pEpoll = taosCreateEpoll(10); // size does not matter
if (pThreadObj->pEpoll == NULL) {
tError("%s failed to create TCP epoll", label);
code = -1;
break;
......@@ -388,21 +388,17 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
atomic_store_32(&pClientObj->index, index + 1);
SThreadObj *pThreadObj = pClientObj->pThreadObj[index];
SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip);
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
if (fd == (SOCKET)-1) return NULL;
#else
if (fd <= 0) return NULL;
#endif
TdSocketPtr pSocket = taosOpenTcpClientSocket(ip, port, pThreadObj->ip);
if (pSocket == NULL) return NULL;
struct sockaddr_in sin;
uint16_t localPort = 0;
unsigned int addrlen = sizeof(sin);
if (getsockname(fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET && addrlen == sizeof(sin)) {
if (taosGetSocketName(pSocket, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET && addrlen == sizeof(sin)) {
localPort = (uint16_t)ntohs(sin.sin_port);
}
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd);
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, pSocket);
if (pFdObj) {
pFdObj->thandle = thandle;
......@@ -415,7 +411,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
ipport, localPort, pFdObj, pThreadObj->numOfFds);
} else {
tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno));
taosCloseSocket(fd);
taosCloseSocket(&pSocket);
}
return pFdObj;
......@@ -430,7 +426,7 @@ void taosCloseTcpConnection(void *chandle) {
// pFdObj->thandle = NULL;
pFdObj->closedByApp = 1;
taosShutDownSocketWR(pFdObj->fd);
taosShutDownSocketWR(pFdObj->pSocket);
}
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
......@@ -438,8 +434,8 @@ int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chand
if (pFdObj == NULL || pFdObj->signature != pFdObj) return -1;
SThreadObj *pThreadObj = pFdObj->pThreadObj;
int ret = taosWriteMsg(pFdObj->fd, data, len);
tTrace("%s %p TCP data is sent, FD:%p fd:%d bytes:%d", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, ret);
int ret = taosWriteMsg(pFdObj->pSocket, data, len);
tTrace("%s %p TCP data is sent, FD:%p bytes:%d", pThreadObj->label, pFdObj->thandle, pFdObj, ret);
return ret;
}
......@@ -449,7 +445,7 @@ static void taosReportBrokenLink(SFdObj *pFdObj) {
// notify the upper layer, so it will clean the associated context
if (pFdObj->closedByApp == 0) {
taosShutDownSocketWR(pFdObj->fd);
taosShutDownSocketWR(pFdObj->pSocket);
SRecvInfo recvInfo;
recvInfo.msg = NULL;
......@@ -473,7 +469,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
SThreadObj *pThreadObj = pFdObj->pThreadObj;
headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
headLen = taosReadMsg(pFdObj->pSocket, &rpcHead, sizeof(SRpcHead));
if (headLen != sizeof(SRpcHead)) {
tDebug("%s %p read error, FD:%p headLen:%d", pThreadObj->label, pFdObj->thandle, pFdObj, headLen);
return -1;
......@@ -486,13 +482,12 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
return -1;
} else {
tTrace("%s %p read data, FD:%p fd:%d TCP malloc mem:%p", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd,
buffer);
tTrace("%s %p read data, FD:%p TCP malloc mem:%p", pThreadObj->label, pFdObj->thandle, pFdObj, buffer);
}
msg = buffer + tsRpcOverhead;
leftLen = msgLen - headLen;
retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
retLen = taosReadMsg(pFdObj->pSocket, msg + headLen, leftLen);
if (leftLen != retLen) {
tError("%s %p read error, leftLen:%d retLen:%d FD:%p", pThreadObj->label, pFdObj->thandle, leftLen, retLen, pFdObj);
......@@ -532,7 +527,7 @@ static void *taosProcessTcpData(void *param) {
setThreadName(name);
while (1) {
int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME);
int fdNum = taosWaitEpoll(pThreadObj->pEpoll, events, maxEvents, TAOS_EPOLL_WAIT_TIME);
if (pThreadObj->stop) {
tDebug("%s TCP thread get stop event, exiting...", pThreadObj->label);
break;
......@@ -561,7 +556,7 @@ static void *taosProcessTcpData(void *param) {
}
if (taosReadTcpData(pFdObj, &recvInfo) < 0) {
shutdown(pFdObj->fd, SHUT_WR);
taosShutDownSocketWR(pFdObj->pSocket);
continue;
}
......@@ -572,9 +567,9 @@ static void *taosProcessTcpData(void *param) {
if (pThreadObj->stop) break;
}
if (pThreadObj->pollFd >= 0) {
EpollClose(pThreadObj->pollFd);
pThreadObj->pollFd = -1;
if (pThreadObj->pEpoll != NULL) {
taosCloseEpoll(&pThreadObj->pEpoll);
pThreadObj->pEpoll = NULL;
}
while (pThreadObj->pHead) {
......@@ -590,7 +585,7 @@ static void *taosProcessTcpData(void *param) {
return NULL;
}
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) {
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, TdSocketPtr pSocket) {
struct epoll_event event;
SFdObj *pFdObj = (SFdObj *)calloc(sizeof(SFdObj), 1);
......@@ -599,13 +594,13 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) {
}
pFdObj->closedByApp = 0;
pFdObj->fd = fd;
pFdObj->pSocket = pSocket;
pFdObj->pThreadObj = pThreadObj;
pFdObj->signature = pFdObj;
event.events = EPOLLIN | EPOLLRDHUP;
event.data.ptr = pFdObj;
if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
if (taosCtlEpoll(pThreadObj->pEpoll, EPOLL_CTL_ADD, pSocket, &event) < 0) {
tfree(pFdObj);
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
......@@ -635,8 +630,8 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
}
pFdObj->signature = NULL;
epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
taosCloseSocket(pFdObj->fd);
taosCtlEpoll(pThreadObj->pEpoll, EPOLL_CTL_DEL, pFdObj->pSocket, NULL);
taosCloseSocket(&pFdObj->pSocket);
pThreadObj->numOfFds--;
if (pThreadObj->numOfFds < 0)
......@@ -655,8 +650,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
pthread_mutex_unlock(&pThreadObj->mutex);
tDebug("%s %p TCP connection is closed, FD:%p fd:%d numOfFds:%d", pThreadObj->label, pFdObj->thandle, pFdObj,
pFdObj->fd, pThreadObj->numOfFds);
tDebug("%s %p TCP connection is closed, FD:%p numOfFds:%d", pThreadObj->label, pFdObj->thandle, pFdObj, pThreadObj->numOfFds);
tfree(pFdObj);
}
......
......@@ -30,17 +30,17 @@
#define RPC_MAX_UDP_SIZE 65480
typedef struct {
int index;
SOCKET fd;
uint16_t port; // peer port
uint16_t localPort; // local port
char label[TSDB_LABEL_LEN]; // copy from udpConnSet;
pthread_t thread;
void * hash;
void * shandle; // handle passed by upper layer during server initialization
void * pSet;
void *(*processData)(SRecvInfo *pRecv);
char *buffer; // buffer to receive data
int index;
TdSocketPtr pSocket;
uint16_t port; // peer port
uint16_t localPort; // local port
char label[TSDB_LABEL_LEN]; // copy from udpConnSet;
pthread_t thread;
void *hash;
void *shandle; // handle passed by upper layer during server initialization
void *pSet;
void *(*processData)(SRecvInfo *pRecv);
char *buffer; // buffer to receive data
} SUdpConn;
typedef struct {
......@@ -86,8 +86,8 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
for (i = 0; i < threads; ++i) {
pConn = pSet->udpConn + i;
ownPort = (port ? port + i : 0);
pConn->fd = taosOpenUdpSocket(ip, ownPort);
if (pConn->fd < 0) {
pConn->pSocket = taosOpenUdpSocket(ip, ownPort);
if (pConn->pSocket == NULL) {
tError("%s failed to open UDP socket %x:%hu", label, ip, port);
break;
}
......@@ -100,7 +100,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
struct sockaddr_in sin;
unsigned int addrlen = sizeof(sin);
if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET &&
if (taosGetSocketName(pConn->pSocket, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET &&
addrlen == sizeof(sin)) {
pConn->localPort = (uint16_t)ntohs(sin.sin_port);
}
......@@ -138,9 +138,9 @@ void taosStopUdpConnection(void *handle) {
for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i;
if (pConn->fd >= 0) shutdown(pConn->fd, SHUT_RDWR);
if (pConn->fd >= 0) taosCloseSocket(pConn->fd);
pConn->fd = -1;
if (pConn->pSocket != NULL) taosShutDownSocketRDWR(pConn->pSocket);
if (pConn->pSocket != NULL) taosCloseSocket(&pConn->pSocket);
pConn->pSocket = NULL;
}
for (int i = 0; i < pSet->threads; ++i) {
......@@ -163,7 +163,7 @@ void taosCleanUpUdpConnection(void *handle) {
for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i;
if (pConn->fd >= 0) taosCloseSocket(pConn->fd);
if (pConn->pSocket != NULL) taosCloseSocket(&pConn->pSocket);
}
tDebug("%s UDP is cleaned up", pSet->label);
......@@ -199,13 +199,12 @@ static void *taosRecvUdpData(void *param) {
setThreadName("recvUdpData");
while (1) {
dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
dataLen = taosReadFromSocket(pConn->pSocket, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
if (dataLen <= 0) {
tDebug("%s UDP socket was closed, exiting(%s), dataLen:%d fd:%d", pConn->label, strerror(errno), (int32_t)dataLen,
pConn->fd);
tDebug("%s UDP socket was closed, exiting(%s), dataLen:%d", pConn->label, strerror(errno), (int32_t)dataLen);
// for windows usage, remote shutdown also returns - 1 in windows client
if (pConn->fd == -1) {
if (pConn->pSocket == NULL) {
break;
} else {
continue;
......@@ -255,7 +254,7 @@ int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *c
destAdd.sin_addr.s_addr = ip;
destAdd.sin_port = htons(port);
int ret = (int)taosSendto(pConn->fd, data, (size_t)dataLen, 0, (struct sockaddr *)&destAdd, sizeof(destAdd));
int ret = taosSendto(pConn->pSocket, data, (size_t)dataLen, 0, (struct sockaddr *)&destAdd, sizeof(destAdd));
return ret;
}
......
......@@ -173,7 +173,7 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
#else
int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) {
int32_t code = -1;
SOCKET fd = 0;
TdSocketPtr pSocket = NULL;
uint32_t ip = taosGetIpv4FromFqdn(server);
if (ip == 0xffffffff) {
......@@ -182,8 +182,8 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
goto SEND_OVER;
}
fd = taosOpenTcpClientSocket(ip, port, 0);
if (fd < 0) {
pSocket = taosOpenTcpClientSocket(ip, port, 0);
if (pSocket == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to create http socket to %s:%u since %s", server, port, terrstr());
goto SEND_OVER;
......@@ -200,21 +200,20 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
char header[1024] = {0};
int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag);
if (taosWriteSocket(fd, header, headLen) < 0) {
if (taosWriteMsg(pSocket, header, headLen) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to send http header to %s:%u since %s", server, port, terrstr());
goto SEND_OVER;
}
if (taosWriteSocket(fd, (void*)pCont, contLen) < 0) {
if (taosWriteMsg(pSocket, (void*)pCont, contLen) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to send http content to %s:%u since %s", server, port, terrstr());
goto SEND_OVER;
}
// read something to avoid nginx error 499
if (taosReadSocket(fd, header, 10) < 0) {
if (taosWriteMsg(pSocket, header, 10) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to receive response from %s:%u since %s", server, port, terrstr());
goto SEND_OVER;
......@@ -223,8 +222,8 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
code = 0;
SEND_OVER:
if (fd != 0) {
taosCloseSocket(fd);
if (pSocket != NULL) {
taosCloseSocket(&pSocket);
}
return code;
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册