diff --git a/src/rpc/src/rpcClient.c b/src/rpc/src/rpcClient.c index 264449bbb0a8f449c835017b295926c407636b50..7ca24f6229753fcb703b8b44b70a1f31985eca6b 100644 --- a/src/rpc/src/rpcClient.c +++ b/src/rpc/src/rpcClient.c @@ -65,18 +65,18 @@ void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, pTcp->shandle = shandle; if (pthread_mutex_init(&(pTcp->mutex), NULL) < 0) { - tError("%s failed to init TCP mutex, reason:%s", label, strerror(errno)); + tError("%s failed to init TCP client mutex(%s)", label, strerror(errno)); return NULL; } if (pthread_cond_init(&(pTcp->fdReady), NULL) != 0) { - tError("%s init TCP condition variable failed, reason:%s\n", label, strerror(errno)); + tError("%s init TCP condition variable failed(%s)", label, strerror(errno)); return NULL; } pTcp->pollFd = epoll_create(10); // size does not matter if (pTcp->pollFd < 0) { - tError("%s failed to create TCP epoll", label); + tError("%s failed to create TCP client epoll", label); return NULL; } @@ -87,11 +87,11 @@ void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, int code = pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp)); pthread_attr_destroy(&thattr); if (code != 0) { - tError("%s failed to create TCP read data thread, reason:%s", label, strerror(errno)); + tError("%s failed to create TCP read data thread(%s)", label, strerror(errno)); return NULL; } - tTrace("%s TCP client is initialized, ip:%s port:%hu", label, ip, port); + tTrace("%s TCP client is initialized, ip:%s:%hu", label, ip, port); return pTcp; } @@ -181,18 +181,30 @@ int taosSendTcpClientData(uint32_t ip, uint16_t port, void *data, int len, void return (int)send(pFdObj->fd, data, (size_t)len, 0); } -static void taosCleanUpTcpFdObj(STcpFd *pFdObj) { - STcpClient *pTcp; +static void taosReportBrokenLink(STcpFd *pFdObj) { SRecvInfo recvInfo; + STcpClient *pTcp = pFdObj->pTcp; + + if (pFdObj->thandle) { + recvInfo.msg = NULL; + recvInfo.msgLen = 0; + recvInfo.ip = 0; + recvInfo.port = 0; + recvInfo.shandle = pTcp->shandle; + recvInfo.thandle = pFdObj->thandle;; + recvInfo.chandle = NULL; + recvInfo.connType = RPC_CONN_TCP; + (*(pTcp->processData))(&recvInfo); + } +} + +static void taosCleanUpTcpFdObj(STcpFd *pFdObj) { if (pFdObj == NULL) return; if (pFdObj->signature != pFdObj) return; - pTcp = pFdObj->pTcp; - if (pTcp == NULL) { - tError("double free TcpFdObj!!!!"); - return; - } + pFdObj->signature = NULL; + STcpClient *pTcp = pFdObj->pTcp; epoll_ctl(pTcp->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL); close(pFdObj->fd); @@ -202,7 +214,7 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) { pTcp->numOfFds--; if (pTcp->numOfFds < 0) - tError("%s number of TCP FDs shall never be negative, FD:%p", pTcp->label, pFdObj); + tError("%s %p, number of FDs is negative!!!, FD:%p", pTcp->label, pFdObj->thandle, pFdObj); if (pFdObj->prev) { (pFdObj->prev)->next = pFdObj->next; @@ -216,19 +228,8 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) { pthread_mutex_unlock(&pTcp->mutex); - recvInfo.msg = NULL; - recvInfo.msgLen = 0; - recvInfo.ip = 0; - recvInfo.port = 0; - recvInfo.shandle = pTcp->shandle; - recvInfo.thandle = pFdObj->thandle;; - recvInfo.chandle = NULL; - recvInfo.connType = RPC_CONN_TCP; + tTrace("%s %p, FD:%p is cleaned, numOfFds:%d", pTcp->label, pFdObj->thandle, pFdObj, pTcp->numOfFds); - if (pFdObj->thandle) (*(pTcp->processData))(&recvInfo); - tTrace("%s TCP is cleaned up, FD:%p numOfFds:%d", pTcp->label, pFdObj, pTcp->numOfFds); - - memset(pFdObj, 0, sizeof(STcpFd)); tfree(pFdObj); } @@ -252,29 +253,29 @@ static void *taosReadTcpData(void *param) { pFdObj = events[i].data.ptr; if (events[i].events & EPOLLERR) { - tTrace("%s TCP error happened on FD\n", pTcp->label); - taosCleanUpTcpFdObj(pFdObj); + tTrace("%s %p, TCP error happened on FD", pTcp->label, pFdObj->thandle); + taosReportBrokenLink(pFdObj); continue; } if (events[i].events & EPOLLHUP) { - tTrace("%s TCP FD hang up\n", pTcp->label); - taosCleanUpTcpFdObj(pFdObj); + tTrace("%s %p, TCP FD hang up", pTcp->label, pFdObj->thandle); + taosReportBrokenLink(pFdObj); continue; } int headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead)); if (headLen != sizeof(SRpcHead)) { - tError("%s read error, headLen:%d", pTcp->label, headLen); - taosCleanUpTcpFdObj(pFdObj); + tError("%s %p, read error, headLen:%d", pTcp->label, pFdObj->thandle, headLen); + taosReportBrokenLink(pFdObj); continue; } int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); char *buffer = (char *)malloc((size_t)msgLen + tsRpcOverhead); if (NULL == buffer) { - tTrace("%s TCP malloc(size:%d) fail\n", pTcp->label, msgLen); - taosCleanUpTcpFdObj(pFdObj); + tTrace("%s %p, TCP malloc(size:%d) fail", pTcp->label, pFdObj->thandle, msgLen); + taosReportBrokenLink(pFdObj); continue; } @@ -283,9 +284,10 @@ static void *taosReadTcpData(void *param) { int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen); if (leftLen != retLen) { - tError("%s read error, leftLen:%d retLen:%d", pTcp->label, leftLen, retLen); + tError("%s %p, read error, leftLen:%d retLen:%d", + pTcp->label, pFdObj->thandle, leftLen, retLen); tfree(buffer); - taosCleanUpTcpFdObj(pFdObj); + taosReportBrokenLink(pFdObj); continue; } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index bec9621e3b417196a205cbba39a796cac0d18298..8280264764a4463407ebf74e193f464cc4dd07d8 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -817,7 +817,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { SRpcInfo *pRpc = pConn->pRpc; tTrace("%s %p, link is broken", pRpc->label, pConn); - pConn->chandle = NULL; + // pConn->chandle = NULL; if (pConn->outType) { SRpcReqContext *pContext = pConn->pContext; diff --git a/src/rpc/src/rpcServer.c b/src/rpc/src/rpcServer.c index 538b3059e3af68d406ec1b978bfef355fe26b920..37576fa0f68c3af27aeac3f160f6db7bdb7b325c 100644 --- a/src/rpc/src/rpcServer.c +++ b/src/rpc/src/rpcServer.c @@ -65,88 +65,94 @@ static void taosProcessTcpData(void *param); static void taosAcceptTcpConnection(void *arg); void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) { - int i; SServerObj *pServerObj; - pthread_attr_t thattr; SThreadObj *pThreadObj; - pServerObj = (SServerObj *)malloc(sizeof(SServerObj)); + pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1); strcpy(pServerObj->ip, ip); pServerObj->port = port; strcpy(pServerObj->label, label); pServerObj->numOfThreads = numOfThreads; - pServerObj->pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj) * (size_t)numOfThreads); + pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads); if (pServerObj->pThreadObj == NULL) { tError("TCP:%s no enough memory", label); + free(pServerObj); return NULL; } - memset(pServerObj->pThreadObj, 0, sizeof(SThreadObj) * (size_t)numOfThreads); - - pthread_attr_init(&thattr); - pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + int code = 0; pThreadObj = pServerObj->pThreadObj; - for (i = 0; i < numOfThreads; ++i) { + for (int i = 0; i < numOfThreads; ++i) { pThreadObj->processData = fp; strcpy(pThreadObj->label, label); pThreadObj->shandle = shandle; - if (pthread_mutex_init(&(pThreadObj->threadMutex), NULL) < 0) { - tError("%s failed to init TCP process data mutex, reason:%s", label, strerror(errno)); - return NULL; + code = pthread_mutex_init(&(pThreadObj->threadMutex), NULL); + if (code < 0) { + tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno)); + break;; } - if (pthread_cond_init(&(pThreadObj->fdReady), NULL) != 0) { - tError("%s init TCP condition variable failed, reason:%s\n", label, strerror(errno)); - return NULL; + code = pthread_cond_init(&(pThreadObj->fdReady), NULL); + if (code != 0) { + tError("%s init TCP condition variable failed(%s)", label, strerror(errno)); + break; } pThreadObj->pollFd = epoll_create(10); // size does not matter if (pThreadObj->pollFd < 0) { tError("%s failed to create TCP epoll", label); - return NULL; + code = -1; + break; } - if (pthread_create(&(pThreadObj->thread), &thattr, (void *)taosProcessTcpData, (void *)(pThreadObj)) != 0) { - tError("%s failed to create TCP process data thread, reason:%s", label, strerror(errno)); - return NULL; + pthread_attr_t thattr; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + code = pthread_create(&(pThreadObj->thread), &thattr, (void *)taosProcessTcpData, (void *)(pThreadObj)); + pthread_attr_destroy(&thattr); + if (code != 0) { + tError("%s failed to create TCP process data thread(%s)", label, strerror(errno)); + break; } pThreadObj->threadId = i; pThreadObj++; } - if (pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)) != 0) { - tError("%s failed to create TCP accept thread, reason:%s", label, strerror(errno)); - return NULL; + if (code == 0) { + pthread_attr_t thattr; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)); + pthread_attr_destroy(&thattr); + if (code != 0) { + tError("%s failed to create TCP accept thread(%s)", label, strerror(errno)); + } } - /* - if ( pthread_create(&(pServerObj->thread), &thattr, - (void*)taosAcceptUDConnection, (void *)(pServerObj)) != 0 ) { - tError("%s failed to create UD accept thread, reason:%s", label, - strerror(errno)); - return NULL; - } - */ - pthread_attr_destroy(&thattr); - tTrace("%s TCP server is initialized, ip:%s port:%hu numOfThreads:%d", label, ip, port, numOfThreads); + if (code != 0) { + free(pServerObj->pThreadObj); + free(pServerObj); + pServerObj = NULL; + } else { + tTrace("%s TCP server is initialized, ip:%s port:%hu numOfThreads:%d", label, ip, port, numOfThreads); + } return (void *)pServerObj; } void taosCleanUpTcpServer(void *handle) { - int i; SThreadObj *pThreadObj; - SServerObj *pServerObj = (SServerObj *)handle; + SServerObj *pServerObj = handle; if (pServerObj == NULL) return; pthread_cancel(pServerObj->thread); pthread_join(pServerObj->thread, NULL); - for (i = 0; i < pServerObj->numOfThreads; ++i) { + for (int i = 0; i < pServerObj->numOfThreads; ++i) { pThreadObj = pServerObj->pThreadObj + i; while (pThreadObj->pHead) { @@ -161,22 +167,21 @@ void taosCleanUpTcpServer(void *handle) { pthread_mutex_destroy(&(pThreadObj->threadMutex)); } - tfree(pServerObj->pThreadObj); tTrace("TCP:%s, TCP server is cleaned up", pServerObj->label); + tfree(pServerObj->pThreadObj); tfree(pServerObj); } void taosCloseTcpServerConnection(void *chandle) { - SFdObj *pFdObj = (SFdObj *)chandle; - + SFdObj *pFdObj = chandle; if (pFdObj == NULL) return; taosCleanUpFdObj(pFdObj); } int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) { - SFdObj *pFdObj = (SFdObj *)chandle; + SFdObj *pFdObj = chandle; if (chandle == NULL) return -1; @@ -185,6 +190,25 @@ int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void #define maxEvents 10 +static void taosReportBrokenLink(SFdObj *pFdObj) { + + SThreadObj *pThreadObj = pFdObj->pThreadObj; + + // notify the upper layer, so it will clean the associated context + if (pFdObj->thandle) { + SRecvInfo recvInfo; + recvInfo.msg = NULL; + recvInfo.msgLen = 0; + recvInfo.ip = 0; + recvInfo.port = 0; + recvInfo.shandle = pThreadObj->shandle; + recvInfo.thandle = pFdObj->thandle;; + recvInfo.chandle = NULL; + recvInfo.connType = RPC_CONN_TCP; + (*(pThreadObj->processData))(&recvInfo); + } +} + static void taosProcessTcpData(void *param) { SThreadObj * pThreadObj; int i, fdNum; @@ -208,29 +232,29 @@ static void taosProcessTcpData(void *param) { pFdObj = events[i].data.ptr; if (events[i].events & EPOLLERR) { - tTrace("%s TCP thread:%d, error happened on FD", pThreadObj->label, pThreadObj->threadId); - taosCleanUpFdObj(pFdObj); + tTrace("%s %p, error happened on FD", pThreadObj->label, pFdObj->thandle); + taosReportBrokenLink(pFdObj); continue; } if (events[i].events & EPOLLHUP) { - tTrace("%s TCP thread:%d, FD hang up", pThreadObj->label, pThreadObj->threadId); - taosCleanUpFdObj(pFdObj); + tTrace("%s %p, FD hang up", pThreadObj->label, pFdObj->thandle); + taosReportBrokenLink(pFdObj); continue; } int32_t headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead)); if (headLen != sizeof(SRpcHead)) { - tError("%s read error, headLen:%d, errno:%d", pThreadObj->label, headLen, errno); - taosCleanUpFdObj(pFdObj); + tError("%s %p, read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen); + taosReportBrokenLink(pFdObj); continue; } int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); char *buffer = malloc(msgLen + tsRpcOverhead); if ( NULL == buffer) { - tError("%s TCP malloc(size:%d) fail\n", pThreadObj->label, msgLen); - taosCleanUpFdObj(pFdObj); + tError("%s %p, TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen); + taosReportBrokenLink(pFdObj); continue; } @@ -239,8 +263,9 @@ static void taosProcessTcpData(void *param) { int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen); if (leftLen != retLen) { - tError("%s read error, leftLen:%d retLen:%d", pThreadObj->label, leftLen, retLen); - taosCleanUpFdObj(pFdObj); + tError("%s %p, read error, leftLen:%d retLen:%d", + pThreadObj->label, pFdObj->thandle, leftLen, retLen); + taosReportBrokenLink(pFdObj); tfree(buffer); continue; } @@ -278,10 +303,10 @@ static void taosAcceptTcpConnection(void *arg) { sockFd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); if (sockFd < 0) { - tError("%s failed to open TCP socket, ip:%s, port:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); + tError("%s failed to open TCP socket, ip:%s:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); return; } else { - tTrace("%s TCP server is ready, ip:%s, port:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); + tTrace("%s TCP server is ready, ip:%s:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); } while (1) { @@ -289,11 +314,11 @@ static void taosAcceptTcpConnection(void *arg) { connFd = accept(sockFd, (struct sockaddr *)&clientAddr, &addrlen); if (connFd < 0) { - tError("%s TCP accept failure, errno:%d, reason:%s", pServerObj->label, errno, strerror(errno)); + tError("%s TCP accept failure(%s)", pServerObj->label, errno, strerror(errno)); continue; } - tTrace("%s TCP connection from ip:%s port:%hu", pServerObj->label, inet_ntoa(clientAddr.sin_addr), + tTrace("%s TCP connection from ip:%s:%hu", pServerObj->label, inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port)); taosKeepTcpAlive(connFd); @@ -318,7 +343,7 @@ static void taosAcceptTcpConnection(void *arg) { event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP; event.data.ptr = pFdObj; if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) { - tError("%s failed to add TCP FD for epoll, error:%s", pServerObj->label, strerror(errno)); + tError("%s failed to add TCP FD for epoll(%s)", pServerObj->label, strerror(errno)); tfree(pFdObj); close(connFd); continue; @@ -333,7 +358,7 @@ static void taosAcceptTcpConnection(void *arg) { pthread_cond_signal(&pThreadObj->fdReady); pthread_mutex_unlock(&(pThreadObj->threadMutex)); - tTrace("%s TCP thread:%d, a new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label, + tTrace("%s TCP thread:%d, new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label, pThreadObj->threadId, pFdObj->ipstr, pFdObj->port, pFdObj, pThreadObj->numOfFds); // pick up next thread for next connection @@ -343,26 +368,23 @@ static void taosAcceptTcpConnection(void *arg) { } static void taosCleanUpFdObj(SFdObj *pFdObj) { - SThreadObj *pThreadObj; if (pFdObj == NULL) return; if (pFdObj->signature != pFdObj) return; - pThreadObj = pFdObj->pThreadObj; - if (pThreadObj == NULL) { - tError("FdObj double clean up!!!"); - return; - } + pFdObj->signature = NULL; + SThreadObj *pThreadObj = pFdObj->pThreadObj; - epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL); close(pFdObj->fd); + epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL); pthread_mutex_lock(&pThreadObj->threadMutex); pThreadObj->numOfFds--; if (pThreadObj->numOfFds < 0) - tError("%s TCP thread:%d, number of FDs shall never be negative", pThreadObj->label, pThreadObj->threadId); + tError("%s %p, TCP thread:%d, number of FDs is negative!!!", + pThreadObj->label, pFdObj->thandle, pThreadObj->threadId); // remove from the FdObject list @@ -378,23 +400,8 @@ static void taosCleanUpFdObj(SFdObj *pFdObj) { pthread_mutex_unlock(&pThreadObj->threadMutex); - // notify the upper layer, so it will clean the associated context - SRecvInfo recvInfo; - recvInfo.msg = NULL; - recvInfo.msgLen = 0; - recvInfo.ip = 0; - recvInfo.port = 0; - recvInfo.shandle = pThreadObj->shandle; - recvInfo.thandle = pFdObj->thandle;; - recvInfo.chandle = NULL; - recvInfo.connType = RPC_CONN_TCP; - - if (pFdObj->thandle) (*(pThreadObj->processData))(&recvInfo); - - tTrace("%s TCP thread:%d, FD:%p is cleaned up, numOfFds:%d", pThreadObj->label, pThreadObj->threadId, - pFdObj, pThreadObj->numOfFds); - - memset(pFdObj, 0, sizeof(SFdObj)); + tTrace("%s %p, FD:%p is cleaned, numOfFds:%d", + pThreadObj->label, pFdObj->thandle, pFdObj, pThreadObj->numOfFds); tfree(pFdObj); } diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 785288f5b9bcf91219ea992a659c32f85d9336cf..e666187cf1ddcbbde8e94a60728bcfcf97c3780d 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -78,7 +78,6 @@ static SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port); static void taosProcessUdpBufTimer(void *param, void *tmrId); void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) { - pthread_attr_t thAttr; SUdpConn * pConn; SUdpConnSet * pSet; @@ -106,9 +105,6 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v } } - pthread_attr_init(&thAttr); - pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - uint16_t ownPort; for (int i = 0; i < threads; ++i) { pConn = pSet->udpConn + i; @@ -146,19 +142,21 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v pConn->tmrCtrl = pSet->tmrCtrl; } + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn); + pthread_attr_destroy(&thAttr); if (code != 0) { tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno)); taosCloseSocket(pConn->fd); taosCleanUpUdpConnection(pSet); - pthread_attr_destroy(&thAttr); return NULL; } ++pSet->threads; } - pthread_attr_destroy(&thAttr); tTrace("%s UDP connection is initialized, ip:%s port:%hu threads:%d", label, ip, port, threads); return pSet; diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index f000ab91a273251972e0145ff431a21bc0c8b519..732d7eb81cd211dfc2be4452351bbbdf44cdbb95 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -204,6 +204,8 @@ int main(int argc, char *argv[]) { tPrint("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads); tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize); + getchar(); + taosCloseLogger(); return 0;