提交 db39d9ad 编写于 作者: S Shengliang Guan

Merge branch 'develop' of https://github.com/taosdata/TDengine into develop

......@@ -248,13 +248,13 @@ static void *taosAcceptTcpConnection(void *arg) {
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd);
if (pFdObj) {
pFdObj->ip = caddr.sin_addr.s_addr;
pFdObj->port = caddr.sin_port;
tTrace("%s new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label,
inet_ntoa(caddr.sin_addr), pFdObj->port, pFdObj, pThreadObj->numOfFds);
pFdObj->port = htons(caddr.sin_port);
tTrace("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label,
inet_ntoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds);
} else {
close(connFd);
tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno),
inet_ntoa(caddr.sin_addr), caddr.sin_port);
inet_ntoa(caddr.sin_addr), htons(caddr.sin_port));
}
// pick up next thread for next connection
......@@ -333,14 +333,22 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
int fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip);
if (fd < 0) return NULL;
struct sockaddr_in sin;
uint16_t localPort = 0;
unsigned int addrlen = sizeof(sin);
if (getsockname(fd, (struct sockaddr *)&sin, &addrlen) == 0 &&
sin.sin_family == AF_INET && addrlen == sizeof(sin)) {
localPort = (uint16_t)ntohs(sin.sin_port);
}
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd);
if (pFdObj) {
pFdObj->thandle = thandle;
pFdObj->port = port;
pFdObj->ip = ip;
tTrace("%s %p, TCP connection to 0x%x:%hu is created, FD:%p numOfFds:%d",
pThreadObj->label, thandle, ip, port, pFdObj, pThreadObj->numOfFds);
tTrace("%s %p TCP connection to 0x%x:%hu is created, localPort:%hu FD:%p numOfFds:%d",
pThreadObj->label, thandle, ip, port, localPort, pFdObj, pThreadObj->numOfFds);
} else {
close(fd);
tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno));
......@@ -353,7 +361,10 @@ void taosCloseTcpConnection(void *chandle) {
SFdObj *pFdObj = chandle;
if (pFdObj == NULL) return;
pFdObj->thandle = NULL;
SThreadObj *pThreadObj = pFdObj->pThreadObj;
tTrace("%s %p TCP connection will be closed, FD:%p", pThreadObj->label, pFdObj->thandle, pFdObj);
// pFdObj->thandle = NULL;
pFdObj->closedByApp = 1;
shutdown(pFdObj->fd, SHUT_WR);
}
......@@ -398,14 +409,14 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
if (headLen != sizeof(SRpcHead)) {
tTrace("%s %p, read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen);
tTrace("%s %p read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen);
return -1;
}
msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
buffer = malloc(msgLen + tsRpcOverhead);
if ( NULL == buffer) {
tError("%s %p, TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
return -1;
}
......@@ -414,8 +425,8 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
if (leftLen != retLen) {
tError("%s %p, read error, leftLen:%d retLen:%d",
pThreadObj->label, pFdObj->thandle, leftLen, retLen);
tError("%s %p read error, leftLen:%d retLen:%d FD:%p",
pThreadObj->label, pFdObj->thandle, leftLen, retLen, pFdObj);
free(buffer);
return -1;
}
......@@ -459,19 +470,19 @@ static void *taosProcessTcpData(void *param) {
pFdObj = events[i].data.ptr;
if (events[i].events & EPOLLERR) {
tTrace("%s %p, error happened on FD", pThreadObj->label, pFdObj->thandle);
tTrace("%s %p FD:%p epoll errors", pThreadObj->label, pFdObj->thandle, pFdObj);
taosReportBrokenLink(pFdObj);
continue;
}
if (events[i].events & EPOLLRDHUP) {
tTrace("%s %p, FD RD hang up", pThreadObj->label, pFdObj->thandle);
tTrace("%s %p FD:%p RD hang up", pThreadObj->label, pFdObj->thandle, pFdObj);
taosReportBrokenLink(pFdObj);
continue;
}
if (events[i].events & EPOLLHUP) {
tTrace("%s %p, FD hang up", pThreadObj->label, pFdObj->thandle);
tTrace("%s %p FD:%p hang up", pThreadObj->label, pFdObj->thandle, pFdObj);
taosReportBrokenLink(pFdObj);
continue;
}
......@@ -540,7 +551,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
pThreadObj->numOfFds--;
if (pThreadObj->numOfFds < 0)
tError("%s %p, TCP thread:%d, number of FDs is negative!!!",
tError("%s %p TCP thread:%d, number of FDs is negative!!!",
pThreadObj->label, pFdObj->thandle, pThreadObj->threadId);
if (pFdObj->prev) {
......@@ -555,7 +566,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
pthread_mutex_unlock(&pThreadObj->mutex);
tTrace("%s %p, FD:%p is cleaned, numOfFds:%d",
tTrace("%s %p TCP connection is closed, FD:%p numOfFds:%d",
pThreadObj->label, pFdObj->thandle, pFdObj, pThreadObj->numOfFds);
tfree(pFdObj);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册