From e90d241eeb682102dbdeb7da7cbb1ace62b6a5f2 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sat, 20 Jun 2020 11:41:12 +0000 Subject: [PATCH] stop UDP/TCP connection first, then close all connections, then clean up --- src/rpc/inc/rpcTcp.h | 1 + src/rpc/src/rpcMain.c | 30 ++++++++++++++++++++++++++---- src/rpc/src/rpcTcp.c | 13 +++++++++---- src/rpc/src/rpcUdp.c | 8 ++------ 4 files changed, 38 insertions(+), 14 deletions(-) diff --git a/src/rpc/inc/rpcTcp.h b/src/rpc/inc/rpcTcp.h index 40fab00056..0b0e5bd0fb 100644 --- a/src/rpc/inc/rpcTcp.h +++ b/src/rpc/inc/rpcTcp.h @@ -21,6 +21,7 @@ extern "C" { #endif void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); +void taosStopTcpServer(void *param); void taosCleanUpTcpServer(void *param); void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle); diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 007a511adf..780ec5ccba 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -147,12 +147,19 @@ void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, vo }; void (*taosCleanUpConn[])(void *thandle) = { - taosCleanUpUdpConnection, - taosCleanUpUdpConnection, + NULL, + NULL, taosCleanUpTcpServer, taosCleanUpTcpClient }; +void (*taosStopConn[])(void *thandle) = { + taosCleanUpUdpConnection, + taosCleanUpUdpConnection, + taosStopTcpServer, + NULL +}; + int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = { taosSendUdpData, taosSendUdpData, @@ -289,14 +296,26 @@ void *rpcOpen(const SRpcInit *pInit) { void rpcClose(void *param) { SRpcInfo *pRpc = (SRpcInfo *)param; + // stop connection to outside first + if (taosStopConn[pRpc->connType | RPC_CONN_TCP]) + (*taosStopConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle); + + if (taosStopConn[pRpc->connType]) + (*taosStopConn[pRpc->connType])(pRpc->udphandle); + + // close all connections for (int i = 0; i < pRpc->sessions; ++i) { if (pRpc->connList && pRpc->connList[i].user[0]) { rpcCloseConn((void *)(pRpc->connList + i)); } } - (*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle); - (*taosCleanUpConn[pRpc->connType])(pRpc->udphandle); + // clean up + if (taosCleanUpConn[pRpc->connType | RPC_CONN_TCP]) + (*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle); + + if (taosCleanUpConn[pRpc->connType]) + (*taosCleanUpConn[pRpc->connType])(pRpc->udphandle); tTrace("%s rpc is closed", pRpc->label); rpcDecRef(pRpc); @@ -588,6 +607,7 @@ static void rpcReleaseConn(SRpcConn *pConn) { pConn->inTranId = 0; pConn->outTranId = 0; pConn->secured = 0; + pConn->peerId = 0; pConn->peerIp = 0; pConn->peerPort = 0; pConn->pReqMsg = NULL; @@ -627,6 +647,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { pConn->spi = pRpc->spi; pConn->encrypt = pRpc->encrypt; if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN); + tTrace("%s %p client connection is allocated", pRpc->label, pConn); } return pConn; @@ -681,6 +702,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { } taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); + tTrace("%s %p server connection is allocated", pRpc->label, pConn); } return pConn; diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 5d156492c7..11d2c57dda 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -190,14 +190,19 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { } } - -void taosCleanUpTcpServer(void *handle) { +void taosStopTcpServer(void *handle) { SServerObj *pServerObj = handle; - SThreadObj *pThreadObj; + tTrace("TCP:%s, stop accept new connections", pServerObj->label); if (pServerObj == NULL) return; if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD); if(pServerObj->thread) pthread_join(pServerObj->thread, NULL); +} + +void taosCleanUpTcpServer(void *handle) { + SServerObj *pServerObj = handle; + SThreadObj *pThreadObj; + if (pServerObj == NULL) return; for (int i = 0; i < pServerObj->numOfThreads; ++i) { pThreadObj = pServerObj->pThreadObj + i; @@ -226,7 +231,7 @@ static void *taosAcceptTcpConnection(void *arg) { connFd = accept(pServerObj->fd, (struct sockaddr *)&caddr, &addrlen); if (connFd == -1) { if (errno == EINVAL) { - tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label); + tTrace("%s TCP server stop accepting new connections, exiting", pServerObj->label); break; } diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 41446f87fb..35e06e633b 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -30,7 +30,6 @@ #define RPC_MAX_UDP_SIZE 65480 typedef struct { - void *signature; int index; int fd; uint16_t port; // peer port @@ -111,7 +110,6 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads pConn->processData = fp; pConn->index = i; pConn->pSet = pSet; - pConn->signature = pConn; int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn); if (code != 0) { @@ -140,8 +138,6 @@ void taosCleanUpUdpConnection(void *handle) { for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; - pConn->signature = NULL; - if (pConn->fd >=0) shutdown(pConn->fd, SHUT_RDWR); if (pConn->fd >=0) taosCloseSocket(pConn->fd); } @@ -185,7 +181,7 @@ static void *taosRecvUdpData(void *param) { while (1) { dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen); if(dataLen <= 0) { - tTrace("%s UDP socket was closed, exiting", pConn->label); + tTrace("%s UDP socket was closed, exiting(%s)", pConn->label, strerror(errno)); break; } @@ -221,7 +217,7 @@ static void *taosRecvUdpData(void *param) { int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle) { SUdpConn *pConn = (SUdpConn *)chandle; - if (pConn == NULL || pConn->signature != pConn) return -1; + if (pConn == NULL) return -1; struct sockaddr_in destAdd; memset(&destAdd, 0, sizeof(destAdd)); -- GitLab