提交 d72d2d69 编写于 作者: M Minghao Li

refactor(sync): SyncAppendEntries

上级 39d0b652
...@@ -144,6 +144,7 @@ typedef struct SSyncLogStore { ...@@ -144,6 +144,7 @@ typedef struct SSyncLogStore {
// refactor, log[0 .. n] ==> log[m .. n] // refactor, log[0 .. n] ==> log[m .. n]
int32_t (*syncLogSetBeginIndex)(struct SSyncLogStore* pLogStore, SyncIndex beginIndex); int32_t (*syncLogSetBeginIndex)(struct SSyncLogStore* pLogStore, SyncIndex beginIndex);
int32_t (*syncLogResetBeginIndex)(struct SSyncLogStore* pLogStore);
SyncIndex (*syncLogBeginIndex)(struct SSyncLogStore* pLogStore); SyncIndex (*syncLogBeginIndex)(struct SSyncLogStore* pLogStore);
SyncIndex (*syncLogEndIndex)(struct SSyncLogStore* pLogStore); SyncIndex (*syncLogEndIndex)(struct SSyncLogStore* pLogStore);
bool (*syncLogIsEmpty)(struct SSyncLogStore* pLogStore); bool (*syncLogIsEmpty)(struct SSyncLogStore* pLogStore);
......
...@@ -49,8 +49,8 @@ void raftStoreClearVote(SRaftStore *pRaftStore); ...@@ -49,8 +49,8 @@ void raftStoreClearVote(SRaftStore *pRaftStore);
void raftStoreNextTerm(SRaftStore *pRaftStore); void raftStoreNextTerm(SRaftStore *pRaftStore);
void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term); void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term);
int32_t raftStoreFromJson(SRaftStore *pRaftStore, cJSON *pJson); int32_t raftStoreFromJson(SRaftStore *pRaftStore, cJSON *pJson);
cJSON * raftStore2Json(SRaftStore *pRaftStore); cJSON *raftStore2Json(SRaftStore *pRaftStore);
char * raftStore2Str(SRaftStore *pRaftStore); char *raftStore2Str(SRaftStore *pRaftStore);
// for debug ------------------- // for debug -------------------
void raftStorePrint(SRaftStore *pObj); void raftStorePrint(SRaftStore *pObj);
......
...@@ -469,6 +469,8 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -469,6 +469,8 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
// delete confict entries // delete confict entries
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
ASSERT(code == 0); ASSERT(code == 0);
sInfo("syncNodeMakeLogSame, from %ld to %ld", delBegin, delEnd);
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore);
return code; return code;
} }
...@@ -495,19 +497,30 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) { ...@@ -495,19 +497,30 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
// prevLogIndex == -1 // prevLogIndex == -1
static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) { static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) {
if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) { if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) {
sTrace("syncNodeOnAppendEntriesLogOK true, pMsg->prevLogIndex:%ld", pMsg->prevLogIndex);
return true; return true;
} }
SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode); SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);
if (pMsg->prevLogIndex > myLastIndex) { if (pMsg->prevLogIndex > myLastIndex) {
sTrace("syncNodeOnAppendEntriesLogOK false, pMsg->prevLogIndex:%ld, myLastIndex:%ld", pMsg->prevLogIndex,
myLastIndex);
return false; return false;
} }
SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1); SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1);
if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) { if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) {
sTrace(
"syncNodeOnAppendEntriesLogOK true, pMsg->prevLogIndex:%ld, myLastIndex:%ld, pMsg->prevLogTerm:%lu, "
"myPreLogTerm:%lu",
pMsg->prevLogIndex, myLastIndex, pMsg->prevLogTerm, myPreLogTerm);
return true; return true;
} }
sTrace(
"syncNodeOnAppendEntriesLogOK false, pMsg->prevLogIndex:%ld, myLastIndex:%ld, pMsg->prevLogTerm:%lu, "
"myPreLogTerm:%lu",
pMsg->prevLogIndex, myLastIndex, pMsg->prevLogTerm, myPreLogTerm);
return false; return false;
} }
...@@ -517,7 +530,8 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -517,7 +530,8 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
// print log // print log
char logBuf[128] = {0}; char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntries, term:%lu", ths->pRaftStore->currentTerm); snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntries, vgId:%d, term:%lu", ths->vgId,
ths->pRaftStore->currentTerm);
syncAppendEntriesLog2(logBuf, pMsg); syncAppendEntriesLog2(logBuf, pMsg);
// if I am standby, to be added into a raft group, I should process SyncAppendEntries msg // if I am standby, to be added into a raft group, I should process SyncAppendEntries msg
...@@ -551,6 +565,8 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -551,6 +565,8 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
do { do {
bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE; bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE;
if (condition) { if (condition) {
sTrace("recv SyncAppendEntries, candidate to follower");
syncNodeBecomeFollower(ths); syncNodeBecomeFollower(ths);
// do not reply? // do not reply?
return ret; return ret;
...@@ -586,6 +602,12 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -586,6 +602,12 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
bool condition = condition1 || condition2 || condition3; bool condition = condition1 || condition2 || condition3;
if (condition) { if (condition) {
sTrace(
"recv SyncAppendEntries, fake match, myLastIndex:%ld, syncLogBeginIndex:%ld, syncLogEndIndex:%ld, "
"condition1:%d, condition2:%d, condition3:%d",
myLastIndex, ths->pLogStore->syncLogBeginIndex(ths->pLogStore),
ths->pLogStore->syncLogEndIndex(ths->pLogStore), condition1, condition2, condition3);
// prepare response msg // prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId; pReply->srcId = ths->myRaftId;
...@@ -623,6 +645,12 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -623,6 +645,12 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
bool condition = condition1 || condition2; bool condition = condition1 || condition2;
if (condition) { if (condition) {
sTrace(
"recv SyncAppendEntries, not match, syncLogBeginIndex:%ld, syncLogEndIndex:%ld, condition1:%d, "
"condition2:%d, logOK:%d",
ths->pLogStore->syncLogBeginIndex(ths->pLogStore), ths->pLogStore->syncLogEndIndex(ths->pLogStore),
condition1, condition2, logOK);
// prepare response msg // prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId; pReply->srcId = ths->myRaftId;
...@@ -660,6 +688,9 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -660,6 +688,9 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
// has entries in SyncAppendEntries msg // has entries in SyncAppendEntries msg
bool hasAppendEntries = pMsg->dataLen > 0; bool hasAppendEntries = pMsg->dataLen > 0;
sTrace("recv SyncAppendEntries, match, myLastIndex:%ld, hasExtraEntries:%d, hasAppendEntries:%d", myLastIndex,
hasExtraEntries, hasAppendEntries);
if (hasExtraEntries) { if (hasExtraEntries) {
// make log same, rollback deleted entries // make log same, rollback deleted entries
code = syncNodeMakeLogSame(ths, pMsg); code = syncNodeMakeLogSame(ths, pMsg);
......
...@@ -56,6 +56,8 @@ static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex b ...@@ -56,6 +56,8 @@ static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex b
return 0; return 0;
} }
int32_t raftLogResetBeginIndex(struct SSyncLogStore* pLogStore) { return 0; }
static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) { static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册