diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index a38431a1b2b00553ed2cacefd260d05733299669..a2f88490f05f8b579b40f8209d2dab6b7ee934ce 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -35,6 +35,7 @@ typedef enum { TAOS_SYNC_STATE_FOLLOWER = 100, TAOS_SYNC_STATE_CANDIDATE = 101, TAOS_SYNC_STATE_LEADER = 102, + TAOS_SYNC_STATE_ERROR = 103, } ESyncState; typedef struct SSyncBuffer { @@ -68,17 +69,20 @@ typedef struct SSnapshot { typedef struct SSyncFSM { void* data; - // when value in pBuf finish a raft flow, FpCommitCb is called, code indicates the result + // when value in pMsg finish a raft flow, FpCommitCb is called, code indicates the result // user can do something according to the code and isWeak. for example, write data into tsdb - void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code); + void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state); - // when value in pBuf has been written into local log store, FpPreCommitCb is called, code indicates the result + // when value in pMsg has been written into local log store, FpPreCommitCb is called, code indicates the result // user can do something according to the code and isWeak. for example, write data into tsdb - void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code); + void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state); // when log entry is updated by a new one, FpRollBackCb is called // user can do something to roll back. for example, delete data from tsdb, or just ignore it - void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code); + void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state); // user should implement this function, use "data" to take snapshot into "snapshot" int32_t (*FpTakeSnapshot)(SSnapshot* snapshot); @@ -157,10 +161,14 @@ void syncCleanUp(); int64_t syncStart(const SSyncInfo* pSyncInfo); void syncStop(int64_t rid); int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); -int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // use this function -int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // just for compatibility +int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); ESyncState syncGetMyRole(int64_t rid); -void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole); + +// propose with sequence number, to implement linearizable semantics +int32_t syncPropose2(int64_t rid, const SRpcMsg* pMsg, bool isWeak, uint64_t seqNum); + +// for compatibility, the same as syncPropose +int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); extern int32_t sDebugFlag; diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index d979e0df150f48ec403012d13eafb5431c5b42e6..2196d6c207430b166d6596344b9c5837f2b03d28 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -47,6 +47,8 @@ SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore); cJSON* logStore2Json(SSyncLogStore* pLogStore); char* logStore2Str(SSyncLogStore* pLogStore); +cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore); +char* logStoreSimple2Str(SSyncLogStore* pLogStore); // for debug void logStorePrint(SSyncLogStore* pLogStore); @@ -54,6 +56,11 @@ void logStorePrint2(char* s, SSyncLogStore* pLogStore); void logStoreLog(SSyncLogStore* pLogStore); void logStoreLog2(char* s, SSyncLogStore* pLogStore); +void logStoreSimplePrint(SSyncLogStore* pLogStore); +void logStoreSimplePrint2(char* s, SSyncLogStore* pLogStore); +void logStoreSimpleLog(SSyncLogStore* pLogStore); +void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index e4df93ca47c013348289f057a8a10063e6e6edfe..1116c1a9050753f1d3e8d47e4b0b343081b3f2d5 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -120,6 +120,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // reject request if ((pMsg->term < ths->pRaftStore->currentTerm) || ((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) { + sTrace( + "syncNodeOnAppendEntriesCb --> reject, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, " + "logOK:%d", + pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK); + SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); pReply->srcId = ths->myRaftId; pReply->destId = pMsg->srcId; @@ -137,6 +142,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // return to follower state if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) { + sTrace( + "syncNodeOnAppendEntriesCb --> return to follower, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, " + "ths->state:%d, logOK:%d", + pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK); + syncNodeBecomeFollower(ths); // ret or reply? @@ -145,89 +155,60 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // accept request if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) { - bool preMatch = false; - if (pMsg->prevLogIndex == SYNC_INDEX_INVALID && - ths->pLogStore->getLastIndex(ths->pLogStore) == SYNC_INDEX_INVALID) { - preMatch = true; - } - if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) { - SSyncRaftEntry* pPreEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex); - assert(pPreEntry != NULL); - if (pMsg->prevLogTerm == pPreEntry->term) { - preMatch = true; - } - syncEntryDestory(pPreEntry); - } + // preIndex = -1, or has preIndex entry in local log + assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)); - if (preMatch) { - // must has preIndex in local log - assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)); + // has extra entries (> preIndex) in local log + bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore); - bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore); - bool hasAppendEntries = pMsg->dataLen > 0; + // has entries in SyncAppendEntries msg + bool hasAppendEntries = pMsg->dataLen > 0; - if (hasExtraEntries && hasAppendEntries) { - // conflict - bool conflict = false; + sTrace( + "syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, " + "logOK:%d, hasExtraEntries:%d, hasAppendEntries:%d", + pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries); - SyncIndex extraIndex = pMsg->prevLogIndex + 1; - SSyncRaftEntry* pExtraEntry = logStoreGetEntry(ths->pLogStore, extraIndex); - assert(pExtraEntry != NULL); + if (hasExtraEntries && hasAppendEntries) { + // not conflict by default + bool conflict = false; - SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); - assert(pAppendEntry != NULL); + SyncIndex extraIndex = pMsg->prevLogIndex + 1; + SSyncRaftEntry* pExtraEntry = logStoreGetEntry(ths->pLogStore, extraIndex); + assert(pExtraEntry != NULL); - assert(extraIndex == pAppendEntry->index); - if (pExtraEntry->term == pAppendEntry->term) { - conflict = true; - } + SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); + assert(pAppendEntry != NULL); - if (conflict) { - // roll back - SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore); - SyncIndex delEnd = extraIndex; + // log not match, conflict + assert(extraIndex == pAppendEntry->index); + if (pExtraEntry->term != pAppendEntry->term) { + conflict = true; + } - // notice! reverse roll back! - for (SyncIndex index = delEnd; index >= delBegin; --index) { - if (ths->pFsm->FpRollBackCb != NULL) { - SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index); - assert(pRollBackEntry != NULL); + if (conflict) { + // roll back + SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore); + SyncIndex delEnd = extraIndex; - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); - ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, pRollBackEntry->index, pRollBackEntry->isWeak, 0); - rpcFreeCont(rpcMsg.pCont); - syncEntryDestory(pRollBackEntry); - } - } + sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd); - // delete confict entries - ths->pLogStore->truncate(ths->pLogStore, extraIndex); + // notice! reverse roll back! + for (SyncIndex index = delEnd; index >= delBegin; --index) { + if (ths->pFsm->FpRollBackCb != NULL) { + SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index); + assert(pRollBackEntry != NULL); - // append new entries - ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); - - // pre commit - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); - if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL) { - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 0); - } + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); + ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, pRollBackEntry->index, pRollBackEntry->isWeak, 0, ths->state); + rpcFreeCont(rpcMsg.pCont); + syncEntryDestory(pRollBackEntry); } - rpcFreeCont(rpcMsg.pCont); } - // free memory - syncEntryDestory(pExtraEntry); - syncEntryDestory(pAppendEntry); - - } else if (hasExtraEntries && !hasAppendEntries) { - // do nothing - - } else if (!hasExtraEntries && hasAppendEntries) { - SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); - assert(pAppendEntry != NULL); + // delete confict entries + ths->pLogStore->truncate(ths->pLogStore, extraIndex); // append new entries ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); @@ -237,53 +218,63 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) { - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 0); + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 2, ths->state); } } rpcFreeCont(rpcMsg.pCont); + } - // free memory - syncEntryDestory(pAppendEntry); + // free memory + syncEntryDestory(pExtraEntry); + syncEntryDestory(pAppendEntry); - } else if (!hasExtraEntries && !hasAppendEntries) { - // do nothing + } else if (hasExtraEntries && !hasAppendEntries) { + // do nothing - } else { - assert(0); - } + } else if (!hasExtraEntries && hasAppendEntries) { + SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); + assert(pAppendEntry != NULL); - SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); - pReply->srcId = ths->myRaftId; - pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; - pReply->success = true; + // append new entries + ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); - if (hasAppendEntries) { - pReply->matchIndex = pMsg->prevLogIndex + 1; - } else { - pReply->matchIndex = pMsg->prevLogIndex; + // pre commit + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); + if (ths->pFsm != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL) { + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 3, ths->state); + } } + rpcFreeCont(rpcMsg.pCont); - SRpcMsg rpcMsg; - syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + // free memory + syncEntryDestory(pAppendEntry); - syncAppendEntriesReplyDestroy(pReply); + } else if (!hasExtraEntries && !hasAppendEntries) { + // do nothing } else { - SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); - pReply->srcId = ths->myRaftId; - pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; - pReply->success = false; - pReply->matchIndex = SYNC_INDEX_INVALID; + assert(0); + } - SRpcMsg rpcMsg; - syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); - syncAppendEntriesReplyDestroy(pReply); + SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); + pReply->srcId = ths->myRaftId; + pReply->destId = pMsg->srcId; + pReply->term = ths->pRaftStore->currentTerm; + pReply->success = true; + + if (hasAppendEntries) { + pReply->matchIndex = pMsg->prevLogIndex + 1; + } else { + pReply->matchIndex = pMsg->prevLogIndex; } + SRpcMsg rpcMsg; + syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); + syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + syncAppendEntriesReplyDestroy(pReply); + // maybe update commit index from leader if (pMsg->commitIndex > ths->commitIndex) { // has commit entry in local @@ -308,7 +299,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm->FpCommitCb != NULL) { - ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); + ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, ths->state); } rpcFreeCont(rpcMsg.pCont); diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 817974fd268fa4105c45d51c5b591a79167971dc..790ac7f8e1c997c3f8721b5badfc85a9b6fa2a94 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -48,6 +48,9 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p return ret; } + syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pNextIndex", ths->pNextIndex); + syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pMatchIndex", ths->pMatchIndex); + // no need this code, because if I receive reply.term, then I must have sent for that term. // if (pMsg->term > ths->pRaftStore->currentTerm) { // syncNodeUpdateTerm(ths, pMsg->term); @@ -77,5 +80,8 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); } + syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pNextIndex", ths->pNextIndex); + syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pMatchIndex", ths->pMatchIndex); + return ret; } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 0d4df8e6cf43aecce88b71145f1f0f4c1fe49744..f581b31c4b34dbd733a207d40fda67ede825d1f5 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -63,7 +63,14 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { if (pEntry->term == pSyncNode->pRaftStore->currentTerm) { // update commit index newCommitIndex = index; + sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%ld commit, pSyncNode->commitIndex:%ld", + newCommitIndex, pSyncNode->commitIndex); break; + } else { + sTrace( + "syncMaybeAdvanceCommitIndex can not commit due to term not equal, pEntry->term:%lu, " + "pSyncNode->pRaftStore->currentTerm:%lu", + pEntry->term, pSyncNode->pRaftStore->currentTerm); } } } @@ -91,7 +98,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { syncEntry2OriginalRpc(pEntry, &rpcMsg); if (pSyncNode->pFsm->FpCommitCb != NULL) { - pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); + pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, pSyncNode->state); } rpcFreeCont(rpcMsg.pCont); diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 19121632e20f55e1402898f352533cf9565c0249..b05d2e397dfb6b22607e26dd5dec7833aa960c40 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -257,13 +257,6 @@ static void *syncIOConsumerFunc(void *param) { assert(pSyncMsg != NULL); io->FpOnSyncPing(io->pSyncNode, pSyncMsg); syncPingDestroy(pSyncMsg); - /* - pSyncMsg = syncPingBuild(pRpcMsg->contLen); - syncPingFromRpcMsg(pRpcMsg, pSyncMsg); - // memcpy(pSyncMsg, tmpRpcMsg.pCont, tmpRpcMsg.contLen); - io->FpOnSyncPing(io->pSyncNode, pSyncMsg); - syncPingDestroy(pSyncMsg); - */ } } else if (pRpcMsg->msgType == SYNC_PING_REPLY) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index b92317ef3f75621a665b2fbb7ec8618f75e30a2b..f2341ef521f8516c4c56186a1fa9a70f136929b2 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -104,9 +104,26 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { } int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { - int32_t ret = 0; + int32_t ret = syncPropose2(rid, pMsg, isWeak, 0); + return ret; +} + +int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { + int32_t ret = syncPropose(rid, pMsg, isWeak); + return ret; +} + +ESyncState syncGetMyRole(int64_t rid) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return TAOS_SYNC_STATE_ERROR; + } + assert(rid == pSyncNode->rid); + return pSyncNode->state; +} - // todo : get pointer from rid +int32_t syncPropose2(int64_t rid, const SRpcMsg* pMsg, bool isWeak, uint64_t seqNum) { + int32_t ret = 0; SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { return -1; @@ -114,7 +131,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { assert(rid == pSyncNode->rid); if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, 0, isWeak); + SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak); SRpcMsg rpcMsg; syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); @@ -122,7 +139,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { ret = 0; } else { - sTrace("syncForwardToPeer not leader, %s", syncUtilState2String(pSyncNode->state)); + sTrace("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state)); ret = -1; // todo : need define err code !! } @@ -130,19 +147,6 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { return ret; } -int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { - int32_t ret = syncPropose(rid, pMsg, isWeak); - return ret; -} - -ESyncState syncGetMyRole(int64_t rid) { - // todo : get pointer from rid - SSyncNode* pSyncNode = NULL; - return pSyncNode->state; -} - -void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {} - // open/close -------------- SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode)); @@ -848,7 +852,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg if (ths->pFsm != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) { - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, ths->state); } } rpcFreeCont(rpcMsg.pCont); @@ -863,7 +867,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg if (ths->pFsm != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) { - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, -2); + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 1, ths->state); } } rpcFreeCont(rpcMsg.pCont); diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 3ac4a7a1c083485fe776a92a59c2989b111966a7..f2344c6b6d413a2d8f3ec068a5c7aa974a6d212c 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -834,7 +834,7 @@ cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) { snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogIndex); + snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->lastLogIndex); cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf); snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogTerm); cJSON_AddStringToObject(pRoot, "lastLogTerm", u64buf); @@ -1127,14 +1127,14 @@ cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) { snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogIndex); - cJSON_AddStringToObject(pRoot, "pre_log_index", u64buf); + snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->prevLogIndex); + cJSON_AddStringToObject(pRoot, "prevLogIndex", u64buf); snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogTerm); cJSON_AddStringToObject(pRoot, "pre_log_term", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->commitIndex); - cJSON_AddStringToObject(pRoot, "commit_index", u64buf); + snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->commitIndex); + cJSON_AddStringToObject(pRoot, "commitIndex", u64buf); cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); char* s; @@ -1288,7 +1288,7 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) { snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); cJSON_AddNumberToObject(pRoot, "success", pMsg->success); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->matchIndex); + snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->matchIndex); cJSON_AddStringToObject(pRoot, "matchIndex", u64buf); } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 620e923629863a844d3b1382d43e341426e10cc7..f569c5d6216a3606ce3a9218246b35bb7da0739b 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -165,6 +165,34 @@ char* logStore2Str(SSyncLogStore* pLogStore) { return serialized; } +cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore) { + char u64buf[128]; + SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data; + cJSON* pRoot = cJSON_CreateObject(); + + if (pData != NULL && pData->pWal != NULL) { + snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode); + cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal); + cJSON_AddStringToObject(pRoot, "pWal", u64buf); + snprintf(u64buf, sizeof(u64buf), "%ld", logStoreLastIndex(pLogStore)); + cJSON_AddStringToObject(pRoot, "LastIndex", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore)); + cJSON_AddStringToObject(pRoot, "LastTerm", u64buf); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SSyncLogStoreSimple", pRoot); + return pJson; +} + +char* logStoreSimple2Str(SSyncLogStore* pLogStore) { + cJSON* pJson = logStoreSimple2Json(pLogStore); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + // for debug ----------------- void logStorePrint(SSyncLogStore* pLogStore) { char* serialized = logStore2Str(pLogStore); @@ -191,3 +219,30 @@ void logStoreLog2(char* s, SSyncLogStore* pLogStore) { sTrace("logStorePrint | len:%lu | %s | %s", strlen(serialized), s, serialized); free(serialized); } + +// for debug ----------------- +void logStoreSimplePrint(SSyncLogStore* pLogStore) { + char* serialized = logStoreSimple2Str(pLogStore); + printf("logStoreSimplePrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + free(serialized); +} + +void logStoreSimplePrint2(char* s, SSyncLogStore* pLogStore) { + char* serialized = logStoreSimple2Str(pLogStore); + printf("logStoreSimplePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + free(serialized); +} + +void logStoreSimpleLog(SSyncLogStore* pLogStore) { + char* serialized = logStoreSimple2Str(pLogStore); + sTrace("logStoreSimpleLog | len:%lu | %s", strlen(serialized), serialized); + free(serialized); +} + +void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore) { + char* serialized = logStoreSimple2Str(pLogStore); + sTrace("logStoreSimpleLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + free(serialized); +} \ No newline at end of file diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 0bb701fc09a112234622a3f32830ba12e47445b7..943b268cd330c285bc11a55a9a751dd7c83a9bdd 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -49,6 +49,10 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER); + syncIndexMgrLog2("==syncNodeAppendEntriesPeers== pNextIndex", pSyncNode->pNextIndex); + syncIndexMgrLog2("==syncNodeAppendEntriesPeers== pMatchIndex", pSyncNode->pMatchIndex); + logStoreSimpleLog2("==syncNodeAppendEntriesPeers==", pSyncNode->pLogStore); + int32_t ret = 0; for (int i = 0; i < pSyncNode->peersNum; ++i) { SRaftId* pDestId = &(pSyncNode->peersId[i]); @@ -99,6 +103,8 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { pMsg->prevLogTerm = preLogTerm; pMsg->commitIndex = pSyncNode->commitIndex; + syncAppendEntriesLog2("==syncNodeAppendEntriesPeers==", pMsg); + // send AppendEntries syncNodeAppendEntries(pSyncNode, pDestId, pMsg); syncAppendEntriesDestroy(pMsg); diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index f02ba29218d3e069df4575022d22ee0745263b2d..dcf380e7e4248dfd2d8e6f613d2482ec818d3b41 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -29,10 +29,14 @@ add_executable(syncPingTimerTest2 "") add_executable(syncPingSelfTest "") add_executable(syncElectTest "") add_executable(syncElectTest2 "") +add_executable(syncElectTest3 "") add_executable(syncEncodeTest "") add_executable(syncWriteTest "") add_executable(syncReplicateTest "") +add_executable(syncReplicateTest2 "") +add_executable(syncReplicateLoadTest "") add_executable(syncRefTest "") +add_executable(syncLogStoreCheck "") target_sources(syncTest @@ -159,6 +163,10 @@ target_sources(syncElectTest2 PRIVATE "syncElectTest2.cpp" ) +target_sources(syncElectTest3 + PRIVATE + "syncElectTest3.cpp" +) target_sources(syncEncodeTest PRIVATE "syncEncodeTest.cpp" @@ -171,10 +179,22 @@ target_sources(syncReplicateTest PRIVATE "syncReplicateTest.cpp" ) +target_sources(syncReplicateTest2 + PRIVATE + "syncReplicateTest2.cpp" +) +target_sources(syncReplicateLoadTest + PRIVATE + "syncReplicateLoadTest.cpp" +) target_sources(syncRefTest PRIVATE "syncRefTest.cpp" ) +target_sources(syncLogStoreCheck + PRIVATE + "syncLogStoreCheck.cpp" +) target_include_directories(syncTest @@ -332,6 +352,11 @@ target_include_directories(syncElectTest2 "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncElectTest3 + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_include_directories(syncEncodeTest PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/sync" @@ -347,11 +372,26 @@ target_include_directories(syncReplicateTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncReplicateTest2 + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncReplicateLoadTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_include_directories(syncRefTest PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncLogStoreCheck + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -478,6 +518,10 @@ target_link_libraries(syncElectTest2 sync gtest_main ) +target_link_libraries(syncElectTest3 + sync + gtest_main +) target_link_libraries(syncEncodeTest sync gtest_main @@ -490,10 +534,22 @@ target_link_libraries(syncReplicateTest sync gtest_main ) +target_link_libraries(syncReplicateTest2 + sync + gtest_main +) +target_link_libraries(syncReplicateLoadTest + sync + gtest_main +) target_link_libraries(syncRefTest sync gtest_main ) +target_link_libraries(syncLogStoreCheck + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncElectTest3.cpp b/source/libs/sync/test/syncElectTest3.cpp new file mode 100644 index 0000000000000000000000000000000000000000..45c5488c601cd0e19d074c8581f3f1af41bbd281 --- /dev/null +++ b/source/libs/sync/test/syncElectTest3.cpp @@ -0,0 +1,138 @@ +#include +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftLog.h" +#include "syncRaftStore.h" +#include "syncUtil.h" +#include "tref.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; +int32_t replicaNum = 3; +int32_t myIndex = 0; + +SRaftId ids[TSDB_MAX_REPLICA]; +SSyncInfo syncInfo; +SSyncFSM* pFsm; +SWal* pWal; + +int64_t syncNodeInit() { + syncInfo.vgId = 1234; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "./elect3_test_%d", myIndex); + + int code = walInit(); + assert(code == 0); + SWalCfg walCfg; + memset(&walCfg, 0, sizeof(SWalCfg)); + walCfg.vgId = syncInfo.vgId; + walCfg.fsyncPeriod = 1000; + walCfg.retentionPeriod = 1000; + walCfg.rollPeriod = 1000; + walCfg.retentionSize = 1000; + walCfg.segSize = 1000; + walCfg.level = TAOS_WAL_FSYNC; + + char tmpdir[128]; + snprintf(tmpdir, sizeof(tmpdir), "./elect3_test_wal_%d", myIndex); + pWal = walOpen(tmpdir, &walCfg); + assert(pWal != NULL); + + syncInfo.pWal = pWal; + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = ports[i]; + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + } + + int64_t rid = syncStart(&syncInfo); + assert(rid > 0); + + SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid); + assert(pSyncNode != NULL); + + pSyncNode->hbBaseLine = 500; + pSyncNode->electBaseLine = 1500; + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + gSyncIO->pSyncNode = pSyncNode; + + syncNodeRelease(pSyncNode); + + return rid; +} + +void initRaftId(SSyncNode* pSyncNode) { + for (int i = 0; i < replicaNum; ++i) { + ids[i] = pSyncNode->replicasId[i]; + char* s = syncUtilRaftId2Str(&ids[i]); + printf("raftId[%d] : %s\n", i, s); + free(s); + } +} + +int main(int argc, char** argv) { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + } + + int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); + assert(ret == 0); + + ret = syncInit(); + assert(ret == 0); + + int64_t rid = syncNodeInit(); + assert(rid > 0); + + SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid); + assert(pSyncNode != NULL); + + syncNodePrint2((char*)"", pSyncNode); + initRaftId(pSyncNode); + + //--------------------------- + while (1) { + sTrace( + "elect sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d", + pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, + pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS); + taosMsleep(1000); + } + + syncNodeRelease(pSyncNode); + + return 0; +} diff --git a/source/libs/sync/test/syncLogStoreCheck.cpp b/source/libs/sync/test/syncLogStoreCheck.cpp new file mode 100644 index 0000000000000000000000000000000000000000..85e39a61c56c47f7f90d6465c5f70f5a508f5ea8 --- /dev/null +++ b/source/libs/sync/test/syncLogStoreCheck.cpp @@ -0,0 +1,105 @@ +#include +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftLog.h" +#include "syncRaftStore.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; +int32_t replicaNum = 1; +int32_t myIndex = 0; + +SRaftId ids[TSDB_MAX_REPLICA]; +SSyncInfo syncInfo; +SSyncFSM* pFsm; +SWal* pWal; +SSyncNode* pSyncNode; + +SSyncNode* syncNodeInit(const char* path) { + syncInfo.vgId = 1234; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./log_check"); + + int code = walInit(); + assert(code == 0); + SWalCfg walCfg; + memset(&walCfg, 0, sizeof(SWalCfg)); + walCfg.vgId = syncInfo.vgId; + walCfg.fsyncPeriod = 1000; + walCfg.retentionPeriod = 1000; + walCfg.rollPeriod = 1000; + walCfg.retentionSize = 1000; + walCfg.segSize = 1000; + walCfg.level = TAOS_WAL_FSYNC; + pWal = walOpen(path, &walCfg); + assert(pWal != NULL); + + syncInfo.pWal = pWal; + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = ports[i]; + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + } + + pSyncNode = syncNodeOpen(&syncInfo); + assert(pSyncNode != NULL); + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + gSyncIO->pSyncNode = pSyncNode; + + return pSyncNode; +} + +SSyncNode* logStoreCheck(const char* path) { return syncNodeInit(path); } + +int main(int argc, char** argv) { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + } + + int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); + assert(ret == 0); + + ret = syncEnvStart(); + assert(ret == 0); + + pSyncNode = logStoreCheck(argv[1]); + assert(pSyncNode != NULL); + + logStorePrint2((char*)"logStoreCheck", pSyncNode->pLogStore); + + return 0; +} diff --git a/source/libs/sync/test/syncReplicateLoadTest.cpp b/source/libs/sync/test/syncReplicateLoadTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d53ceca473fa5cffed99379cf600a7a51cad7bc4 --- /dev/null +++ b/source/libs/sync/test/syncReplicateLoadTest.cpp @@ -0,0 +1,188 @@ +#include +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncRaftEntry.h" +#include "syncRaftLog.h" +#include "syncRaftStore.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; +int32_t replicaNum = 3; +int32_t myIndex = 0; + +SRaftId ids[TSDB_MAX_REPLICA]; +SSyncInfo syncInfo; +SSyncFSM *pFsm; +SWal * pWal; + +void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, index, isWeak, code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); +} + +void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, index, isWeak, + code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); +} + +void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, index, isWeak, code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); +} + +void initFsm() { + pFsm = (SSyncFSM *)malloc(sizeof(SSyncFSM)); + pFsm->FpCommitCb = CommitCb; + pFsm->FpPreCommitCb = PreCommitCb; + pFsm->FpRollBackCb = RollBackCb; +} + +int64_t syncNodeInit() { + syncInfo.vgId = 1234; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "./replicate2_test_%d", myIndex); + + int code = walInit(); + assert(code == 0); + SWalCfg walCfg; + memset(&walCfg, 0, sizeof(SWalCfg)); + walCfg.vgId = syncInfo.vgId; + walCfg.fsyncPeriod = 1000; + walCfg.retentionPeriod = 1000; + walCfg.rollPeriod = 1000; + walCfg.retentionSize = 1000; + walCfg.segSize = 1000; + walCfg.level = TAOS_WAL_FSYNC; + + char tmpdir[128]; + snprintf(tmpdir, sizeof(tmpdir), "./replicate2_test_wal_%d", myIndex); + pWal = walOpen(tmpdir, &walCfg); + assert(pWal != NULL); + + syncInfo.pWal = pWal; + + SSyncCfg *pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = ports[i]; + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + } + + int64_t rid = syncStart(&syncInfo); + assert(rid > 0); + + SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid); + assert(pSyncNode != NULL); + + // pSyncNode->hbBaseLine = 500; + // pSyncNode->electBaseLine = 1500; + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; + gSyncIO->pSyncNode = pSyncNode; + + syncNodeRelease(pSyncNode); + + return rid; +} + +void initRaftId(SSyncNode *pSyncNode) { + for (int i = 0; i < replicaNum; ++i) { + ids[i] = pSyncNode->replicasId[i]; + char *s = syncUtilRaftId2Str(&ids[i]); + printf("raftId[%d] : %s\n", i, s); + free(s); + } +} + +SRpcMsg *step0(int i) { + SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg)); + memset(pMsg, 0, sizeof(SRpcMsg)); + pMsg->msgType = 9999; + pMsg->contLen = 128; + pMsg->pCont = malloc(pMsg->contLen); + snprintf((char *)(pMsg->pCont), pMsg->contLen, "value-%u-%d", ports[myIndex], i); + return pMsg; +} + +SyncClientRequest *step1(const SRpcMsg *pMsg) { + SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true); + return pRetMsg; +} + +int main(int argc, char **argv) { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + void logTest(); + + myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + } + + int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); + assert(ret == 0); + + initFsm(); + + ret = syncInit(); + assert(ret == 0); + + int64_t rid = syncNodeInit(); + assert(rid > 0); + + SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid); + assert(pSyncNode != NULL); + + syncNodePrint2((char *)"", pSyncNode); + initRaftId(pSyncNode); + + // only load ... + + while (1) { + sTrace( + "replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, " + "electTimerMS:%d", + pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, + pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS); + taosMsleep(1000); + } + + return 0; +} diff --git a/source/libs/sync/test/syncReplicateTest.cpp b/source/libs/sync/test/syncReplicateTest.cpp index 4d6e6f3a25f92f5e5cc33b65b249175cfb212f65..47399eeb3c741ddc139e592d74f35483871be14a 100644 --- a/source/libs/sync/test/syncReplicateTest.cpp +++ b/source/libs/sync/test/syncReplicateTest.cpp @@ -28,19 +28,29 @@ SSyncFSM * pFsm; SWal * pWal; SSyncNode *gSyncNode; -void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) { - printf("==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code); - syncRpcMsgPrint2((char *)"==CommitCb==", (SRpcMsg *)pBuf); +void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, index, isWeak, code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); } -void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) { - printf("==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code); - syncRpcMsgPrint2((char *)"==PreCommitCb==", (SRpcMsg *)pBuf); +void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, index, isWeak, + code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); } -void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) { - printf("==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code); - syncRpcMsgPrint2((char *)"==RollBackCb==", (SRpcMsg *)pBuf); +void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, index, isWeak, code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); } void initFsm() { diff --git a/source/libs/sync/test/syncReplicateTest2.cpp b/source/libs/sync/test/syncReplicateTest2.cpp new file mode 100644 index 0000000000000000000000000000000000000000..09dbc0e2ed5d746b6b8a2ef95bb12910c6959840 --- /dev/null +++ b/source/libs/sync/test/syncReplicateTest2.cpp @@ -0,0 +1,203 @@ +#include +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncRaftEntry.h" +#include "syncRaftLog.h" +#include "syncRaftStore.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; +int32_t replicaNum = 3; +int32_t myIndex = 0; + +SRaftId ids[TSDB_MAX_REPLICA]; +SSyncInfo syncInfo; +SSyncFSM *pFsm; +SWal * pWal; + +void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, index, isWeak, code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); +} + +void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, index, isWeak, + code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); +} + +void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, index, isWeak, code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); +} + +void initFsm() { + pFsm = (SSyncFSM *)malloc(sizeof(SSyncFSM)); + pFsm->FpCommitCb = CommitCb; + pFsm->FpPreCommitCb = PreCommitCb; + pFsm->FpRollBackCb = RollBackCb; +} + +int64_t syncNodeInit() { + syncInfo.vgId = 1234; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "./replicate2_test_%d", myIndex); + + int code = walInit(); + assert(code == 0); + SWalCfg walCfg; + memset(&walCfg, 0, sizeof(SWalCfg)); + walCfg.vgId = syncInfo.vgId; + walCfg.fsyncPeriod = 1000; + walCfg.retentionPeriod = 1000; + walCfg.rollPeriod = 1000; + walCfg.retentionSize = 1000; + walCfg.segSize = 1000; + walCfg.level = TAOS_WAL_FSYNC; + + char tmpdir[128]; + snprintf(tmpdir, sizeof(tmpdir), "./replicate2_test_wal_%d", myIndex); + pWal = walOpen(tmpdir, &walCfg); + assert(pWal != NULL); + + syncInfo.pWal = pWal; + + SSyncCfg *pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = ports[i]; + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + } + + int64_t rid = syncStart(&syncInfo); + assert(rid > 0); + + SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid); + assert(pSyncNode != NULL); + + // pSyncNode->hbBaseLine = 500; + // pSyncNode->electBaseLine = 1500; + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; + gSyncIO->pSyncNode = pSyncNode; + + syncNodeRelease(pSyncNode); + + return rid; +} + +void initRaftId(SSyncNode *pSyncNode) { + for (int i = 0; i < replicaNum; ++i) { + ids[i] = pSyncNode->replicasId[i]; + char *s = syncUtilRaftId2Str(&ids[i]); + printf("raftId[%d] : %s\n", i, s); + free(s); + } +} + +SRpcMsg *step0(int i) { + SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg)); + memset(pMsg, 0, sizeof(SRpcMsg)); + pMsg->msgType = 9999; + pMsg->contLen = 128; + pMsg->pCont = malloc(pMsg->contLen); + snprintf((char *)(pMsg->pCont), pMsg->contLen, "value-%u-%d", ports[myIndex], i); + return pMsg; +} + +SyncClientRequest *step1(const SRpcMsg *pMsg) { + SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true); + return pRetMsg; +} + +int main(int argc, char **argv) { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + void logTest(); + + myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + } + + int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); + assert(ret == 0); + + initFsm(); + + ret = syncInit(); + assert(ret == 0); + + int64_t rid = syncNodeInit(); + assert(rid > 0); + + SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid); + assert(pSyncNode != NULL); + + syncNodePrint2((char *)"", pSyncNode); + initRaftId(pSyncNode); + + for (int i = 0; i < 30; ++i) { + // step0 + SRpcMsg *pMsg0 = step0(i); + syncRpcMsgPrint2((char *)"==step0==", pMsg0); + + syncPropose(rid, pMsg0, true); + taosMsleep(1000); + + free(pMsg0); + + sTrace( + "syncPropose sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, " + "electTimerMS:%d", + pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, + pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS); + } + + while (1) { + sTrace( + "replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, " + "electTimerMS:%d", + pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, + pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS); + taosMsleep(1000); + } + + return 0; +} diff --git a/source/libs/sync/test/syncWriteTest.cpp b/source/libs/sync/test/syncWriteTest.cpp index 2598abbddd86ac6813afd287246202d5439cb9c8..732bcf140ba4a81d1d8633d6f31d96db137d54a9 100644 --- a/source/libs/sync/test/syncWriteTest.cpp +++ b/source/libs/sync/test/syncWriteTest.cpp @@ -28,19 +28,29 @@ SSyncFSM * pFsm; SWal * pWal; SSyncNode *gSyncNode; -void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) { - printf("==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code); - syncRpcMsgPrint2((char *)"==CommitCb==", (SRpcMsg *)pBuf); +void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, index, isWeak, code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); } -void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) { - printf("==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code); - syncRpcMsgPrint2((char *)"==PreCommitCb==", (SRpcMsg *)pBuf); +void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, index, isWeak, + code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); } -void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) { - printf("==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code); - syncRpcMsgPrint2((char *)"==RollBackCb==", (SRpcMsg *)pBuf); +void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, index, isWeak, code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); } void initFsm() {