diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h
index c913b2cf2a6fb1506d5c6d9b1d483ec063bede01..7c7e7ec31ab88a1f048767b9cc4aa486bfa788cd 100644
--- a/src/inc/taoserror.h
+++ b/src/inc/taoserror.h
@@ -267,6 +267,12 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync modul
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_CONFIRM_EXPIRED, 0, 0x0903, "Sync confirm expired")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TOO_MANY_FWDINFO, 0, 0x0904, "Too many sync fwd infos")
+TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_PROTOCOL, 0, 0x0905, "Mismatched protocol")
+TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_CLUSTERID, 0, 0x0906, "Mismatched clusterId")
+TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_SIGNATURE, 0, 0x0907, "Mismatched signature")
+TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CHECKSUM, 0, 0x0908, "Invalid msg checksum")
+TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, 0, 0x0909, "Invalid msg length")
+TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, 0, 0x090A, "Invalid msg type")
// wal
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
diff --git a/src/inc/tsync.h b/src/inc/tsync.h
index 0ce2a1a495b7826635d07ac9139cb77b306c9330..4dae86bbed538a0801251f62723a471215655e24 100644
--- a/src/inc/tsync.h
+++ b/src/inc/tsync.h
@@ -119,10 +119,6 @@ int32_t syncGetNodesRole(int64_t rid, SNodesRole *);
extern char *syncRole[];
//global configurable parameters
-extern int32_t tsMaxSyncNum;
-extern int32_t tsSyncTcpThreads;
-extern int32_t tsSyncTimer;
-extern int32_t tsMaxFwdInfo;
extern int32_t sDebugFlag;
extern char tsArbitrator[];
extern uint16_t tsSyncPort;
diff --git a/src/inc/vnode.h b/src/inc/vnode.h
index 95f1d27b591c2503461826e175c3e768c0db77cc..cbe64484b11462fcf3f107625cb0e00b0ac682f4 100644
--- a/src/inc/vnode.h
+++ b/src/inc/vnode.h
@@ -48,7 +48,7 @@ typedef struct {
void * pVnode;
SRpcMsg rpcMsg;
SRspRet rspRet;
- char reserveForSync[16];
+ char reserveForSync[24];
SWalHead pHead[];
} SVWriteMsg;
diff --git a/src/mnode/inc/mnodeSdb.h b/src/mnode/inc/mnodeSdb.h
index 31ea2da640ef3b20b84ca769a22637332d6ffe34..e4df562d81c7ef91fe1ff62eba6611d76e0a3ff1 100644
--- a/src/mnode/inc/mnodeSdb.h
+++ b/src/mnode/inc/mnodeSdb.h
@@ -59,7 +59,7 @@ typedef struct SSdbRow {
SMnodeMsg *pMsg;
int32_t (*fpReq)(SMnodeMsg *pMsg);
int32_t (*fpRsp)(SMnodeMsg *pMsg, int32_t code);
- char reserveForSync[16];
+ char reserveForSync[24];
SWalHead pHead[];
} SSdbRow;
diff --git a/src/sync/CMakeLists.txt b/src/sync/CMakeLists.txt
index 60271c771ca0a01bd449cb878fe2269759250fd3..aa38a56f38146bafda8a10ca786bf085bb4ea339 100644
--- a/src/sync/CMakeLists.txt
+++ b/src/sync/CMakeLists.txt
@@ -5,12 +5,12 @@ INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC)
IF (TD_LINUX)
- LIST(REMOVE_ITEM SRC src/tarbitrator.c)
+ LIST(REMOVE_ITEM SRC src/syncArbitrator.c)
ADD_LIBRARY(sync ${SRC})
TARGET_LINK_LIBRARIES(sync tutil pthread common)
- LIST(APPEND BIN_SRC src/tarbitrator.c)
- LIST(APPEND BIN_SRC src/taosTcpPool.c)
+ LIST(APPEND BIN_SRC src/syncArbitrator.c)
+ LIST(APPEND BIN_SRC src/syncTcp.c)
ADD_EXECUTABLE(tarbitrator ${BIN_SRC})
TARGET_LINK_LIBRARIES(tarbitrator sync common osdetail tutil)
diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h
index 2be25447c4a6b45863718ef2b6c402961fecb93f..d855c651f93a58d46e064e67d287f135684f4954 100644
--- a/src/sync/inc/syncInt.h
+++ b/src/sync/inc/syncInt.h
@@ -13,12 +13,14 @@
* along with this program. If not, see .
*/
-#ifndef TDENGINE_SYNCINT_H
-#define TDENGINE_SYNCINT_H
+#ifndef TDENGINE_SYNC_INT_H
+#define TDENGINE_SYNC_INT_H
#ifdef __cplusplus
extern "C" {
#endif
+#include "syncMsg.h"
+#include "twal.h"
#define sFatal(...) { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", sDebugFlag, __VA_ARGS__); }}
#define sError(...) { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", sDebugFlag, __VA_ARGS__); }}
@@ -27,86 +29,22 @@ extern "C" {
#define sDebug(...) { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
#define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
-typedef enum {
- TAOS_SMSG_SYNC_DATA = 1,
- TAOS_SMSG_FORWARD = 2,
- TAOS_SMSG_FORWARD_RSP = 3,
- TAOS_SMSG_SYNC_REQ = 4,
- TAOS_SMSG_SYNC_RSP = 5,
- TAOS_SMSG_SYNC_MUST = 6,
- TAOS_SMSG_STATUS = 7,
- TAOS_SMSG_SYNC_DATA_RSP = 8,
-} ESyncMsgType;
+#define SYNC_TCP_THREADS 2
+#define SYNC_MAX_NUM 2
#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)
#define SYNC_RECV_BUFFER_SIZE (5*1024*1024)
-#define SYNC_FWD_TIMER 300
-#define SYNC_ROLE_TIMER 10000
-#define SYNC_WAIT_AFTER_CHOOSE_MASTER 3
+
+#define SYNC_MAX_FWDS 512
+#define SYNC_FWD_TIMER 300
+#define SYNC_ROLE_TIMER 15000 // ms
+#define SYNC_CHECK_INTERVAL 1 // ms
+#define SYNC_WAIT_AFTER_CHOOSE_MASTER 10 // ms
#define nodeRole pNode->peerInfo[pNode->selfIndex]->role
#define nodeVersion pNode->peerInfo[pNode->selfIndex]->version
#define nodeSStatus pNode->peerInfo[pNode->selfIndex]->sstatus
-#pragma pack(push, 1)
-
-typedef struct {
- char type; // msg type
- char pversion; // protocol version
- char reserved[6]; // not used
- int32_t vgId; // vg ID
- int32_t len; // content length, does not include head
- // char cont[]; // message content starts from here
-} SSyncHead;
-
-typedef struct {
- SSyncHead syncHead;
- uint16_t port;
- uint16_t tranId;
- char fqdn[TSDB_FQDN_LEN];
- int32_t sourceId; // only for arbitrator
-} SFirstPkt;
-
-typedef struct {
- int8_t sync;
- int8_t reserved;
- uint16_t tranId;
-} SFirstPktRsp;
-
-typedef struct {
- int8_t role;
- uint64_t version;
-} SPeerStatus;
-
-typedef struct {
- int8_t role;
- int8_t ack;
- int8_t type;
- int8_t reserved[3];
- uint16_t tranId;
- uint64_t version;
- SPeerStatus peersStatus[];
-} SPeersStatus;
-
-typedef struct {
- char name[TSDB_FILENAME_LEN];
- uint32_t magic;
- uint32_t index;
- uint64_t fversion;
- int64_t size;
-} SFileInfo;
-
-typedef struct {
- int8_t sync;
-} SFileAck;
-
-typedef struct {
- uint64_t version;
- int32_t code;
-} SFwdRsp;
-
-#pragma pack(pop)
-
typedef struct {
char * buffer;
int32_t bufferSize;
@@ -190,7 +128,6 @@ void syncRestartConnection(SSyncPeer *pPeer);
void syncBroadcastStatus(SSyncNode *pNode);
void syncAddPeerRef(SSyncPeer *pPeer);
int32_t syncDecPeerRef(SSyncPeer *pPeer);
-uint16_t syncGenTranId();
#ifdef __cplusplus
}
diff --git a/src/sync/inc/syncMsg.h b/src/sync/inc/syncMsg.h
new file mode 100644
index 0000000000000000000000000000000000000000..73f4223c882ff9ce544984b37843070cf579c44a
--- /dev/null
+++ b/src/sync/inc/syncMsg.h
@@ -0,0 +1,143 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#ifndef TDENGINE_SYNC_MSG_H
+#define TDENGINE_SYNC_MSG_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+#include "tsync.h"
+
+typedef enum {
+ TAOS_SMSG_START = 0,
+ TAOS_SMSG_SYNC_DATA = 1,
+ TAOS_SMSG_SYNC_DATA_RSP = 2,
+ TAOS_SMSG_SYNC_FWD = 3,
+ TAOS_SMSG_SYNC_FWD_RSP = 4,
+ TAOS_SMSG_SYNC_REQ = 5,
+ TAOS_SMSG_SYNC_REQ_RSP = 6,
+ TAOS_SMSG_SYNC_MUST = 7,
+ TAOS_SMSG_SYNC_MUST_RSP = 8,
+ TAOS_SMSG_STATUS = 9,
+ TAOS_SMSG_STATUS_RSP = 10,
+ TAOS_SMSG_SETUP = 11,
+ TAOS_SMSG_SETUP_RSP = 12,
+ TAOS_SMSG_SYNC_FILE = 13,
+ TAOS_SMSG_SYNC_FILE_RSP = 14,
+ TAOS_SMSG_END = 15,
+} ESyncMsgType;
+
+typedef enum {
+ SYNC_STATUS_BROADCAST,
+ SYNC_STATUS_BROADCAST_RSP,
+ SYNC_STATUS_SETUP_CONN,
+ SYNC_STATUS_SETUP_CONN_RSP,
+ SYNC_STATUS_EXCHANGE_DATA,
+ SYNC_STATUS_EXCHANGE_DATA_RSP,
+ SYNC_STATUS_CHECK_ROLE,
+ SYNC_STATUS_CHECK_ROLE_RSP
+} ESyncStatusType;
+
+#pragma pack(push, 1)
+
+typedef struct {
+ int8_t type; // msg type
+ int8_t protocol; // protocol version
+ uint16_t signature; // fixed value
+ int32_t code; //
+ int32_t cId; // cluster Id
+ int32_t vgId; // vg ID
+ int32_t len; // content length, does not include head
+ uint32_t cksum;
+} SSyncHead;
+
+typedef struct {
+ SSyncHead head;
+ uint16_t port;
+ uint16_t tranId;
+ int32_t sourceId; // only for arbitrator
+ char fqdn[TSDB_FQDN_LEN];
+} SSyncMsg;
+
+typedef struct {
+ SSyncHead head;
+ int8_t sync;
+ int8_t reserved;
+ uint16_t tranId;
+ int8_t reserverd[4];
+} SSyncRsp;
+
+typedef struct {
+ int8_t role;
+ uint64_t version;
+} SPeerStatus;
+
+typedef struct {
+ SSyncHead head;
+ int8_t role;
+ int8_t ack;
+ int8_t type;
+ int8_t reserved[3];
+ uint16_t tranId;
+ uint64_t version;
+ SPeerStatus peersStatus[TAOS_SYNC_MAX_REPLICA];
+} SPeersStatus;
+
+typedef struct {
+ SSyncHead head;
+ char name[TSDB_FILENAME_LEN];
+ uint32_t magic;
+ uint32_t index;
+ uint64_t fversion;
+ int64_t size;
+} SFileInfo;
+
+typedef struct {
+ SSyncHead head;
+ int8_t sync;
+} SFileAck;
+
+typedef struct {
+ SSyncHead head;
+ uint64_t version;
+ int32_t code;
+} SFwdRsp;
+
+#pragma pack(pop)
+
+#define SYNC_PROTOCOL_VERSION 1
+#define SYNC_SIGNATURE ((uint16_t)(0xCDEF))
+
+extern char *statusType[];
+
+uint16_t syncGenTranId();
+int32_t syncCheckHead(SSyncHead *pHead);
+
+void syncBuildSyncFwdMsg(SSyncHead *pHead, int32_t vgId, int32_t len);
+void syncBuildSyncFwdRsp(SFwdRsp *pMsg, int32_t vgId, uint64_t version, int32_t code);
+void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId);
+void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId);
+void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId);
+void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId);
+
+void syncBuildFileAck(SFileAck *pMsg, int32_t vgId);
+void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // TDENGINE_VNODEPEER_H
diff --git a/src/sync/inc/taosTcpPool.h b/src/sync/inc/syncTcp.h
similarity index 77%
rename from src/sync/inc/taosTcpPool.h
rename to src/sync/inc/syncTcp.h
index 41043b0cd4c886616d5cecd2739eae684052c395..7db51f2a7115ccf23ee42d17cbe62f0798adf260 100644
--- a/src/sync/inc/taosTcpPool.h
+++ b/src/sync/inc/syncTcp.h
@@ -13,16 +13,13 @@
* along with this program. If not, see .
*/
-#ifndef TDENGINE_TCP_POOL_H
-#define TDENGINE_TCP_POOL_H
+#ifndef TDENGINE_SYNC_TCP_POOL_H
+#define TDENGINE_SYNC_TCP_POOL_H
#ifdef __cplusplus
extern "C" {
#endif
-typedef void *ttpool_h;
-typedef void *tthread_h;
-
typedef struct {
int32_t numOfThreads;
uint32_t serverIp;
@@ -33,10 +30,10 @@ typedef struct {
void (*processIncomingConn)(int32_t fd, uint32_t ip);
} SPoolInfo;
-ttpool_h taosOpenTcpThreadPool(SPoolInfo *pInfo);
-void taosCloseTcpThreadPool(ttpool_h);
-void * taosAllocateTcpConn(void *, void *ahandle, int32_t connFd);
-void taosFreeTcpConn(void *);
+void *syncOpenTcpThreadPool(SPoolInfo *pInfo);
+void syncCloseTcpThreadPool(void *);
+void *syncAllocateTcpConn(void *, void *ahandle, int32_t connFd);
+void syncFreeTcpConn(void *);
#ifdef __cplusplus
}
diff --git a/src/sync/src/tarbitrator.c b/src/sync/src/syncArbitrator.c
similarity index 82%
rename from src/sync/src/tarbitrator.c
rename to src/sync/src/syncArbitrator.c
index 4016042de2135f732dfeb2bbc3b0fdc65b1b63f6..1cb2b8f30269c572bc76ddf9c04491d9b77bb3bc 100644
--- a/src/sync/src/tarbitrator.c
+++ b/src/sync/src/syncArbitrator.c
@@ -22,17 +22,17 @@
#include "tsocket.h"
#include "tglobal.h"
#include "taoserror.h"
-#include "taosTcpPool.h"
#include "twal.h"
#include "tsync.h"
#include "syncInt.h"
+#include "syncTcp.h"
-static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context);
-static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp);
-static void arbProcessBrokenLink(void *param);
-static int32_t arbProcessPeerMsg(void *param, void *buffer);
-static tsem_t tsArbSem;
-static ttpool_h tsArbTcpPool;
+static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context);
+static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp);
+static void arbProcessBrokenLink(void *param);
+static int32_t arbProcessPeerMsg(void *param, void *buffer);
+static tsem_t tsArbSem;
+static void * tsArbTcpPool;
typedef struct {
char id[TSDB_EP_LEN + 24];
@@ -90,7 +90,7 @@ int32_t main(int32_t argc, char *argv[]) {
info.processBrokenLink = arbProcessBrokenLink;
info.processIncomingMsg = arbProcessPeerMsg;
info.processIncomingConn = arbProcessIncommingConnection;
- tsArbTcpPool = taosOpenTcpThreadPool(&info);
+ tsArbTcpPool = syncOpenTcpThreadPool(&info);
if (tsArbTcpPool == NULL) {
sDebug("failed to open TCP thread pool, exit...");
@@ -101,8 +101,8 @@ int32_t main(int32_t argc, char *argv[]) {
tsem_wait(&tsArbSem);
- taosCloseTcpThreadPool(tsArbTcpPool);
- sInfo("TAOS arbitrator is shut down\n");
+ syncCloseTcpThreadPool(tsArbTcpPool);
+ sInfo("TAOS arbitrator is shut down");
closelog();
return 0;
@@ -113,9 +113,9 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
tinet_ntoa(ipstr, sourceIp);
sDebug("peer TCP connection from ip:%s", ipstr);
- SFirstPkt firstPkt;
- if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) {
- sError("failed to read peer first pkt from ip:%s since %s", ipstr, strerror(errno));
+ SSyncMsg msg;
+ if (taosReadMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
+ sError("failed to read peer sync msg from ip:%s since %s", ipstr, strerror(errno));
taosCloseSocket(connFd);
return;
}
@@ -127,9 +127,9 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
return;
}
- firstPkt.fqdn[sizeof(firstPkt.fqdn) - 1] = 0;
- snprintf(pNode->id, sizeof(pNode->id), "vgId:%d, peer:%s:%d", firstPkt.sourceId, firstPkt.fqdn, firstPkt.port);
- if (firstPkt.syncHead.vgId) {
+ msg.fqdn[TSDB_FQDN_LEN - 1] = 0;
+ snprintf(pNode->id, sizeof(pNode->id), "vgId:%d, peer:%s:%d", msg.sourceId, msg.fqdn, msg.port);
+ if (msg.head.vgId) {
sDebug("%s, vgId in head is not zero, close the connection", pNode->id);
tfree(pNode);
taosCloseSocket(connFd);
@@ -138,7 +138,7 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
sDebug("%s, arbitrator request is accepted", pNode->id);
pNode->nodeFd = connFd;
- pNode->pConn = taosAllocateTcpConn(tsArbTcpPool, pNode, connFd);
+ pNode->pConn = syncAllocateTcpConn(tsArbTcpPool, pNode, connFd);
return;
}
@@ -156,8 +156,8 @@ static int32_t arbProcessPeerMsg(void *param, void *buffer) {
int32_t bytes = 0;
char * cont = (char *)buffer;
- int32_t hlen = taosReadMsg(pNode->nodeFd, &head, sizeof(head));
- if (hlen != sizeof(head)) {
+ int32_t hlen = taosReadMsg(pNode->nodeFd, &head, sizeof(SSyncHead));
+ if (hlen != sizeof(SSyncHead)) {
sDebug("%s, failed to read msg, hlen:%d", pNode->id, hlen);
return -1;
}
diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c
index aae5dab3cdd5d394307dbbc8c5c4b0040a05d4a7..ac79b486061c191506e527e031e93069759abbb0 100644
--- a/src/sync/src/syncMain.c
+++ b/src/sync/src/syncMain.c
@@ -23,26 +23,19 @@
#include "tsocket.h"
#include "tglobal.h"
#include "taoserror.h"
-#include "taosTcpPool.h"
#include "tqueue.h"
#include "twal.h"
#include "tsync.h"
+#include "syncTcp.h"
#include "syncInt.h"
-// global configurable
-int32_t tsMaxSyncNum = 2;
-int32_t tsSyncTcpThreads = 2;
-int32_t tsMaxFwdInfo = 512;
-int32_t tsSyncTimer = 1;
+int32_t tsSyncNum = 0; // number of sync in process in whole system
+char tsNodeFqdn[TSDB_FQDN_LEN] = {0};
-// module global, not configurable
-int32_t tsSyncNum; // number of sync in process in whole system
-char tsNodeFqdn[TSDB_FQDN_LEN];
-
-static ttpool_h tsTcpPool;
-static void * tsSyncTmrCtrl = NULL;
-static void * tsVgIdHash;
-static int32_t tsSyncRefId = -1;
+static void * tsTcpPool = NULL;
+static void * tsSyncTmrCtrl = NULL;
+static void * tsVgIdHash = NULL;
+static int32_t tsSyncRefId = -1;
// local functions
static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer);
@@ -80,36 +73,10 @@ char *syncStatus[] = {
"invalid"
};
-typedef enum {
- SYNC_STATUS_BROADCAST,
- SYNC_STATUS_BROADCAST_RSP,
- SYNC_STATUS_SETUP_CONN,
- SYNC_STATUS_SETUP_CONN_RSP,
- SYNC_STATUS_EXCHANGE_DATA,
- SYNC_STATUS_EXCHANGE_DATA_RSP,
- SYNC_STATUS_CHECK_ROLE,
- SYNC_STATUS_CHECK_ROLE_RSP
-} ESyncStatusType;
-
-char *statusType[] = {
- "broadcast",
- "broadcast-rsp",
- "setup-conn",
- "setup-conn-rsp",
- "exchange-data",
- "exchange-data-rsp",
- "check-role",
- "check-role-rsp"
-};
-
-uint16_t syncGenTranId() {
- return taosRand() & 0XFFFF;
-}
-
int32_t syncInit() {
SPoolInfo info = {0};
- info.numOfThreads = tsSyncTcpThreads;
+ info.numOfThreads = SYNC_TCP_THREADS;
info.serverIp = 0;
info.port = tsSyncPort;
info.bufferSize = SYNC_MAX_SIZE;
@@ -117,7 +84,7 @@ int32_t syncInit() {
info.processIncomingMsg = syncProcessPeerMsg;
info.processIncomingConn = syncProcessIncommingConnection;
- tsTcpPool = taosOpenTcpThreadPool(&info);
+ tsTcpPool = syncOpenTcpThreadPool(&info);
if (tsTcpPool == NULL) {
sError("failed to init tcpPool");
return -1;
@@ -126,16 +93,16 @@ int32_t syncInit() {
tsSyncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC");
if (tsSyncTmrCtrl == NULL) {
sError("failed to init tmrCtrl");
- taosCloseTcpThreadPool(tsTcpPool);
+ syncCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL;
return -1;
}
tsVgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (tsVgIdHash == NULL) {
- sError("failed to init tsVgIdHash");
+ sError("failed to init vgIdHash");
taosTmrCleanUp(tsSyncTmrCtrl);
- taosCloseTcpThreadPool(tsTcpPool);
+ syncCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL;
tsSyncTmrCtrl = NULL;
return -1;
@@ -155,7 +122,7 @@ int32_t syncInit() {
void syncCleanUp() {
if (tsTcpPool) {
- taosCloseTcpThreadPool(tsTcpPool);
+ syncCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL;
}
@@ -236,7 +203,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum,
syncRole[nodeRole]);
- pNode->pSyncFwds = calloc(sizeof(SSyncFwds) + tsMaxFwdInfo * sizeof(SFwdInfo), 1);
+ pNode->pSyncFwds = calloc(sizeof(SSyncFwds) + SYNC_MAX_FWDS * sizeof(SFwdInfo), 1);
if (pNode->pSyncFwds == NULL) {
sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId);
terrno = TAOS_SYSTEM_ERROR(errno);
@@ -336,6 +303,11 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
newPeers[i] = pNode->peerInfo[j];
}
+ if (newPeers[i] == NULL) {
+ sError("vgId:%d, failed to reconfig", pNode->vgId);
+ return TSDB_CODE_SYN_INVALID_CONFIG;
+ }
+
if ((strcmp(pNewNode->nodeFqdn, tsNodeFqdn) == 0) && (pNewNode->nodePort == tsSyncPort)) {
pNode->selfIndex = i;
}
@@ -384,21 +356,13 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
SSyncPeer *pPeer = pNode->pMaster;
if (pPeer && pNode->quorum > 1) {
- char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0};
-
- SSyncHead *pHead = (SSyncHead *)msg;
- pHead->type = TAOS_SMSG_FORWARD_RSP;
- pHead->len = sizeof(SFwdRsp);
-
- SFwdRsp *pFwdRsp = (SFwdRsp *)(msg + sizeof(SSyncHead));
- pFwdRsp->version = version;
- pFwdRsp->code = code;
+ SFwdRsp rsp;
+ syncBuildSyncFwdRsp(&rsp, pNode->vgId, version, code);
- int32_t msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp);
- if (taosWriteMsg(pPeer->peerFd, msg, msgLen) == msgLen) {
- sTrace("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version);
+ if (taosWriteMsg(pPeer->peerFd, &rsp, sizeof(SFwdRsp)) == sizeof(SFwdRsp)) {
+ sTrace("%s, forward-rsp is sent, code:0x%x hver:%" PRIu64, pPeer->id, code, version);
} else {
- sDebug("%s, failed to send forward ack, restart", pPeer->id);
+ sDebug("%s, failed to send forward-rsp, restart", pPeer->id);
syncRestartConnection(pPeer);
}
}
@@ -509,7 +473,7 @@ static void syncClosePeerConn(SSyncPeer *pPeer) {
taosClose(pPeer->syncFd);
if (pPeer->peerFd >= 0) {
pPeer->peerFd = -1;
- taosFreeTcpConn(pPeer->pConn);
+ syncFreeTcpConn(pPeer->pConn);
}
}
@@ -779,7 +743,7 @@ static void syncRestartPeer(SSyncPeer *pPeer) {
int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn);
if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) {
sDebug("%s, check peer connection in 1000 ms", pPeer->id);
- taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
+ taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer);
}
}
@@ -857,7 +821,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
taosTmrStopA(&pPeer->timer);
// Ensure the sync of mnode not interrupted
- if (pNode->vgId != 1 && tsSyncNum >= tsMaxSyncNum) {
+ if (pNode->vgId != 1 && tsSyncNum >= SYNC_MAX_NUM) {
sInfo("%s, %d syncs are in process, try later", pPeer->id, tsSyncNum);
taosTmrReset(syncTryRecoverFromMaster, 500 + (pNode->vgId * 10) % 200, pPeer, tsSyncTmrCtrl, &pPeer->timer);
return;
@@ -865,56 +829,42 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
sDebug("%s, try to sync", pPeer->id);
- SFirstPkt firstPkt;
- memset(&firstPkt, 0, sizeof(firstPkt));
- firstPkt.syncHead.type = TAOS_SMSG_SYNC_REQ;
- firstPkt.syncHead.vgId = pNode->vgId;
- firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead);
- firstPkt.tranId = syncGenTranId();
- tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
- firstPkt.port = tsSyncPort;
- taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
-
- if (taosWriteMsg(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) {
+ SSyncMsg msg;
+ syncBuildSyncReqMsg(&msg, pNode->vgId);
+
+ taosTmrReset(syncNotStarted, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer);
+
+ if (taosWriteMsg(pPeer->peerFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
sError("%s, failed to send sync-req to peer", pPeer->id);
} else {
- sInfo("%s, sync-req is sent to peer, tranId:%u, sstatus:%s", pPeer->id, firstPkt.tranId, syncStatus[nodeSStatus]);
+ sInfo("%s, sync-req is sent to peer, tranId:%u, sstatus:%s", pPeer->id, msg.tranId, syncStatus[nodeSStatus]);
}
}
-static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
+static void syncProcessFwdResponse(SFwdRsp *pFwdRsp, SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode;
- SFwdRsp * pFwdRsp = (SFwdRsp *)cont;
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
SFwdInfo * pFwdInfo;
sTrace("%s, forward-rsp is received, code:%x hver:%" PRIu64, pPeer->id, pFwdRsp->code, pFwdRsp->version);
SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first;
- bool found = false;
if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) {
// find the forwardInfo from first
for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
- pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % tsMaxFwdInfo;
+ pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % SYNC_MAX_FWDS;
if (pFwdRsp->version == pFwdInfo->version) {
- found = true;
syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code);
syncRemoveConfirmedFwdInfo(pNode);
- break;
+ return;
}
}
}
-
- if (!found) {
- sTrace("%s, forward-rsp not found first:%d fwds:%d, code:%x hver:%" PRIu64, pPeer->id, pSyncFwds->first,
- pSyncFwds->fwds, pFwdRsp->code, pFwdRsp->version);
- syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code);
- }
}
static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode;
- SWalHead * pHead = (SWalHead *)cont;
+ SWalHead * pHead = (SWalHead *)(cont + sizeof(SSyncHead));
sTrace("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
@@ -931,9 +881,8 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
}
}
-static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
- SSyncNode * pNode = pPeer->pSyncNode;
- SPeersStatus *pPeersStatus = (SPeersStatus *)cont;
+static void syncProcessPeersStatusMsg(SPeersStatus *pPeersStatus, SSyncPeer *pPeer) {
+ SSyncNode *pNode = pPeer->pSyncNode;
sDebug("%s, status is received, self:%s:%s:%" PRIu64 ", peer:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d",
pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeersStatus->role],
@@ -947,23 +896,22 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
}
}
-static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
+static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead) {
if (pPeer->peerFd < 0) return -1;
int32_t hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead));
if (hlen != sizeof(SSyncHead)) {
- sDebug("%s, failed to read msg, hlen:%d", pPeer->id, hlen);
+ sDebug("%s, failed to read msg since %s, hlen:%d", pPeer->id, tstrerror(errno), hlen);
return -1;
}
- // head.len = htonl(head.len);
- if (pHead->len < 0) {
- sError("%s, invalid pkt length, hlen:%d", pPeer->id, pHead->len);
+ int32_t code = syncCheckHead(pHead);
+ if (code != 0) {
+ sError("%s, failed to check msg head since %s, type:%d", pPeer->id, tstrerror(code), pHead->type);
return -1;
}
- assert(pHead->len <= TSDB_MAX_WAL_SIZE);
- int32_t bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len);
+ int32_t bytes = taosReadMsg(pPeer->peerFd, (char *)pHead + sizeof(SSyncHead), pHead->len);
if (bytes != pHead->len) {
sError("%s, failed to read, bytes:%d len:%d", pPeer->id, bytes, pHead->len);
return -1;
@@ -974,23 +922,22 @@ static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
static int32_t syncProcessPeerMsg(void *param, void *buffer) {
SSyncPeer *pPeer = param;
- SSyncHead head;
- char * cont = buffer;
-
+ SSyncHead *pHead = buffer;
SSyncNode *pNode = pPeer->pSyncNode;
+
pthread_mutex_lock(&pNode->mutex);
- int32_t code = syncReadPeerMsg(pPeer, &head, cont);
+ int32_t code = syncReadPeerMsg(pPeer, pHead);
if (code == 0) {
- if (head.type == TAOS_SMSG_FORWARD) {
- syncProcessForwardFromPeer(cont, pPeer);
- } else if (head.type == TAOS_SMSG_FORWARD_RSP) {
- syncProcessFwdResponse(cont, pPeer);
- } else if (head.type == TAOS_SMSG_SYNC_REQ) {
- syncProcessSyncRequest(cont, pPeer);
- } else if (head.type == TAOS_SMSG_STATUS) {
- syncProcessPeersStatusMsg(cont, pPeer);
+ if (pHead->type == TAOS_SMSG_SYNC_FWD) {
+ syncProcessForwardFromPeer(buffer, pPeer);
+ } else if (pHead->type == TAOS_SMSG_SYNC_FWD_RSP) {
+ syncProcessFwdResponse(buffer, pPeer);
+ } else if (pHead->type == TAOS_SMSG_SYNC_REQ) {
+ syncProcessSyncRequest(buffer, pPeer);
+ } else if (pHead->type == TAOS_SMSG_STATUS) {
+ syncProcessPeersStatusMsg(buffer, pPeer);
}
}
@@ -999,36 +946,30 @@ static int32_t syncProcessPeerMsg(void *param, void *buffer) {
return code;
}
-#define statusMsgLen sizeof(SSyncHead) + sizeof(SPeersStatus) + sizeof(SPeerStatus) * TAOS_SYNC_MAX_REPLICA
-
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId) {
- SSyncNode *pNode = pPeer->pSyncNode;
- char msg[statusMsgLen] = {0};
-
if (pPeer->peerFd < 0 || pPeer->ip == 0) return;
- SSyncHead * pHead = (SSyncHead *)msg;
- SPeersStatus *pPeersStatus = (SPeersStatus *)(msg + sizeof(SSyncHead));
+ SSyncNode * pNode = pPeer->pSyncNode;
+ SPeersStatus msg;
- pHead->type = TAOS_SMSG_STATUS;
- pHead->len = statusMsgLen - sizeof(SSyncHead);
+ memset(&msg, 0, sizeof(SPeersStatus));
+ syncBuildPeersStatus(&msg, pNode->vgId);
- pPeersStatus->version = nodeVersion;
- pPeersStatus->role = nodeRole;
- pPeersStatus->ack = ack;
- pPeersStatus->type = type;
- pPeersStatus->tranId = tranId;
+ msg.role = nodeRole;
+ msg.ack = ack;
+ msg.type = type;
+ msg.tranId = tranId;
+ msg.version = nodeVersion;
for (int32_t i = 0; i < pNode->replica; ++i) {
- pPeersStatus->peersStatus[i].role = pNode->peerInfo[i]->role;
- pPeersStatus->peersStatus[i].version = pNode->peerInfo[i]->version;
+ msg.peersStatus[i].role = pNode->peerInfo[i]->role;
+ msg.peersStatus[i].version = pNode->peerInfo[i]->version;
}
- if (taosWriteMsg(pPeer->peerFd, msg, statusMsgLen) == statusMsgLen) {
+ if (taosWriteMsg(pPeer->peerFd, &msg, sizeof(SPeersStatus)) == sizeof(SPeersStatus)) {
sDebug("%s, status is sent, self:%s:%s:%" PRIu64 ", peer:%s:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d",
pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeer->role],
- syncStatus[pPeer->sstatus], pPeer->version, pPeersStatus->ack, pPeersStatus->tranId,
- statusType[pPeersStatus->type], pPeer->peerFd);
+ syncStatus[pPeer->sstatus], pPeer->version, ack, tranId, statusType[type], pPeer->peerFd);
} else {
sDebug("%s, failed to send status msg, restart", pPeer->id);
syncRestartConnection(pPeer);
@@ -1048,29 +989,23 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
int32_t connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
if (connFd < 0) {
sDebug("%s, failed to open tcp socket since %s", pPeer->id, strerror(errno));
- taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
+ taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer);
return;
}
- SFirstPkt firstPkt;
- memset(&firstPkt, 0, sizeof(firstPkt));
- firstPkt.syncHead.vgId = pPeer->nodeId ? pNode->vgId : 0;
- firstPkt.syncHead.type = TAOS_SMSG_STATUS;
- tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
- firstPkt.port = tsSyncPort;
- firstPkt.tranId = syncGenTranId();
- firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId
+ SSyncMsg msg;
+ syncBuildSyncSetupMsg(&msg, pPeer->nodeId ? pNode->vgId : 0);
- if (taosWriteMsg(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) {
- sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, firstPkt.tranId);
+ if (taosWriteMsg(connFd, &msg, sizeof(SSyncMsg)) == sizeof(SSyncMsg)) {
+ sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, msg.tranId);
pPeer->peerFd = connFd;
pPeer->role = TAOS_SYNC_ROLE_UNSYNCED;
- pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd);
+ pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer, connFd);
syncAddPeerRef(pPeer);
} else {
sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno));
taosClose(connFd);
- taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
+ taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer);
}
}
@@ -1116,14 +1051,21 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
tinet_ntoa(ipstr, sourceIp);
sDebug("peer TCP connection from ip:%s", ipstr);
- SFirstPkt firstPkt;
- if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) {
- sError("failed to read peer first pkt from ip:%s since %s", ipstr, strerror(errno));
+ SSyncMsg msg;
+ if (taosReadMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
+ sError("failed to read peer sync msg from ip:%s since %s", ipstr, strerror(errno));
+ taosCloseSocket(connFd);
+ return;
+ }
+
+ int32_t code = syncCheckHead((SSyncHead *)(&msg));
+ if (code != 0) {
+ sError("failed to check peer sync msg from ip:%s since %s", ipstr, strerror(code));
taosCloseSocket(connFd);
return;
}
- int32_t vgId = firstPkt.syncHead.vgId;
+ int32_t vgId = msg.head.vgId;
SSyncNode **ppNode = taosHashGet(tsVgIdHash, &vgId, sizeof(int32_t));
if (ppNode == NULL || *ppNode == NULL) {
sError("vgId:%d, vgId could not be found", vgId);
@@ -1131,7 +1073,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
return;
}
- sDebug("vgId:%d, firstPkt is received, tranId:%u", vgId, firstPkt.tranId);
+ sDebug("vgId:%d, sync msg is received, tranId:%u", vgId, msg.tranId);
SSyncNode *pNode = *ppNode;
pthread_mutex_lock(&pNode->mutex);
@@ -1139,27 +1081,27 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
SSyncPeer *pPeer;
for (i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
- if (pPeer && (strcmp(pPeer->fqdn, firstPkt.fqdn) == 0) && (pPeer->port == firstPkt.port)) break;
+ if (pPeer && (strcmp(pPeer->fqdn, msg.fqdn) == 0) && (pPeer->port == msg.port)) break;
}
pPeer = (i < pNode->replica) ? pNode->peerInfo[i] : NULL;
if (pPeer == NULL) {
- sError("vgId:%d, peer:%s:%u not configured", pNode->vgId, firstPkt.fqdn, firstPkt.port);
+ sError("vgId:%d, peer:%s:%u not configured", pNode->vgId, msg.fqdn, msg.port);
taosCloseSocket(connFd);
// syncSendVpeerCfgMsg(sync);
} else {
// first packet tells what kind of link
- if (firstPkt.syncHead.type == TAOS_SMSG_SYNC_DATA) {
+ if (msg.head.type == TAOS_SMSG_SYNC_DATA) {
pPeer->syncFd = connFd;
nodeSStatus = TAOS_SYNC_STATUS_START;
- sInfo("%s, sync-data pkt from master is received, tranId:%u, set sstatus:%s", pPeer->id, firstPkt.tranId,
+ sInfo("%s, sync-data msg from master is received, tranId:%u, set sstatus:%s", pPeer->id, msg.tranId,
syncStatus[nodeSStatus]);
syncCreateRestoreDataThread(pPeer);
} else {
sDebug("%s, TCP connection is up, pfd:%d sfd:%d, old pfd:%d", pPeer->id, connFd, pPeer->syncFd, pPeer->peerFd);
syncClosePeerConn(pPeer);
pPeer->peerFd = connFd;
- pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd);
+ pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer, connFd);
syncAddPeerRef(pPeer);
sDebug("%s, ready to exchange data", pPeer->id);
syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_EXCHANGE_DATA, syncGenTranId());
@@ -1192,15 +1134,15 @@ static int32_t syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
int64_t time = taosGetTimestampMs();
- if (pSyncFwds->fwds >= tsMaxFwdInfo) {
- // pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo;
+ if (pSyncFwds->fwds >= SYNC_MAX_FWDS) {
+ // pSyncFwds->first = (pSyncFwds->first + 1) % SYNC_MAX_FWDS;
// pSyncFwds->fwds--;
sError("vgId:%d, failed to save fwd info, hver:%" PRIu64 " fwds:%d", pNode->vgId, version, pSyncFwds->fwds);
return TSDB_CODE_SYN_TOO_MANY_FWDINFO;
}
if (pSyncFwds->fwds > 0) {
- pSyncFwds->last = (pSyncFwds->last + 1) % tsMaxFwdInfo;
+ pSyncFwds->last = (pSyncFwds->last + 1) % SYNC_MAX_FWDS;
}
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last;
@@ -1223,7 +1165,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->first;
if (pFwdInfo->confirmed == 0) break;
- pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo;
+ pSyncFwds->first = (pSyncFwds->first + 1) % SYNC_MAX_FWDS;
pSyncFwds->fwds--;
if (pSyncFwds->fwds == 0) pSyncFwds->first = pSyncFwds->last;
sTrace("vgId:%d, fwd info is removed, hver:%" PRIu64 " fwds:%d", pNode->vgId, pFwdInfo->version, pSyncFwds->fwds);
@@ -1248,7 +1190,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
}
if (confirm && pFwdInfo->confirmed == 0) {
- sTrace("vgId:%d, forward is confirmed, hver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code);
+ sTrace("vgId:%d, forward is confirmed, hver:%" PRIu64 " code:0x%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code);
(*pNode->confirmForward)(pNode->vgId, pFwdInfo->mhandle, pFwdInfo->code);
pFwdInfo->confirmed = 1;
}
@@ -1289,7 +1231,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
if (pSyncFwds->fwds > 0) {
pthread_mutex_lock(&pNode->mutex);
for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
- SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo;
+ SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % SYNC_MAX_FWDS;
if (ABS(time - pFwdInfo->time) < 2000) break;
sDebug("vgId:%d, forward info expired, hver:%" PRIu64 " curtime:%" PRIu64 " savetime:%" PRIu64, pNode->vgId,
@@ -1334,14 +1276,12 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
- // only pkt from RPC or CQ can be forwarded
+ // only msg from RPC or CQ can be forwarded
if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0;
// a hacker way to improve the performance
pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead));
- pSyncHead->type = TAOS_SMSG_FORWARD;
- pSyncHead->pversion = 0;
- pSyncHead->len = sizeof(SWalHead) + pWalHead->len;
+ syncBuildSyncFwdMsg(pSyncHead, pNode->vgId, sizeof(SWalHead) + pWalHead->len);
fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head
pthread_mutex_lock(&pNode->mutex);
@@ -1371,4 +1311,3 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
return code;
}
-
diff --git a/src/sync/src/syncMsg.c b/src/sync/src/syncMsg.c
new file mode 100644
index 0000000000000000000000000000000000000000..034f9a98a70c7373b74a84d24d478b52d7bf4df9
--- /dev/null
+++ b/src/sync/src/syncMsg.c
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#define _DEFAULT_SOURCE
+#include "os.h"
+#include "taoserror.h"
+#include "tglobal.h"
+#include "tchecksum.h"
+#include "syncInt.h"
+
+char *statusType[] = {
+ "broadcast",
+ "broadcast-rsp",
+ "setup-conn",
+ "setup-conn-rsp",
+ "exchange-data",
+ "exchange-data-rsp",
+ "check-role",
+ "check-role-rsp"
+};
+
+uint16_t syncGenTranId() {
+ return taosRand() & 0XFFFF;
+}
+
+static void syncBuildHead(SSyncHead *pHead) {
+ pHead->protocol = SYNC_PROTOCOL_VERSION;
+ pHead->signature = SYNC_SIGNATURE;
+ pHead->code = 0;
+ pHead->cId = 0;
+ taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SSyncHead));
+}
+
+int32_t syncCheckHead(SSyncHead *pHead) {
+ if (pHead->protocol != SYNC_PROTOCOL_VERSION) return TSDB_CODE_SYN_MISMATCHED_PROTOCOL;
+ if (pHead->signature != SYNC_SIGNATURE) return TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
+ if (pHead->cId != 0) return TSDB_CODE_SYN_MISMATCHED_CLUSTERID;
+ if (pHead->len <= 0 || pHead->len > TSDB_MAX_WAL_SIZE) return TSDB_CODE_SYN_INVALID_MSGLEN;
+ if (pHead->type <= TAOS_SMSG_START || pHead->type >= TAOS_SMSG_END) return TSDB_CODE_SYN_INVALID_MSGTYPE;
+ if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SSyncHead))) return TSDB_CODE_SYN_INVALID_CHECKSUM;
+
+ return TSDB_CODE_SUCCESS;
+}
+
+void syncBuildSyncFwdMsg(SSyncHead *pHead, int32_t vgId, int32_t len) {
+ pHead->type = TAOS_SMSG_SYNC_FWD;
+ pHead->vgId = vgId;
+ pHead->len = len;
+ syncBuildHead(pHead);
+}
+
+void syncBuildSyncFwdRsp(SFwdRsp *pMsg, int32_t vgId, uint64_t version, int32_t code) {
+ pMsg->head.type = TAOS_SMSG_SYNC_FWD_RSP;
+ pMsg->head.vgId = vgId;
+ pMsg->head.len = sizeof(SFwdRsp) - sizeof(SSyncHead);
+ syncBuildHead(&pMsg->head);
+
+ pMsg->version = version;
+ pMsg->code = code;
+}
+
+static void syncBuildMsg(SSyncMsg *pMsg, int32_t vgId, ESyncMsgType type) {
+ pMsg->head.type = type;
+ pMsg->head.vgId = vgId;
+ pMsg->head.len = sizeof(SSyncMsg) - sizeof(SSyncHead);
+ syncBuildHead(&pMsg->head);
+
+ pMsg->port = tsSyncPort;
+ pMsg->tranId = syncGenTranId();
+ pMsg->sourceId = vgId;
+ tstrncpy(pMsg->fqdn, tsNodeFqdn, TSDB_FQDN_LEN);
+}
+
+void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_REQ); }
+void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_DATA); }
+void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SETUP); }
+
+void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId) {
+ pMsg->head.type = TAOS_SMSG_STATUS;
+ pMsg->head.vgId = vgId;
+ pMsg->head.len = sizeof(SPeersStatus) - sizeof(SSyncHead);
+ syncBuildHead(&pMsg->head);
+}
+
+void syncBuildFileAck(SFileAck *pMsg, int32_t vgId) {
+ pMsg->head.type = TAOS_SMSG_SYNC_FILE_RSP;
+ pMsg->head.vgId = vgId;
+ pMsg->head.len = sizeof(SFileAck) - sizeof(SSyncHead);
+ syncBuildHead(&pMsg->head);
+}
+
+void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId) {
+ pMsg->head.type = TAOS_SMSG_SYNC_FILE;
+ pMsg->head.vgId = vgId;
+ pMsg->head.len = sizeof(SFileInfo) - sizeof(SSyncHead);
+ syncBuildHead(&pMsg->head);
+}
\ No newline at end of file
diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c
index 4247468bcae77dab49f61dd684a80ada9701418c..8651879eb6cb06ff629c694f128b6107bf8f19a2 100644
--- a/src/sync/src/syncRestore.c
+++ b/src/sync/src/syncRestore.c
@@ -56,7 +56,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo minfo; memset(&minfo, 0, sizeof(SFileInfo)); /* = {0}; */
SFileInfo sinfo; memset(&sinfo, 0, sizeof(SFileInfo)); /* = {0}; */
- SFileAck fileAck = {0};
+ SFileAck fileAck; memset(&fileAck, 0, sizeof(SFileAck));
int32_t code = -1;
char name[TSDB_FILENAME_LEN * 2] = {0};
uint32_t pindex = 0; // index in last restore
@@ -69,7 +69,14 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
minfo.index = -1;
int32_t ret = taosReadMsg(pPeer->syncFd, &minfo, sizeof(SFileInfo));
if (ret != sizeof(SFileInfo) || minfo.index == -1) {
- sError("%s, failed to read file info while restore file since %s", pPeer->id, strerror(errno));
+ sError("%s, failed to read fileinfo while restore file since %s", pPeer->id, strerror(errno));
+ break;
+ }
+
+ assert(ret == sizeof(SFileInfo));
+ ret = syncCheckHead((SSyncHead *)(&minfo));
+ if (ret != 0) {
+ sError("%s, failed to check fileinfo while restore file since %s", pPeer->id, strerror(ret));
break;
}
@@ -94,12 +101,13 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
&sinfo.fversion);
// if file not there or magic is not the same, file shall be synced
- memset(&fileAck, 0, sizeof(fileAck));
+ memset(&fileAck, 0, sizeof(SFileAck));
+ syncBuildFileAck(&fileAck, pNode->vgId);
fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1 : 0;
// send file ack
- ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(fileAck));
- if (ret != sizeof(fileAck)) {
+ ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(SFileAck));
+ if (ret != sizeof(SFileAck)) {
sError("%s, failed to write file:%s ack while restore file since %s", pPeer->id, minfo.name, strerror(errno));
break;
}
@@ -289,12 +297,12 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
uint64_t fversion = 0;
sInfo("%s, start to restore, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
- SFirstPktRsp firstPktRsp = {.sync = 1, .tranId = syncGenTranId()};
- if (taosWriteMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) != sizeof(SFirstPktRsp)) {
- sError("%s, failed to send sync firstPkt rsp since %s", pPeer->id, strerror(errno));
+ SSyncRsp rsp = {.sync = 1, .tranId = syncGenTranId()};
+ if (taosWriteMsg(pPeer->syncFd, &rsp, sizeof(SSyncRsp)) != sizeof(SSyncRsp)) {
+ sError("%s, failed to send sync rsp since %s", pPeer->id, strerror(errno));
return -1;
}
- sDebug("%s, send firstPktRsp to peer, tranId:%u", pPeer->id, firstPktRsp.tranId);
+ sDebug("%s, send sync rsp to peer, tranId:%u", pPeer->id, rsp.tranId);
sInfo("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
int32_t code = syncRestoreFile(pPeer, &fversion);
diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c
index 82e3700c7a4c9df3f89eecb65d1a75fe9f7a012c..02d990313e8c95518e22939c8ab0d41ad05604c2 100644
--- a/src/sync/src/syncRetrieve.c
+++ b/src/sync/src/syncRetrieve.c
@@ -88,7 +88,7 @@ static bool syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) {
static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo fileInfo; memset(&fileInfo, 0, sizeof(SFileInfo));
- SFileAck fileAck = {0};
+ SFileAck fileAck; memset(&fileAck, 0, sizeof(SFileAck));
int32_t code = -1;
char name[TSDB_FILENAME_LEN * 2] = {0};
@@ -103,11 +103,12 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
fileInfo.size = 0;
fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
&fileInfo.size, &fileInfo.fversion);
+ syncBuildFileInfo(&fileInfo, pNode->vgId);
sDebug("%s, file:%s info is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size);
// send the file info
- int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo));
- if (ret != sizeof(fileInfo)) {
+ int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(SFileInfo));
+ if (ret != sizeof(SFileInfo)) {
code = -1;
sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
break;
@@ -128,6 +129,13 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
break;
}
+ ret = syncCheckHead((SSyncHead*)(&fileAck));
+ if (ret != 0) {
+ code = -1;
+ sError("%s, failed to check file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(ret));
+ break;
+ }
+
// set the peer sync version
pPeer->sversion = fileInfo.fversion;
@@ -405,27 +413,22 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode;
- SFirstPkt firstPkt;
- memset(&firstPkt, 0, sizeof(firstPkt));
- firstPkt.syncHead.type = TAOS_SMSG_SYNC_DATA;
- firstPkt.syncHead.vgId = pNode->vgId;
- firstPkt.tranId = syncGenTranId();
- tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
- firstPkt.port = tsSyncPort;
+ SSyncMsg msg;
+ syncBuildSyncDataMsg(&msg, pNode->vgId);
- if (taosWriteMsg(pPeer->syncFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) {
- sError("%s, failed to send sync firstPkt since %s, tranId:%u", pPeer->id, strerror(errno), firstPkt.tranId);
+ if (taosWriteMsg(pPeer->syncFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
+ sError("%s, failed to send sync-data msg since %s, tranId:%u", pPeer->id, strerror(errno), msg.tranId);
return -1;
}
- sDebug("%s, send sync-data pkt to peer, tranId:%u", pPeer->id, firstPkt.tranId);
+ sDebug("%s, send sync-data msg to peer, tranId:%u", pPeer->id, msg.tranId);
- SFirstPktRsp firstPktRsp;
- if (taosReadMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) != sizeof(SFirstPktRsp)) {
- sError("%s, failed to read sync firstPkt rsp since %s, tranId:%u", pPeer->id, strerror(errno), firstPkt.tranId);
+ SSyncRsp rsp;
+ if (taosReadMsg(pPeer->syncFd, &rsp, sizeof(SSyncRsp)) != sizeof(SSyncRsp)) {
+ sError("%s, failed to read sync-data rsp since %s, tranId:%u", pPeer->id, strerror(errno), msg.tranId);
return -1;
}
- sDebug("%s, recv firstPktRsp from peer, tranId:%u", pPeer->id, firstPkt.tranId);
+ sDebug("%s, recv sync-data rsp from peer, tranId:%u rsp-tranId:%u", pPeer->id, msg.tranId, rsp.tranId);
return 0;
}
diff --git a/src/sync/src/taosTcpPool.c b/src/sync/src/syncTcp.c
similarity index 89%
rename from src/sync/src/taosTcpPool.c
rename to src/sync/src/syncTcp.c
index eb05cf7c6f8d8d9a068b60c54f0ae1426e62c429..7bfdc4e440ee886065a4cd1b8f59d09e09556a47 100644
--- a/src/sync/src/taosTcpPool.c
+++ b/src/sync/src/syncTcp.c
@@ -19,10 +19,10 @@
#include "tutil.h"
#include "tsocket.h"
#include "taoserror.h"
-#include "taosTcpPool.h"
#include "twal.h"
#include "tsync.h"
#include "syncInt.h"
+#include "syncTcp.h"
typedef struct SThreadObj {
pthread_t thread;
@@ -47,12 +47,12 @@ typedef struct {
int32_t closedByApp;
} SConnObj;
-static void *taosAcceptPeerTcpConnection(void *argv);
-static void *taosProcessTcpData(void *param);
-static void taosStopPoolThread(SThreadObj *pThread);
-static SThreadObj *taosGetTcpThread(SPoolObj *pPool);
+static void *syncAcceptPeerTcpConnection(void *argv);
+static void *syncProcessTcpData(void *param);
+static void syncStopPoolThread(SThreadObj *pThread);
+static SThreadObj *syncGetTcpThread(SPoolObj *pPool);
-void *taosOpenTcpThreadPool(SPoolInfo *pInfo) {
+void *syncOpenTcpThreadPool(SPoolInfo *pInfo) {
pthread_attr_t thattr;
SPoolObj *pPool = calloc(sizeof(SPoolObj), 1);
@@ -80,7 +80,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) {
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
- if (pthread_create(&(pPool->thread), &thattr, (void *)taosAcceptPeerTcpConnection, pPool) != 0) {
+ if (pthread_create(&(pPool->thread), &thattr, (void *)syncAcceptPeerTcpConnection, pPool) != 0) {
sError("failed to create accept thread for TCP server since %s", strerror(errno));
close(pPool->acceptFd);
tfree(pPool->pThread);
@@ -94,7 +94,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) {
return pPool;
}
-void taosCloseTcpThreadPool(void *param) {
+void syncCloseTcpThreadPool(void *param) {
SPoolObj * pPool = param;
SThreadObj *pThread;
@@ -103,7 +103,7 @@ void taosCloseTcpThreadPool(void *param) {
for (int32_t i = 0; i < pPool->info.numOfThreads; ++i) {
pThread = pPool->pThread[i];
- if (pThread) taosStopPoolThread(pThread);
+ if (pThread) syncStopPoolThread(pThread);
}
sDebug("%p TCP pool is closed", pPool);
@@ -112,7 +112,7 @@ void taosCloseTcpThreadPool(void *param) {
tfree(pPool);
}
-void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) {
+void *syncAllocateTcpConn(void *param, void *pPeer, int32_t connFd) {
struct epoll_event event;
SPoolObj *pPool = param;
@@ -122,7 +122,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) {
return NULL;
}
- SThreadObj *pThread = taosGetTcpThread(pPool);
+ SThreadObj *pThread = syncGetTcpThread(pPool);
if (pThread == NULL) {
tfree(pConn);
return NULL;
@@ -149,7 +149,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) {
return pConn;
}
-void taosFreeTcpConn(void *param) {
+void syncFreeTcpConn(void *param) {
SConnObj * pConn = param;
SThreadObj *pThread = pConn->pThread;
@@ -175,7 +175,7 @@ static void taosProcessBrokenLink(SConnObj *pConn) {
#define maxEvents 10
-static void *taosProcessTcpData(void *param) {
+static void *syncProcessTcpData(void *param) {
SThreadObj *pThread = (SThreadObj *)param;
SPoolObj * pPool = pThread->pPool;
SPoolInfo * pInfo = &pPool->info;
@@ -222,7 +222,7 @@ static void *taosProcessTcpData(void *param) {
if (pConn->closedByApp == 0) {
if ((*pInfo->processIncomingMsg)(pConn->ahandle, buffer) < 0) {
- taosFreeTcpConn(pConn);
+ syncFreeTcpConn(pConn);
continue;
}
}
@@ -239,7 +239,7 @@ static void *taosProcessTcpData(void *param) {
return NULL;
}
-static void *taosAcceptPeerTcpConnection(void *argv) {
+static void *syncAcceptPeerTcpConnection(void *argv) {
SPoolObj * pPool = (SPoolObj *)argv;
SPoolInfo *pInfo = &pPool->info;
@@ -268,7 +268,7 @@ static void *taosAcceptPeerTcpConnection(void *argv) {
return NULL;
}
-static SThreadObj *taosGetTcpThread(SPoolObj *pPool) {
+static SThreadObj *syncGetTcpThread(SPoolObj *pPool) {
SThreadObj *pThread = pPool->pThread[pPool->nextId];
if (pThread) return pThread;
@@ -286,7 +286,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) {
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
- int32_t ret = pthread_create(&(pThread->thread), &thattr, (void *)taosProcessTcpData, pThread);
+ int32_t ret = pthread_create(&(pThread->thread), &thattr, (void *)syncProcessTcpData, pThread);
pthread_attr_destroy(&thattr);
if (ret != 0) {
@@ -303,7 +303,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) {
return pThread;
}
-static void taosStopPoolThread(SThreadObj *pThread) {
+static void syncStopPoolThread(SThreadObj *pThread) {
pthread_t thread = pThread->thread;
if (!taosCheckPthreadValid(thread)) {
return;
diff --git a/src/sync/test/syncServer.c b/src/sync/test/syncServer.c
index 5a64a0a36a9c5e0557e78216ba41a48a23cc874b..161105d86c9a45fb1b8827f01701a43a3e08000c 100644
--- a/src/sync/test/syncServer.c
+++ b/src/sync/test/syncServer.c
@@ -100,7 +100,7 @@ int processRpcMsg(void *item) {
pHead->msgType = pMsg->msgType;
pHead->len = pMsg->contLen;
- uDebug("ver:%" PRIu64 ", pkt from client processed", pHead->version);
+ uDebug("ver:%" PRIu64 ", rsp from client processed", pHead->version);
writeIntoWal(pHead);
syncForwardToPeer(syncHandle, pHead, item, TAOS_QTYPE_RPC);
@@ -275,7 +275,7 @@ int getWalInfo(int32_t vgId, char *name, int64_t *index) {
int writeToCache(int32_t vgId, void *data, int type) {
SWalHead *pHead = data;
- uDebug("pkt from peer is received, ver:%" PRIu64 " len:%d type:%d", pHead->version, pHead->len, type);
+ uDebug("rsp from peer is received, ver:%" PRIu64 " len:%d type:%d", pHead->version, pHead->len, type);
int msgSize = pHead->len + sizeof(SWalHead);
void *pMsg = taosAllocateQitem(msgSize);
diff --git a/tests/script/unique/db/replica_add13.sim b/tests/script/unique/db/replica_add13.sim
index 1df49ba6582b3e1502e9b19ac3b11626175a295e..defe306f2fd413f57f2e3ed471cad8601476a8d4 100644
--- a/tests/script/unique/db/replica_add13.sim
+++ b/tests/script/unique/db/replica_add13.sim
@@ -110,6 +110,7 @@ sql insert into d1.t1 values(1589529000012, 2)
sql insert into d2.t2 values(1589529000022, 2)
sql insert into d3.t3 values(1589529000032, 2)
sql insert into d4.t4 values(1589529000042, 2)
+sleep 1000
sql select * from d1.t1
if $rows != 2 then
@@ -141,6 +142,7 @@ sql insert into d1.t1 values(1589529000013, 3)
sql insert into d2.t2 values(1589529000023, 3)
sql insert into d3.t3 values(1589529000033, 3)
sql insert into d4.t4 values(1589529000043, 3)
+sleep 1000
sql select * from d1.t1
if $rows != 3 then
@@ -172,6 +174,7 @@ sql insert into d1.t1 values(1589529000014, 4)
sql insert into d2.t2 values(1589529000024, 4)
sql insert into d3.t3 values(1589529000034, 4)
sql insert into d4.t4 values(1589529000044, 4)
+sleep 1000
sql select * from d1.t1
print select * from d1.t1 $rows
@@ -207,6 +210,7 @@ sql insert into d1.t1 values(1589529000015, 5)
sql insert into d2.t2 values(1589529000025, 5)
sql insert into d3.t3 values(1589529000035, 5)
sql insert into d4.t4 values(1589529000045, 5)
+sleep 1000
sql select * from d1.t1
if $rows != 5 then
@@ -238,6 +242,7 @@ sql insert into d1.t1 values(1589529000016, 6)
sql insert into d2.t2 values(1589529000026, 6)
sql insert into d3.t3 values(1589529000036, 6)
sql insert into d4.t4 values(1589529000046, 6)
+sleep 1000
sql select * from d1.t1
if $rows != 6 then
diff --git a/tests/script/unique/db/replica_add23.sim b/tests/script/unique/db/replica_add23.sim
index 5da73cd117ef218d1421d4f7f6ee7026c9e6bef3..d93894deb89beaadfc0b779acf93caaf42463009 100644
--- a/tests/script/unique/db/replica_add23.sim
+++ b/tests/script/unique/db/replica_add23.sim
@@ -110,6 +110,7 @@ sql insert into d1.t1 values(1588262400002, 2)
sql insert into d2.t2 values(1588262400002, 2)
sql insert into d3.t3 values(1588262400002, 2)
sql insert into d4.t4 values(1588262400002, 2)
+sleep 1000
sql select * from d1.t1
if $rows != 2 then
@@ -142,6 +143,7 @@ sql insert into d1.t1 values(1588262400003, 3)
sql insert into d2.t2 values(1588262400003, 3)
sql insert into d3.t3 values(1588262400003, 3)
sql insert into d4.t4 values(1588262400003, 3)
+sleep 1000
sql select * from d1.t1
if $rows != 3 then
@@ -173,6 +175,7 @@ sql insert into d1.t1 values(1588262400004, 4)
sql insert into d2.t2 values(1588262400004, 4)
sql insert into d3.t3 values(1588262400004, 4)
sql insert into d4.t4 values(1588262400004, 4)
+sleep 1000
sql select * from d1.t1
if $rows != 4 then
@@ -204,6 +207,7 @@ sql insert into d1.t1 values(1588262400005, 5)
sql insert into d2.t2 values(1588262400005, 5)
sql insert into d3.t3 values(1588262400005, 5)
sql insert into d4.t4 values(1588262400005, 5)
+sleep 1000
sql select * from d1.t1
if $rows != 5 then
@@ -235,6 +239,7 @@ sql insert into d1.t1 values(1588262400006, 6)
sql insert into d2.t2 values(1588262400006, 6)
sql insert into d3.t3 values(1588262400006, 6)
sql insert into d4.t4 values(1588262400006, 6)
+sleep 1000
sql select * from d1.t1
if $rows != 6 then