diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index b2ed9ea3c4a051a8563e308c458835044369e790..a2f88490f05f8b579b40f8209d2dab6b7ee934ce 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -158,17 +158,17 @@ typedef struct SSyncNode SSyncNode; int32_t syncInit(); void syncCleanUp(); -int64_t syncStart(const SSyncInfo* pSyncInfo); -void syncStop(int64_t rid); -int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); -int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); +int64_t syncStart(const SSyncInfo* pSyncInfo); +void syncStop(int64_t rid); +int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); +int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); ESyncState syncGetMyRole(int64_t rid); // propose with sequence number, to implement linearizable semantics int32_t syncPropose2(int64_t rid, const SRpcMsg* pMsg, bool isWeak, uint64_t seqNum); // for compatibility, the same as syncPropose -int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); +int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); extern int32_t sDebugFlag; diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 5ea810180d2a16479c7a78ff7f0cf41176f24125..1116c1a9050753f1d3e8d47e4b0b343081b3f2d5 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -155,102 +155,60 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // accept request if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) { - /* - bool preMatch = false; - if (pMsg->prevLogIndex == SYNC_INDEX_INVALID && - ths->pLogStore->getLastIndex(ths->pLogStore) == SYNC_INDEX_INVALID) { - preMatch = true; - } - if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= - ths->pLogStore->getLastIndex(ths->pLogStore)) { SSyncRaftEntry* pPreEntry = logStoreGetEntry(ths->pLogStore, - pMsg->prevLogIndex); assert(pPreEntry != NULL); if (pMsg->prevLogTerm == pPreEntry->term) { preMatch = true; - } - syncEntryDestory(pPreEntry); - } - - sTrace( - "syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, " - "ths->state:%d, logOK:%d, preMatch:%d", - pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, preMatch); - - if (preMatch) { - */ + // preIndex = -1, or has preIndex entry in local log + assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)); - { - // preIndex = -1, or has preIndex entry in local log - assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)); + // has extra entries (> preIndex) in local log + bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore); - // has extra entries (> preIndex) in local log - bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore); + // has entries in SyncAppendEntries msg + bool hasAppendEntries = pMsg->dataLen > 0; - // has entries in SyncAppendEntries msg - bool hasAppendEntries = pMsg->dataLen > 0; - - if (hasExtraEntries && hasAppendEntries) { - // not conflict by default - bool conflict = false; - - SyncIndex extraIndex = pMsg->prevLogIndex + 1; - SSyncRaftEntry* pExtraEntry = logStoreGetEntry(ths->pLogStore, extraIndex); - assert(pExtraEntry != NULL); - - SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); - assert(pAppendEntry != NULL); + sTrace( + "syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, " + "logOK:%d, hasExtraEntries:%d, hasAppendEntries:%d", + pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries); - // log not match, conflict - assert(extraIndex == pAppendEntry->index); - if (pExtraEntry->term != pAppendEntry->term) { - conflict = true; - } + if (hasExtraEntries && hasAppendEntries) { + // not conflict by default + bool conflict = false; - if (conflict) { - // roll back - SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore); - SyncIndex delEnd = extraIndex; + SyncIndex extraIndex = pMsg->prevLogIndex + 1; + SSyncRaftEntry* pExtraEntry = logStoreGetEntry(ths->pLogStore, extraIndex); + assert(pExtraEntry != NULL); - sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd); + SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); + assert(pAppendEntry != NULL); - // notice! reverse roll back! - for (SyncIndex index = delEnd; index >= delBegin; --index) { - if (ths->pFsm->FpRollBackCb != NULL) { - SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index); - assert(pRollBackEntry != NULL); + // log not match, conflict + assert(extraIndex == pAppendEntry->index); + if (pExtraEntry->term != pAppendEntry->term) { + conflict = true; + } - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); - ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, pRollBackEntry->index, pRollBackEntry->isWeak, 0, ths->state); - rpcFreeCont(rpcMsg.pCont); - syncEntryDestory(pRollBackEntry); - } - } + if (conflict) { + // roll back + SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore); + SyncIndex delEnd = extraIndex; - // delete confict entries - ths->pLogStore->truncate(ths->pLogStore, extraIndex); + sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd); - // append new entries - ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); + // notice! reverse roll back! + for (SyncIndex index = delEnd; index >= delBegin; --index) { + if (ths->pFsm->FpRollBackCb != NULL) { + SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index); + assert(pRollBackEntry != NULL); - // pre commit - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); - if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL) { - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 2, ths->state); - } + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); + ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, pRollBackEntry->index, pRollBackEntry->isWeak, 0, ths->state); + rpcFreeCont(rpcMsg.pCont); + syncEntryDestory(pRollBackEntry); } - rpcFreeCont(rpcMsg.pCont); } - // free memory - syncEntryDestory(pExtraEntry); - syncEntryDestory(pAppendEntry); - - } else if (hasExtraEntries && !hasAppendEntries) { - // do nothing - - } else if (!hasExtraEntries && hasAppendEntries) { - SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); - assert(pAppendEntry != NULL); + // delete confict entries + ths->pLogStore->truncate(ths->pLogStore, extraIndex); // append new entries ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); @@ -260,55 +218,62 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) { - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 3, ths->state); + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 2, ths->state); } } rpcFreeCont(rpcMsg.pCont); + } - // free memory - syncEntryDestory(pAppendEntry); + // free memory + syncEntryDestory(pExtraEntry); + syncEntryDestory(pAppendEntry); - } else if (!hasExtraEntries && !hasAppendEntries) { - // do nothing + } else if (hasExtraEntries && !hasAppendEntries) { + // do nothing - } else { - assert(0); - } + } else if (!hasExtraEntries && hasAppendEntries) { + SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); + assert(pAppendEntry != NULL); - SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); - pReply->srcId = ths->myRaftId; - pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; - pReply->success = true; + // append new entries + ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); - if (hasAppendEntries) { - pReply->matchIndex = pMsg->prevLogIndex + 1; - } else { - pReply->matchIndex = pMsg->prevLogIndex; + // pre commit + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); + if (ths->pFsm != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL) { + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 3, ths->state); + } } + rpcFreeCont(rpcMsg.pCont); - SRpcMsg rpcMsg; - syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + // free memory + syncEntryDestory(pAppendEntry); - syncAppendEntriesReplyDestroy(pReply); + } else if (!hasExtraEntries && !hasAppendEntries) { + // do nothing + + } else { + assert(0); } - /* - else { - SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); - pReply->srcId = ths->myRaftId; - pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; - pReply->success = false; - pReply->matchIndex = SYNC_INDEX_INVALID; - - SRpcMsg rpcMsg; - syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); - syncAppendEntriesReplyDestroy(pReply); - } - */ + SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); + pReply->srcId = ths->myRaftId; + pReply->destId = pMsg->srcId; + pReply->term = ths->pRaftStore->currentTerm; + pReply->success = true; + + if (hasAppendEntries) { + pReply->matchIndex = pMsg->prevLogIndex + 1; + } else { + pReply->matchIndex = pMsg->prevLogIndex; + } + + SRpcMsg rpcMsg; + syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); + syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + syncAppendEntriesReplyDestroy(pReply); // maybe update commit index from leader if (pMsg->commitIndex > ths->commitIndex) { diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 89b5f0b39c89beb57cb164622064739a2da4fb83..f581b31c4b34dbd733a207d40fda67ede825d1f5 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -63,7 +63,14 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { if (pEntry->term == pSyncNode->pRaftStore->currentTerm) { // update commit index newCommitIndex = index; + sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%ld commit, pSyncNode->commitIndex:%ld", + newCommitIndex, pSyncNode->commitIndex); break; + } else { + sTrace( + "syncMaybeAdvanceCommitIndex can not commit due to term not equal, pEntry->term:%lu, " + "pSyncNode->pRaftStore->currentTerm:%lu", + pEntry->term, pSyncNode->pRaftStore->currentTerm); } } } diff --git a/source/libs/sync/test/syncLogStoreCheck.cpp b/source/libs/sync/test/syncLogStoreCheck.cpp index 5e5e43af0198e148474d57137712ddf9521d6f55..85e39a61c56c47f7f90d6465c5f70f5a508f5ea8 100644 --- a/source/libs/sync/test/syncLogStoreCheck.cpp +++ b/source/libs/sync/test/syncLogStoreCheck.cpp @@ -26,7 +26,7 @@ SSyncFSM* pFsm; SWal* pWal; SSyncNode* pSyncNode; -SSyncNode* syncNodeInit(const char *path) { +SSyncNode* syncNodeInit(const char* path) { syncInfo.vgId = 1234; syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.FpSendMsg = syncIOSendMsg; @@ -78,9 +78,7 @@ SSyncNode* syncNodeInit(const char *path) { return pSyncNode; } -SSyncNode* logStoreCheck(const char *path) { return syncNodeInit(path); } - - +SSyncNode* logStoreCheck(const char* path) { return syncNodeInit(path); } int main(int argc, char** argv) { // taosInitLog((char *)"syncTest.log", 100000, 10);