提交 18b78f24 编写于 作者: M Minghao Li

refactor(sync): add return code restore from snapshot

上级 4bd65060
......@@ -17,23 +17,27 @@
#include "syncRaftCfg.h"
#include "syncRaftStore.h"
// refactor, log[0 .. n] ==> log[m .. n]
//-------------------------------
// log[m .. n]
// public function
static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex);
static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore);
static SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore);
static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore);
static bool raftLogIsEmpty(struct SSyncLogStore* pLogStore);
static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore);
static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore);
static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore);
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry);
static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
// private function
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry);
//-------------------------------
// log[0 .. n]
static SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore);
static SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore);
static SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore);
......@@ -43,6 +47,65 @@ static int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex from
static int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
static SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore);
//-------------------------------
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore));
ASSERT(pLogStore != NULL);
pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData));
ASSERT(pLogStore->data != NULL);
SSyncLogStoreData* pData = pLogStore->data;
pData->pSyncNode = pSyncNode;
pData->pWal = pSyncNode->pWal;
ASSERT(pData->pWal != NULL);
taosThreadMutexInit(&(pData->mutex), NULL);
pData->pWalHandle = walOpenReadHandle(pData->pWal);
ASSERT(pData->pWalHandle != NULL);
pLogStore->appendEntry = logStoreAppendEntry;
pLogStore->getEntry = logStoreGetEntry;
pLogStore->truncate = logStoreTruncate;
pLogStore->getLastIndex = logStoreLastIndex;
pLogStore->getLastTerm = logStoreLastTerm;
pLogStore->updateCommitIndex = logStoreUpdateCommitIndex;
pLogStore->getCommitIndex = logStoreGetCommitIndex;
pLogStore->syncLogRestoreFromSnapshot = raftLogRestoreFromSnapshot;
pLogStore->syncLogBeginIndex = raftLogBeginIndex;
pLogStore->syncLogEndIndex = raftLogEndIndex;
pLogStore->syncLogIsEmpty = raftLogIsEmpty;
pLogStore->syncLogEntryCount = raftLogEntryCount;
pLogStore->syncLogLastIndex = raftLogLastIndex;
pLogStore->syncLogLastTerm = raftLogLastTerm;
pLogStore->syncLogAppendEntry = raftLogAppendEntry;
pLogStore->syncLogGetEntry = raftLogGetEntry;
pLogStore->syncLogTruncate = raftLogTruncate;
pLogStore->syncLogWriteIndex = raftLogWriteIndex;
return pLogStore;
}
void logStoreDestory(SSyncLogStore* pLogStore) {
if (pLogStore != NULL) {
SSyncLogStoreData* pData = pLogStore->data;
taosThreadMutexLock(&(pData->mutex));
if (pData->pWalHandle != NULL) {
walCloseReadHandle(pData->pWalHandle);
pData->pWalHandle = NULL;
}
taosThreadMutexUnlock(&(pData->mutex));
taosThreadMutexDestroy(&(pData->mutex));
taosMemoryFree(pLogStore->data);
taosMemoryFree(pLogStore);
}
}
//-------------------------------
// log[m .. n]
static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex) {
ASSERT(snapshotIndex >= 0);
......@@ -61,7 +124,7 @@ static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncI
err, errStr, sysErr, sysErrStr);
syncNodeErrorLog(pData->pSyncNode, logBuf);
ASSERT(0);
return -1;
}
return 0;
......@@ -265,63 +328,7 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp
}
//-------------------------------
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore));
ASSERT(pLogStore != NULL);
pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData));
ASSERT(pLogStore->data != NULL);
SSyncLogStoreData* pData = pLogStore->data;
pData->pSyncNode = pSyncNode;
pData->pWal = pSyncNode->pWal;
ASSERT(pData->pWal != NULL);
taosThreadMutexInit(&(pData->mutex), NULL);
pData->pWalHandle = walOpenReadHandle(pData->pWal);
ASSERT(pData->pWalHandle != NULL);
pLogStore->appendEntry = logStoreAppendEntry;
pLogStore->getEntry = logStoreGetEntry;
pLogStore->truncate = logStoreTruncate;
pLogStore->getLastIndex = logStoreLastIndex;
pLogStore->getLastTerm = logStoreLastTerm;
pLogStore->updateCommitIndex = logStoreUpdateCommitIndex;
pLogStore->getCommitIndex = logStoreGetCommitIndex;
pLogStore->syncLogRestoreFromSnapshot = raftLogRestoreFromSnapshot;
pLogStore->syncLogBeginIndex = raftLogBeginIndex;
pLogStore->syncLogEndIndex = raftLogEndIndex;
pLogStore->syncLogIsEmpty = raftLogIsEmpty;
pLogStore->syncLogEntryCount = raftLogEntryCount;
pLogStore->syncLogLastIndex = raftLogLastIndex;
pLogStore->syncLogLastTerm = raftLogLastTerm;
pLogStore->syncLogAppendEntry = raftLogAppendEntry;
pLogStore->syncLogGetEntry = raftLogGetEntry;
pLogStore->syncLogTruncate = raftLogTruncate;
pLogStore->syncLogWriteIndex = raftLogWriteIndex;
return pLogStore;
}
void logStoreDestory(SSyncLogStore* pLogStore) {
if (pLogStore != NULL) {
SSyncLogStoreData* pData = pLogStore->data;
taosThreadMutexLock(&(pData->mutex));
if (pData->pWalHandle != NULL) {
walCloseReadHandle(pData->pWalHandle);
pData->pWalHandle = NULL;
}
taosThreadMutexUnlock(&(pData->mutex));
taosThreadMutexDestroy(&(pData->mutex));
taosMemoryFree(pLogStore->data);
taosMemoryFree(pLogStore);
}
}
//-------------------------------
// log[0 .. n]
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
......
......@@ -508,32 +508,51 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
} while (0);
}
static void snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
ASSERT(pMsg->seq == SYNC_SNAPSHOT_SEQ_END);
int32_t code = 0;
if (pReceiver->pWriter != NULL) {
int32_t code = 0;
// write data
if (pMsg->dataLen > 0) {
code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
pMsg->dataLen);
ASSERT(code == 0);
if (code != 0) {
syncNodeErrorLog(pReceiver->pSyncNode, "snapshot write error");
return -1;
}
}
// reset wal
code =
pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
if (code != 0) {
syncNodeErrorLog(pReceiver->pSyncNode, "wal restore from snapshot error");
return -1;
}
// update commit index
if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
}
// stop writer
code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true);
ASSERT(code == 0);
if (code != 0) {
syncNodeErrorLog(pReceiver->pSyncNode, "snapshot stop writer true error");
ASSERT(0);
return -1;
}
pReceiver->pWriter = NULL;
}
pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;
// update progress
pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;
// update commit index
if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
} else {
syncNodeErrorLog(pReceiver->pSyncNode, "snapshot stop writer true error");
return -1;
}
// reset wal
pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
// event log
do {
SSnapshot snapshot;
......@@ -542,6 +561,8 @@ static void snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapsho
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
return 0;
}
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
......@@ -642,6 +663,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// get receiver
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
bool needRsp = false;
int32_t code = 0;
// state, term, seq/ack
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
......@@ -653,8 +675,10 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
// end, finish FSM
snapshotReceiverFinish(pReceiver, pMsg);
snapshotReceiverStop(pReceiver);
code = snapshotReceiverFinish(pReceiver, pMsg);
if (code == 0) {
snapshotReceiverStop(pReceiver);
}
needRsp = true;
// maybe update lastconfig
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册