syncMain.c 11.0 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <stdint.h>
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18
#include "syncEnv.h"
M
Minghao Li 已提交
19
#include "syncInt.h"
M
Minghao Li 已提交
20
#include "syncRaft.h"
M
Minghao Li 已提交
21
#include "syncUtil.h"
M
Minghao Li 已提交
22

M
Minghao Li 已提交
23 24 25
static int32_t tsNodeRefId = -1;

// ------ local funciton ---------
M
Minghao Li 已提交
26 27
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
M
Minghao Li 已提交
28
static void    syncNodePingTimerCb(void* param, void* tmrId);
M
Minghao Li 已提交
29 30 31 32 33 34 35 36 37 38 39

static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg);
static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg);

static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
M
Minghao Li 已提交
40 41 42 43 44 45
// ---------------------------------

int32_t syncInit() {
  sTrace("syncInit ok");
  return 0;
}
M
Minghao Li 已提交
46

M
Minghao Li 已提交
47
void syncCleanUp() { sTrace("syncCleanUp ok"); }
M
Minghao Li 已提交
48

M
Minghao Li 已提交
49 50 51 52 53
int64_t syncStart(const SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
  assert(pSyncNode != NULL);
  return 0;
}
M
Minghao Li 已提交
54 55 56 57 58

void syncStop(int64_t rid) {}

int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; }

M
Minghao Li 已提交
59
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) { return 0; }
M
Minghao Li 已提交
60 61 62

ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; }

M
Minghao Li 已提交
63 64 65 66 67
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {}

SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode));
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
68
  memset(pSyncNode, 0, sizeof(SSyncNode));
M
Minghao Li 已提交
69

M
Minghao Li 已提交
70 71 72 73 74 75 76
  pSyncNode->vgId = pSyncInfo->vgId;
  pSyncNode->syncCfg = pSyncInfo->syncCfg;
  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  pSyncNode->pFsm = pSyncInfo->pFsm;

  pSyncNode->rpcClient = pSyncInfo->rpcClient;
  pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
M
Minghao Li 已提交
77 78
  pSyncNode->queue = pSyncInfo->queue;
  pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
M
Minghao Li 已提交
79 80 81 82 83 84 85 86 87 88 89 90

  pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex];
  pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1;

  int j = 0;
  for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) {
    if (i != pSyncInfo->syncCfg.myIndex) {
      pSyncNode->peers[j] = pSyncInfo->syncCfg.nodeInfo[i];
      j++;
    }
  }

M
syncInt  
Minghao Li 已提交
91
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
M
Minghao Li 已提交
92 93
  syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId);

M
Minghao Li 已提交
94 95
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = 1000;
M
Minghao Li 已提交
96
  atomic_store_8(&pSyncNode->pingTimerEnable, 0);
M
Minghao Li 已提交
97 98
  pSyncNode->FpPingTimer = syncNodePingTimerCb;
  pSyncNode->pingTimerCounter = 0;
M
Minghao Li 已提交
99

M
Minghao Li 已提交
100 101 102 103 104 105
  pSyncNode->FpOnPing = syncNodeOnPingCb;
  pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
  pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
  pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb;
  pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb;
  pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
M
Minghao Li 已提交
106 107 108 109 110 111 112 113 114

  return pSyncNode;
}

void syncNodeClose(SSyncNode* pSyncNode) {
  assert(pSyncNode != NULL);
  free(pSyncNode);
}

M
Minghao Li 已提交
115
void syncNodePingAll(SSyncNode* pSyncNode) {
M
Minghao Li 已提交
116
  sTrace("syncNodePingAll pSyncNode:%p ", pSyncNode);
M
Minghao Li 已提交
117 118
  int32_t ret = 0;
  for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) {
M
Minghao Li 已提交
119 120 121 122
    SRaftId destId;
    syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &destId);
    SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &destId);
    ret = syncNodePing(pSyncNode, &destId, pMsg);
M
Minghao Li 已提交
123
    assert(ret == 0);
M
Minghao Li 已提交
124
    syncPingDestroy(pMsg);
M
Minghao Li 已提交
125
  }
M
Minghao Li 已提交
126
}
M
Minghao Li 已提交
127

M
Minghao Li 已提交
128 129 130
void syncNodePingPeers(SSyncNode* pSyncNode) {
  int32_t ret = 0;
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
M
Minghao Li 已提交
131 132 133 134
    SRaftId destId;
    syncUtilnodeInfo2raftId(&pSyncNode->peers[i], pSyncNode->vgId, &destId);
    SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &destId);
    ret = syncNodePing(pSyncNode, &destId, pMsg);
M
Minghao Li 已提交
135
    assert(ret == 0);
M
Minghao Li 已提交
136
    syncPingDestroy(pMsg);
M
Minghao Li 已提交
137 138
  }
}
M
Minghao Li 已提交
139

M
Minghao Li 已提交
140
void syncNodePingSelf(SSyncNode* pSyncNode) {
M
Minghao Li 已提交
141 142
  int32_t   ret;
  SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &pSyncNode->raftId);
M
Minghao Li 已提交
143
  ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg);
M
Minghao Li 已提交
144
  assert(ret == 0);
M
Minghao Li 已提交
145
  syncPingDestroy(pMsg);
M
Minghao Li 已提交
146
}
M
Minghao Li 已提交
147

M
Minghao Li 已提交
148 149 150
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (pSyncNode->pPingTimer == NULL) {
    pSyncNode->pPingTimer =
M
Minghao Li 已提交
151
        taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager);
M
Minghao Li 已提交
152
  } else {
M
Minghao Li 已提交
153
    taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
M
Minghao Li 已提交
154 155 156
                 &pSyncNode->pPingTimer);
  }

M
Minghao Li 已提交
157
  atomic_store_8(&pSyncNode->pingTimerEnable, 1);
M
Minghao Li 已提交
158 159 160 161
  return 0;
}

int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
M
Minghao Li 已提交
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
  atomic_store_8(&pSyncNode->pingTimerEnable, 0);
  pSyncNode->pingTimerMS = TIMER_MAX_MS;
  return 0;
}

int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode) {
  if (pSyncNode->pElectTimer == NULL) {
    pSyncNode->pElectTimer =
        taosTmrStart(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager);
  } else {
    taosTmrReset(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
                 &pSyncNode->pElectTimer);
  }

  atomic_store_8(&pSyncNode->electTimerEnable, 1);
M
Minghao Li 已提交
177 178 179
  return 0;
}

M
Minghao Li 已提交
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_store_8(&pSyncNode->electTimerEnable, 0);
  pSyncNode->electTimerMS = TIMER_MAX_MS;
  return 0;
}

int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; }

int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (pSyncNode->pHeartbeatTimer == NULL) {
    pSyncNode->pHeartbeatTimer =
        taosTmrStart(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
  }

  atomic_store_8(&pSyncNode->heartbeatTimerEnable, 1);
  return 0;
}

int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  atomic_store_8(&pSyncNode->heartbeatTimerEnable, 0);
  pSyncNode->heartbeatTimerMS = TIMER_MAX_MS;
  return 0;
}

int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; }

void syncNodeBecomeFollower(SSyncNode* pSyncNode) {}

void syncNodeBecomeLeader(SSyncNode* pSyncNode) {}

void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {}

void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {}

void syncNodeLeader2Follower(SSyncNode* pSyncNode) {}

void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {}

M
Minghao Li 已提交
221
// ------ local funciton ---------
M
Minghao Li 已提交
222
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
M
Minghao Li 已提交
223
  sTrace("syncNodePing pSyncNode:%p ", pSyncNode);
M
Minghao Li 已提交
224
  int32_t ret = 0;
M
Minghao Li 已提交
225

M
Minghao Li 已提交
226 227 228
  SRpcMsg rpcMsg;
  syncPing2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
M
Minghao Li 已提交
229 230 231 232 233 234 235 236 237 238 239 240 241

  {
    cJSON* pJson = syncPing2Json(pMsg);
    char*  serialized = cJSON_Print(pJson);
    sTrace("syncNodePing pMsg:%s ", serialized);
    free(serialized);
    cJSON_Delete(pJson);
  }

  {
    SyncPing* pMsg2 = rpcMsg.pCont;
    cJSON*    pJson = syncPing2Json(pMsg2);
    char*     serialized = cJSON_Print(pJson);
M
Minghao Li 已提交
242
    sTrace("syncNodePing rpcMsg.pCont:%s ", serialized);
M
Minghao Li 已提交
243 244 245 246
    free(serialized);
    cJSON_Delete(pJson);
  }

M
Minghao Li 已提交
247 248 249
  return ret;
}

M
Minghao Li 已提交
250
static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) {
M
Minghao Li 已提交
251
  int32_t ret = 0;
M
Minghao Li 已提交
252 253 254
  return ret;
}

M
Minghao Li 已提交
255
static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) {
M
Minghao Li 已提交
256
  int32_t ret = 0;
M
Minghao Li 已提交
257 258 259
  return ret;
}

M
Minghao Li 已提交
260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilraftId2EpSet(destRaftId, &epSet);
  pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
  return 0;
}

static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
  pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
  return 0;
}

static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
M
Minghao Li 已提交
275
  int32_t ret = 0;
M
Minghao Li 已提交
276 277 278 279 280
  sTrace("<-- syncNodeOnPingCb -->");

  {
    cJSON* pJson = syncPing2Json(pMsg);
    char*  serialized = cJSON_Print(pJson);
M
Minghao Li 已提交
281
    sTrace("process syncMessage recv: syncNodeOnPingCb pMsg:%s ", serialized);
M
Minghao Li 已提交
282 283 284 285
    free(serialized);
    cJSON_Delete(pJson);
  }

M
Minghao Li 已提交
286 287 288 289 290
  SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->raftId, &pMsg->srcId);
  SRpcMsg        rpcMsg;
  syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);

M
Minghao Li 已提交
291 292 293
  return ret;
}

M
Minghao Li 已提交
294
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
M
Minghao Li 已提交
295
  int32_t ret = 0;
M
Minghao Li 已提交
296 297 298 299 300
  sTrace("<-- syncNodeOnPingReplyCb -->");

  {
    cJSON* pJson = syncPingReply2Json(pMsg);
    char*  serialized = cJSON_Print(pJson);
M
Minghao Li 已提交
301
    sTrace("process syncMessage recv: syncNodeOnPingReplyCb pMsg:%s ", serialized);
M
Minghao Li 已提交
302 303 304 305
    free(serialized);
    cJSON_Delete(pJson);
  }

M
Minghao Li 已提交
306 307 308
  return ret;
}

M
Minghao Li 已提交
309
static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
M
Minghao Li 已提交
310
  int32_t ret = 0;
M
Minghao Li 已提交
311 312 313
  return ret;
}

M
Minghao Li 已提交
314
static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
M
Minghao Li 已提交
315
  int32_t ret = 0;
M
Minghao Li 已提交
316 317 318
  return ret;
}

M
Minghao Li 已提交
319
static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
M
Minghao Li 已提交
320
  int32_t ret = 0;
M
Minghao Li 已提交
321 322 323
  return ret;
}

M
Minghao Li 已提交
324
static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
M
Minghao Li 已提交
325
  int32_t ret = 0;
M
Minghao Li 已提交
326
  return ret;
M
Minghao Li 已提交
327 328 329 330
}

static void syncNodePingTimerCb(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
M
Minghao Li 已提交
331
  if (atomic_load_8(&pSyncNode->pingTimerEnable)) {
M
Minghao Li 已提交
332 333 334
    ++(pSyncNode->pingTimerCounter);
    // pSyncNode->pingTimerMS += 100;

M
Minghao Li 已提交
335 336 337 338 339 340
    sTrace(
        "syncNodePingTimerCb: pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, "
        "tmrId:%p ",
        pSyncNode->pingTimerCounter, pSyncNode->pingTimerMS, pSyncNode->pPingTimer, tmrId);

    syncNodePingAll(pSyncNode);
M
Minghao Li 已提交
341 342 343

    taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
                 &pSyncNode->pPingTimer);
M
Minghao Li 已提交
344
  } else {
M
Minghao Li 已提交
345
    sTrace("syncNodePingTimerCb: pingTimerEnable:%u ", pSyncNode->pingTimerEnable);
M
Minghao Li 已提交
346
  }
M
Minghao Li 已提交
347
}