From 2ca5bdc708ccbeb497912b01dd4b05b09f69d4dc Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 17 Aug 2022 15:39:38 +0800 Subject: [PATCH] refactor(sync): add syncNodeDynamicQuorum --- include/libs/sync/sync.h | 14 ++-- include/libs/sync/syncTools.h | 1 + source/libs/sync/inc/syncIndexMgr.h | 17 +++-- source/libs/sync/inc/syncInt.h | 2 + source/libs/sync/src/syncAppendEntries.c | 9 +++ source/libs/sync/src/syncAppendEntriesReply.c | 12 ++++ source/libs/sync/src/syncCommit.c | 58 ++++++++++++++++ source/libs/sync/src/syncIndexMgr.c | 66 ++++++++++++++++++- source/libs/sync/src/syncMain.c | 11 ++-- source/libs/sync/src/syncMessage.c | 2 + .../sync/test/syncAppendEntriesReplyTest.cpp | 3 + 11 files changed, 178 insertions(+), 17 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 7cd2ebdede..952066df46 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -26,12 +26,14 @@ extern "C" { extern bool gRaftDetailLog; -#define SYNC_RESP_TTL_MS 10000000 -#define SYNC_SPEED_UP_HB_TIMER 400 -#define SYNC_SPEED_UP_AFTER_MS (1000 * 20) -#define SYNC_SLOW_DOWN_RANGE 100 -#define SYNC_MAX_READ_RANGE 10 -#define SYNC_MAX_PROGRESS_WAIT_MS 4000 +#define SYNC_RESP_TTL_MS 10000000 +#define SYNC_SPEED_UP_HB_TIMER 400 +#define SYNC_SPEED_UP_AFTER_MS (1000 * 20) +#define SYNC_SLOW_DOWN_RANGE 100 +#define SYNC_MAX_READ_RANGE 2 +#define SYNC_MAX_PROGRESS_WAIT_MS 4000 +#define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20) +#define SYNC_MAX_RECV_TIME_RANGE_MS 1000 #define SYNC_MAX_BATCH_SIZE 1 #define SYNC_INDEX_BEGIN 0 diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index cd2c2d4a4f..6c95c3c6d7 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -423,6 +423,7 @@ typedef struct SyncAppendEntriesReply { SyncTerm privateTerm; bool success; SyncIndex matchIndex; + int64_t startTime; } SyncAppendEntriesReply; SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId); diff --git 
a/source/libs/sync/inc/syncIndexMgr.h b/source/libs/sync/inc/syncIndexMgr.h index 1f60a9d57e..fb85b89419 100644 --- a/source/libs/sync/inc/syncIndexMgr.h +++ b/source/libs/sync/inc/syncIndexMgr.h @@ -29,8 +29,12 @@ extern "C" { // SIndexMgr ----------------------------- typedef struct SSyncIndexMgr { SRaftId (*replicas)[TSDB_MAX_REPLICA]; - SyncIndex index[TSDB_MAX_REPLICA]; - SyncTerm privateTerm[TSDB_MAX_REPLICA]; // for advanced function + SyncIndex index[TSDB_MAX_REPLICA]; + SyncTerm privateTerm[TSDB_MAX_REPLICA]; // for advanced function + + int64_t startTimeArr[TSDB_MAX_REPLICA]; + int64_t recvTimeArr[TSDB_MAX_REPLICA]; + int32_t replicaNum; SSyncNode *pSyncNode; } SSyncIndexMgr; @@ -41,8 +45,13 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index); SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); -cJSON * syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr); -char * syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr); +cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr); +char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr); + +void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime); +int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); +void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime); +int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); // void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term); // SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 3e247e5d79..de43c81654 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ 
-269,6 +269,8 @@ int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode); int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader); int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry); +int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode); + // trace log void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 4f93d8197d..e000ba8bf8 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -148,6 +148,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { pReply->term = ths->pRaftStore->currentTerm; pReply->success = false; pReply->matchIndex = SYNC_INDEX_INVALID; + pReply->startTime = ths->startTime; // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); @@ -290,6 +291,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { pReply->matchIndex = pMsg->prevLogIndex; } + pReply->startTime = ths->startTime; + // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); @@ -603,6 +606,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->success = true; pReply->matchIndex = matchIndex; + pReply->startTime = ths->startTime; // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); @@ -651,6 +655,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->success = false; pReply->matchIndex = ths->commitIndex; + pReply->startTime = ths->startTime; // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); @@ -729,6 +734,7 @@ int32_t 
syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->success = true; pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + pMsg->dataCount : pMsg->prevLogIndex; + pReply->startTime = ths->startTime; // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); @@ -874,6 +880,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->success = true; pReply->matchIndex = matchIndex; + pReply->startTime = ths->startTime; // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); @@ -919,6 +926,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->success = false; pReply->matchIndex = SYNC_INDEX_INVALID; + pReply->startTime = ths->startTime; // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); @@ -984,6 +992,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->success = true; pReply->matchIndex = hasAppendEntries ? 
pMsg->prevLogIndex + 1 : pMsg->prevLogIndex; + pReply->startTime = ths->startTime; // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 4928c54bd7..9253ed0129 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -64,6 +64,10 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p ASSERT(pMsg->term == ths->pRaftStore->currentTerm); + // update time + syncIndexMgrSetStartTime(ths->pNextIndex, &(pMsg->srcId), pMsg->startTime); + syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), taosGetTimestampMs()); + SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); @@ -170,6 +174,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie ASSERT(pMsg->term == ths->pRaftStore->currentTerm); + // update time + syncIndexMgrSetStartTime(ths->pNextIndex, &(pMsg->srcId), pMsg->startTime); + syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), taosGetTimestampMs()); + SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); @@ -330,6 +338,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ASSERT(pMsg->term == ths->pRaftStore->currentTerm); + // update time + syncIndexMgrSetStartTime(ths->pNextIndex, &(pMsg->srcId), pMsg->startTime); + syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), taosGetTimestampMs()); + SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 
3a94ed9713..3829ea0730 100644
--- a/source/libs/sync/src/syncCommit.c
+++ b/source/libs/sync/src/syncCommit.c
@@ -133,6 +133,63 @@ bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index) {
   return false;
 }
 
+// Absolute difference between two non-negative millisecond timestamps.
+static inline int64_t syncNodeAbs64(int64_t a, int64_t b) {
+  ASSERT(a >= 0);
+  ASSERT(b >= 0);
+
+  return a > b ? a - b : b - a;
+}
+
+// Quorum to use right now: this node plus every peer that currently looks alive.
+// A peer counts when its latest recorded reply time is less than SYNC_MAX_RECV_TIME_RANGE_MS
+// away from now and its reported start time is at most SYNC_MAX_START_TIME_RANGE_MS away from
+// this node's start time. The result is never lower than the configured pSyncNode->quorum.
+// NOTE(review): the timestamps look like wall-clock values, so clock skew/jumps matter -- confirm.
+int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) {
+  int32_t quorum = 1;  // self
+
+  int64_t timeNow = taosGetTimestampMs();
+  for (int i = 0; i < pSyncNode->peersNum; ++i) {
+    int64_t peerStartTime = syncIndexMgrGetStartTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]);
+    int64_t peerRecvTime = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]);
+
+    int64_t recvTimeDiff = syncNodeAbs64(peerRecvTime, timeNow);
+    int64_t startTimeDiff = syncNodeAbs64(peerStartTime, pSyncNode->startTime);
+
+    // count the peer only when both the reply-time and the start-time checks pass
+    if (recvTimeDiff < SYNC_MAX_RECV_TIME_RANGE_MS && startTimeDiff <= SYNC_MAX_START_TIME_RANGE_MS) {
+      ++quorum;
+    }
+  }
+
+  ASSERT(quorum <= pSyncNode->replicaNum);
+
+  if (quorum < pSyncNode->quorum) {
+    quorum = pSyncNode->quorum;
+  }
+
+  return quorum;
+}
+
+// True when at least syncNodeDynamicQuorum() replicas agree on index.
+bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
+  // evaluate the quorum once: one clock read, one peer scan, one stable threshold for the loop
+  const int32_t quorum = syncNodeDynamicQuorum(pSyncNode);
+
+  int agreeCount = 0;
+  for (int i = 0; i < pSyncNode->replicaNum; ++i) {
+    if (syncAgreeIndex(pSyncNode, &(pSyncNode->replicasId[i]), index)) {
+      ++agreeCount;
+    }
+    if (agreeCount >= quorum) {
+      return true;
+    }
+  }
+  return false;
+}
+
+/*
 bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
   int agreeCount = 0;
   for (int i = 0; i < pSyncNode->replicaNum; ++i) {
@@ -145,3 +202,4 @@ bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
   }
   return false;
 }
+*/
diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c
index 8c820fcd9c..07c4fa8429 100644
--- a/source/libs/sync/src/syncIndexMgr.c
+++ b/source/libs/sync/src/syncIndexMgr.c
@@ -47,6 +47,13 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr) {
 void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr) {
   memset(pSyncIndexMgr->index, 0, sizeof(pSyncIndexMgr->index));
   memset(pSyncIndexMgr->privateTerm, 0, sizeof(pSyncIndexMgr->privateTerm));
+
+  // int64_t timeNow = taosGetMonotonicMs();
+  for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
+    pSyncIndexMgr->startTimeArr[i] = 0;
+    pSyncIndexMgr->recvTimeArr[i] = 0;
+  }
+
   /*
   for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
     pSyncIndexMgr->index[i] = 0;
@@ -68,7 +75,8 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId,
   char host[128];
   uint16_t port;
   syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
-  sError("vgId:%d, index mgr set for %s:%d, index:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port, index);
+  sError("vgId:%d, index mgr set for %s:%d, index:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port,
+         index);
 }
 
 SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
@@ -125,11 +133,65 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
 
 char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
   cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr);
-  char * serialized = cJSON_Print(pJson);
+  char *serialized = cJSON_Print(pJson);
   cJSON_Delete(pJson);
   return serialized;
 }
 
+// Store startTime for replica pRaftId. An unknown id (maybe config change) is only logged, not asserted.
+void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime) {
+  for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
+    if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) {
+      (pSyncIndexMgr->startTimeArr)[i] = startTime;
+      return;
+    }
+  }
+
+  char host[128];
+  uint16_t port;
+  syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
+  sError("vgId:%d, index mgr set for %s:%d, start-time:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port,
+         startTime);
+}
+
+// Start time last stored for replica pRaftId; the id is expected to be a known replica.
+int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
+  for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
+    if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) {
+      return (pSyncIndexMgr->startTimeArr)[i];
+    }
+  }
+  ASSERT(0);
+  return 0;  // defined result if ASSERT does not abort; 0 is the value syncIndexMgrClear() resets to
+}
+
+// Store recvTime for replica pRaftId. An unknown id (maybe config change) is only logged, not asserted.
+void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime) {
+  for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
+    if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) {
+      (pSyncIndexMgr->recvTimeArr)[i] = recvTime;
+      return;
+    }
+  }
+
+  char host[128];
+  uint16_t port;
+  syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
+  sError("vgId:%d, index mgr set for %s:%d, recv-time:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port,
+         recvTime);
+}
+
+// Receive time last stored for replica pRaftId; the id is expected to be a known replica.
+int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
+  for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
+    if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) {
+      return (pSyncIndexMgr->recvTimeArr)[i];
+    }
+  }
+  ASSERT(0);
+  return 0;  // defined result if ASSERT does not abort; 0 is the value syncIndexMgrClear() resets to
+}
+
 // for debug -------------------
 void syncIndexMgrPrint(SSyncIndexMgr *pObj) {
   char *serialized = syncIndexMgr2Str(pObj);
diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c
index 1991560d42..a00b59d292 100644
--- a/source/libs/sync/src/syncMain.c
+++ b/source/libs/sync/src/syncMain.c
@@ -1682,13 +1682,13 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { ", sby:%d, " "stgy:%d, bch:%d, " "r-num:%d, " - "lcfg:%" PRId64 ", chging:%d, rsto:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s", + "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex,
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, - pSyncNode->restoreFinish, pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, - printStr); + pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode), pSyncNode->electTimerLogicClockUser, + pSyncNode->heartbeatTimerLogicClockUser, printStr); } else { snprintf(logBuf, sizeof(logBuf), "%s", str); } @@ -1706,12 +1706,13 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { ", sby:%d, " "stgy:%d, bch:%d, " "r-num:%d, " - "lcfg:%" PRId64 ", chging:%d, rsto:%d, %s", + "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, - pSyncNode->restoreFinish, printStr); + pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode), pSyncNode->electTimerLogicClockUser, + pSyncNode->heartbeatTimerLogicClockUser, printStr); } else { snprintf(s, len, "%s", str); } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 13adaf055c..b42aba560f 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -1947,6 +1947,8 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) { cJSON_AddNumberToObject(pRoot, "success", pMsg->success); snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->matchIndex); cJSON_AddStringToObject(pRoot, "matchIndex", u64buf); + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->startTime); + 
cJSON_AddStringToObject(pRoot, "startTime", u64buf); } cJSON* pJson = cJSON_CreateObject(); diff --git a/source/libs/sync/test/syncAppendEntriesReplyTest.cpp b/source/libs/sync/test/syncAppendEntriesReplyTest.cpp index d41e99a3cd..72d3fd5ef3 100644 --- a/source/libs/sync/test/syncAppendEntriesReplyTest.cpp +++ b/source/libs/sync/test/syncAppendEntriesReplyTest.cpp @@ -24,6 +24,7 @@ SyncAppendEntriesReply *createMsg() { pMsg->matchIndex = 77; pMsg->term = 33; pMsg->privateTerm = 44; + pMsg->startTime = taosGetTimestampMs(); return pMsg; } @@ -89,6 +90,8 @@ void test5() { } int main() { + gRaftDetailLog = true; + tsAsyncLog = 0; sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; logTest(); -- GitLab