syncMain.c 9.3 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <stdint.h>
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18
#include "syncEnv.h"
M
Minghao Li 已提交
19
#include "syncInt.h"
M
Minghao Li 已提交
20
#include "syncRaft.h"
M
Minghao Li 已提交
21
#include "syncUtil.h"
M
Minghao Li 已提交
22

M
Minghao Li 已提交
23 24 25
static int32_t tsNodeRefId = -1;

// ------ local funciton ---------
M
Minghao Li 已提交
26 27
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
M
Minghao Li 已提交
28
static void    syncNodePingTimerCb(void* param, void* tmrId);
M
Minghao Li 已提交
29 30 31 32 33 34 35 36 37 38 39

static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg);
static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg);

static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
M
Minghao Li 已提交
40 41 42 43 44 45
// ---------------------------------

int32_t syncInit() {
  sTrace("syncInit ok");
  return 0;
}
M
Minghao Li 已提交
46

M
Minghao Li 已提交
47
void syncCleanUp() { sTrace("syncCleanUp ok"); }
M
Minghao Li 已提交
48

M
Minghao Li 已提交
49 50 51 52 53
int64_t syncStart(const SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
  assert(pSyncNode != NULL);
  return 0;
}
M
Minghao Li 已提交
54 55 56 57 58

void syncStop(int64_t rid) {}

int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; }

M
Minghao Li 已提交
59 60 61
// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { return 0; }

int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) { return 0; }
M
Minghao Li 已提交
62 63 64

ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; }

M
Minghao Li 已提交
65 66 67 68 69
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {}

SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode));
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
70
  memset(pSyncNode, 0, sizeof(SSyncNode));
M
Minghao Li 已提交
71

M
Minghao Li 已提交
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
  pSyncNode->vgId = pSyncInfo->vgId;
  pSyncNode->syncCfg = pSyncInfo->syncCfg;
  memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
  pSyncNode->pFsm = pSyncInfo->pFsm;

  pSyncNode->rpcClient = pSyncInfo->rpcClient;
  pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;

  pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex];
  pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1;

  int j = 0;
  for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) {
    if (i != pSyncInfo->syncCfg.myIndex) {
      pSyncNode->peers[j] = pSyncInfo->syncCfg.nodeInfo[i];
      j++;
    }
  }

M
syncInt  
Minghao Li 已提交
91
  pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
M
Minghao Li 已提交
92 93
  syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId);

M
Minghao Li 已提交
94 95 96 97 98
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = 1000;
  atomic_store_8(&pSyncNode->pingTimerStart, 0);
  pSyncNode->FpPingTimer = syncNodePingTimerCb;
  pSyncNode->pingTimerCounter = 0;
M
Minghao Li 已提交
99

M
Minghao Li 已提交
100 101 102 103 104 105
  pSyncNode->FpOnPing = syncNodeOnPingCb;
  pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
  pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
  pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb;
  pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb;
  pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
M
Minghao Li 已提交
106 107 108 109 110 111 112 113 114

  return pSyncNode;
}

void syncNodeClose(SSyncNode* pSyncNode) {
  assert(pSyncNode != NULL);
  free(pSyncNode);
}

M
Minghao Li 已提交
115
void syncNodePingAll(SSyncNode* pSyncNode) {
M
Minghao Li 已提交
116
  sTrace("syncNodePingAll pSyncNode:%p ", pSyncNode);
M
Minghao Li 已提交
117 118
  int32_t ret = 0;
  for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) {
M
Minghao Li 已提交
119 120 121 122
    SRaftId destId;
    syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &destId);
    SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &destId);
    ret = syncNodePing(pSyncNode, &destId, pMsg);
M
Minghao Li 已提交
123
    assert(ret == 0);
M
Minghao Li 已提交
124
    syncPingDestroy(pMsg);
M
Minghao Li 已提交
125
  }
M
Minghao Li 已提交
126
}
M
Minghao Li 已提交
127

M
Minghao Li 已提交
128 129 130
void syncNodePingPeers(SSyncNode* pSyncNode) {
  int32_t ret = 0;
  for (int i = 0; i < pSyncNode->peersNum; ++i) {
M
Minghao Li 已提交
131 132 133 134
    SRaftId destId;
    syncUtilnodeInfo2raftId(&pSyncNode->peers[i], pSyncNode->vgId, &destId);
    SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &destId);
    ret = syncNodePing(pSyncNode, &destId, pMsg);
M
Minghao Li 已提交
135
    assert(ret == 0);
M
Minghao Li 已提交
136
    syncPingDestroy(pMsg);
M
Minghao Li 已提交
137 138
  }
}
M
Minghao Li 已提交
139

M
Minghao Li 已提交
140
void syncNodePingSelf(SSyncNode* pSyncNode) {
M
Minghao Li 已提交
141 142
  int32_t   ret;
  SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &pSyncNode->raftId);
M
Minghao Li 已提交
143
  ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg);
M
Minghao Li 已提交
144
  assert(ret == 0);
M
Minghao Li 已提交
145
  syncPingDestroy(pMsg);
M
Minghao Li 已提交
146
}
M
Minghao Li 已提交
147

M
Minghao Li 已提交
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  if (pSyncNode->pPingTimer == NULL) {
    pSyncNode->pPingTimer =
        taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager);
  } else {
    taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager,
                 &pSyncNode->pPingTimer);
  }

  atomic_store_8(&pSyncNode->pingTimerStart, 1);
  return 0;
}

int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  atomic_store_8(&pSyncNode->pingTimerStart, 0);
  pSyncNode->pingTimerCounter = TIMER_MAX_MS;
  return 0;
}

M
Minghao Li 已提交
167
// ------ local funciton ---------
M
Minghao Li 已提交
168
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
M
Minghao Li 已提交
169
  sTrace("syncNodePing pSyncNode:%p ", pSyncNode);
M
Minghao Li 已提交
170
  int32_t ret = 0;
M
Minghao Li 已提交
171

M
Minghao Li 已提交
172 173 174
  SRpcMsg rpcMsg;
  syncPing2RpcMsg(pMsg, &rpcMsg);
  syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
M
Minghao Li 已提交
175 176 177 178 179 180 181 182 183 184 185 186 187

  {
    cJSON* pJson = syncPing2Json(pMsg);
    char*  serialized = cJSON_Print(pJson);
    sTrace("syncNodePing pMsg:%s ", serialized);
    free(serialized);
    cJSON_Delete(pJson);
  }

  {
    SyncPing* pMsg2 = rpcMsg.pCont;
    cJSON*    pJson = syncPing2Json(pMsg2);
    char*     serialized = cJSON_Print(pJson);
M
Minghao Li 已提交
188
    sTrace("syncNodePing rpcMsg.pCont:%s ", serialized);
M
Minghao Li 已提交
189 190 191 192
    free(serialized);
    cJSON_Delete(pJson);
  }

M
Minghao Li 已提交
193 194 195
  return ret;
}

M
Minghao Li 已提交
196
static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) {
M
Minghao Li 已提交
197
  int32_t ret = 0;
M
Minghao Li 已提交
198 199 200
  return ret;
}

M
Minghao Li 已提交
201
static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) {
M
Minghao Li 已提交
202
  int32_t ret = 0;
M
Minghao Li 已提交
203 204 205
  return ret;
}

M
Minghao Li 已提交
206
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
M
Minghao Li 已提交
207
  sTrace("syncNodeSendMsgById pSyncNode:%p ", pSyncNode);
M
Minghao Li 已提交
208 209 210 211 212 213 214 215 216 217 218 219 220 221
  SEpSet epSet;
  syncUtilraftId2EpSet(destRaftId, &epSet);
  pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
  return 0;
}

static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
  SEpSet epSet;
  syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
  pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
  return 0;
}

static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
M
Minghao Li 已提交
222
  int32_t ret = 0;
M
Minghao Li 已提交
223 224 225 226 227 228 229 230 231 232
  sTrace("<-- syncNodeOnPingCb -->");

  {
    cJSON* pJson = syncPing2Json(pMsg);
    char*  serialized = cJSON_Print(pJson);
    sTrace("syncNodeOnPingCb syncNodePing pMsg:%s ", serialized);
    free(serialized);
    cJSON_Delete(pJson);
  }

M
Minghao Li 已提交
233 234 235 236 237
  SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->raftId, &pMsg->srcId);
  SRpcMsg        rpcMsg;
  syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
  syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);

M
Minghao Li 已提交
238 239 240
  return ret;
}

M
Minghao Li 已提交
241
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
M
Minghao Li 已提交
242
  int32_t ret = 0;
M
Minghao Li 已提交
243 244 245 246 247 248 249 250 251 252
  sTrace("<-- syncNodeOnPingReplyCb -->");

  {
    cJSON* pJson = syncPingReply2Json(pMsg);
    char*  serialized = cJSON_Print(pJson);
    sTrace("syncNodeOnPingReplyCb syncNodePing pMsg:%s ", serialized);
    free(serialized);
    cJSON_Delete(pJson);
  }

M
Minghao Li 已提交
253 254 255
  return ret;
}

M
Minghao Li 已提交
256
static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
M
Minghao Li 已提交
257
  int32_t ret = 0;
M
Minghao Li 已提交
258 259 260
  return ret;
}

M
Minghao Li 已提交
261
static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
M
Minghao Li 已提交
262
  int32_t ret = 0;
M
Minghao Li 已提交
263 264 265
  return ret;
}

M
Minghao Li 已提交
266
static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
M
Minghao Li 已提交
267
  int32_t ret = 0;
M
Minghao Li 已提交
268 269 270
  return ret;
}

M
Minghao Li 已提交
271
static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
M
Minghao Li 已提交
272
  int32_t ret = 0;
M
Minghao Li 已提交
273
  return ret;
M
Minghao Li 已提交
274 275 276 277 278 279 280 281
}

static void syncNodePingTimerCb(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;
  if (atomic_load_8(&pSyncNode->pingTimerStart)) {
    ++(pSyncNode->pingTimerCounter);
    // pSyncNode->pingTimerMS += 100;

M
Minghao Li 已提交
282 283 284 285 286 287
    sTrace(
        "syncNodePingTimerCb: pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, "
        "tmrId:%p ",
        pSyncNode->pingTimerCounter, pSyncNode->pingTimerMS, pSyncNode->pPingTimer, tmrId);

    syncNodePingAll(pSyncNode);
M
Minghao Li 已提交
288 289 290

    taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
                 &pSyncNode->pPingTimer);
M
Minghao Li 已提交
291 292
  } else {
    sTrace("syncNodePingTimerCb: pingTimerStart:%u ", pSyncNode->pingTimerStart);
M
Minghao Li 已提交
293
  }
M
Minghao Li 已提交
294
}