diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h
index c913b2cf2a6fb1506d5c6d9b1d483ec063bede01..58e19ce52af1402ab34f073c3ed3262bcab00092 100644
--- a/src/inc/taoserror.h
+++ b/src/inc/taoserror.h
@@ -267,6 +267,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync modul
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_CONFIRM_EXPIRED, 0, 0x0903, "Sync confirm expired")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TOO_MANY_FWDINFO, 0, 0x0904, "Too many sync fwd infos")
+TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_PROTOCOL, 0, 0x0905, "Mismatched protocol")
+TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_CLUSTERID, 0, 0x0906, "Mismatched clusterId")
+TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_SIGNATURE, 0, 0x0907, "Mismatched signature")
+TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CHECKSUM, 0, 0x0908, "Invalid msg checksum")
+TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, 0, 0x0909, "Invalid msg length")
// wal
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
diff --git a/src/inc/vnode.h b/src/inc/vnode.h
index 95f1d27b591c2503461826e175c3e768c0db77cc..cbe64484b11462fcf3f107625cb0e00b0ac682f4 100644
--- a/src/inc/vnode.h
+++ b/src/inc/vnode.h
@@ -48,7 +48,7 @@ typedef struct {
void * pVnode;
SRpcMsg rpcMsg;
SRspRet rspRet;
- char reserveForSync[16];
+ char reserveForSync[24];
SWalHead pHead[];
} SVWriteMsg;
diff --git a/src/mnode/inc/mnodeSdb.h b/src/mnode/inc/mnodeSdb.h
index 31ea2da640ef3b20b84ca769a22637332d6ffe34..e4df562d81c7ef91fe1ff62eba6611d76e0a3ff1 100644
--- a/src/mnode/inc/mnodeSdb.h
+++ b/src/mnode/inc/mnodeSdb.h
@@ -59,7 +59,7 @@ typedef struct SSdbRow {
SMnodeMsg *pMsg;
int32_t (*fpReq)(SMnodeMsg *pMsg);
int32_t (*fpRsp)(SMnodeMsg *pMsg, int32_t code);
- char reserveForSync[16];
+ char reserveForSync[24];
SWalHead pHead[];
} SSdbRow;
diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h
index c00015552fd9cebf5340052e036fc889d8780e6b..535251ba115edf75d2fdfd1efbcf809bc55bc142 100644
--- a/src/sync/inc/syncInt.h
+++ b/src/sync/inc/syncInt.h
@@ -13,12 +13,14 @@
* along with this program. If not, see .
*/
-#ifndef TDENGINE_SYNCINT_H
-#define TDENGINE_SYNCINT_H
+#ifndef TDENGINE_SYNC_INT_H
+#define TDENGINE_SYNC_INT_H
#ifdef __cplusplus
extern "C" {
#endif
+#include "syncMsg.h"
+#include "twal.h"
#define sFatal(...) { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", sDebugFlag, __VA_ARGS__); }}
#define sError(...) { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", sDebugFlag, __VA_ARGS__); }}
@@ -27,88 +29,16 @@ extern "C" {
#define sDebug(...) { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
#define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
-typedef enum {
- TAOS_SMSG_SYNC_DATA = 1,
- TAOS_SMSG_FORWARD = 2,
- TAOS_SMSG_FORWARD_RSP = 3,
- TAOS_SMSG_SYNC_REQ = 4,
- TAOS_SMSG_SYNC_RSP = 5,
- TAOS_SMSG_SYNC_MUST = 6,
- TAOS_SMSG_STATUS = 7,
- TAOS_SMSG_SYNC_DATA_RSP = 8,
-} ESyncMsgType;
-
#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)
#define SYNC_RECV_BUFFER_SIZE (5*1024*1024)
#define SYNC_FWD_TIMER 300
#define SYNC_ROLE_TIMER 10000
#define SYNC_WAIT_AFTER_CHOOSE_MASTER 3
-#define SYNC_PROTOCOL_VERSION 0
#define nodeRole pNode->peerInfo[pNode->selfIndex]->role
#define nodeVersion pNode->peerInfo[pNode->selfIndex]->version
#define nodeSStatus pNode->peerInfo[pNode->selfIndex]->sstatus
-#pragma pack(push, 1)
-
-typedef struct {
- int8_t type; // msg type
- int8_t protocol; // protocol version
- int8_t reserved[6]; // not used
- int32_t vgId; // vg ID
- int32_t len; // content length, does not include head
-} SSyncHead;
-
-typedef struct {
- SSyncHead head;
- uint16_t port;
- uint16_t tranId;
- char fqdn[TSDB_FQDN_LEN];
- int32_t sourceId; // only for arbitrator
-} SSyncMsg;
-
-typedef struct {
- SSyncHead head;
- int8_t sync;
- int8_t reserved;
- uint16_t tranId;
-} SSyncRsp;
-
-typedef struct {
- int8_t role;
- uint64_t version;
-} SPeerStatus;
-
-typedef struct {
- int8_t role;
- int8_t ack;
- int8_t type;
- int8_t reserved[3];
- uint16_t tranId;
- uint64_t version;
- SPeerStatus peersStatus[];
-} SPeersStatus;
-
-typedef struct {
- char name[TSDB_FILENAME_LEN];
- uint32_t magic;
- uint32_t index;
- uint64_t fversion;
- int64_t size;
-} SFileInfo;
-
-typedef struct {
- int8_t sync;
-} SFileAck;
-
-typedef struct {
- SSyncHead head;
- uint64_t version;
- int32_t code;
-} SFwdRsp;
-
-#pragma pack(pop)
-
typedef struct {
char * buffer;
int32_t bufferSize;
@@ -192,7 +122,6 @@ void syncRestartConnection(SSyncPeer *pPeer);
void syncBroadcastStatus(SSyncNode *pNode);
void syncAddPeerRef(SSyncPeer *pPeer);
int32_t syncDecPeerRef(SSyncPeer *pPeer);
-uint16_t syncGenTranId();
#ifdef __cplusplus
}
diff --git a/src/sync/inc/syncMsg.h b/src/sync/inc/syncMsg.h
new file mode 100644
index 0000000000000000000000000000000000000000..9a7ff04de1ec18abc5017d18c7f09f113aba8c41
--- /dev/null
+++ b/src/sync/inc/syncMsg.h
@@ -0,0 +1,138 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#ifndef TDENGINE_SYNC_MSG_H
+#define TDENGINE_SYNC_MSG_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+#include "tsync.h"
+
+typedef enum {
+ TAOS_SMSG_START = 0,
+ TAOS_SMSG_SYNC_DATA = 1,
+ TAOS_SMSG_SYNC_DATA_RSP = 2,
+ TAOS_SMSG_SYNC_FWD = 3,
+ TAOS_SMSG_SYNC_FWD_RSP = 4,
+ TAOS_SMSG_SYNC_REQ = 5,
+ TAOS_SMSG_SYNC_REQ_RSP = 6,
+ TAOS_SMSG_SYNC_MUST = 7,
+ TAOS_SMSG_SYNC_MUST_RSP = 8,
+ TAOS_SMSG_STATUS = 9,
+ TAOS_SMSG_STATUS_RSP = 10,
+ TAOS_SMSG_SETUP = 11,
+ TAOS_SMSG_SETUP_RSP = 12,
+ TAOS_SMSG_END = 13,
+} ESyncMsgType;
+
+typedef enum {
+ SYNC_STATUS_BROADCAST,
+ SYNC_STATUS_BROADCAST_RSP,
+ SYNC_STATUS_SETUP_CONN,
+ SYNC_STATUS_SETUP_CONN_RSP,
+ SYNC_STATUS_EXCHANGE_DATA,
+ SYNC_STATUS_EXCHANGE_DATA_RSP,
+ SYNC_STATUS_CHECK_ROLE,
+ SYNC_STATUS_CHECK_ROLE_RSP
+} ESyncStatusType;
+
+#pragma pack(push, 1)
+
+typedef struct {
+ int8_t type; // msg type
+ int8_t protocol; // protocol version
+ uint16_t signature; // fixed value
+ int32_t code; //
+ int32_t cId; // cluster Id
+ int32_t vgId; // vg ID
+ int32_t len; // content length, does not include head
+ uint32_t cksum;
+} SSyncHead;
+
+typedef struct {
+ SSyncHead head;
+ uint16_t port;
+ uint16_t tranId;
+ int32_t sourceId; // only for arbitrator
+ char fqdn[TSDB_FQDN_LEN];
+} SSyncMsg;
+
+typedef struct {
+ SSyncHead head;
+ int8_t sync;
+ int8_t reserved;
+ uint16_t tranId;
+ int8_t reserverd[4];
+} SSyncRsp;
+
+typedef struct {
+ int8_t role;
+ uint64_t version;
+} SPeerStatus;
+
+typedef struct {
+ SSyncHead head;
+ int8_t role;
+ int8_t ack;
+ int8_t type;
+ int8_t reserved[3];
+ uint16_t tranId;
+ uint64_t version;
+ SPeerStatus peersStatus[TAOS_SYNC_MAX_REPLICA];
+} SPeersStatus;
+
+typedef struct {
+ SSyncHead head;
+ char name[TSDB_FILENAME_LEN];
+ uint32_t magic;
+ uint32_t index;
+ uint64_t fversion;
+ int64_t size;
+} SFileInfo;
+
+typedef struct {
+ SSyncHead head;
+ int8_t sync;
+} SFileAck;
+
+typedef struct {
+ SSyncHead head;
+ uint64_t version;
+ int32_t code;
+} SFwdRsp;
+
+#pragma pack(pop)
+
+#define SYNC_PROTOCOL_VERSION 0
+#define SYNC_SIGNATURE ((uint16_t)(0xCDEF))
+
+extern char *statusType[];
+
+uint16_t syncGenTranId();
+int32_t syncCheckHead(SSyncHead *pHead);
+
+void syncBuildSyncFwdMsg(SSyncHead *pHead, int32_t vgId, int32_t len);
+void syncBuildSyncFwdRsp(SFwdRsp *pMsg, int32_t vgId, uint64_t version, int32_t code);
+void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId);
+void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId);
+void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId);
+void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // TDENGINE_VNODEPEER_H
diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c
index 5e3355e68f46d6dd7e0a76c0affefeeb55b52248..bf0b6e95d487c16b6a72221cc4ea5ee95c9e0824 100644
--- a/src/sync/src/syncMain.c
+++ b/src/sync/src/syncMain.c
@@ -80,32 +80,6 @@ char *syncStatus[] = {
"invalid"
};
-typedef enum {
- SYNC_STATUS_BROADCAST,
- SYNC_STATUS_BROADCAST_RSP,
- SYNC_STATUS_SETUP_CONN,
- SYNC_STATUS_SETUP_CONN_RSP,
- SYNC_STATUS_EXCHANGE_DATA,
- SYNC_STATUS_EXCHANGE_DATA_RSP,
- SYNC_STATUS_CHECK_ROLE,
- SYNC_STATUS_CHECK_ROLE_RSP
-} ESyncStatusType;
-
-char *statusType[] = {
- "broadcast",
- "broadcast-rsp",
- "setup-conn",
- "setup-conn-rsp",
- "exchange-data",
- "exchange-data-rsp",
- "check-role",
- "check-role-rsp"
-};
-
-uint16_t syncGenTranId() {
- return taosRand() & 0XFFFF;
-}
-
int32_t syncInit() {
SPoolInfo info = {0};
@@ -384,19 +358,13 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
SSyncPeer *pPeer = pNode->pMaster;
if (pPeer && pNode->quorum > 1) {
- SFwdRsp fwdRsp = {0};
-
- fwdRsp.head.type = TAOS_SMSG_FORWARD_RSP;
- fwdRsp.head.protocol = SYNC_PROTOCOL_VERSION;
- fwdRsp.head.vgId = pNode->vgId;
- fwdRsp.head.len = sizeof(SFwdRsp) - sizeof(SSyncHead);
- fwdRsp.version = version;
- fwdRsp.code = code;
+ SFwdRsp rsp;
+ syncBuildSyncFwdRsp(&rsp, pNode->vgId, version, code);
- if (taosWriteMsg(pPeer->peerFd, &fwdRsp, sizeof(SFwdRsp)) == sizeof(SFwdRsp)) {
- sTrace("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version);
+ if (taosWriteMsg(pPeer->peerFd, &rsp, sizeof(SFwdRsp)) == sizeof(SFwdRsp)) {
+ sTrace("%s, forward-rsp is sent, code:0x%x hver:%" PRIu64, pPeer->id, code, version);
} else {
- sDebug("%s, failed to send forward ack, restart", pPeer->id);
+ sDebug("%s, failed to send forward-rsp, restart", pPeer->id);
syncRestartConnection(pPeer);
}
}
@@ -864,14 +832,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
sDebug("%s, try to sync", pPeer->id);
SSyncMsg msg;
- memset(&msg, 0, sizeof(SSyncMsg));
- msg.head.type = TAOS_SMSG_SYNC_REQ;
- msg.head.protocol = SYNC_PROTOCOL_VERSION;
- msg.head.vgId = pNode->vgId;
- msg.head.len = sizeof(SSyncMsg) - sizeof(SSyncHead);
- msg.port = tsSyncPort;
- msg.tranId = syncGenTranId();
- tstrncpy(msg.fqdn, tsNodeFqdn, TSDB_FQDN_LEN);
+ syncBuildSyncReqMsg(&msg, pNode->vgId);
taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
@@ -882,9 +843,8 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
}
}
-static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
+static void syncProcessFwdResponse(SFwdRsp *pFwdRsp, SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode;
- SFwdRsp * pFwdRsp = (SFwdRsp *)cont;
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
SFwdInfo * pFwdInfo;
@@ -914,7 +874,7 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode;
- SWalHead * pHead = (SWalHead *)cont;
+ SWalHead * pHead = (SWalHead *)(cont + sizeof(SSyncHead));
sTrace("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
@@ -931,9 +891,8 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
}
}
-static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
- SSyncNode * pNode = pPeer->pSyncNode;
- SPeersStatus *pPeersStatus = (SPeersStatus *)cont;
+static void syncProcessPeersStatusMsg(SPeersStatus *pPeersStatus, SSyncPeer *pPeer) {
+ SSyncNode *pNode = pPeer->pSyncNode;
sDebug("%s, status is received, self:%s:%s:%" PRIu64 ", peer:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d",
pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeersStatus->role],
@@ -947,23 +906,22 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
}
}
-static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
+static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead) {
if (pPeer->peerFd < 0) return -1;
int32_t hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead));
if (hlen != sizeof(SSyncHead)) {
- sDebug("%s, failed to read msg, hlen:%d", pPeer->id, hlen);
+ sDebug("%s, failed to read msg since %s, hlen:%d", pPeer->id, tstrerror(errno), hlen);
return -1;
}
- // head.len = htonl(head.len);
- if (pHead->len < 0) {
- sError("%s, invalid msg length, hlen:%d", pPeer->id, pHead->len);
+ int32_t code = syncCheckHead(pHead);
+ if (code != 0) {
+ sError("%s, failed to check msg head since %s, type:%d", pPeer->id, tstrerror(code), pHead->type);
return -1;
}
- assert(pHead->len <= TSDB_MAX_WAL_SIZE);
- int32_t bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len);
+ int32_t bytes = taosReadMsg(pPeer->peerFd, (char *)pHead + sizeof(SSyncHead), pHead->len);
if (bytes != pHead->len) {
sError("%s, failed to read, bytes:%d len:%d", pPeer->id, bytes, pHead->len);
return -1;
@@ -974,23 +932,22 @@ static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
static int32_t syncProcessPeerMsg(void *param, void *buffer) {
SSyncPeer *pPeer = param;
- SSyncHead head;
- char * cont = buffer;
-
+ SSyncHead *pHead = buffer;
SSyncNode *pNode = pPeer->pSyncNode;
+
pthread_mutex_lock(&pNode->mutex);
- int32_t code = syncReadPeerMsg(pPeer, &head, cont);
+ int32_t code = syncReadPeerMsg(pPeer, pHead);
if (code == 0) {
- if (head.type == TAOS_SMSG_FORWARD) {
- syncProcessForwardFromPeer(cont, pPeer);
- } else if (head.type == TAOS_SMSG_FORWARD_RSP) {
- syncProcessFwdResponse(cont, pPeer);
- } else if (head.type == TAOS_SMSG_SYNC_REQ) {
- syncProcessSyncRequest(cont, pPeer);
- } else if (head.type == TAOS_SMSG_STATUS) {
- syncProcessPeersStatusMsg(cont, pPeer);
+ if (pHead->type == TAOS_SMSG_SYNC_FWD) {
+ syncProcessForwardFromPeer(buffer, pPeer);
+ } else if (pHead->type == TAOS_SMSG_SYNC_FWD_RSP) {
+ syncProcessFwdResponse(buffer, pPeer);
+ } else if (pHead->type == TAOS_SMSG_SYNC_REQ) {
+ syncProcessSyncRequest(buffer, pPeer);
+ } else if (pHead->type == TAOS_SMSG_STATUS) {
+ syncProcessPeersStatusMsg(buffer, pPeer);
}
}
@@ -999,36 +956,29 @@ static int32_t syncProcessPeerMsg(void *param, void *buffer) {
return code;
}
-#define statusMsgLen sizeof(SSyncHead) + sizeof(SPeersStatus) + sizeof(SPeerStatus) * TAOS_SYNC_MAX_REPLICA
-
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId) {
- SSyncNode *pNode = pPeer->pSyncNode;
- char msg[statusMsgLen] = {0};
-
if (pPeer->peerFd < 0 || pPeer->ip == 0) return;
- SSyncHead * pHead = (SSyncHead *)msg;
- SPeersStatus *pPeersStatus = (SPeersStatus *)(msg + sizeof(SSyncHead));
+ SSyncNode *pNode = pPeer->pSyncNode;
+ SPeersStatus msg = {0};
- pHead->type = TAOS_SMSG_STATUS;
- pHead->len = statusMsgLen - sizeof(SSyncHead);
+ syncBuildPeersStatus(&msg, pNode->vgId);
- pPeersStatus->version = nodeVersion;
- pPeersStatus->role = nodeRole;
- pPeersStatus->ack = ack;
- pPeersStatus->type = type;
- pPeersStatus->tranId = tranId;
+ msg.role = nodeRole;
+ msg.ack = ack;
+ msg.type = type;
+ msg.tranId = tranId;
+ msg.version = nodeVersion;
for (int32_t i = 0; i < pNode->replica; ++i) {
- pPeersStatus->peersStatus[i].role = pNode->peerInfo[i]->role;
- pPeersStatus->peersStatus[i].version = pNode->peerInfo[i]->version;
+ msg.peersStatus[i].role = pNode->peerInfo[i]->role;
+ msg.peersStatus[i].version = pNode->peerInfo[i]->version;
}
- if (taosWriteMsg(pPeer->peerFd, msg, statusMsgLen) == statusMsgLen) {
+ if (taosWriteMsg(pPeer->peerFd, &msg, sizeof(SPeersStatus)) == sizeof(SPeersStatus)) {
sDebug("%s, status is sent, self:%s:%s:%" PRIu64 ", peer:%s:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d",
pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeer->role],
- syncStatus[pPeer->sstatus], pPeer->version, pPeersStatus->ack, pPeersStatus->tranId,
- statusType[pPeersStatus->type], pPeer->peerFd);
+ syncStatus[pPeer->sstatus], pPeer->version, ack, tranId, statusType[type], pPeer->peerFd);
} else {
sDebug("%s, failed to send status msg, restart", pPeer->id);
syncRestartConnection(pPeer);
@@ -1053,15 +1003,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
}
SSyncMsg msg;
- memset(&msg, 0, sizeof(SSyncMsg));
- msg.head.type = TAOS_SMSG_STATUS;
- msg.head.protocol = SYNC_PROTOCOL_VERSION;
- msg.head.vgId = pPeer->nodeId ? pNode->vgId : 0;
- msg.head.len = sizeof(SSyncMsg) - sizeof(SSyncHead);
- msg.port = tsSyncPort;
- msg.tranId = syncGenTranId();
- msg.sourceId = pNode->vgId; // tell arbitrator its vgId
- tstrncpy(msg.fqdn, tsNodeFqdn, TSDB_FQDN_LEN);
+ syncBuildSyncSetupMsg(&msg, pPeer->nodeId ? pNode->vgId : 0);
if (taosWriteMsg(connFd, &msg, sizeof(SSyncMsg)) == sizeof(SSyncMsg)) {
sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, msg.tranId);
@@ -1341,10 +1283,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
// a hacker way to improve the performance
pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead));
- pSyncHead->type = TAOS_SMSG_FORWARD;
- pSyncHead->protocol = SYNC_PROTOCOL_VERSION;
- pSyncHead->vgId = pNode->vgId;
- pSyncHead->len = sizeof(SWalHead) + pWalHead->len;
+ syncBuildSyncFwdMsg(pSyncHead, pNode->vgId, sizeof(SWalHead) + pWalHead->len);
fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head
pthread_mutex_lock(&pNode->mutex);
diff --git a/src/sync/src/syncMsg.c b/src/sync/src/syncMsg.c
new file mode 100644
index 0000000000000000000000000000000000000000..dafefc77baab736be646651d6e504d83553a3029
--- /dev/null
+++ b/src/sync/src/syncMsg.c
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#define _DEFAULT_SOURCE
+#include "os.h"
+#include "taoserror.h"
+#include "tchecksum.h"
+#include "syncInt.h"
+
+char *statusType[] = {
+ "broadcast",
+ "broadcast-rsp",
+ "setup-conn",
+ "setup-conn-rsp",
+ "exchange-data",
+ "exchange-data-rsp",
+ "check-role",
+ "check-role-rsp"
+};
+
+uint16_t syncGenTranId() {
+ return taosRand() & 0XFFFF;
+}
+
+static void syncBuildHead(SSyncHead *pHead) {
+ pHead->protocol = SYNC_PROTOCOL_VERSION;
+ pHead->signature = SYNC_SIGNATURE;
+ pHead->code = 0;
+ pHead->cId = 0;
+ taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SSyncHead));
+}
+
+int32_t syncCheckHead(SSyncHead *pHead) {
+ if (pHead->protocol != SYNC_PROTOCOL_VERSION) return TSDB_CODE_SYN_MISMATCHED_PROTOCOL;
+ if (pHead->signature != SYNC_SIGNATURE) return TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
+ if (pHead->cId != 0) return TSDB_CODE_SYN_MISMATCHED_CLUSTERID;
+ if (pHead->len <= 0 || pHead->len > TSDB_MAX_WAL_SIZE) return TSDB_CODE_SYN_INVALID_MSGLEN;
+ if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SSyncHead))) return TSDB_CODE_SYN_INVALID_CHECKSUM;
+
+ return TSDB_CODE_SUCCESS;
+}
+
+void syncBuildSyncFwdMsg(SSyncHead *pHead, int32_t vgId, int32_t len) {
+ pHead->type = TAOS_SMSG_SYNC_FWD;
+ pHead->vgId = vgId;
+ pHead->len = len;
+ syncBuildHead(pHead);
+}
+
+void syncBuildSyncFwdRsp(SFwdRsp *pMsg, int32_t vgId, uint64_t version, int32_t code) {
+ pMsg->head.type = TAOS_SMSG_SYNC_FWD_RSP;
+ pMsg->head.vgId = vgId;
+ pMsg->head.len = sizeof(SFwdRsp) - sizeof(SSyncHead);
+ syncBuildHead(&pMsg->head);
+
+ pMsg->version = version;
+ pMsg->code = code;
+}
+
+static void syncBuildMsg(SSyncMsg *pMsg, int32_t vgId, ESyncMsgType type) {
+ pMsg->head.type = type;
+ pMsg->head.vgId = vgId;
+ pMsg->head.len = sizeof(SSyncMsg) - sizeof(SSyncHead);
+ syncBuildHead(&pMsg->head);
+
+ pMsg->port = tsSyncPort;
+ pMsg->tranId = syncGenTranId();
+ pMsg->sourceId = vgId;
+ tstrncpy(pMsg->fqdn, tsNodeFqdn, TSDB_FQDN_LEN);
+}
+
+void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_REQ); }
+void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_DATA); }
+void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SETUP); }
+
+void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId) {
+ pMsg->head.type = TAOS_SMSG_STATUS;
+ pMsg->head.vgId = vgId;
+ pMsg->head.len = sizeof(SPeersStatus) - sizeof(SSyncHead);
+ syncBuildHead(&pMsg->head);
+}
diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c
index 981716d5615d9375a3a741f1f09195abc8dcbe0a..204f4f90367c2038e40b41d7ab206c579d31b3d6 100644
--- a/src/sync/src/syncRetrieve.c
+++ b/src/sync/src/syncRetrieve.c
@@ -406,14 +406,7 @@ static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode;
SSyncMsg msg;
- memset(&msg, 0, sizeof(SSyncMsg));
- msg.head.type = TAOS_SMSG_SYNC_DATA;
- msg.head.protocol = SYNC_PROTOCOL_VERSION;
- msg.head.vgId = pNode->vgId;
- msg.head.len = sizeof(SSyncMsg) - sizeof(SSyncHead);
- msg.port = tsSyncPort;
- msg.tranId = syncGenTranId();
- tstrncpy(msg.fqdn, tsNodeFqdn, TSDB_FQDN_LEN);
+ syncBuildSyncDataMsg(&msg, pNode->vgId);
if (taosWriteMsg(pPeer->syncFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
sError("%s, failed to send sync-data msg since %s, tranId:%u", pPeer->id, strerror(errno), msg.tranId);