未验证 提交 9c0f2cb4 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1253 from taosdata/refact/rpc

Refact/rpc
......@@ -23,18 +23,8 @@ extern "C" {
#include <stdint.h>
#include "taosdef.h"
#define TAOS_CONN_UDPS 0
#define TAOS_CONN_UDPC 1
#define TAOS_CONN_TCPS 2
#define TAOS_CONN_TCPC 3
#define TAOS_CONN_HTTPS 4
#define TAOS_CONN_HTTPC 5
#define TAOS_SOCKET_TYPE_NAME_TCP "tcp"
#define TAOS_SOCKET_TYPE_NAME_UDP "udp"
#define TAOS_CONN_SOCKET_TYPE_S() ((strcasecmp(tsSocketType, TAOS_SOCKET_TYPE_NAME_UDP) == 0)? TAOS_CONN_UDPS:TAOS_CONN_TCPS)
#define TAOS_CONN_SOCKET_TYPE_C() ((strcasecmp(tsSocketType, TAOS_SOCKET_TYPE_NAME_UDP) == 0)? TAOS_CONN_UDPC:TAOS_CONN_TCPC)
#define TAOS_CONN_SERVER 0
#define TAOS_CONN_CLIENT 1
extern int tsRpcHeadSize;
......@@ -61,20 +51,20 @@ typedef struct {
int connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int idleTime; // milliseconds, 0 means idle timer is disabled
// the following is for client security only
// the following is for client app ecurity only
char *user; // user name
char spi; // security parameter index
char encrypt; // encrypt algorithm
char *secret; // key for authentication
char *ckey; // ciphering key
// call back to process incoming msg
void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code);
// call back to process incoming msg, code shall be ignored by server app
void (*cfp)(char type, void *pCont, int contLen, void *handle, int32_t code);
// call back to process notify the ipSet changes
// call back to process notify the ipSet changes, for client app only
void (*ufp)(void *ahandle, SRpcIpSet *pIpSet);
// call back to retrieve the client auth info
// call back to retrieve the client auth info, for server app only
int (*afp)(char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
} SRpcInit;
......
......@@ -26,7 +26,7 @@ void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp,
void taosCleanUpTcpClient(void *chandle);
void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port);
void taosCloseTcpClientConnection(void *chandle);
int taosSendTcpClientData(uint32_t ip, uint16_t port, char *data, int len, void *chandle);
int taosSendTcpClientData(uint32_t ip, uint16_t port, void *data, int len, void *chandle);
#ifdef __cplusplus
}
......
......@@ -20,12 +20,29 @@
extern "C" {
#endif
#define RPC_CONN_UDPS 0
#define RPC_CONN_UDPC 1
#define RPC_CONN_TCPS 2
#define RPC_CONN_TCPC 3
#define RPC_CONN_TCP 2
typedef struct {
void *msg;
int msgLen;
uint32_t ip;
uint16_t port;
int connType;
void *shandle;
void *thandle;
void *chandle;
} SRecvInfo;
#pragma pack(push, 1)
typedef struct {
char version:4; // RPC version
char comp:4; // compression algorithm, 0:no compression 1:lz4
char tcp:2; // tcp flag
char resflag:2; // reserved bits
char spi:3; // security parameter index
char encrypt:3; // encrypt algorithm, 0: no encryption
uint16_t tranId; // transcation ID
......@@ -33,12 +50,12 @@ typedef struct {
uint32_t sourceId; // source ID, an index for connection list
uint32_t destId; // destination ID, an index for connection list
uint32_t destIp; // destination IP address, for NAT scenario
char user[TSDB_UNI_LEN];
char user[TSDB_UNI_LEN]; // user ID
uint16_t port; // for UDP only, port may be changed
char empty[1]; // reserved
uint8_t msgType; // message type
int32_t msgLen; // message length including the header iteslf
int32_t code;
int32_t code; // code in response message
uint8_t content[0]; // message body starts from here
} SRpcHead;
......@@ -54,6 +71,7 @@ typedef struct {
#pragma pack(pop)
#ifdef __cplusplus
}
#endif
......
......@@ -25,7 +25,7 @@ extern "C" {
void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void taosCleanUpTcpServer(void *param);
void taosCloseTcpServerConnection(void *param);
int taosSendTcpServerData(uint32_t ip, uint16_t port, char *data, int len, void *chandle);
int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void *chandle);
#ifdef __cplusplus
}
......
......@@ -22,10 +22,9 @@ extern "C" {
#include "taosdef.h"
void *taosInitUdpServer(char *ip, uint16_t port, char *label, int, void *fp, void *shandle);
void *taosInitUdpClient(char *ip, uint16_t port, char *label, int, void *fp, void *shandle);
void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int, void *fp, void *shandle);
void taosCleanUpUdpConnection(void *handle);
int taosSendUdpData(uint32_t ip, uint16_t port, char *data, int dataLen, void *chandle);
int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle);
void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t port);
void taosFreeMsgHdr(void *hdr);
......
......@@ -45,16 +45,143 @@ typedef struct _tcp_client {
int numOfFds;
char label[12];
char ipstr[20];
void * shandle; // handle passed by upper layer during server initialization
void *(*processData)(char *data, int dataLen, unsigned int ip, uint16_t port, void *shandle, void *thandle,
void *chandle);
// char buffer[128000];
void *shandle; // handle passed by upper layer during server initialization
void *(*processData)(SRecvInfo *pRecv);
} STcpClient;
#define maxTcpEvents 100
static void taosCleanUpTcpFdObj(STcpFd *pFdObj);
static void *taosReadTcpData(void *param);
void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle) {
STcpClient *pTcp;
pthread_attr_t thattr;
pTcp = (STcpClient *)malloc(sizeof(STcpClient));
memset(pTcp, 0, sizeof(STcpClient));
strcpy(pTcp->label, label);
strcpy(pTcp->ipstr, ip);
pTcp->shandle = shandle;
if (pthread_mutex_init(&(pTcp->mutex), NULL) < 0) {
tError("%s failed to init TCP mutex, reason:%s", label, strerror(errno));
return NULL;
}
if (pthread_cond_init(&(pTcp->fdReady), NULL) != 0) {
tError("%s init TCP condition variable failed, reason:%s\n", label, strerror(errno));
return NULL;
}
pTcp->pollFd = epoll_create(10); // size does not matter
if (pTcp->pollFd < 0) {
tError("%s failed to create TCP epoll", label);
return NULL;
}
pTcp->processData = fp;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp)) != 0) {
tError("%s failed to create TCP read data thread, reason:%s", label, strerror(errno));
return NULL;
}
tTrace("%s TCP client is initialized, ip:%s port:%hu", label, ip, port);
return pTcp;
}
void taosCleanUpTcpClient(void *chandle) {
STcpClient *pTcp = (STcpClient *)chandle;
if (pTcp == NULL) return;
while (pTcp->pHead) {
taosCleanUpTcpFdObj(pTcp->pHead);
pTcp->pHead = pTcp->pHead->next;
}
close(pTcp->pollFd);
pthread_cancel(pTcp->thread);
pthread_join(pTcp->thread, NULL);
// tTrace (":%s, all connections are cleaned up", pTcp->label);
tfree(pTcp);
}
void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port) {
STcpClient * pTcp = (STcpClient *)shandle;
STcpFd * pFdObj;
struct epoll_event event;
struct in_addr destIp;
int fd;
fd = taosOpenTcpClientSocket(ip, port, pTcp->ipstr);
if (fd <= 0) return NULL;
pFdObj = (STcpFd *)malloc(sizeof(STcpFd));
if (pFdObj == NULL) {
tError("%s no enough resource to allocate TCP FD IDs", pTcp->label);
tclose(fd);
return NULL;
}
memset(pFdObj, 0, sizeof(STcpFd));
pFdObj->fd = fd;
strcpy(pFdObj->ipstr, ip);
inet_aton(ip, &destIp);
pFdObj->ip = destIp.s_addr;
pFdObj->port = port;
pFdObj->pTcp = pTcp;
pFdObj->thandle = thandle;
pFdObj->signature = pFdObj;
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP;
event.data.ptr = pFdObj;
if (epoll_ctl(pTcp->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
tError("%s failed to add TCP FD for epoll, error:%s", pTcp->label, strerror(errno));
tfree(pFdObj);
tclose(fd);
return NULL;
}
// notify the data process, add into the FdObj list
pthread_mutex_lock(&(pTcp->mutex));
pFdObj->next = pTcp->pHead;
if (pTcp->pHead) (pTcp->pHead)->prev = pFdObj;
pTcp->pHead = pFdObj;
pTcp->numOfFds++;
pthread_cond_signal(&pTcp->fdReady);
pthread_mutex_unlock(&(pTcp->mutex));
tTrace("%s TCP connection to %s:%hu is created, FD:%p numOfFds:%d", pTcp->label, ip, port, pFdObj, pTcp->numOfFds);
return pFdObj;
}
void taosCloseTcpClientConnection(void *chandle) {
STcpFd *pFdObj = (STcpFd *)chandle;
if (pFdObj == NULL) return;
taosCleanUpTcpFdObj(pFdObj);
}
int taosSendTcpClientData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
STcpFd *pFdObj = (STcpFd *)chandle;
if (chandle == NULL) return -1;
return (int)send(pFdObj->fd, data, (size_t)len, 0);
}
static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
STcpClient *pTcp;
SRecvInfo recvInfo;
if (pFdObj == NULL) return;
if (pFdObj->signature != pFdObj) return;
......@@ -75,8 +202,6 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
if (pTcp->numOfFds < 0)
tError("%s number of TCP FDs shall never be negative, FD:%p", pTcp->label, pFdObj);
// remove from the FdObject list
if (pFdObj->prev) {
(pFdObj->prev)->next = pFdObj->next;
} else {
......@@ -89,40 +214,28 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
pthread_mutex_unlock(&pTcp->mutex);
// notify the upper layer to clean the associated context
if (pFdObj->thandle) (*(pTcp->processData))(NULL, 0, 0, 0, pTcp->shandle, pFdObj->thandle, NULL);
recvInfo.msg = NULL;
recvInfo.msgLen = 0;
recvInfo.ip = 0;
recvInfo.port = 0;
recvInfo.shandle = pTcp->shandle;
recvInfo.thandle = pFdObj->thandle;;
recvInfo.chandle = NULL;
recvInfo.connType = RPC_CONN_TCP;
if (pFdObj->thandle) (*(pTcp->processData))(&recvInfo);
tTrace("%s TCP is cleaned up, FD:%p numOfFds:%d", pTcp->label, pFdObj, pTcp->numOfFds);
memset(pFdObj, 0, sizeof(STcpFd));
tfree(pFdObj);
}
void taosCleanUpTcpClient(void *chandle) {
STcpClient *pTcp = (STcpClient *)chandle;
if (pTcp == NULL) return;
while (pTcp->pHead) {
taosCleanUpTcpFdObj(pTcp->pHead);
pTcp->pHead = pTcp->pHead->next;
}
close(pTcp->pollFd);
pthread_cancel(pTcp->thread);
pthread_join(pTcp->thread, NULL);
// tTrace (":%s, all connections are cleaned up", pTcp->label);
tfree(pTcp);
}
static void *taosReadTcpData(void *param) {
STcpClient * pTcp = (STcpClient *)param;
STcpClient *pTcp = (STcpClient *)param;
int i, fdNum;
STcpFd * pFdObj;
STcpFd *pFdObj;
struct epoll_event events[maxTcpEvents];
SRecvInfo recvInfo;
while (1) {
pthread_mutex_lock(&pTcp->mutex);
......@@ -186,8 +299,16 @@ static void *taosReadTcpData(void *param) {
continue;
}
pFdObj->thandle =
(*(pTcp->processData))(buffer, dataLen, pFdObj->ip, pFdObj->port, pTcp->shandle, pFdObj->thandle, pFdObj);
recvInfo.msg = buffer;
recvInfo.msgLen = dataLen;
recvInfo.ip = pFdObj->ip;
recvInfo.port = pFdObj->port;
recvInfo.shandle = pTcp->shandle;
recvInfo.thandle = pFdObj->thandle;;
recvInfo.chandle = pFdObj;
recvInfo.connType = RPC_CONN_TCP;
pFdObj->thandle = (*(pTcp->processData))(&recvInfo);
if (pFdObj->thandle == NULL) taosCleanUpTcpFdObj(pFdObj);
}
......@@ -196,122 +317,4 @@ static void *taosReadTcpData(void *param) {
return NULL;
}
void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle) {
STcpClient * pTcp;
pthread_attr_t thattr;
pTcp = (STcpClient *)malloc(sizeof(STcpClient));
memset(pTcp, 0, sizeof(STcpClient));
strcpy(pTcp->label, label);
strcpy(pTcp->ipstr, ip);
pTcp->shandle = shandle;
if (pthread_mutex_init(&(pTcp->mutex), NULL) < 0) {
tError("%s failed to init TCP mutex, reason:%s", label, strerror(errno));
return NULL;
}
if (pthread_cond_init(&(pTcp->fdReady), NULL) != 0) {
tError("%s init TCP condition variable failed, reason:%s\n", label, strerror(errno));
return NULL;
}
pTcp->pollFd = epoll_create(10); // size does not matter
if (pTcp->pollFd < 0) {
tError("%s failed to create TCP epoll", label);
return NULL;
}
pTcp->processData = fp;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp)) != 0) {
tError("%s failed to create TCP read data thread, reason:%s", label, strerror(errno));
return NULL;
}
tTrace("%s TCP client is initialized, ip:%s port:%hu", label, ip, port);
return pTcp;
}
void taosCloseTcpClientConnection(void *chandle) {
STcpFd *pFdObj = (STcpFd *)chandle;
if (pFdObj == NULL) return;
taosCleanUpTcpFdObj(pFdObj);
}
void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port) {
STcpClient * pTcp = (STcpClient *)shandle;
STcpFd * pFdObj;
struct epoll_event event;
struct in_addr destIp;
int fd;
/*
if ( (strcmp(ip, "127.0.0.1") == 0 ) || (strcmp(ip, "localhost") == 0 ) ) {
fd = taosOpenUDClientSocket(ip, port);
} else {
fd = taosOpenTcpClientSocket(ip, port, pTcp->ipstr);
}
*/
fd = taosOpenTcpClientSocket(ip, port, pTcp->ipstr);
if (fd <= 0) return NULL;
pFdObj = (STcpFd *)malloc(sizeof(STcpFd));
if (pFdObj == NULL) {
tError("%s no enough resource to allocate TCP FD IDs", pTcp->label);
tclose(fd);
return NULL;
}
memset(pFdObj, 0, sizeof(STcpFd));
pFdObj->fd = fd;
strcpy(pFdObj->ipstr, ip);
inet_aton(ip, &destIp);
pFdObj->ip = destIp.s_addr;
pFdObj->port = port;
pFdObj->pTcp = pTcp;
pFdObj->thandle = thandle;
pFdObj->signature = pFdObj;
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP;
event.data.ptr = pFdObj;
if (epoll_ctl(pTcp->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
tError("%s failed to add TCP FD for epoll, error:%s", pTcp->label, strerror(errno));
tfree(pFdObj);
tclose(fd);
return NULL;
}
// notify the data process, add into the FdObj list
pthread_mutex_lock(&(pTcp->mutex));
pFdObj->next = pTcp->pHead;
if (pTcp->pHead) (pTcp->pHead)->prev = pFdObj;
pTcp->pHead = pFdObj;
pTcp->numOfFds++;
pthread_cond_signal(&pTcp->fdReady);
pthread_mutex_unlock(&(pTcp->mutex));
tTrace("%s TCP connection to %s:%hu is created, FD:%p numOfFds:%d", pTcp->label, ip, port, pFdObj, pTcp->numOfFds);
return pFdObj;
}
int taosSendTcpClientData(uint32_t ip, uint16_t port, char *data, int len, void *chandle) {
STcpFd *pFdObj = (STcpFd *)chandle;
if (chandle == NULL) return -1;
return (int)send(pFdObj->fd, data, (size_t)len, 0);
}
此差异已折叠。
......@@ -46,10 +46,8 @@ typedef struct _thread_obj {
int numOfFds;
int threadId;
char label[12];
// char buffer[128000]; // buffer to receive data
void *shandle; // handle passed by upper layer during server initialization
void *(*processData)(char *data, int dataLen, unsigned int ip, uint16_t port, void *shandle, void *thandle,
void *chandle);
void *shandle; // handle passed by upper layer during server initialization
void *(*processData)(SRecvInfo *pPacket);
} SThreadObj;
typedef struct {
......@@ -62,59 +60,81 @@ typedef struct {
pthread_t thread;
} SServerObj;
static void taosCleanUpFdObj(SFdObj *pFdObj) {
SThreadObj *pThreadObj;
static void taosCleanUpFdObj(SFdObj *pFdObj);
static void taosProcessTcpData(void *param);
static void taosAcceptTcpConnection(void *arg);
if (pFdObj == NULL) return;
if (pFdObj->signature != pFdObj) return;
void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
int i;
SServerObj *pServerObj;
pthread_attr_t thattr;
SThreadObj *pThreadObj;
pThreadObj = pFdObj->pThreadObj;
if (pThreadObj == NULL) {
tError("FdObj double clean up!!!");
return;
pServerObj = (SServerObj *)malloc(sizeof(SServerObj));
strcpy(pServerObj->ip, ip);
pServerObj->port = port;
strcpy(pServerObj->label, label);
pServerObj->numOfThreads = numOfThreads;
pServerObj->pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj) * (size_t)numOfThreads);
if (pServerObj->pThreadObj == NULL) {
tError("TCP:%s no enough memory", label);
return NULL;
}
memset(pServerObj->pThreadObj, 0, sizeof(SThreadObj) * (size_t)numOfThreads);
epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
close(pFdObj->fd);
pThreadObj = pServerObj->pThreadObj;
for (i = 0; i < numOfThreads; ++i) {
pThreadObj->processData = fp;
strcpy(pThreadObj->label, label);
pThreadObj->shandle = shandle;
pthread_mutex_lock(&pThreadObj->threadMutex);
if (pthread_mutex_init(&(pThreadObj->threadMutex), NULL) < 0) {
tError("%s failed to init TCP process data mutex, reason:%s", label, strerror(errno));
return NULL;
}
pThreadObj->numOfFds--;
if (pthread_cond_init(&(pThreadObj->fdReady), NULL) != 0) {
tError("%s init TCP condition variable failed, reason:%s\n", label, strerror(errno));
return NULL;
}
if (pThreadObj->numOfFds < 0)
tError("%s TCP thread:%d, number of FDs shall never be negative", pThreadObj->label, pThreadObj->threadId);
pThreadObj->pollFd = epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP epoll", label);
return NULL;
}
// remove from the FdObject list
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pThreadObj->thread), &thattr, (void *)taosProcessTcpData, (void *)(pThreadObj)) != 0) {
tError("%s failed to create TCP process data thread, reason:%s", label, strerror(errno));
return NULL;
}
if (pFdObj->prev) {
(pFdObj->prev)->next = pFdObj->next;
} else {
pThreadObj->pHead = pFdObj->next;
pThreadObj->threadId = i;
pThreadObj++;
}
if (pFdObj->next) {
(pFdObj->next)->prev = pFdObj->prev;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)) != 0) {
tError("%s failed to create TCP accept thread, reason:%s", label, strerror(errno));
return NULL;
}
pthread_mutex_unlock(&pThreadObj->threadMutex);
// notify the upper layer, so it will clean the associated context
if (pFdObj->thandle) (*(pThreadObj->processData))(NULL, 0, 0, 0, pThreadObj->shandle, pFdObj->thandle, NULL);
tTrace("%s TCP thread:%d, FD:%p is cleaned up, numOfFds:%d", pThreadObj->label, pThreadObj->threadId,
pFdObj, pThreadObj->numOfFds);
memset(pFdObj, 0, sizeof(SFdObj));
tfree(pFdObj);
}
void taosCloseTcpServerConnection(void *chandle) {
SFdObj *pFdObj = (SFdObj *)chandle;
if (pFdObj == NULL) return;
/*
if ( pthread_create(&(pServerObj->thread), &thattr,
(void*)taosAcceptUDConnection, (void *)(pServerObj)) != 0 ) {
tError("%s failed to create UD accept thread, reason:%s", label,
strerror(errno));
return NULL;
}
*/
pthread_attr_destroy(&thattr);
tTrace("%s TCP server is initialized, ip:%s port:%hu numOfThreads:%d", label, ip, port, numOfThreads);
taosCleanUpFdObj(pFdObj);
return (void *)pServerObj;
}
void taosCleanUpTcpServer(void *handle) {
......@@ -148,6 +168,22 @@ void taosCleanUpTcpServer(void *handle) {
tfree(pServerObj);
}
void taosCloseTcpServerConnection(void *chandle) {
SFdObj *pFdObj = (SFdObj *)chandle;
if (pFdObj == NULL) return;
taosCleanUpFdObj(pFdObj);
}
int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
SFdObj *pFdObj = (SFdObj *)chandle;
if (chandle == NULL) return -1;
return (int)send(pFdObj->fd, data, (size_t)len, 0);
}
#define maxEvents 10
static void taosProcessTcpData(void *param) {
......@@ -155,7 +191,7 @@ static void taosProcessTcpData(void *param) {
int i, fdNum;
SFdObj * pFdObj;
struct epoll_event events[maxEvents];
SRecvInfo recvInfo;
pThreadObj = (SThreadObj *)param;
while (1) {
......@@ -209,15 +245,22 @@ static void taosProcessTcpData(void *param) {
continue;
}
pFdObj->thandle = (*(pThreadObj->processData))(buffer, dataLen, pFdObj->ip, pFdObj->port,
pThreadObj->shandle, pFdObj->thandle, pFdObj);
recvInfo.msg = buffer;
recvInfo.msgLen = dataLen;
recvInfo.ip = pFdObj->ip;
recvInfo.port = pFdObj->port;
recvInfo.shandle = pThreadObj->shandle;
recvInfo.thandle = pFdObj->thandle;;
recvInfo.chandle = pFdObj;
recvInfo.connType = RPC_CONN_TCP;
pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
if (pFdObj->thandle == NULL) taosCleanUpFdObj(pFdObj);
}
}
}
void taosAcceptTcpConnection(void *arg) {
static void taosAcceptTcpConnection(void *arg) {
int connFd = -1;
struct sockaddr_in clientAddr;
int sockFd;
......@@ -280,16 +323,11 @@ void taosAcceptTcpConnection(void *arg) {
// notify the data process, add into the FdObj list
pthread_mutex_lock(&(pThreadObj->threadMutex));
pFdObj->next = pThreadObj->pHead;
if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj;
pThreadObj->pHead = pFdObj;
pThreadObj->numOfFds++;
pthread_cond_signal(&pThreadObj->fdReady);
pthread_mutex_unlock(&(pThreadObj->threadMutex));
tTrace("%s TCP thread:%d, a new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label,
......@@ -301,7 +339,65 @@ void taosAcceptTcpConnection(void *arg) {
}
}
void taosAcceptUDConnection(void *arg) {
static void taosCleanUpFdObj(SFdObj *pFdObj) {
SThreadObj *pThreadObj;
if (pFdObj == NULL) return;
if (pFdObj->signature != pFdObj) return;
pThreadObj = pFdObj->pThreadObj;
if (pThreadObj == NULL) {
tError("FdObj double clean up!!!");
return;
}
epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
close(pFdObj->fd);
pthread_mutex_lock(&pThreadObj->threadMutex);
pThreadObj->numOfFds--;
if (pThreadObj->numOfFds < 0)
tError("%s TCP thread:%d, number of FDs shall never be negative", pThreadObj->label, pThreadObj->threadId);
// remove from the FdObject list
if (pFdObj->prev) {
(pFdObj->prev)->next = pFdObj->next;
} else {
pThreadObj->pHead = pFdObj->next;
}
if (pFdObj->next) {
(pFdObj->next)->prev = pFdObj->prev;
}
pthread_mutex_unlock(&pThreadObj->threadMutex);
// notify the upper layer, so it will clean the associated context
SRecvInfo recvInfo;
recvInfo.msg = NULL;
recvInfo.msgLen = 0;
recvInfo.ip = 0;
recvInfo.port = 0;
recvInfo.shandle = pThreadObj->shandle;
recvInfo.thandle = pFdObj->thandle;;
recvInfo.chandle = NULL;
recvInfo.connType = RPC_CONN_TCP;
if (pFdObj->thandle) (*(pThreadObj->processData))(&recvInfo);
tTrace("%s TCP thread:%d, FD:%p is cleaned up, numOfFds:%d", pThreadObj->label, pThreadObj->threadId,
pFdObj, pThreadObj->numOfFds);
memset(pFdObj, 0, sizeof(SFdObj));
tfree(pFdObj);
}
#if 0
static void taosAcceptUDConnection(void *arg) {
int connFd = -1;
int sockFd;
int threadId = 0;
......@@ -353,16 +449,11 @@ void taosAcceptUDConnection(void *arg) {
// notify the data process, add into the FdObj list
pthread_mutex_lock(&(pThreadObj->threadMutex));
pFdObj->next = pThreadObj->pHead;
if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj;
pThreadObj->pHead = pFdObj;
pThreadObj->numOfFds++;
pthread_cond_signal(&pThreadObj->fdReady);
pthread_mutex_unlock(&(pThreadObj->threadMutex));
tTrace("%s UD thread:%d, a new connection, numOfFds:%d", pServerObj->label, pThreadObj->threadId,
......@@ -373,79 +464,7 @@ void taosAcceptUDConnection(void *arg) {
threadId = threadId % pServerObj->numOfThreads;
}
}
void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
int i;
SServerObj * pServerObj;
pthread_attr_t thattr;
SThreadObj * pThreadObj;
pServerObj = (SServerObj *)malloc(sizeof(SServerObj));
strcpy(pServerObj->ip, ip);
pServerObj->port = port;
strcpy(pServerObj->label, label);
pServerObj->numOfThreads = numOfThreads;
pServerObj->pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj) * (size_t)numOfThreads);
if (pServerObj->pThreadObj == NULL) {
tError("TCP:%s no enough memory", label);
return NULL;
}
memset(pServerObj->pThreadObj, 0, sizeof(SThreadObj) * (size_t)numOfThreads);
pThreadObj = pServerObj->pThreadObj;
for (i = 0; i < numOfThreads; ++i) {
pThreadObj->processData = fp;
strcpy(pThreadObj->label, label);
pThreadObj->shandle = shandle;
if (pthread_mutex_init(&(pThreadObj->threadMutex), NULL) < 0) {
tError("%s failed to init TCP process data mutex, reason:%s", label, strerror(errno));
return NULL;
}
if (pthread_cond_init(&(pThreadObj->fdReady), NULL) != 0) {
tError("%s init TCP condition variable failed, reason:%s\n", label, strerror(errno));
return NULL;
}
pThreadObj->pollFd = epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP epoll", label);
return NULL;
}
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pThreadObj->thread), &thattr, (void *)taosProcessTcpData, (void *)(pThreadObj)) != 0) {
tError("%s failed to create TCP process data thread, reason:%s", label, strerror(errno));
return NULL;
}
pThreadObj->threadId = i;
pThreadObj++;
}
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)) != 0) {
tError("%s failed to create TCP accept thread, reason:%s", label, strerror(errno));
return NULL;
}
/*
if ( pthread_create(&(pServerObj->thread), &thattr,
(void*)taosAcceptUDConnection, (void *)(pServerObj)) != 0 ) {
tError("%s failed to create UD accept thread, reason:%s", label,
strerror(errno));
return NULL;
}
*/
pthread_attr_destroy(&thattr);
tTrace("%s TCP server is initialized, ip:%s port:%hu numOfThreads:%d", label, ip, port, numOfThreads);
return (void *)pServerObj;
}
#endif
#if 0
void taosListTcpConnection(void *handle, char *buffer) {
......@@ -489,10 +508,4 @@ void taosListTcpConnection(void *handle, char *buffer) {
}
#endif
int taosSendTcpServerData(uint32_t ip, uint16_t port, char *data, int len, void *chandle) {
SFdObj *pFdObj = (SFdObj *)chandle;
if (chandle == NULL) return -1;
return (int)send(pFdObj->fd, data, (size_t)len, 0);
}
此差异已折叠。
......@@ -85,7 +85,6 @@ int main(int argc, char *argv[]) {
int msgSize = 128;
int numOfReqs = 0;
int appThreads = 1;
char socketType[20] = "udp";
char serverIp[40] = "127.0.0.1";
struct timeval systemTime;
int64_t startTime, endTime;
......@@ -113,9 +112,7 @@ int main(int argc, char *argv[]) {
rpcInit.ckey = "key";
for (int i=1; i<argc; ++i) {
if ( strcmp(argv[i], "-c")==0 && i < argc-1 ) {
strcpy(socketType, argv[++i]);
} else if (strcmp(argv[i], "-p")==0 && i < argc-1) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
ipSet.port = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
ipSet.ip[0] = inet_addr(argv[++i]);
......@@ -138,7 +135,6 @@ int main(int argc, char *argv[]) {
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-c ctype]: connection type:udp or tpc, default is:%s\n", socketType);
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
printf(" [-p port]: server port number, default is:%d\n", ipSet.port);
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
......@@ -154,7 +150,7 @@ int main(int argc, char *argv[]) {
}
}
rpcInit.connType = strcasecmp(socketType, "udp") == 0 ? TAOS_CONN_UDPC : TAOS_CONN_TCPC;
rpcInit.connType = TAOS_CONN_CLIENT;
taosInitLog("client.log", 100000, 10);
void *pRpc = rpcOpen(&rpcInit);
......
......@@ -60,7 +60,6 @@ void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32
int main(int argc, char *argv[]) {
SRpcInit rpcInit;
char socketType[20] = "udp";
char dataName[20] = "server.data";
memset(&rpcInit, 0, sizeof(rpcInit));
......@@ -73,9 +72,7 @@ int main(int argc, char *argv[]) {
rpcInit.idleTime = 2000;
for (int i=1; i<argc; ++i) {
if ( strcmp(argv[i], "-c")==0 && i < argc-1 ) {
strcpy(socketType, argv[++i]);
} else if (strcmp(argv[i], "-p")==0 && i < argc-1) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
rpcInit.localPort = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i")==0 && i < argc-1) {
strcpy(rpcInit.localIp, argv[++i]);
......@@ -93,7 +90,6 @@ int main(int argc, char *argv[]) {
rpcDebugFlag = atoi(argv[++i]);
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-c ctype]: connection type:udp or tcp, default is:%s\n", socketType);
printf(" [-i ip]: server IP address, default is:%s\n", rpcInit.localIp);
printf(" [-p port]: server port number, default is:%d\n", rpcInit.localPort);
printf(" [-t threads]: number of threads, default is:%d\n", rpcInit.numOfThreads);
......@@ -107,7 +103,7 @@ int main(int argc, char *argv[]) {
}
}
rpcInit.connType = strcasecmp(socketType, "udp") == 0 ? TAOS_CONN_UDPS : TAOS_CONN_TCPS;
rpcInit.connType = TAOS_CONN_SERVER;
taosInitLog("server.log", 100000, 10);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册