提交 43d154a6 编写于 作者: S Shengliang Guan

TD-2861

上级 65cc4d90
...@@ -37,7 +37,8 @@ typedef enum { ...@@ -37,7 +37,8 @@ typedef enum {
TAOS_SMSG_SETUP_RSP = 12, TAOS_SMSG_SETUP_RSP = 12,
TAOS_SMSG_SYNC_FILE = 13, TAOS_SMSG_SYNC_FILE = 13,
TAOS_SMSG_SYNC_FILE_RSP = 14, TAOS_SMSG_SYNC_FILE_RSP = 14,
TAOS_SMSG_END = 15, TAOS_SMSG_TEST = 15,
TAOS_SMSG_END = 16
} ESyncMsgType; } ESyncMsgType;
typedef enum { typedef enum {
...@@ -132,6 +133,7 @@ void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId); ...@@ -132,6 +133,7 @@ void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId);
void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId); void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId);
void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId); void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId);
void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId); void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId);
void syncBuildSyncTestMsg(SSyncMsg *pMsg, int32_t vgId);
void syncBuildFileAck(SFileAck *pMsg, int32_t vgId); void syncBuildFileAck(SFileAck *pMsg, int32_t vgId);
void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId); void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId);
......
...@@ -1179,6 +1179,20 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) { ...@@ -1179,6 +1179,20 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
} }
} }
static void syncProcessTestMsg(SSyncMsg *pMsg, SOCKET connFd) {
sInfo("recv sync test msg");
SSyncMsg rsp;
syncBuildSyncTestMsg(&rsp, -1);
if (taosWriteMsg(connFd, &rsp, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
sInfo("failed to send sync test rsp since %s", strerror(errno));
}
sInfo("send sync test rsp");
taosMsleep(1000);
taosCloseSocket(connFd);
}
static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) {
char ipstr[24]; char ipstr[24];
int32_t i; int32_t i;
...@@ -1200,6 +1214,11 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { ...@@ -1200,6 +1214,11 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) {
return; return;
} }
if (msg.head.type == TAOS_SMSG_TEST) {
syncProcessTestMsg(&msg, connFd);
return;
}
int32_t vgId = msg.head.vgId; int32_t vgId = msg.head.vgId;
SSyncNode **ppNode = taosHashGet(tsVgIdHash, &vgId, sizeof(int32_t)); SSyncNode **ppNode = taosHashGet(tsVgIdHash, &vgId, sizeof(int32_t));
if (ppNode == NULL || *ppNode == NULL) { if (ppNode == NULL || *ppNode == NULL) {
......
...@@ -86,6 +86,7 @@ static void syncBuildMsg(SSyncMsg *pMsg, int32_t vgId, ESyncMsgType type) { ...@@ -86,6 +86,7 @@ static void syncBuildMsg(SSyncMsg *pMsg, int32_t vgId, ESyncMsgType type) {
void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_REQ); } void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_REQ); }
void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_DATA); } void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_DATA); }
void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SETUP); } void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SETUP); }
void syncBuildSyncTestMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_TEST); }
void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId) { void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId) {
pMsg->head.type = TAOS_SMSG_STATUS; pMsg->head.type = TAOS_SMSG_STATUS;
......
...@@ -2,6 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.5) ...@@ -2,6 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.5)
PROJECT(TDengine) PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/rpc/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/rpc/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/sync/inc)
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(tutil ${SRC}) ADD_LIBRARY(tutil ${SRC})
TARGET_LINK_LIBRARIES(tutil pthread osdetail lz4 z) TARGET_LINK_LIBRARIES(tutil pthread osdetail lz4 z)
......
...@@ -23,6 +23,8 @@ ...@@ -23,6 +23,8 @@
#include "tsocket.h" #include "tsocket.h"
#include "trpc.h" #include "trpc.h"
#include "rpcHead.h" #include "rpcHead.h"
#include "tchecksum.h"
#include "syncMsg.h"
#define MAX_PKG_LEN (64 * 1000) #define MAX_PKG_LEN (64 * 1000)
#define BUFFER_SIZE (MAX_PKG_LEN + 1024) #define BUFFER_SIZE (MAX_PKG_LEN + 1024)
...@@ -408,13 +410,51 @@ static void taosNetTestStartup(char *host, int32_t port) { ...@@ -408,13 +410,51 @@ static void taosNetTestStartup(char *host, int32_t port) {
free(pStep); free(pStep);
} }
static void taosNetCheckSync(char *host, int32_t port) {
uint32_t ip = taosGetIpv4FromFqdn(host);
if (ip == 0xffffffff) {
uError("failed to get IP address from %s since %s", host, strerror(errno));
return;
}
SOCKET connFd = taosOpenTcpClientSocket(ip, (uint16_t)port, 0);
if (connFd < 0) {
uError("failed to create socket while test port:%d since %s", port, strerror(errno));
return;
}
SSyncMsg msg;
memset(&msg, 0, sizeof(SSyncMsg));
SSyncHead *pHead = &msg.head;
pHead->type = TAOS_SMSG_TEST;
pHead->protocol = SYNC_PROTOCOL_VERSION;
pHead->signature = SYNC_SIGNATURE;
pHead->code = 0;
pHead->cId = 0;
pHead->vgId = -1;
pHead->len = sizeof(SSyncMsg) - sizeof(SSyncHead);
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SSyncHead));
if (taosWriteMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
uError("failed to test port:%d while send msg since %s", port, strerror(errno));
return;
}
if (taosReadMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
uError("failed to test port:%d while recv msg since %s", port, strerror(errno));
}
uInfo("successed to test TCP port:%d", port);
taosCloseSocket(connFd);
}
static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) { static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) {
int32_t endPort = startPort + 9; int32_t endPort = startPort + TSDB_PORT_SYNC;
char spi = 0; char spi = 0;
uInfo("check rpc, host:%s startPort:%d endPort:%d pkgLen:%d\n", host, startPort, endPort, pkgLen); uInfo("check rpc, host:%s startPort:%d endPort:%d pkgLen:%d\n", host, startPort, endPort, pkgLen);
for (uint16_t port = startPort; port <= endPort; port++) { for (uint16_t port = startPort; port < endPort; port++) {
int32_t sendpkgLen; int32_t sendpkgLen;
if (pkgLen <= tsRpcMaxUdpSize) { if (pkgLen <= tsRpcMaxUdpSize) {
sendpkgLen = tsRpcMaxUdpSize + 1000; sendpkgLen = tsRpcMaxUdpSize + 1000;
...@@ -442,6 +482,8 @@ static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) { ...@@ -442,6 +482,8 @@ static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) {
uInfo("successed to test UDP port:%d", port); uInfo("successed to test UDP port:%d", port);
} }
} }
taosNetCheckSync(host, endPort);
} }
static void taosNetTestClient(char *host, int32_t startPort, int32_t pkgLen) { static void taosNetTestClient(char *host, int32_t startPort, int32_t pkgLen) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册