提交 6b58cbc2 编写于 作者: H Hongze Cheng

Merge branch 'develop' into feature/2.0tsdb

...@@ -248,13 +248,13 @@ static void *taosAcceptTcpConnection(void *arg) { ...@@ -248,13 +248,13 @@ static void *taosAcceptTcpConnection(void *arg) {
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd); SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd);
if (pFdObj) { if (pFdObj) {
pFdObj->ip = caddr.sin_addr.s_addr; pFdObj->ip = caddr.sin_addr.s_addr;
pFdObj->port = caddr.sin_port; pFdObj->port = htons(caddr.sin_port);
tTrace("%s new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label, tTrace("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label,
inet_ntoa(caddr.sin_addr), pFdObj->port, pFdObj, pThreadObj->numOfFds); inet_ntoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds);
} else { } else {
close(connFd); close(connFd);
tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno), tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno),
inet_ntoa(caddr.sin_addr), caddr.sin_port); inet_ntoa(caddr.sin_addr), htons(caddr.sin_port));
} }
// pick up next thread for next connection // pick up next thread for next connection
...@@ -333,14 +333,22 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin ...@@ -333,14 +333,22 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
int fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip); int fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip);
if (fd < 0) return NULL; if (fd < 0) return NULL;
struct sockaddr_in sin;
uint16_t localPort = 0;
unsigned int addrlen = sizeof(sin);
if (getsockname(fd, (struct sockaddr *)&sin, &addrlen) == 0 &&
sin.sin_family == AF_INET && addrlen == sizeof(sin)) {
localPort = (uint16_t)ntohs(sin.sin_port);
}
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd); SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd);
if (pFdObj) { if (pFdObj) {
pFdObj->thandle = thandle; pFdObj->thandle = thandle;
pFdObj->port = port; pFdObj->port = port;
pFdObj->ip = ip; pFdObj->ip = ip;
tTrace("%s %p, TCP connection to 0x%x:%hu is created, FD:%p numOfFds:%d", tTrace("%s %p TCP connection to 0x%x:%hu is created, localPort:%hu FD:%p numOfFds:%d",
pThreadObj->label, thandle, ip, port, pFdObj, pThreadObj->numOfFds); pThreadObj->label, thandle, ip, port, localPort, pFdObj, pThreadObj->numOfFds);
} else { } else {
close(fd); close(fd);
tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno)); tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno));
...@@ -353,7 +361,10 @@ void taosCloseTcpConnection(void *chandle) { ...@@ -353,7 +361,10 @@ void taosCloseTcpConnection(void *chandle) {
SFdObj *pFdObj = chandle; SFdObj *pFdObj = chandle;
if (pFdObj == NULL) return; if (pFdObj == NULL) return;
pFdObj->thandle = NULL; SThreadObj *pThreadObj = pFdObj->pThreadObj;
tTrace("%s %p TCP connection will be closed, FD:%p", pThreadObj->label, pFdObj->thandle, pFdObj);
// pFdObj->thandle = NULL;
pFdObj->closedByApp = 1; pFdObj->closedByApp = 1;
shutdown(pFdObj->fd, SHUT_WR); shutdown(pFdObj->fd, SHUT_WR);
} }
...@@ -398,14 +409,14 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { ...@@ -398,14 +409,14 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead)); headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
if (headLen != sizeof(SRpcHead)) { if (headLen != sizeof(SRpcHead)) {
tTrace("%s %p, read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen); tTrace("%s %p read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen);
return -1; return -1;
} }
msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
buffer = malloc(msgLen + tsRpcOverhead); buffer = malloc(msgLen + tsRpcOverhead);
if ( NULL == buffer) { if ( NULL == buffer) {
tError("%s %p, TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen); tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
return -1; return -1;
} }
...@@ -414,8 +425,8 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { ...@@ -414,8 +425,8 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen); retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
if (leftLen != retLen) { if (leftLen != retLen) {
tError("%s %p, read error, leftLen:%d retLen:%d", tError("%s %p read error, leftLen:%d retLen:%d FD:%p",
pThreadObj->label, pFdObj->thandle, leftLen, retLen); pThreadObj->label, pFdObj->thandle, leftLen, retLen, pFdObj);
free(buffer); free(buffer);
return -1; return -1;
} }
...@@ -459,19 +470,19 @@ static void *taosProcessTcpData(void *param) { ...@@ -459,19 +470,19 @@ static void *taosProcessTcpData(void *param) {
pFdObj = events[i].data.ptr; pFdObj = events[i].data.ptr;
if (events[i].events & EPOLLERR) { if (events[i].events & EPOLLERR) {
tTrace("%s %p, error happened on FD", pThreadObj->label, pFdObj->thandle); tTrace("%s %p FD:%p epoll errors", pThreadObj->label, pFdObj->thandle, pFdObj);
taosReportBrokenLink(pFdObj); taosReportBrokenLink(pFdObj);
continue; continue;
} }
if (events[i].events & EPOLLRDHUP) { if (events[i].events & EPOLLRDHUP) {
tTrace("%s %p, FD RD hang up", pThreadObj->label, pFdObj->thandle); tTrace("%s %p FD:%p RD hang up", pThreadObj->label, pFdObj->thandle, pFdObj);
taosReportBrokenLink(pFdObj); taosReportBrokenLink(pFdObj);
continue; continue;
} }
if (events[i].events & EPOLLHUP) { if (events[i].events & EPOLLHUP) {
tTrace("%s %p, FD hang up", pThreadObj->label, pFdObj->thandle); tTrace("%s %p FD:%p hang up", pThreadObj->label, pFdObj->thandle, pFdObj);
taosReportBrokenLink(pFdObj); taosReportBrokenLink(pFdObj);
continue; continue;
} }
...@@ -540,7 +551,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) { ...@@ -540,7 +551,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
pThreadObj->numOfFds--; pThreadObj->numOfFds--;
if (pThreadObj->numOfFds < 0) if (pThreadObj->numOfFds < 0)
tError("%s %p, TCP thread:%d, number of FDs is negative!!!", tError("%s %p TCP thread:%d, number of FDs is negative!!!",
pThreadObj->label, pFdObj->thandle, pThreadObj->threadId); pThreadObj->label, pFdObj->thandle, pThreadObj->threadId);
if (pFdObj->prev) { if (pFdObj->prev) {
...@@ -555,7 +566,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) { ...@@ -555,7 +566,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
pthread_mutex_unlock(&pThreadObj->mutex); pthread_mutex_unlock(&pThreadObj->mutex);
tTrace("%s %p, FD:%p is cleaned, numOfFds:%d", tTrace("%s %p TCP connection is closed, FD:%p numOfFds:%d",
pThreadObj->label, pFdObj->thandle, pFdObj, pThreadObj->numOfFds); pThreadObj->label, pFdObj->thandle, pFdObj, pThreadObj->numOfFds);
tfree(pFdObj); tfree(pFdObj);
......
...@@ -35,6 +35,7 @@ cd ../../../debug; make ...@@ -35,6 +35,7 @@ cd ../../../debug; make
./test.sh -f unique/db/replica_reduce31.sim ./test.sh -f unique/db/replica_reduce31.sim
./test.sh -f unique/db/replica_part.sim ./test.sh -f unique/db/replica_part.sim
./test.sh -f unique/dnode/alternativeRole.sim
./test.sh -f unique/dnode/balance1.sim ./test.sh -f unique/dnode/balance1.sim
./test.sh -f unique/dnode/balance2.sim ./test.sh -f unique/dnode/balance2.sim
./test.sh -f unique/dnode/balance3.sim ./test.sh -f unique/dnode/balance3.sim
...@@ -75,3 +76,59 @@ cd ../../../debug; make ...@@ -75,3 +76,59 @@ cd ../../../debug; make
./test.sh -f unique/vnode/replica3_basic.sim ./test.sh -f unique/vnode/replica3_basic.sim
./test.sh -f unique/vnode/replica3_repeat.sim ./test.sh -f unique/vnode/replica3_repeat.sim
./test.sh -f unique/vnode/replica3_vgroup.sim ./test.sh -f unique/vnode/replica3_vgroup.sim
./test.sh -f general/stream/metrics_1.sim
./test.sh -f general/stream/metrics_del.sim
./test.sh -f general/stream/metrics_n.sim
./test.sh -f general/stream/metrics_replica1_vnoden.sim
#./test.sh -f general/stream/new_stream.sim
./test.sh -f general/stream/restart_stream.sim
./test.sh -f general/stream/stream_1.sim
./test.sh -f general/stream/stream_2.sim
./test.sh -f general/stream/stream_3.sim
./test.sh -f general/stream/stream_restart.sim
./test.sh -f general/stream/table_1.sim
./test.sh -f general/stream/table_del.sim
./test.sh -f general/stream/table_n.sim
./test.sh -f general/stream/table_replica1_vnoden.sim
./test.sh -f unique/arbitrator/check_cluster_cfg_para.sim
./test.sh -f unique/arbitrator/dn2_mn1_cache_file_sync.sim
./test.sh -f unique/arbitrator/dn3_mn1_full_createTableFail.sim
./test.sh -f unique/arbitrator/dn3_mn1_full_dropDnodeFail.sim
./test.sh -f unique/arbitrator/dn3_mn1_multiCreateDropTable.sim
./test.sh -f unique/arbitrator/dn3_mn1_nw_disable_timeout_autoDropDnode.sim
./test.sh -f unique/arbitrator/dn3_mn1_replica2_wal1_AddDelDnode.sim
./test.sh -f unique/arbitrator/dn3_mn1_replica_change_dropDnod.sim
./test.sh -f unique/arbitrator/dn3_mn1_replica_change.sim
./test.sh -f unique/arbitrator/dn3_mn1_stopDnode_timeout.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_change.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_offline.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_online.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_noCorruptFile_offline.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_delDir.sim
./test.sh -f unique/arbitrator/dn3_mn1_r2_vnode_delDir.sim
./test.sh -f unique/arbitrator/dn3_mn1_r3_vnode_delDir.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_nomaster.sim
./test.sh -f unique/arbitrator/dn3_mn2_killDnode.sim
./test.sh -f unique/arbitrator/insert_duplicationTs.sim
./test.sh -f unique/arbitrator/offline_replica2_alterTable_online.sim
./test.sh -f unique/arbitrator/offline_replica2_alterTag_online.sim
./test.sh -f unique/arbitrator/offline_replica2_createTable_online.sim
./test.sh -f unique/arbitrator/offline_replica2_dropDb_online.sim
./test.sh -f unique/arbitrator/offline_replica2_dropTable_online.sim
./test.sh -f unique/arbitrator/offline_replica3_alterTable_online.sim
./test.sh -f unique/arbitrator/offline_replica3_alterTag_online.sim
./test.sh -f unique/arbitrator/offline_replica3_createTable_online.sim
./test.sh -f unique/arbitrator/offline_replica3_dropDb_online.sim
./test.sh -f unique/arbitrator/offline_replica3_dropTable_online.sim
./test.sh -f unique/arbitrator/replica_changeWithArbitrator.sim
./test.sh -f unique/arbitrator/sync_replica2_alterTable_add.sim
./test.sh -f unique/arbitrator/sync_replica2_alterTable_drop.sim
./test.sh -f unique/arbitrator/sync_replica2_dropDb.sim
./test.sh -f unique/arbitrator/sync_replica2_dropTable.sim
./test.sh -f unique/arbitrator/sync_replica3_alterTable_add.sim
./test.sh -f unique/arbitrator/sync_replica3_alterTable_drop.sim
./test.sh -f unique/arbitrator/sync_replica3_dropDb.sim
./test.sh -f unique/arbitrator/sync_replica3_dropTable.sim
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册