提交 856806bb 编写于 作者: M Minghao Li

refactor(sync): add snapshot2 interface

上级 d891f31b
...@@ -205,7 +205,7 @@ SyncGroupId syncGetVgId(int64_t rid); ...@@ -205,7 +205,7 @@ SyncGroupId syncGetVgId(int64_t rid);
void syncGetEpSet(int64_t rid, SEpSet* pEpSet); void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
// int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize); int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize);
bool syncEnvIsStart(); bool syncEnvIsStart();
const char* syncStr(ESyncState state); const char* syncStr(ESyncState state);
bool syncIsRestoreFinish(int64_t rid); bool syncIsRestoreFinish(int64_t rid);
......
...@@ -404,8 +404,6 @@ void syncAppendEntriesBatchFromRpcMsg(const SRpcMsg* pRpcMsg, ...@@ -404,8 +404,6 @@ void syncAppendEntriesBatchFromRpcMsg(const SRpcMsg* pRpcMsg,
SyncAppendEntriesBatch* syncAppendEntriesBatchFromRpcMsg2(const SRpcMsg* pRpcMsg); SyncAppendEntriesBatch* syncAppendEntriesBatchFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg); cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg);
char* syncAppendEntriesBatch2Str(const SyncAppendEntriesBatch* pMsg); char* syncAppendEntriesBatch2Str(const SyncAppendEntriesBatch* pMsg);
void syncAppendEntriesBatch2RpcMsgArray(SyncAppendEntriesBatch* pSyncMsg, SRpcMsg* rpcMsgArr, int32_t maxArrSize,
int32_t* pRetArrSize);
// for debug ---------------------- // for debug ----------------------
void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg); void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg);
......
...@@ -54,6 +54,7 @@ extern "C" { ...@@ -54,6 +54,7 @@ extern "C" {
int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode); int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode);
int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode); int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode);
int32_t syncNodeReplicate(SSyncNode* pSyncNode); int32_t syncNodeReplicate(SSyncNode* pSyncNode);
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg);
int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntriesBatch* pMsg); int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntriesBatch* pMsg);
......
...@@ -628,8 +628,6 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -628,8 +628,6 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
#endif #endif
static int32_t syncNodeMakeLogSame2(SSyncNode* ths, SyncAppendEntriesBatch* pMsg) { return 0; }
static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
int32_t code; int32_t code;
...@@ -675,6 +673,51 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -675,6 +673,51 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
return code; return code;
} }
static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) {
int32_t code;
SyncIndex delBegin = FromIndex;
SyncIndex delEnd = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
// invert roll back!
for (SyncIndex index = delEnd; index >= delBegin; --index) {
if (ths->pFsm->FpRollBackCb != NULL) {
SSyncRaftEntry* pRollBackEntry;
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, index, &pRollBackEntry);
ASSERT(code == 0);
ASSERT(pRollBackEntry != NULL);
if (syncUtilUserRollback(pRollBackEntry->msgType)) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
SFsmCbMeta cbMeta = {0};
cbMeta.index = pRollBackEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pRollBackEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
cbMeta.seqNum = pRollBackEntry->seqNum;
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
rpcFreeCont(rpcMsg.pCont);
}
syncEntryDestory(pRollBackEntry);
}
}
// delete confict entries
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
ASSERT(code == 0);
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "log truncate, from %ld to %ld", delBegin, delEnd);
syncNodeEventLog(ths, eventLog);
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore);
return code;
}
static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) { static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
...@@ -694,7 +737,30 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) { ...@@ -694,7 +737,30 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
return 0; return 0;
} }
static bool syncNodeOnAppendEntriesBatchLogOK(SSyncNode* pSyncNode, SyncAppendEntriesBatch* pMsg) { return true; } static bool syncNodeOnAppendEntriesBatchLogOK(SSyncNode* pSyncNode, SyncAppendEntriesBatch* pMsg) {
if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) {
return true;
}
SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);
if (pMsg->prevLogIndex > myLastIndex) {
sDebug("vgId:%d sync log not ok, preindex:%ld", pSyncNode->vgId, pMsg->prevLogIndex);
return false;
}
SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1);
if (myPreLogTerm == SYNC_TERM_INVALID) {
sDebug("vgId:%d sync log not ok2, preindex:%ld", pSyncNode->vgId, pMsg->prevLogIndex);
return false;
}
if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) {
return true;
}
sDebug("vgId:%d sync log not ok3, preindex:%ld", pSyncNode->vgId, pMsg->prevLogIndex);
return false;
}
// really pre log match // really pre log match
// prevLogIndex == -1 // prevLogIndex == -1
...@@ -769,7 +835,6 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -769,7 +835,6 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
// operation: // operation:
// if hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex, append entry // if hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex, append entry
// match my-commit-index or my-commit-index + 1 // match my-commit-index or my-commit-index + 1
// no operation on log
do { do {
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
(pMsg->prevLogIndex <= ths->commitIndex); (pMsg->prevLogIndex <= ths->commitIndex);
...@@ -782,14 +847,11 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -782,14 +847,11 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
syncNodeEventLog(ths, logBuf); syncNodeEventLog(ths, logBuf);
} while (0); } while (0);
SyncIndex matchIndex = ths->commitIndex; SyncIndex matchIndex = ths->commitIndex;
bool hasAppendEntries = pMsg->dataLen > 0; bool hasAppendEntries = pMsg->dataLen > 0;
if (hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex) { SOffsetAndContLen* metaTableArr = syncAppendEntriesBatchMetaTableArray(pMsg);
SRpcMsg rpcMsgArr[SYNC_MAX_BATCH_SIZE];
memset(rpcMsgArr, 0, sizeof(rpcMsgArr));
int32_t retArrSize = 0;
syncAppendEntriesBatch2RpcMsgArray(pMsg, rpcMsgArr, SYNC_MAX_BATCH_SIZE, &retArrSize);
if (hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex) {
// make log same // make log same
do { do {
SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
...@@ -797,15 +859,15 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -797,15 +859,15 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
if (hasExtraEntries) { if (hasExtraEntries) {
// make log same, rollback deleted entries // make log same, rollback deleted entries
code = syncNodeMakeLogSame2(ths, pMsg); code = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1);
ASSERT(code == 0); ASSERT(code == 0);
} }
} while (0); } while (0);
// append entry batch // append entry batch
for (int32_t i = 0; i < retArrSize; ++i) { for (int32_t i = 0; i < pMsg->dataCount; ++i) {
SSyncRaftEntry* pAppendEntry = syncEntryBuild(1234); SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset);
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
if (code != 0) { if (code != 0) {
return -1; return -1;
...@@ -823,7 +885,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -823,7 +885,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
walFsync(pWal, true); walFsync(pWal, true);
// update match index // update match index
matchIndex = pMsg->prevLogIndex + retArrSize; matchIndex = pMsg->prevLogIndex + pMsg->dataCount;
} }
// prepare response msg // prepare response msg
...@@ -841,7 +903,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -841,7 +903,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply); syncAppendEntriesReplyDestroy(pReply);
return ret; return 0;
} }
} while (0); } while (0);
...@@ -887,7 +949,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -887,7 +949,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply); syncAppendEntriesReplyDestroy(pReply);
return ret; return 0;
} }
} while (0); } while (0);
...@@ -907,29 +969,26 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -907,29 +969,26 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
bool hasExtraEntries = myLastIndex > pMsg->prevLogIndex; bool hasExtraEntries = myLastIndex > pMsg->prevLogIndex;
// has entries in SyncAppendEntries msg // has entries in SyncAppendEntries msg
bool hasAppendEntries = pMsg->dataLen > 0; bool hasAppendEntries = pMsg->dataLen > 0;
SOffsetAndContLen* metaTableArr = syncAppendEntriesBatchMetaTableArray(pMsg);
char logBuf[128]; do {
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries, match, pre-index:%ld, pre-term:%lu, datalen:%d", char logBuf[128];
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen); snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries, match, pre-index:%ld, pre-term:%lu, datalen:%d",
syncNodeEventLog(ths, logBuf); pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
if (hasExtraEntries) { if (hasExtraEntries) {
// make log same, rollback deleted entries // make log same, rollback deleted entries
code = syncNodeMakeLogSame2(ths, pMsg); code = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1);
ASSERT(code == 0); ASSERT(code == 0);
} }
int32_t retArrSize = 0;
if (hasAppendEntries) { if (hasAppendEntries) {
SRpcMsg rpcMsgArr[SYNC_MAX_BATCH_SIZE];
memset(rpcMsgArr, 0, sizeof(rpcMsgArr));
syncAppendEntriesBatch2RpcMsgArray(pMsg, rpcMsgArr, SYNC_MAX_BATCH_SIZE, &retArrSize);
// append entry batch // append entry batch
for (int32_t i = 0; i < retArrSize; ++i) { for (int32_t i = 0; i < pMsg->dataCount; ++i) {
// SSyncRaftEntry* pAppendEntry = syncEntryBuild4(&rpcMsgArr[i], ); SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset);
SSyncRaftEntry* pAppendEntry;
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
if (code != 0) { if (code != 0) {
return -1; return -1;
...@@ -954,7 +1013,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -954,7 +1013,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
pReply->term = ths->pRaftStore->currentTerm; pReply->term = ths->pRaftStore->currentTerm;
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
pReply->success = true; pReply->success = true;
pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + retArrSize : pMsg->prevLogIndex; pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + pMsg->dataCount : pMsg->prevLogIndex;
// send response // send response
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
...@@ -994,11 +1053,11 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -994,11 +1053,11 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
ASSERT(code == 0); ASSERT(code == 0);
} }
} }
return ret; return 0;
} }
} while (0); } while (0);
return ret; return 0;
} }
int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg) { int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
......
...@@ -165,23 +165,22 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie ...@@ -165,23 +165,22 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
if (ths->pLogStore->syncLogExist(ths->pLogStore, newNextIndex) && if (ths->pLogStore->syncLogExist(ths->pLogStore, newNextIndex) &&
ths->pLogStore->syncLogExist(ths->pLogStore, newNextIndex - 1)) { ths->pLogStore->syncLogExist(ths->pLogStore, newNextIndex - 1)) {
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] // update next-index, match-index
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), newNextIndex); syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), newNextIndex);
// matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex); syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex);
// maybe commit // maybe commit
if (ths->state == TAOS_SYNC_STATE_LEADER) { if (ths->state == TAOS_SYNC_STATE_LEADER) {
syncMaybeAdvanceCommitIndex(ths); syncMaybeAdvanceCommitIndex(ths);
} }
} else { } else {
// start snapshot <match+1, old snapshot.end> // start snapshot <match+1, old snapshot.end>
SSnapshot snapshot; SSnapshot oldSnapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &oldSnapshot);
syncNodeStartSnapshot(ths, newMatchIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pMsg); syncNodeStartSnapshot(ths, newMatchIndex + 1, oldSnapshot.lastApplyIndex, oldSnapshot.lastApplyTerm, pMsg);
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), snapshot.lastApplyIndex + 1); syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), oldSnapshot.lastApplyIndex + 1);
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex); syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex);
} }
......
...@@ -1807,33 +1807,6 @@ char* syncAppendEntriesBatch2Str(const SyncAppendEntriesBatch* pMsg) { ...@@ -1807,33 +1807,6 @@ char* syncAppendEntriesBatch2Str(const SyncAppendEntriesBatch* pMsg) {
return serialized; return serialized;
} }
void syncAppendEntriesBatch2RpcMsgArray(SyncAppendEntriesBatch* pSyncMsg, SRpcMsg* rpcMsgArr, int32_t maxArrSize,
int32_t* pRetArrSize) {
if (pRetArrSize != NULL) {
*pRetArrSize = pSyncMsg->dataCount;
}
int32_t arrSize = pSyncMsg->dataCount;
if (arrSize > maxArrSize) {
arrSize = maxArrSize;
}
int32_t metaArrayLen = sizeof(SOffsetAndContLen) * pSyncMsg->dataCount; // <offset, contLen>
int32_t rpcArrayLen = sizeof(SRpcMsg) * pSyncMsg->dataCount; // SRpcMsg
int32_t contArrayLen = pSyncMsg->dataLen - metaArrayLen - rpcArrayLen;
SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pSyncMsg->data);
SRpcMsg* msgArr = (SRpcMsg*)(pSyncMsg->data + metaArrayLen);
void* pData = pSyncMsg->data + metaArrayLen + rpcArrayLen;
for (int i = 0; i < arrSize; ++i) {
rpcMsgArr[i] = msgArr[i];
rpcMsgArr[i].pCont = rpcMallocCont(msgArr[i].contLen);
void* pRpcCont = pSyncMsg->data + metaArr[i].offset;
memcpy(rpcMsgArr[i].pCont, pRpcCont, rpcMsgArr[i].contLen);
}
}
// for debug ---------------------- // for debug ----------------------
void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg) { void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg) {
char* serialized = syncAppendEntriesBatch2Str(pMsg); char* serialized = syncAppendEntriesBatch2Str(pMsg);
......
...@@ -32,6 +32,7 @@ static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore); ...@@ -32,6 +32,7 @@ static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore);
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry); static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry);
static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex); static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
static bool raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index);
// private function // private function
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry); static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry);
...@@ -83,6 +84,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { ...@@ -83,6 +84,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
pLogStore->syncLogGetEntry = raftLogGetEntry; pLogStore->syncLogGetEntry = raftLogGetEntry;
pLogStore->syncLogTruncate = raftLogTruncate; pLogStore->syncLogTruncate = raftLogTruncate;
pLogStore->syncLogWriteIndex = raftLogWriteIndex; pLogStore->syncLogWriteIndex = raftLogWriteIndex;
pLogStore->syncLogExist = raftLogExist;
return pLogStore; return pLogStore;
} }
...@@ -168,6 +170,13 @@ static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) { ...@@ -168,6 +170,13 @@ static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) {
return lastVer + 1; return lastVer + 1;
} }
static bool raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
bool b = walLogExist(pWal, index);
return b;
}
// if success, return last term // if success, return last term
// if not log, return 0 // if not log, return 0
// if error, return SYNC_TERM_INVALID // if error, return SYNC_TERM_INVALID
......
...@@ -145,27 +145,34 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { ...@@ -145,27 +145,34 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
return -1; return -1;
} }
SRpcMsg rpcMsgArr[SYNC_MAX_BATCH_SIZE]; SSyncRaftEntry* entryPArr[SYNC_MAX_BATCH_SIZE];
memset(rpcMsgArr, 0, sizeof(rpcMsgArr)); memset(entryPArr, 0, sizeof(entryPArr));
int32_t getCount = 0; int32_t getCount = 0;
SyncIndex getEntryIndex = nextIndex;
for (int32_t i = 0; i < pSyncNode->batchSize; ++i) { for (int32_t i = 0; i < pSyncNode->batchSize; ++i) {
SSyncRaftEntry* pEntry; SSyncRaftEntry* pEntry;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry); int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, getEntryIndex, &pEntry);
if (code == 0) { if (code == 0) {
ASSERT(pEntry != NULL); ASSERT(pEntry != NULL);
// get rpc msg [i] from entry entryPArr[i] = pEntry;
syncEntryDestory(pEntry);
getCount++; getCount++;
} else { } else {
break; break;
} }
} }
// SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchBuild(rpcMsgArr, getCount, pSyncNode->vgId); SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchBuild(entryPArr, getCount, pSyncNode->vgId);
SyncAppendEntriesBatch* pMsg = NULL;
ASSERT(pMsg != NULL); ASSERT(pMsg != NULL);
for (int32_t i = 0; i < pSyncNode->batchSize; ++i) {
SSyncRaftEntry* pEntry = entryPArr[i];
if (pEntry != NULL) {
syncEntryDestory(pEntry);
entryPArr[i] = NULL;
}
}
// prepare msg // prepare msg
pMsg->srcId = pSyncNode->myRaftId; pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = *pDestId; pMsg->destId = *pDestId;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册