diff --git a/source/libs/sync/inc/syncCommit.h b/source/libs/sync/inc/syncCommit.h new file mode 100644 index 0000000000000000000000000000000000000000..c76236d5bfd3ca3050c7dbe3900a5644c4388911 --- /dev/null +++ b/source/libs/sync/inc/syncCommit.h @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_COMMIT_H +#define _TD_LIBS_SYNC_COMMIT_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include "syncInt.h" +#include "taosdef.h" + +// \* Leader i advances its commitIndex. +// \* This is done as a separate step from handling AppendEntries responses, +// \* in part to minimize atomic regions, and in part so that leaders of +// \* single-server clusters are able to mark entries committed. +// AdvanceCommitIndex(i) == +// /\ state[i] = Leader +// /\ LET \* The set of servers that agree up through index. +// Agree(index) == {i} \cup {k \in Server : +// matchIndex[i][k] >= index} +// \* The maximum indexes for which a quorum agrees +// agreeIndexes == {index \in 1..Len(log[i]) : +// Agree(index) \in Quorum} +// \* New value for commitIndex'[i] +// newCommitIndex == +// IF /\ agreeIndexes /= {} +// /\ log[i][Max(agreeIndexes)].term = currentTerm[i] +// THEN +// Max(agreeIndexes) +// ELSE +// commitIndex[i] +// IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex] +// /\ UNCHANGED <> +// +void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode); +bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index); +bool syncAgree(SSyncNode* pSyncNode, SyncIndex index); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_LIBS_SYNC_COMMIT_H*/ diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 5a9af83827b6d952ea36165d9a97524708c1c985..8e36424f192e03fc72b6878f184ce075ecbdd0bc 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -236,7 +236,6 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode); // raft vote -------------- void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId); void syncNodeVoteForSelf(SSyncNode* pSyncNode); -void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode); // for debug -------------- void syncNodePrint(SSyncNode* pObj); diff --git a/source/libs/sync/inc/syncOnMessage.h b/source/libs/sync/inc/syncOnMessage.h index 7cb186a8121800134fad6d1896870e85cbe503b1..2f8856e652a84332a9455fb1f6bc26bf8a975e89 100644 --- a/source/libs/sync/inc/syncOnMessage.h +++ b/source/libs/sync/inc/syncOnMessage.h @@ -25,6 +25,46 @@ extern "C" { #include #include "taosdef.h" +// TLA+ Spec +// Receive(m) == +// LET i == m.mdest +// j == m.msource +// IN \* Any RPC with a newer term causes the recipient to advance +// \* its term first. Responses with stale terms are ignored. +// \/ UpdateTerm(i, j, m) +// \/ /\ m.mtype = RequestVoteRequest +// /\ HandleRequestVoteRequest(i, j, m) +// \/ /\ m.mtype = RequestVoteResponse +// /\ \/ DropStaleResponse(i, j, m) +// \/ HandleRequestVoteResponse(i, j, m) +// \/ /\ m.mtype = AppendEntriesRequest +// /\ HandleAppendEntriesRequest(i, j, m) +// \/ /\ m.mtype = AppendEntriesResponse +// /\ \/ DropStaleResponse(i, j, m) +// \/ HandleAppendEntriesResponse(i, j, m) + +// DuplicateMessage(m) == +// /\ Send(m) +// /\ UNCHANGED <> + +// DropMessage(m) == +// /\ Discard(m) +// /\ UNCHANGED <> + +// Next == /\ \/ \E i \in Server : Restart(i) +// \/ \E i \in Server : Timeout(i) +// \/ \E i,j \in Server : RequestVote(i, j) +// \/ \E i \in Server : BecomeLeader(i) +// \/ \E i \in Server, v \in Value : ClientRequest(i, v) +// \/ \E i \in Server : AdvanceCommitIndex(i) +// \/ \E i,j \in Server : AppendEntries(i, j) +// \/ \E m \in DOMAIN messages : Receive(m) +// \/ \E m \in DOMAIN messages : DuplicateMessage(m) +// \/ \E m \in DOMAIN messages : DropMessage(m) +// \* History variable that tracks every log ever: +// /\ allLogs' = allLogs \cup {log[i] : i \in Server} +// + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 7013d281e3f04a8614663c97ea8990359bac8482..270180e3477b4c6359feb07a918de56ac097fcef 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -94,6 +94,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { } assert(pMsg->term <= ths->pRaftStore->currentTerm); + // reset elect timer if (pMsg->term == ths->pRaftStore->currentTerm) { ths->leaderCache = pMsg->srcId; syncNodeResetElectTimer(ths); @@ -135,38 +136,117 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) { syncNodeBecomeFollower(ths); - // need ret? + // ret or reply? return ret; } // accept request if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) { - bool matchSuccess = false; + bool preMatch = false; if (pMsg->prevLogIndex == SYNC_INDEX_INVALID && ths->pLogStore->getLastIndex(ths->pLogStore) == SYNC_INDEX_INVALID) { - matchSuccess = true; + preMatch = true; } if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) { SSyncRaftEntry* pPreEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex); assert(pPreEntry != NULL); if (pMsg->prevLogTerm == pPreEntry->term) { - matchSuccess = true; + preMatch = true; } syncEntryDestory(pPreEntry); } - if (matchSuccess) { - // delete conflict entries - if (pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore)) { - SyncIndex fromIndex = pMsg->prevLogIndex + 1; - ths->pLogStore->truncate(ths->pLogStore, fromIndex); - } + if (preMatch) { + // must has preIndex in local log + assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)); + + bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore); + bool hasAppendEntries = pMsg->dataLen > 0; + + if (hasExtraEntries && hasAppendEntries) { + // conflict + bool conflict = false; + + SyncIndex extraIndex = pMsg->prevLogIndex + 1; + SSyncRaftEntry* pExtraEntry = logStoreGetEntry(ths->pLogStore, extraIndex); + assert(pExtraEntry != NULL); + + SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); + assert(pAppendEntry != NULL); + + assert(extraIndex == pAppendEntry->index); + if (pExtraEntry->term == pAppendEntry->term) { + conflict = true; + } + + if (conflict) { + // roll back + SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore); + SyncIndex delEnd = extraIndex; + + // notice! reverse roll back! + for (SyncIndex index = delEnd; index >= delBegin; --index) { + if (ths->pFsm->FpRollBackCb != NULL) { + SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index); + assert(pRollBackEntry != NULL); + + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); + ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, pRollBackEntry->index, pRollBackEntry->isWeak, 0); + rpcFreeCont(rpcMsg.pCont); + syncEntryDestory(pRollBackEntry); + } + } + + // delete confict entries + ths->pLogStore->truncate(ths->pLogStore, extraIndex); + + // append new entries + ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); + + // pre commit + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); + if (ths->pFsm != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL) { + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 0); + } + } + rpcFreeCont(rpcMsg.pCont); + } + + // free memory + syncEntryDestory(pExtraEntry); + syncEntryDestory(pAppendEntry); - // append one entry - if (pMsg->dataLen > 0) { - SSyncRaftEntry* pEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); - ths->pLogStore->appendEntry(ths->pLogStore, pEntry); - syncEntryDestory(pEntry); + } else if (hasExtraEntries && !hasAppendEntries) { + // do nothing + + } else if (!hasExtraEntries && hasAppendEntries) { + SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); + assert(pAppendEntry != NULL); + + // append new entries + ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); + + // pre commit + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); + if (ths->pFsm != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL) { + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 0); + } + } + rpcFreeCont(rpcMsg.pCont); + + // free memory + syncEntryDestory(pAppendEntry); + + } else if (!hasExtraEntries && !hasAppendEntries) { + // do nothing + + } else { + assert(0); } SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); @@ -175,7 +255,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { pReply->term = ths->pRaftStore->currentTerm; pReply->success = true; - if (pMsg->dataLen > 0) { + if (hasAppendEntries) { pReply->matchIndex = pMsg->prevLogIndex + 1; } else { pReply->matchIndex = pMsg->prevLogIndex; @@ -201,11 +281,38 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { syncAppendEntriesReplyDestroy(pReply); } + // maybe update commit index from leader if (pMsg->commitIndex > ths->commitIndex) { + // has commit entry in local if (pMsg->commitIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) { - // commit + SyncIndex beginIndex = ths->commitIndex + 1; + SyncIndex endIndex = pMsg->commitIndex; + + // update commit index ths->commitIndex = pMsg->commitIndex; + + // call back Wal ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex); + + // execute fsm + if (ths->pFsm != NULL) { + for (SyncIndex i = beginIndex; i <= endIndex; ++i) { + if (i != SYNC_INDEX_INVALID) { + SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, i); + assert(pEntry != NULL); + + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pEntry, &rpcMsg); + + if (ths->pFsm->FpCommitCb != NULL) { + ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); + } + + rpcFreeCont(rpcMsg.pCont); + syncEntryDestory(pEntry); + } + } + } } } } diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 93539db9386dd061fa680a83fc1d2c955f933326..9db9a3e8ac532790dbcffff40f80b5950adc983c 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -14,6 +14,7 @@ */ #include "syncAppendEntriesReply.h" +#include "syncCommit.h" #include "syncIndexMgr.h" #include "syncInt.h" #include "syncRaftLog.h" @@ -59,7 +60,7 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); // maybe commit - syncNodeMaybeAdvanceCommitIndex(ths); + syncMaybeAdvanceCommitIndex(ths); } else { SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index bc2a39aa89cb01b682567aaade00295e63911c23..c75d23d96d560c4c342631585d1903051bcd7c8c 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -13,9 +13,11 @@ * along with this program. If not, see . */ +#include "syncCommit.h" #include "syncIndexMgr.h" #include "syncInt.h" #include "syncRaftLog.h" +#include "syncRaftStore.h" // \* Leader i advances its commitIndex. // \* This is done as a separate step from handling AppendEntries responses, @@ -40,30 +42,80 @@ // IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex] // /\ UNCHANGED <> // -void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { +void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex); syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex); // update commit index + SyncIndex newCommitIndex = pSyncNode->commitIndex; + for (SyncIndex index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore); index > pSyncNode->commitIndex; + ++index) { + if (syncAgree(pSyncNode, index)) { + // term + SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index); + assert(pEntry != NULL); - if (pSyncNode->pFsm != NULL) { - SyncIndex beginIndex = SYNC_INDEX_INVALID; - SyncIndex endIndex = SYNC_INDEX_INVALID; - for (SyncIndex i = beginIndex; i <= endIndex; ++i) { - if (i != SYNC_INDEX_INVALID) { - SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i); - assert(pEntry != NULL); + // cannot commit, even if quorum agree. need check term! + if (pEntry->term == pSyncNode->pRaftStore->currentTerm) { + // update commit index + newCommitIndex = index; + break; + } + } + } - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pEntry, &rpcMsg); + if (newCommitIndex > pSyncNode->commitIndex) { + SyncIndex beginIndex = pSyncNode->commitIndex + 1; + SyncIndex endIndex = newCommitIndex; - if (pSyncNode->pFsm->FpCommitCb != NULL) { - pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); - } + // update commit index + pSyncNode->commitIndex = newCommitIndex; + + // call back Wal + pSyncNode->pLogStore->updateCommitIndex(pSyncNode->pLogStore, pSyncNode->commitIndex); + + // execute fsm + if (pSyncNode->pFsm != NULL) { + for (SyncIndex i = beginIndex; i <= endIndex; ++i) { + if (i != SYNC_INDEX_INVALID) { + SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i); + assert(pEntry != NULL); + + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pEntry, &rpcMsg); + + if (pSyncNode->pFsm->FpCommitCb != NULL) { + pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); + } - rpcFreeCont(rpcMsg.pCont); - syncEntryDestory(pEntry); + rpcFreeCont(rpcMsg.pCont); + syncEntryDestory(pEntry); + } } } } +} + +bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index) { + SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, pRaftId); + + // b for debug + bool b = false; + if (matchIndex >= index) { + b = true; + } + return b; +} + +bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { + int agreeCount = 0; + for (int i = 0; i < pSyncNode->replicaNum; ++i) { + if (syncAgreeIndex(pSyncNode, &(pSyncNode->replicasId[i]), index)) { + ++agreeCount; + } + if (agreeCount >= pSyncNode->quorum) { + return true; + } + } + return false; } \ No newline at end of file diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 1f3e709a27e7001e71f064b75d718dfa8f0b0bc9..6c2ef0c85b92f8d2a5ad4a9e9860976ddd5eea90 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -17,6 +17,7 @@ #include "sync.h" #include "syncAppendEntries.h" #include "syncAppendEntriesReply.h" +#include "syncCommit.h" #include "syncElection.h" #include "syncEnv.h" #include "syncIndexMgr.h" @@ -150,6 +151,30 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { // init life cycle + // TLA+ Spec + // InitHistoryVars == /\ elections = {} + // /\ allLogs = {} + // /\ voterLog = [i \in Server |-> [j \in {} |-> <<>>]] + // InitServerVars == /\ currentTerm = [i \in Server |-> 1] + // /\ state = [i \in Server |-> Follower] + // /\ votedFor = [i \in Server |-> Nil] + // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}] + // /\ votesGranted = [i \in Server |-> {}] + // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the + // \* leader does not send itself messages. It's still easier to include these + // \* in the functions. + // InitLeaderVars == /\ nextIndex = [i \in Server |-> [j \in Server |-> 1]] + // /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]] + // InitLogVars == /\ log = [i \in Server |-> << >>] + // /\ commitIndex = [i \in Server |-> 0] + // Init == /\ messages = [m \in {} |-> 0] + // /\ InitHistoryVars + // /\ InitServerVars + // /\ InitCandidateVars + // /\ InitLeaderVars + // /\ InitLogVars + // + // init TLA+ server vars pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath); @@ -727,6 +752,16 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { return ret; } +// TLA+ Spec +// ClientRequest(i, v) == +// /\ state[i] = Leader +// /\ LET entry == [term |-> currentTerm[i], +// value |-> v] +// newLog == Append(log[i], entry) +// IN log' = [log EXCEPT ![i] = newLog] +// /\ UNCHANGED <> +// static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { int32_t ret = 0; syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg); @@ -740,7 +775,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg ths->pLogStore->appendEntry(ths->pLogStore, pEntry); // only myself, maybe commit - syncNodeMaybeAdvanceCommitIndex(ths); + syncMaybeAdvanceCommitIndex(ths); // start replicate right now! syncNodeReplicate(ths); diff --git a/source/libs/sync/src/syncOnMessage.c b/source/libs/sync/src/syncOnMessage.c index 19a97ee1566d32ed0fc8786c4d75542c2c435f30..ce8bed9cd39c44df9b090ae931cba063d1dda53c 100644 --- a/source/libs/sync/src/syncOnMessage.c +++ b/source/libs/sync/src/syncOnMessage.c @@ -14,3 +14,43 @@ */ #include "syncOnMessage.h" + +// TLA+ Spec +// Receive(m) == +// LET i == m.mdest +// j == m.msource +// IN \* Any RPC with a newer term causes the recipient to advance +// \* its term first. Responses with stale terms are ignored. +// \/ UpdateTerm(i, j, m) +// \/ /\ m.mtype = RequestVoteRequest +// /\ HandleRequestVoteRequest(i, j, m) +// \/ /\ m.mtype = RequestVoteResponse +// /\ \/ DropStaleResponse(i, j, m) +// \/ HandleRequestVoteResponse(i, j, m) +// \/ /\ m.mtype = AppendEntriesRequest +// /\ HandleAppendEntriesRequest(i, j, m) +// \/ /\ m.mtype = AppendEntriesResponse +// /\ \/ DropStaleResponse(i, j, m) +// \/ HandleAppendEntriesResponse(i, j, m) + +// DuplicateMessage(m) == +// /\ Send(m) +// /\ UNCHANGED <> + +// DropMessage(m) == +// /\ Discard(m) +// /\ UNCHANGED <> + +// Next == /\ \/ \E i \in Server : Restart(i) +// \/ \E i \in Server : Timeout(i) +// \/ \E i,j \in Server : RequestVote(i, j) +// \/ \E i \in Server : BecomeLeader(i) +// \/ \E i \in Server, v \in Value : ClientRequest(i, v) +// \/ \E i \in Server : AdvanceCommitIndex(i) +// \/ \E i,j \in Server : AppendEntries(i, j) +// \/ \E m \in DOMAIN messages : Receive(m) +// \/ \E m \in DOMAIN messages : DuplicateMessage(m) +// \/ \E m \in DOMAIN messages : DropMessage(m) +// \* History variable that tracks every log ever: +// /\ allLogs' = allLogs \cup {log[i] : i \in Server} +// \ No newline at end of file diff --git a/source/libs/sync/test/syncWriteTest.cpp b/source/libs/sync/test/syncWriteTest.cpp index d4c2dde6d06b57322198313b460b8a714833eccd..2598abbddd86ac6813afd287246202d5439cb9c8 100644 --- a/source/libs/sync/test/syncWriteTest.cpp +++ b/source/libs/sync/test/syncWriteTest.cpp @@ -162,7 +162,7 @@ int main(int argc, char **argv) { SyncClientRequest *pMsg1 = step1(pMsg0); syncClientRequestPrint2((char *)"==step1==", pMsg1); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 10; ++i) { SyncClientRequest *pSyncClientRequest = pMsg1; SRpcMsg rpcMsg; syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg);