提交 25a4625e 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

support both TCP and UDP simultaneously for both server and client

上级 db1e1e54
...@@ -23,18 +23,8 @@ extern "C" { ...@@ -23,18 +23,8 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include "taosdef.h" #include "taosdef.h"
#define TAOS_CONN_UDPS 0 #define TAOS_CONN_SERVER 0
#define TAOS_CONN_UDPC 1 #define TAOS_CONN_CLIENT 1
#define TAOS_CONN_TCPS 2
#define TAOS_CONN_TCPC 3
#define TAOS_CONN_HTTPS 4
#define TAOS_CONN_HTTPC 5
#define TAOS_SOCKET_TYPE_NAME_TCP "tcp"
#define TAOS_SOCKET_TYPE_NAME_UDP "udp"
#define TAOS_CONN_SOCKET_TYPE_S() ((strcasecmp(tsSocketType, TAOS_SOCKET_TYPE_NAME_UDP) == 0)? TAOS_CONN_UDPS:TAOS_CONN_TCPS)
#define TAOS_CONN_SOCKET_TYPE_C() ((strcasecmp(tsSocketType, TAOS_SOCKET_TYPE_NAME_UDP) == 0)? TAOS_CONN_UDPC:TAOS_CONN_TCPC)
extern int tsRpcHeadSize; extern int tsRpcHeadSize;
......
...@@ -26,7 +26,7 @@ void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, ...@@ -26,7 +26,7 @@ void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp,
void taosCleanUpTcpClient(void *chandle); void taosCleanUpTcpClient(void *chandle);
void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port); void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port);
void taosCloseTcpClientConnection(void *chandle); void taosCloseTcpClientConnection(void *chandle);
int taosSendTcpClientData(uint32_t ip, uint16_t port, char *data, int len, void *chandle); int taosSendTcpClientData(uint32_t ip, uint16_t port, void *data, int len, void *chandle);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -20,12 +20,29 @@ ...@@ -20,12 +20,29 @@
extern "C" { extern "C" {
#endif #endif
#define RPC_CONN_UDPS 0
#define RPC_CONN_UDPC 1
#define RPC_CONN_TCPS 2
#define RPC_CONN_TCPC 3
#define RPC_CONN_TCP 2
typedef struct {
void *msg;
int msgLen;
uint32_t ip;
uint16_t port;
int connType;
void *shandle;
void *thandle;
void *chandle;
} SRecvInfo;
#pragma pack(push, 1) #pragma pack(push, 1)
typedef struct { typedef struct {
char version:4; // RPC version char version:4; // RPC version
char comp:4; // compression algorithm, 0:no compression 1:lz4 char comp:4; // compression algorithm, 0:no compression 1:lz4
char tcp:2; // tcp flag char resflag:2; // reserved bits
char spi:3; // security parameter index char spi:3; // security parameter index
char encrypt:3; // encrypt algorithm, 0: no encryption char encrypt:3; // encrypt algorithm, 0: no encryption
uint16_t tranId; // transcation ID uint16_t tranId; // transcation ID
...@@ -33,12 +50,12 @@ typedef struct { ...@@ -33,12 +50,12 @@ typedef struct {
uint32_t sourceId; // source ID, an index for connection list uint32_t sourceId; // source ID, an index for connection list
uint32_t destId; // destination ID, an index for connection list uint32_t destId; // destination ID, an index for connection list
uint32_t destIp; // destination IP address, for NAT scenario uint32_t destIp; // destination IP address, for NAT scenario
char user[TSDB_UNI_LEN]; char user[TSDB_UNI_LEN]; // user ID
uint16_t port; // for UDP only, port may be changed uint16_t port; // for UDP only, port may be changed
char empty[1]; // reserved char empty[1]; // reserved
uint8_t msgType; // message type uint8_t msgType; // message type
int32_t msgLen; // message length including the header iteslf int32_t msgLen; // message length including the header iteslf
int32_t code; int32_t code; // code in response message
uint8_t content[0]; // message body starts from here uint8_t content[0]; // message body starts from here
} SRpcHead; } SRpcHead;
...@@ -54,6 +71,7 @@ typedef struct { ...@@ -54,6 +71,7 @@ typedef struct {
#pragma pack(pop) #pragma pack(pop)
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -25,7 +25,7 @@ extern "C" { ...@@ -25,7 +25,7 @@ extern "C" {
void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void taosCleanUpTcpServer(void *param); void taosCleanUpTcpServer(void *param);
void taosCloseTcpServerConnection(void *param); void taosCloseTcpServerConnection(void *param);
int taosSendTcpServerData(uint32_t ip, uint16_t port, char *data, int len, void *chandle); int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void *chandle);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -22,10 +22,9 @@ extern "C" { ...@@ -22,10 +22,9 @@ extern "C" {
#include "taosdef.h" #include "taosdef.h"
void *taosInitUdpServer(char *ip, uint16_t port, char *label, int, void *fp, void *shandle); void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int, void *fp, void *shandle);
void *taosInitUdpClient(char *ip, uint16_t port, char *label, int, void *fp, void *shandle);
void taosCleanUpUdpConnection(void *handle); void taosCleanUpUdpConnection(void *handle);
int taosSendUdpData(uint32_t ip, uint16_t port, char *data, int dataLen, void *chandle); int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle);
void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t port); void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t port);
void taosFreeMsgHdr(void *hdr); void taosFreeMsgHdr(void *hdr);
......
...@@ -45,16 +45,143 @@ typedef struct _tcp_client { ...@@ -45,16 +45,143 @@ typedef struct _tcp_client {
int numOfFds; int numOfFds;
char label[12]; char label[12];
char ipstr[20]; char ipstr[20];
void * shandle; // handle passed by upper layer during server initialization void *shandle; // handle passed by upper layer during server initialization
void *(*processData)(char *data, int dataLen, unsigned int ip, uint16_t port, void *shandle, void *thandle, void *(*processData)(SRecvInfo *pRecv);
void *chandle);
// char buffer[128000];
} STcpClient; } STcpClient;
#define maxTcpEvents 100 #define maxTcpEvents 100
static void taosCleanUpTcpFdObj(STcpFd *pFdObj);
static void *taosReadTcpData(void *param);
void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle) {
STcpClient *pTcp;
pthread_attr_t thattr;
pTcp = (STcpClient *)malloc(sizeof(STcpClient));
memset(pTcp, 0, sizeof(STcpClient));
strcpy(pTcp->label, label);
strcpy(pTcp->ipstr, ip);
pTcp->shandle = shandle;
if (pthread_mutex_init(&(pTcp->mutex), NULL) < 0) {
tError("%s failed to init TCP mutex, reason:%s", label, strerror(errno));
return NULL;
}
if (pthread_cond_init(&(pTcp->fdReady), NULL) != 0) {
tError("%s init TCP condition variable failed, reason:%s\n", label, strerror(errno));
return NULL;
}
pTcp->pollFd = epoll_create(10); // size does not matter
if (pTcp->pollFd < 0) {
tError("%s failed to create TCP epoll", label);
return NULL;
}
pTcp->processData = fp;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp)) != 0) {
tError("%s failed to create TCP read data thread, reason:%s", label, strerror(errno));
return NULL;
}
tTrace("%s TCP client is initialized, ip:%s port:%hu", label, ip, port);
return pTcp;
}
void taosCleanUpTcpClient(void *chandle) {
STcpClient *pTcp = (STcpClient *)chandle;
if (pTcp == NULL) return;
while (pTcp->pHead) {
taosCleanUpTcpFdObj(pTcp->pHead);
pTcp->pHead = pTcp->pHead->next;
}
close(pTcp->pollFd);
pthread_cancel(pTcp->thread);
pthread_join(pTcp->thread, NULL);
// tTrace (":%s, all connections are cleaned up", pTcp->label);
tfree(pTcp);
}
void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port) {
STcpClient * pTcp = (STcpClient *)shandle;
STcpFd * pFdObj;
struct epoll_event event;
struct in_addr destIp;
int fd;
fd = taosOpenTcpClientSocket(ip, port, pTcp->ipstr);
if (fd <= 0) return NULL;
pFdObj = (STcpFd *)malloc(sizeof(STcpFd));
if (pFdObj == NULL) {
tError("%s no enough resource to allocate TCP FD IDs", pTcp->label);
tclose(fd);
return NULL;
}
memset(pFdObj, 0, sizeof(STcpFd));
pFdObj->fd = fd;
strcpy(pFdObj->ipstr, ip);
inet_aton(ip, &destIp);
pFdObj->ip = destIp.s_addr;
pFdObj->port = port;
pFdObj->pTcp = pTcp;
pFdObj->thandle = thandle;
pFdObj->signature = pFdObj;
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP;
event.data.ptr = pFdObj;
if (epoll_ctl(pTcp->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
tError("%s failed to add TCP FD for epoll, error:%s", pTcp->label, strerror(errno));
tfree(pFdObj);
tclose(fd);
return NULL;
}
// notify the data process, add into the FdObj list
pthread_mutex_lock(&(pTcp->mutex));
pFdObj->next = pTcp->pHead;
if (pTcp->pHead) (pTcp->pHead)->prev = pFdObj;
pTcp->pHead = pFdObj;
pTcp->numOfFds++;
pthread_cond_signal(&pTcp->fdReady);
pthread_mutex_unlock(&(pTcp->mutex));
tTrace("%s TCP connection to %s:%hu is created, FD:%p numOfFds:%d", pTcp->label, ip, port, pFdObj, pTcp->numOfFds);
return pFdObj;
}
void taosCloseTcpClientConnection(void *chandle) {
STcpFd *pFdObj = (STcpFd *)chandle;
if (pFdObj == NULL) return;
taosCleanUpTcpFdObj(pFdObj);
}
int taosSendTcpClientData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
STcpFd *pFdObj = (STcpFd *)chandle;
if (chandle == NULL) return -1;
return (int)send(pFdObj->fd, data, (size_t)len, 0);
}
static void taosCleanUpTcpFdObj(STcpFd *pFdObj) { static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
STcpClient *pTcp; STcpClient *pTcp;
SRecvInfo recvInfo;
if (pFdObj == NULL) return; if (pFdObj == NULL) return;
if (pFdObj->signature != pFdObj) return; if (pFdObj->signature != pFdObj) return;
...@@ -75,8 +202,6 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) { ...@@ -75,8 +202,6 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
if (pTcp->numOfFds < 0) if (pTcp->numOfFds < 0)
tError("%s number of TCP FDs shall never be negative, FD:%p", pTcp->label, pFdObj); tError("%s number of TCP FDs shall never be negative, FD:%p", pTcp->label, pFdObj);
// remove from the FdObject list
if (pFdObj->prev) { if (pFdObj->prev) {
(pFdObj->prev)->next = pFdObj->next; (pFdObj->prev)->next = pFdObj->next;
} else { } else {
...@@ -89,40 +214,28 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) { ...@@ -89,40 +214,28 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
pthread_mutex_unlock(&pTcp->mutex); pthread_mutex_unlock(&pTcp->mutex);
// notify the upper layer to clean the associated context recvInfo.msg = NULL;
if (pFdObj->thandle) (*(pTcp->processData))(NULL, 0, 0, 0, pTcp->shandle, pFdObj->thandle, NULL); recvInfo.msgLen = 0;
recvInfo.ip = 0;
recvInfo.port = 0;
recvInfo.shandle = pTcp->shandle;
recvInfo.thandle = pFdObj->thandle;;
recvInfo.chandle = NULL;
recvInfo.connType = RPC_CONN_TCP;
if (pFdObj->thandle) (*(pTcp->processData))(&recvInfo);
tTrace("%s TCP is cleaned up, FD:%p numOfFds:%d", pTcp->label, pFdObj, pTcp->numOfFds); tTrace("%s TCP is cleaned up, FD:%p numOfFds:%d", pTcp->label, pFdObj, pTcp->numOfFds);
memset(pFdObj, 0, sizeof(STcpFd)); memset(pFdObj, 0, sizeof(STcpFd));
tfree(pFdObj); tfree(pFdObj);
} }
void taosCleanUpTcpClient(void *chandle) {
STcpClient *pTcp = (STcpClient *)chandle;
if (pTcp == NULL) return;
while (pTcp->pHead) {
taosCleanUpTcpFdObj(pTcp->pHead);
pTcp->pHead = pTcp->pHead->next;
}
close(pTcp->pollFd);
pthread_cancel(pTcp->thread);
pthread_join(pTcp->thread, NULL);
// tTrace (":%s, all connections are cleaned up", pTcp->label);
tfree(pTcp);
}
static void *taosReadTcpData(void *param) { static void *taosReadTcpData(void *param) {
STcpClient * pTcp = (STcpClient *)param; STcpClient *pTcp = (STcpClient *)param;
int i, fdNum; int i, fdNum;
STcpFd * pFdObj; STcpFd *pFdObj;
struct epoll_event events[maxTcpEvents]; struct epoll_event events[maxTcpEvents];
SRecvInfo recvInfo;
while (1) { while (1) {
pthread_mutex_lock(&pTcp->mutex); pthread_mutex_lock(&pTcp->mutex);
...@@ -186,8 +299,16 @@ static void *taosReadTcpData(void *param) { ...@@ -186,8 +299,16 @@ static void *taosReadTcpData(void *param) {
continue; continue;
} }
pFdObj->thandle = recvInfo.msg = buffer;
(*(pTcp->processData))(buffer, dataLen, pFdObj->ip, pFdObj->port, pTcp->shandle, pFdObj->thandle, pFdObj); recvInfo.msgLen = dataLen;
recvInfo.ip = pFdObj->ip;
recvInfo.port = pFdObj->port;
recvInfo.shandle = pTcp->shandle;
recvInfo.thandle = pFdObj->thandle;;
recvInfo.chandle = pFdObj;
recvInfo.connType = RPC_CONN_TCP;
pFdObj->thandle = (*(pTcp->processData))(&recvInfo);
if (pFdObj->thandle == NULL) taosCleanUpTcpFdObj(pFdObj); if (pFdObj->thandle == NULL) taosCleanUpTcpFdObj(pFdObj);
} }
...@@ -196,122 +317,4 @@ static void *taosReadTcpData(void *param) { ...@@ -196,122 +317,4 @@ static void *taosReadTcpData(void *param) {
return NULL; return NULL;
} }
void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle) {
STcpClient * pTcp;
pthread_attr_t thattr;
pTcp = (STcpClient *)malloc(sizeof(STcpClient));
memset(pTcp, 0, sizeof(STcpClient));
strcpy(pTcp->label, label);
strcpy(pTcp->ipstr, ip);
pTcp->shandle = shandle;
if (pthread_mutex_init(&(pTcp->mutex), NULL) < 0) {
tError("%s failed to init TCP mutex, reason:%s", label, strerror(errno));
return NULL;
}
if (pthread_cond_init(&(pTcp->fdReady), NULL) != 0) {
tError("%s init TCP condition variable failed, reason:%s\n", label, strerror(errno));
return NULL;
}
pTcp->pollFd = epoll_create(10); // size does not matter
if (pTcp->pollFd < 0) {
tError("%s failed to create TCP epoll", label);
return NULL;
}
pTcp->processData = fp;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp)) != 0) {
tError("%s failed to create TCP read data thread, reason:%s", label, strerror(errno));
return NULL;
}
tTrace("%s TCP client is initialized, ip:%s port:%hu", label, ip, port);
return pTcp;
}
void taosCloseTcpClientConnection(void *chandle) {
STcpFd *pFdObj = (STcpFd *)chandle;
if (pFdObj == NULL) return;
taosCleanUpTcpFdObj(pFdObj);
}
void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port) {
STcpClient * pTcp = (STcpClient *)shandle;
STcpFd * pFdObj;
struct epoll_event event;
struct in_addr destIp;
int fd;
/*
if ( (strcmp(ip, "127.0.0.1") == 0 ) || (strcmp(ip, "localhost") == 0 ) ) {
fd = taosOpenUDClientSocket(ip, port);
} else {
fd = taosOpenTcpClientSocket(ip, port, pTcp->ipstr);
}
*/
fd = taosOpenTcpClientSocket(ip, port, pTcp->ipstr);
if (fd <= 0) return NULL;
pFdObj = (STcpFd *)malloc(sizeof(STcpFd));
if (pFdObj == NULL) {
tError("%s no enough resource to allocate TCP FD IDs", pTcp->label);
tclose(fd);
return NULL;
}
memset(pFdObj, 0, sizeof(STcpFd));
pFdObj->fd = fd;
strcpy(pFdObj->ipstr, ip);
inet_aton(ip, &destIp);
pFdObj->ip = destIp.s_addr;
pFdObj->port = port;
pFdObj->pTcp = pTcp;
pFdObj->thandle = thandle;
pFdObj->signature = pFdObj;
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP;
event.data.ptr = pFdObj;
if (epoll_ctl(pTcp->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
tError("%s failed to add TCP FD for epoll, error:%s", pTcp->label, strerror(errno));
tfree(pFdObj);
tclose(fd);
return NULL;
}
// notify the data process, add into the FdObj list
pthread_mutex_lock(&(pTcp->mutex));
pFdObj->next = pTcp->pHead;
if (pTcp->pHead) (pTcp->pHead)->prev = pFdObj;
pTcp->pHead = pFdObj;
pTcp->numOfFds++;
pthread_cond_signal(&pTcp->fdReady);
pthread_mutex_unlock(&(pTcp->mutex));
tTrace("%s TCP connection to %s:%hu is created, FD:%p numOfFds:%d", pTcp->label, ip, port, pFdObj, pTcp->numOfFds);
return pFdObj;
}
int taosSendTcpClientData(uint32_t ip, uint16_t port, char *data, int len, void *chandle) {
STcpFd *pFdObj = (STcpFd *)chandle;
if (chandle == NULL) return -1;
return (int)send(pFdObj->fd, data, (size_t)len, 0);
}
...@@ -62,7 +62,8 @@ typedef struct { ...@@ -62,7 +62,8 @@ typedef struct {
void *idPool; // handle to ID pool void *idPool; // handle to ID pool
void *tmrCtrl; // handle to timer void *tmrCtrl; // handle to timer
void *hash; // handle returned by hash utility void *hash; // handle returned by hash utility
void *shandle; // returned handle from lower layer during initialization void *tcphandle;// returned handle from TCP initialization
void *udphandle;// returned handle from UDP initialization
void *pCache; // connection cache void *pCache; // connection cache
pthread_mutex_t mutex; pthread_mutex_t mutex;
struct _RpcConn *connList; // connection list struct _RpcConn *connList; // connection list
...@@ -79,6 +80,7 @@ typedef struct { ...@@ -79,6 +80,7 @@ typedef struct {
int16_t numOfTry; // number of try for different servers int16_t numOfTry; // number of try for different servers
int8_t oldIndex; // server IP index passed by app int8_t oldIndex; // server IP index passed by app
int8_t redirect; // flag to indicate redirect int8_t redirect; // flag to indicate redirect
int8_t connType; // connection type
char msg[0]; // RpcHead starts from here char msg[0]; // RpcHead starts from here
} SRpcReqContext; } SRpcReqContext;
...@@ -113,6 +115,7 @@ typedef struct _RpcConn { ...@@ -113,6 +115,7 @@ typedef struct _RpcConn {
char *pReqMsg; // request message including header char *pReqMsg; // request message including header
int reqMsgLen; // request message length int reqMsgLen; // request message length
SRpcInfo *pRpc; // the associated SRpcInfo SRpcInfo *pRpc; // the associated SRpcInfo
int connType; // connection type
SRpcReqContext *pContext; // request context SRpcReqContext *pContext; // request context
} SRpcConn; } SRpcConn;
...@@ -122,9 +125,16 @@ int tsRpcProgressTime = 10; // milliseocnds ...@@ -122,9 +125,16 @@ int tsRpcProgressTime = 10; // milliseocnds
int tsRpcMaxRetry; int tsRpcMaxRetry;
int tsRpcHeadSize; int tsRpcHeadSize;
// server:0 client:1 tcp:2 udp:0
#define RPC_CONN_UDPS 0
#define RPC_CONN_UDPC 1
#define RPC_CONN_TCPS 2
#define RPC_CONN_TCPC 3
#define RPC_CONN_TCP 2
void *(*taosInitConn[])(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = { void *(*taosInitConn[])(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = {
taosInitUdpServer, taosInitUdpConnection,
taosInitUdpClient, taosInitUdpConnection,
taosInitTcpServer, taosInitTcpServer,
taosInitTcpClient taosInitTcpClient
}; };
...@@ -136,7 +146,7 @@ void (*taosCleanUpConn[])(void *thandle) = { ...@@ -136,7 +146,7 @@ void (*taosCleanUpConn[])(void *thandle) = {
taosCleanUpTcpClient taosCleanUpTcpClient
}; };
int (*taosSendData[])(uint32_t ip, uint16_t port, char *data, int len, void *chandle) = { int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = {
taosSendUdpData, taosSendUdpData,
taosSendUdpData, taosSendUdpData,
taosSendTcpServerData, taosSendTcpServerData,
...@@ -157,19 +167,19 @@ void (*taosCloseConn[])(void *chandle) = { ...@@ -157,19 +167,19 @@ void (*taosCloseConn[])(void *chandle) = {
taosCloseTcpClientConnection taosCloseTcpClientConnection
}; };
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort); static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort, int8_t connType);
static void rpcCloseConn(void *thandle); static void rpcCloseConn(void *thandle);
static SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet); static SRpcConn *rpcSetConnToServer(SRpcReqContext *pContext);
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc);
static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr); static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, char *user, char *hashstr);
static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *user, char *hashstr); static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *user, char *hashstr);
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle); static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code);
static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, void *chandle); static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv);
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead); static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead);
static void rpcProcessConnError(void *param, void *id); static void rpcProcessConnError(void *param, void *id);
static void rpcProcessRetryTimer(void *, void *); static void rpcProcessRetryTimer(void *, void *);
...@@ -194,7 +204,8 @@ void *rpcOpen(SRpcInit *pInit) { ...@@ -194,7 +204,8 @@ void *rpcOpen(SRpcInit *pInit) {
if(pInit->label) strcpy(pRpc->label, pInit->label); if(pInit->label) strcpy(pRpc->label, pInit->label);
pRpc->connType = pInit->connType; pRpc->connType = pInit->connType;
pRpc->idleTime = pInit->idleTime; pRpc->idleTime = pInit->idleTime;
pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads; // pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads;
pRpc->numOfThreads = 1;
if (pInit->localIp) strcpy(pRpc->localIp, pInit->localIp); if (pInit->localIp) strcpy(pRpc->localIp, pInit->localIp);
pRpc->localPort = pInit->localPort; pRpc->localPort = pInit->localPort;
pRpc->afp = pInit->afp; pRpc->afp = pInit->afp;
...@@ -207,9 +218,12 @@ void *rpcOpen(SRpcInit *pInit) { ...@@ -207,9 +218,12 @@ void *rpcOpen(SRpcInit *pInit) {
pRpc->cfp = pInit->cfp; pRpc->cfp = pInit->cfp;
pRpc->afp = pInit->afp; pRpc->afp = pInit->afp;
pRpc->shandle = (*taosInitConn[pRpc->connType])(pRpc->localIp, pRpc->localPort, pRpc->label, pRpc->tcphandle = (*taosInitConn[pRpc->connType|RPC_CONN_TCP])(pRpc->localIp, pRpc->localPort, pRpc->label,
pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc);
pRpc->udphandle = (*taosInitConn[pRpc->connType])(pRpc->localIp, pRpc->localPort, pRpc->label,
pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc);
if (pRpc->shandle == NULL) {
if (pRpc->tcphandle == NULL || pRpc->udphandle == NULL) {
tError("%s failed to init network, %s:%d", pRpc->label, pRpc->localIp, pRpc->localPort); tError("%s failed to init network, %s:%d", pRpc->label, pRpc->localIp, pRpc->localPort);
rpcClose(pRpc); rpcClose(pRpc);
return NULL; return NULL;
...@@ -237,18 +251,20 @@ void *rpcOpen(SRpcInit *pInit) { ...@@ -237,18 +251,20 @@ void *rpcOpen(SRpcInit *pInit) {
return NULL; return NULL;
} }
pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString); if (pRpc->connType == TAOS_CONN_SERVER) {
if (pRpc->hash == NULL) { pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString);
tError("%s failed to init string hash", pRpc->label); if (pRpc->hash == NULL) {
rpcClose(pRpc); tError("%s failed to init string hash", pRpc->label);
return NULL; rpcClose(pRpc);
} return NULL;
}
pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, tsShellActivityTimer*1000); } else {
if ( pRpc->pCache == NULL ) { pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, tsShellActivityTimer*1000);
tError("%s failed to init connection cache", pRpc->label); if ( pRpc->pCache == NULL ) {
rpcClose(pRpc); tError("%s failed to init connection cache", pRpc->label);
return NULL; rpcClose(pRpc);
return NULL;
}
} }
pthread_mutex_init(&pRpc->mutex, NULL); pthread_mutex_init(&pRpc->mutex, NULL);
...@@ -261,7 +277,8 @@ void *rpcOpen(SRpcInit *pInit) { ...@@ -261,7 +277,8 @@ void *rpcOpen(SRpcInit *pInit) {
void rpcClose(void *param) { void rpcClose(void *param) {
SRpcInfo *pRpc = (SRpcInfo *)param; SRpcInfo *pRpc = (SRpcInfo *)param;
(*taosCleanUpConn[pRpc->connType])(pRpc->shandle); (*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle);
(*taosCleanUpConn[pRpc->connType])(pRpc->udphandle);
for (int i = 0; i < pRpc->sessions; ++i) { for (int i = 0; i < pRpc->sessions; ++i) {
if (pRpc->connList[i].user[0]) { if (pRpc->connList[i].user[0]) {
...@@ -313,6 +330,16 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, in ...@@ -313,6 +330,16 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, in
pContext->msgType = type; pContext->msgType = type;
pContext->oldIndex = pIpSet->index; pContext->oldIndex = pIpSet->index;
pContext->connType = RPC_CONN_UDPC;
if (contLen > 16000) pContext->connType = RPC_CONN_TCPC;
// connection type is application specific.
// for TDengine, all the query, show commands shall have TCP connection
if (type == TSDB_MSG_TYPE_DNODE_QUERY || type == TSDB_MSG_TYPE_DNODE_RETRIEVE ||
type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_MULTI_TABLE_META ||
type == TSDB_MSG_TYPE_SHOW )
pContext->connType = RPC_CONN_TCPC;
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
return; return;
...@@ -346,7 +373,6 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { ...@@ -346,7 +373,6 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
pHead->version = 1; pHead->version = 1;
pHead->msgType = pConn->inType+1; pHead->msgType = pConn->inType+1;
pHead->spi = 0; pHead->spi = 0;
pHead->tcp = 0;
pHead->encrypt = 0; pHead->encrypt = 0;
pHead->tranId = pConn->inTranId; pHead->tranId = pConn->inTranId;
pHead->sourceId = pConn->ownId; pHead->sourceId = pConn->ownId;
...@@ -388,7 +414,6 @@ void rpcSendRedirectRsp(void *thandle, SRpcIpSet *pIpSet) { ...@@ -388,7 +414,6 @@ void rpcSendRedirectRsp(void *thandle, SRpcIpSet *pIpSet) {
void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
SRpcConn *pConn = (SRpcConn *)thandle; SRpcConn *pConn = (SRpcConn *)thandle;
SRpcInfo *pRpc = pConn->pRpc;
pInfo->clientIp = pConn->peerIp; pInfo->clientIp = pConn->peerIp;
pInfo->clientPort = pConn->peerPort; pInfo->clientPort = pConn->peerPort;
...@@ -396,7 +421,7 @@ void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { ...@@ -396,7 +421,7 @@ void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
strcpy(pInfo->user, pConn->user); strcpy(pInfo->user, pConn->user);
} }
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort) { static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort, int8_t connType) {
SRpcConn *pConn; SRpcConn *pConn;
pConn = rpcAllocateClientConn(pRpc); pConn = rpcAllocateClientConn(pRpc);
...@@ -406,12 +431,14 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort) ...@@ -406,12 +431,14 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort)
pConn->peerIp = inet_addr(peerIpStr); pConn->peerIp = inet_addr(peerIpStr);
pConn->peerPort = peerPort; pConn->peerPort = peerPort;
strcpy(pConn->user, pRpc->user); strcpy(pConn->user, pRpc->user);
pConn->connType = connType;
if (taosOpenConn[pRpc->connType]) { if (taosOpenConn[connType]) {
pConn->chandle = (*taosOpenConn[pRpc->connType])(pRpc->shandle, pConn, pConn->peerIpstr, pConn->peerPort); void *shandle = (connType & RPC_CONN_TCP)? pRpc->tcphandle:pRpc->udphandle;
pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIpstr, pConn->peerPort);
if (pConn->chandle) { if (pConn->chandle) {
tTrace("%s %p, rpc connection is set up, sid:%d id:%s ip:%s:%hu localPort:%d", pRpc->label, tTrace("%s %p, rpc connection is set up, sid:%d id:%s ip:%s:%hu connType:%d", pRpc->label,
pConn, pConn->sid, pRpc->user, pConn->peerIpstr, pConn->peerPort, pConn->localPort); pConn, pConn->sid, pRpc->user, pConn->peerIpstr, pConn->peerPort, pConn->connType);
} else { } else {
tError("%s %p, failed to set up connection to ip:%s:%hu", pRpc->label, pConn, tError("%s %p, failed to set up connection to ip:%s:%hu", pRpc->label, pConn,
pConn->peerIpstr, pConn->peerPort); pConn->peerIpstr, pConn->peerPort);
...@@ -433,14 +460,14 @@ static void rpcCloseConn(void *thandle) { ...@@ -433,14 +460,14 @@ static void rpcCloseConn(void *thandle) {
if (pConn->user[0]) { if (pConn->user[0]) {
pConn->user[0] = 0; pConn->user[0] = 0;
if (taosCloseConn[pRpc->connType]) (*taosCloseConn[pRpc->connType])(pConn->chandle); if (taosCloseConn[pConn->connType]) (*taosCloseConn[pConn->connType])(pConn->chandle);
taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pTimer);
taosTmrStopA(&pConn->pIdleTimer); taosTmrStopA(&pConn->pIdleTimer);
if ( pRpc->connType == TAOS_CONN_UDPS || pRpc->connType == TAOS_CONN_TCPS) { if ( pRpc->connType == TAOS_CONN_SERVER) {
char hashstr[40] = {0}; char hashstr[40] = {0};
sprintf(hashstr, "%x:%x:%x", pConn->peerIp, pConn->peerUid, pConn->peerId); sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->peerUid, pConn->peerId, pConn->connType);
taosDeleteStrHash(pRpc->hash, hashstr); taosDeleteStrHash(pRpc->hash, hashstr);
rpcFreeOutMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg rpcFreeOutMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
pConn->pRspMsg = NULL; pConn->pRspMsg = NULL;
...@@ -540,15 +567,17 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *user, char *hashst ...@@ -540,15 +567,17 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *user, char *hashst
return pConn; return pConn;
} }
SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet) { SRpcConn *rpcSetConnToServer(SRpcReqContext *pContext) {
SRpcConn *pConn; SRpcConn *pConn;
SRpcInfo *pRpc = pContext->pRpc;
SRpcIpSet *pIpSet = &pContext->ipSet;
pConn = rpcGetConnFromCache(pRpc->pCache, ipSet.ip[ipSet.index], ipSet.port, pRpc->user); pConn = rpcGetConnFromCache(pRpc->pCache, pIpSet->ip[pIpSet->index], pIpSet->port, pRpc->user);
if ( pConn == NULL ) { if ( pConn == NULL ) {
char ipstr[20] = {0}; char ipstr[20] = {0};
tinet_ntoa(ipstr, ipSet.ip[ipSet.index]); tinet_ntoa(ipstr, pIpSet->ip[pIpSet->index]);
pConn = rpcOpenConn(pRpc, ipstr, ipSet.port); pConn = rpcOpenConn(pRpc, ipstr, pIpSet->port, pContext->connType);
pConn->destIp = ipSet.ip[ipSet.index]; if (pConn) pConn->destIp = pIpSet->ip[pIpSet->index];
} }
return pConn; return pConn;
...@@ -618,7 +647,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -618,7 +647,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pTimer);
pConn->retry = 0; pConn->retry = 0;
if (*pHead->content == TSDB_CODE_ACTION_IN_PROGRESS || pHead->tcp) { if (*pHead->content == TSDB_CODE_ACTION_IN_PROGRESS) {
if (pConn->tretry <= tsRpcMaxRetry) { if (pConn->tretry <= tsRpcMaxRetry) {
pConn->tretry++; pConn->tretry++;
tTrace("%s %p, peer is still processing the transaction", pRpc->label, pConn); tTrace("%s %p, peer is still processing the transaction", pRpc->label, pConn);
...@@ -638,13 +667,12 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -638,13 +667,12 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int dataLen, uint32_t ip) { static SRpcConn *rpcProcessHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
int32_t sid, code = 0; int32_t sid;
SRpcConn * pConn = NULL; SRpcConn *pConn = NULL;
char hashstr[40] = {0}; char hashstr[40] = {0};
*ppConn = NULL; SRpcHead *pHead = (SRpcHead *)pRecv->msg;
SRpcHead *pHead = (SRpcHead *)data;
sid = htonl(pHead->destId); sid = htonl(pHead->destId);
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
...@@ -652,50 +680,54 @@ static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int ...@@ -652,50 +680,54 @@ static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int
if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) { if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) {
tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType); tTrace("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType);
return TSDB_CODE_INVALID_MSG_TYPE; terrno = TSDB_CODE_INVALID_MSG_TYPE; return NULL;
} }
if (dataLen != pHead->msgLen) { if (pRecv->msgLen != pHead->msgLen) {
tTrace("%s sid:%d, %s has invalid length, dataLen:%d, msgLen:%d", pRpc->label, sid, tTrace("%s sid:%d, %s has invalid length, dataLen:%d, msgLen:%d", pRpc->label, sid,
taosMsg[pHead->msgType], dataLen, pHead->msgLen); taosMsg[pHead->msgType], pRecv->msgLen, pHead->msgLen);
return TSDB_CODE_INVALID_MSG_LEN; terrno = TSDB_CODE_INVALID_MSG_LEN; return NULL;
} }
if (sid < 0 || sid >= pRpc->sessions) { if (sid < 0 || sid >= pRpc->sessions) {
tTrace("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid, tTrace("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid,
pRpc->sessions, taosMsg[pHead->msgType]); pRpc->sessions, taosMsg[pHead->msgType]);
return TSDB_CODE_INVALID_SESSION_ID; terrno = TSDB_CODE_INVALID_SESSION_ID; return NULL;
} }
if (sid == 0) sprintf(hashstr, "%x:%x:%x", ip, pHead->uid, pHead->sourceId); if (sid == 0) sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->uid, pHead->sourceId, pRecv->connType);
pConn = rpcGetConnObj(pRpc, sid, pHead->user, hashstr); pConn = rpcGetConnObj(pRpc, sid, pHead->user, hashstr);
if (pConn == NULL ) return terrno; if (pConn == NULL) return NULL;
*ppConn = pConn;
sid = pConn->sid; sid = pConn->sid;
if (pHead->uid) pConn->peerUid = pHead->uid; pConn->chandle = pRecv->chandle;
if (pConn->peerIp != pRecv->ip) {
if (pHead->tcp) { pConn->peerIp = pRecv->ip;
tTrace("%s %p, content will be transfered via TCP", pRpc->label, pConn); char ipstr[20] = {0};
if (pConn->outType) taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); tinet_ntoa(ipstr, pRecv->ip);
return TSDB_CODE_ALREADY_PROCESSED; strcpy(pConn->peerIpstr, ipstr);
} }
if (pRecv->port) pConn->peerPort = pRecv->port;
if (pHead->port) pConn->peerPort = pHead->port;
if (pHead->uid) pConn->peerUid = pHead->uid;
code = rpcCheckAuthentication(pConn, (char *)pHead, dataLen); terrno = rpcCheckAuthentication(pConn, (char *)pHead, pRecv->msgLen);
if ( code != 0 ) return code; if (terrno != 0) return pConn;
if (pHead->msgType != TSDB_MSG_TYPE_REG && pHead->encrypt) { if (pHead->msgType != TSDB_MSG_TYPE_REG && pHead->encrypt) {
// decrypt here // decrypt here
} }
if ( rpcIsReq(pHead->msgType) ) { if ( rpcIsReq(pHead->msgType) ) {
code = rpcProcessReqHead(pConn, pHead); terrno = rpcProcessReqHead(pConn, pHead);
pConn->connType = pRecv->connType;
} else { } else {
code = rpcProcessRspHead(pConn, pHead); terrno = rpcProcessRspHead(pConn, pHead);
} }
return code; return pConn;
} }
static void rpcProcessBrokenLink(SRpcConn *pConn) { static void rpcProcessBrokenLink(SRpcConn *pConn) {
...@@ -713,45 +745,31 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { ...@@ -713,45 +745,31 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
rpcCloseConn(pConn); rpcCloseConn(pConn);
} }
static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, void *chandle) { static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
SRpcHead *pHead = (SRpcHead *)msg; SRpcHead *pHead = (SRpcHead *)pRecv->msg;
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle;
SRpcConn *pConn = (SRpcConn *)thandle; SRpcConn *pConn = (SRpcConn *)pRecv->thandle;
int32_t code = 0; int32_t code = 0;
tDump(msg, msgLen); tDump(pRecv->msg, pRecv->msgLen);
// underlying UDP layer does not know it is server or client
pRecv->connType = pRecv->connType | pRpc->connType;
if (ip==0 && pConn) { if (pRecv->ip==0 && pConn) {
rpcProcessBrokenLink(pConn); rpcProcessBrokenLink(pConn);
tfree(msg); tfree(pRecv->msg);
return NULL; return NULL;
} }
pthread_mutex_lock(&pRpc->mutex); pthread_mutex_lock(&pRpc->mutex);
pConn = rpcProcessHead(pRpc, pRecv);
code = rpcProcessHead(pRpc, &pConn, msg, msgLen, ip);
if (pConn) {
// update connection info
pConn->chandle = chandle;
if (pConn->peerIp != ip) {
pConn->peerIp = ip;
char ipstr[20] = {0};
tinet_ntoa(ipstr, ip);
strcpy(pConn->peerIpstr, ipstr);
}
if (port) pConn->peerPort = port;
if (pHead->port) // port maybe changed by the peer
pConn->peerPort = pHead->port;
}
pthread_mutex_unlock(&pRpc->mutex); pthread_mutex_unlock(&pRpc->mutex);
if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) {
tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d", tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d",
pRpc->label, pConn, taosMsg[pHead->msgType], ip, port, code, pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, code,
msgLen, pHead->sourceId, pHead->destId, pHead->tranId); pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} }
if (pConn && pRpc->idleTime) { if (pConn && pRpc->idleTime) {
...@@ -761,7 +779,7 @@ static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t ...@@ -761,7 +779,7 @@ static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t
if (code != TSDB_CODE_ALREADY_PROCESSED) { if (code != TSDB_CODE_ALREADY_PROCESSED) {
if (code != 0) { // parsing error if (code != 0) { // parsing error
if ( rpcIsReq(pHead->msgType) ) { if ( rpcIsReq(pHead->msgType) ) {
rpcSendErrorMsgToPeer(pRpc, msg, code, ip, port, chandle); rpcSendErrorMsgToPeer(pRecv, code);
tTrace("%s %p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], code); tTrace("%s %p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], code);
} }
} else { // parsing OK } else { // parsing OK
...@@ -769,7 +787,7 @@ static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t ...@@ -769,7 +787,7 @@ static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t
} }
} }
if ( code != 0 ) free (msg); if ( code != 0 ) free (pRecv->msg);
return pConn; return pConn;
} }
...@@ -816,7 +834,6 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { ...@@ -816,7 +834,6 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
pHead->version = 1; pHead->version = 1;
pHead->msgType = pConn->inType+1; pHead->msgType = pConn->inType+1;
pHead->spi = 0; pHead->spi = 0;
pHead->tcp = 0;
pHead->encrypt = 0; pHead->encrypt = 0;
pHead->tranId = pConn->inTranId; pHead->tranId = pConn->inTranId;
pHead->sourceId = pConn->ownId; pHead->sourceId = pConn->ownId;
...@@ -828,19 +845,18 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { ...@@ -828,19 +845,18 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
rpcSendMsgToPeer(pConn, msg, 0); rpcSendMsgToPeer(pConn, msg, 0);
} }
static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle) { static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
SRpcHead *pRecvHead, *pReplyHead; SRpcHead *pRecvHead, *pReplyHead;
char msg[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t) ]; char msg[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t) ];
uint32_t timeStamp; uint32_t timeStamp;
int msgLen; int msgLen;
pRecvHead = (SRpcHead *)pMsg; pRecvHead = (SRpcHead *)pRecv->msg;
pReplyHead = (SRpcHead *)msg; pReplyHead = (SRpcHead *)msg;
memset(msg, 0, sizeof(SRpcHead)); memset(msg, 0, sizeof(SRpcHead));
pReplyHead->version = pRecvHead->version; pReplyHead->version = pRecvHead->version;
pReplyHead->msgType = (char)(pRecvHead->msgType + 1); pReplyHead->msgType = (char)(pRecvHead->msgType + 1);
pReplyHead->tcp = 0;
pReplyHead->spi = 0; pReplyHead->spi = 0;
pReplyHead->encrypt = 0; pReplyHead->encrypt = 0;
pReplyHead->tranId = pRecvHead->tranId; pReplyHead->tranId = pRecvHead->tranId;
...@@ -860,7 +876,7 @@ static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint ...@@ -860,7 +876,7 @@ static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint
} }
pReplyHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pReplyHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
(*taosSendData[pRpc->connType])(ip, port, msg, msgLen, chandle); (*taosSendData[pRecv->connType])(pRecv->ip, pRecv->port, msg, msgLen, pRecv->chandle);
return; return;
} }
...@@ -872,7 +888,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -872,7 +888,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
char msgType = pContext->msgType; char msgType = pContext->msgType;
pContext->numOfTry++; pContext->numOfTry++;
SRpcConn *pConn = rpcSetConnToServer(pRpc, pContext->ipSet); SRpcConn *pConn = rpcSetConnToServer(pContext);
if (pConn == NULL) { if (pConn == NULL) {
pContext->code = terrno; pContext->code = terrno;
taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
...@@ -884,7 +900,6 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -884,7 +900,6 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
// set the message header // set the message header
pHead->version = 1; pHead->version = 1;
pHead->msgType = msgType; pHead->msgType = msgType;
pHead->tcp = 0;
pHead->encrypt = 0; pHead->encrypt = 0;
pConn->tranId++; pConn->tranId++;
if ( pConn->tranId == 0 ) pConn->tranId++; if ( pConn->tranId == 0 ) pConn->tranId++;
...@@ -928,7 +943,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { ...@@ -928,7 +943,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
(uint8_t)pHead->content[0], msgLen, pHead->sourceId, pHead->destId, pHead->tranId); (uint8_t)pHead->content[0], msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} }
writtenLen = (*taosSendData[pRpc->connType])(pConn->peerIp, pConn->peerPort, (char *)pHead, msgLen, pConn->chandle); writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle);
if (writtenLen != msgLen) { if (writtenLen != msgLen) {
tError("%s %p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn, tError("%s %p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn,
......
...@@ -46,10 +46,8 @@ typedef struct _thread_obj { ...@@ -46,10 +46,8 @@ typedef struct _thread_obj {
int numOfFds; int numOfFds;
int threadId; int threadId;
char label[12]; char label[12];
// char buffer[128000]; // buffer to receive data void *shandle; // handle passed by upper layer during server initialization
void *shandle; // handle passed by upper layer during server initialization void *(*processData)(SRecvInfo *pPacket);
void *(*processData)(char *data, int dataLen, unsigned int ip, uint16_t port, void *shandle, void *thandle,
void *chandle);
} SThreadObj; } SThreadObj;
typedef struct { typedef struct {
...@@ -62,59 +60,81 @@ typedef struct { ...@@ -62,59 +60,81 @@ typedef struct {
pthread_t thread; pthread_t thread;
} SServerObj; } SServerObj;
static void taosCleanUpFdObj(SFdObj *pFdObj) { static void taosCleanUpFdObj(SFdObj *pFdObj);
SThreadObj *pThreadObj; static void taosProcessTcpData(void *param);
static void taosAcceptTcpConnection(void *arg);
if (pFdObj == NULL) return; void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
if (pFdObj->signature != pFdObj) return; int i;
SServerObj *pServerObj;
pthread_attr_t thattr;
SThreadObj *pThreadObj;
pThreadObj = pFdObj->pThreadObj; pServerObj = (SServerObj *)malloc(sizeof(SServerObj));
if (pThreadObj == NULL) { strcpy(pServerObj->ip, ip);
tError("FdObj double clean up!!!"); pServerObj->port = port;
return; strcpy(pServerObj->label, label);
pServerObj->numOfThreads = numOfThreads;
pServerObj->pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj) * (size_t)numOfThreads);
if (pServerObj->pThreadObj == NULL) {
tError("TCP:%s no enough memory", label);
return NULL;
} }
memset(pServerObj->pThreadObj, 0, sizeof(SThreadObj) * (size_t)numOfThreads);
epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL); pThreadObj = pServerObj->pThreadObj;
close(pFdObj->fd); for (i = 0; i < numOfThreads; ++i) {
pThreadObj->processData = fp;
strcpy(pThreadObj->label, label);
pThreadObj->shandle = shandle;
pthread_mutex_lock(&pThreadObj->threadMutex); if (pthread_mutex_init(&(pThreadObj->threadMutex), NULL) < 0) {
tError("%s failed to init TCP process data mutex, reason:%s", label, strerror(errno));
return NULL;
}
pThreadObj->numOfFds--; if (pthread_cond_init(&(pThreadObj->fdReady), NULL) != 0) {
tError("%s init TCP condition variable failed, reason:%s\n", label, strerror(errno));
return NULL;
}
if (pThreadObj->numOfFds < 0) pThreadObj->pollFd = epoll_create(10); // size does not matter
tError("%s TCP thread:%d, number of FDs shall never be negative", pThreadObj->label, pThreadObj->threadId); if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP epoll", label);
return NULL;
}
// remove from the FdObject list pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pThreadObj->thread), &thattr, (void *)taosProcessTcpData, (void *)(pThreadObj)) != 0) {
tError("%s failed to create TCP process data thread, reason:%s", label, strerror(errno));
return NULL;
}
if (pFdObj->prev) { pThreadObj->threadId = i;
(pFdObj->prev)->next = pFdObj->next; pThreadObj++;
} else {
pThreadObj->pHead = pFdObj->next;
} }
if (pFdObj->next) { pthread_attr_init(&thattr);
(pFdObj->next)->prev = pFdObj->prev; pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)) != 0) {
tError("%s failed to create TCP accept thread, reason:%s", label, strerror(errno));
return NULL;
} }
pthread_mutex_unlock(&pThreadObj->threadMutex); /*
if ( pthread_create(&(pServerObj->thread), &thattr,
// notify the upper layer, so it will clean the associated context (void*)taosAcceptUDConnection, (void *)(pServerObj)) != 0 ) {
if (pFdObj->thandle) (*(pThreadObj->processData))(NULL, 0, 0, 0, pThreadObj->shandle, pFdObj->thandle, NULL); tError("%s failed to create UD accept thread, reason:%s", label,
strerror(errno));
tTrace("%s TCP thread:%d, FD:%p is cleaned up, numOfFds:%d", pThreadObj->label, pThreadObj->threadId, return NULL;
pFdObj, pThreadObj->numOfFds); }
*/
memset(pFdObj, 0, sizeof(SFdObj)); pthread_attr_destroy(&thattr);
tTrace("%s TCP server is initialized, ip:%s port:%hu numOfThreads:%d", label, ip, port, numOfThreads);
tfree(pFdObj);
}
void taosCloseTcpServerConnection(void *chandle) {
SFdObj *pFdObj = (SFdObj *)chandle;
if (pFdObj == NULL) return;
taosCleanUpFdObj(pFdObj); return (void *)pServerObj;
} }
void taosCleanUpTcpServer(void *handle) { void taosCleanUpTcpServer(void *handle) {
...@@ -148,6 +168,22 @@ void taosCleanUpTcpServer(void *handle) { ...@@ -148,6 +168,22 @@ void taosCleanUpTcpServer(void *handle) {
tfree(pServerObj); tfree(pServerObj);
} }
void taosCloseTcpServerConnection(void *chandle) {
SFdObj *pFdObj = (SFdObj *)chandle;
if (pFdObj == NULL) return;
taosCleanUpFdObj(pFdObj);
}
int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
SFdObj *pFdObj = (SFdObj *)chandle;
if (chandle == NULL) return -1;
return (int)send(pFdObj->fd, data, (size_t)len, 0);
}
#define maxEvents 10 #define maxEvents 10
static void taosProcessTcpData(void *param) { static void taosProcessTcpData(void *param) {
...@@ -155,7 +191,7 @@ static void taosProcessTcpData(void *param) { ...@@ -155,7 +191,7 @@ static void taosProcessTcpData(void *param) {
int i, fdNum; int i, fdNum;
SFdObj * pFdObj; SFdObj * pFdObj;
struct epoll_event events[maxEvents]; struct epoll_event events[maxEvents];
SRecvInfo recvInfo;
pThreadObj = (SThreadObj *)param; pThreadObj = (SThreadObj *)param;
while (1) { while (1) {
...@@ -209,15 +245,22 @@ static void taosProcessTcpData(void *param) { ...@@ -209,15 +245,22 @@ static void taosProcessTcpData(void *param) {
continue; continue;
} }
pFdObj->thandle = (*(pThreadObj->processData))(buffer, dataLen, pFdObj->ip, pFdObj->port, recvInfo.msg = buffer;
pThreadObj->shandle, pFdObj->thandle, pFdObj); recvInfo.msgLen = dataLen;
recvInfo.ip = pFdObj->ip;
recvInfo.port = pFdObj->port;
recvInfo.shandle = pThreadObj->shandle;
recvInfo.thandle = pFdObj->thandle;;
recvInfo.chandle = pFdObj;
recvInfo.connType = RPC_CONN_TCP;
pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
if (pFdObj->thandle == NULL) taosCleanUpFdObj(pFdObj); if (pFdObj->thandle == NULL) taosCleanUpFdObj(pFdObj);
} }
} }
} }
void taosAcceptTcpConnection(void *arg) { static void taosAcceptTcpConnection(void *arg) {
int connFd = -1; int connFd = -1;
struct sockaddr_in clientAddr; struct sockaddr_in clientAddr;
int sockFd; int sockFd;
...@@ -280,16 +323,11 @@ void taosAcceptTcpConnection(void *arg) { ...@@ -280,16 +323,11 @@ void taosAcceptTcpConnection(void *arg) {
// notify the data process, add into the FdObj list // notify the data process, add into the FdObj list
pthread_mutex_lock(&(pThreadObj->threadMutex)); pthread_mutex_lock(&(pThreadObj->threadMutex));
pFdObj->next = pThreadObj->pHead; pFdObj->next = pThreadObj->pHead;
if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj; if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj;
pThreadObj->pHead = pFdObj; pThreadObj->pHead = pFdObj;
pThreadObj->numOfFds++; pThreadObj->numOfFds++;
pthread_cond_signal(&pThreadObj->fdReady); pthread_cond_signal(&pThreadObj->fdReady);
pthread_mutex_unlock(&(pThreadObj->threadMutex)); pthread_mutex_unlock(&(pThreadObj->threadMutex));
tTrace("%s TCP thread:%d, a new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label, tTrace("%s TCP thread:%d, a new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label,
...@@ -301,7 +339,65 @@ void taosAcceptTcpConnection(void *arg) { ...@@ -301,7 +339,65 @@ void taosAcceptTcpConnection(void *arg) {
} }
} }
void taosAcceptUDConnection(void *arg) { static void taosCleanUpFdObj(SFdObj *pFdObj) {
SThreadObj *pThreadObj;
if (pFdObj == NULL) return;
if (pFdObj->signature != pFdObj) return;
pThreadObj = pFdObj->pThreadObj;
if (pThreadObj == NULL) {
tError("FdObj double clean up!!!");
return;
}
epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
close(pFdObj->fd);
pthread_mutex_lock(&pThreadObj->threadMutex);
pThreadObj->numOfFds--;
if (pThreadObj->numOfFds < 0)
tError("%s TCP thread:%d, number of FDs shall never be negative", pThreadObj->label, pThreadObj->threadId);
// remove from the FdObject list
if (pFdObj->prev) {
(pFdObj->prev)->next = pFdObj->next;
} else {
pThreadObj->pHead = pFdObj->next;
}
if (pFdObj->next) {
(pFdObj->next)->prev = pFdObj->prev;
}
pthread_mutex_unlock(&pThreadObj->threadMutex);
// notify the upper layer, so it will clean the associated context
SRecvInfo recvInfo;
recvInfo.msg = NULL;
recvInfo.msgLen = 0;
recvInfo.ip = 0;
recvInfo.port = 0;
recvInfo.shandle = pThreadObj->shandle;
recvInfo.thandle = pFdObj->thandle;;
recvInfo.chandle = NULL;
recvInfo.connType = RPC_CONN_TCP;
if (pFdObj->thandle) (*(pThreadObj->processData))(&recvInfo);
tTrace("%s TCP thread:%d, FD:%p is cleaned up, numOfFds:%d", pThreadObj->label, pThreadObj->threadId,
pFdObj, pThreadObj->numOfFds);
memset(pFdObj, 0, sizeof(SFdObj));
tfree(pFdObj);
}
#if 0
static void taosAcceptUDConnection(void *arg) {
int connFd = -1; int connFd = -1;
int sockFd; int sockFd;
int threadId = 0; int threadId = 0;
...@@ -353,16 +449,11 @@ void taosAcceptUDConnection(void *arg) { ...@@ -353,16 +449,11 @@ void taosAcceptUDConnection(void *arg) {
// notify the data process, add into the FdObj list // notify the data process, add into the FdObj list
pthread_mutex_lock(&(pThreadObj->threadMutex)); pthread_mutex_lock(&(pThreadObj->threadMutex));
pFdObj->next = pThreadObj->pHead; pFdObj->next = pThreadObj->pHead;
if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj; if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj;
pThreadObj->pHead = pFdObj; pThreadObj->pHead = pFdObj;
pThreadObj->numOfFds++; pThreadObj->numOfFds++;
pthread_cond_signal(&pThreadObj->fdReady); pthread_cond_signal(&pThreadObj->fdReady);
pthread_mutex_unlock(&(pThreadObj->threadMutex)); pthread_mutex_unlock(&(pThreadObj->threadMutex));
tTrace("%s UD thread:%d, a new connection, numOfFds:%d", pServerObj->label, pThreadObj->threadId, tTrace("%s UD thread:%d, a new connection, numOfFds:%d", pServerObj->label, pThreadObj->threadId,
...@@ -373,79 +464,7 @@ void taosAcceptUDConnection(void *arg) { ...@@ -373,79 +464,7 @@ void taosAcceptUDConnection(void *arg) {
threadId = threadId % pServerObj->numOfThreads; threadId = threadId % pServerObj->numOfThreads;
} }
} }
#endif
void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
int i;
SServerObj * pServerObj;
pthread_attr_t thattr;
SThreadObj * pThreadObj;
pServerObj = (SServerObj *)malloc(sizeof(SServerObj));
strcpy(pServerObj->ip, ip);
pServerObj->port = port;
strcpy(pServerObj->label, label);
pServerObj->numOfThreads = numOfThreads;
pServerObj->pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj) * (size_t)numOfThreads);
if (pServerObj->pThreadObj == NULL) {
tError("TCP:%s no enough memory", label);
return NULL;
}
memset(pServerObj->pThreadObj, 0, sizeof(SThreadObj) * (size_t)numOfThreads);
pThreadObj = pServerObj->pThreadObj;
for (i = 0; i < numOfThreads; ++i) {
pThreadObj->processData = fp;
strcpy(pThreadObj->label, label);
pThreadObj->shandle = shandle;
if (pthread_mutex_init(&(pThreadObj->threadMutex), NULL) < 0) {
tError("%s failed to init TCP process data mutex, reason:%s", label, strerror(errno));
return NULL;
}
if (pthread_cond_init(&(pThreadObj->fdReady), NULL) != 0) {
tError("%s init TCP condition variable failed, reason:%s\n", label, strerror(errno));
return NULL;
}
pThreadObj->pollFd = epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP epoll", label);
return NULL;
}
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pThreadObj->thread), &thattr, (void *)taosProcessTcpData, (void *)(pThreadObj)) != 0) {
tError("%s failed to create TCP process data thread, reason:%s", label, strerror(errno));
return NULL;
}
pThreadObj->threadId = i;
pThreadObj++;
}
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)) != 0) {
tError("%s failed to create TCP accept thread, reason:%s", label, strerror(errno));
return NULL;
}
/*
if ( pthread_create(&(pServerObj->thread), &thattr,
(void*)taosAcceptUDConnection, (void *)(pServerObj)) != 0 ) {
tError("%s failed to create UD accept thread, reason:%s", label,
strerror(errno));
return NULL;
}
*/
pthread_attr_destroy(&thattr);
tTrace("%s TCP server is initialized, ip:%s port:%hu numOfThreads:%d", label, ip, port, numOfThreads);
return (void *)pServerObj;
}
#if 0 #if 0
void taosListTcpConnection(void *handle, char *buffer) { void taosListTcpConnection(void *handle, char *buffer) {
...@@ -489,10 +508,4 @@ void taosListTcpConnection(void *handle, char *buffer) { ...@@ -489,10 +508,4 @@ void taosListTcpConnection(void *handle, char *buffer) {
} }
#endif #endif
int taosSendTcpServerData(uint32_t ip, uint16_t port, char *data, int len, void *chandle) {
SFdObj *pFdObj = (SFdObj *)chandle;
if (chandle == NULL) return -1;
return (int)send(pFdObj->fd, data, (size_t)len, 0);
}
...@@ -44,9 +44,8 @@ typedef struct { ...@@ -44,9 +44,8 @@ typedef struct {
void * hash; void * hash;
void * shandle; // handle passed by upper layer during server initialization void * shandle; // handle passed by upper layer during server initialization
void * pSet; void * pSet;
void *(*processData)(char *data, int dataLen, unsigned int ip, uint16_t port, void *shandle, void *thandle, void *(*processData)(SRecvInfo *pRecv);
void *chandle); char buffer[RPC_MAX_UDP_SIZE]; // buffer to receive data
char buffer[RPC_MAX_UDP_SIZE]; // buffer to receive data
} SUdpConn; } SUdpConn;
typedef struct { typedef struct {
...@@ -58,10 +57,8 @@ typedef struct { ...@@ -58,10 +57,8 @@ typedef struct {
int threads; int threads;
char label[12]; char label[12];
void * tmrCtrl; void * tmrCtrl;
pthread_t tcpThread; void *(*fp)(SRecvInfo *pPacket);
int tcpFd; SUdpConn udpConn[];
void *(*fp)(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, void *chandle);
SUdpConn udpConn[];
} SUdpConnSet; } SUdpConnSet;
typedef struct { typedef struct {
...@@ -76,420 +73,9 @@ typedef struct { ...@@ -76,420 +73,9 @@ typedef struct {
int emptyNum; int emptyNum;
} SUdpBuf; } SUdpBuf;
typedef struct { static void *taosRecvUdpData(void *param);
uint64_t handle; static SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port);
uint16_t port; static void taosProcessUdpBufTimer(void *param, void *tmrId);
int32_t msgLen;
} SPacketInfo;
typedef struct {
int fd;
uint32_t ip;
uint16_t port;
SUdpConnSet *pSet;
} STransfer;
typedef struct {
void * pTimer;
SUdpConnSet *pSet;
SUdpConn * pConn;
int dataLen;
uint32_t ip;
uint16_t port;
char data[96];
} SMonitor;
typedef struct {
uint64_t handle;
uint64_t hash;
} SHandleViaTcp;
void taosFreeMsgHdr(void *hdr) {
struct msghdr *msgHdr = (struct msghdr *)hdr;
free(msgHdr->msg_iov);
}
int taosMsgHdrSize(void *hdr) {
struct msghdr *msgHdr = (struct msghdr *)hdr;
return (int)msgHdr->msg_iovlen;
}
void taosSendMsgHdr(void *hdr, int fd) {
struct msghdr *msgHdr = (struct msghdr *)hdr;
sendmsg(fd, msgHdr, 0);
msgHdr->msg_iovlen = 0;
}
void taosInitMsgHdr(void **hdr, void *dest, int maxPkts) {
struct msghdr *msgHdr = (struct msghdr *)malloc(sizeof(struct msghdr));
memset(msgHdr, 0, sizeof(struct msghdr));
*hdr = msgHdr;
struct sockaddr_in *destAdd = (struct sockaddr_in *)dest;
msgHdr->msg_name = destAdd;
msgHdr->msg_namelen = sizeof(struct sockaddr_in);
int size = (int)sizeof(struct iovec) * maxPkts;
msgHdr->msg_iov = (struct iovec *)malloc((size_t)size);
memset(msgHdr->msg_iov, 0, (size_t)size);
}
void taosSetMsgHdrData(void *hdr, char *data, int dataLen) {
struct msghdr *msgHdr = (struct msghdr *)hdr;
msgHdr->msg_iov[msgHdr->msg_iovlen].iov_base = data;
msgHdr->msg_iov[msgHdr->msg_iovlen].iov_len = (size_t)dataLen;
msgHdr->msg_iovlen++;
}
bool taosCheckHandleViaTcpValid(SHandleViaTcp *handleViaTcp) {
return handleViaTcp->hash == taosHashUInt64(handleViaTcp->handle);
}
void taosInitHandleViaTcp(SHandleViaTcp *handleViaTcp, uint64_t handle) {
handleViaTcp->handle = handle;
handleViaTcp->hash = taosHashUInt64(handleViaTcp->handle);
}
void taosProcessMonitorTimer(void *param, void *tmrId) {
SMonitor *pMonitor = (SMonitor *)param;
if (pMonitor->pTimer != tmrId) return;
SUdpConnSet *pSet = pMonitor->pSet;
pMonitor->pTimer = NULL;
if (pSet) {
char *data = malloc((size_t)pMonitor->dataLen);
memcpy(data, pMonitor->data, (size_t)pMonitor->dataLen);
tTrace("%s monitor timer is expired, update the link status", pSet->label);
(*pSet->fp)(data, pMonitor->dataLen, pMonitor->ip, 0, pSet->shandle, NULL, NULL);
taosTmrReset(taosProcessMonitorTimer, 200, pMonitor, pSet->tmrCtrl, &pMonitor->pTimer);
} else {
taosTmrStopA(&pMonitor->pTimer);
free(pMonitor);
}
}
void *taosReadTcpData(void *argv) {
SMonitor *pMonitor = (SMonitor *)argv;
SRpcHead *pHead = (SRpcHead *)pMonitor->data;
SPacketInfo *pInfo = (SPacketInfo *)pHead->content;
SUdpConnSet *pSet = pMonitor->pSet;
int retLen, fd;
char ipstr[64];
pInfo->msgLen = (int32_t)htonl((uint32_t)pInfo->msgLen);
tinet_ntoa(ipstr, pMonitor->ip);
tTrace("%s receive packet via TCP:%s:%hu, msgLen:%d, handle:0x%x, source:0x%08x dest:0x%08x tranId:%d", pSet->label,
ipstr, pInfo->port, pInfo->msgLen, pInfo->handle, pHead->sourceId, pHead->destId, pHead->tranId);
fd = taosOpenTcpClientSocket(ipstr, (int16_t)pInfo->port, tsLocalIp);
if (fd < 0) {
tError("%s failed to open TCP client socket ip:%s:%hu", pSet->label, ipstr, pInfo->port);
pMonitor->pSet = NULL;
return NULL;
}
SHandleViaTcp handleViaTcp;
taosInitHandleViaTcp(&handleViaTcp, pInfo->handle);
retLen = (int)taosWriteSocket(fd, (char *)&handleViaTcp, sizeof(SHandleViaTcp));
if (retLen != (int)sizeof(SHandleViaTcp)) {
tError("%s failed to send handle:0x%x to server, retLen:%d", pSet->label, pInfo->handle, retLen);
pMonitor->pSet = NULL;
} else {
tTrace("%s handle:0x%x is sent to server", pSet->label, pInfo->handle);
char *buffer = malloc((size_t)pInfo->msgLen);
if (NULL == buffer) {
tError("%s failed to malloc(size:%d) for recv server data", pSet->label, pInfo->msgLen);
retLen = 0;
//taosCloseTcpSocket(fd);
//pMonitor->pSet = NULL;
//return NULL;
} else {
retLen = taosReadMsg(fd, buffer, pInfo->msgLen);
}
pMonitor->pSet = NULL;
if (retLen != pInfo->msgLen) {
tError("%s failed to read data from server, msgLen:%d retLen:%d", pSet->label, pInfo->msgLen, retLen);
tfree(buffer);
} else {
(*pSet->fp)(buffer, pInfo->msgLen, pMonitor->ip, pInfo->port, pSet->shandle, NULL, pMonitor->pConn);
}
}
taosCloseTcpSocket(fd);
return NULL;
}
int taosReceivePacketViaTcp(uint32_t ip, SRpcHead *pHead, SUdpConn *pConn) {
SUdpConnSet * pSet = pConn->pSet;
SPacketInfo * pInfo = (SPacketInfo *)pHead->content;
int code = 0;
pthread_attr_t thattr;
pthread_t thread;
tTrace("%s receive packet via TCP, handle:0x%x, source:0x%08x dest:0x%08x tranId:%d", pSet->label, pInfo->handle,
pHead->sourceId, pHead->destId, pHead->tranId);
SMonitor *pMonitor = (SMonitor *)calloc(1, sizeof(SMonitor));
pMonitor->dataLen = sizeof(SRpcHead) + sizeof(SPacketInfo);
memcpy(pMonitor->data, pHead, (size_t)pMonitor->dataLen);
pMonitor->pSet = pSet;
pMonitor->ip = ip;
pMonitor->port = pInfo->port;
pMonitor->pConn = pConn;
taosTmrReset(taosProcessMonitorTimer, 0, pMonitor, pSet->tmrCtrl, &pMonitor->pTimer);
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
code = pthread_create(&(thread), &thattr, taosReadTcpData, (void *)pMonitor);
if (code < 0) {
tTrace("%s failed to create thread to read tcp data, reason:%s", pSet->label, strerror(errno));
pMonitor->pSet = NULL;
}
pthread_attr_destroy(&thattr);
return code;
}
void *taosRecvUdpData(void *param) {
struct sockaddr_in sourceAdd;
unsigned int addLen, dataLen;
SUdpConn * pConn = (SUdpConn *)param;
uint16_t port;
int minSize = sizeof(SRpcHead);
memset(&sourceAdd, 0, sizeof(sourceAdd));
addLen = sizeof(sourceAdd);
tTrace("%s UDP thread is created, index:%d", pConn->label, pConn->index);
while (1) {
dataLen =
(uint32_t)recvfrom(pConn->fd, pConn->buffer, sizeof(pConn->buffer), 0, (struct sockaddr *)&sourceAdd, &addLen);
tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, ntohs(sourceAdd.sin_port),
dataLen);
if (dataLen < sizeof(SRpcHead)) {
tError("%s recvfrom failed, reason:%s\n", pConn->label, strerror(errno));
continue;
}
port = ntohs(sourceAdd.sin_port);
int processedLen = 0, leftLen = 0;
int msgLen = 0;
int count = 0;
char *msg = pConn->buffer;
while (processedLen < (int)dataLen) {
leftLen = dataLen - processedLen;
SRpcHead *pHead = (SRpcHead *)msg;
msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
if (leftLen < minSize || msgLen > leftLen || msgLen < minSize) {
tError("%s msg is messed up, dataLen:%d processedLen:%d count:%d msgLen:%d", pConn->label, dataLen,
processedLen, count, msgLen);
break;
}
if (pHead->tcp == 1) {
taosReceivePacketViaTcp(sourceAdd.sin_addr.s_addr, (SRpcHead *)msg, pConn);
} else {
char *data = malloc((size_t)msgLen);
memcpy(data, msg, (size_t)msgLen);
(*(pConn->processData))(data, msgLen, sourceAdd.sin_addr.s_addr, port, pConn->shandle, NULL, pConn);
}
processedLen += msgLen;
msg += msgLen;
count++;
}
// tTrace("%s %d UDP packets are received together", pConn->label, count);
}
return NULL;
}
void *taosTransferDataViaTcp(void *argv) {
STransfer * pTransfer = (STransfer *)argv;
int connFd = pTransfer->fd;
int msgLen, retLen, leftLen;
uint64_t handle;
SRpcHead *pHead = NULL, head;
SUdpConnSet *pSet = pTransfer->pSet;
SHandleViaTcp handleViaTcp;
retLen = taosReadMsg(connFd, &handleViaTcp, sizeof(SHandleViaTcp));
if (retLen != sizeof(SHandleViaTcp)) {
tError("%s UDP server failed to read handle, retLen:%d", pSet->label, retLen);
taosCloseSocket(connFd);
free(pTransfer);
return NULL;
}
if (!taosCheckHandleViaTcpValid(&handleViaTcp)) {
tError("%s UDP server read handle via tcp invalid, handle:%" PRIu64 ", hash:%" PRIu64, pSet->label, handleViaTcp.handle,
handleViaTcp.hash);
taosCloseSocket(connFd);
free(pTransfer);
return NULL;
}
handle = handleViaTcp.handle;
if (handle == 0) {
// receive a packet from client
tTrace("%s data will be received via TCP from 0x%x:%hu", pSet->label, pTransfer->ip, pTransfer->port);
retLen = taosReadMsg(connFd, &head, sizeof(SRpcHead));
if (retLen != (int)sizeof(SRpcHead)) {
tError("%s failed to read msg header, retLen:%d", pSet->label, retLen);
} else {
SMonitor *pMonitor = (SMonitor *)calloc(1, sizeof(SMonitor));
if (NULL == pMonitor) {
tError("%s malloc failed by TransferViaTcp from client", pSet->label);
taosCloseSocket(connFd);
free(pTransfer);
return NULL;
}
pMonitor->dataLen = sizeof(SRpcHead);
memcpy(pMonitor->data, &head, (size_t)pMonitor->dataLen);
((SRpcHead *)pMonitor->data)->msgLen = (int32_t)htonl(sizeof(SRpcHead));
((SRpcHead *)pMonitor->data)->tcp = 1;
pMonitor->ip = pTransfer->ip;
pMonitor->port = head.port;
pMonitor->pSet = pSet;
taosTmrReset(taosProcessMonitorTimer, 0, pMonitor, pSet->tmrCtrl, &pMonitor->pTimer);
msgLen = (int32_t)htonl((uint32_t)head.msgLen);
char *buffer = malloc((size_t)msgLen);
if (NULL == buffer) {
tError("%s malloc failed for msg by TransferViaTcp", pSet->label);
taosCloseSocket(connFd);
free(pTransfer);
return NULL;
}
leftLen = msgLen - (int)sizeof(SRpcHead);
retLen = taosReadMsg(connFd, buffer + sizeof(SRpcHead), leftLen);
pMonitor->pSet = NULL;
if (retLen != leftLen) {
tError("%s failed to read data from client, leftLen:%d retLen:%d, error:%s", pSet->label, leftLen, retLen,
strerror(errno));
} else {
tTrace("%s data is received from client via TCP from 0x%x:%hu, msgLen:%d", pSet->label, pTransfer->ip,
pTransfer->port, msgLen);
pSet->index = (pSet->index + 1) % pSet->threads;
SUdpConn *pConn = pSet->udpConn + pSet->index;
memcpy(buffer, &head, sizeof(SRpcHead));
(*pSet->fp)(buffer, msgLen, pTransfer->ip, head.port, pSet->shandle, NULL, pConn);
}
taosWriteMsg(connFd, &handleViaTcp, sizeof(SHandleViaTcp));
}
} else {
// send a packet to client
tTrace("%s send packet to client via TCP, handle:0x%x", pSet->label, handle);
pHead = (SRpcHead *)handle;
msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
if (pHead->tcp != 0 || msgLen < 1024) {
tError("%s invalid handle:%p, connection shall be closed", pSet->label, pHead);
} else {
SMonitor *pMonitor = (SMonitor *)calloc(1, sizeof(SMonitor));
if (NULL == pMonitor) {
tError("%s malloc failed by TransferViaTcp to client", pSet->label);
taosCloseSocket(connFd);
free(pTransfer);
return NULL;
}
pMonitor->dataLen = sizeof(SRpcHead);
memcpy(pMonitor->data, (void *)handle, (size_t)pMonitor->dataLen);
SRpcHead *pThead = (SRpcHead *)pMonitor->data;
pThead->tcp = 1;
pThead->msgType = (char)(pHead->msgType - 1);
pThead->msgLen = (int32_t)htonl(sizeof(SRpcHead));
uint32_t id = pThead->sourceId; pThead->sourceId = pThead->destId; pThead->destId = id;
pMonitor->ip = pTransfer->ip;
pMonitor->port = pTransfer->port;
pMonitor->pSet = pSet;
taosTmrReset(taosProcessMonitorTimer, 200, pMonitor, pSet->tmrCtrl, &pMonitor->pTimer);
retLen = taosWriteMsg(connFd, (void *)handle, msgLen);
pMonitor->pSet = NULL;
if (retLen != msgLen) {
tError("%s failed to send data to client, msgLen:%d retLen:%d", pSet->label, msgLen, retLen);
} else {
tTrace("%s data is sent to client successfully via TCP to 0x%x:%hu, size:%d", pSet->label, pTransfer->ip,
pTransfer->port, msgLen);
}
}
}
// retLen = taosReadMsg(connFd, &handleViaTcp, sizeof(handleViaTcp));
free(pTransfer);
taosCloseSocket(connFd);
return NULL;
}
void *taosUdpTcpConnection(void *argv) {
int connFd = -1;
struct sockaddr_in clientAddr;
pthread_attr_t thattr;
pthread_t thread;
uint32_t sourceIp;
char ipstr[20];
SUdpConnSet *pSet = (SUdpConnSet *)argv;
pSet->tcpFd = taosOpenTcpServerSocket(pSet->ip, pSet->port);
if (pSet->tcpFd < 0) {
tPrint("%s failed to create TCP socket %s:%hu for UDP server, reason:%s", pSet->label, pSet->ip, pSet->port,
strerror(errno));
taosKillSystem();
return NULL;
}
tTrace("%s UDP server is created, ip:%s:%hu", pSet->label, pSet->ip, pSet->port);
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
while (1) {
if (pSet->tcpFd < 0) break;
socklen_t addrlen = sizeof(clientAddr);
connFd = accept(pSet->tcpFd, (struct sockaddr *)&clientAddr, &addrlen);
if (connFd < 0) {
tError("%s UDP server TCP accept failure, reason:%s", pSet->label, strerror(errno));
continue;
}
sourceIp = clientAddr.sin_addr.s_addr;
tinet_ntoa(ipstr, sourceIp);
tTrace("%s UDP server TCP connection from ip:%s:%u", pSet->label, ipstr, htons(clientAddr.sin_port));
STransfer *pTransfer = malloc(sizeof(STransfer));
pTransfer->fd = connFd;
pTransfer->ip = sourceIp;
pTransfer->port = clientAddr.sin_port;
pTransfer->pSet = pSet;
if (pthread_create(&(thread), &thattr, taosTransferDataViaTcp, (void *)pTransfer) < 0) {
tTrace("%s failed to create thread for UDP server, reason:%s", pSet->label, strerror(errno));
free(pTransfer);
taosCloseSocket(connFd);
}
}
pthread_attr_destroy(&thattr);
return NULL;
}
void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) { void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) {
pthread_attr_t thAttr; pthread_attr_t thAttr;
...@@ -508,7 +94,6 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v ...@@ -508,7 +94,6 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
pSet->port = port; pSet->port = port;
pSet->shandle = shandle; pSet->shandle = shandle;
pSet->fp = fp; pSet->fp = fp;
pSet->tcpFd = -1;
strcpy(pSet->label, label); strcpy(pSet->label, label);
// if ( tsUdpDelay ) { // if ( tsUdpDelay ) {
...@@ -570,39 +155,11 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v ...@@ -570,39 +155,11 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
return pSet; return pSet;
} }
void *taosInitUdpServer(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) {
SUdpConnSet *pSet;
pSet = taosInitUdpConnection(ip, port, label, threads, fp, shandle);
if (pSet == NULL) return NULL;
pSet->server = 1;
pSet->fp = fp;
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
// not support by windows
// pthread_t thread;
// pSet->tcpThread = pthread_create(&(thread), &thattr, taosUdpTcpConnection, pSet);
pthread_create(&(pSet->tcpThread), &thattr, taosUdpTcpConnection, pSet);
pthread_attr_destroy(&thattr);
return pSet;
}
void *taosInitUdpClient(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) {
return taosInitUdpConnection(ip, port, label, threads, fp, shandle);
}
void taosCleanUpUdpConnection(void *handle) { void taosCleanUpUdpConnection(void *handle) {
SUdpConnSet *pSet = (SUdpConnSet *)handle; SUdpConnSet *pSet = (SUdpConnSet *)handle;
SUdpConn * pConn; SUdpConn * pConn;
if (pSet == NULL) return; if (pSet == NULL) return;
if (pSet->server == 1) {
pthread_cancel(pSet->tcpThread);
}
for (int i = 0; i < pSet->threads; ++i) { for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i; pConn = pSet->udpConn + i;
...@@ -621,8 +178,6 @@ void taosCleanUpUdpConnection(void *handle) { ...@@ -621,8 +178,6 @@ void taosCleanUpUdpConnection(void *handle) {
tTrace("chandle:%p is closed", pConn); tTrace("chandle:%p is closed", pConn);
} }
if (pSet->tcpFd >= 0) taosCloseTcpSocket(pSet->tcpFd);
pSet->tcpFd = -1;
taosTmrCleanUp(pSet->tmrCtrl); taosTmrCleanUp(pSet->tmrCtrl);
tfree(pSet); tfree(pSet);
} }
...@@ -641,6 +196,148 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t por ...@@ -641,6 +196,148 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t por
return pConn; return pConn;
} }
static void *taosRecvUdpData(void *param) {
struct sockaddr_in sourceAdd;
int dataLen;
unsigned int addLen;
SUdpConn * pConn = (SUdpConn *)param;
uint16_t port;
int minSize = sizeof(SRpcHead);
SRecvInfo recvInfo;
memset(&sourceAdd, 0, sizeof(sourceAdd));
addLen = sizeof(sourceAdd);
tTrace("%s UDP thread is created, index:%d", pConn->label, pConn->index);
while (1) {
dataLen = recvfrom(pConn->fd, pConn->buffer, sizeof(pConn->buffer), 0, (struct sockaddr *)&sourceAdd, &addLen);
tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, ntohs(sourceAdd.sin_port),
dataLen);
if (dataLen < sizeof(SRpcHead)) {
tError("%s recvfrom failed, reason:%s\n", pConn->label, strerror(errno));
continue;
}
port = ntohs(sourceAdd.sin_port);
int processedLen = 0, leftLen = 0;
int msgLen = 0;
int count = 0;
char *msg = pConn->buffer;
while (processedLen < dataLen) {
leftLen = dataLen - processedLen;
SRpcHead *pHead = (SRpcHead *)msg;
msgLen = htonl((uint32_t)pHead->msgLen);
if (leftLen < minSize || msgLen > leftLen || msgLen < minSize) {
tError("%s msg is messed up, dataLen:%d processedLen:%d count:%d msgLen:%d", pConn->label, dataLen,
processedLen, count, msgLen);
break;
}
char *data = malloc((size_t)msgLen);
memcpy(data, msg, (size_t)msgLen);
recvInfo.msg = data;
recvInfo.msgLen = msgLen;
recvInfo.ip = sourceAdd.sin_addr.s_addr;
recvInfo.port = port;
recvInfo.shandle = pConn->shandle;
recvInfo.thandle = NULL;
recvInfo.chandle = pConn;
recvInfo.connType = 0;
(*(pConn->processData))(&recvInfo);
processedLen += msgLen;
msg += msgLen;
count++;
}
// tTrace("%s %d UDP packets are received together", pConn->label, count);
}
return NULL;
}
int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle) {
SUdpConn *pConn = (SUdpConn *)chandle;
SUdpBuf * pBuf;
if (pConn == NULL || pConn->signature != pConn) return -1;
if (pConn->hash == NULL) {
struct sockaddr_in destAdd;
memset(&destAdd, 0, sizeof(destAdd));
destAdd.sin_family = AF_INET;
destAdd.sin_addr.s_addr = ip;
destAdd.sin_port = htons(port);
int ret = (int)sendto(pConn->fd, data, (size_t)dataLen, 0, (struct sockaddr *)&destAdd, sizeof(destAdd));
tTrace("%s msg is sent to 0x%x:%hu len:%d ret:%d localPort:%hu chandle:0x%x", pConn->label, destAdd.sin_addr.s_addr,
port, dataLen, ret, pConn->localPort, chandle);
return ret;
}
pthread_mutex_lock(&pConn->mutex);
pBuf = (SUdpBuf *)rpcGetIpHash(pConn->hash, ip, port);
if (pBuf == NULL) {
pBuf = taosCreateUdpBuf(pConn, ip, port);
rpcAddIpHash(pConn->hash, pBuf, ip, port);
}
if ((pBuf->totalLen + dataLen > RPC_MAX_UDP_SIZE) || (taosMsgHdrSize(pBuf->msgHdr) >= RPC_MAX_UDP_PKTS)) {
taosTmrReset(taosProcessUdpBufTimer, RPC_UDP_BUF_TIME, pBuf, pConn->tmrCtrl, &pBuf->timer);
taosSendMsgHdr(pBuf->msgHdr, pConn->fd);
pBuf->totalLen = 0;
}
taosSetMsgHdrData(pBuf->msgHdr, data, dataLen);
pBuf->totalLen += dataLen;
pthread_mutex_unlock(&pConn->mutex);
return dataLen;
}
void taosFreeMsgHdr(void *hdr) {
struct msghdr *msgHdr = (struct msghdr *)hdr;
free(msgHdr->msg_iov);
}
int taosMsgHdrSize(void *hdr) {
struct msghdr *msgHdr = (struct msghdr *)hdr;
return (int)msgHdr->msg_iovlen;
}
void taosSendMsgHdr(void *hdr, int fd) {
struct msghdr *msgHdr = (struct msghdr *)hdr;
sendmsg(fd, msgHdr, 0);
msgHdr->msg_iovlen = 0;
}
void taosInitMsgHdr(void **hdr, void *dest, int maxPkts) {
struct msghdr *msgHdr = (struct msghdr *)malloc(sizeof(struct msghdr));
memset(msgHdr, 0, sizeof(struct msghdr));
*hdr = msgHdr;
struct sockaddr_in *destAdd = (struct sockaddr_in *)dest;
msgHdr->msg_name = destAdd;
msgHdr->msg_namelen = sizeof(struct sockaddr_in);
int size = (int)sizeof(struct iovec) * maxPkts;
msgHdr->msg_iov = (struct iovec *)malloc((size_t)size);
memset(msgHdr->msg_iov, 0, (size_t)size);
}
void taosSetMsgHdrData(void *hdr, char *data, int dataLen) {
struct msghdr *msgHdr = (struct msghdr *)hdr;
msgHdr->msg_iov[msgHdr->msg_iovlen].iov_base = data;
msgHdr->msg_iov[msgHdr->msg_iovlen].iov_len = (size_t)dataLen;
msgHdr->msg_iovlen++;
}
void taosRemoveUdpBuf(SUdpBuf *pBuf) { void taosRemoveUdpBuf(SUdpBuf *pBuf) {
taosTmrStopA(&pBuf->timer); taosTmrStopA(&pBuf->timer);
rpcDeleteIpHash(pBuf->pConn->hash, pBuf->ip, pBuf->port); rpcDeleteIpHash(pBuf->pConn->hash, pBuf->ip, pBuf->port);
...@@ -679,7 +376,7 @@ void taosProcessUdpBufTimer(void *param, void *tmrId) { ...@@ -679,7 +376,7 @@ void taosProcessUdpBufTimer(void *param, void *tmrId) {
if (pBuf) taosTmrReset(taosProcessUdpBufTimer, RPC_UDP_BUF_TIME, pBuf, pConn->tmrCtrl, &pBuf->timer); if (pBuf) taosTmrReset(taosProcessUdpBufTimer, RPC_UDP_BUF_TIME, pBuf, pConn->tmrCtrl, &pBuf->timer);
} }
SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port) { static SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port) {
SUdpBuf *pBuf = (SUdpBuf *)malloc(sizeof(SUdpBuf)); SUdpBuf *pBuf = (SUdpBuf *)malloc(sizeof(SUdpBuf));
memset(pBuf, 0, sizeof(SUdpBuf)); memset(pBuf, 0, sizeof(SUdpBuf));
...@@ -700,121 +397,4 @@ SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port) { ...@@ -700,121 +397,4 @@ SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port) {
return pBuf; return pBuf;
} }
int taosSendPacketViaTcp(uint32_t ip, uint16_t port, char *data, int dataLen, void *chandle) {
SUdpConn * pConn = (SUdpConn *)chandle;
SUdpConnSet *pSet = (SUdpConnSet *)pConn->pSet;
int code = -1, retLen, msgLen;
char ipstr[64];
char buffer[128];
SRpcHead *pHead;
if (pSet->server) {
// send from server
pHead = (SRpcHead *)buffer;
memcpy(pHead, data, sizeof(SRpcHead));
pHead->tcp = 1;
SPacketInfo *pInfo = (SPacketInfo *)pHead->content;
pInfo->handle = (uint64_t)data;
pInfo->port = pSet->port;
pInfo->msgLen = pHead->msgLen;
msgLen = sizeof(SRpcHead) + sizeof(SPacketInfo);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
code = taosSendUdpData(ip, port, buffer, msgLen, chandle);
tTrace("%s data from server will be sent via TCP:%hu, msgType:%d, length:%d, handle:0x%x", pSet->label, pInfo->port,
pHead->msgType, htonl((uint32_t)pInfo->msgLen), pInfo->handle);
if (code > 0) code = dataLen;
} else {
// send from client
tTrace("%s data will be sent via TCP from client", pSet->label);
// send a UDP header first to set up the connection
pHead = (SRpcHead *)buffer;
memcpy(pHead, data, sizeof(SRpcHead));
pHead->tcp = 2;
msgLen = sizeof(SRpcHead);
pHead->msgLen = (int32_t)htonl(msgLen);
code = taosSendUdpData(ip, port, buffer, msgLen, chandle);
//pHead = (SRpcHead *)data;
tinet_ntoa(ipstr, ip);
int fd = taosOpenTcpClientSocket(ipstr, pConn->port, tsLocalIp);
if (fd < 0) {
tError("%s failed to open TCP socket to:%s:%hu to send packet", pSet->label, ipstr, pConn->port);
} else {
SHandleViaTcp handleViaTcp;
taosInitHandleViaTcp(&handleViaTcp, 0);
retLen = (int)taosWriteSocket(fd, (char *)&handleViaTcp, sizeof(SHandleViaTcp));
if (retLen != (int)sizeof(handleViaTcp)) {
tError("%s failed to send handle to server, retLen:%d", pSet->label, retLen);
} else {
retLen = taosWriteMsg(fd, data, dataLen);
if (retLen != dataLen) {
tError("%s failed to send data via TCP, dataLen:%d, retLen:%d, error:%s", pSet->label, dataLen, retLen,
strerror(errno));
} else {
code = dataLen;
tTrace("%s data is sent via TCP successfully", pSet->label);
}
}
taosReadMsg(fd, (char *)&handleViaTcp, sizeof(SHandleViaTcp));
taosCloseTcpSocket(fd);
}
}
return code;
}
int taosSendUdpData(uint32_t ip, uint16_t port, char *data, int dataLen, void *chandle) {
SUdpConn *pConn = (SUdpConn *)chandle;
SUdpBuf * pBuf;
if (pConn == NULL || pConn->signature != pConn) return -1;
if (dataLen >= RPC_MAX_UDP_SIZE) return taosSendPacketViaTcp(ip, port, data, dataLen, chandle);
if (pConn->hash == NULL) {
struct sockaddr_in destAdd;
memset(&destAdd, 0, sizeof(destAdd));
destAdd.sin_family = AF_INET;
destAdd.sin_addr.s_addr = ip;
destAdd.sin_port = htons(port);
int ret = (int)sendto(pConn->fd, data, (size_t)dataLen, 0, (struct sockaddr *)&destAdd, sizeof(destAdd));
tTrace("%s msg is sent to 0x%x:%hu len:%d ret:%d localPort:%hu chandle:0x%x", pConn->label, destAdd.sin_addr.s_addr,
port, dataLen, ret, pConn->localPort, chandle);
return ret;
}
pthread_mutex_lock(&pConn->mutex);
pBuf = (SUdpBuf *)rpcGetIpHash(pConn->hash, ip, port);
if (pBuf == NULL) {
pBuf = taosCreateUdpBuf(pConn, ip, port);
rpcAddIpHash(pConn->hash, pBuf, ip, port);
}
if ((pBuf->totalLen + dataLen > RPC_MAX_UDP_SIZE) || (taosMsgHdrSize(pBuf->msgHdr) >= RPC_MAX_UDP_PKTS)) {
taosTmrReset(taosProcessUdpBufTimer, RPC_UDP_BUF_TIME, pBuf, pConn->tmrCtrl, &pBuf->timer);
taosSendMsgHdr(pBuf->msgHdr, pConn->fd);
pBuf->totalLen = 0;
}
taosSetMsgHdrData(pBuf->msgHdr, data, dataLen);
pBuf->totalLen += dataLen;
pthread_mutex_unlock(&pConn->mutex);
return dataLen;
}
...@@ -85,7 +85,6 @@ int main(int argc, char *argv[]) { ...@@ -85,7 +85,6 @@ int main(int argc, char *argv[]) {
int msgSize = 128; int msgSize = 128;
int numOfReqs = 0; int numOfReqs = 0;
int appThreads = 1; int appThreads = 1;
char socketType[20] = "udp";
char serverIp[40] = "127.0.0.1"; char serverIp[40] = "127.0.0.1";
struct timeval systemTime; struct timeval systemTime;
int64_t startTime, endTime; int64_t startTime, endTime;
...@@ -113,9 +112,7 @@ int main(int argc, char *argv[]) { ...@@ -113,9 +112,7 @@ int main(int argc, char *argv[]) {
rpcInit.ckey = "key"; rpcInit.ckey = "key";
for (int i=1; i<argc; ++i) { for (int i=1; i<argc; ++i) {
if ( strcmp(argv[i], "-c")==0 && i < argc-1 ) { if (strcmp(argv[i], "-p")==0 && i < argc-1) {
strcpy(socketType, argv[++i]);
} else if (strcmp(argv[i], "-p")==0 && i < argc-1) {
ipSet.port = atoi(argv[++i]); ipSet.port = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) { } else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
ipSet.ip[0] = inet_addr(argv[++i]); ipSet.ip[0] = inet_addr(argv[++i]);
...@@ -138,7 +135,6 @@ int main(int argc, char *argv[]) { ...@@ -138,7 +135,6 @@ int main(int argc, char *argv[]) {
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) { } else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
} else { } else {
printf("\nusage: %s [options] \n", argv[0]); printf("\nusage: %s [options] \n", argv[0]);
printf(" [-c ctype]: connection type:udp or tpc, default is:%s\n", socketType);
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp); printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
printf(" [-p port]: server port number, default is:%d\n", ipSet.port); printf(" [-p port]: server port number, default is:%d\n", ipSet.port);
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads); printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
...@@ -154,7 +150,7 @@ int main(int argc, char *argv[]) { ...@@ -154,7 +150,7 @@ int main(int argc, char *argv[]) {
} }
} }
rpcInit.connType = strcasecmp(socketType, "udp") == 0 ? TAOS_CONN_UDPC : TAOS_CONN_TCPC; rpcInit.connType = TAOS_CONN_CLIENT;
taosInitLog("client.log", 100000, 10); taosInitLog("client.log", 100000, 10);
void *pRpc = rpcOpen(&rpcInit); void *pRpc = rpcOpen(&rpcInit);
......
...@@ -60,7 +60,6 @@ void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32 ...@@ -60,7 +60,6 @@ void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
SRpcInit rpcInit; SRpcInit rpcInit;
char socketType[20] = "udp";
char dataName[20] = "server.data"; char dataName[20] = "server.data";
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
...@@ -73,9 +72,7 @@ int main(int argc, char *argv[]) { ...@@ -73,9 +72,7 @@ int main(int argc, char *argv[]) {
rpcInit.idleTime = 2000; rpcInit.idleTime = 2000;
for (int i=1; i<argc; ++i) { for (int i=1; i<argc; ++i) {
if ( strcmp(argv[i], "-c")==0 && i < argc-1 ) { if (strcmp(argv[i], "-p")==0 && i < argc-1) {
strcpy(socketType, argv[++i]);
} else if (strcmp(argv[i], "-p")==0 && i < argc-1) {
rpcInit.localPort = atoi(argv[++i]); rpcInit.localPort = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i")==0 && i < argc-1) { } else if (strcmp(argv[i], "-i")==0 && i < argc-1) {
strcpy(rpcInit.localIp, argv[++i]); strcpy(rpcInit.localIp, argv[++i]);
...@@ -93,7 +90,6 @@ int main(int argc, char *argv[]) { ...@@ -93,7 +90,6 @@ int main(int argc, char *argv[]) {
rpcDebugFlag = atoi(argv[++i]); rpcDebugFlag = atoi(argv[++i]);
} else { } else {
printf("\nusage: %s [options] \n", argv[0]); printf("\nusage: %s [options] \n", argv[0]);
printf(" [-c ctype]: connection type:udp or tcp, default is:%s\n", socketType);
printf(" [-i ip]: server IP address, default is:%s\n", rpcInit.localIp); printf(" [-i ip]: server IP address, default is:%s\n", rpcInit.localIp);
printf(" [-p port]: server port number, default is:%d\n", rpcInit.localPort); printf(" [-p port]: server port number, default is:%d\n", rpcInit.localPort);
printf(" [-t threads]: number of threads, default is:%d\n", rpcInit.numOfThreads); printf(" [-t threads]: number of threads, default is:%d\n", rpcInit.numOfThreads);
...@@ -107,7 +103,7 @@ int main(int argc, char *argv[]) { ...@@ -107,7 +103,7 @@ int main(int argc, char *argv[]) {
} }
} }
rpcInit.connType = strcasecmp(socketType, "udp") == 0 ? TAOS_CONN_UDPS : TAOS_CONN_TCPS; rpcInit.connType = TAOS_CONN_SERVER;
taosInitLog("server.log", 100000, 10); taosInitLog("server.log", 100000, 10);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册