From 25a4625ef63b7919937093b3b28c1c6da87b92e0 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sat, 22 Feb 2020 08:01:42 +0800 Subject: [PATCH] support both TCP and UDP simultaneously for both server and client --- src/inc/trpc.h | 14 +- src/rpc/inc/rpcClient.h | 2 +- src/rpc/inc/rpcHead.h | 24 +- src/rpc/inc/rpcServer.h | 2 +- src/rpc/inc/rpcUdp.h | 5 +- src/rpc/src/rpcClient.c | 303 ++++++++--------- src/rpc/src/rpcMain.c | 223 +++++++------ src/rpc/src/rpcServer.c | 287 ++++++++-------- src/rpc/src/rpcUdp.c | 720 +++++++++------------------------------- src/rpc/test/rclient.c | 8 +- src/rpc/test/rserver.c | 8 +- 11 files changed, 603 insertions(+), 993 deletions(-) diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 100d3a6cac..3388f0c6c7 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -23,18 +23,8 @@ extern "C" { #include #include "taosdef.h" -#define TAOS_CONN_UDPS 0 -#define TAOS_CONN_UDPC 1 -#define TAOS_CONN_TCPS 2 -#define TAOS_CONN_TCPC 3 -#define TAOS_CONN_HTTPS 4 -#define TAOS_CONN_HTTPC 5 - -#define TAOS_SOCKET_TYPE_NAME_TCP "tcp" -#define TAOS_SOCKET_TYPE_NAME_UDP "udp" - -#define TAOS_CONN_SOCKET_TYPE_S() ((strcasecmp(tsSocketType, TAOS_SOCKET_TYPE_NAME_UDP) == 0)? TAOS_CONN_UDPS:TAOS_CONN_TCPS) -#define TAOS_CONN_SOCKET_TYPE_C() ((strcasecmp(tsSocketType, TAOS_SOCKET_TYPE_NAME_UDP) == 0)? TAOS_CONN_UDPC:TAOS_CONN_TCPC) +#define TAOS_CONN_SERVER 0 +#define TAOS_CONN_CLIENT 1 extern int tsRpcHeadSize; diff --git a/src/rpc/inc/rpcClient.h b/src/rpc/inc/rpcClient.h index dc5e9f744a..c87ae79312 100644 --- a/src/rpc/inc/rpcClient.h +++ b/src/rpc/inc/rpcClient.h @@ -26,7 +26,7 @@ void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void taosCleanUpTcpClient(void *chandle); void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port); void taosCloseTcpClientConnection(void *chandle); -int taosSendTcpClientData(uint32_t ip, uint16_t port, char *data, int len, void *chandle); +int taosSendTcpClientData(uint32_t ip, uint16_t port, void *data, int len, void *chandle); #ifdef __cplusplus } diff --git a/src/rpc/inc/rpcHead.h b/src/rpc/inc/rpcHead.h index c157e3ba30..09c66d33ae 100644 --- a/src/rpc/inc/rpcHead.h +++ b/src/rpc/inc/rpcHead.h @@ -20,12 +20,29 @@ extern "C" { #endif +#define RPC_CONN_UDPS 0 +#define RPC_CONN_UDPC 1 +#define RPC_CONN_TCPS 2 +#define RPC_CONN_TCPC 3 +#define RPC_CONN_TCP 2 + +typedef struct { + void *msg; + int msgLen; + uint32_t ip; + uint16_t port; + int connType; + void *shandle; + void *thandle; + void *chandle; +} SRecvInfo; + #pragma pack(push, 1) typedef struct { char version:4; // RPC version char comp:4; // compression algorithm, 0:no compression 1:lz4 - char tcp:2; // tcp flag + char resflag:2; // reserved bits char spi:3; // security parameter index char encrypt:3; // encrypt algorithm, 0: no encryption uint16_t tranId; // transcation ID @@ -33,12 +50,12 @@ typedef struct { uint32_t sourceId; // source ID, an index for connection list uint32_t destId; // destination ID, an index for connection list uint32_t destIp; // destination IP address, for NAT scenario - char user[TSDB_UNI_LEN]; + char user[TSDB_UNI_LEN]; // user ID uint16_t port; // for UDP only, port may be changed char empty[1]; // reserved uint8_t msgType; // message type int32_t msgLen; // message length including the header iteslf - int32_t code; + int32_t code; // code in response message uint8_t content[0]; // message body starts from here } SRpcHead; @@ -54,6 +71,7 @@ typedef struct { #pragma pack(pop) + #ifdef __cplusplus } #endif diff --git a/src/rpc/inc/rpcServer.h b/src/rpc/inc/rpcServer.h index eccbd7271a..6b238733a4 100644 --- a/src/rpc/inc/rpcServer.h +++ b/src/rpc/inc/rpcServer.h @@ -25,7 +25,7 @@ extern "C" { void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); void taosCleanUpTcpServer(void *param); void taosCloseTcpServerConnection(void *param); -int taosSendTcpServerData(uint32_t ip, uint16_t port, char *data, int len, void *chandle); +int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void *chandle); #ifdef __cplusplus } diff --git a/src/rpc/inc/rpcUdp.h b/src/rpc/inc/rpcUdp.h index 03498ac69d..a84f7c4a49 100644 --- a/src/rpc/inc/rpcUdp.h +++ b/src/rpc/inc/rpcUdp.h @@ -22,10 +22,9 @@ extern "C" { #include "taosdef.h" -void *taosInitUdpServer(char *ip, uint16_t port, char *label, int, void *fp, void *shandle); -void *taosInitUdpClient(char *ip, uint16_t port, char *label, int, void *fp, void *shandle); +void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int, void *fp, void *shandle); void taosCleanUpUdpConnection(void *handle); -int taosSendUdpData(uint32_t ip, uint16_t port, char *data, int dataLen, void *chandle); +int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle); void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t port); void taosFreeMsgHdr(void *hdr); diff --git a/src/rpc/src/rpcClient.c b/src/rpc/src/rpcClient.c index 8f6f1f9c9b..b616dae82f 100644 --- a/src/rpc/src/rpcClient.c +++ b/src/rpc/src/rpcClient.c @@ -45,16 +45,143 @@ typedef struct _tcp_client { int numOfFds; char label[12]; char ipstr[20]; - void * shandle; // handle passed by upper layer during server initialization - void *(*processData)(char *data, int dataLen, unsigned int ip, uint16_t port, void *shandle, void *thandle, - void *chandle); - // char buffer[128000]; + void *shandle; // handle passed by upper layer during server initialization + void *(*processData)(SRecvInfo *pRecv); } STcpClient; #define maxTcpEvents 100 +static void taosCleanUpTcpFdObj(STcpFd *pFdObj); +static void *taosReadTcpData(void *param); + +void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle) { + STcpClient *pTcp; + pthread_attr_t thattr; + + pTcp = (STcpClient *)malloc(sizeof(STcpClient)); + memset(pTcp, 0, sizeof(STcpClient)); + strcpy(pTcp->label, label); + strcpy(pTcp->ipstr, ip); + pTcp->shandle = shandle; + + if (pthread_mutex_init(&(pTcp->mutex), NULL) < 0) { + tError("%s failed to init TCP mutex, reason:%s", label, strerror(errno)); + return NULL; + } + + if (pthread_cond_init(&(pTcp->fdReady), NULL) != 0) { + tError("%s init TCP condition variable failed, reason:%s\n", label, strerror(errno)); + return NULL; + } + + pTcp->pollFd = epoll_create(10); // size does not matter + if (pTcp->pollFd < 0) { + tError("%s failed to create TCP epoll", label); + return NULL; + } + + pTcp->processData = fp; + + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + if (pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp)) != 0) { + tError("%s failed to create TCP read data thread, reason:%s", label, strerror(errno)); + return NULL; + } + + tTrace("%s TCP client is initialized, ip:%s port:%hu", label, ip, port); + + return pTcp; +} + +void taosCleanUpTcpClient(void *chandle) { + STcpClient *pTcp = (STcpClient *)chandle; + if (pTcp == NULL) return; + + while (pTcp->pHead) { + taosCleanUpTcpFdObj(pTcp->pHead); + pTcp->pHead = pTcp->pHead->next; + } + + close(pTcp->pollFd); + + pthread_cancel(pTcp->thread); + pthread_join(pTcp->thread, NULL); + + // tTrace (":%s, all connections are cleaned up", pTcp->label); + + tfree(pTcp); +} + +void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port) { + STcpClient * pTcp = (STcpClient *)shandle; + STcpFd * pFdObj; + struct epoll_event event; + struct in_addr destIp; + int fd; + + fd = taosOpenTcpClientSocket(ip, port, pTcp->ipstr); + if (fd <= 0) return NULL; + + pFdObj = (STcpFd *)malloc(sizeof(STcpFd)); + if (pFdObj == NULL) { + tError("%s no enough resource to allocate TCP FD IDs", pTcp->label); + tclose(fd); + return NULL; + } + + memset(pFdObj, 0, sizeof(STcpFd)); + pFdObj->fd = fd; + strcpy(pFdObj->ipstr, ip); + inet_aton(ip, &destIp); + pFdObj->ip = destIp.s_addr; + pFdObj->port = port; + pFdObj->pTcp = pTcp; + pFdObj->thandle = thandle; + pFdObj->signature = pFdObj; + + event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP; + event.data.ptr = pFdObj; + if (epoll_ctl(pTcp->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { + tError("%s failed to add TCP FD for epoll, error:%s", pTcp->label, strerror(errno)); + tfree(pFdObj); + tclose(fd); + return NULL; + } + + // notify the data process, add into the FdObj list + pthread_mutex_lock(&(pTcp->mutex)); + pFdObj->next = pTcp->pHead; + if (pTcp->pHead) (pTcp->pHead)->prev = pFdObj; + pTcp->pHead = pFdObj; + pTcp->numOfFds++; + pthread_cond_signal(&pTcp->fdReady); + pthread_mutex_unlock(&(pTcp->mutex)); + + tTrace("%s TCP connection to %s:%hu is created, FD:%p numOfFds:%d", pTcp->label, ip, port, pFdObj, pTcp->numOfFds); + + return pFdObj; +} + +void taosCloseTcpClientConnection(void *chandle) { + STcpFd *pFdObj = (STcpFd *)chandle; + + if (pFdObj == NULL) return; + + taosCleanUpTcpFdObj(pFdObj); +} + +int taosSendTcpClientData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) { + STcpFd *pFdObj = (STcpFd *)chandle; + + if (chandle == NULL) return -1; + + return (int)send(pFdObj->fd, data, (size_t)len, 0); +} + static void taosCleanUpTcpFdObj(STcpFd *pFdObj) { STcpClient *pTcp; + SRecvInfo recvInfo; if (pFdObj == NULL) return; if (pFdObj->signature != pFdObj) return; @@ -75,8 +202,6 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) { if (pTcp->numOfFds < 0) tError("%s number of TCP FDs shall never be negative, FD:%p", pTcp->label, pFdObj); - // remove from the FdObject list - if (pFdObj->prev) { (pFdObj->prev)->next = pFdObj->next; } else { @@ -89,40 +214,28 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) { pthread_mutex_unlock(&pTcp->mutex); - // notify the upper layer to clean the associated context - if (pFdObj->thandle) (*(pTcp->processData))(NULL, 0, 0, 0, pTcp->shandle, pFdObj->thandle, NULL); + recvInfo.msg = NULL; + recvInfo.msgLen = 0; + recvInfo.ip = 0; + recvInfo.port = 0; + recvInfo.shandle = pTcp->shandle; + recvInfo.thandle = pFdObj->thandle;; + recvInfo.chandle = NULL; + recvInfo.connType = RPC_CONN_TCP; + if (pFdObj->thandle) (*(pTcp->processData))(&recvInfo); tTrace("%s TCP is cleaned up, FD:%p numOfFds:%d", pTcp->label, pFdObj, pTcp->numOfFds); memset(pFdObj, 0, sizeof(STcpFd)); - tfree(pFdObj); } -void taosCleanUpTcpClient(void *chandle) { - STcpClient *pTcp = (STcpClient *)chandle; - if (pTcp == NULL) return; - - while (pTcp->pHead) { - taosCleanUpTcpFdObj(pTcp->pHead); - pTcp->pHead = pTcp->pHead->next; - } - - close(pTcp->pollFd); - - pthread_cancel(pTcp->thread); - pthread_join(pTcp->thread, NULL); - - // tTrace (":%s, all connections are cleaned up", pTcp->label); - - tfree(pTcp); -} - static void *taosReadTcpData(void *param) { - STcpClient * pTcp = (STcpClient *)param; + STcpClient *pTcp = (STcpClient *)param; int i, fdNum; - STcpFd * pFdObj; + STcpFd *pFdObj; struct epoll_event events[maxTcpEvents]; + SRecvInfo recvInfo; while (1) { pthread_mutex_lock(&pTcp->mutex); @@ -186,8 +299,16 @@ static void *taosReadTcpData(void *param) { continue; } - pFdObj->thandle = - (*(pTcp->processData))(buffer, dataLen, pFdObj->ip, pFdObj->port, pTcp->shandle, pFdObj->thandle, pFdObj); + recvInfo.msg = buffer; + recvInfo.msgLen = dataLen; + recvInfo.ip = pFdObj->ip; + recvInfo.port = pFdObj->port; + recvInfo.shandle = pTcp->shandle; + recvInfo.thandle = pFdObj->thandle;; + recvInfo.chandle = pFdObj; + recvInfo.connType = RPC_CONN_TCP; + + pFdObj->thandle = (*(pTcp->processData))(&recvInfo); if (pFdObj->thandle == NULL) taosCleanUpTcpFdObj(pFdObj); } @@ -196,122 +317,4 @@ static void *taosReadTcpData(void *param) { return NULL; } -void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle) { - STcpClient * pTcp; - pthread_attr_t thattr; - - pTcp = (STcpClient *)malloc(sizeof(STcpClient)); - memset(pTcp, 0, sizeof(STcpClient)); - strcpy(pTcp->label, label); - strcpy(pTcp->ipstr, ip); - pTcp->shandle = shandle; - - if (pthread_mutex_init(&(pTcp->mutex), NULL) < 0) { - tError("%s failed to init TCP mutex, reason:%s", label, strerror(errno)); - return NULL; - } - - if (pthread_cond_init(&(pTcp->fdReady), NULL) != 0) { - tError("%s init TCP condition variable failed, reason:%s\n", label, strerror(errno)); - return NULL; - } - - pTcp->pollFd = epoll_create(10); // size does not matter - if (pTcp->pollFd < 0) { - tError("%s failed to create TCP epoll", label); - return NULL; - } - - pTcp->processData = fp; - - pthread_attr_init(&thattr); - pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); - if (pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp)) != 0) { - tError("%s failed to create TCP read data thread, reason:%s", label, strerror(errno)); - return NULL; - } - - tTrace("%s TCP client is initialized, ip:%s port:%hu", label, ip, port); - - return pTcp; -} - -void taosCloseTcpClientConnection(void *chandle) { - STcpFd *pFdObj = (STcpFd *)chandle; - - if (pFdObj == NULL) return; - - taosCleanUpTcpFdObj(pFdObj); -} - -void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port) { - STcpClient * pTcp = (STcpClient *)shandle; - STcpFd * pFdObj; - struct epoll_event event; - struct in_addr destIp; - int fd; - - /* - if ( (strcmp(ip, "127.0.0.1") == 0 ) || (strcmp(ip, "localhost") == 0 ) ) { - fd = taosOpenUDClientSocket(ip, port); - } else { - fd = taosOpenTcpClientSocket(ip, port, pTcp->ipstr); - } - */ - - fd = taosOpenTcpClientSocket(ip, port, pTcp->ipstr); - - if (fd <= 0) return NULL; - - pFdObj = (STcpFd *)malloc(sizeof(STcpFd)); - if (pFdObj == NULL) { - tError("%s no enough resource to allocate TCP FD IDs", pTcp->label); - tclose(fd); - return NULL; - } - - memset(pFdObj, 0, sizeof(STcpFd)); - pFdObj->fd = fd; - strcpy(pFdObj->ipstr, ip); - inet_aton(ip, &destIp); - pFdObj->ip = destIp.s_addr; - pFdObj->port = port; - pFdObj->pTcp = pTcp; - pFdObj->thandle = thandle; - pFdObj->signature = pFdObj; - - event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP; - event.data.ptr = pFdObj; - if (epoll_ctl(pTcp->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { - tError("%s failed to add TCP FD for epoll, error:%s", pTcp->label, strerror(errno)); - tfree(pFdObj); - tclose(fd); - return NULL; - } - - // notify the data process, add into the FdObj list - pthread_mutex_lock(&(pTcp->mutex)); - - pFdObj->next = pTcp->pHead; - - if (pTcp->pHead) (pTcp->pHead)->prev = pFdObj; - - pTcp->pHead = pFdObj; - - pTcp->numOfFds++; - pthread_cond_signal(&pTcp->fdReady); - - pthread_mutex_unlock(&(pTcp->mutex)); - - tTrace("%s TCP connection to %s:%hu is created, FD:%p numOfFds:%d", pTcp->label, ip, port, pFdObj, pTcp->numOfFds); - - return pFdObj; -} - -int taosSendTcpClientData(uint32_t ip, uint16_t port, char *data, int len, void *chandle) { - STcpFd *pFdObj = (STcpFd *)chandle; - - if (chandle == NULL) return -1; - return (int)send(pFdObj->fd, data, (size_t)len, 0); -} diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 1b250a0f3f..a5e805791e 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -62,7 +62,8 @@ typedef struct { void *idPool; // handle to ID pool void *tmrCtrl; // handle to timer void *hash; // handle returned by hash utility - void *shandle; // returned handle from lower layer during initialization + void *tcphandle;// returned handle from TCP initialization + void *udphandle;// returned handle from UDP initialization void *pCache; // connection cache pthread_mutex_t mutex; struct _RpcConn *connList; // connection list @@ -79,6 +80,7 @@ typedef struct { int16_t numOfTry; // number of try for different servers int8_t oldIndex; // server IP index passed by app int8_t redirect; // flag to indicate redirect + int8_t connType; // connection type char msg[0]; // RpcHead starts from here } SRpcReqContext; @@ -113,6 +115,7 @@ typedef struct _RpcConn { char *pReqMsg; // request message including header int reqMsgLen; // request message length SRpcInfo *pRpc; // the associated SRpcInfo + int connType; // connection type SRpcReqContext *pContext; // request context } SRpcConn; @@ -122,9 +125,16 @@ int tsRpcProgressTime = 10; // milliseocnds int tsRpcMaxRetry; int tsRpcHeadSize; +// server:0 client:1 tcp:2 udp:0 +#define RPC_CONN_UDPS 0 +#define RPC_CONN_UDPC 1 +#define RPC_CONN_TCPS 2 +#define RPC_CONN_TCPC 3 +#define RPC_CONN_TCP 2 + void *(*taosInitConn[])(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = { - taosInitUdpServer, - taosInitUdpClient, + taosInitUdpConnection, + taosInitUdpConnection, taosInitTcpServer, taosInitTcpClient }; @@ -136,7 +146,7 @@ void (*taosCleanUpConn[])(void *thandle) = { taosCleanUpTcpClient }; -int (*taosSendData[])(uint32_t ip, uint16_t port, char *data, int len, void *chandle) = { +int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = { taosSendUdpData, taosSendUdpData, taosSendTcpServerData, @@ -157,19 +167,19 @@ void (*taosCloseConn[])(void *chandle) = { taosCloseTcpClientConnection }; -static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort); +static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort, int8_t connType); static void rpcCloseConn(void *thandle); -static SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet); +static SRpcConn *rpcSetConnToServer(SRpcReqContext *pContext); static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr); static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *user, char *hashstr); static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); -static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle); +static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code); static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); -static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, void *chandle); +static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv); static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead); static void rpcProcessConnError(void *param, void *id); static void rpcProcessRetryTimer(void *, void *); @@ -194,7 +204,8 @@ void *rpcOpen(SRpcInit *pInit) { if(pInit->label) strcpy(pRpc->label, pInit->label); pRpc->connType = pInit->connType; pRpc->idleTime = pInit->idleTime; - pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads; + // pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads; + pRpc->numOfThreads = 1; if (pInit->localIp) strcpy(pRpc->localIp, pInit->localIp); pRpc->localPort = pInit->localPort; pRpc->afp = pInit->afp; @@ -207,9 +218,12 @@ void *rpcOpen(SRpcInit *pInit) { pRpc->cfp = pInit->cfp; pRpc->afp = pInit->afp; - pRpc->shandle = (*taosInitConn[pRpc->connType])(pRpc->localIp, pRpc->localPort, pRpc->label, + pRpc->tcphandle = (*taosInitConn[pRpc->connType|RPC_CONN_TCP])(pRpc->localIp, pRpc->localPort, pRpc->label, + pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); + pRpc->udphandle = (*taosInitConn[pRpc->connType])(pRpc->localIp, pRpc->localPort, pRpc->label, pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); - if (pRpc->shandle == NULL) { + + if (pRpc->tcphandle == NULL || pRpc->udphandle == NULL) { tError("%s failed to init network, %s:%d", pRpc->label, pRpc->localIp, pRpc->localPort); rpcClose(pRpc); return NULL; @@ -237,18 +251,20 @@ void *rpcOpen(SRpcInit *pInit) { return NULL; } - pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString); - if (pRpc->hash == NULL) { - tError("%s failed to init string hash", pRpc->label); - rpcClose(pRpc); - return NULL; - } - - pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, tsShellActivityTimer*1000); - if ( pRpc->pCache == NULL ) { - tError("%s failed to init connection cache", pRpc->label); - rpcClose(pRpc); - return NULL; + if (pRpc->connType == TAOS_CONN_SERVER) { + pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString); + if (pRpc->hash == NULL) { + tError("%s failed to init string hash", pRpc->label); + rpcClose(pRpc); + return NULL; + } + } else { + pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, tsShellActivityTimer*1000); + if ( pRpc->pCache == NULL ) { + tError("%s failed to init connection cache", pRpc->label); + rpcClose(pRpc); + return NULL; + } } pthread_mutex_init(&pRpc->mutex, NULL); @@ -261,7 +277,8 @@ void *rpcOpen(SRpcInit *pInit) { void rpcClose(void *param) { SRpcInfo *pRpc = (SRpcInfo *)param; - (*taosCleanUpConn[pRpc->connType])(pRpc->shandle); + (*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle); + (*taosCleanUpConn[pRpc->connType])(pRpc->udphandle); for (int i = 0; i < pRpc->sessions; ++i) { if (pRpc->connList[i].user[0]) { @@ -313,6 +330,16 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, in pContext->msgType = type; pContext->oldIndex = pIpSet->index; + pContext->connType = RPC_CONN_UDPC; + if (contLen > 16000) pContext->connType = RPC_CONN_TCPC; + + // connection type is application specific. + // for TDengine, all the query, show commands shall have TCP connection + if (type == TSDB_MSG_TYPE_DNODE_QUERY || type == TSDB_MSG_TYPE_DNODE_RETRIEVE || + type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_MULTI_TABLE_META || + type == TSDB_MSG_TYPE_SHOW ) + pContext->connType = RPC_CONN_TCPC; + rpcSendReqToServer(pRpc, pContext); return; @@ -346,7 +373,6 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { pHead->version = 1; pHead->msgType = pConn->inType+1; pHead->spi = 0; - pHead->tcp = 0; pHead->encrypt = 0; pHead->tranId = pConn->inTranId; pHead->sourceId = pConn->ownId; @@ -388,7 +414,6 @@ void rpcSendRedirectRsp(void *thandle, SRpcIpSet *pIpSet) { void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { SRpcConn *pConn = (SRpcConn *)thandle; - SRpcInfo *pRpc = pConn->pRpc; pInfo->clientIp = pConn->peerIp; pInfo->clientPort = pConn->peerPort; @@ -396,7 +421,7 @@ void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { strcpy(pInfo->user, pConn->user); } -static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort) { +static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort, int8_t connType) { SRpcConn *pConn; pConn = rpcAllocateClientConn(pRpc); @@ -406,12 +431,14 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort) pConn->peerIp = inet_addr(peerIpStr); pConn->peerPort = peerPort; strcpy(pConn->user, pRpc->user); + pConn->connType = connType; - if (taosOpenConn[pRpc->connType]) { - pConn->chandle = (*taosOpenConn[pRpc->connType])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort); + if (taosOpenConn[connType]) { + void *shandle = (connType & RPC_CONN_TCP)? pRpc->tcphandle:pRpc->udphandle; + pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIpstr, pConn->peerPort); if (pConn->chandle) { - tTrace("%s %p, rpc connection is set up, sid:%d id:%s ip:%s:%hu localPort:%d", pRpc->label, - pConn, pConn->sid, pRpc->user, pConn->peerIpstr, pConn->peerPort, pConn->localPort); + tTrace("%s %p, rpc connection is set up, sid:%d id:%s ip:%s:%hu connType:%d", pRpc->label, + pConn, pConn->sid, pRpc->user, pConn->peerIpstr, pConn->peerPort, pConn->connType); } else { tError("%s %p, failed to set up connection to ip:%s:%hu", pRpc->label, pConn, pConn->peerIpstr, pConn->peerPort); @@ -433,14 +460,14 @@ static void rpcCloseConn(void *thandle) { if (pConn->user[0]) { pConn->user[0] = 0; - if (taosCloseConn[pRpc->connType]) (*taosCloseConn[pRpc->connType])(pConn->chandle); + if (taosCloseConn[pConn->connType]) (*taosCloseConn[pConn->connType])(pConn->chandle); taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pIdleTimer); - if ( pRpc->connType == TAOS_CONN_UDPS || pRpc->connType == TAOS_CONN_TCPS) { + if ( pRpc->connType == TAOS_CONN_SERVER) { char hashstr[40] = {0}; - sprintf(hashstr, "%x:%x:%x", pConn->peerIp, pConn->peerUid, pConn->peerId); + sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->peerUid, pConn->peerId, pConn->connType); taosDeleteStrHash(pRpc->hash, hashstr); rpcFreeOutMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg pConn->pRspMsg = NULL; @@ -540,15 +567,17 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *user, char *hashst return pConn; } -SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet) { - SRpcConn *pConn; +SRpcConn *rpcSetConnToServer(SRpcReqContext *pContext) { + SRpcConn *pConn; + SRpcInfo *pRpc = pContext->pRpc; + SRpcIpSet *pIpSet = &pContext->ipSet; - pConn = rpcGetConnFromCache(pRpc->pCache, ipSet.ip[ipSet.index], ipSet.port, pRpc->user); + pConn = rpcGetConnFromCache(pRpc->pCache, pIpSet->ip[pIpSet->index], pIpSet->port, pRpc->user); if ( pConn == NULL ) { char ipstr[20] = {0}; - tinet_ntoa(ipstr, ipSet.ip[ipSet.index]); - pConn = rpcOpenConn(pRpc, ipstr, ipSet.port); - pConn->destIp = ipSet.ip[ipSet.index]; + tinet_ntoa(ipstr, pIpSet->ip[pIpSet->index]); + pConn = rpcOpenConn(pRpc, ipstr, pIpSet->port, pContext->connType); + if (pConn) pConn->destIp = pIpSet->ip[pIpSet->index]; } return pConn; @@ -618,7 +647,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { taosTmrStopA(&pConn->pTimer); pConn->retry = 0; - if (*pHead->content == TSDB_CODE_ACTION_IN_PROGRESS || pHead->tcp) { + if (*pHead->content == TSDB_CODE_ACTION_IN_PROGRESS) { if (pConn->tretry <= tsRpcMaxRetry) { pConn->tretry++; tTrace("%s %p, peer is still processing the transaction", pRpc->label, pConn); @@ -638,13 +667,12 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { return TSDB_CODE_SUCCESS; } -static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int dataLen, uint32_t ip) { - int32_t sid, code = 0; - SRpcConn * pConn = NULL; +static SRpcConn *rpcProcessHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { + int32_t sid; + SRpcConn *pConn = NULL; char hashstr[40] = {0}; - *ppConn = NULL; - SRpcHead *pHead = (SRpcHead *)data; + SRpcHead *pHead = (SRpcHead *)pRecv->msg; sid = htonl(pHead->destId); pHead->code = htonl(pHead->code); @@ -652,50 +680,54 @@ static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) { tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType); - return TSDB_CODE_INVALID_MSG_TYPE; + terrno = TSDB_CODE_INVALID_MSG_TYPE; return NULL; } - if (dataLen != pHead->msgLen) { + if (pRecv->msgLen != pHead->msgLen) { tTrace("%s sid:%d, %s has invalid length, dataLen:%d, msgLen:%d", pRpc->label, sid, - taosMsg[pHead->msgType], dataLen, pHead->msgLen); - return TSDB_CODE_INVALID_MSG_LEN; + taosMsg[pHead->msgType], pRecv->msgLen, pHead->msgLen); + terrno = TSDB_CODE_INVALID_MSG_LEN; return NULL; } if (sid < 0 || sid >= pRpc->sessions) { tTrace("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid, pRpc->sessions, taosMsg[pHead->msgType]); - return TSDB_CODE_INVALID_SESSION_ID; + terrno = TSDB_CODE_INVALID_SESSION_ID; return NULL; } - if (sid == 0) sprintf(hashstr, "%x:%x:%x", ip, pHead->uid, pHead->sourceId); + if (sid == 0) sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->uid, pHead->sourceId, pRecv->connType); pConn = rpcGetConnObj(pRpc, sid, pHead->user, hashstr); - if (pConn == NULL ) return terrno; + if (pConn == NULL) return NULL; - *ppConn = pConn; sid = pConn->sid; - if (pHead->uid) pConn->peerUid = pHead->uid; - - if (pHead->tcp) { - tTrace("%s %p, content will be transfered via TCP", pRpc->label, pConn); - if (pConn->outType) taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); - return TSDB_CODE_ALREADY_PROCESSED; + pConn->chandle = pRecv->chandle; + if (pConn->peerIp != pRecv->ip) { + pConn->peerIp = pRecv->ip; + char ipstr[20] = {0}; + tinet_ntoa(ipstr, pRecv->ip); + strcpy(pConn->peerIpstr, ipstr); } + + if (pRecv->port) pConn->peerPort = pRecv->port; + if (pHead->port) pConn->peerPort = pHead->port; + if (pHead->uid) pConn->peerUid = pHead->uid; - code = rpcCheckAuthentication(pConn, (char *)pHead, dataLen); - if ( code != 0 ) return code; + terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen); + if (terrno != 0) return pConn; if (pHead->msgType != TSDB_MSG_TYPE_REG && pHead->encrypt) { // decrypt here } if ( rpcIsReq(pHead->msgType) ) { - code = rpcProcessReqHead(pConn, pHead); + terrno = rpcProcessReqHead(pConn, pHead); + pConn->connType = pRecv->connType; } else { - code = rpcProcessRspHead(pConn, pHead); + terrno = rpcProcessRspHead(pConn, pHead); } - return code; + return pConn; } static void rpcProcessBrokenLink(SRpcConn *pConn) { @@ -713,45 +745,31 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { rpcCloseConn(pConn); } -static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, void *chandle) { - SRpcHead *pHead = (SRpcHead *)msg; - SRpcInfo *pRpc = (SRpcInfo *)shandle; - SRpcConn *pConn = (SRpcConn *)thandle; +static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { + SRpcHead *pHead = (SRpcHead *)pRecv->msg; + SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle; + SRpcConn *pConn = (SRpcConn *)pRecv->thandle; int32_t code = 0; - tDump(msg, msgLen); + tDump(pRecv->msg, pRecv->msgLen); + + // underlying UDP layer does not know it is server or client + pRecv->connType = pRecv->connType | pRpc->connType; - if (ip==0 && pConn) { + if (pRecv->ip==0 && pConn) { rpcProcessBrokenLink(pConn); - tfree(msg); + tfree(pRecv->msg); return NULL; } pthread_mutex_lock(&pRpc->mutex); - - code = rpcProcessHead(pRpc, &pConn, msg, msgLen, ip); - - if (pConn) { - // update connection info - pConn->chandle = chandle; - if (pConn->peerIp != ip) { - pConn->peerIp = ip; - char ipstr[20] = {0}; - tinet_ntoa(ipstr, ip); - strcpy(pConn->peerIpstr, ipstr); - } - - if (port) pConn->peerPort = port; - if (pHead->port) // port maybe changed by the peer - pConn->peerPort = pHead->port; - } - + pConn = rpcProcessHead(pRpc, pRecv); pthread_mutex_unlock(&pRpc->mutex); if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d", - pRpc->label, pConn, taosMsg[pHead->msgType], ip, port, code, - msgLen, pHead->sourceId, pHead->destId, pHead->tranId); + pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, code, + pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } if (pConn && pRpc->idleTime) { @@ -761,7 +779,7 @@ static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t if (code != TSDB_CODE_ALREADY_PROCESSED) { if (code != 0) { // parsing error if ( rpcIsReq(pHead->msgType) ) { - rpcSendErrorMsgToPeer(pRpc, msg, code, ip, port, chandle); + rpcSendErrorMsgToPeer(pRecv, code); tTrace("%s %p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], code); } } else { // parsing OK @@ -769,7 +787,7 @@ static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t } } - if ( code != 0 ) free (msg); + if ( code != 0 ) free (pRecv->msg); return pConn; } @@ -816,7 +834,6 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { pHead->version = 1; pHead->msgType = pConn->inType+1; pHead->spi = 0; - pHead->tcp = 0; pHead->encrypt = 0; pHead->tranId = pConn->inTranId; pHead->sourceId = pConn->ownId; @@ -828,19 +845,18 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { rpcSendMsgToPeer(pConn, msg, 0); } -static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle) { +static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { SRpcHead *pRecvHead, *pReplyHead; char msg[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t) ]; - uint32_t timeStamp; - int msgLen; + uint32_t timeStamp; + int msgLen; - pRecvHead = (SRpcHead *)pMsg; + pRecvHead = (SRpcHead *)pRecv->msg; pReplyHead = (SRpcHead *)msg; memset(msg, 0, sizeof(SRpcHead)); pReplyHead->version = pRecvHead->version; pReplyHead->msgType = (char)(pRecvHead->msgType + 1); - pReplyHead->tcp = 0; pReplyHead->spi = 0; pReplyHead->encrypt = 0; pReplyHead->tranId = pRecvHead->tranId; @@ -860,7 +876,7 @@ static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint } pReplyHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - (*taosSendData[pRpc->connType])(ip, port, msg, msgLen, chandle); + (*taosSendData[pRecv->connType])(pRecv->ip, pRecv->port, msg, msgLen, pRecv->chandle); return; } @@ -872,7 +888,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { char msgType = pContext->msgType; pContext->numOfTry++; - SRpcConn *pConn = rpcSetConnToServer(pRpc, pContext->ipSet); + SRpcConn *pConn = rpcSetConnToServer(pContext); if (pConn == NULL) { pContext->code = terrno; taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); @@ -884,7 +900,6 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { // set the message header pHead->version = 1; pHead->msgType = msgType; - pHead->tcp = 0; pHead->encrypt = 0; pConn->tranId++; if ( pConn->tranId == 0 ) pConn->tranId++; @@ -928,7 +943,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { (uint8_t)pHead->content[0], msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } - writtenLen = (*taosSendData[pRpc->connType])(pConn->peerIp, pConn->peerPort, (char *)pHead, msgLen, pConn->chandle); + writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle); if (writtenLen != msgLen) { tError("%s %p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn, diff --git a/src/rpc/src/rpcServer.c b/src/rpc/src/rpcServer.c index c11a803f1b..010a9ae69b 100644 --- a/src/rpc/src/rpcServer.c +++ b/src/rpc/src/rpcServer.c @@ -46,10 +46,8 @@ typedef struct _thread_obj { int numOfFds; int threadId; char label[12]; - // char buffer[128000]; // buffer to receive data - void *shandle; // handle passed by upper layer during server initialization - void *(*processData)(char *data, int dataLen, unsigned int ip, uint16_t port, void *shandle, void *thandle, - void *chandle); + void *shandle; // handle passed by upper layer during server initialization + void *(*processData)(SRecvInfo *pPacket); } SThreadObj; typedef struct { @@ -62,59 +60,81 @@ typedef struct { pthread_t thread; } SServerObj; -static void taosCleanUpFdObj(SFdObj *pFdObj) { - SThreadObj *pThreadObj; +static void taosCleanUpFdObj(SFdObj *pFdObj); +static void taosProcessTcpData(void *param); +static void taosAcceptTcpConnection(void *arg); - if (pFdObj == NULL) return; - if (pFdObj->signature != pFdObj) return; +void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) { + int i; + SServerObj *pServerObj; + pthread_attr_t thattr; + SThreadObj *pThreadObj; - pThreadObj = pFdObj->pThreadObj; - if (pThreadObj == NULL) { - tError("FdObj double clean up!!!"); - return; + pServerObj = (SServerObj *)malloc(sizeof(SServerObj)); + strcpy(pServerObj->ip, ip); + pServerObj->port = port; + strcpy(pServerObj->label, label); + pServerObj->numOfThreads = numOfThreads; + + pServerObj->pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj) * (size_t)numOfThreads); + if (pServerObj->pThreadObj == NULL) { + tError("TCP:%s no enough memory", label); + return NULL; } + memset(pServerObj->pThreadObj, 0, sizeof(SThreadObj) * (size_t)numOfThreads); - epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL); - close(pFdObj->fd); + pThreadObj = pServerObj->pThreadObj; + for (i = 0; i < numOfThreads; ++i) { + pThreadObj->processData = fp; + strcpy(pThreadObj->label, label); + pThreadObj->shandle = shandle; - pthread_mutex_lock(&pThreadObj->threadMutex); + if (pthread_mutex_init(&(pThreadObj->threadMutex), NULL) < 0) { + tError("%s failed to init TCP process data mutex, reason:%s", label, strerror(errno)); + return NULL; + } - pThreadObj->numOfFds--; + if (pthread_cond_init(&(pThreadObj->fdReady), NULL) != 0) { + tError("%s init TCP condition variable failed, reason:%s\n", label, strerror(errno)); + return NULL; + } - if (pThreadObj->numOfFds < 0) - tError("%s TCP thread:%d, number of FDs shall never be negative", pThreadObj->label, pThreadObj->threadId); + pThreadObj->pollFd = epoll_create(10); // size does not matter + if (pThreadObj->pollFd < 0) { + tError("%s failed to create TCP epoll", label); + return NULL; + } - // remove from the FdObject list + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + if (pthread_create(&(pThreadObj->thread), &thattr, (void *)taosProcessTcpData, (void *)(pThreadObj)) != 0) { + tError("%s failed to create TCP process data thread, reason:%s", label, strerror(errno)); + return NULL; + } - if (pFdObj->prev) { - (pFdObj->prev)->next = pFdObj->next; - } else { - pThreadObj->pHead = pFdObj->next; + pThreadObj->threadId = i; + pThreadObj++; } - if (pFdObj->next) { - (pFdObj->next)->prev = pFdObj->prev; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + if (pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)) != 0) { + tError("%s failed to create TCP accept thread, reason:%s", label, strerror(errno)); + return NULL; } - pthread_mutex_unlock(&pThreadObj->threadMutex); - - // notify the upper layer, so it will clean the associated context - if (pFdObj->thandle) (*(pThreadObj->processData))(NULL, 0, 0, 0, pThreadObj->shandle, pFdObj->thandle, NULL); - - tTrace("%s TCP thread:%d, FD:%p is cleaned up, numOfFds:%d", pThreadObj->label, pThreadObj->threadId, - pFdObj, pThreadObj->numOfFds); - - memset(pFdObj, 0, sizeof(SFdObj)); - - tfree(pFdObj); -} - -void taosCloseTcpServerConnection(void *chandle) { - SFdObj *pFdObj = (SFdObj *)chandle; - - if (pFdObj == NULL) return; + /* + if ( pthread_create(&(pServerObj->thread), &thattr, + (void*)taosAcceptUDConnection, (void *)(pServerObj)) != 0 ) { + tError("%s failed to create UD accept thread, reason:%s", label, + strerror(errno)); + return NULL; + } + */ + pthread_attr_destroy(&thattr); + tTrace("%s TCP server is initialized, ip:%s port:%hu numOfThreads:%d", label, ip, port, numOfThreads); - taosCleanUpFdObj(pFdObj); + return (void *)pServerObj; } void taosCleanUpTcpServer(void *handle) { @@ -148,6 +168,22 @@ void taosCleanUpTcpServer(void *handle) { tfree(pServerObj); } +void taosCloseTcpServerConnection(void *chandle) { + SFdObj *pFdObj = (SFdObj *)chandle; + + if (pFdObj == NULL) return; + + taosCleanUpFdObj(pFdObj); +} + +int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) { + SFdObj *pFdObj = (SFdObj *)chandle; + + if (chandle == NULL) return -1; + + return (int)send(pFdObj->fd, data, (size_t)len, 0); +} + #define maxEvents 10 static void taosProcessTcpData(void *param) { @@ -155,7 +191,7 @@ static void taosProcessTcpData(void *param) { int i, fdNum; SFdObj * pFdObj; struct epoll_event events[maxEvents]; - + SRecvInfo recvInfo; pThreadObj = (SThreadObj *)param; while (1) { @@ -209,15 +245,22 @@ static void taosProcessTcpData(void *param) { continue; } - pFdObj->thandle = (*(pThreadObj->processData))(buffer, dataLen, pFdObj->ip, pFdObj->port, - pThreadObj->shandle, pFdObj->thandle, pFdObj); + recvInfo.msg = buffer; + recvInfo.msgLen = dataLen; + recvInfo.ip = pFdObj->ip; + recvInfo.port = pFdObj->port; + recvInfo.shandle = pThreadObj->shandle; + recvInfo.thandle = pFdObj->thandle;; + recvInfo.chandle = pFdObj; + recvInfo.connType = RPC_CONN_TCP; + pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo); if (pFdObj->thandle == NULL) taosCleanUpFdObj(pFdObj); } } } -void taosAcceptTcpConnection(void *arg) { +static void taosAcceptTcpConnection(void *arg) { int connFd = -1; struct sockaddr_in clientAddr; int sockFd; @@ -280,16 +323,11 @@ void taosAcceptTcpConnection(void *arg) { // notify the data process, add into the FdObj list pthread_mutex_lock(&(pThreadObj->threadMutex)); - pFdObj->next = pThreadObj->pHead; - if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj; - pThreadObj->pHead = pFdObj; - pThreadObj->numOfFds++; pthread_cond_signal(&pThreadObj->fdReady); - pthread_mutex_unlock(&(pThreadObj->threadMutex)); tTrace("%s TCP thread:%d, a new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label, @@ -301,7 +339,65 @@ void taosAcceptTcpConnection(void *arg) { } } -void taosAcceptUDConnection(void *arg) { +static void taosCleanUpFdObj(SFdObj *pFdObj) { + SThreadObj *pThreadObj; + + if (pFdObj == NULL) return; + if (pFdObj->signature != pFdObj) return; + + pThreadObj = pFdObj->pThreadObj; + if (pThreadObj == NULL) { + tError("FdObj double clean up!!!"); + return; + } + + epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL); + close(pFdObj->fd); + + pthread_mutex_lock(&pThreadObj->threadMutex); + + pThreadObj->numOfFds--; + + if (pThreadObj->numOfFds < 0) + tError("%s TCP thread:%d, number of FDs shall never be negative", pThreadObj->label, pThreadObj->threadId); + + // remove from the FdObject list + + if (pFdObj->prev) { + (pFdObj->prev)->next = pFdObj->next; + } else { + pThreadObj->pHead = pFdObj->next; + } + + if (pFdObj->next) { + (pFdObj->next)->prev = pFdObj->prev; + } + + pthread_mutex_unlock(&pThreadObj->threadMutex); + + // notify the upper layer, so it will clean the associated context + SRecvInfo recvInfo; + recvInfo.msg = NULL; + recvInfo.msgLen = 0; + recvInfo.ip = 0; + recvInfo.port = 0; + recvInfo.shandle = pThreadObj->shandle; + recvInfo.thandle = pFdObj->thandle;; + recvInfo.chandle = NULL; + recvInfo.connType = RPC_CONN_TCP; + + if (pFdObj->thandle) (*(pThreadObj->processData))(&recvInfo); + + tTrace("%s TCP thread:%d, FD:%p is cleaned up, numOfFds:%d", pThreadObj->label, pThreadObj->threadId, + pFdObj, pThreadObj->numOfFds); + + memset(pFdObj, 0, sizeof(SFdObj)); + + tfree(pFdObj); +} + +#if 0 +static void taosAcceptUDConnection(void *arg) { int connFd = -1; int sockFd; int threadId = 0; @@ -353,16 +449,11 @@ void taosAcceptUDConnection(void *arg) { // notify the data process, add into the FdObj list pthread_mutex_lock(&(pThreadObj->threadMutex)); - pFdObj->next = pThreadObj->pHead; - if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj; - pThreadObj->pHead = pFdObj; - pThreadObj->numOfFds++; pthread_cond_signal(&pThreadObj->fdReady); - pthread_mutex_unlock(&(pThreadObj->threadMutex)); tTrace("%s UD thread:%d, a new connection, numOfFds:%d", pServerObj->label, pThreadObj->threadId, @@ -373,79 +464,7 @@ void taosAcceptUDConnection(void *arg) { threadId = threadId % pServerObj->numOfThreads; } } - -void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) { - int i; - SServerObj * pServerObj; - pthread_attr_t thattr; - SThreadObj * pThreadObj; - - pServerObj = (SServerObj *)malloc(sizeof(SServerObj)); - strcpy(pServerObj->ip, ip); - pServerObj->port = port; - strcpy(pServerObj->label, label); - pServerObj->numOfThreads = numOfThreads; - - pServerObj->pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj) * (size_t)numOfThreads); - if (pServerObj->pThreadObj == NULL) { - tError("TCP:%s no enough memory", label); - return NULL; - } - memset(pServerObj->pThreadObj, 0, sizeof(SThreadObj) * (size_t)numOfThreads); - - pThreadObj = pServerObj->pThreadObj; - for (i = 0; i < numOfThreads; ++i) { - pThreadObj->processData = fp; - strcpy(pThreadObj->label, label); - pThreadObj->shandle = shandle; - - if (pthread_mutex_init(&(pThreadObj->threadMutex), NULL) < 0) { - tError("%s failed to init TCP process data mutex, reason:%s", label, strerror(errno)); - return NULL; - } - - if (pthread_cond_init(&(pThreadObj->fdReady), NULL) != 0) { - tError("%s init TCP condition variable failed, reason:%s\n", label, strerror(errno)); - return NULL; - } - - pThreadObj->pollFd = epoll_create(10); // size does not matter - if (pThreadObj->pollFd < 0) { - tError("%s failed to create TCP epoll", label); - return NULL; - } - - pthread_attr_init(&thattr); - pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); - if (pthread_create(&(pThreadObj->thread), &thattr, (void *)taosProcessTcpData, (void *)(pThreadObj)) != 0) { - tError("%s failed to create TCP process data thread, reason:%s", label, strerror(errno)); - return NULL; - } - - pThreadObj->threadId = i; - pThreadObj++; - } - - pthread_attr_init(&thattr); - pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); - if (pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)) != 0) { - tError("%s failed to create TCP accept thread, reason:%s", label, strerror(errno)); - return NULL; - } - - /* - if ( pthread_create(&(pServerObj->thread), &thattr, - (void*)taosAcceptUDConnection, (void *)(pServerObj)) != 0 ) { - tError("%s failed to create UD accept thread, reason:%s", label, - strerror(errno)); - return NULL; - } - */ - pthread_attr_destroy(&thattr); - tTrace("%s TCP server is initialized, ip:%s port:%hu numOfThreads:%d", label, ip, port, numOfThreads); - - return (void *)pServerObj; -} +#endif #if 0 void taosListTcpConnection(void *handle, char *buffer) { @@ -489,10 +508,4 @@ void taosListTcpConnection(void *handle, char *buffer) { } #endif -int taosSendTcpServerData(uint32_t ip, uint16_t port, char *data, int len, void *chandle) { - SFdObj *pFdObj = (SFdObj *)chandle; - if (chandle == NULL) return -1; - - return (int)send(pFdObj->fd, data, (size_t)len, 0); -} diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 46cb768995..245a98b2c7 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -44,9 +44,8 @@ typedef struct { void * hash; void * shandle; // handle passed by upper layer during server initialization void * pSet; - void *(*processData)(char *data, int dataLen, unsigned int ip, uint16_t port, void *shandle, void *thandle, - void *chandle); - char buffer[RPC_MAX_UDP_SIZE]; // buffer to receive data + void *(*processData)(SRecvInfo *pRecv); + char buffer[RPC_MAX_UDP_SIZE]; // buffer to receive data } SUdpConn; typedef struct { @@ -58,10 +57,8 @@ typedef struct { int threads; char label[12]; void * tmrCtrl; - pthread_t tcpThread; - int tcpFd; - void *(*fp)(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, void *chandle); - SUdpConn udpConn[]; + void *(*fp)(SRecvInfo *pPacket); + SUdpConn udpConn[]; } SUdpConnSet; typedef struct { @@ -76,420 +73,9 @@ typedef struct { int emptyNum; } SUdpBuf; -typedef struct { - uint64_t handle; - uint16_t port; - int32_t msgLen; -} SPacketInfo; - -typedef struct { - int fd; - uint32_t ip; - uint16_t port; - SUdpConnSet *pSet; -} STransfer; - -typedef struct { - void * pTimer; - SUdpConnSet *pSet; - SUdpConn * pConn; - int dataLen; - uint32_t ip; - uint16_t port; - char data[96]; -} SMonitor; - -typedef struct { - uint64_t handle; - uint64_t hash; -} SHandleViaTcp; - -void taosFreeMsgHdr(void *hdr) { - struct msghdr *msgHdr = (struct msghdr *)hdr; - free(msgHdr->msg_iov); -} - -int taosMsgHdrSize(void *hdr) { - struct msghdr *msgHdr = (struct msghdr *)hdr; - return (int)msgHdr->msg_iovlen; -} - -void taosSendMsgHdr(void *hdr, int fd) { - struct msghdr *msgHdr = (struct msghdr *)hdr; - sendmsg(fd, msgHdr, 0); - msgHdr->msg_iovlen = 0; -} - -void taosInitMsgHdr(void **hdr, void *dest, int maxPkts) { - struct msghdr *msgHdr = (struct msghdr *)malloc(sizeof(struct msghdr)); - memset(msgHdr, 0, sizeof(struct msghdr)); - *hdr = msgHdr; - struct sockaddr_in *destAdd = (struct sockaddr_in *)dest; - - msgHdr->msg_name = destAdd; - msgHdr->msg_namelen = sizeof(struct sockaddr_in); - int size = (int)sizeof(struct iovec) * maxPkts; - msgHdr->msg_iov = (struct iovec *)malloc((size_t)size); - memset(msgHdr->msg_iov, 0, (size_t)size); -} - -void taosSetMsgHdrData(void *hdr, char *data, int dataLen) { - struct msghdr *msgHdr = (struct msghdr *)hdr; - msgHdr->msg_iov[msgHdr->msg_iovlen].iov_base = data; - msgHdr->msg_iov[msgHdr->msg_iovlen].iov_len = (size_t)dataLen; - msgHdr->msg_iovlen++; -} -bool taosCheckHandleViaTcpValid(SHandleViaTcp *handleViaTcp) { - return handleViaTcp->hash == taosHashUInt64(handleViaTcp->handle); -} - -void taosInitHandleViaTcp(SHandleViaTcp *handleViaTcp, uint64_t handle) { - handleViaTcp->handle = handle; - handleViaTcp->hash = taosHashUInt64(handleViaTcp->handle); -} - -void taosProcessMonitorTimer(void *param, void *tmrId) { - SMonitor *pMonitor = (SMonitor *)param; - if (pMonitor->pTimer != tmrId) return; - - SUdpConnSet *pSet = pMonitor->pSet; - pMonitor->pTimer = NULL; - - if (pSet) { - char *data = malloc((size_t)pMonitor->dataLen); - memcpy(data, pMonitor->data, (size_t)pMonitor->dataLen); - - tTrace("%s monitor timer is expired, update the link status", pSet->label); - (*pSet->fp)(data, pMonitor->dataLen, pMonitor->ip, 0, pSet->shandle, NULL, NULL); - taosTmrReset(taosProcessMonitorTimer, 200, pMonitor, pSet->tmrCtrl, &pMonitor->pTimer); - } else { - taosTmrStopA(&pMonitor->pTimer); - free(pMonitor); - } -} - -void *taosReadTcpData(void *argv) { - SMonitor *pMonitor = (SMonitor *)argv; - SRpcHead *pHead = (SRpcHead *)pMonitor->data; - SPacketInfo *pInfo = (SPacketInfo *)pHead->content; - SUdpConnSet *pSet = pMonitor->pSet; - int retLen, fd; - char ipstr[64]; - - pInfo->msgLen = (int32_t)htonl((uint32_t)pInfo->msgLen); - - tinet_ntoa(ipstr, pMonitor->ip); - tTrace("%s receive packet via TCP:%s:%hu, msgLen:%d, handle:0x%x, source:0x%08x dest:0x%08x tranId:%d", pSet->label, - ipstr, pInfo->port, pInfo->msgLen, pInfo->handle, pHead->sourceId, pHead->destId, pHead->tranId); - - fd = taosOpenTcpClientSocket(ipstr, (int16_t)pInfo->port, tsLocalIp); - if (fd < 0) { - tError("%s failed to open TCP client socket ip:%s:%hu", pSet->label, ipstr, pInfo->port); - pMonitor->pSet = NULL; - return NULL; - } - - SHandleViaTcp handleViaTcp; - taosInitHandleViaTcp(&handleViaTcp, pInfo->handle); - retLen = (int)taosWriteSocket(fd, (char *)&handleViaTcp, sizeof(SHandleViaTcp)); - - if (retLen != (int)sizeof(SHandleViaTcp)) { - tError("%s failed to send handle:0x%x to server, retLen:%d", pSet->label, pInfo->handle, retLen); - pMonitor->pSet = NULL; - } else { - tTrace("%s handle:0x%x is sent to server", pSet->label, pInfo->handle); - char *buffer = malloc((size_t)pInfo->msgLen); - if (NULL == buffer) { - tError("%s failed to malloc(size:%d) for recv server data", pSet->label, pInfo->msgLen); - retLen = 0; - //taosCloseTcpSocket(fd); - //pMonitor->pSet = NULL; - //return NULL; - } else { - retLen = taosReadMsg(fd, buffer, pInfo->msgLen); - } - - pMonitor->pSet = NULL; - - if (retLen != pInfo->msgLen) { - tError("%s failed to read data from server, msgLen:%d retLen:%d", pSet->label, pInfo->msgLen, retLen); - tfree(buffer); - } else { - (*pSet->fp)(buffer, pInfo->msgLen, pMonitor->ip, pInfo->port, pSet->shandle, NULL, pMonitor->pConn); - } - } - - taosCloseTcpSocket(fd); - - return NULL; -} - -int taosReceivePacketViaTcp(uint32_t ip, SRpcHead *pHead, SUdpConn *pConn) { - SUdpConnSet * pSet = pConn->pSet; - SPacketInfo * pInfo = (SPacketInfo *)pHead->content; - int code = 0; - pthread_attr_t thattr; - pthread_t thread; - - tTrace("%s receive packet via TCP, handle:0x%x, source:0x%08x dest:0x%08x tranId:%d", pSet->label, pInfo->handle, - pHead->sourceId, pHead->destId, pHead->tranId); - - SMonitor *pMonitor = (SMonitor *)calloc(1, sizeof(SMonitor)); - pMonitor->dataLen = sizeof(SRpcHead) + sizeof(SPacketInfo); - memcpy(pMonitor->data, pHead, (size_t)pMonitor->dataLen); - pMonitor->pSet = pSet; - pMonitor->ip = ip; - pMonitor->port = pInfo->port; - pMonitor->pConn = pConn; - taosTmrReset(taosProcessMonitorTimer, 0, pMonitor, pSet->tmrCtrl, &pMonitor->pTimer); - - pthread_attr_init(&thattr); - pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); - code = pthread_create(&(thread), &thattr, taosReadTcpData, (void *)pMonitor); - if (code < 0) { - tTrace("%s failed to create thread to read tcp data, reason:%s", pSet->label, strerror(errno)); - pMonitor->pSet = NULL; - } - - pthread_attr_destroy(&thattr); - return code; -} - -void *taosRecvUdpData(void *param) { - struct sockaddr_in sourceAdd; - unsigned int addLen, dataLen; - SUdpConn * pConn = (SUdpConn *)param; - uint16_t port; - int minSize = sizeof(SRpcHead); - - memset(&sourceAdd, 0, sizeof(sourceAdd)); - addLen = sizeof(sourceAdd); - tTrace("%s UDP thread is created, index:%d", pConn->label, pConn->index); - - while (1) { - dataLen = - (uint32_t)recvfrom(pConn->fd, pConn->buffer, sizeof(pConn->buffer), 0, (struct sockaddr *)&sourceAdd, &addLen); - tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, ntohs(sourceAdd.sin_port), - dataLen); - - if (dataLen < sizeof(SRpcHead)) { - tError("%s recvfrom failed, reason:%s\n", pConn->label, strerror(errno)); - continue; - } - - port = ntohs(sourceAdd.sin_port); - - int processedLen = 0, leftLen = 0; - int msgLen = 0; - int count = 0; - char *msg = pConn->buffer; - while (processedLen < (int)dataLen) { - leftLen = dataLen - processedLen; - SRpcHead *pHead = (SRpcHead *)msg; - msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); - if (leftLen < minSize || msgLen > leftLen || msgLen < minSize) { - tError("%s msg is messed up, dataLen:%d processedLen:%d count:%d msgLen:%d", pConn->label, dataLen, - processedLen, count, msgLen); - break; - } - - if (pHead->tcp == 1) { - taosReceivePacketViaTcp(sourceAdd.sin_addr.s_addr, (SRpcHead *)msg, pConn); - } else { - char *data = malloc((size_t)msgLen); - memcpy(data, msg, (size_t)msgLen); - (*(pConn->processData))(data, msgLen, sourceAdd.sin_addr.s_addr, port, pConn->shandle, NULL, pConn); - } - - processedLen += msgLen; - msg += msgLen; - count++; - } - - // tTrace("%s %d UDP packets are received together", pConn->label, count); - } - - return NULL; -} - -void *taosTransferDataViaTcp(void *argv) { - STransfer * pTransfer = (STransfer *)argv; - int connFd = pTransfer->fd; - int msgLen, retLen, leftLen; - uint64_t handle; - SRpcHead *pHead = NULL, head; - SUdpConnSet *pSet = pTransfer->pSet; - - SHandleViaTcp handleViaTcp; - retLen = taosReadMsg(connFd, &handleViaTcp, sizeof(SHandleViaTcp)); - - if (retLen != sizeof(SHandleViaTcp)) { - tError("%s UDP server failed to read handle, retLen:%d", pSet->label, retLen); - taosCloseSocket(connFd); - free(pTransfer); - return NULL; - } - - if (!taosCheckHandleViaTcpValid(&handleViaTcp)) { - tError("%s UDP server read handle via tcp invalid, handle:%" PRIu64 ", hash:%" PRIu64, pSet->label, handleViaTcp.handle, - handleViaTcp.hash); - taosCloseSocket(connFd); - free(pTransfer); - return NULL; - } - - handle = handleViaTcp.handle; - - if (handle == 0) { - // receive a packet from client - tTrace("%s data will be received via TCP from 0x%x:%hu", pSet->label, pTransfer->ip, pTransfer->port); - retLen = taosReadMsg(connFd, &head, sizeof(SRpcHead)); - if (retLen != (int)sizeof(SRpcHead)) { - tError("%s failed to read msg header, retLen:%d", pSet->label, retLen); - } else { - SMonitor *pMonitor = (SMonitor *)calloc(1, sizeof(SMonitor)); - if (NULL == pMonitor) { - tError("%s malloc failed by TransferViaTcp from client", pSet->label); - taosCloseSocket(connFd); - free(pTransfer); - return NULL; - } - pMonitor->dataLen = sizeof(SRpcHead); - memcpy(pMonitor->data, &head, (size_t)pMonitor->dataLen); - ((SRpcHead *)pMonitor->data)->msgLen = (int32_t)htonl(sizeof(SRpcHead)); - ((SRpcHead *)pMonitor->data)->tcp = 1; - pMonitor->ip = pTransfer->ip; - pMonitor->port = head.port; - pMonitor->pSet = pSet; - taosTmrReset(taosProcessMonitorTimer, 0, pMonitor, pSet->tmrCtrl, &pMonitor->pTimer); - - msgLen = (int32_t)htonl((uint32_t)head.msgLen); - char *buffer = malloc((size_t)msgLen); - if (NULL == buffer) { - tError("%s malloc failed for msg by TransferViaTcp", pSet->label); - taosCloseSocket(connFd); - free(pTransfer); - return NULL; - } - - leftLen = msgLen - (int)sizeof(SRpcHead); - retLen = taosReadMsg(connFd, buffer + sizeof(SRpcHead), leftLen); - pMonitor->pSet = NULL; - - if (retLen != leftLen) { - tError("%s failed to read data from client, leftLen:%d retLen:%d, error:%s", pSet->label, leftLen, retLen, - strerror(errno)); - } else { - tTrace("%s data is received from client via TCP from 0x%x:%hu, msgLen:%d", pSet->label, pTransfer->ip, - pTransfer->port, msgLen); - pSet->index = (pSet->index + 1) % pSet->threads; - SUdpConn *pConn = pSet->udpConn + pSet->index; - memcpy(buffer, &head, sizeof(SRpcHead)); - (*pSet->fp)(buffer, msgLen, pTransfer->ip, head.port, pSet->shandle, NULL, pConn); - } - - taosWriteMsg(connFd, &handleViaTcp, sizeof(SHandleViaTcp)); - } - } else { - // send a packet to client - tTrace("%s send packet to client via TCP, handle:0x%x", pSet->label, handle); - pHead = (SRpcHead *)handle; - msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); - - if (pHead->tcp != 0 || msgLen < 1024) { - tError("%s invalid handle:%p, connection shall be closed", pSet->label, pHead); - } else { - SMonitor *pMonitor = (SMonitor *)calloc(1, sizeof(SMonitor)); - if (NULL == pMonitor) { - tError("%s malloc failed by TransferViaTcp to client", pSet->label); - taosCloseSocket(connFd); - free(pTransfer); - return NULL; - } - pMonitor->dataLen = sizeof(SRpcHead); - memcpy(pMonitor->data, (void *)handle, (size_t)pMonitor->dataLen); - SRpcHead *pThead = (SRpcHead *)pMonitor->data; - pThead->tcp = 1; - pThead->msgType = (char)(pHead->msgType - 1); - pThead->msgLen = (int32_t)htonl(sizeof(SRpcHead)); - uint32_t id = pThead->sourceId; pThead->sourceId = pThead->destId; pThead->destId = id; - pMonitor->ip = pTransfer->ip; - pMonitor->port = pTransfer->port; - pMonitor->pSet = pSet; - taosTmrReset(taosProcessMonitorTimer, 200, pMonitor, pSet->tmrCtrl, &pMonitor->pTimer); - - retLen = taosWriteMsg(connFd, (void *)handle, msgLen); - pMonitor->pSet = NULL; - - if (retLen != msgLen) { - tError("%s failed to send data to client, msgLen:%d retLen:%d", pSet->label, msgLen, retLen); - } else { - tTrace("%s data is sent to client successfully via TCP to 0x%x:%hu, size:%d", pSet->label, pTransfer->ip, - pTransfer->port, msgLen); - } - } - } - - // retLen = taosReadMsg(connFd, &handleViaTcp, sizeof(handleViaTcp)); - free(pTransfer); - taosCloseSocket(connFd); - - return NULL; -} - -void *taosUdpTcpConnection(void *argv) { - int connFd = -1; - struct sockaddr_in clientAddr; - pthread_attr_t thattr; - pthread_t thread; - uint32_t sourceIp; - char ipstr[20]; - - SUdpConnSet *pSet = (SUdpConnSet *)argv; - - pSet->tcpFd = taosOpenTcpServerSocket(pSet->ip, pSet->port); - if (pSet->tcpFd < 0) { - tPrint("%s failed to create TCP socket %s:%hu for UDP server, reason:%s", pSet->label, pSet->ip, pSet->port, - strerror(errno)); - taosKillSystem(); - return NULL; - } - - tTrace("%s UDP server is created, ip:%s:%hu", pSet->label, pSet->ip, pSet->port); - - pthread_attr_init(&thattr); - pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); - - while (1) { - if (pSet->tcpFd < 0) break; - socklen_t addrlen = sizeof(clientAddr); - connFd = accept(pSet->tcpFd, (struct sockaddr *)&clientAddr, &addrlen); - - if (connFd < 0) { - tError("%s UDP server TCP accept failure, reason:%s", pSet->label, strerror(errno)); - continue; - } - - sourceIp = clientAddr.sin_addr.s_addr; - tinet_ntoa(ipstr, sourceIp); - tTrace("%s UDP server TCP connection from ip:%s:%u", pSet->label, ipstr, htons(clientAddr.sin_port)); - - STransfer *pTransfer = malloc(sizeof(STransfer)); - pTransfer->fd = connFd; - pTransfer->ip = sourceIp; - pTransfer->port = clientAddr.sin_port; - pTransfer->pSet = pSet; - - if (pthread_create(&(thread), &thattr, taosTransferDataViaTcp, (void *)pTransfer) < 0) { - tTrace("%s failed to create thread for UDP server, reason:%s", pSet->label, strerror(errno)); - free(pTransfer); - taosCloseSocket(connFd); - } - } - - pthread_attr_destroy(&thattr); - return NULL; -} +static void *taosRecvUdpData(void *param); +static SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port); +static void taosProcessUdpBufTimer(void *param, void *tmrId); void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) { pthread_attr_t thAttr; @@ -508,7 +94,6 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v pSet->port = port; pSet->shandle = shandle; pSet->fp = fp; - pSet->tcpFd = -1; strcpy(pSet->label, label); // if ( tsUdpDelay ) { @@ -570,39 +155,11 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v return pSet; } -void *taosInitUdpServer(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) { - SUdpConnSet *pSet; - pSet = taosInitUdpConnection(ip, port, label, threads, fp, shandle); - if (pSet == NULL) return NULL; - - pSet->server = 1; - pSet->fp = fp; - - pthread_attr_t thattr; - pthread_attr_init(&thattr); - pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); - - // not support by windows - // pthread_t thread; - // pSet->tcpThread = pthread_create(&(thread), &thattr, taosUdpTcpConnection, pSet); - pthread_create(&(pSet->tcpThread), &thattr, taosUdpTcpConnection, pSet); - pthread_attr_destroy(&thattr); - - return pSet; -} - -void *taosInitUdpClient(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) { - return taosInitUdpConnection(ip, port, label, threads, fp, shandle); -} - void taosCleanUpUdpConnection(void *handle) { SUdpConnSet *pSet = (SUdpConnSet *)handle; SUdpConn * pConn; if (pSet == NULL) return; - if (pSet->server == 1) { - pthread_cancel(pSet->tcpThread); - } for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; @@ -621,8 +178,6 @@ void taosCleanUpUdpConnection(void *handle) { tTrace("chandle:%p is closed", pConn); } - if (pSet->tcpFd >= 0) taosCloseTcpSocket(pSet->tcpFd); - pSet->tcpFd = -1; taosTmrCleanUp(pSet->tmrCtrl); tfree(pSet); } @@ -641,6 +196,148 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t por return pConn; } +static void *taosRecvUdpData(void *param) { + struct sockaddr_in sourceAdd; + int dataLen; + unsigned int addLen; + SUdpConn * pConn = (SUdpConn *)param; + uint16_t port; + int minSize = sizeof(SRpcHead); + SRecvInfo recvInfo; + + memset(&sourceAdd, 0, sizeof(sourceAdd)); + addLen = sizeof(sourceAdd); + tTrace("%s UDP thread is created, index:%d", pConn->label, pConn->index); + + while (1) { + dataLen = recvfrom(pConn->fd, pConn->buffer, sizeof(pConn->buffer), 0, (struct sockaddr *)&sourceAdd, &addLen); + tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, ntohs(sourceAdd.sin_port), + dataLen); + + if (dataLen < sizeof(SRpcHead)) { + tError("%s recvfrom failed, reason:%s\n", pConn->label, strerror(errno)); + continue; + } + + port = ntohs(sourceAdd.sin_port); + + int processedLen = 0, leftLen = 0; + int msgLen = 0; + int count = 0; + char *msg = pConn->buffer; + while (processedLen < dataLen) { + leftLen = dataLen - processedLen; + SRpcHead *pHead = (SRpcHead *)msg; + msgLen = htonl((uint32_t)pHead->msgLen); + if (leftLen < minSize || msgLen > leftLen || msgLen < minSize) { + tError("%s msg is messed up, dataLen:%d processedLen:%d count:%d msgLen:%d", pConn->label, dataLen, + processedLen, count, msgLen); + break; + } + + char *data = malloc((size_t)msgLen); + memcpy(data, msg, (size_t)msgLen); + recvInfo.msg = data; + recvInfo.msgLen = msgLen; + recvInfo.ip = sourceAdd.sin_addr.s_addr; + recvInfo.port = port; + recvInfo.shandle = pConn->shandle; + recvInfo.thandle = NULL; + recvInfo.chandle = pConn; + recvInfo.connType = 0; + (*(pConn->processData))(&recvInfo); + + processedLen += msgLen; + msg += msgLen; + count++; + } + + // tTrace("%s %d UDP packets are received together", pConn->label, count); + } + + return NULL; +} + +int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle) { + SUdpConn *pConn = (SUdpConn *)chandle; + SUdpBuf * pBuf; + + if (pConn == NULL || pConn->signature != pConn) return -1; + + if (pConn->hash == NULL) { + struct sockaddr_in destAdd; + memset(&destAdd, 0, sizeof(destAdd)); + destAdd.sin_family = AF_INET; + destAdd.sin_addr.s_addr = ip; + destAdd.sin_port = htons(port); + + int ret = (int)sendto(pConn->fd, data, (size_t)dataLen, 0, (struct sockaddr *)&destAdd, sizeof(destAdd)); + tTrace("%s msg is sent to 0x%x:%hu len:%d ret:%d localPort:%hu chandle:0x%x", pConn->label, destAdd.sin_addr.s_addr, + port, dataLen, ret, pConn->localPort, chandle); + + return ret; + } + + pthread_mutex_lock(&pConn->mutex); + + pBuf = (SUdpBuf *)rpcGetIpHash(pConn->hash, ip, port); + if (pBuf == NULL) { + pBuf = taosCreateUdpBuf(pConn, ip, port); + rpcAddIpHash(pConn->hash, pBuf, ip, port); + } + + if ((pBuf->totalLen + dataLen > RPC_MAX_UDP_SIZE) || (taosMsgHdrSize(pBuf->msgHdr) >= RPC_MAX_UDP_PKTS)) { + taosTmrReset(taosProcessUdpBufTimer, RPC_UDP_BUF_TIME, pBuf, pConn->tmrCtrl, &pBuf->timer); + + taosSendMsgHdr(pBuf->msgHdr, pConn->fd); + pBuf->totalLen = 0; + } + + taosSetMsgHdrData(pBuf->msgHdr, data, dataLen); + + pBuf->totalLen += dataLen; + + pthread_mutex_unlock(&pConn->mutex); + + return dataLen; +} + +void taosFreeMsgHdr(void *hdr) { + struct msghdr *msgHdr = (struct msghdr *)hdr; + free(msgHdr->msg_iov); +} + +int taosMsgHdrSize(void *hdr) { + struct msghdr *msgHdr = (struct msghdr *)hdr; + return (int)msgHdr->msg_iovlen; +} + +void taosSendMsgHdr(void *hdr, int fd) { + struct msghdr *msgHdr = (struct msghdr *)hdr; + sendmsg(fd, msgHdr, 0); + msgHdr->msg_iovlen = 0; +} + +void taosInitMsgHdr(void **hdr, void *dest, int maxPkts) { + struct msghdr *msgHdr = (struct msghdr *)malloc(sizeof(struct msghdr)); + memset(msgHdr, 0, sizeof(struct msghdr)); + *hdr = msgHdr; + struct sockaddr_in *destAdd = (struct sockaddr_in *)dest; + + msgHdr->msg_name = destAdd; + msgHdr->msg_namelen = sizeof(struct sockaddr_in); + int size = (int)sizeof(struct iovec) * maxPkts; + msgHdr->msg_iov = (struct iovec *)malloc((size_t)size); + memset(msgHdr->msg_iov, 0, (size_t)size); +} + +void taosSetMsgHdrData(void *hdr, char *data, int dataLen) { + struct msghdr *msgHdr = (struct msghdr *)hdr; + msgHdr->msg_iov[msgHdr->msg_iovlen].iov_base = data; + msgHdr->msg_iov[msgHdr->msg_iovlen].iov_len = (size_t)dataLen; + msgHdr->msg_iovlen++; +} + void taosRemoveUdpBuf(SUdpBuf *pBuf) { taosTmrStopA(&pBuf->timer); rpcDeleteIpHash(pBuf->pConn->hash, pBuf->ip, pBuf->port); @@ -679,7 +376,7 @@ void taosProcessUdpBufTimer(void *param, void *tmrId) { if (pBuf) taosTmrReset(taosProcessUdpBufTimer, RPC_UDP_BUF_TIME, pBuf, pConn->tmrCtrl, &pBuf->timer); } -SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port) { +static SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port) { SUdpBuf *pBuf = (SUdpBuf *)malloc(sizeof(SUdpBuf)); memset(pBuf, 0, sizeof(SUdpBuf)); @@ -700,121 +397,4 @@ SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port) { return pBuf; } -int taosSendPacketViaTcp(uint32_t ip, uint16_t port, char *data, int dataLen, void *chandle) { - SUdpConn * pConn = (SUdpConn *)chandle; - SUdpConnSet *pSet = (SUdpConnSet *)pConn->pSet; - int code = -1, retLen, msgLen; - char ipstr[64]; - char buffer[128]; - SRpcHead *pHead; - - if (pSet->server) { - // send from server - - pHead = (SRpcHead *)buffer; - memcpy(pHead, data, sizeof(SRpcHead)); - pHead->tcp = 1; - - SPacketInfo *pInfo = (SPacketInfo *)pHead->content; - pInfo->handle = (uint64_t)data; - pInfo->port = pSet->port; - pInfo->msgLen = pHead->msgLen; - - msgLen = sizeof(SRpcHead) + sizeof(SPacketInfo); - pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - code = taosSendUdpData(ip, port, buffer, msgLen, chandle); - tTrace("%s data from server will be sent via TCP:%hu, msgType:%d, length:%d, handle:0x%x", pSet->label, pInfo->port, - pHead->msgType, htonl((uint32_t)pInfo->msgLen), pInfo->handle); - if (code > 0) code = dataLen; - } else { - // send from client - tTrace("%s data will be sent via TCP from client", pSet->label); - - // send a UDP header first to set up the connection - pHead = (SRpcHead *)buffer; - memcpy(pHead, data, sizeof(SRpcHead)); - - pHead->tcp = 2; - - msgLen = sizeof(SRpcHead); - pHead->msgLen = (int32_t)htonl(msgLen); - code = taosSendUdpData(ip, port, buffer, msgLen, chandle); - - //pHead = (SRpcHead *)data; - - tinet_ntoa(ipstr, ip); - int fd = taosOpenTcpClientSocket(ipstr, pConn->port, tsLocalIp); - if (fd < 0) { - tError("%s failed to open TCP socket to:%s:%hu to send packet", pSet->label, ipstr, pConn->port); - } else { - SHandleViaTcp handleViaTcp; - taosInitHandleViaTcp(&handleViaTcp, 0); - retLen = (int)taosWriteSocket(fd, (char *)&handleViaTcp, sizeof(SHandleViaTcp)); - - if (retLen != (int)sizeof(handleViaTcp)) { - tError("%s failed to send handle to server, retLen:%d", pSet->label, retLen); - } else { - retLen = taosWriteMsg(fd, data, dataLen); - if (retLen != dataLen) { - tError("%s failed to send data via TCP, dataLen:%d, retLen:%d, error:%s", pSet->label, dataLen, retLen, - strerror(errno)); - } else { - code = dataLen; - tTrace("%s data is sent via TCP successfully", pSet->label); - } - } - - taosReadMsg(fd, (char *)&handleViaTcp, sizeof(SHandleViaTcp)); - - taosCloseTcpSocket(fd); - } - } - - return code; -} - -int taosSendUdpData(uint32_t ip, uint16_t port, char *data, int dataLen, void *chandle) { - SUdpConn *pConn = (SUdpConn *)chandle; - SUdpBuf * pBuf; - - if (pConn == NULL || pConn->signature != pConn) return -1; - - if (dataLen >= RPC_MAX_UDP_SIZE) return taosSendPacketViaTcp(ip, port, data, dataLen, chandle); - - if (pConn->hash == NULL) { - struct sockaddr_in destAdd; - memset(&destAdd, 0, sizeof(destAdd)); - destAdd.sin_family = AF_INET; - destAdd.sin_addr.s_addr = ip; - destAdd.sin_port = htons(port); - - int ret = (int)sendto(pConn->fd, data, (size_t)dataLen, 0, (struct sockaddr *)&destAdd, sizeof(destAdd)); - tTrace("%s msg is sent to 0x%x:%hu len:%d ret:%d localPort:%hu chandle:0x%x", pConn->label, destAdd.sin_addr.s_addr, - port, dataLen, ret, pConn->localPort, chandle); - - return ret; - } - - pthread_mutex_lock(&pConn->mutex); - pBuf = (SUdpBuf *)rpcGetIpHash(pConn->hash, ip, port); - if (pBuf == NULL) { - pBuf = taosCreateUdpBuf(pConn, ip, port); - rpcAddIpHash(pConn->hash, pBuf, ip, port); - } - - if ((pBuf->totalLen + dataLen > RPC_MAX_UDP_SIZE) || (taosMsgHdrSize(pBuf->msgHdr) >= RPC_MAX_UDP_PKTS)) { - taosTmrReset(taosProcessUdpBufTimer, RPC_UDP_BUF_TIME, pBuf, pConn->tmrCtrl, &pBuf->timer); - - taosSendMsgHdr(pBuf->msgHdr, pConn->fd); - pBuf->totalLen = 0; - } - - taosSetMsgHdrData(pBuf->msgHdr, data, dataLen); - - pBuf->totalLen += dataLen; - - pthread_mutex_unlock(&pConn->mutex); - - return dataLen; -} diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index 46d9e99c62..66655b34a4 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -85,7 +85,6 @@ int main(int argc, char *argv[]) { int msgSize = 128; int numOfReqs = 0; int appThreads = 1; - char socketType[20] = "udp"; char serverIp[40] = "127.0.0.1"; struct timeval systemTime; int64_t startTime, endTime; @@ -113,9 +112,7 @@ int main(int argc, char *argv[]) { rpcInit.ckey = "key"; for (int i=1; i