From 78c1351f9932ea20dccb4dc3385cda253282e80d Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 9 Jun 2020 00:58:04 +0000 Subject: [PATCH] failed to open TCP server socket, return error for RPC initialization --- src/dnode/src/dnodeSystem.c | 1 + src/rpc/src/rpcTcp.c | 25 +++++++++-------- src/rpc/src/rpcUdp.c | 54 ++++++++++++++++++++----------------- 3 files changed, 45 insertions(+), 35 deletions(-) diff --git a/src/dnode/src/dnodeSystem.c b/src/dnode/src/dnodeSystem.c index 97cf6f7eeb..0a983362c2 100644 --- a/src/dnode/src/dnodeSystem.c +++ b/src/dnode/src/dnodeSystem.c @@ -92,6 +92,7 @@ int32_t main(int32_t argc, char *argv[]) { // Initialize the system if (dnodeInitSystem() < 0) { syslog(LOG_ERR, "Error initialize TDengine system"); + dPrint("Failed to start TDengine, please check the log at:%s", tsLogDir); closelog(); exit(EXIT_FAILURE); } diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 04a269502e..7035b30b66 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -67,7 +67,7 @@ static void *taosProcessTcpData(void *param); static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd); static void taosFreeFdObj(SFdObj *pFdObj); static void taosReportBrokenLink(SFdObj *pFdObj); -static void* taosAcceptTcpConnection(void *arg); +static void *taosAcceptTcpConnection(void *arg); void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) { SServerObj *pServerObj; @@ -80,6 +80,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread return NULL; } + pServerObj->fd = -1; pServerObj->thread = 0; pServerObj->ip = ip; pServerObj->port = port; @@ -99,6 +100,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + // initialize parameters in case it may encounter error later pThreadObj = pServerObj->pThreadObj; for (int i = 0; i < numOfThreads; ++i) { pThreadObj->pollFd = -1; @@ -106,18 +108,21 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread pThreadObj->processData = fp; tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); pThreadObj->shandle = shandle; + pThreadObj++; + } + // initialize mutex, thread, fd which may fail + pThreadObj = pServerObj->pThreadObj; + for (int i = 0; i < numOfThreads; ++i) { code = pthread_mutex_init(&(pThreadObj->mutex), NULL); if (code < 0) { tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); break;; } pThreadObj->pollFd = epoll_create(10); // size does not matter if (pThreadObj->pollFd < 0) { tError("%s failed to create TCP epoll", label); - terrno = TAOS_SYSTEM_ERROR(errno); code = -1; break; } @@ -125,7 +130,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj)); if (code != 0) { tError("%s failed to create TCP process data thread(%s)", label, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); break; } @@ -133,15 +137,18 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread pThreadObj++; } + pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); + if (pServerObj->fd < 0) code = -1; + if (code == 0) { - code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)); + code = pthread_create(&pServerObj->thread, &thattr, taosAcceptTcpConnection, (void *)pServerObj); if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); tError("%s failed to create TCP accept thread(%s)", label, strerror(errno)); } } if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); taosCleanUpTcpServer(pServerObj); pServerObj = NULL; } else { @@ -204,7 +211,7 @@ void taosCleanUpTcpServer(void *handle) { tfree(pServerObj); } -static void* taosAcceptTcpConnection(void *arg) { +static void *taosAcceptTcpConnection(void *arg) { int connFd = -1; struct sockaddr_in caddr; int threadId = 0; @@ -212,10 +219,6 @@ static void* taosAcceptTcpConnection(void *arg) { SServerObj *pServerObj; pServerObj = (SServerObj *)arg; - - pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); - if (pServerObj->fd < 0) return NULL; - tTrace("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); while (1) { diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 7e2fe0db61..279cf7ed49 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -19,6 +19,7 @@ #include "ttimer.h" #include "tutil.h" #include "taosdef.h" +#include "taoserror.h" #include "rpcLog.h" #include "rpcUdp.h" #include "rpcHead.h" @@ -65,6 +66,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads pSet = (SUdpConnSet *)malloc((size_t)size); if (pSet == NULL) { tError("%s failed to allocate UdpConn", label); + terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } @@ -73,30 +75,34 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads pSet->port = port; pSet->shandle = shandle; pSet->fp = fp; + pSet->threads = threads; tstrncpy(pSet->label, label, sizeof(pSet->label)); + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + int i; uint16_t ownPort; - for (int i = 0; i < threads; ++i) { + for (i = 0; i < threads; ++i) { pConn = pSet->udpConn + i; ownPort = (port ? port + i : 0); pConn->fd = taosOpenUdpSocket(ip, ownPort); if (pConn->fd < 0) { tError("%s failed to open UDP socket %x:%hu", label, ip, port); - taosCleanUpUdpConnection(pSet); - return NULL; + break; } pConn->buffer = malloc(RPC_MAX_UDP_SIZE); if (NULL == pConn->buffer) { tError("%s failed to malloc recv buffer", label); - taosCleanUpUdpConnection(pSet); - return NULL; + break; } struct sockaddr_in sin; unsigned int addrlen = sizeof(sin); - if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET && - addrlen == sizeof(sin)) { + if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && + sin.sin_family == AF_INET && addrlen == sizeof(sin)) { pConn->localPort = (uint16_t)ntohs(sin.sin_port); } @@ -107,23 +113,22 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads pConn->pSet = pSet; pConn->signature = pConn; - pthread_attr_t thAttr; - pthread_attr_init(&thAttr); - pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn); - pthread_attr_destroy(&thAttr); if (code != 0) { - tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno)); - taosCloseSocket(pConn->fd); - taosCleanUpUdpConnection(pSet); - return NULL; + tError("%s failed to create thread to process UDP data(%s)", label, strerror(errno)); + break; } - - ++pSet->threads; } - tTrace("%s UDP connection is initialized, ip:%x port:%hu threads:%d", label, ip, port, threads); + pthread_attr_destroy(&thAttr); + if (i != threads) { + terrno = TAOS_SYSTEM_ERROR(errno); + taosCleanUpUdpConnection(pSet); + return NULL; + } + + tTrace("%s UDP connection is initialized, ip:%x:%hu threads:%d", label, ip, port, threads); return pSet; } @@ -136,16 +141,17 @@ void taosCleanUpUdpConnection(void *handle) { for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; pConn->signature = NULL; + // shutdown to signal the thread to exit - shutdown(pConn->fd, SHUT_RD); + if ( pConn->fd >=0) shutdown(pConn->fd, SHUT_RD); } for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; - pthread_join(pConn->thread, NULL); - free(pConn->buffer); - taosCloseSocket(pConn->fd); - tTrace("chandle:%p is closed", pConn); + if (pConn->thread) pthread_join(pConn->thread, NULL); + if (pConn->fd >=0) taosCloseSocket(pConn->fd); + tfree(pConn->buffer); + tTrace("UDP chandle:%p is closed", pConn); } tfree(pSet); @@ -159,7 +165,7 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t SUdpConn *pConn = pSet->udpConn + pSet->index; pConn->port = port; - tTrace("%s UDP connection is setup, ip:%x:%hu, local:%x:%d", pConn->label, ip, port, pSet->ip, pConn->localPort); + tTrace("%s UDP connection is setup, ip:%x:%hu", pConn->label, ip, port); return pConn; } -- GitLab