syncMain.c 11.5 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <stdint.h>
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18
#include "syncEnv.h"
M
Minghao Li 已提交
19
#include "syncInt.h"
M
Minghao Li 已提交
20
#include "syncRaft.h"
M
Minghao Li 已提交
21
#include "syncUtil.h"
M
Minghao Li 已提交
22

M
Minghao Li 已提交
23 24 25
static int32_t tsNodeRefId = -1;

// ------ local funciton ---------
M
Minghao Li 已提交
26 27 28
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);

M
Minghao Li 已提交
29 30 31 32
static void syncNodeEqPingTimer(void* param, void* tmrId);
static void syncNodeEqElectTimer(void* param, void* tmrId);
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);

M
Minghao Li 已提交
33 34 35
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
M
Minghao Li 已提交
36
static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg);
M
Minghao Li 已提交
37 38 39 40 41 42 43

static void syncNodeBecomeFollower(SSyncNode* pSyncNode);
static void syncNodeBecomeLeader(SSyncNode* pSyncNode);
static void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
static void syncNodeCandidate2Leader(SSyncNode* pSyncNode);
static void syncNodeLeader2Follower(SSyncNode* pSyncNode);
static void syncNodeCandidate2Follower(SSyncNode* pSyncNode);
M
Minghao Li 已提交
44 45 46 47 48 49
// ---------------------------------

int32_t syncInit() {
  sTrace("syncInit ok");
  return 0;
}
M
Minghao Li 已提交
50

M
Minghao Li 已提交
51
void syncCleanUp() { sTrace("syncCleanUp ok"); }
M
Minghao Li 已提交
52

M
Minghao Li 已提交
53 54 55 56 57
int64_t syncStart(const SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
  assert(pSyncNode != NULL);
  return 0;
}
M
Minghao Li 已提交
58 59 60 61 62

void syncStop(int64_t rid) {}

int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; }

M
Minghao Li 已提交
63
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) { return 0; }
M
Minghao Li 已提交
64 65 66

ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; }

M
Minghao Li 已提交
67 68 69 70 71
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {}

SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode));
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
72
  memset(pSyncNode, 0, sizeof(SSyncNode));
M
Minghao Li 已提交
73

M
Minghao Li 已提交
74 75 76 77 78 79 80
  pSyncNode->vgId = pSyncInfo->vgId;
  pSyncNode->syncCfg = pSyncInfo->syncCfg;
  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  pSyncNode->pFsm = pSyncInfo->pFsm;

  pSyncNode->rpcClient = pSyncInfo->rpcClient;
  pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
M
Minghao Li 已提交
81 82
  pSyncNode->queue = pSyncInfo->queue;
  pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
M
Minghao Li 已提交
83 84 85 86 87 88 89 90 91 92 93 94

  pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex];
  pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1;

  int j = 0;
  for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) {
    if (i != pSyncInfo->syncCfg.myIndex) {
      pSyncNode->peers[j] = pSyncInfo->syncCfg.nodeInfo[i];
      j++;
    }
  }

M
syncInt  
Minghao Li 已提交
95
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
M
Minghao Li 已提交
96 97
  syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId);

M
Minghao Li 已提交
98 99
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = 1000;
M
Minghao Li 已提交
100
  atomic_store_8(&pSyncNode->pingTimerEnable, 0);
M
Minghao Li 已提交
101
  pSyncNode->FpPingTimer = syncNodeEqPingTimer;
M
Minghao Li 已提交
102
  pSyncNode->pingTimerCounter = 0;
M
Minghao Li 已提交
103

M
Minghao Li 已提交
104 105 106 107 108 109
  pSyncNode->FpOnPing = syncNodeOnPingCb;
  pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
  pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
  pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb;
  pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb;
  pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
M
Minghao Li 已提交
110
  pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb;
M
Minghao Li 已提交
111 112 113 114 115 116 117 118 119

  return pSyncNode;
}

void syncNodeClose(SSyncNode* pSyncNode) {
  assert(pSyncNode != NULL);
  free(pSyncNode);
}

M
Minghao Li 已提交
120
void syncNodePingAll(SSyncNode* pSyncNode) {
M
Minghao Li 已提交
121
  sTrace("syncNodePingAll pSyncNode:%p ", pSyncNode);
M
Minghao Li 已提交
122 123
  int32_t ret = 0;
  for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) {
M
Minghao Li 已提交
124 125 126 127
    SRaftId destId;
    syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &destId);
    SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &destId);
    ret = syncNodePing(pSyncNode, &destId, pMsg);
M
Minghao Li 已提交
128
    assert(ret == 0);
M
Minghao Li 已提交
129
    syncPingDestroy(pMsg);
M
Minghao Li 已提交
130
  }
M
Minghao Li 已提交
131
}
M
Minghao Li 已提交
132

M
Minghao Li 已提交
133 134 135
void syncNodePingPeers(SSyncNode* pSyncNode) {
  int32_t ret = 0;
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
M
Minghao Li 已提交
136 137 138 139
    SRaftId destId;
    syncUtilnodeInfo2raftId(&pSyncNode->peers[i], pSyncNode->vgId, &destId);
    SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &destId);
    ret = syncNodePing(pSyncNode, &destId, pMsg);
M
Minghao Li 已提交
140
    assert(ret == 0);
M
Minghao Li 已提交
141
    syncPingDestroy(pMsg);
M
Minghao Li 已提交
142 143
  }
}
M
Minghao Li 已提交
144

M
Minghao Li 已提交
145
void syncNodePingSelf(SSyncNode* pSyncNode) {
M
Minghao Li 已提交
146 147
  int32_t   ret;
  SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &pSyncNode->raftId);
M
Minghao Li 已提交
148
  ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg);
M
Minghao Li 已提交
149
  assert(ret == 0);
M
Minghao Li 已提交
150
  syncPingDestroy(pMsg);
M
Minghao Li 已提交
151
}
M
Minghao Li 已提交
152

M
Minghao Li 已提交
153 154 155
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (pSyncNode->pPingTimer == NULL) {
    pSyncNode->pPingTimer =
M
Minghao Li 已提交
156
        taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager);
M
Minghao Li 已提交
157
  } else {
M
Minghao Li 已提交
158
    taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
M
Minghao Li 已提交
159 160 161
                 &pSyncNode->pPingTimer);
  }

M
Minghao Li 已提交
162
  atomic_store_8(&pSyncNode->pingTimerEnable, 1);
M
Minghao Li 已提交
163 164 165 166
  return 0;
}

int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
M
Minghao Li 已提交
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
  atomic_store_8(&pSyncNode->pingTimerEnable, 0);
  pSyncNode->pingTimerMS = TIMER_MAX_MS;
  return 0;
}

int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode) {
  if (pSyncNode->pElectTimer == NULL) {
    pSyncNode->pElectTimer =
        taosTmrStart(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager);
  } else {
    taosTmrReset(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
                 &pSyncNode->pElectTimer);
  }

  atomic_store_8(&pSyncNode->electTimerEnable, 1);
M
Minghao Li 已提交
182 183 184
  return 0;
}

M
Minghao Li 已提交
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
  atomic_store_8(&pSyncNode->electTimerEnable, 0);
  pSyncNode->electTimerMS = TIMER_MAX_MS;
  return 0;
}

int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
  if (pSyncNode->pHeartbeatTimer == NULL) {
    pSyncNode->pHeartbeatTimer =
        taosTmrStart(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager);
  } else {
    taosTmrReset(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
                 &pSyncNode->pHeartbeatTimer);
  }

  atomic_store_8(&pSyncNode->heartbeatTimerEnable, 1);
  return 0;
}

int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
  atomic_store_8(&pSyncNode->heartbeatTimerEnable, 0);
  pSyncNode->heartbeatTimerMS = TIMER_MAX_MS;
  return 0;
}

M
Minghao Li 已提交
210
// ------ local funciton ---------
M
Minghao Li 已提交
211
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
M
Minghao Li 已提交
212
  sTrace("syncNodePing pSyncNode:%p ", pSyncNode);
M
Minghao Li 已提交
213
  int32_t ret = 0;
M
Minghao Li 已提交
214

M
Minghao Li 已提交
215 216 217
  SRpcMsg rpcMsg;
  syncPing2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
M
Minghao Li 已提交
218 219 220 221 222 223 224 225 226 227 228 229 230

  {
    cJSON* pJson = syncPing2Json(pMsg);
    char*  serialized = cJSON_Print(pJson);
    sTrace("syncNodePing pMsg:%s ", serialized);
    free(serialized);
    cJSON_Delete(pJson);
  }

  {
    SyncPing* pMsg2 = rpcMsg.pCont;
    cJSON*    pJson = syncPing2Json(pMsg2);
    char*     serialized = cJSON_Print(pJson);
M
Minghao Li 已提交
231
    sTrace("syncNodePing rpcMsg.pCont:%s ", serialized);
M
Minghao Li 已提交
232 233 234 235
    free(serialized);
    cJSON_Delete(pJson);
  }

M
Minghao Li 已提交
236 237 238
  return ret;
}

M
Minghao Li 已提交
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilraftId2EpSet(destRaftId, &epSet);
  pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
  return 0;
}

static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
  pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
  return 0;
}

static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
M
Minghao Li 已提交
254
  int32_t ret = 0;
M
Minghao Li 已提交
255 256 257 258 259
  sTrace("<-- syncNodeOnPingCb -->");

  {
    cJSON* pJson = syncPing2Json(pMsg);
    char*  serialized = cJSON_Print(pJson);
M
Minghao Li 已提交
260
    sTrace("process syncMessage recv: syncNodeOnPingCb pMsg:%s ", serialized);
M
Minghao Li 已提交
261 262 263 264
    free(serialized);
    cJSON_Delete(pJson);
  }

M
Minghao Li 已提交
265 266 267 268 269
  SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->raftId, &pMsg->srcId);
  SRpcMsg        rpcMsg;
  syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);

M
Minghao Li 已提交
270 271 272
  return ret;
}

M
Minghao Li 已提交
273
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
M
Minghao Li 已提交
274
  int32_t ret = 0;
M
Minghao Li 已提交
275 276 277 278 279
  sTrace("<-- syncNodeOnPingReplyCb -->");

  {
    cJSON* pJson = syncPingReply2Json(pMsg);
    char*  serialized = cJSON_Print(pJson);
M
Minghao Li 已提交
280
    sTrace("process syncMessage recv: syncNodeOnPingReplyCb pMsg:%s ", serialized);
M
Minghao Li 已提交
281 282 283 284
    free(serialized);
    cJSON_Delete(pJson);
  }

M
Minghao Li 已提交
285 286 287
  return ret;
}

M
Minghao Li 已提交
288 289
static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
  int32_t ret = 0;
M
Minghao Li 已提交
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
  sTrace("<-- syncNodeOnTimeoutCb -->");

  {
    cJSON* pJson = syncTimeout2Json(pMsg);
    char*  serialized = cJSON_Print(pJson);
    sTrace("process syncMessage recv: syncNodeOnTimeoutCb pMsg:%s ", serialized);
    free(serialized);
    cJSON_Delete(pJson);
  }

  if (pMsg->timeoutType == SYNC_TIMEOUT_PING) {
    if (atomic_load_8(&ths->pingTimerEnable)) {
      ++(ths->pingTimerCounter);
      syncNodePingAll(ths);
    }

  } else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) {
  } else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) {
  } else {
  }

M
Minghao Li 已提交
311 312 313
  return ret;
}

M
Minghao Li 已提交
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333
static void syncNodeEqPingTimer(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
  if (atomic_load_8(&pSyncNode->pingTimerEnable)) {
    // pSyncNode->pingTimerMS += 100;

    SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, pSyncNode);
    SRpcMsg      rpcMsg;
    syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
    pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
    syncTimeoutDestroy(pSyncMsg);

    taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
                 &pSyncNode->pPingTimer);
  } else {
    sTrace("syncNodeEqPingTimer: pingTimerEnable:%u ", pSyncNode->pingTimerEnable);
  }
}

static void syncNodeEqElectTimer(void* param, void* tmrId) {}

M
Minghao Li 已提交
334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {}

static void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
  if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
    pSyncNode->leaderCache.addr = 0;
    pSyncNode->leaderCache.vgId = 0;
  }

  syncNodeStopHeartbeatTimer(pSyncNode);
  syncNodeStartElectTimer(pSyncNode);
}

static void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
  pSyncNode->state = TAOS_SYNC_STATE_LEADER;
  pSyncNode->leaderCache = pSyncNode->raftId;

  // next Index +=1
  // match Index = 0;

  syncNodeStopElectTimer(pSyncNode);
  syncNodeStartHeartbeatTimer(pSyncNode);

  // appendEntries;
}

static void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
  assert(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
  pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
}

static void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {}

static void syncNodeLeader2Follower(SSyncNode* pSyncNode) {}

static void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {}