From 43d154a6903ddc10591d56304c5fec457377d6ac Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 27 Jan 2021 11:30:15 +0000 Subject: [PATCH] TD-2861 --- src/sync/inc/syncMsg.h | 4 +++- src/sync/src/syncMain.c | 19 ++++++++++++++++ src/sync/src/syncMsg.c | 1 + src/util/CMakeLists.txt | 1 + src/util/src/tnettest.c | 48 ++++++++++++++++++++++++++++++++++++++--- 5 files changed, 69 insertions(+), 4 deletions(-) diff --git a/src/sync/inc/syncMsg.h b/src/sync/inc/syncMsg.h index 73f4223c88..f589379aa2 100644 --- a/src/sync/inc/syncMsg.h +++ b/src/sync/inc/syncMsg.h @@ -37,7 +37,8 @@ typedef enum { TAOS_SMSG_SETUP_RSP = 12, TAOS_SMSG_SYNC_FILE = 13, TAOS_SMSG_SYNC_FILE_RSP = 14, - TAOS_SMSG_END = 15, + TAOS_SMSG_TEST = 15, + TAOS_SMSG_END = 16 } ESyncMsgType; typedef enum { @@ -132,6 +133,7 @@ void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId); void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId); void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId); void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId); +void syncBuildSyncTestMsg(SSyncMsg *pMsg, int32_t vgId); void syncBuildFileAck(SFileAck *pMsg, int32_t vgId); void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId); diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index d698432176..6e87039612 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -1179,6 +1179,20 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) { } } +static void syncProcessTestMsg(SSyncMsg *pMsg, SOCKET connFd) { + sInfo("recv sync test msg"); + + SSyncMsg rsp; + syncBuildSyncTestMsg(&rsp, -1); + if (taosWriteMsg(connFd, &rsp, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) { + sInfo("failed to send sync test rsp since %s", strerror(errno)); + } + + sInfo("send sync test rsp"); + taosMsleep(1000); + taosCloseSocket(connFd); +} + static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { char ipstr[24]; int32_t i; @@ -1200,6 +1214,11 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { return; } + if (msg.head.type == TAOS_SMSG_TEST) { + syncProcessTestMsg(&msg, connFd); + return; + } + int32_t vgId = msg.head.vgId; SSyncNode **ppNode = taosHashGet(tsVgIdHash, &vgId, sizeof(int32_t)); if (ppNode == NULL || *ppNode == NULL) { diff --git a/src/sync/src/syncMsg.c b/src/sync/src/syncMsg.c index 034f9a98a7..9718a3414e 100644 --- a/src/sync/src/syncMsg.c +++ b/src/sync/src/syncMsg.c @@ -86,6 +86,7 @@ static void syncBuildMsg(SSyncMsg *pMsg, int32_t vgId, ESyncMsgType type) { void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_REQ); } void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_DATA); } void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SETUP); } +void syncBuildSyncTestMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_TEST); } void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId) { pMsg->head.type = TAOS_SMSG_STATUS; diff --git a/src/util/CMakeLists.txt b/src/util/CMakeLists.txt index 3606aea76b..92e030ad81 100644 --- a/src/util/CMakeLists.txt +++ b/src/util/CMakeLists.txt @@ -2,6 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.5) PROJECT(TDengine) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/rpc/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/sync/inc) AUX_SOURCE_DIRECTORY(src SRC) ADD_LIBRARY(tutil ${SRC}) TARGET_LINK_LIBRARIES(tutil pthread osdetail lz4 z) diff --git a/src/util/src/tnettest.c b/src/util/src/tnettest.c index fe6dfb493d..7e39715cd9 100644 --- a/src/util/src/tnettest.c +++ b/src/util/src/tnettest.c @@ -23,6 +23,8 @@ #include "tsocket.h" #include "trpc.h" #include "rpcHead.h" +#include "tchecksum.h" +#include "syncMsg.h" #define MAX_PKG_LEN (64 * 1000) #define BUFFER_SIZE (MAX_PKG_LEN + 1024) @@ -408,13 +410,51 @@ static void taosNetTestStartup(char *host, int32_t port) { free(pStep); } +static void taosNetCheckSync(char *host, int32_t port) { + uint32_t ip = taosGetIpv4FromFqdn(host); + if (ip == 0xffffffff) { + uError("failed to get IP address from %s since %s", host, strerror(errno)); + return; + } + + SOCKET connFd = taosOpenTcpClientSocket(ip, (uint16_t)port, 0); + if (connFd < 0) { + uError("failed to create socket while test port:%d since %s", port, strerror(errno)); + return; + } + + SSyncMsg msg; + memset(&msg, 0, sizeof(SSyncMsg)); + SSyncHead *pHead = &msg.head; + pHead->type = TAOS_SMSG_TEST; + pHead->protocol = SYNC_PROTOCOL_VERSION; + pHead->signature = SYNC_SIGNATURE; + pHead->code = 0; + pHead->cId = 0; + pHead->vgId = -1; + pHead->len = sizeof(SSyncMsg) - sizeof(SSyncHead); + taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SSyncHead)); + + if (taosWriteMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) { + uError("failed to test port:%d while send msg since %s", port, strerror(errno)); + return; + } + + if (taosReadMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) { + uError("failed to test port:%d while recv msg since %s", port, strerror(errno)); + } + + uInfo("successed to test TCP port:%d", port); + taosCloseSocket(connFd); +} + static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) { - int32_t endPort = startPort + 9; + int32_t endPort = startPort + TSDB_PORT_SYNC; char spi = 0; uInfo("check rpc, host:%s startPort:%d endPort:%d pkgLen:%d\n", host, startPort, endPort, pkgLen); - - for (uint16_t port = startPort; port <= endPort; port++) { + + for (uint16_t port = startPort; port < endPort; port++) { int32_t sendpkgLen; if (pkgLen <= tsRpcMaxUdpSize) { sendpkgLen = tsRpcMaxUdpSize + 1000; @@ -442,6 +482,8 @@ static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) { uInfo("successed to test UDP port:%d", port); } } + + taosNetCheckSync(host, endPort); } static void taosNetTestClient(char *host, int32_t startPort, int32_t pkgLen) { -- GitLab