diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index feecbc6d3bd6c416c9cc436213b344f5c099abe1..c67ee41b1228cc47946ac0a7e9f574b7b13d893b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -415,7 +415,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm) { walApplyVer(pVnode->pWal, pVnode->state.applied); pVnode->restored = true; - vDebug("vgId:%d, sync restore finished", pVnode->config.vgId); + vInfo("vgId:%d, sync restore finished", pVnode->config.vgId); } static void vnodeBecomeFollower(const SSyncFSM *pFsm) { diff --git a/source/libs/sync/inc/syncCommit.h b/source/libs/sync/inc/syncCommit.h index d3ba556f829b712a441d1132fa108fe9c1524fdb..7d638a7336e387cfbf69f7ec43c841310d127c16 100644 --- a/source/libs/sync/inc/syncCommit.h +++ b/source/libs/sync/inc/syncCommit.h @@ -47,8 +47,13 @@ extern "C" { // void syncOneReplicaAdvance(SSyncNode* pSyncNode); void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode); + bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index); bool syncAgree(SSyncNode* pSyncNode, SyncIndex index); +bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index); + +int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex); +int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncLogBuffer.h b/source/libs/sync/inc/syncLogBuffer.h index 4c209549b49684625417cd55a3aef32f44e58212..39b4439d62df85311f7f98bb2677b4da5a708c0e 100644 --- a/source/libs/sync/inc/syncLogBuffer.h +++ b/source/libs/sync/inc/syncLogBuffer.h @@ -41,8 +41,28 @@ typedef struct SSyncLogReplMgr { int32_t peerId; } SSyncLogReplMgr; +typedef struct SSyncLogBufEntry { + SSyncRaftEntry* pItem; + SyncIndex prevLogIndex; + SyncTerm prevLogTerm; +} SSyncLogBufEntry; + +typedef struct SSyncLogBuffer { + SSyncLogBufEntry entries[TSDB_SYNC_LOG_BUFFER_SIZE]; + int64_t startIndex; + int64_t commitIndex; + int64_t matchIndex; + int64_t endIndex; + int64_t size; + TdThreadMutex mutex; +} SSyncLogBuffer; + +// SSyncLogRepMgr + SSyncLogReplMgr* syncLogReplMgrCreate(); void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr); +int32_t syncLogReplMgrReset(SSyncLogReplMgr* pMgr); + int32_t syncNodeLogReplMgrInit(SSyncNode* pNode); void syncNodeLogReplMgrDestroy(SSyncNode* pNode); @@ -69,35 +89,12 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn int32_t syncLogBufferReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); -int32_t syncLogResetLogReplMgr(SSyncLogReplMgr* pMgr); int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode); int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg); -int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); -int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); -void syncLogDestroyAppendEntries(SRpcMsg* pRpcMsg); - -// others -bool syncLogReplMgrValidate(SSyncLogReplMgr* pMgr); - -typedef struct SSyncLogBufEntry { - SSyncRaftEntry* pItem; - SyncIndex prevLogIndex; - SyncTerm prevLogTerm; -} SSyncLogBufEntry; - -typedef struct SSyncLogBuffer { - SSyncLogBufEntry entries[TSDB_SYNC_LOG_BUFFER_SIZE]; - int64_t startIndex; - int64_t commitIndex; - int64_t matchIndex; - int64_t endIndex; - int64_t size; - TdThreadMutex mutex; -} SSyncLogBuffer; - +// SSyncLogBuffer SSyncLogBuffer* syncLogBufferCreate(); void syncLogBufferDestroy(SSyncLogBuffer* pBuf); int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode); @@ -117,8 +114,6 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex toIndex); int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm); // others -bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index); -void syncIndexMgrSetIndex(SSyncIndexMgr* pSyncIndexMgr, const SRaftId* pRaftId, SyncIndex index); int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commtIndex); int32_t syncLogToAppendEntries(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm, SRpcMsg* pRpcMsg); diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index 7da610a9edf626ea6de28234ad206ab1c712ec86..f077306475908698bfb8673445b070609364cb44 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -53,6 +53,9 @@ int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpc int32_t syncNodeReplicate(SSyncNode* pSyncNode); int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot); +int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); +int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 7b8e768391d82e5545a3d191785cff54dc405220..1f9675a3cd41835ea1f6ca20f5a6b21540cd317e 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -87,6 +87,8 @@ void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceive int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg); int32_t syncNodeOnSnapshotReply(SSyncNode *ths, const SRpcMsg *pMsg); +SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode *pSyncNode, SyncIndex snapshotLastApplyIndex); + // start #ifdef __cplusplus diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 36eb6fefc7582c3c9ae8d8e3d7cbdce2e4d7c73a..234e8bffeb26f50b816d738d5a40c20aaa77a080 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -39,24 +39,6 @@ // /\ UNCHANGED <> // -int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex) { - SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore); - commitIndex = TMAX(commitIndex, ths->commitIndex); - ths->commitIndex = TMIN(commitIndex, lastVer); - ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->commitIndex); - return ths->commitIndex; -} - -int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) { - if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) { - SyncIndex commitIndex = indexLikely; - syncNodeUpdateCommitIndex(ths, commitIndex); - sInfo("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index: %" PRId64 "", ths->vgId, ths->state, - ths->pRaftStore->currentTerm, commitIndex); - } - return ths->commitIndex; -} - int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncAppendEntriesReply* pMsg = pRpcMsg->pCont; int32_t ret = 0; diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 73fc3050eff35114b5c079a7cd39fd9a85d75375..fc7ea7cc30252de30012e7a3e233ee49c76a9571 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -311,3 +311,21 @@ bool syncAgree(SSyncNode* pNode, SyncIndex index) { } return false; } + +int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex) { + SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore); + commitIndex = TMAX(commitIndex, ths->commitIndex); + ths->commitIndex = TMIN(commitIndex, lastVer); + ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->commitIndex); + return ths->commitIndex; +} + +int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) { + if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) { + SyncIndex commitIndex = indexLikely; + syncNodeUpdateCommitIndex(ths, commitIndex); + sInfo("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index: %" PRId64 "", ths->vgId, ths->state, + ths->pRaftStore->currentTerm, commitIndex); + } + return ths->commitIndex; +} diff --git a/source/libs/sync/src/syncLogBuffer.c b/source/libs/sync/src/syncLogBuffer.c index 36f33fa46ef26e4e197f95539bc8ba2ec176dc71..621bce6683d88d12e73e571c55fb64fb4bade49e 100644 --- a/source/libs/sync/src/syncLogBuffer.c +++ b/source/libs/sync/src/syncLogBuffer.c @@ -16,10 +16,13 @@ #define _DEFAULT_SOURCE #include "syncLogBuffer.h" +#include "syncIndexMgr.h" #include "syncInt.h" #include "syncRaftEntry.h" #include "syncRaftStore.h" #include "syncReplication.h" +#include "syncRespMgr.h" +#include "syncSnapshot.h" #include "syncUtil.h" int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) { @@ -384,7 +387,7 @@ _out: return matchIndex; } -int32_t syncLogFsmExecute(SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry) { +int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry) { ASSERT(pFsm->FpCommitCb != NULL && "No commit cb registered for the FSM"); SRpcMsg rpcMsg; @@ -392,7 +395,7 @@ int32_t syncLogFsmExecute(SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncR SFsmCbMeta cbMeta = {0}; cbMeta.index = pEntry->index; - cbMeta.lastConfigIndex = -1; + cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index); cbMeta.isWeak = pEntry->isWeak; cbMeta.code = 0; cbMeta.state = role; @@ -401,6 +404,7 @@ int32_t syncLogFsmExecute(SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncR cbMeta.currentTerm = term; cbMeta.flag = -1; + (void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info); pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta); return 0; } @@ -423,7 +427,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm ESyncState role = pNode->state; SyncTerm term = pNode->pRaftStore->currentTerm; SyncGroupId vgId = pNode->vgId; - int32_t ret = 0; + int32_t ret = -1; int64_t upperIndex = TMIN(commitIndex, pBuf->matchIndex); SSyncRaftEntry* pEntry = NULL; bool inBuf = false; @@ -459,10 +463,9 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm continue; } - if (syncLogFsmExecute(pFsm, role, term, pEntry) != 0) { + if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry) != 0) { sError("vgId:%d, failed to execute raft entry in FSM. log index:%" PRId64 ", term:%" PRId64 "", vgId, pEntry->index, pEntry->term); - ret = -1; goto _out; } pBuf->commitIndex = index; @@ -487,6 +490,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm pBuf->startIndex = index + 1; } + ret = 0; _out: // mark as restored if needed if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex) { @@ -505,7 +509,7 @@ _out: return ret; } -int32_t syncLogResetLogReplMgr(SSyncLogReplMgr* pMgr) { +int32_t syncLogReplMgrReset(SSyncLogReplMgr* pMgr) { ASSERT(pMgr->startIndex >= 0); for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) { memset(&pMgr->states[index % pMgr->size], 0, sizeof(pMgr->states[0])); @@ -601,7 +605,7 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod return 0; } - (void)syncLogResetLogReplMgr(pMgr); + (void)syncLogReplMgrReset(pMgr); } // send match index @@ -633,7 +637,7 @@ int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pN if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) { sInfo("vgId:%d, reset sync log repl mgr in heartbeat. start time:%" PRId64 ", old start time:%" PRId64 "", pNode->vgId, pMsg->startTime, pMgr->peerStartTime); - syncLogResetLogReplMgr(pMgr); + syncLogReplMgrReset(pMgr); pMgr->peerStartTime = pMsg->startTime; } taosThreadMutexUnlock(&pBuf->mutex); @@ -647,7 +651,7 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync sInfo("vgId:%d, reset sync log repl mgr in append entries reply. start time:%" PRId64 ", old start time:%" PRId64 "", pNode->vgId, pMsg->startTime, pMgr->peerStartTime); - syncLogResetLogReplMgr(pMgr); + syncLogReplMgrReset(pMgr); pMgr->peerStartTime = pMsg->startTime; } @@ -861,7 +865,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { // reset repl mgr for (int i = 0; i < pNode->replicaNum; i++) { SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i]; - syncLogResetLogReplMgr(pMgr); + syncLogReplMgrReset(pMgr); } taosThreadMutexUnlock(&pBuf->mutex); return 0; @@ -884,14 +888,6 @@ SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, return pEntry; } -bool syncLogReplMgrValidate(SSyncLogReplMgr* pMgr) { - ASSERT(pMgr->startIndex <= pMgr->endIndex); - for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) { - ASSERT(pMgr->states[(index + pMgr->size) % pMgr->size].barrier == false || index + 1 == pMgr->endIndex); - } - return true; -} - int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, SRaftId* pDestId, bool* pBarrier) { SSyncRaftEntry* pEntry = NULL; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 33c24bcca88fae9ef95dd40f5e25191cffbf32ca..457ef7eedd5990a61951a0aab89cbd1d8b29ef13 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -58,7 +58,6 @@ static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeade static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry); static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode); -static SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex); int64_t syncOpen(SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); diff --git a/source/libs/transport/src/tmsgcb.c b/source/libs/transport/src/tmsgcb.c index 2007bc474f266a7bfd6db83dd6620c98796d538a..48f557c1a811023096be030eee7afe6a62428ac1 100644 --- a/source/libs/transport/src/tmsgcb.c +++ b/source/libs/transport/src/tmsgcb.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "tmsgcb.h" #include "taoserror.h" +#include "transLog.h" #include "trpc.h" static SMsgCb defaultMsgCb; @@ -23,9 +24,7 @@ static SMsgCb defaultMsgCb; void tmsgSetDefault(const SMsgCb* msgcb) { defaultMsgCb = *msgcb; } int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg) { - if (msgcb == NULL) { - return -1; - } + ASSERT(msgcb != NULL); int32_t code = (*msgcb->putToQueueFp)(msgcb->mgmt, qtype, pMsg); if (code != 0) { rpcFreeCont(pMsg->pCont);