diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h
index d688a3c1bd16fe516a0ff1ee94bf83dcd0cb00b0..963fedce3146b8db91bc42a58108caca07a0b1f2 100644
--- a/source/libs/sync/inc/syncSnapshot.h
+++ b/source/libs/sync/inc/syncSnapshot.h
@@ -21,7 +21,6 @@ extern "C" {
#endif
#include "syncInt.h"
-#include "syncMessage.h"
#define SYNC_SNAPSHOT_SEQ_INVALID -2
#define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -3
@@ -31,7 +30,6 @@ extern "C" {
#define SYNC_SNAPSHOT_RETRY_MS 5000
-//---------------------------------------------------
typedef struct SSyncSnapshotSender {
bool start;
int32_t seq;
@@ -60,12 +58,6 @@ int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finis
int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
-cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
-char *snapshotSender2Str(SSyncSnapshotSender *pSender);
-
-int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId);
-
-//---------------------------------------------------
typedef struct SSyncSnapshotReceiver {
bool start;
int32_t ack;
@@ -78,7 +70,6 @@ typedef struct SSyncSnapshotReceiver {
// init when create
SSyncNode *pSyncNode;
-
} SSyncSnapshotReceiver;
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId);
@@ -88,13 +79,10 @@ int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver);
-cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
-char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
-
-//---------------------------------------------------
// on message
int32_t syncNodeOnSnapshot(SSyncNode *ths, SyncSnapshotSend *pMsg);
int32_t syncNodeOnSnapshotReply(SSyncNode *ths, SyncSnapshotRsp *pMsg);
+int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId);
#ifdef __cplusplus
}
diff --git a/source/libs/sync/inc/syncTimeout.h b/source/libs/sync/inc/syncTimeout.h
index e1a6050d920a05a114b470d027cfbfd94eb37cfa..3139707d5427fcf7ef8634c3d5530430703b8548 100644
--- a/source/libs/sync/inc/syncTimeout.h
+++ b/source/libs/sync/inc/syncTimeout.h
@@ -21,7 +21,6 @@ extern "C" {
#endif
#include "syncInt.h"
-#include "syncMessage.h"
// TLA+ Spec
// Timeout(i) == /\ state[i] \in {Follower, Candidate}
diff --git a/source/libs/sync/inc/syncTools.h b/source/libs/sync/inc/syncTools.h
index 6a760ecd873b16fe944168b5238c286bcb8dc0f1..3fb4a5ba0c6f3283c1ee602e7a84f3a8bc4aa25b 100644
--- a/source/libs/sync/inc/syncTools.h
+++ b/source/libs/sync/inc/syncTools.h
@@ -728,7 +728,6 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg);
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg);
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);
-int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg);
int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg);
// -----------------------------------------
diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h
index 7f241e827daa99d5f26c9de832d4f6504ab04e7c..076101ef4365fc976e5da34a24b4c4e93f4cfda4 100644
--- a/source/libs/sync/inc/syncUtil.h
+++ b/source/libs/sync/inc/syncUtil.h
@@ -24,17 +24,15 @@ extern "C" {
uint64_t syncUtilAddr2U64(const char* host, uint16_t port);
void syncUtilU642Addr(uint64_t u64, char* host, int64_t len, uint16_t* port);
-void syncUtilnodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet);
-void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet);
-bool syncUtilnodeInfo2raftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId);
+void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet);
+void syncUtilRaftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet);
+bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId);
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2);
bool syncUtilEmptyId(const SRaftId* pId);
int32_t syncUtilElectRandomMS(int32_t min, int32_t max);
int32_t syncUtilQuorum(int32_t replicaNum);
-cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p);
cJSON* syncUtilRaftId2Json(const SRaftId* p);
-char* syncUtilRaftId2Str(const SRaftId* p);
const char* syncStr(ESyncState state);
char* syncUtilPrintBin(char* ptr, uint32_t len);
char* syncUtilPrintBin2(char* ptr, uint32_t len);
diff --git a/source/libs/sync/inc/syncVoteMgr.h b/source/libs/sync/inc/syncVoteMgr.h
index d894e91600349bf0bf2bd506cf3369771f6cebce..066a4dd76faafaaf822cd7f770f50acaef1d0637 100644
--- a/source/libs/sync/inc/syncVoteMgr.h
+++ b/source/libs/sync/inc/syncVoteMgr.h
@@ -39,8 +39,6 @@ void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pSyncN
bool voteGrantedMajority(SVotesGranted *pVotesGranted);
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg);
void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term);
-cJSON *voteGranted2Json(SVotesGranted *pVotesGranted);
-char *voteGranted2Str(SVotesGranted *pVotesGranted);
typedef struct SVotesRespond {
SRaftId (*replicas)[TSDB_MAX_REPLICA];
@@ -56,8 +54,6 @@ void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSync
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId);
void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg);
void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term);
-cJSON *votesRespond2Json(SVotesRespond *pVotesRespond);
-char *votesRespond2Str(SVotesRespond *pVotesRespond);
#ifdef __cplusplus
}
diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c
index 252d5d021920b29f5c8dc0960914b8d87752a2df..25f6c5e6f115f10739bfae2d408c7112b6b3a696 100644
--- a/source/libs/sync/src/syncMain.c
+++ b/source/libs/sync/src/syncMain.c
@@ -757,7 +757,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
// init internal
pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
- if (!syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
+ if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
goto _error;
}
@@ -772,7 +772,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
}
}
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
- if (!syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
+ if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
goto _error;
}
@@ -781,7 +781,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
// init replicaNum, replicasId
pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
- if (!syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
+ if (!syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
goto _error;
}
@@ -1213,7 +1213,7 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
// utils --------------
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
- syncUtilraftId2EpSet(destRaftId, &epSet);
+ syncUtilRaftId2EpSet(destRaftId, &epSet);
if (pSyncNode->syncSendMSg != NULL) {
// htonl
syncUtilMsgHtoN(pMsg->pCont);
@@ -1230,7 +1230,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
- syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
+ syncUtilNodeInfo2EpSet(nodeInfo, &epSet);
if (pSyncNode->syncSendMSg != NULL) {
// htonl
syncUtilMsgHtoN(pMsg->pCont);
@@ -1344,7 +1344,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// init internal
pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
- syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);
+ syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);
// init peersNum, peers, peersId
pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
@@ -1356,13 +1356,13 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
}
}
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
- syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
+ syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
}
// init replicaNum, replicasId
pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
- syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
+ syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
}
// update quorum first
diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c
index 78413bbeffb5a4fa46f7843b84b7cff099a9a6c2..3273718f7de71d1ae2954469b12e26278235b7c4 100644
--- a/source/libs/sync/src/syncSnapshot.c
+++ b/source/libs/sync/src/syncSnapshot.c
@@ -13,33 +13,25 @@
* along with this program. If not, see .
*/
+#define _DEFAULT_SOURCE
#include "syncSnapshot.h"
#include "syncIndexMgr.h"
#include "syncRaftCfg.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
-#include "wal.h"
-//----------------------------------
-static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg);
-static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
-static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
-static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
-
-//----------------------------------
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) &&
(pSyncNode->pFsm->FpSnapshotDoRead != NULL);
SSyncSnapshotSender *pSender = NULL;
if (condition) {
- pSender = taosMemoryMalloc(sizeof(SSyncSnapshotSender));
+ pSender = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender));
if (pSender == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
- memset(pSender, 0, sizeof(*pSender));
pSender->start = false;
pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID;
@@ -249,64 +241,6 @@ static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnaps
++(pSender->seq);
}
-cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
- char u64buf[128];
- cJSON *pRoot = cJSON_CreateObject();
-
- if (pSender != NULL) {
- cJSON_AddNumberToObject(pRoot, "start", pSender->start);
- cJSON_AddNumberToObject(pRoot, "seq", pSender->seq);
- cJSON_AddNumberToObject(pRoot, "ack", pSender->ack);
-
- snprintf(u64buf, sizeof(u64buf), "%p", pSender->pReader);
- cJSON_AddStringToObject(pRoot, "pReader", u64buf);
-
- snprintf(u64buf, sizeof(u64buf), "%p", pSender->pCurrentBlock);
- cJSON_AddStringToObject(pRoot, "pCurrentBlock", u64buf);
- cJSON_AddNumberToObject(pRoot, "blockLen", pSender->blockLen);
-
- if (pSender->pCurrentBlock != NULL) {
- char *s;
- s = syncUtilPrintBin((char *)(pSender->pCurrentBlock), pSender->blockLen);
- cJSON_AddStringToObject(pRoot, "pCurrentBlock", s);
- taosMemoryFree(s);
- s = syncUtilPrintBin2((char *)(pSender->pCurrentBlock), pSender->blockLen);
- cJSON_AddStringToObject(pRoot, "pCurrentBlock2", s);
- taosMemoryFree(s);
- }
-
- cJSON *pSnapshot = cJSON_CreateObject();
- snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->snapshot.lastApplyIndex);
- cJSON_AddStringToObject(pSnapshot, "lastApplyIndex", u64buf);
- snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->snapshot.lastApplyTerm);
- cJSON_AddStringToObject(pSnapshot, "lastApplyTerm", u64buf);
- cJSON_AddItemToObject(pRoot, "snapshot", pSnapshot);
- snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->sendingMS);
- cJSON_AddStringToObject(pRoot, "sendingMS", u64buf);
- snprintf(u64buf, sizeof(u64buf), "%p", pSender->pSyncNode);
- cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
- cJSON_AddNumberToObject(pRoot, "replicaIndex", pSender->replicaIndex);
- snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->term);
- cJSON_AddStringToObject(pRoot, "term", u64buf);
-
- // snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->privateTerm);
- // cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
-
- cJSON_AddNumberToObject(pRoot, "finish", pSender->finish);
- }
-
- cJSON *pJson = cJSON_CreateObject();
- cJSON_AddItemToObject(pJson, "SSyncSnapshotSender", pRoot);
- return pJson;
-}
-
-char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
- cJSON *pJson = snapshotSender2Json(pSender);
- char *serialized = cJSON_Print(pJson);
- cJSON_Delete(pJson);
- return serialized;
-}
-
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
sNTrace(pSyncNode, "starting snapshot ...");
@@ -335,16 +269,17 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
return 0;
}
-// -------------------------------------
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
(pSyncNode->pFsm->FpSnapshotDoWrite != NULL);
SSyncSnapshotReceiver *pReceiver = NULL;
if (condition) {
- pReceiver = taosMemoryMalloc(sizeof(SSyncSnapshotReceiver));
- ASSERT(pReceiver != NULL);
- memset(pReceiver, 0, sizeof(*pReceiver));
+ pReceiver = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver));
+ if (pReceiver == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ return NULL;
+ }
pReceiver->start = false;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
@@ -530,63 +465,6 @@ static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapsh
}
}
-cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
- char u64buf[128];
- cJSON *pRoot = cJSON_CreateObject();
-
- if (pReceiver != NULL) {
- cJSON_AddNumberToObject(pRoot, "start", pReceiver->start);
- cJSON_AddNumberToObject(pRoot, "ack", pReceiver->ack);
-
- snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pWriter);
- cJSON_AddStringToObject(pRoot, "pWriter", u64buf);
-
- snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pSyncNode);
- cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
-
- cJSON *pFromId = cJSON_CreateObject();
- snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->fromId.addr);
- cJSON_AddStringToObject(pFromId, "addr", u64buf);
- {
- uint64_t u64 = pReceiver->fromId.addr;
- cJSON *pTmp = pFromId;
- char host[128] = {0};
- uint16_t port;
- syncUtilU642Addr(u64, host, sizeof(host), &port);
- cJSON_AddStringToObject(pTmp, "addr_host", host);
- cJSON_AddNumberToObject(pTmp, "addr_port", port);
- }
- cJSON_AddNumberToObject(pFromId, "vgId", pReceiver->fromId.vgId);
- cJSON_AddItemToObject(pRoot, "fromId", pFromId);
-
- snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->snapshot.lastApplyIndex);
- cJSON_AddStringToObject(pRoot, "snapshot.lastApplyIndex", u64buf);
-
- snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->snapshot.lastApplyTerm);
- cJSON_AddStringToObject(pRoot, "snapshot.lastApplyTerm", u64buf);
-
- snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->snapshot.lastConfigIndex);
- cJSON_AddStringToObject(pRoot, "snapshot.lastConfigIndex", u64buf);
-
- snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->term);
- cJSON_AddStringToObject(pRoot, "term", u64buf);
-
- snprintf(u64buf, sizeof(u64buf), "%" PRId64, pReceiver->startTime);
- cJSON_AddStringToObject(pRoot, "startTime", u64buf);
- }
-
- cJSON *pJson = cJSON_CreateObject();
- cJSON_AddItemToObject(pJson, "SSyncSnapshotReceiver", pRoot);
- return pJson;
-}
-
-char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
- cJSON *pJson = snapshotReceiver2Json(pReceiver);
- char *serialized = cJSON_Print(pJson);
- cJSON_Delete(pJson);
- return serialized;
-}
-
SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
SyncIndex snapStart = SYNC_INDEX_INVALID;
diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c
index 30e0617f4397275635548e64fdadc73ce835af9b..91d807319b38c0c18a20bde2fb9d494aa31822ea 100644
--- a/source/libs/sync/src/syncTimeout.c
+++ b/source/libs/sync/src/syncTimeout.c
@@ -13,28 +13,23 @@
* along with this program. If not, see .
*/
+#define _DEFAULT_SOURCE
#include "syncTimeout.h"
#include "syncElection.h"
#include "syncRaftCfg.h"
#include "syncRaftLog.h"
#include "syncReplication.h"
-#include "syncRespMgr.h"
static void syncNodeCleanConfigIndex(SSyncNode* ths) {
int32_t newArrIndex = 0;
- SyncIndex newConfigIndexArr[MAX_CONFIG_INDEX_COUNT];
- memset(newConfigIndexArr, 0, sizeof(newConfigIndexArr));
-
+ SyncIndex newConfigIndexArr[MAX_CONFIG_INDEX_COUNT] = {0};
SSnapshot snapshot = {0};
- if (ths->pFsm != NULL && ths->pFsm->FpGetSnapshotInfo != NULL) {
- ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
- }
-
+
+ ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
if (snapshot.lastApplyIndex != SYNC_INDEX_INVALID) {
- for (int i = 0; i < ths->pRaftCfg->configIndexCount; ++i) {
+ for (int32_t i = 0; i < ths->pRaftCfg->configIndexCount; ++i) {
if (ths->pRaftCfg->configIndexArr[i] < snapshot.lastConfigIndex) {
// pass
- ;
} else {
// save
newConfigIndexArr[newArrIndex] = ths->pRaftCfg->configIndexArr[i];
@@ -47,13 +42,15 @@ static void syncNodeCleanConfigIndex(SSyncNode* ths) {
memcpy(ths->pRaftCfg->configIndexArr, newConfigIndexArr, sizeof(newConfigIndexArr));
int32_t code = raftCfgPersist(ths->pRaftCfg);
- ASSERT(code == 0);
-
- sNTrace(ths, "clean config index arr, old-cnt:%d, new-cnt:%d", oldCnt, ths->pRaftCfg->configIndexCount);
+ if (code != 0) {
+ sNFatal(ths, "failed to persist cfg");
+ } else {
+ sNTrace(ths, "clean config index arr, old-cnt:%d, new-cnt:%d", oldCnt, ths->pRaftCfg->configIndexCount);
+ }
}
}
-int32_t syncNodeTimerRoutine(SSyncNode* ths) {
+static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
sNTrace(ths, "timer routines");
// timer replicate
@@ -71,7 +68,7 @@ int32_t syncNodeTimerRoutine(SSyncNode* ths) {
SSyncLogStoreData* pData = ths->pLogStore->data;
int32_t code = walEndSnapshot(pData->pWal);
if (code != 0) {
- sError("vgId:%d, timer wal snapshot end error since:%s", ths->vgId, terrstr());
+ sNError(ths, "timer wal snapshot end error since:%s", terrstr());
return -1;
} else {
sNTrace(ths, "wal snapshot end, index:%" PRId64, atomic_load_64(&ths->snapshottingIndex));
diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c
index 1a00b0f5a43fda87d3e250c8a02bc00cc71bc498..894aa05aad0d2ead49cd672744eaf0f505d0af0e 100644
--- a/source/libs/sync/src/syncUtil.c
+++ b/source/libs/sync/src/syncUtil.c
@@ -41,15 +41,15 @@ void syncUtilU642Addr(uint64_t u64, char* host, int64_t len, uint16_t* port) {
*port = (uint16_t)((u64 & 0x00000000FFFF0000) >> 16);
}
-void syncUtilnodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet) {
+void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet) {
pEpSet->inUse = 0;
pEpSet->numOfEps = 0;
addEpIntoEpSet(pEpSet, pInfo->nodeFqdn, pInfo->nodePort);
}
-void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) {
+void syncUtilRaftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) {
char host[TSDB_FQDN_LEN] = {0};
- uint16_t port;
+ uint16_t port = 0;
syncUtilU642Addr(raftId->addr, host, sizeof(host), &port);
pEpSet->inUse = 0;
@@ -57,7 +57,7 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) {
addEpIntoEpSet(pEpSet, host, port);
}
-bool syncUtilnodeInfo2raftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId) {
+bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId) {
uint32_t ipv4 = taosGetIpv4FromFqdn(pInfo->nodeFqdn);
if (ipv4 == 0xFFFFFFFF || ipv4 == 1) {
sError("failed to resolve ipv4 addr, fqdn: %s", pInfo->nodeFqdn);
@@ -73,8 +73,7 @@ bool syncUtilnodeInfo2raftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId*
}
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) {
- bool ret = pId1->addr == pId2->addr && pId1->vgId == pId2->vgId;
- return ret;
+ return pId1->addr == pId2->addr && pId1->vgId == pId2->vgId;
}
bool syncUtilEmptyId(const SRaftId* pId) { return (pId->addr == 0 && pId->vgId == 0); }
@@ -90,18 +89,6 @@ int32_t syncUtilElectRandomMS(int32_t min, int32_t max) {
int32_t syncUtilQuorum(int32_t replicaNum) { return replicaNum / 2 + 1; }
-cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p) {
- char u64buf[128] = {0};
- cJSON* pRoot = cJSON_CreateObject();
-
- cJSON_AddStringToObject(pRoot, "nodeFqdn", p->nodeFqdn);
- cJSON_AddNumberToObject(pRoot, "nodePort", p->nodePort);
-
- cJSON* pJson = cJSON_CreateObject();
- cJSON_AddItemToObject(pJson, "SNodeInfo", pRoot);
- return pJson;
-}
-
cJSON* syncUtilRaftId2Json(const SRaftId* p) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
@@ -120,13 +107,6 @@ cJSON* syncUtilRaftId2Json(const SRaftId* p) {
return pJson;
}
-char* syncUtilRaftId2Str(const SRaftId* p) {
- cJSON* pJson = syncUtilRaftId2Json(p);
- char* serialized = cJSON_Print(pJson);
- cJSON_Delete(pJson);
- return serialized;
-}
-
static inline bool syncUtilCanPrint(char c) {
if (c >= 32 && c <= 126) {
return true;
@@ -165,14 +145,12 @@ char* syncUtilPrintBin2(char* ptr, uint32_t len) {
}
void syncUtilMsgHtoN(void* msg) {
- // htonl
SMsgHead* pHead = msg;
pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId);
}
void syncUtilMsgNtoH(void* msg) {
- // ntohl
SMsgHead* pHead = msg;
pHead->contLen = ntohl(pHead->contLen);
pHead->vgId = ntohl(pHead->vgId);
diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c
index ee1f83ee6acc572a8c999a265bda1ad5f292a844..b36e5c028862a0654b6abccd4d08d8abcd6cb84d 100644
--- a/source/libs/sync/src/syncVoteMgr.c
+++ b/source/libs/sync/src/syncVoteMgr.c
@@ -23,12 +23,11 @@ static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) {
}
SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) {
- SVotesGranted *pVotesGranted = taosMemoryMalloc(sizeof(SVotesGranted));
+ SVotesGranted *pVotesGranted = taosMemoryCalloc(1, sizeof(SVotesGranted));
if (pVotesGranted == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
- memset(pVotesGranted, 0, sizeof(SVotesGranted));
pVotesGranted->replicas = &(pSyncNode->replicasId);
pVotesGranted->replicaNum = pSyncNode->replicaNum;
@@ -59,20 +58,24 @@ void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pSyncNode) {
pVotesGranted->pSyncNode = pSyncNode;
}
-bool voteGrantedMajority(SVotesGranted *pVotesGranted) {
- bool ret = pVotesGranted->votes >= pVotesGranted->quorum;
- return ret;
-}
+bool voteGrantedMajority(SVotesGranted *pVotesGranted) { return pVotesGranted->votes >= pVotesGranted->quorum; }
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
- ASSERT(pMsg->voteGranted == true);
+ if (!pMsg->voteGranted) {
+ sNFatal(pVotesGranted->pSyncNode, " vote granted should be true");
+ return;
+ }
if (pMsg->term != pVotesGranted->term) {
- sNTrace(pVotesGranted->pSyncNode, "vote grant vnode error");
+ sNTrace(pVotesGranted->pSyncNode, "vote grant term:%" PRId64 " not matched with msg term:%" PRId64,
+ pVotesGranted->term, pMsg->term);
return;
}
- ASSERT(syncUtilSameId(&pVotesGranted->pSyncNode->myRaftId, &pMsg->destId));
+ if (!syncUtilSameId(&pVotesGranted->pSyncNode->myRaftId, &pMsg->destId)) {
+ sNFatal(pVotesGranted->pSyncNode, " vote granted raftId not matched with msg");
+ return;
+ }
int32_t j = -1;
for (int32_t i = 0; i < pVotesGranted->replicaNum; ++i) {
@@ -81,14 +84,21 @@ void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
break;
}
}
- ASSERT(j != -1);
- ASSERT(j >= 0 && j < pVotesGranted->replicaNum);
+ if ((j == -1) || (j >= 0 && j < pVotesGranted->replicaNum)) {
+ sNFatal(pVotesGranted->pSyncNode, " invalid msg srcId, index:%d", j);
+ return;
+ }
if (pVotesGranted->isGranted[j] != true) {
++(pVotesGranted->votes);
pVotesGranted->isGranted[j] = true;
}
- ASSERT(pVotesGranted->votes <= pVotesGranted->replicaNum);
+
+ if (pVotesGranted->votes <= pVotesGranted->replicaNum) {
+ sNFatal(pVotesGranted->pSyncNode, " votes:%d not matched with replicaNum:%d", pVotesGranted->votes,
+ pVotesGranted->replicaNum);
+ return;
+ }
}
void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) {
@@ -97,53 +107,12 @@ void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) {
pVotesGranted->toLeader = false;
}
-cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
- char u64buf[128] = {0};
- cJSON *pRoot = cJSON_CreateObject();
-
- if (pVotesGranted != NULL) {
- cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesGranted->replicaNum);
- cJSON *pReplicas = cJSON_CreateArray();
- cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
- for (int32_t i = 0; i < pVotesGranted->replicaNum; ++i) {
- cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesGranted->replicas))[i]));
- }
- int32_t *arr = (int32_t *)taosMemoryMalloc(sizeof(int32_t) * pVotesGranted->replicaNum);
- for (int32_t i = 0; i < pVotesGranted->replicaNum; ++i) {
- arr[i] = pVotesGranted->isGranted[i];
- }
- cJSON *pIsGranted = cJSON_CreateIntArray(arr, pVotesGranted->replicaNum);
- taosMemoryFree(arr);
- cJSON_AddItemToObject(pRoot, "isGranted", pIsGranted);
-
- cJSON_AddNumberToObject(pRoot, "votes", pVotesGranted->votes);
- snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pVotesGranted->term);
- cJSON_AddStringToObject(pRoot, "term", u64buf);
- cJSON_AddNumberToObject(pRoot, "quorum", pVotesGranted->quorum);
- cJSON_AddNumberToObject(pRoot, "toLeader", pVotesGranted->toLeader);
- snprintf(u64buf, sizeof(u64buf), "%p", pVotesGranted->pSyncNode);
- cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
-
- bool majority = voteGrantedMajority(pVotesGranted);
- cJSON_AddNumberToObject(pRoot, "majority", majority);
- }
-
- cJSON *pJson = cJSON_CreateObject();
- cJSON_AddItemToObject(pJson, "SVotesGranted", pRoot);
- return pJson;
-}
-
-char *voteGranted2Str(SVotesGranted *pVotesGranted) {
- cJSON *pJson = voteGranted2Json(pVotesGranted);
- char *serialized = cJSON_Print(pJson);
- cJSON_Delete(pJson);
- return serialized;
-}
-
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) {
- SVotesRespond *pVotesRespond = taosMemoryMalloc(sizeof(SVotesRespond));
- ASSERT(pVotesRespond != NULL);
- memset(pVotesRespond, 0, sizeof(SVotesRespond));
+ SVotesRespond *pVotesRespond = taosMemoryCalloc(1, sizeof(SVotesRespond));
+ if (pVotesRespond == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ return NULL;
+ }
pVotesRespond->replicas = &(pSyncNode->replicasId);
pVotesRespond->replicaNum = pSyncNode->replicaNum;
@@ -185,62 +154,15 @@ void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *p
for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) {
if (syncUtilSameId(&((*(pVotesRespond->replicas))[i]), &pMsg->srcId)) {
- // ASSERT(pVotesRespond->isRespond[i] == false);
pVotesRespond->isRespond[i] = true;
return;
}
}
- ASSERT(0);
+
+ sNFatal(pVotesRespond->pSyncNode, "votes respond not found");
}
void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term) {
pVotesRespond->term = term;
memset(pVotesRespond->isRespond, 0, sizeof(pVotesRespond->isRespond));
- /*
- for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) {
- pVotesRespond->isRespond[i] = false;
- }
- */
-}
-
-cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
- char u64buf[128] = {0};
- cJSON *pRoot = cJSON_CreateObject();
-
- if (pVotesRespond != NULL) {
- cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesRespond->replicaNum);
- cJSON *pReplicas = cJSON_CreateArray();
- cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
- for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) {
- cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesRespond->replicas))[i]));
- }
- int32_t respondNum = 0;
- int32_t *arr = (int32_t *)taosMemoryMalloc(sizeof(int32_t) * pVotesRespond->replicaNum);
- for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) {
- arr[i] = pVotesRespond->isRespond[i];
- if (pVotesRespond->isRespond[i]) {
- respondNum++;
- }
- }
- cJSON *pIsRespond = cJSON_CreateIntArray(arr, pVotesRespond->replicaNum);
- taosMemoryFree(arr);
- cJSON_AddItemToObject(pRoot, "isRespond", pIsRespond);
- cJSON_AddNumberToObject(pRoot, "respondNum", respondNum);
-
- snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pVotesRespond->term);
- cJSON_AddStringToObject(pRoot, "term", u64buf);
- snprintf(u64buf, sizeof(u64buf), "%p", pVotesRespond->pSyncNode);
- cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
- }
-
- cJSON *pJson = cJSON_CreateObject();
- cJSON_AddItemToObject(pJson, "SVotesRespond", pRoot);
- return pJson;
-}
-
-char *votesRespond2Str(SVotesRespond *pVotesRespond) {
- cJSON *pJson = votesRespond2Json(pVotesRespond);
- char *serialized = cJSON_Print(pJson);
- cJSON_Delete(pJson);
- return serialized;
}
diff --git a/source/libs/sync/test/sync_test_lib/inc/syncTest.h b/source/libs/sync/test/sync_test_lib/inc/syncTest.h
index 3a490742724c51387676ec29a485fce0be90f84f..241a4a5e028af08be2ef8faa0ee0190d03ccc236 100644
--- a/source/libs/sync/test/sync_test_lib/inc/syncTest.h
+++ b/source/libs/sync/test/sync_test_lib/inc/syncTest.h
@@ -82,6 +82,18 @@ void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore);
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
char* syncNode2Str(const SSyncNode* pSyncNode);
+cJSON* voteGranted2Json(SVotesGranted* pVotesGranted);
+char* voteGranted2Str(SVotesGranted* pVotesGranted);
+cJSON* votesRespond2Json(SVotesRespond* pVotesRespond);
+char* votesRespond2Str(SVotesRespond* pVotesRespond);
+
+cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p);
+char* syncUtilRaftId2Str(const SRaftId* p);
+
+cJSON* snapshotSender2Json(SSyncSnapshotSender* pSender);
+char* snapshotSender2Str(SSyncSnapshotSender* pSender);
+cJSON* snapshotReceiver2Json(SSyncSnapshotReceiver* pReceiver);
+char* snapshotReceiver2Str(SSyncSnapshotReceiver* pReceiver);
#ifdef __cplusplus
}
diff --git a/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c b/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c
new file mode 100644
index 0000000000000000000000000000000000000000..fdaf4b8b341ccc61b645eabeb433e57a11617556
--- /dev/null
+++ b/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c
@@ -0,0 +1,132 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#define _DEFAULT_SOURCE
+#include "syncTest.h"
+
+cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
+ char u64buf[128];
+ cJSON *pRoot = cJSON_CreateObject();
+
+ if (pSender != NULL) {
+ cJSON_AddNumberToObject(pRoot, "start", pSender->start);
+ cJSON_AddNumberToObject(pRoot, "seq", pSender->seq);
+ cJSON_AddNumberToObject(pRoot, "ack", pSender->ack);
+
+ snprintf(u64buf, sizeof(u64buf), "%p", pSender->pReader);
+ cJSON_AddStringToObject(pRoot, "pReader", u64buf);
+
+ snprintf(u64buf, sizeof(u64buf), "%p", pSender->pCurrentBlock);
+ cJSON_AddStringToObject(pRoot, "pCurrentBlock", u64buf);
+ cJSON_AddNumberToObject(pRoot, "blockLen", pSender->blockLen);
+
+ if (pSender->pCurrentBlock != NULL) {
+ char *s;
+ s = syncUtilPrintBin((char *)(pSender->pCurrentBlock), pSender->blockLen);
+ cJSON_AddStringToObject(pRoot, "pCurrentBlock", s);
+ taosMemoryFree(s);
+ s = syncUtilPrintBin2((char *)(pSender->pCurrentBlock), pSender->blockLen);
+ cJSON_AddStringToObject(pRoot, "pCurrentBlock2", s);
+ taosMemoryFree(s);
+ }
+
+ cJSON *pSnapshot = cJSON_CreateObject();
+ snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->snapshot.lastApplyIndex);
+ cJSON_AddStringToObject(pSnapshot, "lastApplyIndex", u64buf);
+ snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->snapshot.lastApplyTerm);
+ cJSON_AddStringToObject(pSnapshot, "lastApplyTerm", u64buf);
+ cJSON_AddItemToObject(pRoot, "snapshot", pSnapshot);
+ snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->sendingMS);
+ cJSON_AddStringToObject(pRoot, "sendingMS", u64buf);
+ snprintf(u64buf, sizeof(u64buf), "%p", pSender->pSyncNode);
+ cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
+ cJSON_AddNumberToObject(pRoot, "replicaIndex", pSender->replicaIndex);
+ snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->term);
+ cJSON_AddStringToObject(pRoot, "term", u64buf);
+
+ // snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->privateTerm);
+ // cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
+
+ cJSON_AddNumberToObject(pRoot, "finish", pSender->finish);
+ }
+
+ cJSON *pJson = cJSON_CreateObject();
+ cJSON_AddItemToObject(pJson, "SSyncSnapshotSender", pRoot);
+ return pJson;
+}
+
+char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
+ cJSON *pJson = snapshotSender2Json(pSender);
+ char *serialized = cJSON_Print(pJson);
+ cJSON_Delete(pJson);
+ return serialized;
+}
+
+cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
+ char u64buf[128];
+ cJSON *pRoot = cJSON_CreateObject();
+
+ if (pReceiver != NULL) {
+ cJSON_AddNumberToObject(pRoot, "start", pReceiver->start);
+ cJSON_AddNumberToObject(pRoot, "ack", pReceiver->ack);
+
+ snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pWriter);
+ cJSON_AddStringToObject(pRoot, "pWriter", u64buf);
+
+ snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pSyncNode);
+ cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
+
+ cJSON *pFromId = cJSON_CreateObject();
+ snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->fromId.addr);
+ cJSON_AddStringToObject(pFromId, "addr", u64buf);
+ {
+ uint64_t u64 = pReceiver->fromId.addr;
+ cJSON *pTmp = pFromId;
+ char host[128] = {0};
+ uint16_t port;
+ syncUtilU642Addr(u64, host, sizeof(host), &port);
+ cJSON_AddStringToObject(pTmp, "addr_host", host);
+ cJSON_AddNumberToObject(pTmp, "addr_port", port);
+ }
+ cJSON_AddNumberToObject(pFromId, "vgId", pReceiver->fromId.vgId);
+ cJSON_AddItemToObject(pRoot, "fromId", pFromId);
+
+ snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->snapshot.lastApplyIndex);
+ cJSON_AddStringToObject(pRoot, "snapshot.lastApplyIndex", u64buf);
+
+ snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->snapshot.lastApplyTerm);
+ cJSON_AddStringToObject(pRoot, "snapshot.lastApplyTerm", u64buf);
+
+ snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->snapshot.lastConfigIndex);
+ cJSON_AddStringToObject(pRoot, "snapshot.lastConfigIndex", u64buf);
+
+ snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->term);
+ cJSON_AddStringToObject(pRoot, "term", u64buf);
+
+ snprintf(u64buf, sizeof(u64buf), "%" PRId64, pReceiver->startTime);
+ cJSON_AddStringToObject(pRoot, "startTime", u64buf);
+ }
+
+ cJSON *pJson = cJSON_CreateObject();
+ cJSON_AddItemToObject(pJson, "SSyncSnapshotReceiver", pRoot);
+ return pJson;
+}
+
+char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
+ cJSON *pJson = snapshotReceiver2Json(pReceiver);
+ char *serialized = cJSON_Print(pJson);
+ cJSON_Delete(pJson);
+ return serialized;
+}
diff --git a/source/libs/sync/test/sync_test_lib/src/syncUtilDebug.c b/source/libs/sync/test/sync_test_lib/src/syncUtilDebug.c
new file mode 100644
index 0000000000000000000000000000000000000000..b12f4e76a58fad992cb5b3ea545966e43469ea00
--- /dev/null
+++ b/source/libs/sync/test/sync_test_lib/src/syncUtilDebug.c
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#define _DEFAULT_SOURCE
+#include "syncTest.h"
+
+cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p) {
+ char u64buf[128] = {0};
+ cJSON* pRoot = cJSON_CreateObject();
+
+ cJSON_AddStringToObject(pRoot, "nodeFqdn", p->nodeFqdn);
+ cJSON_AddNumberToObject(pRoot, "nodePort", p->nodePort);
+
+ cJSON* pJson = cJSON_CreateObject();
+ cJSON_AddItemToObject(pJson, "SNodeInfo", pRoot);
+ return pJson;
+}
+
+char* syncUtilRaftId2Str(const SRaftId* p) {
+ cJSON* pJson = syncUtilRaftId2Json(p);
+ char* serialized = cJSON_Print(pJson);
+ cJSON_Delete(pJson);
+ return serialized;
+}
diff --git a/source/libs/sync/test/sync_test_lib/src/syncVoteMgrDebug.c b/source/libs/sync/test/sync_test_lib/src/syncVoteMgrDebug.c
new file mode 100644
index 0000000000000000000000000000000000000000..f1c26a97aa443f9fa27580b4b2138192ecc09401
--- /dev/null
+++ b/source/libs/sync/test/sync_test_lib/src/syncVoteMgrDebug.c
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#define _DEFAULT_SOURCE
+#include "syncTest.h"
+
+cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
+ char u64buf[128] = {0};
+ cJSON *pRoot = cJSON_CreateObject();
+
+ if (pVotesGranted != NULL) {
+ cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesGranted->replicaNum);
+ cJSON *pReplicas = cJSON_CreateArray();
+ cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
+ for (int32_t i = 0; i < pVotesGranted->replicaNum; ++i) {
+ cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesGranted->replicas))[i]));
+ }
+ int32_t *arr = (int32_t *)taosMemoryMalloc(sizeof(int32_t) * pVotesGranted->replicaNum);
+ for (int32_t i = 0; i < pVotesGranted->replicaNum; ++i) {
+ arr[i] = pVotesGranted->isGranted[i];
+ }
+ cJSON *pIsGranted = cJSON_CreateIntArray(arr, pVotesGranted->replicaNum);
+ taosMemoryFree(arr);
+ cJSON_AddItemToObject(pRoot, "isGranted", pIsGranted);
+
+ cJSON_AddNumberToObject(pRoot, "votes", pVotesGranted->votes);
+ snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pVotesGranted->term);
+ cJSON_AddStringToObject(pRoot, "term", u64buf);
+ cJSON_AddNumberToObject(pRoot, "quorum", pVotesGranted->quorum);
+ cJSON_AddNumberToObject(pRoot, "toLeader", pVotesGranted->toLeader);
+ snprintf(u64buf, sizeof(u64buf), "%p", pVotesGranted->pSyncNode);
+ cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
+
+ bool majority = voteGrantedMajority(pVotesGranted);
+ cJSON_AddNumberToObject(pRoot, "majority", majority);
+ }
+
+ cJSON *pJson = cJSON_CreateObject();
+ cJSON_AddItemToObject(pJson, "SVotesGranted", pRoot);
+ return pJson;
+}
+
+char *voteGranted2Str(SVotesGranted *pVotesGranted) {
+ cJSON *pJson = voteGranted2Json(pVotesGranted);
+ char *serialized = cJSON_Print(pJson);
+ cJSON_Delete(pJson);
+ return serialized;
+}
+
+cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
+ char u64buf[128] = {0};
+ cJSON *pRoot = cJSON_CreateObject();
+
+ if (pVotesRespond != NULL) {
+ cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesRespond->replicaNum);
+ cJSON *pReplicas = cJSON_CreateArray();
+ cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
+ for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) {
+ cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesRespond->replicas))[i]));
+ }
+ int32_t respondNum = 0;
+ int32_t *arr = (int32_t *)taosMemoryMalloc(sizeof(int32_t) * pVotesRespond->replicaNum);
+ for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) {
+ arr[i] = pVotesRespond->isRespond[i];
+ if (pVotesRespond->isRespond[i]) {
+ respondNum++;
+ }
+ }
+ cJSON *pIsRespond = cJSON_CreateIntArray(arr, pVotesRespond->replicaNum);
+ taosMemoryFree(arr);
+ cJSON_AddItemToObject(pRoot, "isRespond", pIsRespond);
+ cJSON_AddNumberToObject(pRoot, "respondNum", respondNum);
+
+ snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pVotesRespond->term);
+ cJSON_AddStringToObject(pRoot, "term", u64buf);
+ snprintf(u64buf, sizeof(u64buf), "%p", pVotesRespond->pSyncNode);
+ cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
+ }
+
+ cJSON *pJson = cJSON_CreateObject();
+ cJSON_AddItemToObject(pJson, "SVotesRespond", pRoot);
+ return pJson;
+}
+
+char *votesRespond2Str(SVotesRespond *pVotesRespond) {
+ cJSON *pJson = votesRespond2Json(pVotesRespond);
+ char *serialized = cJSON_Print(pJson);
+ cJSON_Delete(pJson);
+ return serialized;
+}