提交 5387e4ff 编写于 作者: M Minghao Li

sync refactor

上级 b08cdf2f
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaftEntry.h" #include "syncRaftEntry.h"
#include "syncRaftLog.h" #include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h" #include "syncUtil.h"
// TLA+ Spec // TLA+ Spec
...@@ -50,33 +51,54 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { ...@@ -50,33 +51,54 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) { for (int i = 0; i < pSyncNode->peersNum; ++i) {
SRaftId* pDestId = &(pSyncNode->peersId[i]); SRaftId* pDestId = &(pSyncNode->peersId[i]);
SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);
// set prevLogIndex
SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);
SyncIndex preLogIndex = nextIndex - 1; SyncIndex preLogIndex = nextIndex - 1;
// set preLogTerm
SyncTerm preLogTerm = 0; SyncTerm preLogTerm = 0;
if (preLogIndex >= SYNC_INDEX_BEGIN) { if (preLogIndex >= SYNC_INDEX_BEGIN) {
SSyncRaftEntry* pPreEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, preLogIndex); SSyncRaftEntry* pPreEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, preLogIndex);
assert(pPreEntry != NULL);
preLogTerm = pPreEntry->term; preLogTerm = pPreEntry->term;
syncEntryDestory(pPreEntry);
} }
SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex); // batch optimized
assert(nextIndex == lastIndex); // SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex);
SyncAppendEntries* pMsg = NULL;
SSyncRaftEntry* pEntry = logStoreGetEntry(pSyncNode->pLogStore, nextIndex);
if (pEntry != NULL) {
SyncAppendEntries* pMsg = syncAppendEntriesBuild(pEntry->bytes);
SSyncRaftEntry* pEntry = logStoreGetEntry(pSyncNode->pLogStore, nextIndex); // add pEntry into msg
assert(pEntry != NULL); uint32_t len;
char* serialized = syncEntrySerialize(pEntry, &len);
assert(len == pEntry->bytes);
memcpy(pMsg->data, serialized, len);
free(serialized);
syncEntryDestory(pEntry);
} else {
// maybe overflow, send empty record
SyncAppendEntries* pMsg = syncAppendEntriesBuild(0);
}
SyncAppendEntries* pMsg = syncAppendEntriesBuild(pEntry->bytes);
pMsg->srcId = pSyncNode->myRaftId; pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = *pDestId; pMsg->destId = *pDestId;
pMsg->term = pSyncNode->pRaftStore->currentTerm;
pMsg->prevLogIndex = preLogIndex; pMsg->prevLogIndex = preLogIndex;
pMsg->prevLogTerm = preLogTerm; pMsg->prevLogTerm = preLogTerm;
pMsg->commitIndex = pSyncNode->commitIndex; pMsg->commitIndex = pSyncNode->commitIndex;
pMsg->dataLen = pEntry->bytes;
// add pEntry into msg
// send AppendEntries
syncNodeAppendEntries(pSyncNode, pDestId, pMsg); syncNodeAppendEntries(pSyncNode, pDestId, pMsg);
syncAppendEntriesDestroy(pMsg);
} }
return ret; return ret;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册