diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 0410281f0dee3c7f685417b08620aa8d2c9aec49..a8bb2fd65b2091e33eb26d3f738cbfe4f736f69e 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -867,9 +867,8 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { // underlying UDP layer does not know it is server or client pRecv->connType = pRecv->connType | pRpc->connType; - if (pRecv->ip==0 && pConn) { - rpcProcessBrokenLink(pConn); - rpcFreeMsg(pRecv->msg); + if (pRecv->ip == 0 && pConn) { + rpcProcessBrokenLink(pConn); return NULL; } diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index a7e00027fb23aeefeb576295d06ab8d3fe62eeaa..09649e3a7c4243876b7cc66aae99dc9859c2d5c6 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -215,7 +215,6 @@ static void* taosAcceptTcpConnection(void *arg) { continue; } - tTrace("%s TCP connection from ip:%s:%hu", pServerObj->label, inet_ntoa(caddr.sin_addr), caddr.sin_port); taosKeepTcpAlive(connFd); // pick up the thread to handle this connection @@ -229,7 +228,8 @@ static void* taosAcceptTcpConnection(void *arg) { inet_ntoa(caddr.sin_addr), pFdObj->port, pFdObj, pThreadObj->numOfFds); } else { close(connFd); - tError("%s failed to malloc FdObj(%s)", pServerObj->label, strerror(errno)); + tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno), + inet_ntoa(caddr.sin_addr), caddr.sin_port); } // pick up next thread for next connection @@ -341,7 +341,9 @@ static void taosReportBrokenLink(SFdObj *pFdObj) { recvInfo.chandle = NULL; recvInfo.connType = RPC_CONN_TCP; (*(pThreadObj->processData))(&recvInfo); - } + } else { + taosFreeFdObj(pFdObj); + } } #define maxEvents 10 @@ -352,7 +354,7 @@ static void *taosProcessTcpData(void *param) { struct epoll_event events[maxEvents]; SRecvInfo recvInfo; SRpcHead rpcHead; - + while (1) { int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1); if (pThreadObj->stop) { @@ -416,7 +418,6 @@ static void *taosProcessTcpData(void *param) { recvInfo.connType = RPC_CONN_TCP; pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo); - if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj); } } @@ -466,7 +467,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) { pFdObj->signature = NULL; epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL); - close(pFdObj->fd); + taosCloseTcpSocket(pFdObj->fd); pThreadObj->numOfFds--; diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c index 958d099027f2072b82aee45fe302f0042c1fd8aa..508f04fbc349ec5485360e70992c6e35b1f8728c 100644 --- a/src/rpc/test/rserver.c +++ b/src/rpc/test/rserver.c @@ -127,6 +127,8 @@ int main(int argc, char *argv[]) { SRpcInit rpcInit; char dataName[20] = "server.data"; + taosBlockSIGPIPE(); + memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = 7000; rpcInit.label = "SER";