提交 944bb331 编写于 作者: M Minghao Li

sync refactor

上级 559f570e
...@@ -120,6 +120,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -120,6 +120,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// reject request // reject request
if ((pMsg->term < ths->pRaftStore->currentTerm) || if ((pMsg->term < ths->pRaftStore->currentTerm) ||
((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) { ((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
sTrace(
"syncNodeOnAppendEntriesCb --> reject, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
"logOK:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
pReply->srcId = ths->myRaftId; pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId; pReply->destId = pMsg->srcId;
...@@ -137,6 +142,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -137,6 +142,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// return to follower state // return to follower state
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) { if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
sTrace(
"syncNodeOnAppendEntriesCb --> return to follower, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
"ths->state:%d, logOK:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
syncNodeBecomeFollower(ths); syncNodeBecomeFollower(ths);
// ret or reply? // ret or reply?
...@@ -159,6 +169,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -159,6 +169,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
syncEntryDestory(pPreEntry); syncEntryDestory(pPreEntry);
} }
sTrace(
"syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
"ths->state:%d, logOK:%d, preMatch:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, preMatch);
if (preMatch) { if (preMatch) {
// must has preIndex in local log // must has preIndex in local log
assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)); assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore));
...@@ -167,7 +182,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -167,7 +182,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
bool hasAppendEntries = pMsg->dataLen > 0; bool hasAppendEntries = pMsg->dataLen > 0;
if (hasExtraEntries && hasAppendEntries) { if (hasExtraEntries && hasAppendEntries) {
// conflict // not conflict by default
bool conflict = false; bool conflict = false;
SyncIndex extraIndex = pMsg->prevLogIndex + 1; SyncIndex extraIndex = pMsg->prevLogIndex + 1;
...@@ -177,8 +192,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -177,8 +192,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL); assert(pAppendEntry != NULL);
// log not match, conflict
assert(extraIndex == pAppendEntry->index); assert(extraIndex == pAppendEntry->index);
if (pExtraEntry->term == pAppendEntry->term) { if (pExtraEntry->term != pAppendEntry->term) {
conflict = true; conflict = true;
} }
...@@ -187,6 +203,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -187,6 +203,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore); SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore);
SyncIndex delEnd = extraIndex; SyncIndex delEnd = extraIndex;
sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd);
// notice! reverse roll back! // notice! reverse roll back!
for (SyncIndex index = delEnd; index >= delBegin; --index) { for (SyncIndex index = delEnd; index >= delBegin; --index) {
if (ths->pFsm->FpRollBackCb != NULL) { if (ths->pFsm->FpRollBackCb != NULL) {
...@@ -212,7 +230,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -212,7 +230,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) { if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 0); ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 2);
} }
} }
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
...@@ -237,7 +255,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -237,7 +255,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) { if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 0); ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 3);
} }
} }
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
......
...@@ -863,7 +863,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg ...@@ -863,7 +863,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
if (ths->pFsm != NULL) { if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, -2); ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 1);
} }
} }
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册