diff --git a/src/sync/inc/syncMsg.h b/src/sync/inc/syncMsg.h index 73f4223c882ff9ce544984b37843070cf579c44a..f589379aa2cc0688a3f057319d660e8c58c6bdeb 100644 --- a/src/sync/inc/syncMsg.h +++ b/src/sync/inc/syncMsg.h @@ -37,7 +37,8 @@ typedef enum { TAOS_SMSG_SETUP_RSP = 12, TAOS_SMSG_SYNC_FILE = 13, TAOS_SMSG_SYNC_FILE_RSP = 14, - TAOS_SMSG_END = 15, + TAOS_SMSG_TEST = 15, + TAOS_SMSG_END = 16 } ESyncMsgType; typedef enum { @@ -132,6 +133,7 @@ void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId); void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId); void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId); void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId); +void syncBuildSyncTestMsg(SSyncMsg *pMsg, int32_t vgId); void syncBuildFileAck(SFileAck *pMsg, int32_t vgId); void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId); diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index d69843217691b5e1dbab8c37c2f49fe9760b9491..6e8703961233c44eb6c94b6ad71fb5b62e1ec812 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -1179,6 +1179,20 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) { } } +static void syncProcessTestMsg(SSyncMsg *pMsg, SOCKET connFd) { + sInfo("recv sync test msg"); + + SSyncMsg rsp; + syncBuildSyncTestMsg(&rsp, -1); + if (taosWriteMsg(connFd, &rsp, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) { + sInfo("failed to send sync test rsp since %s", strerror(errno)); + } + + sInfo("send sync test rsp"); + taosMsleep(1000); + taosCloseSocket(connFd); +} + static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { char ipstr[24]; int32_t i; @@ -1200,6 +1214,11 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { return; } + if (msg.head.type == TAOS_SMSG_TEST) { + syncProcessTestMsg(&msg, connFd); + return; + } + int32_t vgId = msg.head.vgId; SSyncNode **ppNode = taosHashGet(tsVgIdHash, &vgId, sizeof(int32_t)); if (ppNode == NULL || *ppNode == NULL) { diff --git a/src/sync/src/syncMsg.c b/src/sync/src/syncMsg.c index 034f9a98a70c7373b74a84d24d478b52d7bf4df9..9718a3414e3bad6d303e4c844c7120582237c83e 100644 --- a/src/sync/src/syncMsg.c +++ b/src/sync/src/syncMsg.c @@ -86,6 +86,7 @@ static void syncBuildMsg(SSyncMsg *pMsg, int32_t vgId, ESyncMsgType type) { void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_REQ); } void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_DATA); } void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SETUP); } +void syncBuildSyncTestMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_TEST); } void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId) { pMsg->head.type = TAOS_SMSG_STATUS; diff --git a/src/util/CMakeLists.txt b/src/util/CMakeLists.txt index 3606aea76b91c8bc272cf658065443f65fd3fd28..92e030ad81760ed6ede1b7e53128b731b382097d 100644 --- a/src/util/CMakeLists.txt +++ b/src/util/CMakeLists.txt @@ -2,6 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.5) PROJECT(TDengine) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/rpc/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/sync/inc) AUX_SOURCE_DIRECTORY(src SRC) ADD_LIBRARY(tutil ${SRC}) TARGET_LINK_LIBRARIES(tutil pthread osdetail lz4 z) diff --git a/src/util/src/tnettest.c b/src/util/src/tnettest.c index fe6dfb493d3b8293ad6cbdcf42b0122f991e36a2..7e39715cd954d5aa31f7dccb65f8ef5c3a9c5e57 100644 --- a/src/util/src/tnettest.c +++ b/src/util/src/tnettest.c @@ -23,6 +23,8 @@ #include "tsocket.h" #include "trpc.h" #include "rpcHead.h" +#include "tchecksum.h" +#include "syncMsg.h" #define MAX_PKG_LEN (64 * 1000) #define BUFFER_SIZE (MAX_PKG_LEN + 1024) @@ -408,13 +410,51 @@ static void taosNetTestStartup(char *host, int32_t port) { free(pStep); } +static void taosNetCheckSync(char *host, int32_t port) { + uint32_t ip = taosGetIpv4FromFqdn(host); + if (ip == 0xffffffff) { + uError("failed to get IP address from %s since %s", host, strerror(errno)); + return; + } + + SOCKET connFd = taosOpenTcpClientSocket(ip, (uint16_t)port, 0); + if (connFd < 0) { + uError("failed to create socket while test port:%d since %s", port, strerror(errno)); + return; + } + + SSyncMsg msg; + memset(&msg, 0, sizeof(SSyncMsg)); + SSyncHead *pHead = &msg.head; + pHead->type = TAOS_SMSG_TEST; + pHead->protocol = SYNC_PROTOCOL_VERSION; + pHead->signature = SYNC_SIGNATURE; + pHead->code = 0; + pHead->cId = 0; + pHead->vgId = -1; + pHead->len = sizeof(SSyncMsg) - sizeof(SSyncHead); + taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SSyncHead)); + + if (taosWriteMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) { + uError("failed to test port:%d while send msg since %s", port, strerror(errno)); + return; + } + + if (taosReadMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) { + uError("failed to test port:%d while recv msg since %s", port, strerror(errno)); + } + + uInfo("successed to test TCP port:%d", port); + taosCloseSocket(connFd); +} + static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) { - int32_t endPort = startPort + 9; + int32_t endPort = startPort + TSDB_PORT_SYNC; char spi = 0; uInfo("check rpc, host:%s startPort:%d endPort:%d pkgLen:%d\n", host, startPort, endPort, pkgLen); - - for (uint16_t port = startPort; port <= endPort; port++) { + + for (uint16_t port = startPort; port < endPort; port++) { int32_t sendpkgLen; if (pkgLen <= tsRpcMaxUdpSize) { sendpkgLen = tsRpcMaxUdpSize + 1000; @@ -442,6 +482,8 @@ static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) { uInfo("successed to test UDP port:%d", port); } } + + taosNetCheckSync(host, endPort); } static void taosNetTestClient(char *host, int32_t startPort, int32_t pkgLen) {