diff --git a/src/rpc/src/tudp.c b/src/rpc/src/tudp.c index 4b7d4c97fbe17a2bd33ea1adb9686901bf9c2149..b79f40019eaa9d2caaa2b1ab1660476bfec64937 100644 --- a/src/rpc/src/tudp.c +++ b/src/rpc/src/tudp.c @@ -137,9 +137,7 @@ void taosProcessMonitorTimer(void *param, void *tmrId) { tTrace("%s monitor timer is expired, update the link status", pSet->label); (*pSet->fp)(data, pMonitor->dataLen, pMonitor->ip, 0, pSet->shandle, NULL, NULL); taosTmrReset(taosProcessMonitorTimer, 200, pMonitor, pSet->tmrCtrl, &pMonitor->pTimer); - } - - if (pMonitor->pSet == NULL) { + } else { taosTmrStopA(&pMonitor->pTimer); free(pMonitor); } @@ -181,6 +179,7 @@ void *taosReadTcpData(void *argv) { if (retLen != pInfo->msgLen) { tError("%s failed to read data from server, msgLen:%d retLen:%d", pSet->label, pInfo->msgLen, retLen); + free(buffer); } else { (*pSet->fp)(buffer, pInfo->msgLen, pMonitor->ip, (int16_t)pInfo->port, pSet->shandle, NULL, pMonitor->pConn); } @@ -214,9 +213,11 @@ int taosReceivePacketViaTcp(uint32_t ip, STaosHeader *pHead, SUdpConn *pConn) { pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); code = pthread_create(&(thread), &thattr, taosReadTcpData, (void *)pMonitor); if (code < 0) { - tTrace("%s faile to create thread to read tcp data, reason:%s", pSet->label, strerror(errno)); + tTrace("%s failed to create thread to read tcp data, reason:%s", pSet->label, strerror(errno)); + pMonitor->pSet = NULL; } + pthread_attr_destroy(&thattr); return code; } @@ -402,6 +403,9 @@ void *taosUdpTcpConnection(void *argv) { tTrace("%s UDP server is created, ip:%s:%d", pSet->label, pSet->ip, pSet->port); + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); + while (1) { if (pSet->tcpFd < 0) break; socklen_t addrlen = sizeof(clientAddr); @@ -422,14 +426,14 @@ void *taosUdpTcpConnection(void *argv) { pTransfer->port = clientAddr.sin_port; pTransfer->pSet = pSet; - pthread_attr_init(&thattr); - pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); if (pthread_create(&(thread), &thattr, taosTransferDataViaTcp, (void *)pTransfer) < 0) { - tTrace("%s faile to create thread for UDP server, reason:%s", pSet->label, strerror(errno)); + tTrace("%s failed to create thread for UDP server, reason:%s", pSet->label, strerror(errno)); + free(pTransfer); taosCloseSocket(connFd); } } + pthread_attr_destroy(&thattr); return NULL; } @@ -448,7 +452,6 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void memset(pSet, 0, (size_t)size); strcpy(pSet->ip, ip); pSet->port = port; - pSet->threads = threads; pSet->shandle = shandle; pSet->fp = fp; pSet->tcpFd = -1; @@ -458,8 +461,16 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void char udplabel[12]; sprintf(udplabel, "%s.b", label); pSet->tmrCtrl = taosTmrInit(RPC_MAX_UDP_CONNS * threads, 5, 5000, udplabel); + if (pSet->tmrCtrl == NULL) { + tError("%s failed to initialize tmrCtrl") + taosCleanUpUdpConnection(pSet); + return NULL; + } // } + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + short ownPort; for (int i = 0; i < threads; ++i) { pConn = pSet->udpConn + i; @@ -467,6 +478,7 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void pConn->fd = taosOpenUdpSocket(ip, ownPort); if (pConn->fd < 0) { tError("%s failed to open UDP socket %s:%d", label, ip, port); + taosCleanUpUdpConnection(pSet); return NULL; } @@ -477,11 +489,10 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void pConn->localPort = (int16_t)ntohs(sin.sin_port); } - pthread_attr_init(&thAttr); - pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); if (pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn) != 0) { - close(pConn->fd); tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno)); + taosCloseSocket(pConn->fd); + taosCleanUpUdpConnection(pSet); return NULL; } @@ -496,6 +507,7 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void pthread_mutex_init(&pConn->mutex, NULL); pConn->tmrCtrl = pSet->tmrCtrl; } + ++pSet->threads; } pthread_attr_destroy(&thAttr); @@ -520,6 +532,7 @@ void *taosInitUdpServer(char *ip, short port, char *label, int threads, void *fp // pthread_t thread; // pSet->tcpThread = pthread_create(&(thread), &thattr, taosUdpTcpConnection, pSet); pthread_create(&(pSet->tcpThread), &thattr, taosUdpTcpConnection, pSet); + pthread_attr_destroy(&thattr); return pSet; } @@ -540,13 +553,16 @@ void taosCleanUpUdpConnection(void *handle) { for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; pConn->signature = NULL; + pthread_cancel(pConn->thread); taosCloseSocket(pConn->fd); if (pConn->hash) { taosCloseIpHash(pConn->hash); pthread_mutex_destroy(&pConn->mutex); } + } - pthread_cancel(pConn->thread); + for (int i = 0; i < pSet->threads; ++i) { + pConn = pSet->udpConn + i; pthread_join(pConn->thread, NULL); tTrace("chandle:%p is closed", pConn); }