From 35c7506dd096edd534c5b93eef4b4afc6ac647cc Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 10 Apr 2021 22:48:35 +0800 Subject: [PATCH] reset tcp --- src/rpc/src/rpcTcp.c | 155 ++++--------------------------------------- 1 file changed, 12 insertions(+), 143 deletions(-) diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 286ed223c7..3162ab2e4c 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -21,13 +21,6 @@ #include "rpcLog.h" #include "rpcHead.h" #include "rpcTcp.h" -#include "tlist.h" - -typedef struct SConnItem { - SOCKET fd; - uint32_t ip; - uint16_t port; -} SConnItem; typedef struct SFdObj { void *signature; @@ -45,12 +38,6 @@ typedef struct SThreadObj { pthread_t thread; SFdObj * pHead; pthread_mutex_t mutex; - // receive the notify from dispatch thread - - int notifyReceiveFd; - int notifySendFd; - SList *connQueue; - uint32_t ip; bool stop; EpollFd pollFd; @@ -82,7 +69,6 @@ typedef struct { } SServerObj; static void *taosProcessTcpData(void *param); -static void *taosProcessServerTcpData(void *param); static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd); static void taosFreeFdObj(SFdObj *pFdObj); static void taosReportBrokenLink(SFdObj *pFdObj); @@ -138,7 +124,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); pThreadObj->shandle = shandle; pThreadObj->stop = false; - pThreadObj->connQueue = tdListNew(sizeof(SConnItem)); } // initialize mutex, thread, fd which may fail @@ -157,25 +142,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread break; } - int fds[2]; - if (pipe(fds)) { - tError("%s failed to create pipe", label); - code = -1; - break; - } - - pThreadObj->notifyReceiveFd = fds[0]; - pThreadObj->notifySendFd = fds[1]; - struct epoll_event event; - event.events = EPOLLIN | EPOLLRDHUP; - event.data.fd = pThreadObj->notifyReceiveFd; - if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, pThreadObj->notifyReceiveFd , &event) < 0) { - tError("%s failed to create pipe", label); - code = -1; - break; - } - - code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessServerTcpData, (void *)(pThreadObj)); + code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj)); if (code != 0) { tError("%s failed to create TCP process data thread(%s)", label, strerror(errno)); break; @@ -308,12 +275,17 @@ static void *taosAcceptTcpConnection(void *arg) { // pick up the thread to handle this connection pThreadObj = pServerObj->pThreadObj[threadId]; - pthread_mutex_lock(&(pThreadObj->mutex)); - SConnItem item = {.fd = connFd, .ip = caddr.sin_addr.s_addr, .port = htons(caddr.sin_port)}; - tdListAppend(pThreadObj->connQueue, &item); - pthread_mutex_unlock(&(pThreadObj->mutex)); - - write(pThreadObj->notifySendFd, "", 1); + SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd); + if (pFdObj) { + pFdObj->ip = caddr.sin_addr.s_addr; + pFdObj->port = htons(caddr.sin_port); + tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label, + taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds); + } else { + taosCloseSocket(connFd); + tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno), + taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port)); + } // pick up next thread for next connection threadId++; @@ -619,109 +591,6 @@ static void *taosProcessTcpData(void *param) { return NULL; } -static void *taosProcessServerTcpData(void *param) { - SThreadObj *pThreadObj = param; - SFdObj *pFdObj; - struct epoll_event events[maxEvents]; - SRecvInfo recvInfo; - - char bb[1]; -#ifdef __APPLE__ - taos_block_sigalrm(); -#endif // __APPLE__ - while (1) { - int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME); - if (pThreadObj->stop) { - tDebug("%s TCP thread get stop event, exiting...", pThreadObj->label); - break; - } - if (fdNum < 0) continue; - - for (int i = 0; i < fdNum; ++i) { - if (events[i].data.fd == pThreadObj->notifyReceiveFd) { - if (events[i].events & EPOLLIN) { - read(pThreadObj->notifyReceiveFd, bb, 1); - - pthread_mutex_lock(&(pThreadObj->mutex)); - SListNode *head = tdListPopHead(pThreadObj->connQueue); - pthread_mutex_unlock(&(pThreadObj->mutex)); - - SConnItem item = {0}; - tdListNodeGetData(pThreadObj->connQueue, head, &item); - tfree(head); - - // register fd on epoll - SFdObj *pFdObj = taosMallocFdObj(pThreadObj, item.fd); - if (pFdObj) { - pFdObj->ip = item.ip; - pFdObj->port = item.port; - tDebug("%s new TCP connection from %u:%hu, fd:%d FD:%p numOfFds:%d", pThreadObj->label, - pFdObj->ip, pFdObj->port, item.fd, pFdObj, pThreadObj->numOfFds); - } else { - taosCloseSocket(item.fd); - tError("%s failed to malloc FdObj(%s) for connection from:%u:%hu", pThreadObj->label, strerror(errno), - pFdObj->ip, pFdObj->port); - } - } - continue; - } - pFdObj = events[i].data.ptr; - - if (events[i].events & EPOLLERR) { - tDebug("%s %p FD:%p epoll errors", pThreadObj->label, pFdObj->thandle, pFdObj); - taosReportBrokenLink(pFdObj); - continue; - } - - if (events[i].events & EPOLLRDHUP) { - tDebug("%s %p FD:%p RD hang up", pThreadObj->label, pFdObj->thandle, pFdObj); - taosReportBrokenLink(pFdObj); - continue; - } - - if (events[i].events & EPOLLHUP) { - tDebug("%s %p FD:%p hang up", pThreadObj->label, pFdObj->thandle, pFdObj); - taosReportBrokenLink(pFdObj); - continue; - } - - if (taosReadTcpData(pFdObj, &recvInfo) < 0) { - shutdown(pFdObj->fd, SHUT_WR); - continue; - } - - pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo); - if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj); - } - - if (pThreadObj->stop) break; - } - - if (pThreadObj->connQueue) { - pThreadObj->connQueue = tdListFree(pThreadObj->connQueue); - } - // close pipe - close(pThreadObj->notifySendFd); - close(pThreadObj->notifyReceiveFd); - - if (pThreadObj->pollFd >=0) { - EpollClose(pThreadObj->pollFd); - pThreadObj->pollFd = -1; - } - - while (pThreadObj->pHead) { - SFdObj *pFdObj = pThreadObj->pHead; - pThreadObj->pHead = pFdObj->next; - taosReportBrokenLink(pFdObj); - } - - pthread_mutex_destroy(&(pThreadObj->mutex)); - tDebug("%s TCP thread exits ...", pThreadObj->label); - tfree(pThreadObj); - - return NULL; -} - static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) { struct epoll_event event; -- GitLab