diff --git a/src/sync/src/taosTcpPool.c b/src/sync/src/taosTcpPool.c index 6a210a136ffe67b2e1394d26bac4cb5083452c8c..7397a629d34443e4a391a54edc1584fed596da3c 100644 --- a/src/sync/src/taosTcpPool.c +++ b/src/sync/src/taosTcpPool.c @@ -13,18 +13,22 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "os.h" #include "tulog.h" #include "tutil.h" #include "tsocket.h" #include "taoserror.h" #include "taosTcpPool.h" +#include "twal.h" +#include "tsync.h" +#include "syncInt.h" typedef struct SThreadObj { pthread_t thread; bool stop; - int pollFd; - int numOfFds; + int32_t pollFd; + int32_t numOfFds; struct SPoolObj *pPool; } SThreadObj; @@ -32,15 +36,15 @@ typedef struct SPoolObj { SPoolInfo info; SThreadObj **pThread; pthread_t thread; - int nextId; - int acceptFd; // FD for accept new connection + int32_t nextId; + int32_t acceptFd; // FD for accept new connection } SPoolObj; typedef struct { SThreadObj *pThread; - void *ahandle; - int fd; - int closedByApp; + void * ahandle; + int32_t fd; + int32_t closedByApp; } SConnObj; static void *taosAcceptPeerTcpConnection(void *argv); @@ -53,66 +57,66 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { SPoolObj *pPool = calloc(sizeof(SPoolObj), 1); if (pPool == NULL) { - uError("TCP server, no enough memory"); + sError("failed to alloc pool for TCP server since no enough memory"); return NULL; } pPool->info = *pInfo; - pPool->pThread = (SThreadObj **)calloc(sizeof(SThreadObj *), pInfo->numOfThreads); + pPool->pThread = calloc(sizeof(SThreadObj *), pInfo->numOfThreads); if (pPool->pThread == NULL) { - uError("TCP server, no enough memory"); - free(pPool); + sError("failed to alloc pool thread for TCP server since no enough memory"); + tfree(pPool); return NULL; } pPool->acceptFd = taosOpenTcpServerSocket(pInfo->serverIp, pInfo->port); if (pPool->acceptFd < 0) { - free(pPool->pThread); - free(pPool); - uError("failed to create TCP server socket, port:%d (%s)", pInfo->port, strerror(errno)); + tfree(pPool->pThread); + tfree(pPool); + sError("failed to create TCP server socket, port:%d (%s)", pInfo->port, strerror(errno)); return NULL; } pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); if (pthread_create(&(pPool->thread), &thattr, (void *)taosAcceptPeerTcpConnection, pPool) != 0) { - uError("TCP server, failed to create accept thread, reason:%s", strerror(errno)); + sError("failed to create accept thread for TCP server since %s", strerror(errno)); close(pPool->acceptFd); - free(pPool->pThread); - free(pPool); + tfree(pPool->pThread); + tfree(pPool); return NULL; } pthread_attr_destroy(&thattr); - uDebug("%p TCP pool is created", pPool); + sDebug("%p TCP pool is created", pPool); return pPool; } void taosCloseTcpThreadPool(void *param) { - SPoolObj * pPool = (SPoolObj *)param; + SPoolObj * pPool = param; SThreadObj *pThread; shutdown(pPool->acceptFd, SHUT_RD); pthread_join(pPool->thread, NULL); - for (int i = 0; i < pPool->info.numOfThreads; ++i) { + for (int32_t i = 0; i < pPool->info.numOfThreads; ++i) { pThread = pPool->pThread[i]; if (pThread) taosStopPoolThread(pThread); } - uDebug("%p TCP pool is closed", pPool); + sDebug("%p TCP pool is closed", pPool); taosTFree(pPool->pThread); - free(pPool); + tfree(pPool); } -void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) { +void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { struct epoll_event event; - SPoolObj *pPool = (SPoolObj *)param; + SPoolObj *pPool = param; - SConnObj *pConn = (SConnObj *)calloc(sizeof(SConnObj), 1); + SConnObj *pConn = calloc(sizeof(SConnObj), 1); if (pConn == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); return NULL; @@ -120,7 +124,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) { SThreadObj *pThread = taosGetTcpThread(pPool); if (pThread == NULL) { - free(pConn); + tfree(pConn); return NULL; } @@ -133,13 +137,13 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) { event.data.ptr = pConn; if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) { - uError("failed to add fd:%d(%s)", connFd, strerror(errno)); + sError("failed to add fd:%d since %s", connFd, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); - free(pConn); + tfree(pConn); pConn = NULL; } else { pThread->numOfFds++; - uDebug("%p fd:%d is added to epoll thread, num:%d", pThread, connFd, pThread->numOfFds); + sDebug("%p fd:%d is added to epoll thread, num:%d", pThread, connFd, pThread->numOfFds); } return pConn; @@ -149,7 +153,7 @@ void taosFreeTcpConn(void *param) { SConnObj * pConn = (SConnObj *)param; SThreadObj *pThread = pConn->pThread; - uDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd); + sDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd); pConn->closedByApp = 1; shutdown(pConn->fd, SHUT_WR); } @@ -164,9 +168,9 @@ static void taosProcessBrokenLink(SConnObj *pConn) { pThread->numOfFds--; epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL); - uDebug("%p fd:%d is removed from epoll thread, num:%d", pThread, pConn->fd, pThread->numOfFds); + sDebug("%p fd:%d is removed from epoll thread, num:%d", pThread, pConn->fd, pThread->numOfFds); taosClose(pConn->fd); - free(pConn); + tfree(pConn); } #define maxEvents 10 @@ -183,18 +187,18 @@ static void *taosProcessTcpData(void *param) { while (1) { if (pThread->stop) break; - int fdNum = epoll_wait(pThread->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME); + int32_t fdNum = epoll_wait(pThread->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME); if (pThread->stop) { - uDebug("%p TCP epoll thread is exiting...", pThread); + sDebug("%p TCP epoll thread is exiting...", pThread); break; } if (fdNum < 0) { - uError("epoll_wait failed (%s)", strerror(errno)); + sError("epoll_wait failed since %s", strerror(errno)); continue; } - for (int i = 0; i < fdNum; ++i) { + for (int32_t i = 0; i < fdNum; ++i) { pConn = events[i].data.ptr; assert(pConn); @@ -219,17 +223,16 @@ static void *taosProcessTcpData(void *param) { continue; } } - } if (pThread->stop) break; } - uDebug("%p TCP epoll thread exits", pThread); + sDebug("%p TCP epoll thread exits", pThread); close(pThread->pollFd); - free(pThread); - free(buffer); + tfree(pThread); + tfree(buffer); return NULL; } @@ -242,18 +245,18 @@ static void *taosAcceptPeerTcpConnection(void *argv) { while (1) { struct sockaddr_in clientAddr; socklen_t addrlen = sizeof(clientAddr); - int connFd = accept(pPool->acceptFd, (struct sockaddr *)&clientAddr, &addrlen); + int32_t connFd = accept(pPool->acceptFd, (struct sockaddr *)&clientAddr, &addrlen); if (connFd < 0) { if (errno == EINVAL) { - uDebug("%p TCP server accept is exiting...", pPool); + sDebug("%p TCP server accept is exiting...", pPool); break; } else { - uError("TCP accept failure, reason:%s", strerror(errno)); + sError("TCP accept failure since %s", strerror(errno)); continue; } } - // uDebug("TCP connection from: 0x%x:%d", clientAddr.sin_addr.s_addr, clientAddr.sin_port); + // sDebug("TCP connection from: 0x%x:%d", clientAddr.sin_addr.s_addr, clientAddr.sin_port); taosKeepTcpAlive(connFd); (*pInfo->processIncomingConn)(connFd, clientAddr.sin_addr.s_addr); } @@ -273,23 +276,23 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) { pThread->pPool = pPool; pThread->pollFd = epoll_create(10); // size does not matter if (pThread->pollFd < 0) { - free(pThread); + tfree(pThread); return NULL; } pthread_attr_t thattr; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); - int ret = pthread_create(&(pThread->thread), &thattr, (void *)taosProcessTcpData, pThread); + int32_t ret = pthread_create(&(pThread->thread), &thattr, (void *)taosProcessTcpData, pThread); pthread_attr_destroy(&thattr); if (ret != 0) { close(pThread->pollFd); - free(pThread); + tfree(pThread); return NULL; } - uDebug("%p TCP epoll thread is created", pThread); + sDebug("%p TCP epoll thread is created", pThread); pPool->pThread[pPool->nextId] = pThread; pPool->nextId++; pPool->nextId = pPool->nextId % pPool->info.numOfThreads; @@ -314,12 +317,12 @@ static void taosStopPoolThread(SThreadObj *pThread) { eventfd_t fd = eventfd(1, 0); if (fd == -1) { // failed to create eventfd, call pthread_cancel instead, which may result in data corruption - uError("failed to create eventfd(%s)", strerror(errno)); + sError("failed to create eventfd since %s", strerror(errno)); pthread_cancel(pThread->thread); pThread->stop = true; } else if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { // failed to call epoll_ctl, call pthread_cancel instead, which may result in data corruption - uError("failed to call epoll_ctl(%s)", strerror(errno)); + sError("failed to call epoll_ctl since %s", strerror(errno)); pthread_cancel(pThread->thread); }