未验证 提交 0383ff3d 编写于 作者: H Hui Li 提交者: GitHub

Merge pull request #16206 from taosdata/feature/3.0_mhli

refactor(sync): adjust strategy for dynamic quorum
......@@ -34,6 +34,7 @@ extern bool gRaftDetailLog;
#define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20)
......@@ -237,7 +237,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode);
bool syncNodeHasSnapshot(SSyncNode* pSyncNode);
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode);
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode);
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode);
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode);
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm);
......@@ -146,23 +146,47 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) {
int64_t timeNow = taosGetTimestampMs();
for (int i = 0; i < pSyncNode->peersNum; ++i) {
int64_t peerStartTime = syncIndexMgrGetStartTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]);
int64_t peerRecvTime = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]);
int64_t peerStartTime = syncIndexMgrGetStartTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]);
int64_t peerRecvTime = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]);
SyncIndex peerMatchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId)[i]);
int64_t recvTimeDiff = syncNodeAbs64(peerRecvTime, timeNow);
int64_t startTimeDiff = syncNodeAbs64(peerStartTime, pSyncNode->startTime);
int64_t recvTimeDiff = TABS(peerRecvTime - timeNow);
int64_t startTimeDiff = TABS(peerStartTime - pSyncNode->startTime);
int64_t logDiff = TABS(peerMatchIndex - syncNodeGetLastIndex(pSyncNode));
int64_t recvTimeDiff = syncNodeAbs64(peerRecvTime, timeNow);
int64_t startTimeDiff = syncNodeAbs64(peerStartTime, pSyncNode->startTime);
int64_t logDiff = syncNodeAbs64(peerMatchIndex, syncNodeGetLastIndex(pSyncNode));
int32_t addQuorum = 0;
if (recvTimeDiff < SYNC_MAX_RECV_TIME_RANGE_MS) {
addQuorum = 1;
if (startTimeDiff < SYNC_MAX_START_TIME_RANGE_MS) {
addQuorum = 1;
} else {
if (logDiff < SYNC_ADD_QUORUM_COUNT) {
addQuorum = 1;
} else {
addQuorum = 0;
} else {
addQuorum = 0;
if (startTimeDiff > SYNC_MAX_START_TIME_RANGE_MS) {
addQuorum = 0;
if (recvTimeDiff < SYNC_MAX_RECV_TIME_RANGE_MS) {
addQuorum = 1;
} else {
addQuorum = 0;
if (startTimeDiff > SYNC_MAX_START_TIME_RANGE_MS) {
addQuorum = 0;
quorum += addQuorum;
......@@ -2284,7 +2284,7 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) {
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
......@@ -2773,11 +2773,27 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
return 0;
if (ths->vgId > 1) {
syncNodeEventLog(ths, "I am vnode, can not do leader transfer");
if (pEntry->term < ths->pRaftStore->currentTerm) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "little term:%lu, can not do leader transfer", pEntry->term);
syncNodeEventLog(ths, logBuf);
return 0;
if (pEntry->index < syncNodeGetLastIndex(ths)) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "little index:%ld, can not do leader transfer", pEntry->index);
syncNodeEventLog(ths, logBuf);
return 0;
if (ths->vgId > 1) {
syncNodeEventLog(ths, "I am vnode, can not do leader transfer");
return 0;
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "do leader transfer, index:%ld", pEntry->index);
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册