syncMain.c 8.0 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <stdint.h>
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18
#include "syncEnv.h"
M
Minghao Li 已提交
19
#include "syncInt.h"
M
Minghao Li 已提交
20
#include "syncRaft.h"
M
Minghao Li 已提交
21
#include "syncUtil.h"
M
Minghao Li 已提交
22

M
Minghao Li 已提交
23 24 25
// File-scope id initialised to -1; nothing in the visible part of this file
// reads or writes it afterwards.
// NOTE(review): presumably a reference-set id for sync nodes — confirm.
static int32_t tsNodeRefId = -1;

// ------ local function ---------
M
Minghao Li 已提交
26 27
// Timer callback that drives the periodic ping.
static void syncNodePingTimerCb(void* param, void* tmrId);

// Message transport helpers.
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);

// Outgoing requests.
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg);
static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg);

// Message handlers installed on the node by syncNodeOpen().
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
M
Minghao Li 已提交
40 41 42 43 44 45
// ---------------------------------

// Module initialisation hook. Currently it only emits a trace line and
// always returns 0.
int32_t syncInit() {
  const int32_t code = 0;

  sTrace("syncInit ok");
  return code;
}
M
Minghao Li 已提交
46

M
Minghao Li 已提交
47
// Module teardown hook. Currently it only emits a trace line.
void syncCleanUp() {
  sTrace("syncCleanUp ok");
}
M
Minghao Li 已提交
48

M
Minghao Li 已提交
49 50 51 52 53
// Start a sync instance described by pSyncInfo.
//
// Opens a sync node through syncNodeOpen(). Returns 0 when the node was
// created and -1 when it was not.
// NOTE(review): the node pointer is neither stored nor returned here, so no
// code visible in this file can close it later — confirm intended ownership.
int64_t syncStart(const SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);

  // The assert keeps the debug-build abort; the explicit check stops NDEBUG
  // builds from reporting success for a node that was never created.
  assert(pSyncNode != NULL);
  if (pSyncNode == NULL) {
    return -1;
  }
  return 0;
}
M
Minghao Li 已提交
54 55 56 57 58

// Stop the sync instance identified by rid. Not implemented yet: no-op.
void syncStop(int64_t rid) {
  (void)rid;
}

// Apply a new configuration to the sync instance identified by rid.
// Not implemented yet: both arguments are ignored and 0 is returned.
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
  (void)rid;
  (void)pSyncCfg;
  return 0;
}

M
Minghao Li 已提交
59 60 61
// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { return 0; }

// Forward a message to the peers of the sync instance identified by rid.
// Not implemented yet: all arguments are ignored and 0 is returned.
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) {
  (void)rid;
  (void)pBuf;
  (void)isWeak;
  return 0;
}
M
Minghao Li 已提交
62 63 64

// Report the role of the sync instance identified by rid.
// Placeholder: rid is ignored and TAOS_SYNC_STATE_LEADER is always returned.
ESyncState syncGetMyRole(int64_t rid) {
  (void)rid;
  return TAOS_SYNC_STATE_LEADER;
}

M
Minghao Li 已提交
65 66 67 68 69
// Fill pNodeRole with the roles of the nodes of the sync instance rid.
// Not implemented yet: pNodeRole is left untouched.
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {
  (void)rid;
  (void)pNodeRole;
}

SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode));
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
70
  memset(pSyncNode, 0, sizeof(SSyncNode));
M
Minghao Li 已提交
71

M
Minghao Li 已提交
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
  pSyncNode->vgId = pSyncInfo->vgId;
  pSyncNode->syncCfg = pSyncInfo->syncCfg;
  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  pSyncNode->pFsm = pSyncInfo->pFsm;

  pSyncNode->rpcClient = pSyncInfo->rpcClient;
  pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;

  pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex];
  pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1;

  int j = 0;
  for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) {
    if (i != pSyncInfo->syncCfg.myIndex) {
      pSyncNode->peers[j] = pSyncInfo->syncCfg.nodeInfo[i];
      j++;
    }
  }

  pSyncNode->role = TAOS_SYNC_STATE_FOLLOWER;
  syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId);

M
Minghao Li 已提交
94 95 96 97 98
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = 1000;
  atomic_store_8(&pSyncNode->pingTimerStart, 0);
  pSyncNode->FpPingTimer = syncNodePingTimerCb;
  pSyncNode->pingTimerCounter = 0;
M
Minghao Li 已提交
99

M
Minghao Li 已提交
100 101 102 103 104 105
  pSyncNode->FpOnPing = syncNodeOnPingCb;
  pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
  pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
  pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb;
  pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb;
  pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
M
Minghao Li 已提交
106 107 108 109 110 111 112 113 114

  return pSyncNode;
}

// Release a node obtained from syncNodeOpen(). pSyncNode must not be NULL.
// NOTE(review): the ping timer is not stopped here; if it is still armed its
// callback would receive a freed node — confirm callers stop it first.
void syncNodeClose(SSyncNode* pSyncNode) {
  assert(NULL != pSyncNode);

  free(pSyncNode);
}

M
Minghao Li 已提交
115 116
// Send a "ping" message to every replica listed in syncCfg, this node included.
// Each send is asserted to return 0.
// NOTE(review): the message from syncPingBuild() is not released in this
// function — confirm who owns it after syncNodePing().
void syncNodePingAll(SSyncNode* pSyncNode) {
  static const char payload[] = "ping";  // sizeof includes the trailing NUL

  sTrace("syncNodePingAll %p ", pSyncNode);

  for (int idx = 0; idx < pSyncNode->syncCfg.replicaNum; ++idx) {
    SyncPing* pPing = syncPingBuild(sizeof(payload));
    memcpy(pPing->data, payload, sizeof(payload));

    pPing->srcId = pSyncNode->raftId;
    syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[idx], pSyncNode->vgId, &pPing->destId);

    int32_t code = syncNodePing(pSyncNode, &pPing->destId, pPing);
    assert(code == 0);
  }
}
M
Minghao Li 已提交
127

M
Minghao Li 已提交
128 129 130 131 132 133 134 135 136 137
// Send a "ping" message to every peer (every replica except this node).
// Each send is asserted to return 0.
//
// The message is built the same way as in syncNodePingAll() and
// syncNodePingSelf(). Previously an uninitialised SyncPing pointer was passed
// to syncNodePing(), which is undefined behaviour.
// NOTE(review): the message from syncPingBuild() is not released in this
// function — confirm who owns it after syncNodePing().
void syncNodePingPeers(SSyncNode* pSyncNode) {
  int32_t ret = 0;
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
    SyncPing* pMsg = syncPingBuild(strlen("ping") + 1);
    memcpy(pMsg->data, "ping", strlen("ping") + 1);
    syncUtilnodeInfo2raftId(&pSyncNode->peers[i], pSyncNode->vgId, &pMsg->destId);
    pMsg->srcId = pSyncNode->raftId;
    ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg);
    assert(ret == 0);
  }
  (void)ret;  // only consumed by the assert; avoids an unused warning under NDEBUG
}
M
Minghao Li 已提交
138

M
Minghao Li 已提交
139 140
// Send a "ping" message from this node to itself (source and destination are
// both the node's own raft id). The send is asserted to return 0.
// NOTE(review): the message from syncPingBuild() is not released in this
// function — confirm who owns it after syncNodePing().
void syncNodePingSelf(SSyncNode* pSyncNode) {
  static const char payload[] = "ping";  // sizeof includes the trailing NUL

  SyncPing* pPing = syncPingBuild(sizeof(payload));
  memcpy(pPing->data, payload, sizeof(payload));

  pPing->destId = pSyncNode->raftId;
  pPing->srcId = pSyncNode->raftId;

  int32_t code = syncNodePing(pSyncNode, &pPing->destId, pPing);
  assert(code == 0);
}
M
Minghao Li 已提交
148

M
Minghao Li 已提交
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
// Arm the ping timer of pSyncNode, creating it on first use and resetting it
// otherwise. Always returns 0.
//
// Two fixes relative to the previous version:
//  - The delay handed to the timer is pingTimerMS. pingTimerCounter is the
//    tick counter incremented by the callback, not a duration; the callback
//    itself re-arms with pingTimerMS.
//  - pingTimerStart is set before the timer is armed. The callback re-arms
//    only while the flag is non-zero, so if it could run before the flag was
//    stored it would see 0 and the periodic ping would never start.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  atomic_store_8(&pSyncNode->pingTimerStart, 1);

  if (pSyncNode->pPingTimer == NULL) {
    pSyncNode->pPingTimer =
        taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager);
  } else {
    taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
                 &pSyncNode->pPingTimer);
  }

  return 0;
}

// Stop the periodic ping of pSyncNode. Always returns 0.
//
// Clears pingTimerStart, which the timer callback checks before re-arming,
// so an already armed timer is not cancelled here; it simply does not re-arm.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_store_8(&pSyncNode->pingTimerStart, 0);
  // NOTE(review): this overwrites the tick counter with TIMER_MAX_MS; it looks
  // like pingTimerMS may have been intended — confirm before changing.
  pSyncNode->pingTimerCounter = TIMER_MAX_MS;
  return 0;
}

M
Minghao Li 已提交
168
// ------ local function ---------
M
Minghao Li 已提交
169
// Convert pMsg into an rpc message and send it to destRaftId.
//
// Returns the result of syncNodeSendMsgById(). Previously that result was
// discarded and a constant 0 was returned, which made the callers'
// assert(ret == 0) checks meaningless.
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
  // Zero-initialised so that no field is left indeterminate should
  // syncPing2RpcMsg() not fill every member.
  SRpcMsg rpcMsg = {0};
  syncPing2RpcMsg(pMsg, &rpcMsg);
  return syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
}

M
Minghao Li 已提交
177
// Stub for sending a SyncRequestVote: does nothing yet and returns 0.
static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

M
Minghao Li 已提交
182
// Stub for sending a SyncAppendEntries: does nothing yet and returns 0.
static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

M
Minghao Li 已提交
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
// Send pMsg to the node identified by destRaftId: the raft id is converted to
// an endpoint set and passed to the node's FpSendMsg hook with its rpcClient.
// Always returns 0; any result of FpSendMsg is not inspected.
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet destEpSet;

  syncUtilraftId2EpSet(destRaftId, &destEpSet);
  (*pSyncNode->FpSendMsg)(pSyncNode->rpcClient, &destEpSet, pMsg);

  return 0;
}

// Send pMsg to the node described by nodeInfo: the node info is converted to
// an endpoint set and passed to the node's FpSendMsg hook with its rpcClient.
// Always returns 0; any result of FpSendMsg is not inspected.
static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet destEpSet;

  syncUtilnodeInfo2EpSet(nodeInfo, &destEpSet);
  (*pSyncNode->FpSendMsg)(pSyncNode->rpcClient, &destEpSet, pMsg);

  return 0;
}

// Stub handler for SyncPing messages: does nothing yet and returns 0.
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

M
Minghao Li 已提交
206
// Stub handler for SyncPingReply messages: does nothing yet and returns 0.
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

M
Minghao Li 已提交
211
// Stub handler for SyncRequestVote messages: does nothing yet and returns 0.
static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

M
Minghao Li 已提交
216
// Stub handler for SyncRequestVoteReply messages: does nothing yet and
// returns 0.
static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

M
Minghao Li 已提交
221
// Stub handler for SyncAppendEntries messages: does nothing yet and returns 0.
static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

M
Minghao Li 已提交
226
// Stub handler for SyncAppendEntriesReply messages: does nothing yet and
// returns 0.
static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

// Ping timer callback. `param` is the SSyncNode registered with the timer;
// `tmrId` is only used in the trace output.
//
// While pingTimerStart is non-zero it bumps the tick counter, re-arms the
// timer for pingTimerMS and pings every replica. Once the flag has been
// cleared it returns without re-arming, which ends the periodic ping.
//
// Fix: the timer manager handle is now passed as gSyncEnv->pTimerManager,
// matching syncNodeStartPingTimer(). Previously its address
// (&gSyncEnv->pTimerManager) was passed, which compiles silently because the
// parameter accepts any pointer but hands the timer module the wrong handle.
static void syncNodePingTimerCb(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;

  if (!atomic_load_8(&pSyncNode->pingTimerStart)) {
    return;
  }

  ++(pSyncNode->pingTimerCounter);

  sTrace("pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, tmrId:%p ",
         pSyncNode->pingTimerCounter, pSyncNode->pingTimerMS, pSyncNode->pPingTimer, tmrId);

  taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
               &pSyncNode->pPingTimer);

  syncNodePingAll(pSyncNode);
}