diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 3162ab2e4cbabc2a1ce9ed863589e3be0e92f75f..286ed223c79a2e226bd10aec722c6b95532ae733 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -21,6 +21,13 @@ #include "rpcLog.h" #include "rpcHead.h" #include "rpcTcp.h" +#include "tlist.h" + +typedef struct SConnItem { + SOCKET fd; + uint32_t ip; + uint16_t port; +} SConnItem; typedef struct SFdObj { void *signature; @@ -38,6 +45,12 @@ typedef struct SThreadObj { pthread_t thread; SFdObj * pHead; pthread_mutex_t mutex; + // receive the notify from dispatch thread + + int notifyReceiveFd; + int notifySendFd; + SList *connQueue; + uint32_t ip; bool stop; EpollFd pollFd; @@ -69,6 +82,7 @@ typedef struct { } SServerObj; static void *taosProcessTcpData(void *param); +static void *taosProcessServerTcpData(void *param); static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd); static void taosFreeFdObj(SFdObj *pFdObj); static void taosReportBrokenLink(SFdObj *pFdObj); @@ -124,6 +138,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); pThreadObj->shandle = shandle; pThreadObj->stop = false; + pThreadObj->connQueue = tdListNew(sizeof(SConnItem)); } // initialize mutex, thread, fd which may fail @@ -142,7 +157,25 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread break; } - code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj)); + int fds[2]; + if (pipe(fds)) { + tError("%s failed to create pipe", label); + code = -1; + break; + } + + pThreadObj->notifyReceiveFd = fds[0]; + pThreadObj->notifySendFd = fds[1]; + struct epoll_event event; + event.events = EPOLLIN | EPOLLRDHUP; + event.data.fd = pThreadObj->notifyReceiveFd; + if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, pThreadObj->notifyReceiveFd , &event) < 0) { + tError("%s failed to create pipe", label); + code = -1; + break; + } + + code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessServerTcpData, (void *)(pThreadObj)); if (code != 0) { tError("%s failed to create TCP process data thread(%s)", label, strerror(errno)); break; @@ -275,17 +308,12 @@ static void *taosAcceptTcpConnection(void *arg) { // pick up the thread to handle this connection pThreadObj = pServerObj->pThreadObj[threadId]; - SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd); - if (pFdObj) { - pFdObj->ip = caddr.sin_addr.s_addr; - pFdObj->port = htons(caddr.sin_port); - tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label, - taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds); - } else { - taosCloseSocket(connFd); - tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno), - taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port)); - } + pthread_mutex_lock(&(pThreadObj->mutex)); + SConnItem item = {.fd = connFd, .ip = caddr.sin_addr.s_addr, .port = htons(caddr.sin_port)}; + tdListAppend(pThreadObj->connQueue, &item); + pthread_mutex_unlock(&(pThreadObj->mutex)); + + write(pThreadObj->notifySendFd, "", 1); // pick up next thread for next connection threadId++; @@ -591,6 +619,109 @@ static void *taosProcessTcpData(void *param) { return NULL; } +static void *taosProcessServerTcpData(void *param) { + SThreadObj *pThreadObj = param; + SFdObj *pFdObj; + struct epoll_event events[maxEvents]; + SRecvInfo recvInfo; + + char bb[1]; +#ifdef __APPLE__ + taos_block_sigalrm(); +#endif // __APPLE__ + while (1) { + int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME); + if (pThreadObj->stop) { + tDebug("%s TCP thread get stop event, exiting...", pThreadObj->label); + break; + } + if (fdNum < 0) continue; + + for (int i = 0; i < fdNum; ++i) { + if (events[i].data.fd == pThreadObj->notifyReceiveFd) { + if (events[i].events & EPOLLIN) { + read(pThreadObj->notifyReceiveFd, bb, 1); + + pthread_mutex_lock(&(pThreadObj->mutex)); + SListNode *head = tdListPopHead(pThreadObj->connQueue); + pthread_mutex_unlock(&(pThreadObj->mutex)); + + SConnItem item = {0}; + tdListNodeGetData(pThreadObj->connQueue, head, &item); + tfree(head); + + // register fd on epoll + SFdObj *pFdObj = taosMallocFdObj(pThreadObj, item.fd); + if (pFdObj) { + pFdObj->ip = item.ip; + pFdObj->port = item.port; + tDebug("%s new TCP connection from %u:%hu, fd:%d FD:%p numOfFds:%d", pThreadObj->label, + pFdObj->ip, pFdObj->port, item.fd, pFdObj, pThreadObj->numOfFds); + } else { + taosCloseSocket(item.fd); + tError("%s failed to malloc FdObj(%s) for connection from:%u:%hu", pThreadObj->label, strerror(errno), + pFdObj->ip, pFdObj->port); + } + } + continue; + } + pFdObj = events[i].data.ptr; + + if (events[i].events & EPOLLERR) { + tDebug("%s %p FD:%p epoll errors", pThreadObj->label, pFdObj->thandle, pFdObj); + taosReportBrokenLink(pFdObj); + continue; + } + + if (events[i].events & EPOLLRDHUP) { + tDebug("%s %p FD:%p RD hang up", pThreadObj->label, pFdObj->thandle, pFdObj); + taosReportBrokenLink(pFdObj); + continue; + } + + if (events[i].events & EPOLLHUP) { + tDebug("%s %p FD:%p hang up", pThreadObj->label, pFdObj->thandle, pFdObj); + taosReportBrokenLink(pFdObj); + continue; + } + + if (taosReadTcpData(pFdObj, &recvInfo) < 0) { + shutdown(pFdObj->fd, SHUT_WR); + continue; + } + + pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo); + if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj); + } + + if (pThreadObj->stop) break; + } + + if (pThreadObj->connQueue) { + pThreadObj->connQueue = tdListFree(pThreadObj->connQueue); + } + // close pipe + close(pThreadObj->notifySendFd); + close(pThreadObj->notifyReceiveFd); + + if (pThreadObj->pollFd >=0) { + EpollClose(pThreadObj->pollFd); + pThreadObj->pollFd = -1; + } + + while (pThreadObj->pHead) { + SFdObj *pFdObj = pThreadObj->pHead; + pThreadObj->pHead = pFdObj->next; + taosReportBrokenLink(pFdObj); + } + + pthread_mutex_destroy(&(pThreadObj->mutex)); + tDebug("%s TCP thread exits ...", pThreadObj->label); + tfree(pThreadObj); + + return NULL; +} + static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) { struct epoll_event event;