diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index aa94accceb734cfae00ebfbf7171f8b96b2961ef..82168f0b0e6da3809cc6820243d30e9635dfcc3f 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -248,13 +248,13 @@ static void *taosAcceptTcpConnection(void *arg) { SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd); if (pFdObj) { pFdObj->ip = caddr.sin_addr.s_addr; - pFdObj->port = caddr.sin_port; - tTrace("%s new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label, - inet_ntoa(caddr.sin_addr), pFdObj->port, pFdObj, pThreadObj->numOfFds); + pFdObj->port = htons(caddr.sin_port); + tTrace("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label, + inet_ntoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds); } else { close(connFd); tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno), - inet_ntoa(caddr.sin_addr), caddr.sin_port); + inet_ntoa(caddr.sin_addr), htons(caddr.sin_port)); } // pick up next thread for next connection @@ -333,14 +333,22 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin int fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip); if (fd < 0) return NULL; + struct sockaddr_in sin; + uint16_t localPort = 0; + unsigned int addrlen = sizeof(sin); + if (getsockname(fd, (struct sockaddr *)&sin, &addrlen) == 0 && + sin.sin_family == AF_INET && addrlen == sizeof(sin)) { + localPort = (uint16_t)ntohs(sin.sin_port); + } + SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd); if (pFdObj) { pFdObj->thandle = thandle; pFdObj->port = port; pFdObj->ip = ip; - tTrace("%s %p, TCP connection to 0x%x:%hu is created, FD:%p numOfFds:%d", - pThreadObj->label, thandle, ip, port, pFdObj, pThreadObj->numOfFds); + tTrace("%s %p TCP connection to 0x%x:%hu is created, localPort:%hu FD:%p numOfFds:%d", + pThreadObj->label, thandle, ip, port, localPort, pFdObj, pThreadObj->numOfFds); } else { close(fd); tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno)); @@ -353,7 +361,10 @@ void taosCloseTcpConnection(void *chandle) { SFdObj *pFdObj = chandle; if (pFdObj == NULL) return; - pFdObj->thandle = NULL; + SThreadObj *pThreadObj = pFdObj->pThreadObj; + tTrace("%s %p TCP connection will be closed, FD:%p", pThreadObj->label, pFdObj->thandle, pFdObj); + + // pFdObj->thandle = NULL; pFdObj->closedByApp = 1; shutdown(pFdObj->fd, SHUT_WR); } @@ -398,14 +409,14 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead)); if (headLen != sizeof(SRpcHead)) { - tTrace("%s %p, read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen); + tTrace("%s %p read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen); return -1; } msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); buffer = malloc(msgLen + tsRpcOverhead); if ( NULL == buffer) { - tError("%s %p, TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen); + tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen); return -1; } @@ -414,8 +425,8 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen); if (leftLen != retLen) { - tError("%s %p, read error, leftLen:%d retLen:%d", - pThreadObj->label, pFdObj->thandle, leftLen, retLen); + tError("%s %p read error, leftLen:%d retLen:%d FD:%p", + pThreadObj->label, pFdObj->thandle, leftLen, retLen, pFdObj); free(buffer); return -1; } @@ -459,19 +470,19 @@ static void *taosProcessTcpData(void *param) { pFdObj = events[i].data.ptr; if (events[i].events & EPOLLERR) { - tTrace("%s %p, error happened on FD", pThreadObj->label, pFdObj->thandle); + tTrace("%s %p FD:%p epoll errors", pThreadObj->label, pFdObj->thandle, pFdObj); taosReportBrokenLink(pFdObj); continue; } if (events[i].events & EPOLLRDHUP) { - tTrace("%s %p, FD RD hang up", pThreadObj->label, pFdObj->thandle); + tTrace("%s %p FD:%p RD hang up", pThreadObj->label, pFdObj->thandle, pFdObj); taosReportBrokenLink(pFdObj); continue; } if (events[i].events & EPOLLHUP) { - tTrace("%s %p, FD hang up", pThreadObj->label, pFdObj->thandle); + tTrace("%s %p FD:%p hang up", pThreadObj->label, pFdObj->thandle, pFdObj); taosReportBrokenLink(pFdObj); continue; } @@ -540,7 +551,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) { pThreadObj->numOfFds--; if (pThreadObj->numOfFds < 0) - tError("%s %p, TCP thread:%d, number of FDs is negative!!!", + tError("%s %p TCP thread:%d, number of FDs is negative!!!", pThreadObj->label, pFdObj->thandle, pThreadObj->threadId); if (pFdObj->prev) { @@ -555,7 +566,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) { pthread_mutex_unlock(&pThreadObj->mutex); - tTrace("%s %p, FD:%p is cleaned, numOfFds:%d", + tTrace("%s %p TCP connection is closed, FD:%p numOfFds:%d", pThreadObj->label, pFdObj->thandle, pFdObj, pThreadObj->numOfFds); tfree(pFdObj); diff --git a/tests/script/jenkins/unique.txt b/tests/script/jenkins/unique.txt index 4f194cde93dfe29280d1e11ccf84308f0d9c3644..da0aee3d343f652562426d8d077ab56734729fed 100644 --- a/tests/script/jenkins/unique.txt +++ b/tests/script/jenkins/unique.txt @@ -35,6 +35,7 @@ cd ../../../debug; make ./test.sh -f unique/db/replica_reduce31.sim ./test.sh -f unique/db/replica_part.sim +./test.sh -f unique/dnode/alternativeRole.sim ./test.sh -f unique/dnode/balance1.sim ./test.sh -f unique/dnode/balance2.sim ./test.sh -f unique/dnode/balance3.sim @@ -75,3 +76,59 @@ cd ../../../debug; make ./test.sh -f unique/vnode/replica3_basic.sim ./test.sh -f unique/vnode/replica3_repeat.sim ./test.sh -f unique/vnode/replica3_vgroup.sim + +./test.sh -f general/stream/metrics_1.sim +./test.sh -f general/stream/metrics_del.sim +./test.sh -f general/stream/metrics_n.sim +./test.sh -f general/stream/metrics_replica1_vnoden.sim +#./test.sh -f general/stream/new_stream.sim +./test.sh -f general/stream/restart_stream.sim +./test.sh -f general/stream/stream_1.sim +./test.sh -f general/stream/stream_2.sim +./test.sh -f general/stream/stream_3.sim +./test.sh -f general/stream/stream_restart.sim +./test.sh -f general/stream/table_1.sim +./test.sh -f general/stream/table_del.sim +./test.sh -f general/stream/table_n.sim +./test.sh -f general/stream/table_replica1_vnoden.sim + +./test.sh -f unique/arbitrator/check_cluster_cfg_para.sim +./test.sh -f unique/arbitrator/dn2_mn1_cache_file_sync.sim +./test.sh -f unique/arbitrator/dn3_mn1_full_createTableFail.sim +./test.sh -f unique/arbitrator/dn3_mn1_full_dropDnodeFail.sim +./test.sh -f unique/arbitrator/dn3_mn1_multiCreateDropTable.sim +./test.sh -f unique/arbitrator/dn3_mn1_nw_disable_timeout_autoDropDnode.sim +./test.sh -f unique/arbitrator/dn3_mn1_replica2_wal1_AddDelDnode.sim +./test.sh -f unique/arbitrator/dn3_mn1_replica_change_dropDnod.sim +./test.sh -f unique/arbitrator/dn3_mn1_replica_change.sim +./test.sh -f unique/arbitrator/dn3_mn1_stopDnode_timeout.sim +./test.sh -f unique/arbitrator/dn3_mn1_vnode_change.sim +./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_offline.sim +./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_online.sim +./test.sh -f unique/arbitrator/dn3_mn1_vnode_noCorruptFile_offline.sim +./test.sh -f unique/arbitrator/dn3_mn1_vnode_delDir.sim +./test.sh -f unique/arbitrator/dn3_mn1_r2_vnode_delDir.sim +./test.sh -f unique/arbitrator/dn3_mn1_r3_vnode_delDir.sim +./test.sh -f unique/arbitrator/dn3_mn1_vnode_nomaster.sim +./test.sh -f unique/arbitrator/dn3_mn2_killDnode.sim +./test.sh -f unique/arbitrator/insert_duplicationTs.sim +./test.sh -f unique/arbitrator/offline_replica2_alterTable_online.sim +./test.sh -f unique/arbitrator/offline_replica2_alterTag_online.sim +./test.sh -f unique/arbitrator/offline_replica2_createTable_online.sim +./test.sh -f unique/arbitrator/offline_replica2_dropDb_online.sim +./test.sh -f unique/arbitrator/offline_replica2_dropTable_online.sim +./test.sh -f unique/arbitrator/offline_replica3_alterTable_online.sim +./test.sh -f unique/arbitrator/offline_replica3_alterTag_online.sim +./test.sh -f unique/arbitrator/offline_replica3_createTable_online.sim +./test.sh -f unique/arbitrator/offline_replica3_dropDb_online.sim +./test.sh -f unique/arbitrator/offline_replica3_dropTable_online.sim +./test.sh -f unique/arbitrator/replica_changeWithArbitrator.sim +./test.sh -f unique/arbitrator/sync_replica2_alterTable_add.sim +./test.sh -f unique/arbitrator/sync_replica2_alterTable_drop.sim +./test.sh -f unique/arbitrator/sync_replica2_dropDb.sim +./test.sh -f unique/arbitrator/sync_replica2_dropTable.sim +./test.sh -f unique/arbitrator/sync_replica3_alterTable_add.sim +./test.sh -f unique/arbitrator/sync_replica3_alterTable_drop.sim +./test.sh -f unique/arbitrator/sync_replica3_dropDb.sim +./test.sh -f unique/arbitrator/sync_replica3_dropTable.sim +