未验证 提交 94b648eb 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18331 from taosdata/feature/sync-del-wal

refactor(sync): del wal in multi-replicas
...@@ -36,7 +36,7 @@ extern "C" { ...@@ -36,7 +36,7 @@ extern "C" {
#define SYNC_DEL_WAL_MS (1000 * 60) #define SYNC_DEL_WAL_MS (1000 * 60)
#define SYNC_ADD_QUORUM_COUNT 3 #define SYNC_ADD_QUORUM_COUNT 3
#define SYNC_MNODE_LOG_RETENTION 10000 #define SYNC_MNODE_LOG_RETENTION 10000
#define SYNC_VNODE_LOG_RETENTION 100 #define SYNC_VNODE_LOG_RETENTION 20
#define SNAPSHOT_MAX_CLOCK_SKEW_MS 1000 * 10 #define SNAPSHOT_MAX_CLOCK_SKEW_MS 1000 * 10
#define SNAPSHOT_WAIT_MS 1000 * 30 #define SNAPSHOT_WAIT_MS 1000 * 30
......
...@@ -67,6 +67,9 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -67,6 +67,9 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
if (pMsg->matchIndex > oldMatchIndex) { if (pMsg->matchIndex > oldMatchIndex) {
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
syncMaybeAdvanceCommitIndex(ths); syncMaybeAdvanceCommitIndex(ths);
// maybe update minMatchIndex
ths->minMatchIndex = syncMinMatchIndex(ths);
} }
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
......
...@@ -243,6 +243,18 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { ...@@ -243,6 +243,18 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
goto _DEL_WAL; goto _DEL_WAL;
} else { } else {
lastApplyIndex -= SYNC_VNODE_LOG_RETENTION;
SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
bool isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
syncNodeRelease(pSyncNode);
return 0;
}
// vnode // vnode
if (pSyncNode->replicaNum > 1) { if (pSyncNode->replicaNum > 1) {
// multi replicas // multi replicas
...@@ -300,26 +312,31 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { ...@@ -300,26 +312,31 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
_DEL_WAL: _DEL_WAL:
do { do {
SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex); SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
SyncIndex snapshotVer = walGetSnapshotVer(pData->pWal);
if (snapshottingIndex == SYNC_INDEX_INVALID) { SyncIndex walCommitVer = walGetCommittedVer(pData->pWal);
atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex); SyncIndex wallastVer = walGetLastVer(pData->pWal);
pSyncNode->snapshottingTime = taosGetTimestampMs(); if (lastApplyIndex <= walCommitVer) {
SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);
if (snapshottingIndex == SYNC_INDEX_INVALID) {
atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
pSyncNode->snapshottingTime = taosGetTimestampMs();
code = walBeginSnapshot(pData->pWal, lastApplyIndex);
if (code == 0) {
sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
pSyncNode->snapshottingIndex, lastApplyIndex);
} else {
sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64,
terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex);
atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
}
SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
code = walBeginSnapshot(pData->pWal, lastApplyIndex);
if (code == 0) {
sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
pSyncNode->snapshottingIndex, lastApplyIndex);
} else { } else {
sNError(pSyncNode, "wal snapshot begin error since:%s, index:%" PRId64 ", last apply index:%" PRId64, sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex); snapshottingIndex, lastApplyIndex);
atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
} }
} else {
sNTrace(pSyncNode, "snapshotting for %" PRId64 ", do not delete wal for new-snapshot-index:%" PRId64,
snapshottingIndex, lastApplyIndex);
} }
} while (0); } while (0);
......
...@@ -375,7 +375,17 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp ...@@ -375,7 +375,17 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp
int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
// ASSERT(walCommit(pWal, index) == 0);
// need not update
SyncIndex snapshotVer = walGetSnapshotVer(pWal);
SyncIndex walCommitVer = walGetCommittedVer(pWal);
SyncIndex wallastVer = walGetLastVer(pWal);
if (index < snapshotVer || index > wallastVer) {
// ignore
return 0;
}
int32_t code = walCommit(pWal, index); int32_t code = walCommit(pWal, index);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
......
...@@ -62,18 +62,20 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) { ...@@ -62,18 +62,20 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
syncNodeCleanConfigIndex(ths); syncNodeCleanConfigIndex(ths);
} }
// end timeout wal snapshot
int64_t timeNow = taosGetTimestampMs(); int64_t timeNow = taosGetTimestampMs();
if (timeNow - ths->snapshottingIndex > SYNC_DEL_WAL_MS && if (atomic_load_64(&ths->snapshottingIndex) != SYNC_INDEX_INVALID) {
atomic_load_64(&ths->snapshottingIndex) != SYNC_INDEX_INVALID) { // end timeout wal snapshot
SSyncLogStoreData* pData = ths->pLogStore->data; if (timeNow - ths->snapshottingTime > SYNC_DEL_WAL_MS &&
int32_t code = walEndSnapshot(pData->pWal); atomic_load_64(&ths->snapshottingIndex) != SYNC_INDEX_INVALID) {
if (code != 0) { SSyncLogStoreData* pData = ths->pLogStore->data;
sNError(ths, "timer wal snapshot end error since:%s", terrstr()); int32_t code = walEndSnapshot(pData->pWal);
return -1; if (code != 0) {
} else { sNError(ths, "timer wal snapshot end error since:%s", terrstr());
sNTrace(ths, "wal snapshot end, index:%" PRId64, atomic_load_64(&ths->snapshottingIndex)); return -1;
atomic_store_64(&ths->snapshottingIndex, SYNC_INDEX_INVALID); } else {
sNTrace(ths, "wal snapshot end, index:%" PRId64, atomic_load_64(&ths->snapshottingIndex));
atomic_store_64(&ths->snapshottingIndex, SYNC_INDEX_INVALID);
}
} }
} }
......
...@@ -239,11 +239,11 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo ...@@ -239,11 +239,11 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
"vgId:%d, sync %s " "vgId:%d, sync %s "
"%s" "%s"
", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
", snap-tm:%" PRIu64 ", sby:%d, aq:%d, bch:%d, r-num:%d, lcfg:%" PRId64 ", snap-tm:%" PRIu64 ", sby:%d, aq:%d, snaping:%" PRId64 ", r-num:%d, lcfg:%" PRId64
", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex, pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex,
logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pNode->pRaftCfg->isStandBy, aqItems, pNode->pRaftCfg->batchSize, pNode->replicaNum, pNode->pRaftCfg->isStandBy, aqItems, pNode->snapshottingIndex, pNode->replicaNum,
pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum,
pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
} }
......
...@@ -325,7 +325,8 @@ int32_t walEndSnapshot(SWal *pWal) { ...@@ -325,7 +325,8 @@ int32_t walEndSnapshot(SWal *pWal) {
SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE); SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
if (pInfo) { if (pInfo) {
if (ver >= pInfo->lastVer) { if (ver >= pInfo->lastVer) {
pInfo--; //pInfo--;
pInfo++;
} }
if (POINTER_DISTANCE(pInfo, pWal->fileInfoSet->pData) > 0) { if (POINTER_DISTANCE(pInfo, pWal->fileInfoSet->pData) > 0) {
wDebug("vgId:%d, wal end remove for %" PRId64, pWal->cfg.vgId, pInfo->firstVer); wDebug("vgId:%d, wal end remove for %" PRId64, pWal->cfg.vgId, pInfo->firstVer);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册