syncMain.c 11.2 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <stdint.h>
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18
#include "syncEnv.h"
M
Minghao Li 已提交
19
#include "syncInt.h"
M
Minghao Li 已提交
20
#include "syncRaft.h"
M
Minghao Li 已提交
21
#include "syncUtil.h"
M
Minghao Li 已提交
22

M
Minghao Li 已提交
23 24 25
static int32_t tsNodeRefId = -1;

// ------ local function ---------
M
Minghao Li 已提交
26 27
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
M
Minghao Li 已提交
28
static void    syncNodePingTimerCb(void* param, void* tmrId);
M
Minghao Li 已提交
29 30 31 32 33 34 35 36 37 38 39

static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg);
static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg);

static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
M
Minghao Li 已提交
40
static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg);
M
Minghao Li 已提交
41 42 43 44 45 46
// ---------------------------------

// Module initialization hook. Currently it only emits a trace line and
// reports success (0).
int32_t syncInit() {
  int32_t code = 0;
  sTrace("syncInit ok");
  return code;
}
M
Minghao Li 已提交
47

M
Minghao Li 已提交
48
// Module teardown hook. Currently it only emits a trace line.
void syncCleanUp() {
  sTrace("syncCleanUp ok");
}
M
Minghao Li 已提交
49

M
Minghao Li 已提交
50 51 52 53 54
// Opens a sync node from the supplied info and always returns 0.
// NOTE(review): the opened node is neither stored nor returned here, so
// nothing in this function lets a caller reach or close it later -- confirm
// whether it should be registered somewhere.
int64_t syncStart(const SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  assert(pNode != NULL);
  return 0;
}
M
Minghao Li 已提交
55 56 57 58 59

// Placeholder: not implemented yet; the `rid` argument is ignored.
void syncStop(int64_t rid) {
  (void)rid;
}

// Placeholder: not implemented yet; ignores both arguments and returns 0.
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
  (void)rid;
  (void)pSyncCfg;
  return 0;
}

M
Minghao Li 已提交
60
// Placeholder: not implemented yet; ignores all arguments and returns 0.
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) {
  (void)rid;
  (void)pBuf;
  (void)isWeak;
  return 0;
}
M
Minghao Li 已提交
61 62 63

// Placeholder: always reports TAOS_SYNC_STATE_LEADER; `rid` is ignored.
ESyncState syncGetMyRole(int64_t rid) {
  (void)rid;
  return TAOS_SYNC_STATE_LEADER;
}

M
Minghao Li 已提交
64 65 66 67 68
// Placeholder: not implemented yet; `pNodeRole` is left untouched.
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {
  (void)rid;
  (void)pNodeRole;
}

SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode));
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
69
  memset(pSyncNode, 0, sizeof(SSyncNode));
M
Minghao Li 已提交
70

M
Minghao Li 已提交
71 72 73 74 75 76 77
  pSyncNode->vgId = pSyncInfo->vgId;
  pSyncNode->syncCfg = pSyncInfo->syncCfg;
  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  pSyncNode->pFsm = pSyncInfo->pFsm;

  pSyncNode->rpcClient = pSyncInfo->rpcClient;
  pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
M
Minghao Li 已提交
78 79
  pSyncNode->queue = pSyncInfo->queue;
  pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
M
Minghao Li 已提交
80 81 82 83 84 85 86 87 88 89 90 91

  pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex];
  pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1;

  int j = 0;
  for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) {
    if (i != pSyncInfo->syncCfg.myIndex) {
      pSyncNode->peers[j] = pSyncInfo->syncCfg.nodeInfo[i];
      j++;
    }
  }

M
syncInt  
Minghao Li 已提交
92
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
M
Minghao Li 已提交
93 94
  syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId);

M
Minghao Li 已提交
95 96
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = 1000;
M
Minghao Li 已提交
97
  atomic_store_8(&pSyncNode->pingTimerEnable, 0);
M
Minghao Li 已提交
98 99
  pSyncNode->FpPingTimer = syncNodePingTimerCb;
  pSyncNode->pingTimerCounter = 0;
M
Minghao Li 已提交
100

M
Minghao Li 已提交
101 102 103 104 105 106
  pSyncNode->FpOnPing = syncNodeOnPingCb;
  pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
  pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
  pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb;
  pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb;
  pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
M
Minghao Li 已提交
107 108 109 110 111 112 113 114 115

  return pSyncNode;
}

// Releases a node obtained from syncNodeOpen(). `pSyncNode` must not be NULL.
// NOTE(review): no timer is stopped here; if a ping timer is still armed its
// callback would receive the freed node as `param` -- confirm callers stop
// and drain timers before closing.
void syncNodeClose(SSyncNode* pSyncNode) {
  assert(pSyncNode != NULL);
  free(pSyncNode);
}

M
Minghao Li 已提交
116
// Sends one ping to every replica listed in the node's sync configuration.
// The loop covers all replicaNum entries of syncCfg.nodeInfo.
void syncNodePingAll(SSyncNode* pSyncNode) {
  sTrace("syncNodePingAll pSyncNode:%p ", pSyncNode);

  int32_t code = 0;
  for (int idx = 0; idx < pSyncNode->syncCfg.replicaNum; ++idx) {
    SRaftId target;
    syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[idx], pSyncNode->vgId, &target);

    // The ping object is owned here: build, send, then destroy.
    SyncPing* pPing = syncPingBuild3(&pSyncNode->raftId, &target);
    code = syncNodePing(pSyncNode, &target, pPing);
    assert(code == 0);
    syncPingDestroy(pPing);
  }
}
M
Minghao Li 已提交
128

M
Minghao Li 已提交
129 130 131
// Sends one ping to each entry of pSyncNode->peers (peersNum entries).
void syncNodePingPeers(SSyncNode* pSyncNode) {
  int32_t code = 0;
  for (int idx = 0; idx < pSyncNode->peersNum; ++idx) {
    SRaftId target;
    syncUtilnodeInfo2raftId(&pSyncNode->peers[idx], pSyncNode->vgId, &target);

    // The ping object is owned here: build, send, then destroy.
    SyncPing* pPing = syncPingBuild3(&pSyncNode->raftId, &target);
    code = syncNodePing(pSyncNode, &target, pPing);
    assert(code == 0);
    syncPingDestroy(pPing);
  }
}
M
Minghao Li 已提交
140

M
Minghao Li 已提交
141
// Sends a ping whose source and destination are both this node's raft id.
void syncNodePingSelf(SSyncNode* pSyncNode) {
  SyncPing* pPing = syncPingBuild3(&pSyncNode->raftId, &pSyncNode->raftId);

  int32_t code;
  code = syncNodePing(pSyncNode, &pPing->destId, pPing);
  assert(code == 0);

  syncPingDestroy(pPing);
}
M
Minghao Li 已提交
148

M
Minghao Li 已提交
149 150 151
// Arms the ping timer with the node's current pingTimerMS and marks it
// enabled. An existing timer handle is reset; otherwise a new timer is
// started. Always returns 0.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (pSyncNode->pPingTimer != NULL) {
    taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
                 &pSyncNode->pPingTimer);
  } else {
    pSyncNode->pPingTimer =
        taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager);
  }

  // The callback checks this flag before doing any work.
  atomic_store_8(&pSyncNode->pingTimerEnable, 1);
  return 0;
}

// Disables the ping timer: clears the enable flag, which makes the callback
// skip its work and not re-arm, and sets the interval to TIMER_MAX_MS.
// Always returns 0.
// NOTE(review): syncNodeStartPingTimer() does not restore pingTimerMS, so a
// later start would arm the timer with TIMER_MAX_MS -- confirm intended.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_store_8(&pSyncNode->pingTimerEnable, 0);
  pSyncNode->pingTimerMS = TIMER_MAX_MS;
  return 0;
}

// Arms the election timer with the node's current electTimerMS and marks it
// enabled. An existing timer handle is reset; otherwise a new timer is
// started. Always returns 0.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode) {
  if (pSyncNode->pElectTimer != NULL) {
    taosTmrReset(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
                 &pSyncNode->pElectTimer);
  } else {
    pSyncNode->pElectTimer =
        taosTmrStart(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager);
  }

  atomic_store_8(&pSyncNode->electTimerEnable, 1);
  return 0;
}

M
Minghao Li 已提交
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
// Disables the election timer: clears the enable flag and sets the interval
// to TIMER_MAX_MS. Always returns 0.
// NOTE(review): syncNodeStartElectTimer() does not restore electTimerMS, so
// a later start would arm the timer with TIMER_MAX_MS -- confirm intended.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_store_8(&pSyncNode->electTimerEnable, 0);
  pSyncNode->electTimerMS = TIMER_MAX_MS;
  return 0;
}

// Placeholder: not implemented yet; ignores both arguments and returns 0.
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  (void)pSyncNode;
  (void)ms;
  return 0;
}

// Arms the heartbeat timer with the node's current heartbeatTimerMS and
// marks it enabled. An existing timer handle is reset; otherwise a new timer
// is started. Always returns 0.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (pSyncNode->pHeartbeatTimer != NULL) {
    taosTmrReset(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
  } else {
    pSyncNode->pHeartbeatTimer =
        taosTmrStart(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager);
  }

  atomic_store_8(&pSyncNode->heartbeatTimerEnable, 1);
  return 0;
}

// Disables the heartbeat timer: clears the enable flag and sets the interval
// to TIMER_MAX_MS. Always returns 0.
// NOTE(review): syncNodeStartHeartbeatTimer() does not restore
// heartbeatTimerMS, so a later start would arm the timer with TIMER_MAX_MS
// -- confirm intended.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  atomic_store_8(&pSyncNode->heartbeatTimerEnable, 0);
  pSyncNode->heartbeatTimerMS = TIMER_MAX_MS;
  return 0;
}

// Placeholder: not implemented yet; ignores both arguments and returns 0.
int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms) {
  (void)pSyncNode;
  (void)ms;
  return 0;
}

// Placeholder: role transition not implemented yet; the node is untouched.
void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
  (void)pSyncNode;
}

// Placeholder: role transition not implemented yet; the node is untouched.
void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
  (void)pSyncNode;
}

// Placeholder: role transition not implemented yet; the node is untouched.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  (void)pSyncNode;
}

// Placeholder: role transition not implemented yet; the node is untouched.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  (void)pSyncNode;
}

// Placeholder: role transition not implemented yet; the node is untouched.
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  (void)pSyncNode;
}

// Placeholder: role transition not implemented yet; the node is untouched.
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  (void)pSyncNode;
}

M
Minghao Li 已提交
222
// ------ local function ---------
M
Minghao Li 已提交
223
// Converts `pMsg` into an RPC message and sends it to `destRaftId`.
//
// pSyncNode  - sending node; provides the send callback and RPC client.
// destRaftId - raft id of the destination.
// pMsg       - ping to send; still owned by the caller, who destroys it
//              after this function returns.
//
// Returns the result of syncNodeSendMsgById().
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
  sTrace("syncNodePing pSyncNode:%p ", pSyncNode);

  SRpcMsg rpcMsg;
  syncPing2RpcMsg(pMsg, &rpcMsg);

  // Trace both the original message and the RPC payload BEFORE sending.
  // NOTE(review): the send callback may take ownership of rpcMsg.pCont, in
  // which case reading it after the send would be a use-after-handoff --
  // confirm against the FpSendMsg contract.
  {
    cJSON* pJson = syncPing2Json(pMsg);
    char*  serialized = cJSON_Print(pJson);
    sTrace("syncNodePing pMsg:%s ", serialized);
    free(serialized);
    cJSON_Delete(pJson);
  }

  {
    SyncPing* pMsg2 = rpcMsg.pCont;
    cJSON*    pJson = syncPing2Json(pMsg2);
    char*     serialized = cJSON_Print(pJson);
    sTrace("syncNodePing rpcMsg.pCont:%s ", serialized);
    free(serialized);
    cJSON_Delete(pJson);
  }

  // Propagate the send result instead of unconditionally returning 0.
  return syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
}

M
Minghao Li 已提交
251
// Placeholder: not implemented yet; ignores both arguments and returns 0.
static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

M
Minghao Li 已提交
256
// Placeholder: not implemented yet; ignores both arguments and returns 0.
static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

M
Minghao Li 已提交
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275
// Converts `destRaftId` to an endpoint set and passes `pMsg` to the node's
// send callback. The callback's result is not inspected; always returns 0.
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet destEpSet;
  syncUtilraftId2EpSet(destRaftId, &destEpSet);

  pSyncNode->FpSendMsg(pSyncNode->rpcClient, &destEpSet, pMsg);
  return 0;
}

// Converts `nodeInfo` to an endpoint set and passes `pMsg` to the node's
// send callback. The callback's result is not inspected; always returns 0.
static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet destEpSet;
  syncUtilnodeInfo2EpSet(nodeInfo, &destEpSet);

  pSyncNode->FpSendMsg(pSyncNode->rpcClient, &destEpSet, pMsg);
  return 0;
}

// Handler for an incoming ping: traces the message as JSON and sends a ping
// reply back to the sender (pMsg->srcId).
//
// ths  - receiving node.
// pMsg - received ping; not modified or freed here.
//
// Always returns 0.
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
  int32_t ret = 0;
  sTrace("<-- syncNodeOnPingCb -->");

  {
    cJSON* pJson = syncPing2Json(pMsg);
    char*  serialized = cJSON_Print(pJson);
    sTrace("process syncMessage recv: syncNodeOnPingCb pMsg:%s ", serialized);
    free(serialized);
    cJSON_Delete(pJson);
  }

  SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->raftId, &pMsg->srcId);
  SRpcMsg        rpcMsg;
  syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);

  // Release the reply once it has been sent, mirroring how every ping built
  // in this file is destroyed after syncNodePing(); it was leaked before.
  // NOTE(review): assumes syncPingReplyDestroy() is the counterpart of
  // syncPingReplyBuild3() and that syncPingReply2RpcMsg() copies the payload
  // rather than aliasing pMsgReply -- confirm in the message header.
  syncPingReplyDestroy(pMsgReply);

  return ret;
}

M
Minghao Li 已提交
295
// Handler for an incoming ping reply: traces the message as JSON and does
// nothing else. Always returns 0.
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
  sTrace("<-- syncNodeOnPingReplyCb -->");

  cJSON* pRoot = syncPingReply2Json(pMsg);
  char*  text = cJSON_Print(pRoot);
  sTrace("process syncMessage recv: syncNodeOnPingReplyCb pMsg:%s ", text);
  free(text);
  cJSON_Delete(pRoot);

  return 0;
}

M
Minghao Li 已提交
310
// Placeholder handler: not implemented yet; ignores its arguments, returns 0.
static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

M
Minghao Li 已提交
315
// Placeholder handler: not implemented yet; ignores its arguments, returns 0.
static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

M
Minghao Li 已提交
320
// Placeholder handler: not implemented yet; ignores its arguments, returns 0.
static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

M
Minghao Li 已提交
325
// Placeholder handler: not implemented yet; ignores its arguments, returns 0.
static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

M
Minghao Li 已提交
330 331 332 333 334
// Placeholder handler: not implemented yet; ignores its arguments, returns 0.
static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

M
Minghao Li 已提交
335 336
static void syncNodePingTimerCb(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
M
Minghao Li 已提交
337
  if (atomic_load_8(&pSyncNode->pingTimerEnable)) {
M
Minghao Li 已提交
338 339 340
    ++(pSyncNode->pingTimerCounter);
    // pSyncNode->pingTimerMS += 100;

M
Minghao Li 已提交
341 342 343 344 345 346
    sTrace(
        "syncNodePingTimerCb: pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, "
        "tmrId:%p ",
        pSyncNode->pingTimerCounter, pSyncNode->pingTimerMS, pSyncNode->pPingTimer, tmrId);

    syncNodePingAll(pSyncNode);
M
Minghao Li 已提交
347 348 349

    taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
                 &pSyncNode->pPingTimer);
M
Minghao Li 已提交
350
  } else {
M
Minghao Li 已提交
351
    sTrace("syncNodePingTimerCb: pingTimerEnable:%u ", pSyncNode->pingTimerEnable);
M
Minghao Li 已提交
352
  }
M
Minghao Li 已提交
353
}