提交 c9279cdc 编写于 作者: Y yihaoDeng

refactor rpc

上级 4f6fa025
......@@ -21,6 +21,13 @@
#include "rpcLog.h"
#include "rpcHead.h"
#include "rpcTcp.h"
#include "tlist.h"
typedef struct SConnItem {
SOCKET fd;
uint32_t ip;
uint16_t port;
} SConnItem;
typedef struct SFdObj {
void *signature;
......@@ -38,6 +45,12 @@ typedef struct SThreadObj {
pthread_t thread;
SFdObj * pHead;
pthread_mutex_t mutex;
// receive the notify from dispatch thread
int notifyReceiveFd;
int notifySendFd;
SList *connQueue;
uint32_t ip;
bool stop;
EpollFd pollFd;
......@@ -69,6 +82,7 @@ typedef struct {
} SServerObj;
static void *taosProcessTcpData(void *param);
static void *taosProcessServerTcpData(void *param);
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd);
static void taosFreeFdObj(SFdObj *pFdObj);
static void taosReportBrokenLink(SFdObj *pFdObj);
......@@ -124,6 +138,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
pThreadObj->shandle = shandle;
pThreadObj->stop = false;
pThreadObj->connQueue = tdListNew(sizeof(SConnItem));
}
// initialize mutex, thread, fd which may fail
......@@ -142,7 +157,25 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
break;
}
code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
int fds[2];
if (pipe(fds)) {
tError("%s failed to create pipe", label);
code = -1;
break;
}
pThreadObj->notifyReceiveFd = fds[0];
pThreadObj->notifySendFd = fds[1];
struct epoll_event event;
event.events = EPOLLIN | EPOLLRDHUP;
event.data.fd = pThreadObj->notifyReceiveFd;
if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, pThreadObj->notifyReceiveFd , &event) < 0) {
tError("%s failed to create pipe", label);
code = -1;
break;
}
code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessServerTcpData, (void *)(pThreadObj));
if (code != 0) {
tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
break;
......@@ -275,17 +308,12 @@ static void *taosAcceptTcpConnection(void *arg) {
// pick up the thread to handle this connection
pThreadObj = pServerObj->pThreadObj[threadId];
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd);
if (pFdObj) {
pFdObj->ip = caddr.sin_addr.s_addr;
pFdObj->port = htons(caddr.sin_port);
tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label,
taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds);
} else {
taosCloseSocket(connFd);
tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno),
taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port));
}
pthread_mutex_lock(&(pThreadObj->mutex));
SConnItem item = {.fd = connFd, .ip = caddr.sin_addr.s_addr, .port = htons(caddr.sin_port)};
tdListAppend(pThreadObj->connQueue, &item);
pthread_mutex_unlock(&(pThreadObj->mutex));
write(pThreadObj->notifySendFd, "", 1);
// pick up next thread for next connection
threadId++;
......@@ -591,6 +619,109 @@ static void *taosProcessTcpData(void *param) {
return NULL;
}
static void *taosProcessServerTcpData(void *param) {
SThreadObj *pThreadObj = param;
SFdObj *pFdObj;
struct epoll_event events[maxEvents];
SRecvInfo recvInfo;
char bb[1];
#ifdef __APPLE__
taos_block_sigalrm();
#endif // __APPLE__
while (1) {
int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME);
if (pThreadObj->stop) {
tDebug("%s TCP thread get stop event, exiting...", pThreadObj->label);
break;
}
if (fdNum < 0) continue;
for (int i = 0; i < fdNum; ++i) {
if (events[i].data.fd == pThreadObj->notifyReceiveFd) {
if (events[i].events & EPOLLIN) {
read(pThreadObj->notifyReceiveFd, bb, 1);
pthread_mutex_lock(&(pThreadObj->mutex));
SListNode *head = tdListPopHead(pThreadObj->connQueue);
pthread_mutex_unlock(&(pThreadObj->mutex));
SConnItem item = {0};
tdListNodeGetData(pThreadObj->connQueue, head, &item);
tfree(head);
// register fd on epoll
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, item.fd);
if (pFdObj) {
pFdObj->ip = item.ip;
pFdObj->port = item.port;
tDebug("%s new TCP connection from %u:%hu, fd:%d FD:%p numOfFds:%d", pThreadObj->label,
pFdObj->ip, pFdObj->port, item.fd, pFdObj, pThreadObj->numOfFds);
} else {
taosCloseSocket(item.fd);
tError("%s failed to malloc FdObj(%s) for connection from:%u:%hu", pThreadObj->label, strerror(errno),
pFdObj->ip, pFdObj->port);
}
}
continue;
}
pFdObj = events[i].data.ptr;
if (events[i].events & EPOLLERR) {
tDebug("%s %p FD:%p epoll errors", pThreadObj->label, pFdObj->thandle, pFdObj);
taosReportBrokenLink(pFdObj);
continue;
}
if (events[i].events & EPOLLRDHUP) {
tDebug("%s %p FD:%p RD hang up", pThreadObj->label, pFdObj->thandle, pFdObj);
taosReportBrokenLink(pFdObj);
continue;
}
if (events[i].events & EPOLLHUP) {
tDebug("%s %p FD:%p hang up", pThreadObj->label, pFdObj->thandle, pFdObj);
taosReportBrokenLink(pFdObj);
continue;
}
if (taosReadTcpData(pFdObj, &recvInfo) < 0) {
shutdown(pFdObj->fd, SHUT_WR);
continue;
}
pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj);
}
if (pThreadObj->stop) break;
}
if (pThreadObj->connQueue) {
pThreadObj->connQueue = tdListFree(pThreadObj->connQueue);
}
// close pipe
close(pThreadObj->notifySendFd);
close(pThreadObj->notifyReceiveFd);
if (pThreadObj->pollFd >=0) {
EpollClose(pThreadObj->pollFd);
pThreadObj->pollFd = -1;
}
while (pThreadObj->pHead) {
SFdObj *pFdObj = pThreadObj->pHead;
pThreadObj->pHead = pFdObj->next;
taosReportBrokenLink(pFdObj);
}
pthread_mutex_destroy(&(pThreadObj->mutex));
tDebug("%s TCP thread exits ...", pThreadObj->label);
tfree(pThreadObj);
return NULL;
}
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) {
struct epoll_event event;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册