diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 530a279cc72a291525356aa1f2a6495f605946a0..43c386b8b50ab79aa51403b1e71d109619cdb935 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -32,6 +32,7 @@ typedef struct SFdObj { struct SThreadObj *pThreadObj; struct SFdObj *prev; struct SFdObj *next; + uint64_t ctime; // create time } SFdObj; typedef struct SThreadObj { @@ -280,6 +281,7 @@ static void *taosAcceptTcpConnection(void *arg) { if (pFdObj) { pFdObj->ip = caddr.sin_addr.s_addr; pFdObj->port = htons(caddr.sin_port); + pFdObj->ctime = taosGetTimestampMs(); tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label, taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds); } else { @@ -417,6 +419,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin pFdObj->thandle = thandle; pFdObj->port = port; pFdObj->ip = ip; + pFdObj->ctime = taosGetTimestampMs(); tDebug("%s %p TCP connection to 0x%x:%hu is created, localPort:%hu FD:%p numOfFds:%d", pThreadObj->label, thandle, ip, port, localPort, pFdObj, pThreadObj->numOfFds); } else { @@ -432,7 +435,7 @@ void taosCloseTcpConnection(void *chandle) { if (pFdObj == NULL || pFdObj->signature != pFdObj) return; SThreadObj *pThreadObj = pFdObj->pThreadObj; - tDebug("%s %p TCP connection will be closed, FD:%p", pThreadObj->label, pFdObj->thandle, pFdObj); + tDebug("DEEP %s shutdown3 fd=%d ip=0x%x port=%d ctime=%" PRId64, pThreadObj->label, pFdObj->fd, pFdObj->ip, pFdObj->port, pFdObj->ctime); // pFdObj->thandle = NULL; pFdObj->closedByApp = 1; @@ -441,11 +444,28 @@ void taosCloseTcpConnection(void *chandle) { int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) { SFdObj *pFdObj = chandle; - if (pFdObj == NULL || pFdObj->signature != pFdObj) return -1; + if (pFdObj == NULL) { + tError("DEEP TCP send data failed(chandle null). data=0x%p len=%d ip=0x%0x port=%d", data, len, ip, port); + return -1; + } + if(pFdObj->signature != pFdObj) { + tError("DEEP TCP send data failed(sig diff). pFdObj=0x%p sig=0x%p data=%p len=%d ip=0x%x port=%d", pFdObj, pFdObj->signature, data, len, ip, port); + return -2; + } + SThreadObj *pThreadObj = pFdObj->pThreadObj; int ret = taosWriteMsg(pFdObj->fd, data, len); tTrace("%s %p TCP data is sent, FD:%p fd:%d bytes:%d", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, ret); + if(ret < 0) { + tError("DEEP %s %p TCP data sent failed and try again, FD:%p fd:%d ctime=%" PRId64 " ret=%d le=%d ip=0x%x port=%d threadid=%d numofFds=%d", + pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, pFdObj->ctime, ret, len, ip, port, pThreadObj->threadId, pThreadObj->numOfFds); + ret = taosWriteMsg(pFdObj->fd, data, len); + if(ret < 0) { + tError("DEEP %s %p Second TCP data sent failed, FD:%p fd:%d ctime=%" PRId64 " ret=%d le=%d ip=0x%x port=%d threadid=%d numofFds=%d", + pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, pFdObj->ctime, ret, len, ip, port, pThreadObj->threadId, pThreadObj->numOfFds); + } + } return ret; } @@ -457,6 +477,7 @@ static void taosReportBrokenLink(SFdObj *pFdObj) { // notify the upper layer, so it will clean the associated context if (pFdObj->closedByApp == 0) { shutdown(pFdObj->fd, SHUT_WR); + tDebug("DEEP %s shutdown2 fd=%d ip=0x%x port=%d ctime=%" PRId64, pThreadObj->label, pFdObj->fd, pFdObj->ip, pFdObj->port, pFdObj->ctime); SRecvInfo recvInfo; recvInfo.msg = NULL; @@ -574,6 +595,7 @@ static void *taosProcessTcpData(void *param) { } if (taosReadTcpData(pFdObj, &recvInfo) < 0) { + tDebug("DEEP %s shutdown1 fd=%d ip=0x%x port=%d ctime=%" PRId64, pThreadObj->label, pFdObj->fd, pFdObj->ip, pFdObj->port, pFdObj->ctime); shutdown(pFdObj->fd, SHUT_WR); continue; } @@ -650,7 +672,10 @@ static void taosFreeFdObj(SFdObj *pFdObj) { pFdObj->signature = NULL; epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL); + tDebug("DEEP %s close1 fd=%d pFdObj=%p ip=0x%x port=%d ctime=%" PRId64 " thandle=%p", + pThreadObj->label, pFdObj->fd, pFdObj, pFdObj->ip, pFdObj->port, pFdObj->ctime, pFdObj->thandle); taosCloseSocket(pFdObj->fd); + pThreadObj->numOfFds--; if (pThreadObj->numOfFds < 0) diff --git a/src/util/src/tsocket.c b/src/util/src/tsocket.c index 0ebe9c8f8af23186fb4cd60824bfa5641d2090a9..19c72d2e0b2b4e059883a506e3c4ad1f1b4cf809 100644 --- a/src/util/src/tsocket.c +++ b/src/util/src/tsocket.c @@ -129,15 +129,18 @@ int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) { if (nwritten <= 0) { if (errno == EINTR /* || errno == EAGAIN || errno == EWOULDBLOCK */) continue; - else - return -1; + else { + uError("DEEP write socket failed3. errno=%d fd=%d writeten=%d left=%d", errno, fd, nwritten, nleft); + return -3; + } } else { nleft -= nwritten; ptr += nwritten; } if (errno == SIGPIPE || errno == EPIPE) { - return -1; + uError("DEEP write socket failed4. errno=%d fd=%d writeten=%d left=%d", errno, fd, nwritten, nleft); + return -4; } }