/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include #include "sync.h" #include "syncAppendEntries.h" #include "syncAppendEntriesReply.h" #include "syncCommit.h" #include "syncElection.h" #include "syncEnv.h" #include "syncIndexMgr.h" #include "syncInt.h" #include "syncMessage.h" #include "syncRaftCfg.h" #include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncReplication.h" #include "syncRequestVote.h" #include "syncRequestVoteReply.h" #include "syncRespMgr.h" #include "syncTimeout.h" #include "syncUtil.h" #include "syncVoteMgr.h" #include "tref.h" static int32_t tsNodeRefId = -1; // ------ local funciton --------- // enqueue message ---- static void syncNodeEqPingTimer(void* param, void* tmrId); static void syncNodeEqElectTimer(void* param, void* tmrId); static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); static int32_t syncNodeEqNoop(SSyncNode* ths); static int32_t syncNodeAppendNoop(SSyncNode* ths); // process message ---- int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg); // life cycle static void syncFreeNode(void* param); // --------------------------------- int32_t syncInit() { int32_t ret; tsNodeRefId = taosOpenRef(200, syncFreeNode); if (tsNodeRefId < 0) { sError("failed to init node ref"); syncCleanUp(); ret = -1; } else { ret = syncEnvStart(); } return ret; } void syncCleanUp() { int32_t ret = syncEnvStop(); assert(ret == 0); if (tsNodeRefId != -1) { taosCloseRef(tsNodeRefId); tsNodeRefId = -1; } } int64_t syncOpen(const SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); assert(pSyncNode != NULL); syncNodeLog2("syncNodeOpen open success", pSyncNode); pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode); if (pSyncNode->rid < 0) { syncFreeNode(pSyncNode); return -1; } return pSyncNode->rid; } void syncStart(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { return; } syncNodeStart(pSyncNode); taosReleaseRef(tsNodeRefId, pSyncNode->rid); } void syncStop(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { return; } syncNodeClose(pSyncNode); taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosRemoveRef(tsNodeRefId, rid); } int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { int32_t ret = 0; return ret; } int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t ret = syncPropose(rid, pMsg, isWeak); return ret; } ESyncState syncGetMyRole(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } assert(rid == pSyncNode->rid); ESyncState state = pSyncNode->state; taosReleaseRef(tsNodeRefId, pSyncNode->rid); return state; } const char* syncGetMyRoleStr(int64_t rid) { const char* s = syncUtilState2String(syncGetMyRole(rid)); return s; } SyncTerm syncGetMyTerm(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } assert(rid == pSyncNode->rid); SyncTerm term = pSyncNode->pRaftStore->currentTerm; taosReleaseRef(tsNodeRefId, pSyncNode->rid); return term; } int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } assert(rid == pSyncNode->rid); SRespStub stub; int32_t ret = syncRespMgrGet(pSyncNode->pSyncRespMgr, index, &stub); if (ret == 1) { memcpy(msg, &(stub.rpcMsg), sizeof(SRpcMsg)); } taosReleaseRef(tsNodeRefId, pSyncNode->rid); return ret; } int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } assert(rid == pSyncNode->rid); SRespStub stub; int32_t ret = syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &stub); if (ret == 1) { memcpy(msg, &(stub.rpcMsg), sizeof(SRpcMsg)); } taosReleaseRef(tsNodeRefId, pSyncNode->rid); return ret; } void syncSetQ(int64_t rid, void* queue) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { sTrace("syncSetQ get pSyncNode is NULL, rid:%ld", rid); return; } assert(rid == pSyncNode->rid); pSyncNode->queue = queue; taosReleaseRef(tsNodeRefId, pSyncNode->rid); } void syncSetRpc(int64_t rid, void* rpcHandle) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { sTrace("syncSetRpc get pSyncNode is NULL, rid:%ld", rid); return; } assert(rid == pSyncNode->rid); pSyncNode->rpcClient = rpcHandle; taosReleaseRef(tsNodeRefId, pSyncNode->rid); } char* sync2SimpleStr(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { sTrace("syncSetRpc get pSyncNode is NULL, rid:%ld", rid); return NULL; } assert(rid == pSyncNode->rid); char* s = syncNode2SimpleStr(pSyncNode); taosReleaseRef(tsNodeRefId, pSyncNode->rid); return s; } void setPingTimerMS(int64_t rid, int32_t pingTimerMS) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { return; } assert(rid == pSyncNode->rid); pSyncNode->pingBaseLine = pingTimerMS; pSyncNode->pingTimerMS = pingTimerMS; taosReleaseRef(tsNodeRefId, pSyncNode->rid); } void setElectTimerMS(int64_t rid, int32_t electTimerMS) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { return; } assert(rid == pSyncNode->rid); pSyncNode->electBaseLine = electTimerMS; taosReleaseRef(tsNodeRefId, pSyncNode->rid); } void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { return; } assert(rid == pSyncNode->rid); pSyncNode->hbBaseLine = hbTimerMS; pSyncNode->heartbeatTimerMS = hbTimerMS; taosReleaseRef(tsNodeRefId, pSyncNode->rid); } int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS; SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { return TAOS_SYNC_PROPOSE_OTHER_ERROR; } assert(rid == pSyncNode->rid); if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { SRespStub stub; stub.createTime = taosGetTimestampMs(); stub.rpcMsg = *pMsg; uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub); SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak, pSyncNode->vgId); SRpcMsg rpcMsg; syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); if (pSyncNode->FpEqMsg != NULL) { pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); } else { sTrace("syncPropose pSyncNode->FpEqMsg is NULL"); } syncClientRequestDestroy(pSyncMsg); ret = TAOS_SYNC_PROPOSE_SUCCESS; } else { sTrace("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state)); ret = TAOS_SYNC_PROPOSE_NOT_LEADER; } taosReleaseRef(tsNodeRefId, pSyncNode->rid); return ret; } // open/close -------------- SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); assert(pSyncNode != NULL); memset(pSyncNode, 0, sizeof(SSyncNode)); int32_t ret = 0; if (!taosDirExist((char*)(pSyncInfo->path))) { if (taosMkDir(pSyncInfo->path) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr()); return NULL; } // create raft config file snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path); ret = syncCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), pSyncNode->configPath); assert(ret == 0); } // init by SSyncInfo pSyncNode->vgId = pSyncInfo->vgId; memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path)); snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s/raft_store.json", pSyncInfo->path); snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path); pSyncNode->pWal = pSyncInfo->pWal; pSyncNode->rpcClient = pSyncInfo->rpcClient; pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; pSyncNode->queue = pSyncInfo->queue; pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg; // init raft config pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath); assert(pSyncNode->pRaftCfg != NULL); // init internal pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex]; syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId); // init peersNum, peers, peersId pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1; int j = 0; for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { if (i != pSyncNode->pRaftCfg->cfg.myIndex) { pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i]; j++; } } for (int i = 0; i < pSyncNode->peersNum; ++i) { syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]); } // init replicaNum, replicasId pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum; for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]); } // init raft algorithm pSyncNode->pFsm = pSyncInfo->pFsm; pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum); pSyncNode->leaderCache = EMPTY_RAFT_ID; // init life cycle outside // TLA+ Spec // InitHistoryVars == /\ elections = {} // /\ allLogs = {} // /\ voterLog = [i \in Server |-> [j \in {} |-> <<>>]] // InitServerVars == /\ currentTerm = [i \in Server |-> 1] // /\ state = [i \in Server |-> Follower] // /\ votedFor = [i \in Server |-> Nil] // InitCandidateVars == /\ votesResponded = [i \in Server |-> {}] // /\ votesGranted = [i \in Server |-> {}] // \* The values nextIndex[i][i] and matchIndex[i][i] are never read, since the // \* leader does not send itself messages. It's still easier to include these // \* in the functions. // InitLeaderVars == /\ nextIndex = [i \in Server |-> [j \in Server |-> 1]] // /\ matchIndex = [i \in Server |-> [j \in Server |-> 0]] // InitLogVars == /\ log = [i \in Server |-> << >>] // /\ commitIndex = [i \in Server |-> 0] // Init == /\ messages = [m \in {} |-> 0] // /\ InitHistoryVars // /\ InitServerVars // /\ InitCandidateVars // /\ InitLeaderVars // /\ InitLogVars // // init TLA+ server vars pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath); assert(pSyncNode->pRaftStore != NULL); // init TLA+ candidate vars pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode); assert(pSyncNode->pVotesGranted != NULL); pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode); assert(pSyncNode->pVotesRespond != NULL); // init TLA+ leader vars pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode); assert(pSyncNode->pNextIndex != NULL); pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode); assert(pSyncNode->pMatchIndex != NULL); // init TLA+ log vars pSyncNode->pLogStore = logStoreCreate(pSyncNode); assert(pSyncNode->pLogStore != NULL); pSyncNode->commitIndex = SYNC_INDEX_INVALID; // timer ms init pSyncNode->pingBaseLine = PING_TIMER_MS; pSyncNode->electBaseLine = ELECT_TIMER_MS_MIN; pSyncNode->hbBaseLine = HEARTBEAT_TIMER_MS; // init ping timer pSyncNode->pPingTimer = NULL; pSyncNode->pingTimerMS = pSyncNode->pingBaseLine; atomic_store_64(&pSyncNode->pingTimerLogicClock, 0); atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0); pSyncNode->FpPingTimerCB = syncNodeEqPingTimer; pSyncNode->pingTimerCounter = 0; // init elect timer pSyncNode->pElectTimer = NULL; pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); atomic_store_64(&pSyncNode->electTimerLogicClock, 0); atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0); pSyncNode->FpElectTimerCB = syncNodeEqElectTimer; pSyncNode->electTimerCounter = 0; // init heartbeat timer pSyncNode->pHeartbeatTimer = NULL; pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine; atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0); atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0); pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer; pSyncNode->heartbeatTimerCounter = 0; // init callback pSyncNode->FpOnPing = syncNodeOnPingCb; pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb; pSyncNode->FpOnClientRequest = syncNodeOnClientRequestCb; pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb; pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb; pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb; pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb; pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb; // tools pSyncNode->pSyncRespMgr = syncRespMgrCreate(NULL, 0); assert(pSyncNode->pSyncRespMgr != NULL); // start in syncNodeStart // start raft // syncNodeBecomeFollower(pSyncNode); return pSyncNode; } void syncNodeStart(SSyncNode* pSyncNode) { // start raft if (pSyncNode->replicaNum == 1) { syncNodeBecomeLeader(pSyncNode); syncNodeLog2("==state change become leader immediately==", pSyncNode); // Raft 3.6.2 Committing entries from previous terms // use this now syncNodeAppendNoop(pSyncNode); syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica return; } syncNodeBecomeFollower(pSyncNode); // for test int32_t ret = 0; // ret = syncNodeStartPingTimer(pSyncNode); assert(ret == 0); } void syncNodeClose(SSyncNode* pSyncNode) { int32_t ret; assert(pSyncNode != NULL); ret = raftStoreClose(pSyncNode->pRaftStore); assert(ret == 0); voteGrantedDestroy(pSyncNode->pVotesGranted); votesRespondDestory(pSyncNode->pVotesRespond); syncIndexMgrDestroy(pSyncNode->pNextIndex); syncIndexMgrDestroy(pSyncNode->pMatchIndex); logStoreDestory(pSyncNode->pLogStore); raftCfgClose(pSyncNode->pRaftCfg); syncNodeStopPingTimer(pSyncNode); syncNodeStopElectTimer(pSyncNode); syncNodeStopHeartbeatTimer(pSyncNode); if (pSyncNode->pFsm != NULL) { taosMemoryFree(pSyncNode->pFsm); } // free memory in syncFreeNode // taosMemoryFree(pSyncNode); } // ping -------------- int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { syncPingLog2((char*)"==syncNodePing==", pMsg); int32_t ret = 0; SRpcMsg rpcMsg; syncPing2RpcMsg(pMsg, &rpcMsg); syncRpcMsgLog2((char*)"==syncNodePing==", &rpcMsg); ret = syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); return ret; } int32_t syncNodePingSelf(SSyncNode* pSyncNode) { int32_t ret = 0; SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &pSyncNode->myRaftId, pSyncNode->vgId); ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg); assert(ret == 0); syncPingDestroy(pMsg); return ret; } int32_t syncNodePingPeers(SSyncNode* pSyncNode) { int32_t ret = 0; for (int i = 0; i < pSyncNode->peersNum; ++i) { SRaftId* destId = &(pSyncNode->peersId[i]); SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, destId, pSyncNode->vgId); ret = syncNodePing(pSyncNode, destId, pMsg); assert(ret == 0); syncPingDestroy(pMsg); } return ret; } int32_t syncNodePingAll(SSyncNode* pSyncNode) { int32_t ret = 0; for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { SRaftId* destId = &(pSyncNode->replicasId[i]); SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, destId, pSyncNode->vgId); ret = syncNodePing(pSyncNode, destId, pMsg); assert(ret == 0); syncPingDestroy(pMsg); } return ret; } // timer control -------------- int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { int32_t ret = 0; taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); return ret; } int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { int32_t ret = 0; atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1); taosTmrStop(pSyncNode->pPingTimer); pSyncNode->pPingTimer = NULL; return ret; } int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { int32_t ret = 0; pSyncNode->electTimerMS = ms; taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pElectTimer); atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser); return ret; } int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) { int32_t ret = 0; atomic_add_fetch_64(&pSyncNode->electTimerLogicClockUser, 1); taosTmrStop(pSyncNode->pElectTimer); pSyncNode->pElectTimer = NULL; return ret; } int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) { int32_t ret = 0; syncNodeStopElectTimer(pSyncNode); syncNodeStartElectTimer(pSyncNode, ms); return ret; } int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) { int32_t ret = 0; int32_t electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); ret = syncNodeRestartElectTimer(pSyncNode, electMS); return ret; } int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { int32_t ret = 0; taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pHeartbeatTimer); atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); return ret; } int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { int32_t ret = 0; atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1); taosTmrStop(pSyncNode->pHeartbeatTimer); pSyncNode->pHeartbeatTimer = NULL; return ret; } // utils -------------- int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { SEpSet epSet; syncUtilraftId2EpSet(destRaftId, &epSet); if (pSyncNode->FpSendMsg != NULL) { pMsg->noResp = 1; // htonl syncUtilMsgHtoN(pMsg->pCont); pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); } else { sTrace("syncNodeSendMsgById pSyncNode->FpSendMsg is NULL"); } return 0; } int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) { SEpSet epSet; syncUtilnodeInfo2EpSet(nodeInfo, &epSet); if (pSyncNode->FpSendMsg != NULL) { pMsg->noResp = 1; // htonl syncUtilMsgHtoN(pMsg->pCont); pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); } else { sTrace("syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL"); } return 0; } cJSON* syncNode2Json(const SSyncNode* pSyncNode) { char u64buf[128]; cJSON* pRoot = cJSON_CreateObject(); if (pSyncNode != NULL) { // init by SSyncInfo cJSON_AddNumberToObject(pRoot, "vgId", pSyncNode->vgId); cJSON_AddItemToObject(pRoot, "SRaftCfg", raftCfg2Json(pSyncNode->pRaftCfg)); cJSON_AddStringToObject(pRoot, "path", pSyncNode->path); cJSON_AddStringToObject(pRoot, "raftStorePath", pSyncNode->raftStorePath); cJSON_AddStringToObject(pRoot, "configPath", pSyncNode->configPath); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal); cJSON_AddStringToObject(pRoot, "pWal", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->rpcClient); cJSON_AddStringToObject(pRoot, "rpcClient", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg); cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->queue); cJSON_AddStringToObject(pRoot, "queue", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg); cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf); // init internal cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->myNodeInfo); cJSON_AddItemToObject(pRoot, "myNodeInfo", pMe); cJSON* pRaftId = syncUtilRaftId2Json(&pSyncNode->myRaftId); cJSON_AddItemToObject(pRoot, "myRaftId", pRaftId); cJSON_AddNumberToObject(pRoot, "peersNum", pSyncNode->peersNum); cJSON* pPeers = cJSON_CreateArray(); cJSON_AddItemToObject(pRoot, "peersNodeInfo", pPeers); for (int i = 0; i < pSyncNode->peersNum; ++i) { cJSON_AddItemToArray(pPeers, syncUtilNodeInfo2Json(&pSyncNode->peersNodeInfo[i])); } cJSON* pPeersId = cJSON_CreateArray(); cJSON_AddItemToObject(pRoot, "peersId", pPeersId); for (int i = 0; i < pSyncNode->peersNum; ++i) { cJSON_AddItemToArray(pPeersId, syncUtilRaftId2Json(&pSyncNode->peersId[i])); } cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncNode->replicaNum); cJSON* pReplicasId = cJSON_CreateArray(); cJSON_AddItemToObject(pRoot, "replicasId", pReplicasId); for (int i = 0; i < pSyncNode->replicaNum; ++i) { cJSON_AddItemToArray(pReplicasId, syncUtilRaftId2Json(&pSyncNode->replicasId[i])); } // raft algorithm snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pFsm); cJSON_AddStringToObject(pRoot, "pFsm", u64buf); cJSON_AddNumberToObject(pRoot, "quorum", pSyncNode->quorum); cJSON* pLaderCache = syncUtilRaftId2Json(&pSyncNode->leaderCache); cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache); // life cycle snprintf(u64buf, sizeof(u64buf), "%ld", pSyncNode->rid); cJSON_AddStringToObject(pRoot, "rid", u64buf); // tla+ server vars cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state); cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state)); cJSON_AddItemToObject(pRoot, "pRaftStore", raftStore2Json(pSyncNode->pRaftStore)); // tla+ candidate vars cJSON_AddItemToObject(pRoot, "pVotesGranted", voteGranted2Json(pSyncNode->pVotesGranted)); cJSON_AddItemToObject(pRoot, "pVotesRespond", votesRespond2Json(pSyncNode->pVotesRespond)); // tla+ leader vars cJSON_AddItemToObject(pRoot, "pNextIndex", syncIndexMgr2Json(pSyncNode->pNextIndex)); cJSON_AddItemToObject(pRoot, "pMatchIndex", syncIndexMgr2Json(pSyncNode->pMatchIndex)); // tla+ log vars cJSON_AddItemToObject(pRoot, "pLogStore", logStore2Json(pSyncNode->pLogStore)); snprintf(u64buf, sizeof(u64buf), "%" PRId64 "", pSyncNode->commitIndex); cJSON_AddStringToObject(pRoot, "commitIndex", u64buf); // timer ms init cJSON_AddNumberToObject(pRoot, "pingBaseLine", pSyncNode->pingBaseLine); cJSON_AddNumberToObject(pRoot, "electBaseLine", pSyncNode->electBaseLine); cJSON_AddNumberToObject(pRoot, "hbBaseLine", pSyncNode->hbBaseLine); // ping timer snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pPingTimer); cJSON_AddStringToObject(pRoot, "pPingTimer", u64buf); cJSON_AddNumberToObject(pRoot, "pingTimerMS", pSyncNode->pingTimerMS); snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerLogicClock); cJSON_AddStringToObject(pRoot, "pingTimerLogicClock", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerLogicClockUser); cJSON_AddStringToObject(pRoot, "pingTimerLogicClockUser", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpPingTimerCB); cJSON_AddStringToObject(pRoot, "FpPingTimerCB", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerCounter); cJSON_AddStringToObject(pRoot, "pingTimerCounter", u64buf); // elect timer snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pElectTimer); cJSON_AddStringToObject(pRoot, "pElectTimer", u64buf); cJSON_AddNumberToObject(pRoot, "electTimerMS", pSyncNode->electTimerMS); snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerLogicClock); cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerLogicClockUser); cJSON_AddStringToObject(pRoot, "electTimerLogicClockUser", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB); cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerCounter); cJSON_AddStringToObject(pRoot, "electTimerCounter", u64buf); // heartbeat timer snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pHeartbeatTimer); cJSON_AddStringToObject(pRoot, "pHeartbeatTimer", u64buf); cJSON_AddNumberToObject(pRoot, "heartbeatTimerMS", pSyncNode->heartbeatTimerMS); snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerLogicClock); cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClock", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerLogicClockUser); cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClockUser", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpHeartbeatTimerCB); cJSON_AddStringToObject(pRoot, "FpHeartbeatTimerCB", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerCounter); cJSON_AddStringToObject(pRoot, "heartbeatTimerCounter", u64buf); // callback snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPing); cJSON_AddStringToObject(pRoot, "FpOnPing", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPingReply); cJSON_AddStringToObject(pRoot, "FpOnPingReply", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVote); cJSON_AddStringToObject(pRoot, "FpOnRequestVote", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVoteReply); cJSON_AddStringToObject(pRoot, "FpOnRequestVoteReply", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntries); cJSON_AddStringToObject(pRoot, "FpOnAppendEntries", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntriesReply); cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout); cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf); } cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SSyncNode", pRoot); return pJson; } char* syncNode2Str(const SSyncNode* pSyncNode) { cJSON* pJson = syncNode2Json(pSyncNode); char* serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { int len = 256; char* s = (char*)taosMemoryMalloc(len); snprintf(s, len, "syncNode2SimpleStr vgId:%d currentTerm:%lu, commitIndex:%ld, state:%d %s, electTimerLogicClock:%lu, " "electTimerLogicClockUser:%lu, " "electTimerMS:%d", pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS); return s; } SSyncNode* syncNodeAcquire(int64_t rid) { SSyncNode* pNode = taosAcquireRef(tsNodeRefId, rid); if (pNode == NULL) { sTrace("failed to acquire node from refId:%" PRId64, rid); } return pNode; } void syncNodeRelease(SSyncNode* pNode) { taosReleaseRef(tsNodeRefId, pNode->rid); } // raft state change -------------- void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { if (term > pSyncNode->pRaftStore->currentTerm) { raftStoreSetTerm(pSyncNode->pRaftStore, term); syncNodeBecomeFollower(pSyncNode); raftStoreClearVote(pSyncNode->pRaftStore); } } void syncNodeBecomeFollower(SSyncNode* pSyncNode) { // maybe clear leader cache if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { pSyncNode->leaderCache = EMPTY_RAFT_ID; } // state change pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; syncNodeStopHeartbeatTimer(pSyncNode); // reset elect timer syncNodeResetElectTimer(pSyncNode); } // TLA+ Spec // \* Candidate i transitions to leader. // BecomeLeader(i) == // /\ state[i] = Candidate // /\ votesGranted[i] \in Quorum // /\ state' = [state EXCEPT ![i] = Leader] // /\ nextIndex' = [nextIndex EXCEPT ![i] = // [j \in Server |-> Len(log[i]) + 1]] // /\ matchIndex' = [matchIndex EXCEPT ![i] = // [j \in Server |-> 0]] // /\ elections' = elections \cup // {[eterm |-> currentTerm[i], // eleader |-> i, // elog |-> log[i], // evotes |-> votesGranted[i], // evoterLog |-> voterLog[i]]} // /\ UNCHANGED <> // void syncNodeBecomeLeader(SSyncNode* pSyncNode) { // state change pSyncNode->state = TAOS_SYNC_STATE_LEADER; // set leader cache pSyncNode->leaderCache = pSyncNode->myRaftId; for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) { // maybe overwrite myself, no harm // just do it! pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1; } for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) { // maybe overwrite myself, no harm // just do it! pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID; } // stop elect timer syncNodeStopElectTimer(pSyncNode); // start replicate right now! syncNodeReplicate(pSyncNode); // start heartbeat timer syncNodeStartHeartbeatTimer(pSyncNode); } void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); assert(voteGrantedMajority(pSyncNode->pVotesGranted)); syncNodeBecomeLeader(pSyncNode); syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode); // Raft 3.6.2 Committing entries from previous terms // use this now syncNodeAppendNoop(pSyncNode); syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica // do not use this // syncNodeEqNoop(pSyncNode); } void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { assert(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER); pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE; syncNodeLog2("==state change syncNodeFollower2Candidate==", pSyncNode); } void syncNodeLeader2Follower(SSyncNode* pSyncNode) { assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER); syncNodeBecomeFollower(pSyncNode); syncNodeLog2("==state change syncNodeLeader2Follower==", pSyncNode); } void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); syncNodeBecomeFollower(pSyncNode); syncNodeLog2("==state change syncNodeCandidate2Follower==", pSyncNode); } // raft vote -------------- // just called by syncNodeVoteForSelf // need assert void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) { assert(term == pSyncNode->pRaftStore->currentTerm); assert(!raftStoreHasVoted(pSyncNode->pRaftStore)); raftStoreVote(pSyncNode->pRaftStore, pRaftId); } // simulate get vote from outside void syncNodeVoteForSelf(SSyncNode* pSyncNode) { syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId)); SyncRequestVoteReply* pMsg = syncRequestVoteReplyBuild(pSyncNode->vgId); pMsg->srcId = pSyncNode->myRaftId; pMsg->destId = pSyncNode->myRaftId; pMsg->term = pSyncNode->pRaftStore->currentTerm; pMsg->voteGranted = true; voteGrantedVote(pSyncNode->pVotesGranted, pMsg); votesRespondAdd(pSyncNode->pVotesRespond, pMsg); syncRequestVoteReplyDestroy(pMsg); } // for debug -------------- void syncNodePrint(SSyncNode* pObj) { char* serialized = syncNode2Str(pObj); printf("syncNodePrint | len:%lu | %s \n", strlen(serialized), serialized); fflush(NULL); taosMemoryFree(serialized); } void syncNodePrint2(char* s, SSyncNode* pObj) { char* serialized = syncNode2Str(pObj); printf("syncNodePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); fflush(NULL); taosMemoryFree(serialized); } void syncNodeLog(SSyncNode* pObj) { char* serialized = syncNode2Str(pObj); sTraceLong("syncNodeLog | len:%lu | %s", strlen(serialized), serialized); taosMemoryFree(serialized); } void syncNodeLog2(char* s, SSyncNode* pObj) { char* serialized = syncNode2Str(pObj); sTraceLong("syncNodeLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); taosMemoryFree(serialized); } // ------ local funciton --------- // enqueue message ---- static void syncNodeEqPingTimer(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) { SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock), pSyncNode->pingTimerMS, pSyncNode->vgId, pSyncNode); SRpcMsg rpcMsg; syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg); if (pSyncNode->FpEqMsg != NULL) { pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); } else { sTrace("syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL"); } syncTimeoutDestroy(pSyncMsg); taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } else { sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRIu64 ", pingTimerLogicClockUser:%" PRIu64 "", pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); } } static void syncNodeEqElectTimer(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; if (atomic_load_64(&pSyncNode->electTimerLogicClockUser) <= atomic_load_64(&pSyncNode->electTimerLogicClock)) { SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, atomic_load_64(&pSyncNode->electTimerLogicClock), pSyncNode->electTimerMS, pSyncNode->vgId, pSyncNode); SRpcMsg rpcMsg; syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqElectTimer==", &rpcMsg); if (pSyncNode->FpEqMsg != NULL) { pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); } else { sTrace("syncNodeEqElectTimer pSyncNode->FpEqMsg is NULL"); } syncTimeoutDestroy(pSyncMsg); // reset timer ms pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pElectTimer); } else { sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%" PRIu64 ", electTimerLogicClockUser:%" PRIu64 "", pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser); } } static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) { SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pSyncNode->heartbeatTimerLogicClock), pSyncNode->heartbeatTimerMS, pSyncNode->vgId, pSyncNode); SRpcMsg rpcMsg; syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg); if (pSyncNode->FpEqMsg != NULL) { pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); } else { sTrace("syncNodeEqHeartbeatTimer pSyncNode->FpEqMsg is NULL"); } syncTimeoutDestroy(pSyncMsg); taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pHeartbeatTimer); } else { sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64 "", pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); } } static int32_t syncNodeEqNoop(SSyncNode* ths) { int32_t ret = 0; assert(ths->state == TAOS_SYNC_STATE_LEADER); SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1; SyncTerm term = ths->pRaftStore->currentTerm; SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId); assert(pEntry != NULL); uint32_t entryLen; char* serialized = syncEntrySerialize(pEntry, &entryLen); SyncClientRequest* pSyncMsg = syncClientRequestBuild(entryLen); assert(pSyncMsg->dataLen == entryLen); memcpy(pSyncMsg->data, serialized, entryLen); SRpcMsg rpcMsg; syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); if (ths->FpEqMsg != NULL) { ths->FpEqMsg(ths->queue, &rpcMsg); } else { sTrace("syncNodeEqNoop pSyncNode->FpEqMsg is NULL"); } taosMemoryFree(serialized); syncClientRequestDestroy(pSyncMsg); return ret; } static int32_t syncNodeAppendNoop(SSyncNode* ths) { int32_t ret = 0; SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1; SyncTerm term = ths->pRaftStore->currentTerm; SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId); assert(pEntry != NULL); if (ths->state == TAOS_SYNC_STATE_LEADER) { ths->pLogStore->appendEntry(ths->pLogStore, pEntry); syncNodeReplicate(ths); } return ret; } // on message ---- int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { // log state char logBuf[1024]; snprintf(logBuf, sizeof(logBuf), "==syncNodeOnPingCb== vgId:%d, state: %d, %s, term:%lu electTimerLogicClock:%lu, " "electTimerLogicClockUser:%lu, electTimerMS:%d", ths->vgId, ths->state, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, ths->electTimerLogicClock, ths->electTimerLogicClockUser, ths->electTimerMS); int32_t ret = 0; syncPingLog2(logBuf, pMsg); SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId, ths->vgId); SRpcMsg rpcMsg; syncPingReply2RpcMsg(pMsgReply, &rpcMsg); /* // htonl SMsgHead* pHead = rpcMsg.pCont; pHead->contLen = htonl(pHead->contLen); pHead->vgId = htonl(pHead->vgId); */ syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); return ret; } int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { int32_t ret = 0; syncPingReplyLog2("==syncNodeOnPingReplyCb==", pMsg); return ret; } // TLA+ Spec // ClientRequest(i, v) == // /\ state[i] = Leader // /\ LET entry == [term |-> currentTerm[i], // value |-> v] // newLog == Append(log[i], entry) // IN log' = [log EXCEPT ![i] = newLog] // /\ UNCHANGED <> // int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { int32_t ret = 0; syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg); SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1; SyncTerm term = ths->pRaftStore->currentTerm; SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index); assert(pEntry != NULL); if (ths->state == TAOS_SYNC_STATE_LEADER) { ths->pLogStore->appendEntry(ths->pLogStore, pEntry); // start replicate right now! syncNodeReplicate(ths); // pre commit SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; cbMeta.isWeak = pEntry->isWeak; cbMeta.code = 0; cbMeta.state = ths->state; cbMeta.seqNum = pEntry->seqNum; ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta); } } rpcFreeCont(rpcMsg.pCont); // only myself, maybe commit syncMaybeAdvanceCommitIndex(ths); } else { // pre commit SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; cbMeta.isWeak = pEntry->isWeak; cbMeta.code = 1; cbMeta.state = ths->state; cbMeta.seqNum = pEntry->seqNum; ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta); } } rpcFreeCont(rpcMsg.pCont); } syncEntryDestory(pEntry); return ret; } static void syncFreeNode(void* param) { SSyncNode* pNode = param; // inner object already free // syncNodePrint2((char*)"==syncFreeNode==", pNode); taosMemoryFree(pNode); } const char* syncStr(ESyncState state) { switch (state) { case TAOS_SYNC_STATE_FOLLOWER: return "FOLLOWER"; case TAOS_SYNC_STATE_CANDIDATE: return "CANDIDATE"; case TAOS_SYNC_STATE_LEADER: return "LEADER"; default: return "ERROR"; } }