From 0fb64add907ba38211d5ecdbbd3e508c88008935 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 21 Jun 2022 15:09:23 +0800 Subject: [PATCH] refactor(sync) add trace log --- source/libs/sync/inc/syncInt.h | 1 + source/libs/sync/inc/syncRaftCfg.h | 12 ++--- source/libs/sync/inc/syncSnapshot.h | 18 ++++---- source/libs/sync/src/syncAppendEntriesReply.c | 45 +++++++++++++------ source/libs/sync/src/syncIndexMgr.c | 4 +- source/libs/sync/src/syncMain.c | 23 ++++++++-- source/libs/sync/src/syncRaftCfg.c | 15 ++++--- source/libs/sync/src/syncSnapshot.c | 30 ++++++------- .../libs/sync/test/syncReconfigFinishTest.cpp | 28 ++++++------ 9 files changed, 106 insertions(+), 70 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 63db395425..999147eda4 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -253,6 +253,7 @@ void syncNodePrint(SSyncNode* pObj); void syncNodePrint2(char* s, SSyncNode* pObj); void syncNodeLog(SSyncNode* pObj); void syncNodeLog2(char* s, SSyncNode* pObj); +void syncNodeLog3(char* s, SSyncNode* pObj); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncRaftCfg.h b/source/libs/sync/inc/syncRaftCfg.h index 7f45276e9f..efe8a65b77 100644 --- a/source/libs/sync/inc/syncRaftCfg.h +++ b/source/libs/sync/inc/syncRaftCfg.h @@ -27,7 +27,7 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" -#define CONFIG_FILE_LEN 1024 +#define CONFIG_FILE_LEN 2048 #define MAX_CONFIG_INDEX_COUNT 512 @@ -49,14 +49,14 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg); int32_t raftCfgPersist(SRaftCfg *pRaftCfg); int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex); -cJSON * syncCfg2Json(SSyncCfg *pSyncCfg); -char * syncCfg2Str(SSyncCfg *pSyncCfg); -char * syncCfg2SimpleStr(SSyncCfg *pSyncCfg); +cJSON *syncCfg2Json(SSyncCfg *pSyncCfg); +char *syncCfg2Str(SSyncCfg *pSyncCfg); +char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg); int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg); int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg); -cJSON * raftCfg2Json(SRaftCfg *pRaftCfg); -char * raftCfg2Str(SRaftCfg *pRaftCfg); +cJSON *raftCfg2Json(SRaftCfg *pRaftCfg); +char *raftCfg2Str(SRaftCfg *pRaftCfg); int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg); int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg); diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 069154fb93..f00dfe6f50 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -39,8 +39,8 @@ typedef struct SSyncSnapshotSender { bool start; int32_t seq; int32_t ack; - void *pReader; - void *pCurrentBlock; + void * pReader; + void * pCurrentBlock; int32_t blockLen; SSnapshot snapshot; SSyncCfg lastConfig; @@ -55,19 +55,19 @@ typedef struct SSyncSnapshotSender { SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex); void snapshotSenderDestroy(SSyncSnapshotSender *pSender); bool snapshotSenderIsStart(SSyncSnapshotSender *pSender); -void snapshotSenderStart(SSyncSnapshotSender *pSender); +void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader); void snapshotSenderStop(SSyncSnapshotSender *pSender); int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); -cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); -char *snapshotSender2Str(SSyncSnapshotSender *pSender); -char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); +cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender); +char * snapshotSender2Str(SSyncSnapshotSender *pSender); +char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); typedef struct SSyncSnapshotReceiver { bool start; int32_t ack; - void *pWriter; + void * pWriter; SyncTerm term; SyncTerm privateTerm; SSnapshot snapshot; @@ -83,8 +83,8 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateT bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); -char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); -char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); +char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); +char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg); int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg); diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 47fd23baae..7b342cdcff 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -173,21 +173,44 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries // get sender SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); ASSERT(pSender != NULL); - bool hasSnapshot = syncNodeHasSnapshot(ths); - SSnapshot snapshot; - ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); - // start sending snapshot first time - // start here, stop by receiver - if (hasSnapshot && nextIndex <= snapshot.lastApplyIndex + 1 && !snapshotSenderIsStart(pSender) && - pMsg->privateTerm < pSender->privateTerm) { - snapshotSenderStart(pSender); + SSnapshot snapshot; + void* pReader = NULL; + ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot, NULL, &pReader); + if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN && nextIndex <= snapshot.lastApplyIndex + 1 && + !snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) { + // has snapshot + ASSERT(pReader != NULL); + snapshotSenderStart(pSender, snapshot, pReader); char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start"); syncNodeEventLog(ths, eventLog); taosMemoryFree(eventLog); + + } else { + // no snapshot + if (pReader != NULL) { + ths->pFsm->FpSnapshotStopRead(ths->pFsm, pReader); + } } + /* + bool hasSnapshot = syncNodeHasSnapshot(ths); + SSnapshot snapshot; + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); + + // start sending snapshot first time + // start here, stop by receiver + if (hasSnapshot && nextIndex <= snapshot.lastApplyIndex + 1 && !snapshotSenderIsStart(pSender) && + pMsg->privateTerm < pSender->privateTerm) { + snapshotSenderStart(pSender); + + char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start"); + syncNodeEventLog(ths, eventLog); + taosMemoryFree(eventLog); + } + */ + SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1; // update nextIndex to sentryIndex @@ -207,12 +230,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries syncIndexMgrLog2("recv SyncAppendEntriesReply, after pNextIndex:", ths->pNextIndex); syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex); - if (gRaftDetailLog) { - SSnapshot snapshot; - ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); - sTrace("recv SyncAppendEntriesReply, after snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu", - snapshot.lastApplyIndex, snapshot.lastApplyTerm); - } return ret; } \ No newline at end of file diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index 02411cfdbb..2827fcc12f 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -78,7 +78,9 @@ SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaf return idx; } } - assert(0); + + syncNodeLog3("syncIndexMgrGetIndex", pSyncIndexMgr->pSyncNode); + ASSERT(0); } cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 0b427eab24..95d0df6bc4 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1282,6 +1282,9 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { // snapshot receivers cJSON* pReceivers = cJSON_CreateArray(); cJSON_AddItemToObject(pRoot, "receiver", snapshotReceiver2Json(pSyncNode->pNewNodeReceiver)); + + // changing + cJSON_AddNumberToObject(pRoot, "changing", pSyncNode->changing); } cJSON* pJson = cJSON_CreateObject(); @@ -1400,7 +1403,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde pSyncNode->pRaftCfg->isStandBy = 1; // set standby } - // persist last config index + // add last config index raftCfgAddConfigIndex(pSyncNode->pRaftCfg, lastConfigChangeIndex); if (IamInNew) { @@ -1827,7 +1830,11 @@ SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) { SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) { ASSERT(index >= SYNC_INDEX_BEGIN); SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode); - ASSERT(index <= syncStartIndex); + + if (index > syncStartIndex) { + syncNodeLog3("syncNodeGetPreIndex", pSyncNode); + ASSERT(0); + } SyncIndex preIndex = index - 1; return preIndex; @@ -1836,7 +1843,11 @@ SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) { SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { ASSERT(index >= SYNC_INDEX_BEGIN); SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode); - ASSERT(index <= syncStartIndex); + + if (index > syncStartIndex) { + syncNodeLog3("syncNodeGetPreTerm", pSyncNode); + ASSERT(0); + } if (index == SYNC_INDEX_BEGIN) { return 0; @@ -1929,6 +1940,12 @@ void syncNodeLog2(char* s, SSyncNode* pObj) { } } +void syncNodeLog3(char* s, SSyncNode* pObj) { + char* serialized = syncNode2Str(pObj); + sTraceLong("syncNodeLog3 | len:%lu | %s | %s", strlen(serialized), s, serialized); + taosMemoryFree(serialized); +} + // ------ local funciton --------- // enqueue message ---- static void syncNodeEqPingTimer(void* param, void* tmrId) { diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index 08c3e0126c..7d020f6892 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -53,7 +53,12 @@ int32_t raftCfgPersist(SRaftCfg *pRaftCfg) { char buf[CONFIG_FILE_LEN] = {0}; memset(buf, 0, sizeof(buf)); - ASSERT(strlen(s) + 1 <= CONFIG_FILE_LEN); + + if (strlen(s) + 1 > CONFIG_FILE_LEN) { + sError("too long config str:%s", s); + ASSERT(0); + } + snprintf(buf, sizeof(buf), "%s", s); int64_t ret = taosWriteFile(pRaftCfg->pFile, buf, sizeof(buf)); assert(ret == sizeof(buf)); @@ -96,14 +101,14 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) { char *syncCfg2Str(SSyncCfg *pSyncCfg) { cJSON *pJson = syncCfg2Json(pSyncCfg); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) { int32_t len = 512; - char * s = taosMemoryMalloc(len); + char *s = taosMemoryMalloc(len); memset(s, 0, len); snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex); @@ -196,7 +201,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { char *raftCfg2Str(SRaftCfg *pRaftCfg) { cJSON *pJson = raftCfg2Json(pRaftCfg); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -262,7 +267,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { (pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring); } - cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); + cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg)); ASSERT(code == 0); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index c694a0b715..68185131d1 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -67,7 +67,7 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; } // begin send snapshot (current term, seq begin) -void snapshotSenderStart(SSyncSnapshotSender *pSender) { +void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader) { ASSERT(!snapshotSenderIsStart(pSender)); pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; @@ -75,8 +75,18 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { // open snapshot reader ASSERT(pSender->pReader == NULL); - int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStartRead(pSender->pSyncNode->pFsm, &(pSender->pReader)); - ASSERT(ret == 0); + pSender->pReader = pReader; + pSender->snapshot = snapshot; + + /* + // open snapshot reader + ASSERT(pSender->pReader == NULL); + int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStartRead(pSender->pSyncNode->pFsm, &(pSender->pReader)); + ASSERT(ret == 0); + + // get current snapshot info + pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot)); + */ if (pSender->pCurrentBlock != NULL) { taosMemoryFree(pSender->pCurrentBlock); @@ -84,21 +94,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { pSender->blockLen = 0; - // get current snapshot info - pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot)); - - sTrace("snapshotSenderStart lastApplyIndex:%ld, lastApplyTerm:%lu, lastConfigIndex:%ld", - pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); - if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) { - /* - SSyncRaftEntry *pEntry = NULL; - int32_t code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore, - pSender->snapshot.lastConfigIndex, &pEntry); - ASSERT(code == 0); - ASSERT(pEntry != NULL); - */ - SSyncRaftEntry *pEntry = pSender->pSyncNode->pLogStore->getEntry(pSender->pSyncNode->pLogStore, pSender->snapshot.lastConfigIndex); ASSERT(pEntry != NULL); diff --git a/source/libs/sync/test/syncReconfigFinishTest.cpp b/source/libs/sync/test/syncReconfigFinishTest.cpp index 22e22bb562..2aac38bbd1 100644 --- a/source/libs/sync/test/syncReconfigFinishTest.cpp +++ b/source/libs/sync/test/syncReconfigFinishTest.cpp @@ -14,8 +14,8 @@ void logTest() { sFatal("--- sync log test: fatal"); } -SSyncCfg* createSyncOldCfg() { - SSyncCfg* pCfg = (SSyncCfg*)taosMemoryMalloc(sizeof(SSyncCfg)); +SSyncCfg *createSyncOldCfg() { + SSyncCfg *pCfg = (SSyncCfg *)taosMemoryMalloc(sizeof(SSyncCfg)); memset(pCfg, 0, sizeof(SSyncCfg)); pCfg->replicaNum = 3; @@ -28,8 +28,8 @@ SSyncCfg* createSyncOldCfg() { return pCfg; } -SSyncCfg* createSyncNewCfg() { - SSyncCfg* pCfg = (SSyncCfg*)taosMemoryMalloc(sizeof(SSyncCfg)); +SSyncCfg *createSyncNewCfg() { + SSyncCfg *pCfg = (SSyncCfg *)taosMemoryMalloc(sizeof(SSyncCfg)); memset(pCfg, 0, sizeof(SSyncCfg)); pCfg->replicaNum = 3; @@ -44,9 +44,9 @@ SSyncCfg* createSyncNewCfg() { SyncReconfigFinish *createMsg() { SyncReconfigFinish *pMsg = syncReconfigFinishBuild(1234); - - SSyncCfg* pOld = createSyncOldCfg(); - SSyncCfg* pNew = createSyncNewCfg(); + + SSyncCfg *pOld = createSyncOldCfg(); + SSyncCfg *pNew = createSyncNewCfg(); pMsg->oldCfg = *pOld; pMsg->newCfg = *pNew; @@ -60,18 +60,16 @@ SyncReconfigFinish *createMsg() { return pMsg; } - void test1() { SyncReconfigFinish *pMsg = createMsg(); syncReconfigFinishLog2((char *)"test1:", pMsg); syncReconfigFinishDestroy(pMsg); } - void test2() { SyncReconfigFinish *pMsg = createMsg(); - uint32_t len = pMsg->bytes; - char * serialized = (char *)taosMemoryMalloc(len); + uint32_t len = pMsg->bytes; + char * serialized = (char *)taosMemoryMalloc(len); syncReconfigFinishSerialize(pMsg, serialized, len); SyncReconfigFinish *pMsg2 = syncReconfigFinishBuild(1000); syncReconfigFinishDeserialize(serialized, len, pMsg2); @@ -84,8 +82,8 @@ void test2() { void test3() { SyncReconfigFinish *pMsg = createMsg(); - uint32_t len; - char * serialized = syncReconfigFinishSerialize2(pMsg, &len); + uint32_t len; + char * serialized = syncReconfigFinishSerialize2(pMsg, &len); SyncReconfigFinish *pMsg2 = syncReconfigFinishDeserialize2(serialized, len); syncReconfigFinishLog2((char *)"test3: SyncReconfigFinishSerialize2 -> syncReconfigFinishDeserialize2 ", pMsg2); @@ -96,7 +94,7 @@ void test3() { void test4() { SyncReconfigFinish *pMsg = createMsg(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncReconfigFinish2RpcMsg(pMsg, &rpcMsg); SyncReconfigFinish *pMsg2 = (SyncReconfigFinish *)taosMemoryMalloc(rpcMsg.contLen); syncReconfigFinishFromRpcMsg(&rpcMsg, pMsg2); @@ -109,7 +107,7 @@ void test4() { void test5() { SyncReconfigFinish *pMsg = createMsg(); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg; syncReconfigFinish2RpcMsg(pMsg, &rpcMsg); SyncReconfigFinish *pMsg2 = syncReconfigFinishFromRpcMsg2(&rpcMsg); syncReconfigFinishLog2((char *)"test5: syncReconfigFinish2RpcMsg -> syncReconfigFinishFromRpcMsg2 ", pMsg2); -- GitLab