diff --git a/src/rpc/inc/rpcServer.h b/src/rpc/inc/rpcServer.h deleted file mode 100644 index 6b238733a4773a0fd3676380bd683bc5f968d075..0000000000000000000000000000000000000000 --- a/src/rpc/inc/rpcServer.h +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _rpc_server_header_ -#define _rpc_server_header_ - -#ifdef __cplusplus -extern "C" { -#endif - -#include "taosdef.h" - -void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); -void taosCleanUpTcpServer(void *param); -void taosCloseTcpServerConnection(void *param); -int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void *chandle); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/rpc/inc/rpcClient.h b/src/rpc/inc/rpcTcp.h similarity index 74% rename from src/rpc/inc/rpcClient.h rename to src/rpc/inc/rpcTcp.h index c87ae7931276c30a902a484406cc80c9c798ff6a..16972dbc7e6008ee7c21bcbbc48c4d4c18f90dc2 100644 --- a/src/rpc/inc/rpcClient.h +++ b/src/rpc/inc/rpcTcp.h @@ -13,20 +13,22 @@ * along with this program. If not, see . */ -#ifndef _rpc_client_header_ -#define _rpc_client_header_ +#ifndef _rpc_tcp_header_ +#define _rpc_tcp_header_ #ifdef __cplusplus extern "C" { #endif -#include "taosdef.h" +void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); +void taosCleanUpTcpServer(void *param); void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle); void taosCleanUpTcpClient(void *chandle); void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port); -void taosCloseTcpClientConnection(void *chandle); -int taosSendTcpClientData(uint32_t ip, uint16_t port, void *data, int len, void *chandle); + +void taosCloseTcpConnection(void *chandle); +int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle); #ifdef __cplusplus } diff --git a/src/rpc/src/rpcClient.c b/src/rpc/src/rpcClient.c deleted file mode 100644 index 7ca24f6229753fcb703b8b44b70a1f31985eca6b..0000000000000000000000000000000000000000 --- a/src/rpc/src/rpcClient.c +++ /dev/null @@ -1,315 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "os.h" -#include "taosmsg.h" -#include "tlog.h" -#include "tsocket.h" -#include "tutil.h" -#include "rpcClient.h" -#include "rpcHead.h" - -#ifndef EPOLLWAKEUP -#define EPOLLWAKEUP (1u << 29) -#endif - -typedef struct _tcp_fd { - void *signature; - int fd; // TCP socket FD - void * thandle; - uint32_t ip; - char ipstr[20]; - uint16_t port; - struct _tcp_client *pTcp; - struct _tcp_fd * prev, *next; -} STcpFd; - -typedef struct _tcp_client { - pthread_t thread; - STcpFd * pHead; - pthread_mutex_t mutex; - pthread_cond_t fdReady; - int pollFd; - int numOfFds; - char label[12]; - char ipstr[20]; - void *shandle; // handle passed by upper layer during server initialization - void *(*processData)(SRecvInfo *pRecv); -} STcpClient; - -#define maxTcpEvents 100 - -static void taosCleanUpTcpFdObj(STcpFd *pFdObj); -static void *taosReadTcpData(void *param); - -void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle) { - STcpClient *pTcp; - pthread_attr_t thattr; - - pTcp = (STcpClient *)malloc(sizeof(STcpClient)); - memset(pTcp, 0, sizeof(STcpClient)); - strcpy(pTcp->label, label); - strcpy(pTcp->ipstr, ip); - pTcp->shandle = shandle; - - if (pthread_mutex_init(&(pTcp->mutex), NULL) < 0) { - tError("%s failed to init TCP client mutex(%s)", label, strerror(errno)); - return NULL; - } - - if (pthread_cond_init(&(pTcp->fdReady), NULL) != 0) { - tError("%s init TCP condition variable failed(%s)", label, strerror(errno)); - return NULL; - } - - pTcp->pollFd = epoll_create(10); // size does not matter - if (pTcp->pollFd < 0) { - tError("%s failed to create TCP client epoll", label); - return NULL; - } - - pTcp->processData = fp; - - pthread_attr_init(&thattr); - pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); - int code = pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp)); - pthread_attr_destroy(&thattr); - if (code != 0) { - tError("%s failed to create TCP read data thread(%s)", label, strerror(errno)); - return NULL; - } - - tTrace("%s TCP client is initialized, ip:%s:%hu", label, ip, port); - - return pTcp; -} - -void taosCleanUpTcpClient(void *chandle) { - STcpClient *pTcp = (STcpClient *)chandle; - if (pTcp == NULL) return; - - while (pTcp->pHead) { - taosCleanUpTcpFdObj(pTcp->pHead); - pTcp->pHead = pTcp->pHead->next; - } - - close(pTcp->pollFd); - - pthread_cancel(pTcp->thread); - pthread_join(pTcp->thread, NULL); - - // tTrace (":%s, all connections are cleaned up", pTcp->label); - - tfree(pTcp); -} - -void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port) { - STcpClient * pTcp = (STcpClient *)shandle; - STcpFd * pFdObj; - struct epoll_event event; - struct in_addr destIp; - int fd; - - fd = taosOpenTcpClientSocket(ip, port, pTcp->ipstr); - if (fd <= 0) return NULL; - - pFdObj = (STcpFd *)malloc(sizeof(STcpFd)); - if (pFdObj == NULL) { - tError("%s no enough resource to allocate TCP FD IDs", pTcp->label); - tclose(fd); - return NULL; - } - - memset(pFdObj, 0, sizeof(STcpFd)); - pFdObj->fd = fd; - strcpy(pFdObj->ipstr, ip); - inet_aton(ip, &destIp); - pFdObj->ip = destIp.s_addr; - pFdObj->port = port; - pFdObj->pTcp = pTcp; - pFdObj->thandle = thandle; - pFdObj->signature = pFdObj; - - event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP; - event.data.ptr = pFdObj; - if (epoll_ctl(pTcp->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { - tError("%s failed to add TCP FD for epoll, error:%s", pTcp->label, strerror(errno)); - tfree(pFdObj); - tclose(fd); - return NULL; - } - - // notify the data process, add into the FdObj list - pthread_mutex_lock(&(pTcp->mutex)); - pFdObj->next = pTcp->pHead; - if (pTcp->pHead) (pTcp->pHead)->prev = pFdObj; - pTcp->pHead = pFdObj; - pTcp->numOfFds++; - pthread_cond_signal(&pTcp->fdReady); - pthread_mutex_unlock(&(pTcp->mutex)); - - tTrace("%s TCP connection to %s:%hu is created, FD:%p numOfFds:%d", pTcp->label, ip, port, pFdObj, pTcp->numOfFds); - - return pFdObj; -} - -void taosCloseTcpClientConnection(void *chandle) { - STcpFd *pFdObj = (STcpFd *)chandle; - - if (pFdObj == NULL) return; - - taosCleanUpTcpFdObj(pFdObj); -} - -int taosSendTcpClientData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) { - STcpFd *pFdObj = (STcpFd *)chandle; - - if (chandle == NULL) return -1; - - return (int)send(pFdObj->fd, data, (size_t)len, 0); -} - -static void taosReportBrokenLink(STcpFd *pFdObj) { - SRecvInfo recvInfo; - STcpClient *pTcp = pFdObj->pTcp; - - if (pFdObj->thandle) { - recvInfo.msg = NULL; - recvInfo.msgLen = 0; - recvInfo.ip = 0; - recvInfo.port = 0; - recvInfo.shandle = pTcp->shandle; - recvInfo.thandle = pFdObj->thandle;; - recvInfo.chandle = NULL; - recvInfo.connType = RPC_CONN_TCP; - (*(pTcp->processData))(&recvInfo); - } -} - -static void taosCleanUpTcpFdObj(STcpFd *pFdObj) { - - if (pFdObj == NULL) return; - if (pFdObj->signature != pFdObj) return; - - pFdObj->signature = NULL; - STcpClient *pTcp = pFdObj->pTcp; - - epoll_ctl(pTcp->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL); - close(pFdObj->fd); - - pthread_mutex_lock(&pTcp->mutex); - - pTcp->numOfFds--; - - if (pTcp->numOfFds < 0) - tError("%s %p, number of FDs is negative!!!, FD:%p", pTcp->label, pFdObj->thandle, pFdObj); - - if (pFdObj->prev) { - (pFdObj->prev)->next = pFdObj->next; - } else { - pTcp->pHead = pFdObj->next; - } - - if (pFdObj->next) { - (pFdObj->next)->prev = pFdObj->prev; - } - - pthread_mutex_unlock(&pTcp->mutex); - - tTrace("%s %p, FD:%p is cleaned, numOfFds:%d", pTcp->label, pFdObj->thandle, pFdObj, pTcp->numOfFds); - - tfree(pFdObj); -} - -static void *taosReadTcpData(void *param) { - STcpClient *pTcp = (STcpClient *)param; - int i, fdNum; - STcpFd *pFdObj; - struct epoll_event events[maxTcpEvents]; - SRecvInfo recvInfo; - SRpcHead rpcHead; - - while (1) { - pthread_mutex_lock(&pTcp->mutex); - if (pTcp->numOfFds < 1) pthread_cond_wait(&pTcp->fdReady, &pTcp->mutex); - pthread_mutex_unlock(&pTcp->mutex); - - fdNum = epoll_wait(pTcp->pollFd, events, maxTcpEvents, -1); - if (fdNum < 0) continue; - - for (i = 0; i < fdNum; ++i) { - pFdObj = events[i].data.ptr; - - if (events[i].events & EPOLLERR) { - tTrace("%s %p, TCP error happened on FD", pTcp->label, pFdObj->thandle); - taosReportBrokenLink(pFdObj); - continue; - } - - if (events[i].events & EPOLLHUP) { - tTrace("%s %p, TCP FD hang up", pTcp->label, pFdObj->thandle); - taosReportBrokenLink(pFdObj); - continue; - } - - int headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead)); - if (headLen != sizeof(SRpcHead)) { - tError("%s %p, read error, headLen:%d", pTcp->label, pFdObj->thandle, headLen); - taosReportBrokenLink(pFdObj); - continue; - } - - int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); - char *buffer = (char *)malloc((size_t)msgLen + tsRpcOverhead); - if (NULL == buffer) { - tTrace("%s %p, TCP malloc(size:%d) fail", pTcp->label, pFdObj->thandle, msgLen); - taosReportBrokenLink(pFdObj); - continue; - } - - char *msg = buffer + tsRpcOverhead; - int32_t leftLen = msgLen - headLen; - int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen); - - if (leftLen != retLen) { - tError("%s %p, read error, leftLen:%d retLen:%d", - pTcp->label, pFdObj->thandle, leftLen, retLen); - tfree(buffer); - taosReportBrokenLink(pFdObj); - continue; - } - - // tTrace("%s TCP data is received, ip:%s:%u len:%d", pTcp->label, pFdObj->ipstr, pFdObj->port, msgLen); - - memcpy(msg, &rpcHead, sizeof(SRpcHead)); - recvInfo.msg = msg; - recvInfo.msgLen = msgLen; - recvInfo.ip = pFdObj->ip; - recvInfo.port = pFdObj->port; - recvInfo.shandle = pTcp->shandle; - recvInfo.thandle = pFdObj->thandle;; - recvInfo.chandle = pFdObj; - recvInfo.connType = RPC_CONN_TCP; - - pFdObj->thandle = (*(pTcp->processData))(&recvInfo); - - if (pFdObj->thandle == NULL) taosCleanUpTcpFdObj(pFdObj); - } - } - - return NULL; -} - - diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c old mode 100755 new mode 100644 index 8280264764a4463407ebf74e193f464cc4dd07d8..5048d5db14572a8bc6c4f7b7ff6bd70818e2333a --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -27,8 +27,7 @@ #include "taosmsg.h" #include "rpcUdp.h" #include "rpcCache.h" -#include "rpcClient.h" -#include "rpcServer.h" +#include "rpcTcp.h" #include "rpcHead.h" #include "trpc.h" #include "hash.h" @@ -67,7 +66,7 @@ typedef struct { void *udphandle;// returned handle from UDP initialization void *pCache; // connection cache pthread_mutex_t mutex; - struct _RpcConn *connList; // connection list + struct SRpcConn *connList; // connection list } SRpcInfo; typedef struct { @@ -88,7 +87,7 @@ typedef struct { char msg[0]; // RpcHead starts from here } SRpcReqContext; -typedef struct _RpcConn { +typedef struct SRpcConn { int sid; // session ID uint32_t ownId; // own link ID uint32_t peerId; // peer link ID @@ -156,8 +155,8 @@ void (*taosCleanUpConn[])(void *thandle) = { int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = { taosSendUdpData, taosSendUdpData, - taosSendTcpServerData, - taosSendTcpClientData + taosSendTcpData, + taosSendTcpData }; void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, uint16_t port) = { @@ -170,8 +169,8 @@ void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, uint16_t port) = void (*taosCloseConn[])(void *chandle) = { NULL, NULL, - taosCloseTcpServerConnection, - taosCloseTcpClientConnection + taosCloseTcpConnection, + taosCloseTcpConnection }; static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort, int8_t connType); diff --git a/src/rpc/src/rpcServer.c b/src/rpc/src/rpcTcp.c similarity index 55% rename from src/rpc/src/rpcServer.c rename to src/rpc/src/rpcTcp.c index 37576fa0f68c3af27aeac3f160f6db7bdb7b325c..27b81deda552d977f5a87fc9870b21fb2f71d8de 100644 --- a/src/rpc/src/rpcServer.c +++ b/src/rpc/src/rpcTcp.c @@ -18,30 +18,30 @@ #include "tlog.h" #include "tsocket.h" #include "tutil.h" -#include "rpcServer.h" #include "rpcHead.h" +#include "rpcTcp.h" -#define TAOS_IPv4ADDR_LEN 16 #ifndef EPOLLWAKEUP #define EPOLLWAKEUP (1u << 29) #endif -typedef struct _fd_obj { - void *signature; - int fd; // TCP socket FD - void * thandle; // handle from upper layer, like TAOS - char ipstr[TAOS_IPv4ADDR_LEN]; - unsigned int ip; - uint16_t port; - struct _thread_obj *pThreadObj; - struct _fd_obj * prev, *next; +typedef struct SFdObj { + void *signature; + int fd; // TCP socket FD + void *thandle; // handle from upper layer, like TAOS + uint32_t ip; + uint16_t port; + struct SThreadObj *pThreadObj; + struct SFdObj *prev; + struct SFdObj *next; } SFdObj; -typedef struct _thread_obj { +typedef struct SThreadObj { pthread_t thread; SFdObj * pHead; - pthread_mutex_t threadMutex; + pthread_mutex_t mutex; pthread_cond_t fdReady; + char ipstr[TSDB_IPv4ADDR_LEN]; int pollFd; int numOfFds; int threadId; @@ -51,7 +51,7 @@ typedef struct _thread_obj { } SThreadObj; typedef struct { - char ip[40]; + char ip[TSDB_IPv4ADDR_LEN]; uint16_t port; char label[12]; int numOfThreads; @@ -60,13 +60,15 @@ typedef struct { pthread_t thread; } SServerObj; -static void taosCleanUpFdObj(SFdObj *pFdObj); -static void taosProcessTcpData(void *param); -static void taosAcceptTcpConnection(void *arg); +static void *taosProcessTcpData(void *param); +static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd); +static void taosFreeFdObj(SFdObj *pFdObj); +static void taosReportBrokenLink(SFdObj *pFdObj); +static void taosAcceptTcpConnection(void *arg); void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) { - SServerObj *pServerObj; - SThreadObj *pThreadObj; + SServerObj *pServerObj; + SThreadObj *pThreadObj; pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1); strcpy(pServerObj->ip, ip); @@ -88,7 +90,7 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, strcpy(pThreadObj->label, label); pThreadObj->shandle = shandle; - code = pthread_mutex_init(&(pThreadObj->threadMutex), NULL); + code = pthread_mutex_init(&(pThreadObj->mutex), NULL); if (code < 0) { tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno)); break;; @@ -110,7 +112,7 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, pthread_attr_t thattr; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); - code = pthread_create(&(pThreadObj->thread), &thattr, (void *)taosProcessTcpData, (void *)(pThreadObj)); + code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj)); pthread_attr_destroy(&thattr); if (code != 0) { tError("%s failed to create TCP process data thread(%s)", label, strerror(errno)); @@ -144,8 +146,8 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, } void taosCleanUpTcpServer(void *handle) { - SThreadObj *pThreadObj; SServerObj *pServerObj = handle; + SThreadObj *pThreadObj; if (pServerObj == NULL) return; @@ -156,7 +158,7 @@ void taosCleanUpTcpServer(void *handle) { pThreadObj = pServerObj->pThreadObj + i; while (pThreadObj->pHead) { - taosCleanUpFdObj(pThreadObj->pHead); + taosFreeFdObj(pThreadObj->pHead); pThreadObj->pHead = pThreadObj->pHead; } @@ -164,7 +166,7 @@ void taosCleanUpTcpServer(void *handle) { pthread_cancel(pThreadObj->thread); pthread_join(pThreadObj->thread, NULL); pthread_cond_destroy(&(pThreadObj->fdReady)); - pthread_mutex_destroy(&(pThreadObj->threadMutex)); + pthread_mutex_destroy(&(pThreadObj->mutex)); } tTrace("TCP:%s, TCP server is cleaned up", pServerObj->label); @@ -173,14 +175,146 @@ void taosCleanUpTcpServer(void *handle) { tfree(pServerObj); } -void taosCloseTcpServerConnection(void *chandle) { +static void taosAcceptTcpConnection(void *arg) { + int connFd = -1; + struct sockaddr_in caddr; + int sockFd; + int threadId = 0; + SThreadObj *pThreadObj; + SServerObj *pServerObj; + + pServerObj = (SServerObj *)arg; + + sockFd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); + if (sockFd < 0) return; + + tTrace("%s TCP server is ready, ip:%s:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); + + while (1) { + socklen_t addrlen = sizeof(caddr); + connFd = accept(sockFd, (struct sockaddr *)&caddr, &addrlen); + + if (connFd < 0) { + tError("%s TCP accept failure(%s)", pServerObj->label, errno, strerror(errno)); + continue; + } + + tTrace("%s TCP connection from ip:%s:%hu", pServerObj->label, inet_ntoa(caddr.sin_addr), caddr.sin_port); + taosKeepTcpAlive(connFd); + + // pick up the thread to handle this connection + pThreadObj = pServerObj->pThreadObj + threadId; + + SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd); + if (pFdObj) { + pFdObj->ip = caddr.sin_addr.s_addr; + pFdObj->port = caddr.sin_port; + tTrace("%s new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label, + inet_ntoa(caddr.sin_addr), pFdObj->port, pFdObj, pThreadObj->numOfFds); + } else { + close(connFd); + tError("%s failed to malloc FdObj(%s)", pServerObj->label, strerror(errno)); + } + + // pick up next thread for next connection + threadId++; + threadId = threadId % pServerObj->numOfThreads; + } +} + +void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle) { + SThreadObj *pThreadObj; + pthread_attr_t thattr; + + pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj)); + memset(pThreadObj, 0, sizeof(SThreadObj)); + strcpy(pThreadObj->label, label); + strcpy(pThreadObj->ipstr, ip); + pThreadObj->shandle = shandle; + + if (pthread_mutex_init(&(pThreadObj->mutex), NULL) < 0) { + tError("%s failed to init TCP client mutex(%s)", label, strerror(errno)); + return NULL; + } + + if (pthread_cond_init(&(pThreadObj->fdReady), NULL) != 0) { + tError("%s init TCP condition variable failed(%s)", label, strerror(errno)); + return NULL; + } + + pThreadObj->pollFd = epoll_create(10); // size does not matter + if (pThreadObj->pollFd < 0) { + tError("%s failed to create TCP client epoll", label); + return NULL; + } + + pThreadObj->processData = fp; + + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + int code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj)); + pthread_attr_destroy(&thattr); + if (code != 0) { + tError("%s failed to create TCP read data thread(%s)", label, strerror(errno)); + return NULL; + } + + tTrace("%s TCP client is initialized, ip:%s:%hu", label, ip, port); + + return pThreadObj; +} + +void taosCleanUpTcpClient(void *chandle) { + SThreadObj *pThreadObj = chandle; + if (pThreadObj == NULL) return; + + while (pThreadObj->pHead) { + taosFreeFdObj(pThreadObj->pHead); + pThreadObj->pHead = pThreadObj->pHead->next; + } + + close(pThreadObj->pollFd); + + pthread_cancel(pThreadObj->thread); + pthread_join(pThreadObj->thread, NULL); + + tTrace (":%s, all connections are cleaned up", pThreadObj->label); + + tfree(pThreadObj); +} + +void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port) { + SThreadObj * pThreadObj = shandle; + struct in_addr destIp; + + int fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ipstr); + if (fd <= 0) return NULL; + + inet_aton(ip, &destIp); + SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd); + + if (pFdObj) { + pFdObj->thandle = thandle; + pFdObj->port = port; + pFdObj->ip = destIp.s_addr; + tTrace("%s %p, TCP connection to %s:%hu is created, FD:%p numOfFds:%d", + pThreadObj->label, thandle, ip, port, pFdObj, pThreadObj->numOfFds); + } else { + close(fd); + tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno)); + } + + return pFdObj; +} + +void taosCloseTcpConnection(void *chandle) { SFdObj *pFdObj = chandle; if (pFdObj == NULL) return; - taosCleanUpFdObj(pFdObj); + taosFreeFdObj(pFdObj); } -int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) { +int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) { SFdObj *pFdObj = chandle; if (chandle == NULL) return -1; @@ -188,8 +322,6 @@ int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void return (int)send(pFdObj->fd, data, (size_t)len, 0); } -#define maxEvents 10 - static void taosReportBrokenLink(SFdObj *pFdObj) { SThreadObj *pThreadObj = pFdObj->pThreadObj; @@ -209,26 +341,26 @@ static void taosReportBrokenLink(SFdObj *pFdObj) { } } -static void taosProcessTcpData(void *param) { - SThreadObj * pThreadObj; - int i, fdNum; - SFdObj * pFdObj; +#define maxEvents 10 + +static void *taosProcessTcpData(void *param) { + SThreadObj *pThreadObj = param; + SFdObj *pFdObj; struct epoll_event events[maxEvents]; SRecvInfo recvInfo; - pThreadObj = (SThreadObj *)param; SRpcHead rpcHead; while (1) { - pthread_mutex_lock(&pThreadObj->threadMutex); + pthread_mutex_lock(&pThreadObj->mutex); if (pThreadObj->numOfFds < 1) { - pthread_cond_wait(&pThreadObj->fdReady, &pThreadObj->threadMutex); + pthread_cond_wait(&pThreadObj->fdReady, &pThreadObj->mutex); } - pthread_mutex_unlock(&pThreadObj->threadMutex); + pthread_mutex_unlock(&pThreadObj->mutex); - fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1); + int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1); if (fdNum < 0) continue; - for (i = 0; i < fdNum; ++i) { + for (int i = 0; i < fdNum; ++i) { pFdObj = events[i].data.ptr; if (events[i].events & EPOLLERR) { @@ -270,7 +402,7 @@ static void taosProcessTcpData(void *param) { continue; } - // tTrace("%s TCP data is received, ip:%s:%u len:%d", pTcp->label, pFdObj->ipstr, pFdObj->port, msgLen); + // tTrace("%s TCP data is received, ip:%s:%u len:%d", pThreadObj->label, pFdObj->ipstr, pFdObj->port, msgLen); memcpy(msg, &rpcHead, sizeof(SRpcHead)); recvInfo.msg = msg; @@ -283,91 +415,43 @@ static void taosProcessTcpData(void *param) { recvInfo.connType = RPC_CONN_TCP; pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo); - if (pFdObj->thandle == NULL) taosCleanUpFdObj(pFdObj); + if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj); } } + + return NULL; } -static void taosAcceptTcpConnection(void *arg) { - int connFd = -1; - struct sockaddr_in clientAddr; - int sockFd; - int threadId = 0; - SThreadObj * pThreadObj; - SServerObj * pServerObj; - SFdObj * pFdObj; +static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd) { struct epoll_event event; - pServerObj = (SServerObj *)arg; + SFdObj *pFdObj = (SFdObj *)calloc(sizeof(SFdObj), 1); + if (pFdObj == NULL) return NULL; - sockFd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); + pFdObj->fd = fd; + pFdObj->pThreadObj = pThreadObj; + pFdObj->signature = pFdObj; - if (sockFd < 0) { - tError("%s failed to open TCP socket, ip:%s:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); - return; - } else { - tTrace("%s TCP server is ready, ip:%s:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); + event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP; + event.data.ptr = pFdObj; + if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { + tfree(pFdObj); + return NULL; } - while (1) { - socklen_t addrlen = sizeof(clientAddr); - connFd = accept(sockFd, (struct sockaddr *)&clientAddr, &addrlen); - - if (connFd < 0) { - tError("%s TCP accept failure(%s)", pServerObj->label, errno, strerror(errno)); - continue; - } - - tTrace("%s TCP connection from ip:%s:%hu", pServerObj->label, inet_ntoa(clientAddr.sin_addr), - htons(clientAddr.sin_port)); - taosKeepTcpAlive(connFd); - - // pick up the thread to handle this connection - pThreadObj = pServerObj->pThreadObj + threadId; - - pFdObj = (SFdObj *)malloc(sizeof(SFdObj)); - if (pFdObj == NULL) { - tError("%s no enough resource to allocate TCP FD IDs", pServerObj->label); - close(connFd); - continue; - } - - memset(pFdObj, 0, sizeof(SFdObj)); - pFdObj->fd = connFd; - strcpy(pFdObj->ipstr, inet_ntoa(clientAddr.sin_addr)); - pFdObj->ip = clientAddr.sin_addr.s_addr; - pFdObj->port = htons(clientAddr.sin_port); - pFdObj->pThreadObj = pThreadObj; - pFdObj->signature = pFdObj; - - event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP; - event.data.ptr = pFdObj; - if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) { - tError("%s failed to add TCP FD for epoll(%s)", pServerObj->label, strerror(errno)); - tfree(pFdObj); - close(connFd); - continue; - } - - // notify the data process, add into the FdObj list - pthread_mutex_lock(&(pThreadObj->threadMutex)); - pFdObj->next = pThreadObj->pHead; - if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj; - pThreadObj->pHead = pFdObj; - pThreadObj->numOfFds++; - pthread_cond_signal(&pThreadObj->fdReady); - pthread_mutex_unlock(&(pThreadObj->threadMutex)); - - tTrace("%s TCP thread:%d, new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label, - pThreadObj->threadId, pFdObj->ipstr, pFdObj->port, pFdObj, pThreadObj->numOfFds); + // notify the data process, add into the FdObj list + pthread_mutex_lock(&(pThreadObj->mutex)); + pFdObj->next = pThreadObj->pHead; + if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj; + pThreadObj->pHead = pFdObj; + pThreadObj->numOfFds++; + pthread_cond_signal(&pThreadObj->fdReady); + pthread_mutex_unlock(&(pThreadObj->mutex)); - // pick up next thread for next connection - threadId++; - threadId = threadId % pServerObj->numOfThreads; - } + return pFdObj; } -static void taosCleanUpFdObj(SFdObj *pFdObj) { +static void taosFreeFdObj(SFdObj *pFdObj) { if (pFdObj == NULL) return; if (pFdObj->signature != pFdObj) return; @@ -378,7 +462,7 @@ static void taosCleanUpFdObj(SFdObj *pFdObj) { close(pFdObj->fd); epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL); - pthread_mutex_lock(&pThreadObj->threadMutex); + pthread_mutex_lock(&pThreadObj->mutex); pThreadObj->numOfFds--; @@ -398,7 +482,7 @@ static void taosCleanUpFdObj(SFdObj *pFdObj) { (pFdObj->next)->prev = pFdObj->prev; } - pthread_mutex_unlock(&pThreadObj->threadMutex); + pthread_mutex_unlock(&pThreadObj->mutex); tTrace("%s %p, FD:%p is cleaned, numOfFds:%d", pThreadObj->label, pFdObj->thandle, pFdObj, pThreadObj->numOfFds); @@ -406,116 +490,4 @@ static void taosCleanUpFdObj(SFdObj *pFdObj) { tfree(pFdObj); } -#if 0 -static void taosAcceptUDConnection(void *arg) { - int connFd = -1; - int sockFd; - int threadId = 0; - SThreadObj * pThreadObj; - SServerObj * pServerObj; - SFdObj * pFdObj; - struct epoll_event event; - - pServerObj = (SServerObj *)arg; - sockFd = taosOpenUDServerSocket(pServerObj->ip, pServerObj->port); - - if (sockFd < 0) { - tError("%s failed to open UD socket, ip:%s, port:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); - return; - } else { - tTrace("%s UD server is ready, ip:%s, port:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); - } - - while (1) { - connFd = accept(sockFd, NULL, NULL); - - if (connFd < 0) { - tError("%s UD accept failure, errno:%d, reason:%s", pServerObj->label, errno, strerror(errno)); - continue; - } - - // pick up the thread to handle this connection - pThreadObj = pServerObj->pThreadObj + threadId; - - pFdObj = (SFdObj *)malloc(sizeof(SFdObj)); - if (pFdObj == NULL) { - tError("%s no enough resource to allocate TCP FD IDs", pServerObj->label); - close(connFd); - continue; - } - - memset(pFdObj, 0, sizeof(SFdObj)); - pFdObj->fd = connFd; - pFdObj->pThreadObj = pThreadObj; - - event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP; - event.data.ptr = pFdObj; - if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) { - tError("%s failed to add UD FD for epoll, error:%s", pServerObj->label, strerror(errno)); - tfree(pFdObj); - close(connFd); - continue; - } - - // notify the data process, add into the FdObj list - pthread_mutex_lock(&(pThreadObj->threadMutex)); - pFdObj->next = pThreadObj->pHead; - if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj; - pThreadObj->pHead = pFdObj; - pThreadObj->numOfFds++; - pthread_cond_signal(&pThreadObj->fdReady); - pthread_mutex_unlock(&(pThreadObj->threadMutex)); - - tTrace("%s UD thread:%d, a new connection, numOfFds:%d", pServerObj->label, pThreadObj->threadId, - pThreadObj->numOfFds); - - // pick up next thread for next connection - threadId++; - threadId = threadId % pServerObj->numOfThreads; - } -} -#endif - -#if 0 -void taosListTcpConnection(void *handle, char *buffer) { - SServerObj *pServerObj; - SThreadObj *pThreadObj; - SFdObj * pFdObj; - int i, numOfFds, numOfConns; - char * msg; - - pServerObj = (SServerObj *)handle; - buffer[0] = 0; - msg = buffer; - numOfConns = 0; - - pThreadObj = pServerObj->pThreadObj; - - for (i = 0; i < pServerObj->numOfThreads; ++i) { - numOfFds = 0; - sprintf(msg, "TCP:%s Thread:%d number of connections:%d\n", pServerObj->label, pThreadObj->threadId, - pThreadObj->numOfFds); - msg = msg + strlen(msg); - pFdObj = pThreadObj->pHead; - while (pFdObj) { - sprintf(msg, " ip:%s port:%hu\n", pFdObj->ipstr, pFdObj->port); - msg = msg + strlen(msg); - numOfFds++; - numOfConns++; - pFdObj = pFdObj->next; - } - - if (numOfFds != pThreadObj->numOfFds) - tError("TCP:%s thread:%d BIG error, numOfFds:%d actual numOfFds:%d", pServerObj->label, pThreadObj->threadId, - pThreadObj->numOfFds, numOfFds); - - pThreadObj++; - } - - sprintf(msg, "TCP:%s total connections:%d\n", pServerObj->label, numOfConns); - - return; -} -#endif -