diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index f3c771518a658e80289cb118d2063e1f5e072987..233d68c4cb06352a2fdd98716a122705b1d1c5f8 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -35,8 +35,10 @@ extern bool gRaftDetailLog; #define SYNC_MAX_PROGRESS_WAIT_MS 4000 #define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20) #define SYNC_MAX_RECV_TIME_RANGE_MS 1200 +#define SYNC_DEL_WAL_MS (1000 * 60) #define SYNC_ADD_QUORUM_COUNT 3 -#define SYNC_MNODE_MAX_LOG_NUM 10000 +#define SYNC_MNODE_LOG_RETENTION 10000 +#define SYNC_VNODE_LOG_RETENTION 500 #define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000 diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 07b2da1632109aca1d4f88657385626fa419d222..bc7d99ba9dd4dc726c626736dc179af5f7609078 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -459,6 +459,8 @@ typedef struct SyncHeartbeat { SyncTerm term; SyncIndex commitIndex; SyncTerm privateTerm; + SyncTerm minMatchIndex; + } SyncHeartbeat; SyncHeartbeat* syncHeartbeatBuild(int32_t vgId); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 19069dc1ce64dc9348e9e28e8d7b11a7f1b8f5ec..4066275ed44a50de1ef26e41ad5a586dc729001c 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -197,6 +197,8 @@ typedef struct SSyncNode { bool changing; int64_t snapshottingIndex; + int64_t snapshottingTime; + int64_t minMatchIndex; int64_t startTime; int64_t leaderTime; @@ -236,15 +238,16 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode); // utils -------------- -int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); -int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); -cJSON* syncNode2Json(const SSyncNode* pSyncNode); -char* syncNode2Str(const SSyncNode* pSyncNode); -void syncNodeEventLog(const SSyncNode* pSyncNode, char* str); -void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str); -char* syncNode2SimpleStr(const SSyncNode* pSyncNode); -bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config); -void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex); +int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); +int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); +cJSON* syncNode2Json(const SSyncNode* pSyncNode); +char* syncNode2Str(const SSyncNode* pSyncNode); +void syncNodeEventLog(const SSyncNode* pSyncNode, char* str); +void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str); +char* syncNode2SimpleStr(const SSyncNode* pSyncNode); +bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config); +void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex); +SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode); SSyncNode* syncNodeAcquire(int64_t rid); void syncNodeRelease(SSyncNode* pNode); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 32b6424ac2ee78fd50c5e8541b30e158150ca613..bb93248b5194391bdbd6471c6c6eabc793265303 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -288,6 +288,22 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { return ret; } +SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) { + SyncIndex minMatchIndex = SYNC_INDEX_INVALID; + + if (pSyncNode->peersNum > 0) { + minMatchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0])); + } + + for (int32_t i = 1; i < pSyncNode->peersNum; ++i) { + SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i])); + if (matchIndex < minMatchIndex) { + minMatchIndex = matchIndex; + } + } + return minMatchIndex; +} + int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { @@ -298,58 +314,118 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { int32_t code = 0; if (syncNodeIsMnode(pSyncNode)) { + // mnode + int64_t logRetention = SYNC_MNODE_LOG_RETENTION; + SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore); SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore); int64_t logNum = endIndex - beginIndex; - if (logNum < SYNC_MNODE_MAX_LOG_NUM) { + bool isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore); + + if (isEmpty || (!isEmpty && logNum < logRetention)) { char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "mnode log num:%ld, do not delete", logNum); + snprintf(logBuf, sizeof(logBuf), "new-snapshot-index:%ld, log-num:%ld, empty:%d, do not delete wal", + lastApplyIndex, logNum, isEmpty); syncNodeEventLog(pSyncNode, logBuf); taosReleaseRef(tsNodeRefId, pSyncNode->rid); return 0; } - } - for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { - int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i])); - if (lastApplyIndex > matchIndex) { - do { - char host[64]; - uint16_t port; - syncUtilU642Addr(pSyncNode->peersId[i].addr, host, sizeof(host), &port); + goto _DEL_WAL; + + } else { + // vnode + if (pSyncNode->replicaNum > 1) { + // multi replicas + + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode); + + for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { + int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i])); + if (lastApplyIndex > matchIndex) { + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pSyncNode->peersId[i].addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "new-snapshot-index:%ld is greater than match-index:%ld of %s:%d, do not delete wal", + lastApplyIndex, matchIndex, host, port); + syncNodeEventLog(pSyncNode, logBuf); + } while (0); + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return 0; + } + } + + } else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { + if (lastApplyIndex > pSyncNode->minMatchIndex) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "new-snapshot-index:%ld is greater than min-match-index:%ld, do not delete wal", lastApplyIndex, + pSyncNode->minMatchIndex); + syncNodeEventLog(pSyncNode, logBuf); + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return 0; + } + + } else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) { char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), - "new-snapshot-index:%ld is greater than match-index:%ld of %s:%d, do not delete wal", lastApplyIndex, - matchIndex, host, port); + snprintf(logBuf, sizeof(logBuf), "new-snapshot-index:%ld candidate, do not delete wal", lastApplyIndex, + pSyncNode->minMatchIndex); syncNodeEventLog(pSyncNode, logBuf); - } while (0); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return 0; + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return 0; + + } else { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "new-snapshot-index:%ld unknown state, do not delete wal", lastApplyIndex, + pSyncNode->minMatchIndex); + syncNodeEventLog(pSyncNode, logBuf); + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return 0; + } + + goto _DEL_WAL; + + } else { + // one replica + + goto _DEL_WAL; } } - if (pSyncNode->replicaNum == 1) { - SSyncLogStoreData* pData = pSyncNode->pLogStore->data; - code = walBeginSnapshot(pData->pWal, lastApplyIndex); - } else { - // calculate snapshot index +_DEL_WAL: + do { SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex); if (snapshottingIndex == SYNC_INDEX_INVALID) { atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex); + pSyncNode->snapshottingTime = taosGetTimestampMs(); - do { + SSyncLogStoreData* pData = pSyncNode->pLogStore->data; + code = walBeginSnapshot(pData->pWal, lastApplyIndex); + if (code == 0) { char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "wal snapshot begin, index:%ld, last apply index:%ld", pSyncNode->snapshottingIndex, lastApplyIndex); syncNodeEventLog(pSyncNode, logBuf); - } while (0); - SSyncLogStoreData* pData = pSyncNode->pLogStore->data; - code = walBeginSnapshot(pData->pWal, lastApplyIndex); + } else { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "wal snapshot begin error since:%s, index:%ld, last apply index:%ld", + terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex); + syncNodeErrorLog(pSyncNode, logBuf); + + atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID); + } } else { char logBuf[256]; @@ -357,7 +433,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { snapshottingIndex, lastApplyIndex); syncNodeEventLog(pSyncNode, logBuf); } - } + } while (0); taosReleaseRef(tsNodeRefId, pSyncNode->rid); return code; @@ -373,16 +449,22 @@ int32_t syncEndSnapshot(int64_t rid) { int32_t code = 0; if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) { - do { - char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "wal snapshot end, index:%ld", atomic_load_64(&pSyncNode->snapshottingIndex)); - syncNodeEventLog(pSyncNode, logBuf); - } while (0); - SSyncLogStoreData* pData = pSyncNode->pLogStore->data; code = walEndSnapshot(pData->pWal); + if (code != 0) { + sError("vgId:%d, wal end snapshot error since:%s", terrstr(terrno)); + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return -1; + } else { + do { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "wal snapshot end, index:%ld", atomic_load_64(&pSyncNode->snapshottingIndex)); + syncNodeEventLog(pSyncNode, logBuf); + } while (0); - atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID); + atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID); + } } taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -1208,6 +1290,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // peer state syncNodePeerStateInit(pSyncNode); + // min match index + pSyncNode->minMatchIndex = SYNC_INDEX_INVALID; + // start in syncNodeStart // start raft // syncNodeBecomeFollower(pSyncNode); @@ -1745,18 +1830,18 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { char logBuf[256 + 256]; if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { snprintf(logBuf, sizeof(logBuf), - "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64 - ", snap-tm:%" PRIu64 + "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 + ", snap:%" PRId64 ", snap-tm:%" PRIu64 ", sby:%d, " "stgy:%d, bch:%d, " "r-num:%d, " "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, - pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, - pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, - pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, - pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode), pSyncNode->electTimerLogicClockUser, - pSyncNode->heartbeatTimerLogicClockUser, printStr); + pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex, + snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, + pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, + pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode), + pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, printStr); } else { snprintf(logBuf, sizeof(logBuf), "%s", str); } @@ -1769,18 +1854,18 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { char* s = (char*)taosMemoryMalloc(len); if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { snprintf(s, len, - "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64 - ", snap-tm:%" PRIu64 + "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 + ", snap:%" PRId64 ", snap-tm:%" PRIu64 ", sby:%d, " "stgy:%d, bch:%d, " "r-num:%d, " "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, - pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, - pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, - pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, - pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode), pSyncNode->electTimerLogicClockUser, - pSyncNode->heartbeatTimerLogicClockUser, printStr); + pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex, + snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, + pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, + pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode), + pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, printStr); } else { snprintf(s, len, "%s", str); } @@ -1818,17 +1903,18 @@ inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { char logBuf[256 + 256]; if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { snprintf(logBuf, sizeof(logBuf), - "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64 - ", snap-tm:%" PRIu64 + "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 + ", snap:%" PRId64 ", snap-tm:%" PRIu64 ", sby:%d, " "stgy:%d, bch:%d, " "r-num:%d, " - "lcfg:%" PRId64 ", chging:%d, rsto:%d, %s", + "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, - pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, - pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, - pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, - pSyncNode->restoreFinish, printStr); + pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex, + snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, + pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, + pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode), + pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, printStr); } else { snprintf(logBuf, sizeof(logBuf), "%s", str); } @@ -1839,17 +1925,18 @@ inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { char* s = (char*)taosMemoryMalloc(len); if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { snprintf(s, len, - "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64 - ", snap-tm:%" PRIu64 + "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 + ", snap:%" PRId64 ", snap-tm:%" PRIu64 ", sby:%d, " "stgy:%d, bch:%d, " "r-num:%d, " - "lcfg:%" PRId64 ", chging:%d, rsto:%d, %s", + "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, - pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, - pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, - pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, - pSyncNode->restoreFinish, printStr); + pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex, + snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, + pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, + pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode), + pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, printStr); } else { snprintf(s, len, "%s", str); } @@ -2197,6 +2284,9 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm); } + // min match index + pSyncNode->minMatchIndex = SYNC_INDEX_INVALID; + // trace log do { int32_t debugStrLen = strlen(debugStr); @@ -2296,6 +2386,9 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm); } + // min match index + pSyncNode->minMatchIndex = SYNC_INDEX_INVALID; + // trace log do { int32_t debugStrLen = strlen(debugStr); @@ -2671,6 +2764,10 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { SSyncNode* pSyncNode = pData->pSyncNode; SSyncTimer* pSyncTimer = pData->pTimer; + if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { + return; + } + syncNodeEventLog(pSyncNode, "eq peer hb timer"); int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock); @@ -2683,6 +2780,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { pSyncMsg->destId = pData->destId; pSyncMsg->term = pSyncNode->pRaftStore->currentTerm; pSyncMsg->commitIndex = pSyncNode->commitIndex; + pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); pSyncMsg->privateTerm = 0; SRpcMsg rpcMsg; @@ -2845,6 +2943,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) { syncNodeResetElectTimer(ths); + ths->minMatchIndex = pMsg->minMatchIndex; #if 0 if (ths->state == TAOS_SYNC_STATE_FOLLOWER) { @@ -3353,7 +3452,7 @@ const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) { void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) { char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "recv sync-timer {type:%s, lc:%lu, ms:%d, data:%p}, %s", - syncTimerTypeStr(pMsg->timeoutType), s, pMsg->logicClock, pMsg->timerMS, pMsg->data); + syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s); syncNodeEventLog(pSyncNode, logBuf); } @@ -3485,8 +3584,9 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "send sync-heartbeat from %s:%d {term:%" PRIu64 ", cmt:%" PRIu64 ", pterm:%" PRIu64 "}, %s", host, port, - pMsg->term, pMsg->commitIndex, pMsg->privateTerm, s); + "send sync-heartbeat from %s:%d {term:%" PRIu64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRIu64 + "}, %s", + host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s); syncNodeEventLog(pSyncNode, logBuf); } @@ -3496,8 +3596,9 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "recv sync-heartbeat from %s:%d {term:%" PRIu64 ", cmt:%" PRIu64 ", pterm:%" PRIu64 "}, %s", host, port, - pMsg->term, pMsg->commitIndex, pMsg->privateTerm, s); + "recv sync-heartbeat from %s:%d {term:%" PRIu64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRIu64 + "}, %s", + host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s); syncNodeEventLog(pSyncNode, logBuf); } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index e7b712acc98e985edd67250534541d690d6d568a..431e1b46d09177cf5b625747a3460679c77dfe37 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -216,6 +216,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) { pSyncMsg->destId = pSyncNode->peersId[i]; pSyncMsg->term = pSyncNode->pRaftStore->currentTerm; pSyncMsg->commitIndex = pSyncNode->commitIndex; + pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); pSyncMsg->privateTerm = 0; SRpcMsg rpcMsg; diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 87368cb45c2a411c7f24c7b5e582b5352b1f31dc..18c381a4e604c2e180bbbe771e970f731bac1e5f 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -16,6 +16,7 @@ #include "syncTimeout.h" #include "syncElection.h" #include "syncRaftCfg.h" +#include "syncRaftLog.h" #include "syncReplication.h" #include "syncRespMgr.h" @@ -63,10 +64,31 @@ int32_t syncNodeTimerRoutine(SSyncNode* ths) { // timer replicate syncNodeReplicate(ths); + // clean mnode index if (syncNodeIsMnode(ths)) { syncNodeCleanConfigIndex(ths); } + // end timeout wal snapshot + int64_t timeNow = taosGetTimestampMs(); + if (timeNow - ths->snapshottingIndex > SYNC_DEL_WAL_MS && + atomic_load_64(&ths->snapshottingIndex) != SYNC_INDEX_INVALID) { + SSyncLogStoreData* pData = ths->pLogStore->data; + int32_t code = walEndSnapshot(pData->pWal); + if (code != 0) { + sError("vgId:%d, wal end snapshot error since:%s", terrstr(terrno)); + return -1; + } else { + do { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "wal snapshot end, index:%ld", atomic_load_64(&ths->snapshottingIndex)); + syncNodeEventLog(ths, logBuf); + } while (0); + + atomic_store_64(&ths->snapshottingIndex, SYNC_INDEX_INVALID); + } + } + #if 0 if (!syncNodeIsMnode(ths)) { syncRespClean(ths->pSyncRespMgr);