提交 704d8da0 编写于 作者: J Jeff Tao

re-organize TCP connection code

上级 5e58bd5c
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _rpc_server_header_
#define _rpc_server_header_
#ifdef __cplusplus
extern "C" {
#endif
#include "taosdef.h"
void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void taosCleanUpTcpServer(void *param);
void taosCloseTcpServerConnection(void *param);
int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void *chandle);
#ifdef __cplusplus
}
#endif
#endif
...@@ -13,20 +13,22 @@ ...@@ -13,20 +13,22 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _rpc_client_header_ #ifndef _rpc_tcp_header_
#define _rpc_client_header_ #define _rpc_tcp_header_
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "taosdef.h" void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void taosCleanUpTcpServer(void *param);
void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle); void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle);
void taosCleanUpTcpClient(void *chandle); void taosCleanUpTcpClient(void *chandle);
void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port); void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port);
void taosCloseTcpClientConnection(void *chandle);
int taosSendTcpClientData(uint32_t ip, uint16_t port, void *data, int len, void *chandle); void taosCloseTcpConnection(void *chandle);
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taosmsg.h"
#include "tlog.h"
#include "tsocket.h"
#include "tutil.h"
#include "rpcClient.h"
#include "rpcHead.h"
#ifndef EPOLLWAKEUP
#define EPOLLWAKEUP (1u << 29)
#endif
typedef struct _tcp_fd {
void *signature;
int fd; // TCP socket FD
void * thandle;
uint32_t ip;
char ipstr[20];
uint16_t port;
struct _tcp_client *pTcp;
struct _tcp_fd * prev, *next;
} STcpFd;
typedef struct _tcp_client {
pthread_t thread;
STcpFd * pHead;
pthread_mutex_t mutex;
pthread_cond_t fdReady;
int pollFd;
int numOfFds;
char label[12];
char ipstr[20];
void *shandle; // handle passed by upper layer during server initialization
void *(*processData)(SRecvInfo *pRecv);
} STcpClient;
#define maxTcpEvents 100
static void taosCleanUpTcpFdObj(STcpFd *pFdObj);
static void *taosReadTcpData(void *param);
void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle) {
STcpClient *pTcp;
pthread_attr_t thattr;
pTcp = (STcpClient *)malloc(sizeof(STcpClient));
memset(pTcp, 0, sizeof(STcpClient));
strcpy(pTcp->label, label);
strcpy(pTcp->ipstr, ip);
pTcp->shandle = shandle;
if (pthread_mutex_init(&(pTcp->mutex), NULL) < 0) {
tError("%s failed to init TCP client mutex(%s)", label, strerror(errno));
return NULL;
}
if (pthread_cond_init(&(pTcp->fdReady), NULL) != 0) {
tError("%s init TCP condition variable failed(%s)", label, strerror(errno));
return NULL;
}
pTcp->pollFd = epoll_create(10); // size does not matter
if (pTcp->pollFd < 0) {
tError("%s failed to create TCP client epoll", label);
return NULL;
}
pTcp->processData = fp;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
int code = pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp));
pthread_attr_destroy(&thattr);
if (code != 0) {
tError("%s failed to create TCP read data thread(%s)", label, strerror(errno));
return NULL;
}
tTrace("%s TCP client is initialized, ip:%s:%hu", label, ip, port);
return pTcp;
}
void taosCleanUpTcpClient(void *chandle) {
STcpClient *pTcp = (STcpClient *)chandle;
if (pTcp == NULL) return;
while (pTcp->pHead) {
taosCleanUpTcpFdObj(pTcp->pHead);
pTcp->pHead = pTcp->pHead->next;
}
close(pTcp->pollFd);
pthread_cancel(pTcp->thread);
pthread_join(pTcp->thread, NULL);
// tTrace (":%s, all connections are cleaned up", pTcp->label);
tfree(pTcp);
}
void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port) {
STcpClient * pTcp = (STcpClient *)shandle;
STcpFd * pFdObj;
struct epoll_event event;
struct in_addr destIp;
int fd;
fd = taosOpenTcpClientSocket(ip, port, pTcp->ipstr);
if (fd <= 0) return NULL;
pFdObj = (STcpFd *)malloc(sizeof(STcpFd));
if (pFdObj == NULL) {
tError("%s no enough resource to allocate TCP FD IDs", pTcp->label);
tclose(fd);
return NULL;
}
memset(pFdObj, 0, sizeof(STcpFd));
pFdObj->fd = fd;
strcpy(pFdObj->ipstr, ip);
inet_aton(ip, &destIp);
pFdObj->ip = destIp.s_addr;
pFdObj->port = port;
pFdObj->pTcp = pTcp;
pFdObj->thandle = thandle;
pFdObj->signature = pFdObj;
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP;
event.data.ptr = pFdObj;
if (epoll_ctl(pTcp->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
tError("%s failed to add TCP FD for epoll, error:%s", pTcp->label, strerror(errno));
tfree(pFdObj);
tclose(fd);
return NULL;
}
// notify the data process, add into the FdObj list
pthread_mutex_lock(&(pTcp->mutex));
pFdObj->next = pTcp->pHead;
if (pTcp->pHead) (pTcp->pHead)->prev = pFdObj;
pTcp->pHead = pFdObj;
pTcp->numOfFds++;
pthread_cond_signal(&pTcp->fdReady);
pthread_mutex_unlock(&(pTcp->mutex));
tTrace("%s TCP connection to %s:%hu is created, FD:%p numOfFds:%d", pTcp->label, ip, port, pFdObj, pTcp->numOfFds);
return pFdObj;
}
void taosCloseTcpClientConnection(void *chandle) {
STcpFd *pFdObj = (STcpFd *)chandle;
if (pFdObj == NULL) return;
taosCleanUpTcpFdObj(pFdObj);
}
int taosSendTcpClientData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
STcpFd *pFdObj = (STcpFd *)chandle;
if (chandle == NULL) return -1;
return (int)send(pFdObj->fd, data, (size_t)len, 0);
}
static void taosReportBrokenLink(STcpFd *pFdObj) {
SRecvInfo recvInfo;
STcpClient *pTcp = pFdObj->pTcp;
if (pFdObj->thandle) {
recvInfo.msg = NULL;
recvInfo.msgLen = 0;
recvInfo.ip = 0;
recvInfo.port = 0;
recvInfo.shandle = pTcp->shandle;
recvInfo.thandle = pFdObj->thandle;;
recvInfo.chandle = NULL;
recvInfo.connType = RPC_CONN_TCP;
(*(pTcp->processData))(&recvInfo);
}
}
static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
if (pFdObj == NULL) return;
if (pFdObj->signature != pFdObj) return;
pFdObj->signature = NULL;
STcpClient *pTcp = pFdObj->pTcp;
epoll_ctl(pTcp->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
close(pFdObj->fd);
pthread_mutex_lock(&pTcp->mutex);
pTcp->numOfFds--;
if (pTcp->numOfFds < 0)
tError("%s %p, number of FDs is negative!!!, FD:%p", pTcp->label, pFdObj->thandle, pFdObj);
if (pFdObj->prev) {
(pFdObj->prev)->next = pFdObj->next;
} else {
pTcp->pHead = pFdObj->next;
}
if (pFdObj->next) {
(pFdObj->next)->prev = pFdObj->prev;
}
pthread_mutex_unlock(&pTcp->mutex);
tTrace("%s %p, FD:%p is cleaned, numOfFds:%d", pTcp->label, pFdObj->thandle, pFdObj, pTcp->numOfFds);
tfree(pFdObj);
}
static void *taosReadTcpData(void *param) {
STcpClient *pTcp = (STcpClient *)param;
int i, fdNum;
STcpFd *pFdObj;
struct epoll_event events[maxTcpEvents];
SRecvInfo recvInfo;
SRpcHead rpcHead;
while (1) {
pthread_mutex_lock(&pTcp->mutex);
if (pTcp->numOfFds < 1) pthread_cond_wait(&pTcp->fdReady, &pTcp->mutex);
pthread_mutex_unlock(&pTcp->mutex);
fdNum = epoll_wait(pTcp->pollFd, events, maxTcpEvents, -1);
if (fdNum < 0) continue;
for (i = 0; i < fdNum; ++i) {
pFdObj = events[i].data.ptr;
if (events[i].events & EPOLLERR) {
tTrace("%s %p, TCP error happened on FD", pTcp->label, pFdObj->thandle);
taosReportBrokenLink(pFdObj);
continue;
}
if (events[i].events & EPOLLHUP) {
tTrace("%s %p, TCP FD hang up", pTcp->label, pFdObj->thandle);
taosReportBrokenLink(pFdObj);
continue;
}
int headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
if (headLen != sizeof(SRpcHead)) {
tError("%s %p, read error, headLen:%d", pTcp->label, pFdObj->thandle, headLen);
taosReportBrokenLink(pFdObj);
continue;
}
int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
char *buffer = (char *)malloc((size_t)msgLen + tsRpcOverhead);
if (NULL == buffer) {
tTrace("%s %p, TCP malloc(size:%d) fail", pTcp->label, pFdObj->thandle, msgLen);
taosReportBrokenLink(pFdObj);
continue;
}
char *msg = buffer + tsRpcOverhead;
int32_t leftLen = msgLen - headLen;
int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
if (leftLen != retLen) {
tError("%s %p, read error, leftLen:%d retLen:%d",
pTcp->label, pFdObj->thandle, leftLen, retLen);
tfree(buffer);
taosReportBrokenLink(pFdObj);
continue;
}
// tTrace("%s TCP data is received, ip:%s:%u len:%d", pTcp->label, pFdObj->ipstr, pFdObj->port, msgLen);
memcpy(msg, &rpcHead, sizeof(SRpcHead));
recvInfo.msg = msg;
recvInfo.msgLen = msgLen;
recvInfo.ip = pFdObj->ip;
recvInfo.port = pFdObj->port;
recvInfo.shandle = pTcp->shandle;
recvInfo.thandle = pFdObj->thandle;;
recvInfo.chandle = pFdObj;
recvInfo.connType = RPC_CONN_TCP;
pFdObj->thandle = (*(pTcp->processData))(&recvInfo);
if (pFdObj->thandle == NULL) taosCleanUpTcpFdObj(pFdObj);
}
}
return NULL;
}
...@@ -27,8 +27,7 @@ ...@@ -27,8 +27,7 @@
#include "taosmsg.h" #include "taosmsg.h"
#include "rpcUdp.h" #include "rpcUdp.h"
#include "rpcCache.h" #include "rpcCache.h"
#include "rpcClient.h" #include "rpcTcp.h"
#include "rpcServer.h"
#include "rpcHead.h" #include "rpcHead.h"
#include "trpc.h" #include "trpc.h"
#include "hash.h" #include "hash.h"
...@@ -67,7 +66,7 @@ typedef struct { ...@@ -67,7 +66,7 @@ typedef struct {
void *udphandle;// returned handle from UDP initialization void *udphandle;// returned handle from UDP initialization
void *pCache; // connection cache void *pCache; // connection cache
pthread_mutex_t mutex; pthread_mutex_t mutex;
struct _RpcConn *connList; // connection list struct SRpcConn *connList; // connection list
} SRpcInfo; } SRpcInfo;
typedef struct { typedef struct {
...@@ -88,7 +87,7 @@ typedef struct { ...@@ -88,7 +87,7 @@ typedef struct {
char msg[0]; // RpcHead starts from here char msg[0]; // RpcHead starts from here
} SRpcReqContext; } SRpcReqContext;
typedef struct _RpcConn { typedef struct SRpcConn {
int sid; // session ID int sid; // session ID
uint32_t ownId; // own link ID uint32_t ownId; // own link ID
uint32_t peerId; // peer link ID uint32_t peerId; // peer link ID
...@@ -156,8 +155,8 @@ void (*taosCleanUpConn[])(void *thandle) = { ...@@ -156,8 +155,8 @@ void (*taosCleanUpConn[])(void *thandle) = {
int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = { int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = {
taosSendUdpData, taosSendUdpData,
taosSendUdpData, taosSendUdpData,
taosSendTcpServerData, taosSendTcpData,
taosSendTcpClientData taosSendTcpData
}; };
void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, uint16_t port) = { void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, uint16_t port) = {
...@@ -170,8 +169,8 @@ void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, uint16_t port) = ...@@ -170,8 +169,8 @@ void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, uint16_t port) =
void (*taosCloseConn[])(void *chandle) = { void (*taosCloseConn[])(void *chandle) = {
NULL, NULL,
NULL, NULL,
taosCloseTcpServerConnection, taosCloseTcpConnection,
taosCloseTcpClientConnection taosCloseTcpConnection
}; };
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort, int8_t connType); static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort, int8_t connType);
......
...@@ -18,30 +18,30 @@ ...@@ -18,30 +18,30 @@
#include "tlog.h" #include "tlog.h"
#include "tsocket.h" #include "tsocket.h"
#include "tutil.h" #include "tutil.h"
#include "rpcServer.h"
#include "rpcHead.h" #include "rpcHead.h"
#include "rpcTcp.h"
#define TAOS_IPv4ADDR_LEN 16
#ifndef EPOLLWAKEUP #ifndef EPOLLWAKEUP
#define EPOLLWAKEUP (1u << 29) #define EPOLLWAKEUP (1u << 29)
#endif #endif
typedef struct _fd_obj { typedef struct SFdObj {
void *signature; void *signature;
int fd; // TCP socket FD int fd; // TCP socket FD
void * thandle; // handle from upper layer, like TAOS void *thandle; // handle from upper layer, like TAOS
char ipstr[TAOS_IPv4ADDR_LEN]; uint32_t ip;
unsigned int ip; uint16_t port;
uint16_t port; struct SThreadObj *pThreadObj;
struct _thread_obj *pThreadObj; struct SFdObj *prev;
struct _fd_obj * prev, *next; struct SFdObj *next;
} SFdObj; } SFdObj;
typedef struct _thread_obj { typedef struct SThreadObj {
pthread_t thread; pthread_t thread;
SFdObj * pHead; SFdObj * pHead;
pthread_mutex_t threadMutex; pthread_mutex_t mutex;
pthread_cond_t fdReady; pthread_cond_t fdReady;
char ipstr[TSDB_IPv4ADDR_LEN];
int pollFd; int pollFd;
int numOfFds; int numOfFds;
int threadId; int threadId;
...@@ -51,7 +51,7 @@ typedef struct _thread_obj { ...@@ -51,7 +51,7 @@ typedef struct _thread_obj {
} SThreadObj; } SThreadObj;
typedef struct { typedef struct {
char ip[40]; char ip[TSDB_IPv4ADDR_LEN];
uint16_t port; uint16_t port;
char label[12]; char label[12];
int numOfThreads; int numOfThreads;
...@@ -60,13 +60,15 @@ typedef struct { ...@@ -60,13 +60,15 @@ typedef struct {
pthread_t thread; pthread_t thread;
} SServerObj; } SServerObj;
static void taosCleanUpFdObj(SFdObj *pFdObj); static void *taosProcessTcpData(void *param);
static void taosProcessTcpData(void *param); static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd);
static void taosAcceptTcpConnection(void *arg); static void taosFreeFdObj(SFdObj *pFdObj);
static void taosReportBrokenLink(SFdObj *pFdObj);
static void taosAcceptTcpConnection(void *arg);
void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) { void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
SServerObj *pServerObj; SServerObj *pServerObj;
SThreadObj *pThreadObj; SThreadObj *pThreadObj;
pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1); pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1);
strcpy(pServerObj->ip, ip); strcpy(pServerObj->ip, ip);
...@@ -88,7 +90,7 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, ...@@ -88,7 +90,7 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
strcpy(pThreadObj->label, label); strcpy(pThreadObj->label, label);
pThreadObj->shandle = shandle; pThreadObj->shandle = shandle;
code = pthread_mutex_init(&(pThreadObj->threadMutex), NULL); code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
if (code < 0) { if (code < 0) {
tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno)); tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
break;; break;;
...@@ -110,7 +112,7 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, ...@@ -110,7 +112,7 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
pthread_attr_t thattr; pthread_attr_t thattr;
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
code = pthread_create(&(pThreadObj->thread), &thattr, (void *)taosProcessTcpData, (void *)(pThreadObj)); code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
if (code != 0) { if (code != 0) {
tError("%s failed to create TCP process data thread(%s)", label, strerror(errno)); tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
...@@ -144,8 +146,8 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, ...@@ -144,8 +146,8 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
} }
void taosCleanUpTcpServer(void *handle) { void taosCleanUpTcpServer(void *handle) {
SThreadObj *pThreadObj;
SServerObj *pServerObj = handle; SServerObj *pServerObj = handle;
SThreadObj *pThreadObj;
if (pServerObj == NULL) return; if (pServerObj == NULL) return;
...@@ -156,7 +158,7 @@ void taosCleanUpTcpServer(void *handle) { ...@@ -156,7 +158,7 @@ void taosCleanUpTcpServer(void *handle) {
pThreadObj = pServerObj->pThreadObj + i; pThreadObj = pServerObj->pThreadObj + i;
while (pThreadObj->pHead) { while (pThreadObj->pHead) {
taosCleanUpFdObj(pThreadObj->pHead); taosFreeFdObj(pThreadObj->pHead);
pThreadObj->pHead = pThreadObj->pHead; pThreadObj->pHead = pThreadObj->pHead;
} }
...@@ -164,7 +166,7 @@ void taosCleanUpTcpServer(void *handle) { ...@@ -164,7 +166,7 @@ void taosCleanUpTcpServer(void *handle) {
pthread_cancel(pThreadObj->thread); pthread_cancel(pThreadObj->thread);
pthread_join(pThreadObj->thread, NULL); pthread_join(pThreadObj->thread, NULL);
pthread_cond_destroy(&(pThreadObj->fdReady)); pthread_cond_destroy(&(pThreadObj->fdReady));
pthread_mutex_destroy(&(pThreadObj->threadMutex)); pthread_mutex_destroy(&(pThreadObj->mutex));
} }
tTrace("TCP:%s, TCP server is cleaned up", pServerObj->label); tTrace("TCP:%s, TCP server is cleaned up", pServerObj->label);
...@@ -173,14 +175,146 @@ void taosCleanUpTcpServer(void *handle) { ...@@ -173,14 +175,146 @@ void taosCleanUpTcpServer(void *handle) {
tfree(pServerObj); tfree(pServerObj);
} }
void taosCloseTcpServerConnection(void *chandle) { static void taosAcceptTcpConnection(void *arg) {
int connFd = -1;
struct sockaddr_in caddr;
int sockFd;
int threadId = 0;
SThreadObj *pThreadObj;
SServerObj *pServerObj;
pServerObj = (SServerObj *)arg;
sockFd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
if (sockFd < 0) return;
tTrace("%s TCP server is ready, ip:%s:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
while (1) {
socklen_t addrlen = sizeof(caddr);
connFd = accept(sockFd, (struct sockaddr *)&caddr, &addrlen);
if (connFd < 0) {
tError("%s TCP accept failure(%s)", pServerObj->label, errno, strerror(errno));
continue;
}
tTrace("%s TCP connection from ip:%s:%hu", pServerObj->label, inet_ntoa(caddr.sin_addr), caddr.sin_port);
taosKeepTcpAlive(connFd);
// pick up the thread to handle this connection
pThreadObj = pServerObj->pThreadObj + threadId;
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd);
if (pFdObj) {
pFdObj->ip = caddr.sin_addr.s_addr;
pFdObj->port = caddr.sin_port;
tTrace("%s new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label,
inet_ntoa(caddr.sin_addr), pFdObj->port, pFdObj, pThreadObj->numOfFds);
} else {
close(connFd);
tError("%s failed to malloc FdObj(%s)", pServerObj->label, strerror(errno));
}
// pick up next thread for next connection
threadId++;
threadId = threadId % pServerObj->numOfThreads;
}
}
void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle) {
SThreadObj *pThreadObj;
pthread_attr_t thattr;
pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj));
memset(pThreadObj, 0, sizeof(SThreadObj));
strcpy(pThreadObj->label, label);
strcpy(pThreadObj->ipstr, ip);
pThreadObj->shandle = shandle;
if (pthread_mutex_init(&(pThreadObj->mutex), NULL) < 0) {
tError("%s failed to init TCP client mutex(%s)", label, strerror(errno));
return NULL;
}
if (pthread_cond_init(&(pThreadObj->fdReady), NULL) != 0) {
tError("%s init TCP condition variable failed(%s)", label, strerror(errno));
return NULL;
}
pThreadObj->pollFd = epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP client epoll", label);
return NULL;
}
pThreadObj->processData = fp;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
int code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
pthread_attr_destroy(&thattr);
if (code != 0) {
tError("%s failed to create TCP read data thread(%s)", label, strerror(errno));
return NULL;
}
tTrace("%s TCP client is initialized, ip:%s:%hu", label, ip, port);
return pThreadObj;
}
void taosCleanUpTcpClient(void *chandle) {
SThreadObj *pThreadObj = chandle;
if (pThreadObj == NULL) return;
while (pThreadObj->pHead) {
taosFreeFdObj(pThreadObj->pHead);
pThreadObj->pHead = pThreadObj->pHead->next;
}
close(pThreadObj->pollFd);
pthread_cancel(pThreadObj->thread);
pthread_join(pThreadObj->thread, NULL);
tTrace (":%s, all connections are cleaned up", pThreadObj->label);
tfree(pThreadObj);
}
void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port) {
SThreadObj * pThreadObj = shandle;
struct in_addr destIp;
int fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ipstr);
if (fd <= 0) return NULL;
inet_aton(ip, &destIp);
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd);
if (pFdObj) {
pFdObj->thandle = thandle;
pFdObj->port = port;
pFdObj->ip = destIp.s_addr;
tTrace("%s %p, TCP connection to %s:%hu is created, FD:%p numOfFds:%d",
pThreadObj->label, thandle, ip, port, pFdObj, pThreadObj->numOfFds);
} else {
close(fd);
tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno));
}
return pFdObj;
}
void taosCloseTcpConnection(void *chandle) {
SFdObj *pFdObj = chandle; SFdObj *pFdObj = chandle;
if (pFdObj == NULL) return; if (pFdObj == NULL) return;
taosCleanUpFdObj(pFdObj); taosFreeFdObj(pFdObj);
} }
int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) { int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
SFdObj *pFdObj = chandle; SFdObj *pFdObj = chandle;
if (chandle == NULL) return -1; if (chandle == NULL) return -1;
...@@ -188,8 +322,6 @@ int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void ...@@ -188,8 +322,6 @@ int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void
return (int)send(pFdObj->fd, data, (size_t)len, 0); return (int)send(pFdObj->fd, data, (size_t)len, 0);
} }
#define maxEvents 10
static void taosReportBrokenLink(SFdObj *pFdObj) { static void taosReportBrokenLink(SFdObj *pFdObj) {
SThreadObj *pThreadObj = pFdObj->pThreadObj; SThreadObj *pThreadObj = pFdObj->pThreadObj;
...@@ -209,26 +341,26 @@ static void taosReportBrokenLink(SFdObj *pFdObj) { ...@@ -209,26 +341,26 @@ static void taosReportBrokenLink(SFdObj *pFdObj) {
} }
} }
static void taosProcessTcpData(void *param) { #define maxEvents 10
SThreadObj * pThreadObj;
int i, fdNum; static void *taosProcessTcpData(void *param) {
SFdObj * pFdObj; SThreadObj *pThreadObj = param;
SFdObj *pFdObj;
struct epoll_event events[maxEvents]; struct epoll_event events[maxEvents];
SRecvInfo recvInfo; SRecvInfo recvInfo;
pThreadObj = (SThreadObj *)param;
SRpcHead rpcHead; SRpcHead rpcHead;
while (1) { while (1) {
pthread_mutex_lock(&pThreadObj->threadMutex); pthread_mutex_lock(&pThreadObj->mutex);
if (pThreadObj->numOfFds < 1) { if (pThreadObj->numOfFds < 1) {
pthread_cond_wait(&pThreadObj->fdReady, &pThreadObj->threadMutex); pthread_cond_wait(&pThreadObj->fdReady, &pThreadObj->mutex);
} }
pthread_mutex_unlock(&pThreadObj->threadMutex); pthread_mutex_unlock(&pThreadObj->mutex);
fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1); int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1);
if (fdNum < 0) continue; if (fdNum < 0) continue;
for (i = 0; i < fdNum; ++i) { for (int i = 0; i < fdNum; ++i) {
pFdObj = events[i].data.ptr; pFdObj = events[i].data.ptr;
if (events[i].events & EPOLLERR) { if (events[i].events & EPOLLERR) {
...@@ -270,7 +402,7 @@ static void taosProcessTcpData(void *param) { ...@@ -270,7 +402,7 @@ static void taosProcessTcpData(void *param) {
continue; continue;
} }
// tTrace("%s TCP data is received, ip:%s:%u len:%d", pTcp->label, pFdObj->ipstr, pFdObj->port, msgLen); // tTrace("%s TCP data is received, ip:%s:%u len:%d", pThreadObj->label, pFdObj->ipstr, pFdObj->port, msgLen);
memcpy(msg, &rpcHead, sizeof(SRpcHead)); memcpy(msg, &rpcHead, sizeof(SRpcHead));
recvInfo.msg = msg; recvInfo.msg = msg;
...@@ -283,91 +415,43 @@ static void taosProcessTcpData(void *param) { ...@@ -283,91 +415,43 @@ static void taosProcessTcpData(void *param) {
recvInfo.connType = RPC_CONN_TCP; recvInfo.connType = RPC_CONN_TCP;
pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo); pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
if (pFdObj->thandle == NULL) taosCleanUpFdObj(pFdObj); if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj);
} }
} }
return NULL;
} }
static void taosAcceptTcpConnection(void *arg) { static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd) {
int connFd = -1;
struct sockaddr_in clientAddr;
int sockFd;
int threadId = 0;
SThreadObj * pThreadObj;
SServerObj * pServerObj;
SFdObj * pFdObj;
struct epoll_event event; struct epoll_event event;
pServerObj = (SServerObj *)arg; SFdObj *pFdObj = (SFdObj *)calloc(sizeof(SFdObj), 1);
if (pFdObj == NULL) return NULL;
sockFd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); pFdObj->fd = fd;
pFdObj->pThreadObj = pThreadObj;
pFdObj->signature = pFdObj;
if (sockFd < 0) { event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP;
tError("%s failed to open TCP socket, ip:%s:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); event.data.ptr = pFdObj;
return; if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
} else { tfree(pFdObj);
tTrace("%s TCP server is ready, ip:%s:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); return NULL;
} }
while (1) { // notify the data process, add into the FdObj list
socklen_t addrlen = sizeof(clientAddr); pthread_mutex_lock(&(pThreadObj->mutex));
connFd = accept(sockFd, (struct sockaddr *)&clientAddr, &addrlen); pFdObj->next = pThreadObj->pHead;
if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj;
if (connFd < 0) { pThreadObj->pHead = pFdObj;
tError("%s TCP accept failure(%s)", pServerObj->label, errno, strerror(errno)); pThreadObj->numOfFds++;
continue; pthread_cond_signal(&pThreadObj->fdReady);
} pthread_mutex_unlock(&(pThreadObj->mutex));
tTrace("%s TCP connection from ip:%s:%hu", pServerObj->label, inet_ntoa(clientAddr.sin_addr),
htons(clientAddr.sin_port));
taosKeepTcpAlive(connFd);
// pick up the thread to handle this connection
pThreadObj = pServerObj->pThreadObj + threadId;
pFdObj = (SFdObj *)malloc(sizeof(SFdObj));
if (pFdObj == NULL) {
tError("%s no enough resource to allocate TCP FD IDs", pServerObj->label);
close(connFd);
continue;
}
memset(pFdObj, 0, sizeof(SFdObj));
pFdObj->fd = connFd;
strcpy(pFdObj->ipstr, inet_ntoa(clientAddr.sin_addr));
pFdObj->ip = clientAddr.sin_addr.s_addr;
pFdObj->port = htons(clientAddr.sin_port);
pFdObj->pThreadObj = pThreadObj;
pFdObj->signature = pFdObj;
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP;
event.data.ptr = pFdObj;
if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
tError("%s failed to add TCP FD for epoll(%s)", pServerObj->label, strerror(errno));
tfree(pFdObj);
close(connFd);
continue;
}
// notify the data process, add into the FdObj list
pthread_mutex_lock(&(pThreadObj->threadMutex));
pFdObj->next = pThreadObj->pHead;
if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj;
pThreadObj->pHead = pFdObj;
pThreadObj->numOfFds++;
pthread_cond_signal(&pThreadObj->fdReady);
pthread_mutex_unlock(&(pThreadObj->threadMutex));
tTrace("%s TCP thread:%d, new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label,
pThreadObj->threadId, pFdObj->ipstr, pFdObj->port, pFdObj, pThreadObj->numOfFds);
// pick up next thread for next connection return pFdObj;
threadId++;
threadId = threadId % pServerObj->numOfThreads;
}
} }
static void taosCleanUpFdObj(SFdObj *pFdObj) { static void taosFreeFdObj(SFdObj *pFdObj) {
if (pFdObj == NULL) return; if (pFdObj == NULL) return;
if (pFdObj->signature != pFdObj) return; if (pFdObj->signature != pFdObj) return;
...@@ -378,7 +462,7 @@ static void taosCleanUpFdObj(SFdObj *pFdObj) { ...@@ -378,7 +462,7 @@ static void taosCleanUpFdObj(SFdObj *pFdObj) {
close(pFdObj->fd); close(pFdObj->fd);
epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL); epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
pthread_mutex_lock(&pThreadObj->threadMutex); pthread_mutex_lock(&pThreadObj->mutex);
pThreadObj->numOfFds--; pThreadObj->numOfFds--;
...@@ -398,7 +482,7 @@ static void taosCleanUpFdObj(SFdObj *pFdObj) { ...@@ -398,7 +482,7 @@ static void taosCleanUpFdObj(SFdObj *pFdObj) {
(pFdObj->next)->prev = pFdObj->prev; (pFdObj->next)->prev = pFdObj->prev;
} }
pthread_mutex_unlock(&pThreadObj->threadMutex); pthread_mutex_unlock(&pThreadObj->mutex);
tTrace("%s %p, FD:%p is cleaned, numOfFds:%d", tTrace("%s %p, FD:%p is cleaned, numOfFds:%d",
pThreadObj->label, pFdObj->thandle, pFdObj, pThreadObj->numOfFds); pThreadObj->label, pFdObj->thandle, pFdObj, pThreadObj->numOfFds);
...@@ -406,116 +490,4 @@ static void taosCleanUpFdObj(SFdObj *pFdObj) { ...@@ -406,116 +490,4 @@ static void taosCleanUpFdObj(SFdObj *pFdObj) {
tfree(pFdObj); tfree(pFdObj);
} }
#if 0
static void taosAcceptUDConnection(void *arg) {
int connFd = -1;
int sockFd;
int threadId = 0;
SThreadObj * pThreadObj;
SServerObj * pServerObj;
SFdObj * pFdObj;
struct epoll_event event;
pServerObj = (SServerObj *)arg;
sockFd = taosOpenUDServerSocket(pServerObj->ip, pServerObj->port);
if (sockFd < 0) {
tError("%s failed to open UD socket, ip:%s, port:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
return;
} else {
tTrace("%s UD server is ready, ip:%s, port:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
}
while (1) {
connFd = accept(sockFd, NULL, NULL);
if (connFd < 0) {
tError("%s UD accept failure, errno:%d, reason:%s", pServerObj->label, errno, strerror(errno));
continue;
}
// pick up the thread to handle this connection
pThreadObj = pServerObj->pThreadObj + threadId;
pFdObj = (SFdObj *)malloc(sizeof(SFdObj));
if (pFdObj == NULL) {
tError("%s no enough resource to allocate TCP FD IDs", pServerObj->label);
close(connFd);
continue;
}
memset(pFdObj, 0, sizeof(SFdObj));
pFdObj->fd = connFd;
pFdObj->pThreadObj = pThreadObj;
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP;
event.data.ptr = pFdObj;
if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
tError("%s failed to add UD FD for epoll, error:%s", pServerObj->label, strerror(errno));
tfree(pFdObj);
close(connFd);
continue;
}
// notify the data process, add into the FdObj list
pthread_mutex_lock(&(pThreadObj->threadMutex));
pFdObj->next = pThreadObj->pHead;
if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj;
pThreadObj->pHead = pFdObj;
pThreadObj->numOfFds++;
pthread_cond_signal(&pThreadObj->fdReady);
pthread_mutex_unlock(&(pThreadObj->threadMutex));
tTrace("%s UD thread:%d, a new connection, numOfFds:%d", pServerObj->label, pThreadObj->threadId,
pThreadObj->numOfFds);
// pick up next thread for next connection
threadId++;
threadId = threadId % pServerObj->numOfThreads;
}
}
#endif
#if 0
void taosListTcpConnection(void *handle, char *buffer) {
SServerObj *pServerObj;
SThreadObj *pThreadObj;
SFdObj * pFdObj;
int i, numOfFds, numOfConns;
char * msg;
pServerObj = (SServerObj *)handle;
buffer[0] = 0;
msg = buffer;
numOfConns = 0;
pThreadObj = pServerObj->pThreadObj;
for (i = 0; i < pServerObj->numOfThreads; ++i) {
numOfFds = 0;
sprintf(msg, "TCP:%s Thread:%d number of connections:%d\n", pServerObj->label, pThreadObj->threadId,
pThreadObj->numOfFds);
msg = msg + strlen(msg);
pFdObj = pThreadObj->pHead;
while (pFdObj) {
sprintf(msg, " ip:%s port:%hu\n", pFdObj->ipstr, pFdObj->port);
msg = msg + strlen(msg);
numOfFds++;
numOfConns++;
pFdObj = pFdObj->next;
}
if (numOfFds != pThreadObj->numOfFds)
tError("TCP:%s thread:%d BIG error, numOfFds:%d actual numOfFds:%d", pServerObj->label, pThreadObj->threadId,
pThreadObj->numOfFds, numOfFds);
pThreadObj++;
}
sprintf(msg, "TCP:%s total connections:%d\n", pServerObj->label, numOfConns);
return;
}
#endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册