提交 8a32c0c1 编写于 作者: M Minghao Li

refactor(sync): adjust strategy for dynamic quorum

上级 73c3a1eb
......@@ -34,6 +34,7 @@ extern bool gRaftDetailLog;
#define SYNC_MAX_PROGRESS_WAIT_MS 4000
#define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20)
#define SYNC_MAX_RECV_TIME_RANGE_MS 1000
#define SYNC_ADD_QUORUM_COUNT 3
#define SYNC_MAX_BATCH_SIZE 1
#define SYNC_INDEX_BEGIN 0
......
......@@ -237,7 +237,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode);
bool syncNodeHasSnapshot(SSyncNode* pSyncNode);
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode);
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode);
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode);
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode);
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm);
......
......@@ -146,23 +146,47 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) {
int64_t timeNow = taosGetTimestampMs();
for (int i = 0; i < pSyncNode->peersNum; ++i) {
int64_t peerStartTime = syncIndexMgrGetStartTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]);
int64_t peerRecvTime = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]);
int64_t peerStartTime = syncIndexMgrGetStartTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]);
int64_t peerRecvTime = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]);
SyncIndex peerMatchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId)[i]);
int64_t recvTimeDiff = syncNodeAbs64(peerRecvTime, timeNow);
int64_t startTimeDiff = syncNodeAbs64(peerStartTime, pSyncNode->startTime);
int64_t recvTimeDiff = TABS(peerRecvTime - timeNow);
int64_t startTimeDiff = TABS(peerStartTime - pSyncNode->startTime);
int64_t logDiff = TABS(peerMatchIndex - syncNodeGetLastIndex(pSyncNode));
/*
int64_t recvTimeDiff = syncNodeAbs64(peerRecvTime, timeNow);
int64_t startTimeDiff = syncNodeAbs64(peerStartTime, pSyncNode->startTime);
int64_t logDiff = syncNodeAbs64(peerMatchIndex, syncNodeGetLastIndex(pSyncNode));
*/
int32_t addQuorum = 0;
if (recvTimeDiff < SYNC_MAX_RECV_TIME_RANGE_MS) {
addQuorum = 1;
if (startTimeDiff < SYNC_MAX_START_TIME_RANGE_MS) {
addQuorum = 1;
} else {
if (logDiff < SYNC_ADD_QUORUM_COUNT) {
addQuorum = 1;
} else {
addQuorum = 0;
}
}
} else {
addQuorum = 0;
}
if (startTimeDiff > SYNC_MAX_START_TIME_RANGE_MS) {
addQuorum = 0;
}
/*
if (recvTimeDiff < SYNC_MAX_RECV_TIME_RANGE_MS) {
addQuorum = 1;
} else {
addQuorum = 0;
}
if (startTimeDiff > SYNC_MAX_START_TIME_RANGE_MS) {
addQuorum = 0;
}
*/
quorum += addQuorum;
}
......
......@@ -2284,7 +2284,7 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
// return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) {
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
......@@ -2773,11 +2773,27 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
return 0;
}
if (ths->vgId > 1) {
syncNodeEventLog(ths, "I am vnode, can not do leader transfer");
if (pEntry->term < ths->pRaftStore->currentTerm) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "little term:%lu, can not do leader transfer", pEntry->term);
syncNodeEventLog(ths, logBuf);
return 0;
}
if (pEntry->index < syncNodeGetLastIndex(ths)) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "little index:%ld, can not do leader transfer", pEntry->index);
syncNodeEventLog(ths, logBuf);
return 0;
}
/*
if (ths->vgId > 1) {
syncNodeEventLog(ths, "I am vnode, can not do leader transfer");
return 0;
}
*/
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "do leader transfer, index:%ld", pEntry->index);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册