syncMain.c 12.0 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <stdint.h>
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18
#include "syncEnv.h"
M
Minghao Li 已提交
19
#include "syncInt.h"
M
Minghao Li 已提交
20
#include "syncRaft.h"
M
Minghao Li 已提交
21
#include "syncUtil.h"
M
Minghao Li 已提交
22

M
Minghao Li 已提交
23 24 25
static int32_t tsNodeRefId = -1;

// ------ local function ---------
M
Minghao Li 已提交
26 27 28
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);

M
Minghao Li 已提交
29 30 31 32
static void syncNodeEqPingTimer(void* param, void* tmrId);
static void syncNodeEqElectTimer(void* param, void* tmrId);
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);

M
Minghao Li 已提交
33 34 35 36 37 38 39 40 41 42
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg);
static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg);

static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
M
Minghao Li 已提交
43
static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg);
M
Minghao Li 已提交
44 45 46 47 48 49
// ---------------------------------

// Module-level initialization hook for the sync subsystem.
// At present it only writes a trace line and reports success (0).
int32_t syncInit() {
  int32_t code = 0;
  sTrace("syncInit ok");
  return code;
}
M
Minghao Li 已提交
50

M
Minghao Li 已提交
51
// Module-level teardown hook for the sync subsystem.
// At present it only writes a trace line.
void syncCleanUp() {
  sTrace("syncCleanUp ok");
}
M
Minghao Li 已提交
52

M
Minghao Li 已提交
53 54 55 56 57
// Opens a sync node from pSyncInfo via syncNodeOpen() and returns 0.
// NOTE(review): the opened node is neither stored nor returned here, so no
// handle is available to close it later — confirm this is intentional.
int64_t syncStart(const SSyncInfo* pSyncInfo) {
  int64_t    rid = 0;
  SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
  assert(pSyncNode != NULL);
  return rid;
}
M
Minghao Li 已提交
58 59 60 61 62

// Placeholder: stopping a sync instance is not implemented yet; rid is ignored.
void syncStop(int64_t rid) {
  (void)rid;
}

// Placeholder: reconfiguration is not implemented yet.
// Both arguments are ignored and 0 is always returned.
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
  (void)rid;
  (void)pSyncCfg;
  return 0;
}

M
Minghao Li 已提交
63
// Placeholder: forwarding a message to peers is not implemented yet.
// All arguments are ignored and 0 is always returned.
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) {
  (void)rid;
  (void)pBuf;
  (void)isWeak;
  return 0;
}
M
Minghao Li 已提交
64 65 66

// Placeholder: always reports TAOS_SYNC_STATE_LEADER; rid is not consulted.
ESyncState syncGetMyRole(int64_t rid) {
  (void)rid;
  return TAOS_SYNC_STATE_LEADER;
}

M
Minghao Li 已提交
67 68 69 70 71
// Placeholder: not implemented yet; pNodeRole is left untouched.
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {
  (void)rid;
  (void)pNodeRole;
}

SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode));
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
72
  memset(pSyncNode, 0, sizeof(SSyncNode));
M
Minghao Li 已提交
73

M
Minghao Li 已提交
74 75 76 77 78 79 80
  pSyncNode->vgId = pSyncInfo->vgId;
  pSyncNode->syncCfg = pSyncInfo->syncCfg;
  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  pSyncNode->pFsm = pSyncInfo->pFsm;

  pSyncNode->rpcClient = pSyncInfo->rpcClient;
  pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
M
Minghao Li 已提交
81 82
  pSyncNode->queue = pSyncInfo->queue;
  pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
M
Minghao Li 已提交
83 84 85 86 87 88 89 90 91 92 93 94

  pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex];
  pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1;

  int j = 0;
  for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) {
    if (i != pSyncInfo->syncCfg.myIndex) {
      pSyncNode->peers[j] = pSyncInfo->syncCfg.nodeInfo[i];
      j++;
    }
  }

M
syncInt  
Minghao Li 已提交
95
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
M
Minghao Li 已提交
96 97
  syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId);

M
Minghao Li 已提交
98 99
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = 1000;
M
Minghao Li 已提交
100
  atomic_store_8(&pSyncNode->pingTimerEnable, 0);
M
Minghao Li 已提交
101
  pSyncNode->FpPingTimer = syncNodeEqPingTimer;
M
Minghao Li 已提交
102
  pSyncNode->pingTimerCounter = 0;
M
Minghao Li 已提交
103

M
Minghao Li 已提交
104 105 106 107 108 109
  pSyncNode->FpOnPing = syncNodeOnPingCb;
  pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
  pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
  pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb;
  pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb;
  pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
M
Minghao Li 已提交
110
  pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb;
M
Minghao Li 已提交
111 112 113 114 115 116 117 118 119

  return pSyncNode;
}

// Releases a node created by syncNodeOpen(). pSyncNode must not be NULL.
// NOTE(review): no timer is stopped here before the memory is freed — confirm
// that callers stop the timers first.
void syncNodeClose(SSyncNode* pSyncNode) {
  assert(pSyncNode != NULL);

  free(pSyncNode);
}

M
Minghao Li 已提交
120
// Sends one ping to every replica listed in syncCfg.nodeInfo (this includes
// the local node). Each ping message is built, sent and destroyed per
// iteration; a non-zero send result trips an assert.
void syncNodePingAll(SSyncNode* pSyncNode) {
  sTrace("syncNodePingAll pSyncNode:%p ", pSyncNode);

  int32_t ret = 0;
  int     idx = 0;
  while (idx < pSyncNode->syncCfg.replicaNum) {
    SRaftId destId;
    syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[idx], pSyncNode->vgId, &destId);

    SyncPing* pPing = syncPingBuild3(&pSyncNode->raftId, &destId);
    ret = syncNodePing(pSyncNode, &destId, pPing);
    assert(ret == 0);
    syncPingDestroy(pPing);

    ++idx;
  }
}
M
Minghao Li 已提交
132

M
Minghao Li 已提交
133 134 135
// Sends one ping to each of the peersNum entries in pSyncNode->peers.
// Each ping message is built, sent and destroyed per iteration; a non-zero
// send result trips an assert.
void syncNodePingPeers(SSyncNode* pSyncNode) {
  int32_t ret = 0;
  int     idx = 0;
  while (idx < pSyncNode->peersNum) {
    SRaftId destId;
    syncUtilnodeInfo2raftId(&pSyncNode->peers[idx], pSyncNode->vgId, &destId);

    SyncPing* pPing = syncPingBuild3(&pSyncNode->raftId, &destId);
    ret = syncNodePing(pSyncNode, &destId, pPing);
    assert(ret == 0);
    syncPingDestroy(pPing);

    ++idx;
  }
}
M
Minghao Li 已提交
144

M
Minghao Li 已提交
145
// Sends a single ping whose source and destination are both this node's own
// raft id, then destroys the message. A non-zero send result trips an assert.
void syncNodePingSelf(SSyncNode* pSyncNode) {
  SyncPing* pPing = syncPingBuild3(&pSyncNode->raftId, &pSyncNode->raftId);
  int32_t   ret = syncNodePing(pSyncNode, &pPing->destId, pPing);
  assert(ret == 0);
  syncPingDestroy(pPing);
}
M
Minghao Li 已提交
152

M
Minghao Li 已提交
153 154 155
// Arms the ping timer with the current pingTimerMS interval and then sets
// pingTimerEnable to 1. An existing timer handle is reset; otherwise a new
// timer is started and its handle stored in pPingTimer. Always returns 0.
// NOTE(review): pingTimerMS is used as-is; syncNodeStopPingTimer overwrites it
// with TIMER_MAX_MS and nothing here restores it — confirm intended.
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (pSyncNode->pPingTimer != NULL) {
    taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
                 &pSyncNode->pPingTimer);
  } else {
    pSyncNode->pPingTimer =
        taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager);
  }

  atomic_store_8(&pSyncNode->pingTimerEnable, 1);
  return 0;
}

// Disables the ping timer: clears pingTimerEnable and sets the interval to
// TIMER_MAX_MS. The timer handle itself is not touched. Always returns 0.
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  int32_t code = 0;
  atomic_store_8(&pSyncNode->pingTimerEnable, 0);
  pSyncNode->pingTimerMS = TIMER_MAX_MS;
  return code;
}

// Arms the election timer with the current electTimerMS interval and then sets
// electTimerEnable to 1. An existing timer handle is reset; otherwise a new
// timer is started and its handle stored in pElectTimer. Always returns 0.
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode) {
  if (pSyncNode->pElectTimer != NULL) {
    taosTmrReset(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
                 &pSyncNode->pElectTimer);
  } else {
    pSyncNode->pElectTimer =
        taosTmrStart(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager);
  }

  atomic_store_8(&pSyncNode->electTimerEnable, 1);
  return 0;
}

M
Minghao Li 已提交
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
// Disables the election timer: clears electTimerEnable and sets the interval
// to TIMER_MAX_MS. The timer handle itself is not touched. Always returns 0.
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  int32_t code = 0;
  atomic_store_8(&pSyncNode->electTimerEnable, 0);
  pSyncNode->electTimerMS = TIMER_MAX_MS;
  return code;
}

// Placeholder: resetting the election timer is not implemented yet.
// Both arguments are ignored and 0 is always returned.
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms) {
  (void)pSyncNode;
  (void)ms;
  return 0;
}

// Arms the heartbeat timer with the current heartbeatTimerMS interval and then
// sets heartbeatTimerEnable to 1. An existing timer handle is reset; otherwise
// a new timer is started and its handle stored in pHeartbeatTimer.
// Always returns 0.
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (pSyncNode->pHeartbeatTimer != NULL) {
    taosTmrReset(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
  } else {
    pSyncNode->pHeartbeatTimer =
        taosTmrStart(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager);
  }

  atomic_store_8(&pSyncNode->heartbeatTimerEnable, 1);
  return 0;
}

// Disables the heartbeat timer: clears heartbeatTimerEnable and sets the
// interval to TIMER_MAX_MS. The timer handle itself is not touched.
// Always returns 0.
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  int32_t code = 0;
  atomic_store_8(&pSyncNode->heartbeatTimerEnable, 0);
  pSyncNode->heartbeatTimerMS = TIMER_MAX_MS;
  return code;
}

// Placeholder: resetting the heartbeat timer is not implemented yet.
// Both arguments are ignored and 0 is always returned.
int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms) {
  (void)pSyncNode;
  (void)ms;
  return 0;
}

// Placeholder for the transition into the follower role; does nothing yet.
void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
  (void)pSyncNode;
}

// Placeholder for the transition into the leader role; does nothing yet.
void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
  (void)pSyncNode;
}

// Placeholder for the follower -> candidate transition; does nothing yet.
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  (void)pSyncNode;
}

// Placeholder for the candidate -> leader transition; does nothing yet.
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
  (void)pSyncNode;
}

// Placeholder for the leader -> follower transition; does nothing yet.
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
  (void)pSyncNode;
}

// Placeholder for the candidate -> follower transition; does nothing yet.
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
  (void)pSyncNode;
}

M
Minghao Li 已提交
226
// ------ local function ---------
M
Minghao Li 已提交
227
// Sends pMsg as a ping to destRaftId.
// The message is converted to an SRpcMsg, both the original message and the
// converted payload are traced as JSON, and the RPC message is then handed to
// syncNodeSendMsgById().
// pMsg stays owned by the caller. Returns the result of the send.
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
  sTrace("syncNodePing pSyncNode:%p ", pSyncNode);

  SRpcMsg rpcMsg;
  syncPing2RpcMsg(pMsg, &rpcMsg);

  // Trace BEFORE sending: what FpSendMsg does with rpcMsg.pCont is not visible
  // here, so the payload must not be read once it has been handed over.
  {
    cJSON* pJson = syncPing2Json(pMsg);
    char*  serialized = cJSON_Print(pJson);
    sTrace("syncNodePing pMsg:%s ", serialized);
    free(serialized);
    cJSON_Delete(pJson);
  }

  {
    SyncPing* pMsg2 = rpcMsg.pCont;
    cJSON*    pJson = syncPing2Json(pMsg2);
    char*     serialized = cJSON_Print(pJson);
    sTrace("syncNodePing rpcMsg.pCont:%s ", serialized);
    free(serialized);
    cJSON_Delete(pJson);
  }

  // Propagate the send result instead of discarding it.
  return syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
}

M
Minghao Li 已提交
255
// Placeholder: sending a RequestVote is not implemented yet.
// Both arguments are ignored and 0 is always returned.
static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

M
Minghao Li 已提交
260
// Placeholder: sending an AppendEntries is not implemented yet.
// Both arguments are ignored and 0 is always returned.
static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

M
Minghao Li 已提交
265 266 267 268 269 270 271 272 273 274 275 276 277 278 279
// Converts destRaftId into an endpoint set and passes pMsg to the node's
// FpSendMsg hook together with its rpcClient. Always returns 0; whatever the
// hook returns is not inspected.
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet destEpSet;

  syncUtilraftId2EpSet(destRaftId, &destEpSet);
  pSyncNode->FpSendMsg(pSyncNode->rpcClient, &destEpSet, pMsg);

  return 0;
}

// Converts nodeInfo into an endpoint set and passes pMsg to the node's
// FpSendMsg hook together with its rpcClient. Always returns 0; whatever the
// hook returns is not inspected.
static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet destEpSet;

  syncUtilnodeInfo2EpSet(nodeInfo, &destEpSet);
  pSyncNode->FpSendMsg(pSyncNode->rpcClient, &destEpSet, pMsg);

  return 0;
}

// Handles an incoming ping: traces the message as JSON, then builds a
// SyncPingReply addressed from this node to the ping's sender and sends it.
// Returns the result of the send.
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
  int32_t ret = 0;
  sTrace("<-- syncNodeOnPingCb -->");

  {
    cJSON* pJson = syncPing2Json(pMsg);
    char*  serialized = cJSON_Print(pJson);
    sTrace("process syncMessage recv: syncNodeOnPingCb pMsg:%s ", serialized);
    free(serialized);
    cJSON_Delete(pJson);
  }

  SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->raftId, &pMsg->srcId);
  SRpcMsg        rpcMsg;
  syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
  ret = syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);

  // Release the reply now that it has been converted and sent; previously it
  // leaked on every ping received.
  // NOTE(review): assumes syncPingReplyDestroy is the matching destructor,
  // mirroring the syncPingBuild3 / syncPingDestroy pairing used in this file.
  syncPingReplyDestroy(pMsgReply);

  return ret;
}

M
Minghao Li 已提交
299
// Handles an incoming ping reply. Currently it only traces the message as
// JSON and returns 0.
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
  sTrace("<-- syncNodeOnPingReplyCb -->");

  cJSON* pRoot = syncPingReply2Json(pMsg);
  char*  text = cJSON_Print(pRoot);
  sTrace("process syncMessage recv: syncNodeOnPingReplyCb pMsg:%s ", text);
  free(text);
  cJSON_Delete(pRoot);

  return 0;
}

M
Minghao Li 已提交
314
// Placeholder handler for RequestVote messages; does nothing and returns 0.
static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

M
Minghao Li 已提交
319
// Placeholder handler for RequestVote replies; does nothing and returns 0.
static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

M
Minghao Li 已提交
324
// Placeholder handler for AppendEntries messages; does nothing and returns 0.
static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

M
Minghao Li 已提交
329
// Placeholder handler for AppendEntries replies; does nothing and returns 0.
static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

M
Minghao Li 已提交
334 335
// Handles a timeout message: traces it as JSON, then dispatches on
// timeoutType. Only SYNC_TIMEOUT_PING has an effect: while pingTimerEnable is
// set, the ping counter is incremented and every replica is pinged.
// Election, heartbeat and unknown timeout types are ignored for now.
// Always returns 0.
static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
  int32_t ret = 0;
  sTrace("<-- syncNodeOnTimeoutCb -->");

  cJSON* pRoot = syncTimeout2Json(pMsg);
  char*  text = cJSON_Print(pRoot);
  sTrace("process syncMessage recv: syncNodeOnTimeoutCb pMsg:%s ", text);
  free(text);
  cJSON_Delete(pRoot);

  if (pMsg->timeoutType != SYNC_TIMEOUT_PING) {
    // SYNC_TIMEOUT_ELECTION, SYNC_TIMEOUT_HEARTBEAT and anything else:
    // no handling yet.
    return ret;
  }

  if (atomic_load_8(&ths->pingTimerEnable)) {
    ths->pingTimerCounter += 1;
    syncNodePingAll(ths);
  }

  return ret;
}

M
Minghao Li 已提交
360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380
// Timer callback for the ping timer; param is the SSyncNode the timer was
// armed with.
// While pingTimerEnable is set it builds a SYNC_TIMEOUT_PING message, enqueues
// it through the node's FpEqMsg hook, destroys the local message, and re-arms
// the timer with the current pingTimerMS. When the flag is clear it only
// traces and does not re-arm.
static void syncNodeEqPingTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
  if (atomic_load_8(&pSyncNode->pingTimerEnable)) {
    // pSyncNode->pingTimerMS += 100;

    SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, pSyncNode);
    SRpcMsg      rpcMsg;
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
    pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
    syncTimeoutDestroy(pSyncMsg);

    // Pass the timer manager handle itself, not the address of the field that
    // holds it, matching every other taosTmrStart/taosTmrReset call in this file.
    taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
                 &pSyncNode->pPingTimer);
  } else {
    sTrace("syncNodeEqPingTimer: pingTimerEnable:%u ", pSyncNode->pingTimerEnable);
  }
}

// Placeholder timer callback for the election timer; does nothing yet.
static void syncNodeEqElectTimer(void* param, void* tmrId) {
  (void)param;
  (void)tmrId;
}

// Placeholder timer callback for the heartbeat timer; does nothing yet.
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
  (void)param;
  (void)tmrId;
}