提交 c9c05761 编写于 作者: B Benguang Zhao

enh: rename syncLogToAppendEntries to syncBuildAppendEntriesFromRaftLog

上级 03b88ff4
......@@ -243,6 +243,8 @@ int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId);
int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildAppendEntriesFromRaftLog(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm,
SRpcMsg* pRpcMsg);
int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId);
......
......@@ -2313,33 +2313,6 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
return 0;
}
int32_t syncLogToAppendEntries(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm, SRpcMsg* pRpcMsg) {
uint32_t dataLen = pEntry->bytes;
uint32_t bytes = sizeof(SyncAppendEntries) + dataLen;
pRpcMsg->contLen = bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
if (pRpcMsg->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
SyncAppendEntries* pMsg = pRpcMsg->pCont;
pMsg->bytes = pRpcMsg->contLen;
pMsg->msgType = pRpcMsg->msgType = TDMT_SYNC_APPEND_ENTRIES;
pMsg->dataLen = dataLen;
(void)memcpy(pMsg->data, pEntry, dataLen);
pMsg->prevLogIndex = pEntry->index - 1;
pMsg->prevLogTerm = prevLogTerm;
pMsg->vgId = pNode->vgId;
pMsg->srcId = pNode->myRaftId;
pMsg->term = pNode->pRaftStore->currentTerm;
pMsg->commitIndex = pNode->commitIndex;
pMsg->privateTerm = 0;
return 0;
}
// TLA+ Spec
// ClientRequest(i, v) ==
// /\ state[i] = Leader
......
......@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "syncMessage.h"
#include "syncRaftEntry.h"
#include "syncRaftStore.h"
int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS,
SSyncNode* pNode) {
......@@ -152,6 +153,34 @@ int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId) {
return 0;
}
int32_t syncBuildAppendEntriesFromRaftLog(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm,
SRpcMsg* pRpcMsg) {
uint32_t dataLen = pEntry->bytes;
uint32_t bytes = sizeof(SyncAppendEntries) + dataLen;
pRpcMsg->contLen = bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
if (pRpcMsg->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
SyncAppendEntries* pMsg = pRpcMsg->pCont;
pMsg->bytes = pRpcMsg->contLen;
pMsg->msgType = pRpcMsg->msgType = TDMT_SYNC_APPEND_ENTRIES;
pMsg->dataLen = dataLen;
(void)memcpy(pMsg->data, pEntry, dataLen);
pMsg->prevLogIndex = pEntry->index - 1;
pMsg->prevLogTerm = prevLogTerm;
pMsg->vgId = pNode->vgId;
pMsg->srcId = pNode->myRaftId;
pMsg->term = pNode->pRaftStore->currentTerm;
pMsg->commitIndex = pNode->commitIndex;
pMsg->privateTerm = 0;
return 0;
}
int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId) {
int32_t bytes = sizeof(SyncHeartbeat);
pMsg->pCont = rpcMallocCont(bytes);
......
......@@ -911,7 +911,7 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn
}
if (pTerm) *pTerm = pEntry->term;
int32_t code = syncLogToAppendEntries(pNode, pEntry, prevLogTerm, &msgOut);
int32_t code = syncBuildAppendEntriesFromRaftLog(pNode, pEntry, prevLogTerm, &msgOut);
if (code < 0) {
sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, index);
goto _err;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册