diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 952066df461b22b36023da2011449f03b7f9f478..790cbf906da887c01bec011a875a67db8f359b7b 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -34,6 +34,7 @@ extern bool gRaftDetailLog; #define SYNC_MAX_PROGRESS_WAIT_MS 4000 #define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20) #define SYNC_MAX_RECV_TIME_RANGE_MS 1000 +#define SYNC_ADD_QUORUM_COUNT 3 #define SYNC_MAX_BATCH_SIZE 1 #define SYNC_INDEX_BEGIN 0 diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index de43c816546b76a72c107668519d4c58866d5f38..0afc373f2d0d90333002245e43817d12ef02df8c 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -237,7 +237,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode); bool syncNodeHasSnapshot(SSyncNode* pSyncNode); void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode); -SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode); +SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode); SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode); int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 3829ea07309510c2147df7048ab5106d4af0c916..1e68fe346c71d65e0594643da5b94e2dd1ab204d 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -146,23 +146,47 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) { int64_t timeNow = taosGetTimestampMs(); for (int i = 0; i < pSyncNode->peersNum; ++i) { - int64_t peerStartTime = syncIndexMgrGetStartTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]); - int64_t peerRecvTime = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]); + int64_t peerStartTime = syncIndexMgrGetStartTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]); + int64_t peerRecvTime = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]); + SyncIndex peerMatchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId)[i]); - int64_t recvTimeDiff = syncNodeAbs64(peerRecvTime, timeNow); - int64_t startTimeDiff = syncNodeAbs64(peerStartTime, pSyncNode->startTime); + int64_t recvTimeDiff = TABS(peerRecvTime - timeNow); + int64_t startTimeDiff = TABS(peerStartTime - pSyncNode->startTime); + int64_t logDiff = TABS(peerMatchIndex - syncNodeGetLastIndex(pSyncNode)); + + /* + int64_t recvTimeDiff = syncNodeAbs64(peerRecvTime, timeNow); + int64_t startTimeDiff = syncNodeAbs64(peerStartTime, pSyncNode->startTime); + int64_t logDiff = syncNodeAbs64(peerMatchIndex, syncNodeGetLastIndex(pSyncNode)); + */ int32_t addQuorum = 0; if (recvTimeDiff < SYNC_MAX_RECV_TIME_RANGE_MS) { - addQuorum = 1; + if (startTimeDiff < SYNC_MAX_START_TIME_RANGE_MS) { + addQuorum = 1; + } else { + if (logDiff < SYNC_ADD_QUORUM_COUNT) { + addQuorum = 1; + } else { + addQuorum = 0; + } + } } else { addQuorum = 0; } - if (startTimeDiff > SYNC_MAX_START_TIME_RANGE_MS) { - addQuorum = 0; - } + /* + if (recvTimeDiff < SYNC_MAX_RECV_TIME_RANGE_MS) { + addQuorum = 1; + } else { + addQuorum = 0; + } + + if (startTimeDiff > SYNC_MAX_START_TIME_RANGE_MS) { + addQuorum = 0; + } + */ quorum += addQuorum; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index a00b59d29218fc95f9c71461041c30a6e59ac020..3fe600ecbbe1da1d452677475e4e11c5ea3232dd 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2284,7 +2284,7 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) { // return max(logLastIndex, snapshotLastIndex) // if no snapshot and log, return -1 -SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) { +SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) { SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1}; if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); @@ -2773,11 +2773,27 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p return 0; } - if (ths->vgId > 1) { - syncNodeEventLog(ths, "I am vnode, can not do leader transfer"); + if (pEntry->term < ths->pRaftStore->currentTerm) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "little term:%lu, can not do leader transfer", pEntry->term); + syncNodeEventLog(ths, logBuf); + return 0; + } + + if (pEntry->index < syncNodeGetLastIndex(ths)) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "little index:%ld, can not do leader transfer", pEntry->index); + syncNodeEventLog(ths, logBuf); return 0; } + /* + if (ths->vgId > 1) { + syncNodeEventLog(ths, "I am vnode, can not do leader transfer"); + return 0; + } + */ + do { char logBuf[128]; snprintf(logBuf, sizeof(logBuf), "do leader transfer, index:%ld", pEntry->index);