提交 f68e41a4 编写于 作者: B Benguang Zhao

enh: initialize log buffer again after receiving a complete snapshot

上级 006e13e6
...@@ -89,14 +89,15 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn ...@@ -89,14 +89,15 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn
int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg); int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg);
int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
// SSyncLogBuffer // SSyncLogBuffer
SSyncLogBuffer* syncLogBufferCreate(); SSyncLogBuffer* syncLogBufferCreate();
void syncLogBufferDestroy(SSyncLogBuffer* pBuf); void syncLogBufferDestroy(SSyncLogBuffer* pBuf);
int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode); int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode);
int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode);
// access // access
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf); int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf);
......
...@@ -704,6 +704,28 @@ static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { ...@@ -704,6 +704,28 @@ static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
return ret; return ret;
} }
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
ASSERT(pNode->pLogStore != NULL && "log store not created");
ASSERT(pNode->pFsm != NULL && "pFsm not registered");
ASSERT(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered");
SSnapshot snapshot;
if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) {
sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr());
return -1;
}
SyncIndex commitIndex = snapshot.lastApplyIndex;
SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
if (lastVer < commitIndex || firstVer > commitIndex + 1) {
if (pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, commitIndex)) {
sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer: %" PRId64 ", snapshotVer: %" PRId64,
pNode->vgId, terrstr(), lastVer, commitIndex);
return -1;
}
}
return 0;
}
// open/close -------------- // open/close --------------
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode)); SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
...@@ -912,6 +934,9 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { ...@@ -912,6 +934,9 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
} }
pSyncNode->commitIndex = commitIndex; pSyncNode->commitIndex = commitIndex;
if (syncNodeLogStoreRestoreOnNeed(pSyncNode) < 0) {
goto _error;
}
// timer ms init // timer ms init
pSyncNode->pingBaseLine = PING_TIMER_MS; pSyncNode->pingBaseLine = PING_TIMER_MS;
pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN; pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN;
......
...@@ -139,8 +139,7 @@ int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex ...@@ -139,8 +139,7 @@ int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex
return 0; return 0;
} }
int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
taosThreadMutexLock(&pBuf->mutex);
ASSERT(pNode->pLogStore != NULL && "log store not created"); ASSERT(pNode->pLogStore != NULL && "log store not created");
ASSERT(pNode->pFsm != NULL && "pFsm not registered"); ASSERT(pNode->pFsm != NULL && "pFsm not registered");
ASSERT(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered"); ASSERT(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered");
...@@ -226,14 +225,37 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { ...@@ -226,14 +225,37 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
// validate // validate
syncLogBufferValidate(pBuf); syncLogBufferValidate(pBuf);
taosThreadMutexUnlock(&pBuf->mutex);
return 0; return 0;
_err: _err:
taosThreadMutexUnlock(&pBuf->mutex);
return -1; return -1;
} }
int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
taosThreadMutexLock(&pBuf->mutex);
int32_t ret = syncLogBufferInitWithoutLock(pBuf, pNode);
taosThreadMutexUnlock(&pBuf->mutex);
return ret;
}
int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
taosThreadMutexLock(&pBuf->mutex);
for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) {
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
if (pEntry == NULL) continue;
syncEntryDestroy(pEntry);
pEntry = NULL;
memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
}
pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0;
int32_t ret = syncLogBufferInitWithoutLock(pBuf, pNode);
if (ret < 0) {
sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, terrstr());
}
taosThreadMutexUnlock(&pBuf->mutex);
return ret;
}
FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) { FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
SyncIndex index = pBuf->matchIndex; SyncIndex index = pBuf->matchIndex;
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
...@@ -628,7 +650,7 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod ...@@ -628,7 +650,7 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
// check existence of WAl log // check existence of WAl log
SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore); SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
if (pMsg->matchIndex < firstVer) { if (pMsg->matchIndex + 1 < firstVer) {
if (syncNodeStartSnapshot(pNode, &destId) < 0) { if (syncNodeStartSnapshot(pNode, &destId) < 0) {
sError("vgId:%d, failed to start snapshot for dest: 0x%016" PRIx64, pNode->vgId, destId.addr); sError("vgId:%d, failed to start snapshot for dest: 0x%016" PRIx64, pNode->vgId, destId.addr);
} }
...@@ -929,7 +951,8 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn ...@@ -929,7 +951,8 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId); SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId);
if (pMgr) { if (pMgr) {
sInfo("vgId:%d, reset log repl mgr for dest: 0x%016" PRIx64, pNode->vgId, pDestId->addr); sInfo("vgId:%d, reset sync log repl mgr for peer: 0x%016" PRIx64 " since %s. index: %" PRId64, pNode->vgId,
pDestId->addr, terrstr(), index);
(void)syncLogReplMgrReset(pMgr); (void)syncLogReplMgrReset(pMgr);
} }
} }
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "syncSnapshot.h" #include "syncSnapshot.h"
#include "syncIndexMgr.h" #include "syncIndexMgr.h"
#include "syncPipeline.h"
#include "syncRaftCfg.h" #include "syncRaftCfg.h"
#include "syncRaftLog.h" #include "syncRaftLog.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
...@@ -273,6 +274,11 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { ...@@ -273,6 +274,11 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
return 1; return 1;
} }
char host[64];
uint16_t port;
syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port);
sInfo("vgId:%d, start snapshot for peer: %s:%d", pSyncNode->vgId, host, port);
code = snapshotSenderStart(pSender); code = snapshotSenderStart(pSender);
if (code != 0) { if (code != 0) {
sNError(pSyncNode, "snapshot sender start error"); sNError(pSyncNode, "snapshot sender start error");
...@@ -372,7 +378,10 @@ int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapsh ...@@ -372,7 +378,10 @@ int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapsh
} }
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) { int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
ASSERT(!snapshotReceiverIsStart(pReceiver)); if (snapshotReceiverIsStart(pReceiver)) {
sWarn("vgId:%d, snapshot receiver has started.", pReceiver->pSyncNode->vgId);
return 0;
}
pReceiver->start = true; pReceiver->start = true;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT; pReceiver->ack = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT;
...@@ -738,6 +747,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { ...@@ -738,6 +747,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
syncNodeOnSnapshotEnd(pSyncNode, pMsg); syncNodeOnSnapshotEnd(pSyncNode, pMsg);
(void)syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
// force close, no response // force close, no response
......
...@@ -70,7 +70,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { ...@@ -70,7 +70,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
pWal->lastRollSeq = -1; pWal->lastRollSeq = -1;
taosArrayClear(pWal->fileInfoSet); taosArrayClear(pWal->fileInfoSet);
pWal->vers.firstVer = -1; pWal->vers.firstVer = ver + 1;
pWal->vers.lastVer = ver; pWal->vers.lastVer = ver;
pWal->vers.commitVer = ver; pWal->vers.commitVer = ver;
pWal->vers.snapshotVer = ver; pWal->vers.snapshotVer = ver;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册