syncReplication.c 4.7 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncReplication.h"
M
Minghao Li 已提交
18
#include "syncIndexMgr.h"
19
#include "syncPipeline.h"
M
Minghao Li 已提交
20
#include "syncRaftEntry.h"
M
Minghao Li 已提交
21
#include "syncRaftStore.h"
M
Minghao Li 已提交
22
#include "syncUtil.h"
M
Minghao Li 已提交
23

M
Minghao Li 已提交
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
// TLA+ Spec
// AppendEntries(i, j) ==
//    /\ i /= j
//    /\ state[i] = Leader
//    /\ LET prevLogIndex == nextIndex[i][j] - 1
//           prevLogTerm == IF prevLogIndex > 0 THEN
//                              log[i][prevLogIndex].term
//                          ELSE
//                              0
//           \* Send up to 1 entry, constrained by the end of the log.
//           lastEntry == Min({Len(log[i]), nextIndex[i][j]})
//           entries == SubSeq(log[i], nextIndex[i][j], lastEntry)
//       IN Send([mtype          |-> AppendEntriesRequest,
//                mterm          |-> currentTerm[i],
//                mprevLogIndex  |-> prevLogIndex,
//                mprevLogTerm   |-> prevLogTerm,
//                mentries       |-> entries,
//                \* mlog is used as a history variable for the proof.
//                \* It would not exist in a real implementation.
//                mlog           |-> log[i],
//                mcommitIndex   |-> Min({commitIndex[i], lastEntry}),
//                msource        |-> i,
//                mdest          |-> j])
//    /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>

49 50
// Forward declaration: the definition appears further down in this file.
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);

51 52 53 54
// Resets the log replication manager associated with pDestId.
// The lookup and the reset both run while the log buffer mutex is held.
// Always returns 0.
int32_t syncNodeReplicateReset(SSyncNode* pNode, SRaftId* pDestId) {
  SSyncLogBuffer* pLogBuf = pNode->pLogBuf;

  taosThreadMutexLock(&pLogBuf->mutex);
  syncLogReplReset(syncNodeGetLogReplMgr(pNode, pDestId));
  taosThreadMutexUnlock(&pLogBuf->mutex);

  return 0;
}

B
Benguang Zhao 已提交
60
// Locked entry point for replication: takes the log buffer mutex, delegates
// to syncNodeReplicateWithoutLock(), releases the mutex, and returns the
// delegate's result unchanged.
int32_t syncNodeReplicate(SSyncNode* pNode) {
  SSyncLogBuffer* pLogBuf = pNode->pLogBuf;
  int32_t         code;

  taosThreadMutexLock(&pLogBuf->mutex);
  code = syncNodeReplicateWithoutLock(pNode);
  taosThreadMutexUnlock(&pLogBuf->mutex);

  return code;
}

// Runs one replication round towards every replica other than this node.
// Takes no lock itself; syncNodeReplicate() in this file calls it with the
// log buffer mutex held.
// Returns -1 when this node is not the leader or there is only one replica,
// otherwise 0. Per-replica results are intentionally discarded.
int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) {
  if (pNode->state != TAOS_SYNC_STATE_LEADER) {
    return -1;
  }
  if (pNode->replicaNum == 1) {
    return -1;
  }

  for (int32_t idx = 0; idx < pNode->replicaNum; idx++) {
    // Skip the slot that refers to this node itself.
    if (!syncUtilSameId(&pNode->replicasId[idx], &pNode->myRaftId)) {
      SSyncLogReplMgr* pReplMgr = pNode->logReplMgrs[idx];
      (void)syncLogReplReplicateOnce(pReplMgr, pNode);
    }
  }

  return 0;
}

82
// Stamps the destination id into the AppendEntries message carried by
// pRpcMsg and hands the message to syncNodeSendMsgById().
//
// pSyncNode  - node sending the message
// destRaftId - destination; copied into the message's destId field
// pRpcMsg    - RPC message whose pCont is a SyncAppendEntries
//
// Returns the result of syncNodeSendMsgById(), so that a failed send is
// visible to the caller instead of being reported as success. This matches
// how syncNodeSendHeartbeat() in this file reports its send result.
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) {
  SyncAppendEntries* pMsg = pRpcMsg->pCont;
  pMsg->destId = *destRaftId;
  return syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg);
}

89 90 91
// Sends the AppendEntries message in pRpcMsg only when
// syncNodeNeedSendAppendEntries() says it is needed.
// When the send is skipped, a trace line is written, the message content is
// released with rpcFreeCont(), and 0 is returned. Otherwise the result of
// syncNodeSendAppendEntries() is returned.
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) {
  SyncAppendEntries* pAppend = pRpcMsg->pCont;

  if (!syncNodeNeedSendAppendEntries(pSyncNode, destRaftId, pAppend)) {
    // Read prevLogIndex for the trace before the content is freed.
    sNTrace(pSyncNode, "do not repcate to dnode:%d for index:%" PRId64, DID(destRaftId), pAppend->prevLogIndex + 1);
    rpcFreeCont(pRpcMsg->pCont);
    return 0;
  }

  return syncNodeSendAppendEntries(pSyncNode, destRaftId, pRpcMsg);
}

S
Shengliang Guan 已提交
103 104
// Sends a heartbeat RPC message to destId via syncNodeSendMsgById() and
// returns that call's result.
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) {
  int32_t code = syncNodeSendMsgById(destId, pSyncNode, pMsg);
  return code;
}

// Builds and sends one heartbeat message to each peer of pSyncNode.
// The timestamp is sampled once before the loop, so every heartbeat of this
// round carries the same timeStamp value. A peer whose message cannot be
// built is logged and skipped. Send results are not inspected; the function
// always returns 0.
int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
  int64_t roundTs = taosGetTimestampMs();

  for (int32_t peer = 0; peer < pSyncNode->peersNum; ++peer) {
    SRpcMsg hbRpc = {0};

    if (syncBuildHeartbeat(&hbRpc, pSyncNode->vgId) == 0) {
      SyncHeartbeat* pHb = hbRpc.pCont;

      // Fill in the heartbeat payload.
      pHb->srcId = pSyncNode->myRaftId;
      pHb->destId = pSyncNode->peersId[peer];
      pHb->term = raftStoreGetTerm(pSyncNode);
      pHb->commitIndex = pSyncNode->commitIndex;
      pHb->minMatchIndex = syncMinMatchIndex(pSyncNode);
      pHb->privateTerm = 0;
      pHb->timeStamp = roundTs;

      // Log the outgoing heartbeat, then send it.
      syncLogSendHeartbeat(pSyncNode, pHb, true, 0, 0);
      syncNodeSendHeartbeat(pSyncNode, &pHb->destId, &hbRpc);
    } else {
      sError("vgId:%d, build sync-heartbeat error", pSyncNode->vgId);
    }
  }

  return 0;
}