syncMain.c 5.6 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <stdint.h>
M
Minghao Li 已提交
17
#include "sync.h"
M
Minghao Li 已提交
18
#include "syncEnv.h"
M
Minghao Li 已提交
19
#include "syncInt.h"
M
Minghao Li 已提交
20
#include "syncRaft.h"
M
Minghao Li 已提交
21

M
Minghao Li 已提交
22 23 24 25 26 27 28 29 30 31 32 33
// NOTE(review): not referenced anywhere in the visible code; presumably a
// reference-pool id for sync nodes -- confirm before relying on it.
static int32_t tsNodeRefId = -1;

// ------ local function ---------
// Per-message handlers. The on* functions are installed as callbacks on every
// node by syncNodeOpen(); the do* functions have no visible callers yet.
static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg);
static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg);
static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg);
static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg);
static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg);
static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg);
static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg);
static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg);
static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg);
M
Minghao Li 已提交
34
// Ping timer callback; syncNodeOpen() stores it in SSyncNode::FpPingTimer.
static void    syncNodePingTimerCb(void* param, void* tmrId);
M
Minghao Li 已提交
35 36 37 38 39 40
// ---------------------------------

/*
 * Module-level initialization hook for the sync library.
 * Currently it only emits a trace line.
 *
 * Returns 0 unconditionally.
 */
int32_t syncInit() {
  const int32_t code = 0;
  sTrace("syncInit ok");
  return code;
}
M
Minghao Li 已提交
41

M
Minghao Li 已提交
42
/*
 * Module-level teardown hook for the sync library.
 * Currently it only emits a trace line.
 */
void syncCleanUp() {
  sTrace("syncCleanUp ok");
}
M
Minghao Li 已提交
43

M
Minghao Li 已提交
44 45 46 47 48
/*
 * Open a sync node described by pSyncInfo.
 *
 * Asserts that syncNodeOpen() produced a node, then returns 0.
 *
 * NOTE(review): the node pointer is neither stored nor returned here, so the
 * caller gets no handle to it and nothing in this function releases it --
 * confirm the intended ownership / rid registration.
 */
int64_t syncStart(const SSyncInfo* pSyncInfo) {
  SSyncNode* pNode = syncNodeOpen(pSyncInfo);
  assert(NULL != pNode);
  return 0;
}
M
Minghao Li 已提交
49 50 51 52 53 54 55 56 57

/*
 * Stop hook for the sync instance identified by rid.
 * Not implemented yet: the argument is ignored and nothing happens.
 */
void syncStop(int64_t rid) {
  (void)rid;
}

/*
 * Reconfiguration hook for the sync instance identified by rid.
 * Not implemented yet: both arguments are ignored.
 *
 * Returns 0 unconditionally.
 */
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
  (void)rid;
  (void)pSyncCfg;
  return 0;
}

/*
 * Forwarding hook for the sync instance identified by rid.
 * Not implemented yet: all arguments are ignored.
 *
 * Returns 0 unconditionally.
 */
int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) {
  (void)rid;
  (void)pBuf;
  (void)isWeak;
  return 0;
}

/*
 * Report the role of the sync instance identified by rid.
 * Placeholder: rid is ignored and TAOS_SYNC_STATE_LEADER is always returned.
 */
ESyncState syncGetMyRole(int64_t rid) {
  (void)rid;
  ESyncState role = TAOS_SYNC_STATE_LEADER;
  return role;
}

M
Minghao Li 已提交
58 59 60 61 62
/*
 * Role-query hook for the sync instance identified by rid.
 * Not implemented yet: pNodeRole is left untouched.
 */
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {
  (void)rid;
  (void)pNodeRole;
}

SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
  SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode));
  assert(pSyncNode != NULL);
M
Minghao Li 已提交
63
  memset(pSyncNode, 0, sizeof(SSyncNode));
M
Minghao Li 已提交
64

M
Minghao Li 已提交
65 66 67 68 69
  pSyncNode->pPingTimer = NULL;
  pSyncNode->pingTimerMS = 1000;
  atomic_store_8(&pSyncNode->pingTimerStart, 0);
  pSyncNode->FpPingTimer = syncNodePingTimerCb;
  pSyncNode->pingTimerCounter = 0;
M
Minghao Li 已提交
70

M
Minghao Li 已提交
71
  pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
M
Minghao Li 已提交
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
  pSyncNode->FpOnPing = onSyncNodePing;
  pSyncNode->FpOnPingReply = onSyncNodePingReply;
  pSyncNode->FpOnRequestVote = onSyncNodeRequestVote;
  pSyncNode->FpOnRequestVoteReply = onSyncNodeRequestVoteReply;
  pSyncNode->FpOnAppendEntries = onSyncNodeAppendEntries;
  pSyncNode->FpOnAppendEntriesReply = onSyncNodeAppendEntriesReply;

  return pSyncNode;
}

/*
 * Release a node allocated by syncNodeOpen().
 * pSyncNode must not be NULL (asserted).
 */
void syncNodeClose(SSyncNode* pSyncNode) {
  assert(NULL != pSyncNode);
  free(pSyncNode);
}

M
Minghao Li 已提交
87 88 89 90 91 92
/*
 * Ping-all hook for pSyncNode.
 * Placeholder: only traces the node pointer.
 */
void syncNodePingAll(SSyncNode* pSyncNode) {
  sTrace("syncNodePingAll %p ", pSyncNode);
}

/*
 * Ping-peers hook for pSyncNode.
 * Not implemented yet: does nothing.
 */
void syncNodePingPeers(SSyncNode* pSyncNode) {
  (void)pSyncNode;
}

/*
 * Ping-self hook for pSyncNode; invoked from the ping timer callback.
 * Not implemented yet: does nothing.
 */
void syncNodePingSelf(SSyncNode* pSyncNode) {
  (void)pSyncNode;
}

M
Minghao Li 已提交
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
/*
 * Arm (or re-arm) the node's ping timer.
 *
 * The timer interval is pSyncNode->pingTimerMS. The previous code passed
 * pingTimerCounter, which is the callback's tick counter (0 on a freshly
 * opened node), not a duration.
 *
 * pingTimerStart is raised before the timer is armed: the callback only
 * re-arms itself while the flag is set, so an early first tick must already
 * see it as 1.
 *
 * Returns 0 unconditionally.
 */
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
  atomic_store_8(&pSyncNode->pingTimerStart, 1);

  if (pSyncNode->pPingTimer == NULL) {
    // First start: create the timer.
    pSyncNode->pPingTimer =
        taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager);
  } else {
    // Timer handle already exists: reset it.
    taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
                 &pSyncNode->pPingTimer);
  }

  return 0;
}

/*
 * Stop the node's ping timer.
 *
 * Clears pingTimerStart; the timer callback checks that flag and does not
 * re-arm itself once it is 0.
 *
 * NOTE(review): this also overwrites pingTimerCounter (the tick counter the
 * callback increments) with TIMER_MAX_MS -- confirm whether pingTimerMS was
 * the intended target.
 *
 * Returns 0 unconditionally.
 */
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
  const int32_t code = 0;

  atomic_store_8(&pSyncNode->pingTimerStart, 0);
  pSyncNode->pingTimerCounter = TIMER_MAX_MS;

  return code;
}

M
Minghao Li 已提交
112
// ------ local function ---------
M
Minghao Li 已提交
113
/*
 * Placeholder (presumably the outbound ping path -- confirm).
 * Ignores both arguments and returns 0.
 */
static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

/*
 * Handler installed as SSyncNode::FpOnPing by syncNodeOpen().
 * Placeholder: ignores both arguments and returns 0.
 */
static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

/*
 * Handler installed as SSyncNode::FpOnPingReply by syncNodeOpen().
 * Placeholder: ignores both arguments and returns 0.
 */
static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

/*
 * Placeholder (presumably the outbound request-vote path -- confirm).
 * Ignores both arguments and returns 0.
 */
static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

/*
 * Handler installed as SSyncNode::FpOnRequestVote by syncNodeOpen().
 * Placeholder: ignores both arguments and returns 0.
 */
static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

/*
 * Handler installed as SSyncNode::FpOnRequestVoteReply by syncNodeOpen().
 * Placeholder: ignores both arguments and returns 0.
 */
static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

/*
 * Placeholder (presumably the outbound append-entries path -- confirm).
 * Ignores both arguments and returns 0.
 */
static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

/*
 * Handler installed as SSyncNode::FpOnAppendEntries by syncNodeOpen().
 * Placeholder: ignores both arguments and returns 0.
 */
static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

/*
 * Handler installed as SSyncNode::FpOnAppendEntriesReply by syncNodeOpen().
 * Placeholder: ignores both arguments and returns 0.
 */
static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
  (void)ths;
  (void)pMsg;
  return 0;
}

/*
 * Ping timer callback.
 *
 * param is the SSyncNode the timer was armed for; tmrId is the id of the
 * timer that fired (only traced here).
 *
 * While pingTimerStart is set, each tick increments pingTimerCounter, traces
 * the timer state, re-arms the timer for another pingTimerMS and then calls
 * syncNodePingSelf(). When the flag is clear the callback returns without
 * re-arming, which is how syncNodeStopPingTimer() ends the cycle.
 */
static void syncNodePingTimerCb(void* param, void* tmrId) {
  SSyncNode* pSyncNode = (SSyncNode*)param;

  // Timer was stopped: do nothing and do not re-arm.
  if (!atomic_load_8(&pSyncNode->pingTimerStart)) {
    return;
  }

  ++(pSyncNode->pingTimerCounter);

  sTrace("pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, tmrId:%p ",
         pSyncNode->pingTimerCounter, pSyncNode->pingTimerMS, pSyncNode->pPingTimer, tmrId);

  // Pass the timer manager handle itself, as syncNodeStartPingTimer() does.
  // The previous code passed the address of the handle field.
  taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
               &pSyncNode->pPingTimer);

  syncNodePingSelf(pSyncNode);
}