diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 7cd2ebdede53b6a62f273c75fd7507c86e7a1567..952066df461b22b36023da2011449f03b7f9f478 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -26,12 +26,14 @@ extern "C" { extern bool gRaftDetailLog; -#define SYNC_RESP_TTL_MS 10000000 -#define SYNC_SPEED_UP_HB_TIMER 400 -#define SYNC_SPEED_UP_AFTER_MS (1000 * 20) -#define SYNC_SLOW_DOWN_RANGE 100 -#define SYNC_MAX_READ_RANGE 10 -#define SYNC_MAX_PROGRESS_WAIT_MS 4000 +#define SYNC_RESP_TTL_MS 10000000 +#define SYNC_SPEED_UP_HB_TIMER 400 +#define SYNC_SPEED_UP_AFTER_MS (1000 * 20) +#define SYNC_SLOW_DOWN_RANGE 100 +#define SYNC_MAX_READ_RANGE 2 +#define SYNC_MAX_PROGRESS_WAIT_MS 4000 +#define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20) +#define SYNC_MAX_RECV_TIME_RANGE_MS 1000 #define SYNC_MAX_BATCH_SIZE 1 #define SYNC_INDEX_BEGIN 0 diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index cd2c2d4a4f8b708246786fa1ea0cf030d5799593..6c95c3c6d72929045bd780056811c1938864717b 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -423,6 +423,7 @@ typedef struct SyncAppendEntriesReply { SyncTerm privateTerm; bool success; SyncIndex matchIndex; + int64_t startTime; } SyncAppendEntriesReply; SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId); diff --git a/source/libs/sync/inc/syncIndexMgr.h b/source/libs/sync/inc/syncIndexMgr.h index 1f60a9d57e75c86bce5cd57d5a04540681c9df3c..fb85b89419a8b09a4e60d174d51791acf5a5b6ae 100644 --- a/source/libs/sync/inc/syncIndexMgr.h +++ b/source/libs/sync/inc/syncIndexMgr.h @@ -29,8 +29,12 @@ extern "C" { // SIndexMgr ----------------------------- typedef struct SSyncIndexMgr { SRaftId (*replicas)[TSDB_MAX_REPLICA]; - SyncIndex index[TSDB_MAX_REPLICA]; - SyncTerm privateTerm[TSDB_MAX_REPLICA]; // for advanced function + SyncIndex index[TSDB_MAX_REPLICA]; + SyncTerm privateTerm[TSDB_MAX_REPLICA]; // for advanced function + + int64_t startTimeArr[TSDB_MAX_REPLICA]; + int64_t recvTimeArr[TSDB_MAX_REPLICA]; + int32_t replicaNum; SSyncNode *pSyncNode; } SSyncIndexMgr; @@ -41,8 +45,13 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index); SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); -cJSON * syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr); -char * syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr); +cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr); +char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr); + +void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime); +int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); +void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime); +int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); // void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term); // SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 3e247e5d795f99a11013fa045f1a0585cc6dc168..de43c816546b76a72c107668519d4c58866d5f38 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -269,6 +269,8 @@ int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode); int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader); int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry); +int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode); + // trace log void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 4f93d8197dc801ae86619858b15d9055059565eb..e000ba8bf869454a0276411d7312190be98348b9 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -148,6 +148,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { pReply->term = ths->pRaftStore->currentTerm; pReply->success = false; pReply->matchIndex = SYNC_INDEX_INVALID; + pReply->startTime = ths->startTime; // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); @@ -290,6 +291,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { pReply->matchIndex = pMsg->prevLogIndex; } + pReply->startTime = ths->startTime; + // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); @@ -603,6 +606,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->success = true; pReply->matchIndex = matchIndex; + pReply->startTime = ths->startTime; // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); @@ -651,6 +655,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->success = false; pReply->matchIndex = ths->commitIndex; + pReply->startTime = ths->startTime; // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); @@ -729,6 +734,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->success = true; pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + pMsg->dataCount : pMsg->prevLogIndex; + pReply->startTime = ths->startTime; // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); @@ -874,6 +880,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->success = true; pReply->matchIndex = matchIndex; + pReply->startTime = ths->startTime; // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); @@ -919,6 +926,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->success = false; pReply->matchIndex = SYNC_INDEX_INVALID; + pReply->startTime = ths->startTime; // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); @@ -984,6 +992,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->success = true; pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + 1 : pMsg->prevLogIndex; + pReply->startTime = ths->startTime; // msg event log syncLogSendAppendEntriesReply(ths, pReply, ""); diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 4928c54bd72acc079f2d4e4c42aac49d1ce33e82..9253ed0129751c2e0d0538602dc124b38a72447d 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -64,6 +64,10 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p ASSERT(pMsg->term == ths->pRaftStore->currentTerm); + // update time + syncIndexMgrSetStartTime(ths->pNextIndex, &(pMsg->srcId), pMsg->startTime); + syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), taosGetTimestampMs()); + SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); @@ -170,6 +174,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie ASSERT(pMsg->term == ths->pRaftStore->currentTerm); + // update time + syncIndexMgrSetStartTime(ths->pNextIndex, &(pMsg->srcId), pMsg->startTime); + syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), taosGetTimestampMs()); + SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); @@ -330,6 +338,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ASSERT(pMsg->term == ths->pRaftStore->currentTerm); + // update time + syncIndexMgrSetStartTime(ths->pNextIndex, &(pMsg->srcId), pMsg->startTime); + syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), taosGetTimestampMs()); + SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 3a94ed9713ba2b12e2ce766b3dfd9e615b309d9f..3829ea07309510c2147df7048ab5106d4af0c916 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -133,6 +133,63 @@ bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index) { return false; } +static inline int64_t syncNodeAbs64(int64_t a, int64_t b) { + ASSERT(a >= 0); + ASSERT(b >= 0); + + int64_t c = a > b ? a - b : b - a; + return c; +} + +int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) { + int32_t quorum = 1; // self + + int64_t timeNow = taosGetTimestampMs(); + for (int i = 0; i < pSyncNode->peersNum; ++i) { + int64_t peerStartTime = syncIndexMgrGetStartTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]); + int64_t peerRecvTime = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]); + + int64_t recvTimeDiff = syncNodeAbs64(peerRecvTime, timeNow); + int64_t startTimeDiff = syncNodeAbs64(peerStartTime, pSyncNode->startTime); + + int32_t addQuorum = 0; + + if (recvTimeDiff < SYNC_MAX_RECV_TIME_RANGE_MS) { + addQuorum = 1; + } else { + addQuorum = 0; + } + + if (startTimeDiff > SYNC_MAX_START_TIME_RANGE_MS) { + addQuorum = 0; + } + + quorum += addQuorum; + } + + ASSERT(quorum <= pSyncNode->replicaNum); + + if (quorum < pSyncNode->quorum) { + quorum = pSyncNode->quorum; + } + + return quorum; +} + +bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { + int agreeCount = 0; + for (int i = 0; i < pSyncNode->replicaNum; ++i) { + if (syncAgreeIndex(pSyncNode, &(pSyncNode->replicasId[i]), index)) { + ++agreeCount; + } + if (agreeCount >= syncNodeDynamicQuorum(pSyncNode)) { + return true; + } + } + return false; +} + +/* bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { int agreeCount = 0; for (int i = 0; i < pSyncNode->replicaNum; ++i) { @@ -145,3 +202,4 @@ bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { } return false; } +*/ diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index 8c820fcd9cabf951bd92a0af7fcc4940faa20ccc..07c4fa8429dc539609d3ae788caab3352b0a3e60 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -47,6 +47,13 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr) { void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr) { memset(pSyncIndexMgr->index, 0, sizeof(pSyncIndexMgr->index)); memset(pSyncIndexMgr->privateTerm, 0, sizeof(pSyncIndexMgr->privateTerm)); + + // int64_t timeNow = taosGetMonotonicMs(); + for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { + pSyncIndexMgr->startTimeArr[i] = 0; + pSyncIndexMgr->recvTimeArr[i] = 0; + } + /* for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { pSyncIndexMgr->index[i] = 0; @@ -68,7 +75,8 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, char host[128]; uint16_t port; syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port); - sError("vgId:%d, index mgr set for %s:%d, index:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port, index); + sError("vgId:%d, index mgr set for %s:%d, index:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port, + index); } SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) { @@ -125,11 +133,65 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) { char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) { cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } +void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime) { + for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { + if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) { + (pSyncIndexMgr->startTimeArr)[i] = startTime; + return; + } + } + + // maybe config change + // ASSERT(0); + char host[128]; + uint16_t port; + syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port); + sError("vgId:%d, index mgr set for %s:%d, start-time:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port, + startTime); +} + +int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) { + for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { + if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) { + int64_t startTime = (pSyncIndexMgr->startTimeArr)[i]; + return startTime; + } + } + ASSERT(0); +} + +void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime) { + for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { + if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) { + (pSyncIndexMgr->recvTimeArr)[i] = recvTime; + return; + } + } + + // maybe config change + // ASSERT(0); + char host[128]; + uint16_t port; + syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port); + sError("vgId:%d, index mgr set for %s:%d, recv-time:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port, + recvTime); +} + +int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) { + for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { + if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) { + int64_t recvTime = (pSyncIndexMgr->recvTimeArr)[i]; + return recvTime; + } + } + ASSERT(0); +} + // for debug ------------------- void syncIndexMgrPrint(SSyncIndexMgr *pObj) { char *serialized = syncIndexMgr2Str(pObj); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 1991560d421461621cfdaa36c297addea2072c4f..a00b59d29218fc95f9c71461041c30a6e59ac020 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1682,13 +1682,13 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { ", sby:%d, " "stgy:%d, bch:%d, " "r-num:%d, " - "lcfg:%" PRId64 ", chging:%d, rsto:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s", + "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, - pSyncNode->restoreFinish, pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, - printStr); + pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode), pSyncNode->electTimerLogicClockUser, + pSyncNode->heartbeatTimerLogicClockUser, printStr); } else { snprintf(logBuf, sizeof(logBuf), "%s", str); } @@ -1706,12 +1706,13 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { ", sby:%d, " "stgy:%d, bch:%d, " "r-num:%d, " - "lcfg:%" PRId64 ", chging:%d, rsto:%d, %s", + "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, - pSyncNode->restoreFinish, printStr); + pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode), pSyncNode->electTimerLogicClockUser, + pSyncNode->heartbeatTimerLogicClockUser, printStr); } else { snprintf(s, len, "%s", str); } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 13adaf055c1c320f56337ea90537e592fafcbe89..b42aba560fa1c26ef9426b55729c1d39cafa8a24 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -1947,6 +1947,8 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) { cJSON_AddNumberToObject(pRoot, "success", pMsg->success); snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->matchIndex); cJSON_AddStringToObject(pRoot, "matchIndex", u64buf); + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->startTime); + cJSON_AddStringToObject(pRoot, "startTime", u64buf); } cJSON* pJson = cJSON_CreateObject(); diff --git a/source/libs/sync/test/syncAppendEntriesReplyTest.cpp b/source/libs/sync/test/syncAppendEntriesReplyTest.cpp index d41e99a3cd2e7d866ffd880a2664ad9df2d21ef6..72d3fd5ef34d83d9a2db7c674abd58a989bfed3e 100644 --- a/source/libs/sync/test/syncAppendEntriesReplyTest.cpp +++ b/source/libs/sync/test/syncAppendEntriesReplyTest.cpp @@ -24,6 +24,7 @@ SyncAppendEntriesReply *createMsg() { pMsg->matchIndex = 77; pMsg->term = 33; pMsg->privateTerm = 44; + pMsg->startTime = taosGetTimestampMs(); return pMsg; } @@ -89,6 +90,8 @@ void test5() { } int main() { + gRaftDetailLog = true; + tsAsyncLog = 0; sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; logTest();