未验证 提交 8961c97e 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #16604 from taosdata/fix/TS-1813-V26

feat(rcp): add some log on rpc and tcp
...@@ -32,6 +32,7 @@ typedef struct SFdObj { ...@@ -32,6 +32,7 @@ typedef struct SFdObj {
struct SThreadObj *pThreadObj; struct SThreadObj *pThreadObj;
struct SFdObj *prev; struct SFdObj *prev;
struct SFdObj *next; struct SFdObj *next;
uint64_t ctime; // create time
} SFdObj; } SFdObj;
typedef struct SThreadObj { typedef struct SThreadObj {
...@@ -280,6 +281,7 @@ static void *taosAcceptTcpConnection(void *arg) { ...@@ -280,6 +281,7 @@ static void *taosAcceptTcpConnection(void *arg) {
if (pFdObj) { if (pFdObj) {
pFdObj->ip = caddr.sin_addr.s_addr; pFdObj->ip = caddr.sin_addr.s_addr;
pFdObj->port = htons(caddr.sin_port); pFdObj->port = htons(caddr.sin_port);
pFdObj->ctime = taosGetTimestampMs();
tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label, tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label,
taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds); taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds);
} else { } else {
...@@ -417,6 +419,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin ...@@ -417,6 +419,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
pFdObj->thandle = thandle; pFdObj->thandle = thandle;
pFdObj->port = port; pFdObj->port = port;
pFdObj->ip = ip; pFdObj->ip = ip;
pFdObj->ctime = taosGetTimestampMs();
tDebug("%s %p TCP connection to 0x%x:%hu is created, localPort:%hu FD:%p numOfFds:%d", tDebug("%s %p TCP connection to 0x%x:%hu is created, localPort:%hu FD:%p numOfFds:%d",
pThreadObj->label, thandle, ip, port, localPort, pFdObj, pThreadObj->numOfFds); pThreadObj->label, thandle, ip, port, localPort, pFdObj, pThreadObj->numOfFds);
} else { } else {
...@@ -432,7 +435,7 @@ void taosCloseTcpConnection(void *chandle) { ...@@ -432,7 +435,7 @@ void taosCloseTcpConnection(void *chandle) {
if (pFdObj == NULL || pFdObj->signature != pFdObj) return; if (pFdObj == NULL || pFdObj->signature != pFdObj) return;
SThreadObj *pThreadObj = pFdObj->pThreadObj; SThreadObj *pThreadObj = pFdObj->pThreadObj;
tDebug("%s %p TCP connection will be closed, FD:%p", pThreadObj->label, pFdObj->thandle, pFdObj); tDebug("DEEP %s shutdown3 fd=%d ip=0x%x port=%d ctime=%" PRId64, pThreadObj->label, pFdObj->fd, pFdObj->ip, pFdObj->port, pFdObj->ctime);
// pFdObj->thandle = NULL; // pFdObj->thandle = NULL;
pFdObj->closedByApp = 1; pFdObj->closedByApp = 1;
...@@ -441,11 +444,28 @@ void taosCloseTcpConnection(void *chandle) { ...@@ -441,11 +444,28 @@ void taosCloseTcpConnection(void *chandle) {
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) { int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
SFdObj *pFdObj = chandle; SFdObj *pFdObj = chandle;
if (pFdObj == NULL || pFdObj->signature != pFdObj) return -1; if (pFdObj == NULL) {
tError("DEEP TCP send data failed(chandle null). data=0x%p len=%d ip=0x%0x port=%d", data, len, ip, port);
return -1;
}
if(pFdObj->signature != pFdObj) {
tError("DEEP TCP send data failed(sig diff). pFdObj=0x%p sig=0x%p data=%p len=%d ip=0x%x port=%d", pFdObj, pFdObj->signature, data, len, ip, port);
return -2;
}
SThreadObj *pThreadObj = pFdObj->pThreadObj; SThreadObj *pThreadObj = pFdObj->pThreadObj;
int ret = taosWriteMsg(pFdObj->fd, data, len); int ret = taosWriteMsg(pFdObj->fd, data, len);
tTrace("%s %p TCP data is sent, FD:%p fd:%d bytes:%d", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, ret); tTrace("%s %p TCP data is sent, FD:%p fd:%d bytes:%d", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, ret);
if(ret < 0) {
tError("DEEP %s %p TCP data sent failed and try again, FD:%p fd:%d ctime=%" PRId64 " ret=%d le=%d ip=0x%x port=%d threadid=%d numofFds=%d",
pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, pFdObj->ctime, ret, len, ip, port, pThreadObj->threadId, pThreadObj->numOfFds);
ret = taosWriteMsg(pFdObj->fd, data, len);
if(ret < 0) {
tError("DEEP %s %p Second TCP data sent failed, FD:%p fd:%d ctime=%" PRId64 " ret=%d le=%d ip=0x%x port=%d threadid=%d numofFds=%d",
pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, pFdObj->ctime, ret, len, ip, port, pThreadObj->threadId, pThreadObj->numOfFds);
}
}
return ret; return ret;
} }
...@@ -457,6 +477,7 @@ static void taosReportBrokenLink(SFdObj *pFdObj) { ...@@ -457,6 +477,7 @@ static void taosReportBrokenLink(SFdObj *pFdObj) {
// notify the upper layer, so it will clean the associated context // notify the upper layer, so it will clean the associated context
if (pFdObj->closedByApp == 0) { if (pFdObj->closedByApp == 0) {
shutdown(pFdObj->fd, SHUT_WR); shutdown(pFdObj->fd, SHUT_WR);
tDebug("DEEP %s shutdown2 fd=%d ip=0x%x port=%d ctime=%" PRId64, pThreadObj->label, pFdObj->fd, pFdObj->ip, pFdObj->port, pFdObj->ctime);
SRecvInfo recvInfo; SRecvInfo recvInfo;
recvInfo.msg = NULL; recvInfo.msg = NULL;
...@@ -574,6 +595,7 @@ static void *taosProcessTcpData(void *param) { ...@@ -574,6 +595,7 @@ static void *taosProcessTcpData(void *param) {
} }
if (taosReadTcpData(pFdObj, &recvInfo) < 0) { if (taosReadTcpData(pFdObj, &recvInfo) < 0) {
tDebug("DEEP %s shutdown1 fd=%d ip=0x%x port=%d ctime=%" PRId64, pThreadObj->label, pFdObj->fd, pFdObj->ip, pFdObj->port, pFdObj->ctime);
shutdown(pFdObj->fd, SHUT_WR); shutdown(pFdObj->fd, SHUT_WR);
continue; continue;
} }
...@@ -650,8 +672,11 @@ static void taosFreeFdObj(SFdObj *pFdObj) { ...@@ -650,8 +672,11 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
pFdObj->signature = NULL; pFdObj->signature = NULL;
epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL); epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
tDebug("DEEP %s close1 fd=%d pFdObj=%p ip=0x%x port=%d ctime=%" PRId64 " thandle=%p",
pThreadObj->label, pFdObj->fd, pFdObj, pFdObj->ip, pFdObj->port, pFdObj->ctime, pFdObj->thandle);
taosCloseSocket(pFdObj->fd); taosCloseSocket(pFdObj->fd);
pThreadObj->numOfFds--; pThreadObj->numOfFds--;
if (pThreadObj->numOfFds < 0) if (pThreadObj->numOfFds < 0)
tError("%s %p TCP thread:%d, number of FDs is negative!!!", tError("%s %p TCP thread:%d, number of FDs is negative!!!",
......
...@@ -129,15 +129,18 @@ int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) { ...@@ -129,15 +129,18 @@ int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) {
if (nwritten <= 0) { if (nwritten <= 0) {
if (errno == EINTR /* || errno == EAGAIN || errno == EWOULDBLOCK */) if (errno == EINTR /* || errno == EAGAIN || errno == EWOULDBLOCK */)
continue; continue;
else else {
return -1; uError("DEEP write socket failed3. errno=%d fd=%d writeten=%d left=%d", errno, fd, nwritten, nleft);
return -3;
}
} else { } else {
nleft -= nwritten; nleft -= nwritten;
ptr += nwritten; ptr += nwritten;
} }
if (errno == SIGPIPE || errno == EPIPE) { if (errno == SIGPIPE || errno == EPIPE) {
return -1; uError("DEEP write socket failed4. errno=%d fd=%d writeten=%d left=%d", errno, fd, nwritten, nleft);
return -4;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册