From f7d263862b3d8b508cd114669def31432ba3045e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 10 Nov 2022 10:04:05 +0800 Subject: [PATCH] refact: remove assert and adjust log --- source/libs/sync/inc/syncSnapshot.h | 14 +- source/libs/sync/inc/syncTimeout.h | 1 - source/libs/sync/inc/syncTools.h | 1 - source/libs/sync/inc/syncUtil.h | 8 +- source/libs/sync/inc/syncVoteMgr.h | 4 - source/libs/sync/src/syncMain.c | 16 +-- source/libs/sync/src/syncSnapshot.c | 136 +----------------- source/libs/sync/src/syncTimeout.c | 27 ++-- source/libs/sync/src/syncUtil.c | 32 +---- source/libs/sync/src/syncVoteMgr.c | 136 ++++-------------- .../sync/test/sync_test_lib/inc/syncTest.h | 12 ++ .../sync_test_lib/src/syncSnapshotDebug.c | 132 +++++++++++++++++ .../test/sync_test_lib/src/syncUtilDebug.c | 36 +++++ .../test/sync_test_lib/src/syncVoteMgrDebug.c | 102 +++++++++++++ 14 files changed, 347 insertions(+), 310 deletions(-) create mode 100644 source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c create mode 100644 source/libs/sync/test/sync_test_lib/src/syncUtilDebug.c create mode 100644 source/libs/sync/test/sync_test_lib/src/syncVoteMgrDebug.c diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index d688a3c1bd..963fedce31 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -21,7 +21,6 @@ extern "C" { #endif #include "syncInt.h" -#include "syncMessage.h" #define SYNC_SNAPSHOT_SEQ_INVALID -2 #define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -3 @@ -31,7 +30,6 @@ extern "C" { #define SYNC_SNAPSHOT_RETRY_MS 5000 -//--------------------------------------------------- typedef struct SSyncSnapshotSender { bool start; int32_t seq; @@ -60,12 +58,6 @@ int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finis int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); -cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); -char *snapshotSender2Str(SSyncSnapshotSender *pSender); - -int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId); - -//--------------------------------------------------- typedef struct SSyncSnapshotReceiver { bool start; int32_t ack; @@ -78,7 +70,6 @@ typedef struct SSyncSnapshotReceiver { // init when create SSyncNode *pSyncNode; - } SSyncSnapshotReceiver; SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId); @@ -88,13 +79,10 @@ int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver); -cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); -char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); - -//--------------------------------------------------- // on message int32_t syncNodeOnSnapshot(SSyncNode *ths, SyncSnapshotSend *pMsg); int32_t syncNodeOnSnapshotReply(SSyncNode *ths, SyncSnapshotRsp *pMsg); +int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncTimeout.h b/source/libs/sync/inc/syncTimeout.h index e1a6050d92..3139707d54 100644 --- a/source/libs/sync/inc/syncTimeout.h +++ b/source/libs/sync/inc/syncTimeout.h @@ -21,7 +21,6 @@ extern "C" { #endif #include "syncInt.h" -#include "syncMessage.h" // TLA+ Spec // Timeout(i) == /\ state[i] \in {Follower, Candidate} diff --git a/source/libs/sync/inc/syncTools.h b/source/libs/sync/inc/syncTools.h index 6a760ecd87..3fb4a5ba0c 100644 --- a/source/libs/sync/inc/syncTools.h +++ b/source/libs/sync/inc/syncTools.h @@ -728,7 +728,6 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg); int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg); int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex); -int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg); int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg); // ----------------------------------------- diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 7f241e827d..076101ef43 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -24,17 +24,15 @@ extern "C" { uint64_t syncUtilAddr2U64(const char* host, uint16_t port); void syncUtilU642Addr(uint64_t u64, char* host, int64_t len, uint16_t* port); -void syncUtilnodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet); -void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet); -bool syncUtilnodeInfo2raftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId); +void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet); +void syncUtilRaftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet); +bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId); bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2); bool syncUtilEmptyId(const SRaftId* pId); int32_t syncUtilElectRandomMS(int32_t min, int32_t max); int32_t syncUtilQuorum(int32_t replicaNum); -cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p); cJSON* syncUtilRaftId2Json(const SRaftId* p); -char* syncUtilRaftId2Str(const SRaftId* p); const char* syncStr(ESyncState state); char* syncUtilPrintBin(char* ptr, uint32_t len); char* syncUtilPrintBin2(char* ptr, uint32_t len); diff --git a/source/libs/sync/inc/syncVoteMgr.h b/source/libs/sync/inc/syncVoteMgr.h index d894e91600..066a4dd76f 100644 --- a/source/libs/sync/inc/syncVoteMgr.h +++ b/source/libs/sync/inc/syncVoteMgr.h @@ -39,8 +39,6 @@ void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pSyncN bool voteGrantedMajority(SVotesGranted *pVotesGranted); void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg); void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term); -cJSON *voteGranted2Json(SVotesGranted *pVotesGranted); -char *voteGranted2Str(SVotesGranted *pVotesGranted); typedef struct SVotesRespond { SRaftId (*replicas)[TSDB_MAX_REPLICA]; @@ -56,8 +54,6 @@ void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSync bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId); void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg); void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term); -cJSON *votesRespond2Json(SVotesRespond *pVotesRespond); -char *votesRespond2Str(SVotesRespond *pVotesRespond); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 252d5d0219..25f6c5e6f1 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -757,7 +757,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { // init internal pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex]; - if (!syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) { + if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) { sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId); goto _error; } @@ -772,7 +772,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { } } for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { - if (!syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) { + if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) { sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i); goto _error; } @@ -781,7 +781,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { // init replicaNum, replicasId pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum; for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { - if (!syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) { + if (!syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) { sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i); goto _error; } @@ -1213,7 +1213,7 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) { // utils -------------- int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { SEpSet epSet; - syncUtilraftId2EpSet(destRaftId, &epSet); + syncUtilRaftId2EpSet(destRaftId, &epSet); if (pSyncNode->syncSendMSg != NULL) { // htonl syncUtilMsgHtoN(pMsg->pCont); @@ -1230,7 +1230,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) { SEpSet epSet; - syncUtilnodeInfo2EpSet(nodeInfo, &epSet); + syncUtilNodeInfo2EpSet(nodeInfo, &epSet); if (pSyncNode->syncSendMSg != NULL) { // htonl syncUtilMsgHtoN(pMsg->pCont); @@ -1344,7 +1344,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde // init internal pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex]; - syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId); + syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId); // init peersNum, peers, peersId pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1; @@ -1356,13 +1356,13 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde } } for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { - syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]); + syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]); } // init replicaNum, replicasId pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum; for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { - syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]); + syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]); } // update quorum first diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 78413bbeff..3273718f7d 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -13,33 +13,25 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "syncSnapshot.h" #include "syncIndexMgr.h" #include "syncRaftCfg.h" #include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncUtil.h" -#include "wal.h" -//---------------------------------- -static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg); -static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg); -static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg); -static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg); - -//---------------------------------- SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) && (pSyncNode->pFsm->FpSnapshotDoRead != NULL); SSyncSnapshotSender *pSender = NULL; if (condition) { - pSender = taosMemoryMalloc(sizeof(SSyncSnapshotSender)); + pSender = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender)); if (pSender == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - memset(pSender, 0, sizeof(*pSender)); pSender->start = false; pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID; @@ -249,64 +241,6 @@ static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnaps ++(pSender->seq); } -cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { - char u64buf[128]; - cJSON *pRoot = cJSON_CreateObject(); - - if (pSender != NULL) { - cJSON_AddNumberToObject(pRoot, "start", pSender->start); - cJSON_AddNumberToObject(pRoot, "seq", pSender->seq); - cJSON_AddNumberToObject(pRoot, "ack", pSender->ack); - - snprintf(u64buf, sizeof(u64buf), "%p", pSender->pReader); - cJSON_AddStringToObject(pRoot, "pReader", u64buf); - - snprintf(u64buf, sizeof(u64buf), "%p", pSender->pCurrentBlock); - cJSON_AddStringToObject(pRoot, "pCurrentBlock", u64buf); - cJSON_AddNumberToObject(pRoot, "blockLen", pSender->blockLen); - - if (pSender->pCurrentBlock != NULL) { - char *s; - s = syncUtilPrintBin((char *)(pSender->pCurrentBlock), pSender->blockLen); - cJSON_AddStringToObject(pRoot, "pCurrentBlock", s); - taosMemoryFree(s); - s = syncUtilPrintBin2((char *)(pSender->pCurrentBlock), pSender->blockLen); - cJSON_AddStringToObject(pRoot, "pCurrentBlock2", s); - taosMemoryFree(s); - } - - cJSON *pSnapshot = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->snapshot.lastApplyIndex); - cJSON_AddStringToObject(pSnapshot, "lastApplyIndex", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->snapshot.lastApplyTerm); - cJSON_AddStringToObject(pSnapshot, "lastApplyTerm", u64buf); - cJSON_AddItemToObject(pRoot, "snapshot", pSnapshot); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->sendingMS); - cJSON_AddStringToObject(pRoot, "sendingMS", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSender->pSyncNode); - cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); - cJSON_AddNumberToObject(pRoot, "replicaIndex", pSender->replicaIndex); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->term); - cJSON_AddStringToObject(pRoot, "term", u64buf); - - // snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->privateTerm); - // cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); - - cJSON_AddNumberToObject(pRoot, "finish", pSender->finish); - } - - cJSON *pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SSyncSnapshotSender", pRoot); - return pJson; -} - -char *snapshotSender2Str(SSyncSnapshotSender *pSender) { - cJSON *pJson = snapshotSender2Json(pSender); - char *serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; -} - int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { sNTrace(pSyncNode, "starting snapshot ..."); @@ -335,16 +269,17 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { return 0; } -// ------------------------------------- SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) { bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) && (pSyncNode->pFsm->FpSnapshotDoWrite != NULL); SSyncSnapshotReceiver *pReceiver = NULL; if (condition) { - pReceiver = taosMemoryMalloc(sizeof(SSyncSnapshotReceiver)); - ASSERT(pReceiver != NULL); - memset(pReceiver, 0, sizeof(*pReceiver)); + pReceiver = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver)); + if (pReceiver == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } pReceiver->start = false; pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; @@ -530,63 +465,6 @@ static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapsh } } -cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { - char u64buf[128]; - cJSON *pRoot = cJSON_CreateObject(); - - if (pReceiver != NULL) { - cJSON_AddNumberToObject(pRoot, "start", pReceiver->start); - cJSON_AddNumberToObject(pRoot, "ack", pReceiver->ack); - - snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pWriter); - cJSON_AddStringToObject(pRoot, "pWriter", u64buf); - - snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pSyncNode); - cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); - - cJSON *pFromId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->fromId.addr); - cJSON_AddStringToObject(pFromId, "addr", u64buf); - { - uint64_t u64 = pReceiver->fromId.addr; - cJSON *pTmp = pFromId; - char host[128] = {0}; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pFromId, "vgId", pReceiver->fromId.vgId); - cJSON_AddItemToObject(pRoot, "fromId", pFromId); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->snapshot.lastApplyIndex); - cJSON_AddStringToObject(pRoot, "snapshot.lastApplyIndex", u64buf); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->snapshot.lastApplyTerm); - cJSON_AddStringToObject(pRoot, "snapshot.lastApplyTerm", u64buf); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->snapshot.lastConfigIndex); - cJSON_AddStringToObject(pRoot, "snapshot.lastConfigIndex", u64buf); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->term); - cJSON_AddStringToObject(pRoot, "term", u64buf); - - snprintf(u64buf, sizeof(u64buf), "%" PRId64, pReceiver->startTime); - cJSON_AddStringToObject(pRoot, "startTime", u64buf); - } - - cJSON *pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SSyncSnapshotReceiver", pRoot); - return pJson; -} - -char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { - cJSON *pJson = snapshotReceiver2Json(pReceiver); - char *serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; -} - SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) { SyncIndex snapStart = SYNC_INDEX_INVALID; diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 30e0617f43..91d807319b 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -13,28 +13,23 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "syncTimeout.h" #include "syncElection.h" #include "syncRaftCfg.h" #include "syncRaftLog.h" #include "syncReplication.h" -#include "syncRespMgr.h" static void syncNodeCleanConfigIndex(SSyncNode* ths) { int32_t newArrIndex = 0; - SyncIndex newConfigIndexArr[MAX_CONFIG_INDEX_COUNT]; - memset(newConfigIndexArr, 0, sizeof(newConfigIndexArr)); - + SyncIndex newConfigIndexArr[MAX_CONFIG_INDEX_COUNT] = {0}; SSnapshot snapshot = {0}; - if (ths->pFsm != NULL && ths->pFsm->FpGetSnapshotInfo != NULL) { - ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); - } - + + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); if (snapshot.lastApplyIndex != SYNC_INDEX_INVALID) { - for (int i = 0; i < ths->pRaftCfg->configIndexCount; ++i) { + for (int32_t i = 0; i < ths->pRaftCfg->configIndexCount; ++i) { if (ths->pRaftCfg->configIndexArr[i] < snapshot.lastConfigIndex) { // pass - ; } else { // save newConfigIndexArr[newArrIndex] = ths->pRaftCfg->configIndexArr[i]; @@ -47,13 +42,15 @@ static void syncNodeCleanConfigIndex(SSyncNode* ths) { memcpy(ths->pRaftCfg->configIndexArr, newConfigIndexArr, sizeof(newConfigIndexArr)); int32_t code = raftCfgPersist(ths->pRaftCfg); - ASSERT(code == 0); - - sNTrace(ths, "clean config index arr, old-cnt:%d, new-cnt:%d", oldCnt, ths->pRaftCfg->configIndexCount); + if (code != 0) { + sNFatal(ths, "failed to persist cfg"); + } else { + sNTrace(ths, "clean config index arr, old-cnt:%d, new-cnt:%d", oldCnt, ths->pRaftCfg->configIndexCount); + } } } -int32_t syncNodeTimerRoutine(SSyncNode* ths) { +static int32_t syncNodeTimerRoutine(SSyncNode* ths) { sNTrace(ths, "timer routines"); // timer replicate @@ -71,7 +68,7 @@ int32_t syncNodeTimerRoutine(SSyncNode* ths) { SSyncLogStoreData* pData = ths->pLogStore->data; int32_t code = walEndSnapshot(pData->pWal); if (code != 0) { - sError("vgId:%d, timer wal snapshot end error since:%s", ths->vgId, terrstr()); + sNError(ths, "timer wal snapshot end error since:%s", terrstr()); return -1; } else { sNTrace(ths, "wal snapshot end, index:%" PRId64, atomic_load_64(&ths->snapshottingIndex)); diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 1a00b0f5a4..894aa05aad 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -41,15 +41,15 @@ void syncUtilU642Addr(uint64_t u64, char* host, int64_t len, uint16_t* port) { *port = (uint16_t)((u64 & 0x00000000FFFF0000) >> 16); } -void syncUtilnodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet) { +void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet) { pEpSet->inUse = 0; pEpSet->numOfEps = 0; addEpIntoEpSet(pEpSet, pInfo->nodeFqdn, pInfo->nodePort); } -void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) { +void syncUtilRaftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) { char host[TSDB_FQDN_LEN] = {0}; - uint16_t port; + uint16_t port = 0; syncUtilU642Addr(raftId->addr, host, sizeof(host), &port); pEpSet->inUse = 0; @@ -57,7 +57,7 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) { addEpIntoEpSet(pEpSet, host, port); } -bool syncUtilnodeInfo2raftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId) { +bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId) { uint32_t ipv4 = taosGetIpv4FromFqdn(pInfo->nodeFqdn); if (ipv4 == 0xFFFFFFFF || ipv4 == 1) { sError("failed to resolve ipv4 addr, fqdn: %s", pInfo->nodeFqdn); @@ -73,8 +73,7 @@ bool syncUtilnodeInfo2raftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* } bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) { - bool ret = pId1->addr == pId2->addr && pId1->vgId == pId2->vgId; - return ret; + return pId1->addr == pId2->addr && pId1->vgId == pId2->vgId; } bool syncUtilEmptyId(const SRaftId* pId) { return (pId->addr == 0 && pId->vgId == 0); } @@ -90,18 +89,6 @@ int32_t syncUtilElectRandomMS(int32_t min, int32_t max) { int32_t syncUtilQuorum(int32_t replicaNum) { return replicaNum / 2 + 1; } -cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p) { - char u64buf[128] = {0}; - cJSON* pRoot = cJSON_CreateObject(); - - cJSON_AddStringToObject(pRoot, "nodeFqdn", p->nodeFqdn); - cJSON_AddNumberToObject(pRoot, "nodePort", p->nodePort); - - cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SNodeInfo", pRoot); - return pJson; -} - cJSON* syncUtilRaftId2Json(const SRaftId* p) { char u64buf[128] = {0}; cJSON* pRoot = cJSON_CreateObject(); @@ -120,13 +107,6 @@ cJSON* syncUtilRaftId2Json(const SRaftId* p) { return pJson; } -char* syncUtilRaftId2Str(const SRaftId* p) { - cJSON* pJson = syncUtilRaftId2Json(p); - char* serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; -} - static inline bool syncUtilCanPrint(char c) { if (c >= 32 && c <= 126) { return true; @@ -165,14 +145,12 @@ char* syncUtilPrintBin2(char* ptr, uint32_t len) { } void syncUtilMsgHtoN(void* msg) { - // htonl SMsgHead* pHead = msg; pHead->contLen = htonl(pHead->contLen); pHead->vgId = htonl(pHead->vgId); } void syncUtilMsgNtoH(void* msg) { - // ntohl SMsgHead* pHead = msg; pHead->contLen = ntohl(pHead->contLen); pHead->vgId = ntohl(pHead->vgId); diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c index ee1f83ee6a..b36e5c0288 100644 --- a/source/libs/sync/src/syncVoteMgr.c +++ b/source/libs/sync/src/syncVoteMgr.c @@ -23,12 +23,11 @@ static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) { } SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) { - SVotesGranted *pVotesGranted = taosMemoryMalloc(sizeof(SVotesGranted)); + SVotesGranted *pVotesGranted = taosMemoryCalloc(1, sizeof(SVotesGranted)); if (pVotesGranted == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - memset(pVotesGranted, 0, sizeof(SVotesGranted)); pVotesGranted->replicas = &(pSyncNode->replicasId); pVotesGranted->replicaNum = pSyncNode->replicaNum; @@ -59,20 +58,24 @@ void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pSyncNode) { pVotesGranted->pSyncNode = pSyncNode; } -bool voteGrantedMajority(SVotesGranted *pVotesGranted) { - bool ret = pVotesGranted->votes >= pVotesGranted->quorum; - return ret; -} +bool voteGrantedMajority(SVotesGranted *pVotesGranted) { return pVotesGranted->votes >= pVotesGranted->quorum; } void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) { - ASSERT(pMsg->voteGranted == true); + if (!pMsg->voteGranted) { + sNFatal(pVotesGranted->pSyncNode, " vote granted should be true"); + return; + } if (pMsg->term != pVotesGranted->term) { - sNTrace(pVotesGranted->pSyncNode, "vote grant vnode error"); + sNTrace(pVotesGranted->pSyncNode, "vote grant term:%" PRId64 " not matched with msg term:%" PRId64, + pVotesGranted->term, pMsg->term); return; } - ASSERT(syncUtilSameId(&pVotesGranted->pSyncNode->myRaftId, &pMsg->destId)); + if (!syncUtilSameId(&pVotesGranted->pSyncNode->myRaftId, &pMsg->destId)) { + sNFatal(pVotesGranted->pSyncNode, " vote granted raftId not matched with msg"); + return; + } int32_t j = -1; for (int32_t i = 0; i < pVotesGranted->replicaNum; ++i) { @@ -81,14 +84,21 @@ void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) { break; } } - ASSERT(j != -1); - ASSERT(j >= 0 && j < pVotesGranted->replicaNum); + if ((j == -1) || (j >= 0 && j < pVotesGranted->replicaNum)) { + sNFatal(pVotesGranted->pSyncNode, " invalid msg srcId, index:%d", j); + return; + } if (pVotesGranted->isGranted[j] != true) { ++(pVotesGranted->votes); pVotesGranted->isGranted[j] = true; } - ASSERT(pVotesGranted->votes <= pVotesGranted->replicaNum); + + if (pVotesGranted->votes <= pVotesGranted->replicaNum) { + sNFatal(pVotesGranted->pSyncNode, " votes:%d not matched with replicaNum:%d", pVotesGranted->votes, + pVotesGranted->replicaNum); + return; + } } void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) { @@ -97,53 +107,12 @@ void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) { pVotesGranted->toLeader = false; } -cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) { - char u64buf[128] = {0}; - cJSON *pRoot = cJSON_CreateObject(); - - if (pVotesGranted != NULL) { - cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesGranted->replicaNum); - cJSON *pReplicas = cJSON_CreateArray(); - cJSON_AddItemToObject(pRoot, "replicas", pReplicas); - for (int32_t i = 0; i < pVotesGranted->replicaNum; ++i) { - cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesGranted->replicas))[i])); - } - int32_t *arr = (int32_t *)taosMemoryMalloc(sizeof(int32_t) * pVotesGranted->replicaNum); - for (int32_t i = 0; i < pVotesGranted->replicaNum; ++i) { - arr[i] = pVotesGranted->isGranted[i]; - } - cJSON *pIsGranted = cJSON_CreateIntArray(arr, pVotesGranted->replicaNum); - taosMemoryFree(arr); - cJSON_AddItemToObject(pRoot, "isGranted", pIsGranted); - - cJSON_AddNumberToObject(pRoot, "votes", pVotesGranted->votes); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pVotesGranted->term); - cJSON_AddStringToObject(pRoot, "term", u64buf); - cJSON_AddNumberToObject(pRoot, "quorum", pVotesGranted->quorum); - cJSON_AddNumberToObject(pRoot, "toLeader", pVotesGranted->toLeader); - snprintf(u64buf, sizeof(u64buf), "%p", pVotesGranted->pSyncNode); - cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); - - bool majority = voteGrantedMajority(pVotesGranted); - cJSON_AddNumberToObject(pRoot, "majority", majority); - } - - cJSON *pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SVotesGranted", pRoot); - return pJson; -} - -char *voteGranted2Str(SVotesGranted *pVotesGranted) { - cJSON *pJson = voteGranted2Json(pVotesGranted); - char *serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; -} - SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) { - SVotesRespond *pVotesRespond = taosMemoryMalloc(sizeof(SVotesRespond)); - ASSERT(pVotesRespond != NULL); - memset(pVotesRespond, 0, sizeof(SVotesRespond)); + SVotesRespond *pVotesRespond = taosMemoryCalloc(1, sizeof(SVotesRespond)); + if (pVotesRespond == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } pVotesRespond->replicas = &(pSyncNode->replicasId); pVotesRespond->replicaNum = pSyncNode->replicaNum; @@ -185,62 +154,15 @@ void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *p for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) { if (syncUtilSameId(&((*(pVotesRespond->replicas))[i]), &pMsg->srcId)) { - // ASSERT(pVotesRespond->isRespond[i] == false); pVotesRespond->isRespond[i] = true; return; } } - ASSERT(0); + + sNFatal(pVotesRespond->pSyncNode, "votes respond not found"); } void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term) { pVotesRespond->term = term; memset(pVotesRespond->isRespond, 0, sizeof(pVotesRespond->isRespond)); - /* - for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) { - pVotesRespond->isRespond[i] = false; - } - */ -} - -cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) { - char u64buf[128] = {0}; - cJSON *pRoot = cJSON_CreateObject(); - - if (pVotesRespond != NULL) { - cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesRespond->replicaNum); - cJSON *pReplicas = cJSON_CreateArray(); - cJSON_AddItemToObject(pRoot, "replicas", pReplicas); - for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) { - cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesRespond->replicas))[i])); - } - int32_t respondNum = 0; - int32_t *arr = (int32_t *)taosMemoryMalloc(sizeof(int32_t) * pVotesRespond->replicaNum); - for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) { - arr[i] = pVotesRespond->isRespond[i]; - if (pVotesRespond->isRespond[i]) { - respondNum++; - } - } - cJSON *pIsRespond = cJSON_CreateIntArray(arr, pVotesRespond->replicaNum); - taosMemoryFree(arr); - cJSON_AddItemToObject(pRoot, "isRespond", pIsRespond); - cJSON_AddNumberToObject(pRoot, "respondNum", respondNum); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pVotesRespond->term); - cJSON_AddStringToObject(pRoot, "term", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pVotesRespond->pSyncNode); - cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); - } - - cJSON *pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SVotesRespond", pRoot); - return pJson; -} - -char *votesRespond2Str(SVotesRespond *pVotesRespond) { - cJSON *pJson = votesRespond2Json(pVotesRespond); - char *serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; } diff --git a/source/libs/sync/test/sync_test_lib/inc/syncTest.h b/source/libs/sync/test/sync_test_lib/inc/syncTest.h index 3a49074272..241a4a5e02 100644 --- a/source/libs/sync/test/sync_test_lib/inc/syncTest.h +++ b/source/libs/sync/test/sync_test_lib/inc/syncTest.h @@ -82,6 +82,18 @@ void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore); cJSON* syncNode2Json(const SSyncNode* pSyncNode); char* syncNode2Str(const SSyncNode* pSyncNode); +cJSON* voteGranted2Json(SVotesGranted* pVotesGranted); +char* voteGranted2Str(SVotesGranted* pVotesGranted); +cJSON* votesRespond2Json(SVotesRespond* pVotesRespond); +char* votesRespond2Str(SVotesRespond* pVotesRespond); + +cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p); +char* syncUtilRaftId2Str(const SRaftId* p); + +cJSON* snapshotSender2Json(SSyncSnapshotSender* pSender); +char* snapshotSender2Str(SSyncSnapshotSender* pSender); +cJSON* snapshotReceiver2Json(SSyncSnapshotReceiver* pReceiver); +char* snapshotReceiver2Str(SSyncSnapshotReceiver* pReceiver); #ifdef __cplusplus } diff --git a/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c b/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c new file mode 100644 index 0000000000..fdaf4b8b34 --- /dev/null +++ b/source/libs/sync/test/sync_test_lib/src/syncSnapshotDebug.c @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "syncTest.h" + +cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { + char u64buf[128]; + cJSON *pRoot = cJSON_CreateObject(); + + if (pSender != NULL) { + cJSON_AddNumberToObject(pRoot, "start", pSender->start); + cJSON_AddNumberToObject(pRoot, "seq", pSender->seq); + cJSON_AddNumberToObject(pRoot, "ack", pSender->ack); + + snprintf(u64buf, sizeof(u64buf), "%p", pSender->pReader); + cJSON_AddStringToObject(pRoot, "pReader", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%p", pSender->pCurrentBlock); + cJSON_AddStringToObject(pRoot, "pCurrentBlock", u64buf); + cJSON_AddNumberToObject(pRoot, "blockLen", pSender->blockLen); + + if (pSender->pCurrentBlock != NULL) { + char *s; + s = syncUtilPrintBin((char *)(pSender->pCurrentBlock), pSender->blockLen); + cJSON_AddStringToObject(pRoot, "pCurrentBlock", s); + taosMemoryFree(s); + s = syncUtilPrintBin2((char *)(pSender->pCurrentBlock), pSender->blockLen); + cJSON_AddStringToObject(pRoot, "pCurrentBlock2", s); + taosMemoryFree(s); + } + + cJSON *pSnapshot = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->snapshot.lastApplyIndex); + cJSON_AddStringToObject(pSnapshot, "lastApplyIndex", u64buf); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->snapshot.lastApplyTerm); + cJSON_AddStringToObject(pSnapshot, "lastApplyTerm", u64buf); + cJSON_AddItemToObject(pRoot, "snapshot", pSnapshot); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->sendingMS); + cJSON_AddStringToObject(pRoot, "sendingMS", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSender->pSyncNode); + cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); + cJSON_AddNumberToObject(pRoot, "replicaIndex", pSender->replicaIndex); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + + // snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->privateTerm); + // cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); + + cJSON_AddNumberToObject(pRoot, "finish", pSender->finish); + } + + cJSON *pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SSyncSnapshotSender", pRoot); + return pJson; +} + +char *snapshotSender2Str(SSyncSnapshotSender *pSender) { + cJSON *pJson = snapshotSender2Json(pSender); + char *serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { + char u64buf[128]; + cJSON *pRoot = cJSON_CreateObject(); + + if (pReceiver != NULL) { + cJSON_AddNumberToObject(pRoot, "start", pReceiver->start); + cJSON_AddNumberToObject(pRoot, "ack", pReceiver->ack); + + snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pWriter); + cJSON_AddStringToObject(pRoot, "pWriter", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pSyncNode); + cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); + + cJSON *pFromId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->fromId.addr); + cJSON_AddStringToObject(pFromId, "addr", u64buf); + { + uint64_t u64 = pReceiver->fromId.addr; + cJSON *pTmp = pFromId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pFromId, "vgId", pReceiver->fromId.vgId); + cJSON_AddItemToObject(pRoot, "fromId", pFromId); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->snapshot.lastApplyIndex); + cJSON_AddStringToObject(pRoot, "snapshot.lastApplyIndex", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->snapshot.lastApplyTerm); + cJSON_AddStringToObject(pRoot, "snapshot.lastApplyTerm", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->snapshot.lastConfigIndex); + cJSON_AddStringToObject(pRoot, "snapshot.lastConfigIndex", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pReceiver->startTime); + cJSON_AddStringToObject(pRoot, "startTime", u64buf); + } + + cJSON *pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SSyncSnapshotReceiver", pRoot); + return pJson; +} + +char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { + cJSON *pJson = snapshotReceiver2Json(pReceiver); + char *serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} diff --git a/source/libs/sync/test/sync_test_lib/src/syncUtilDebug.c b/source/libs/sync/test/sync_test_lib/src/syncUtilDebug.c new file mode 100644 index 0000000000..b12f4e76a5 --- /dev/null +++ b/source/libs/sync/test/sync_test_lib/src/syncUtilDebug.c @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "syncTest.h" + +cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p) { + char u64buf[128] = {0}; + cJSON* pRoot = cJSON_CreateObject(); + + cJSON_AddStringToObject(pRoot, "nodeFqdn", p->nodeFqdn); + cJSON_AddNumberToObject(pRoot, "nodePort", p->nodePort); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SNodeInfo", pRoot); + return pJson; +} + +char* syncUtilRaftId2Str(const SRaftId* p) { + cJSON* pJson = syncUtilRaftId2Json(p); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} diff --git a/source/libs/sync/test/sync_test_lib/src/syncVoteMgrDebug.c b/source/libs/sync/test/sync_test_lib/src/syncVoteMgrDebug.c new file mode 100644 index 0000000000..f1c26a97aa --- /dev/null +++ b/source/libs/sync/test/sync_test_lib/src/syncVoteMgrDebug.c @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "syncTest.h" + +cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) { + char u64buf[128] = {0}; + cJSON *pRoot = cJSON_CreateObject(); + + if (pVotesGranted != NULL) { + cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesGranted->replicaNum); + cJSON *pReplicas = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "replicas", pReplicas); + for (int32_t i = 0; i < pVotesGranted->replicaNum; ++i) { + cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesGranted->replicas))[i])); + } + int32_t *arr = (int32_t *)taosMemoryMalloc(sizeof(int32_t) * pVotesGranted->replicaNum); + for (int32_t i = 0; i < pVotesGranted->replicaNum; ++i) { + arr[i] = pVotesGranted->isGranted[i]; + } + cJSON *pIsGranted = cJSON_CreateIntArray(arr, pVotesGranted->replicaNum); + taosMemoryFree(arr); + cJSON_AddItemToObject(pRoot, "isGranted", pIsGranted); + + cJSON_AddNumberToObject(pRoot, "votes", pVotesGranted->votes); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pVotesGranted->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + cJSON_AddNumberToObject(pRoot, "quorum", pVotesGranted->quorum); + cJSON_AddNumberToObject(pRoot, "toLeader", pVotesGranted->toLeader); + snprintf(u64buf, sizeof(u64buf), "%p", pVotesGranted->pSyncNode); + cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); + + bool majority = voteGrantedMajority(pVotesGranted); + cJSON_AddNumberToObject(pRoot, "majority", majority); + } + + cJSON *pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SVotesGranted", pRoot); + return pJson; +} + +char *voteGranted2Str(SVotesGranted *pVotesGranted) { + cJSON *pJson = voteGranted2Json(pVotesGranted); + char *serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) { + char u64buf[128] = {0}; + cJSON *pRoot = cJSON_CreateObject(); + + if (pVotesRespond != NULL) { + cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesRespond->replicaNum); + cJSON *pReplicas = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "replicas", pReplicas); + for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) { + cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesRespond->replicas))[i])); + } + int32_t respondNum = 0; + int32_t *arr = (int32_t *)taosMemoryMalloc(sizeof(int32_t) * pVotesRespond->replicaNum); + for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) { + arr[i] = pVotesRespond->isRespond[i]; + if (pVotesRespond->isRespond[i]) { + respondNum++; + } + } + cJSON *pIsRespond = cJSON_CreateIntArray(arr, pVotesRespond->replicaNum); + taosMemoryFree(arr); + cJSON_AddItemToObject(pRoot, "isRespond", pIsRespond); + cJSON_AddNumberToObject(pRoot, "respondNum", respondNum); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pVotesRespond->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pVotesRespond->pSyncNode); + cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); + } + + cJSON *pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SVotesRespond", pRoot); + return pJson; +} + +char *votesRespond2Str(SVotesRespond *pVotesRespond) { + cJSON *pJson = votesRespond2Json(pVotesRespond); + char *serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} -- GitLab