From 50d7e6876d3a7e281be267dc6326f6bd0e84b32e Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sun, 12 Apr 2020 13:49:34 +0800 Subject: [PATCH] remove possible memory leaks --- src/rpc/src/rpcServer.c | 86 ++++++++++++++++++++++------------------- src/rpc/src/rpcUdp.c | 10 ++--- 2 files changed, 50 insertions(+), 46 deletions(-) diff --git a/src/rpc/src/rpcServer.c b/src/rpc/src/rpcServer.c index 538b3059e3..6fe5385bba 100644 --- a/src/rpc/src/rpcServer.c +++ b/src/rpc/src/rpcServer.c @@ -65,79 +65,85 @@ static void taosProcessTcpData(void *param); static void taosAcceptTcpConnection(void *arg); void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) { - int i; SServerObj *pServerObj; - pthread_attr_t thattr; SThreadObj *pThreadObj; - pServerObj = (SServerObj *)malloc(sizeof(SServerObj)); + pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1); strcpy(pServerObj->ip, ip); pServerObj->port = port; strcpy(pServerObj->label, label); pServerObj->numOfThreads = numOfThreads; - pServerObj->pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj) * (size_t)numOfThreads); + pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads); if (pServerObj->pThreadObj == NULL) { tError("TCP:%s no enough memory", label); + free(pServerObj); return NULL; } - memset(pServerObj->pThreadObj, 0, sizeof(SThreadObj) * (size_t)numOfThreads); - - pthread_attr_init(&thattr); - pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + int code = 0; pThreadObj = pServerObj->pThreadObj; - for (i = 0; i < numOfThreads; ++i) { + for (int i = 0; i < numOfThreads; ++i) { pThreadObj->processData = fp; strcpy(pThreadObj->label, label); pThreadObj->shandle = shandle; - if (pthread_mutex_init(&(pThreadObj->threadMutex), NULL) < 0) { - tError("%s failed to init TCP process data mutex, reason:%s", label, strerror(errno)); - return NULL; + code = pthread_mutex_init(&(pThreadObj->threadMutex), NULL); + if (code < 0) { + tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno)); + break;; } - if (pthread_cond_init(&(pThreadObj->fdReady), NULL) != 0) { - tError("%s init TCP condition variable failed, reason:%s\n", label, strerror(errno)); - return NULL; + code = pthread_cond_init(&(pThreadObj->fdReady), NULL); + if (code != 0) { + tError("%s init TCP condition variable failed(%s)", label, strerror(errno)); + break; } pThreadObj->pollFd = epoll_create(10); // size does not matter if (pThreadObj->pollFd < 0) { tError("%s failed to create TCP epoll", label); - return NULL; + code = -1; + break; } - if (pthread_create(&(pThreadObj->thread), &thattr, (void *)taosProcessTcpData, (void *)(pThreadObj)) != 0) { - tError("%s failed to create TCP process data thread, reason:%s", label, strerror(errno)); - return NULL; + pthread_attr_t thattr; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + code = pthread_create(&(pThreadObj->thread), &thattr, (void *)taosProcessTcpData, (void *)(pThreadObj)); + pthread_attr_destroy(&thattr); + if (code != 0) { + tError("%s failed to create TCP process data thread(%s)", label, strerror(errno)); + break; } pThreadObj->threadId = i; pThreadObj++; } - if (pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)) != 0) { - tError("%s failed to create TCP accept thread, reason:%s", label, strerror(errno)); - return NULL; + if (code == 0) { + pthread_attr_t thattr; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)); + pthread_attr_destroy(&thattr); + if (code != 0) { + tError("%s failed to create TCP accept thread(%s)", label, strerror(errno)); + } } - /* - if ( pthread_create(&(pServerObj->thread), &thattr, - (void*)taosAcceptUDConnection, (void *)(pServerObj)) != 0 ) { - tError("%s failed to create UD accept thread, reason:%s", label, - strerror(errno)); - return NULL; - } - */ - pthread_attr_destroy(&thattr); - tTrace("%s TCP server is initialized, ip:%s port:%hu numOfThreads:%d", label, ip, port, numOfThreads); + if (code != 0) { + free(pServerObj->pThreadObj); + free(pServerObj); + pServerObj = NULL; + } else { + tTrace("%s TCP server is initialized, ip:%s port:%hu numOfThreads:%d", label, ip, port, numOfThreads); + } return (void *)pServerObj; } void taosCleanUpTcpServer(void *handle) { - int i; SThreadObj *pThreadObj; SServerObj *pServerObj = (SServerObj *)handle; @@ -146,7 +152,7 @@ void taosCleanUpTcpServer(void *handle) { pthread_cancel(pServerObj->thread); pthread_join(pServerObj->thread, NULL); - for (i = 0; i < pServerObj->numOfThreads; ++i) { + for (int i = 0; i < pServerObj->numOfThreads; ++i) { pThreadObj = pServerObj->pThreadObj + i; while (pThreadObj->pHead) { @@ -161,9 +167,9 @@ void taosCleanUpTcpServer(void *handle) { pthread_mutex_destroy(&(pThreadObj->threadMutex)); } - tfree(pServerObj->pThreadObj); tTrace("TCP:%s, TCP server is cleaned up", pServerObj->label); + tfree(pServerObj->pThreadObj); tfree(pServerObj); } @@ -278,10 +284,10 @@ static void taosAcceptTcpConnection(void *arg) { sockFd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); if (sockFd < 0) { - tError("%s failed to open TCP socket, ip:%s, port:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); + tError("%s failed to open TCP socket, ip:%s:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); return; } else { - tTrace("%s TCP server is ready, ip:%s, port:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); + tTrace("%s TCP server is ready, ip:%s:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); } while (1) { @@ -289,11 +295,11 @@ static void taosAcceptTcpConnection(void *arg) { connFd = accept(sockFd, (struct sockaddr *)&clientAddr, &addrlen); if (connFd < 0) { - tError("%s TCP accept failure, errno:%d, reason:%s", pServerObj->label, errno, strerror(errno)); + tError("%s TCP accept failure(%s)", pServerObj->label, errno, strerror(errno)); continue; } - tTrace("%s TCP connection from ip:%s port:%hu", pServerObj->label, inet_ntoa(clientAddr.sin_addr), + tTrace("%s TCP connection from ip:%s:%hu", pServerObj->label, inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port)); taosKeepTcpAlive(connFd); @@ -318,7 +324,7 @@ static void taosAcceptTcpConnection(void *arg) { event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP; event.data.ptr = pFdObj; if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) { - tError("%s failed to add TCP FD for epoll, error:%s", pServerObj->label, strerror(errno)); + tError("%s failed to add TCP FD for epoll(%s)", pServerObj->label, strerror(errno)); tfree(pFdObj); close(connFd); continue; diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 785288f5b9..e666187cf1 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -78,7 +78,6 @@ static SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port); static void taosProcessUdpBufTimer(void *param, void *tmrId); void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) { - pthread_attr_t thAttr; SUdpConn * pConn; SUdpConnSet * pSet; @@ -106,9 +105,6 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v } } - pthread_attr_init(&thAttr); - pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - uint16_t ownPort; for (int i = 0; i < threads; ++i) { pConn = pSet->udpConn + i; @@ -146,19 +142,21 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v pConn->tmrCtrl = pSet->tmrCtrl; } + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn); + pthread_attr_destroy(&thAttr); if (code != 0) { tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno)); taosCloseSocket(pConn->fd); taosCleanUpUdpConnection(pSet); - pthread_attr_destroy(&thAttr); return NULL; } ++pSet->threads; } - pthread_attr_destroy(&thAttr); tTrace("%s UDP connection is initialized, ip:%s port:%hu threads:%d", label, ip, port, threads); return pSet; -- GitLab