syncIndexMgr.c 5.9 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17 18 19 20
#include "syncIndexMgr.h"
#include "syncUtil.h"

/*
 * Allocate a zero-filled index manager and bind it to pSyncNode.
 *
 * The manager keeps a pointer to the node's replicasId array (no copy is
 * made), copies replicaNum, stores the node pointer, and then resets its
 * per-replica state via syncIndexMgrClear().
 *
 * Returns the new manager, or NULL with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when the allocation fails. Release the result
 * with syncIndexMgrDestroy().
 */
SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) {
  SSyncIndexMgr *pMgr = taosMemoryCalloc(1, sizeof(*pMgr));
  if (pMgr != NULL) {
    pMgr->pSyncNode = pSyncNode;
    pMgr->replicaNum = pSyncNode->replicaNum;
    pMgr->replicas = &(pSyncNode->replicasId);
    syncIndexMgrClear(pMgr);
    return pMgr;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}

M
Minghao Li 已提交
35 36 37 38 39 40 41
/*
 * Re-bind an existing index manager to pSyncNode.
 *
 * Refreshes the node pointer, the replica count and the pointer to the
 * node's replicasId array, then resets all per-replica state through
 * syncIndexMgrClear().
 */
void syncIndexMgrUpdate(SSyncIndexMgr *pSyncIndexMgr, SSyncNode *pSyncNode) {
  pSyncIndexMgr->pSyncNode = pSyncNode;
  pSyncIndexMgr->replicaNum = pSyncNode->replicaNum;
  pSyncIndexMgr->replicas = &(pSyncNode->replicasId);

  syncIndexMgrClear(pSyncIndexMgr);
}

M
Minghao Li 已提交
42 43
/*
 * Free an index manager. A NULL pointer is accepted and ignored.
 * Only the manager itself is freed; the sync node it points to is untouched.
 */
void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr) {
  if (pSyncIndexMgr == NULL) {
    return;
  }

  taosMemoryFree(pSyncIndexMgr);
}

/*
 * Reset all per-replica bookkeeping of the manager.
 *
 * The whole index and privateTerm arrays are zeroed. For the first
 * replicaNum slots, the start time is set to 0 and the receive time is set
 * to the current value of taosGetTimestampMs().
 */
void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr) {
  memset(pSyncIndexMgr->index, 0, sizeof(pSyncIndexMgr->index));
  memset(pSyncIndexMgr->privateTerm, 0, sizeof(pSyncIndexMgr->privateTerm));

  // Uses taosGetTimestampMs(), not the monotonic clock.
  int64_t nowMs = taosGetTimestampMs();

  int slot = 0;
  while (slot < pSyncIndexMgr->replicaNum) {
    pSyncIndexMgr->recvTimeArr[slot] = nowMs;
    pSyncIndexMgr->startTimeArr[slot] = 0;
    ++slot;
  }
}

/*
 * Record `index` for the replica identified by pRaftId.
 *
 * The replica list is scanned in order and the first slot whose id matches
 * (per syncUtilSameId) is updated. If no slot matches -- which may happen
 * around a configuration change -- nothing is stored and an error is logged
 * with the peer's host and port.
 */
void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index) {
  for (int slot = 0; slot < pSyncIndexMgr->replicaNum; ++slot) {
    if (!syncUtilSameId(&((*(pSyncIndexMgr->replicas))[slot]), pRaftId)) {
      continue;
    }
    pSyncIndexMgr->index[slot] = index;
    return;
  }

  // Unknown peer: report it instead of asserting.
  uint16_t peerPort;
  char     peerHost[128];
  syncUtilU642Addr(pRaftId->addr, peerHost, sizeof(peerHost), &peerPort);
  sError("vgId:%d, index mgr set for %s:%d, index:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, peerHost,
         peerPort, index);
}

B
Benguang Zhao 已提交
84 85 86 87 88 89 90 91 92
/*
 * Look up the log replication manager that belongs to the replica pDestId.
 *
 * Returns pNode->logReplMgrs[i] for the first i in [0, replicaNum) whose
 * replicasId entry matches pDestId (per syncUtilSameId), or NULL when no
 * replica matches.
 */
SSyncLogReplMgr *syncNodeGetLogReplMgr(SSyncNode *pNode, SRaftId *pDestId) {
  int slot = 0;
  while (slot < pNode->replicaNum) {
    if (syncUtilSameId(&(pNode->replicasId[slot]), pDestId)) {
      return pNode->logReplMgrs[slot];
    }
    slot++;
  }

  return NULL;
}

M
Minghao Li 已提交
93
/*
 * Fetch the index stored for the replica identified by pRaftId.
 *
 * Returns SYNC_INDEX_INVALID when pSyncIndexMgr is NULL or when no replica
 * in the manager matches pRaftId (per syncUtilSameId); otherwise returns the
 * value held in the first matching slot.
 */
SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
  if (pSyncIndexMgr != NULL) {
    for (int slot = 0; slot < pSyncIndexMgr->replicaNum; ++slot) {
      if (!syncUtilSameId(&((*(pSyncIndexMgr->replicas))[slot]), pRaftId)) {
        continue;
      }
      return pSyncIndexMgr->index[slot];
    }
  }

  return SYNC_INDEX_INVALID;
}

108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
/*
 * Record `startTime` for the replica identified by pRaftId.
 *
 * The first slot whose id matches (per syncUtilSameId) is updated. If no
 * slot matches -- which may happen around a configuration change -- nothing
 * is stored and an error is logged with the peer's host and port.
 */
void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime) {
  for (int slot = 0; slot < pSyncIndexMgr->replicaNum; ++slot) {
    if (!syncUtilSameId(&((*(pSyncIndexMgr->replicas))[slot]), pRaftId)) {
      continue;
    }
    pSyncIndexMgr->startTimeArr[slot] = startTime;
    return;
  }

  // Unknown peer: report it instead of asserting.
  uint16_t peerPort;
  char     peerHost[128];
  syncUtilU642Addr(pRaftId->addr, peerHost, sizeof(peerHost), &peerPort);
  sError("vgId:%d, index mgr set for %s:%d, start-time:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, peerHost,
         peerPort, startTime);
}

/*
 * Fetch the start time stored for the replica identified by pRaftId.
 *
 * Returns the value held in the first slot whose id matches pRaftId (per
 * syncUtilSameId), or -1 when no replica matches.
 *
 * An unknown peer is reported through the -1 sentinel rather than an
 * assertion: the setters in this file note that a missing id may simply be
 * the result of a configuration change, and syncIndexMgrGetRecvTime()
 * handles the same situation by returning -1.
 */
int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
  for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
    if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) {
      return (pSyncIndexMgr->startTimeArr)[i];
    }
  }

  // Peer not found (maybe config change): let the caller handle the sentinel.
  return -1;
}

/*
 * Record `recvTime` for the replica identified by pRaftId.
 *
 * The first slot whose id matches (per syncUtilSameId) is updated. If no
 * slot matches -- which may happen around a configuration change -- nothing
 * is stored and an error is logged with the peer's host and port.
 */
void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime) {
  for (int slot = 0; slot < pSyncIndexMgr->replicaNum; ++slot) {
    if (!syncUtilSameId(&((*(pSyncIndexMgr->replicas))[slot]), pRaftId)) {
      continue;
    }
    pSyncIndexMgr->recvTimeArr[slot] = recvTime;
    return;
  }

  // Unknown peer: report it instead of asserting.
  uint16_t peerPort;
  char     peerHost[128];
  syncUtilU642Addr(pRaftId->addr, peerHost, sizeof(peerHost), &peerPort);
  sError("vgId:%d, index mgr set for %s:%d, recv-time:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, peerHost,
         peerPort, recvTime);
}

/*
 * Fetch the receive time stored for the replica identified by pRaftId.
 *
 * Returns the value held in the first slot whose id matches pRaftId (per
 * syncUtilSameId), or -1 when no replica matches.
 */
int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
  int slot = 0;
  while (slot < pSyncIndexMgr->replicaNum) {
    if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[slot]), pRaftId)) {
      return pSyncIndexMgr->recvTimeArr[slot];
    }
    ++slot;
  }

  return -1;
}

M
Minghao Li 已提交
164 165 166 167 168 169 170 171 172
/*
 * Record `term` as the private term of the replica identified by pRaftId.
 *
 * The first slot whose id matches (per syncUtilSameId) is updated. If no
 * slot matches -- which may happen around a configuration change -- nothing
 * is stored and an error is logged with the peer's host and port.
 */
void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term) {
  for (int slot = 0; slot < pSyncIndexMgr->replicaNum; ++slot) {
    if (!syncUtilSameId(&((*(pSyncIndexMgr->replicas))[slot]), pRaftId)) {
      continue;
    }
    pSyncIndexMgr->privateTerm[slot] = term;
    return;
  }

  // Unknown peer: report it instead of asserting.
  uint16_t peerPort;
  char     peerHost[128];
  syncUtilU642Addr(pRaftId->addr, peerHost, sizeof(peerHost), &peerPort);
  sError("vgId:%d, index mgr set for %s:%d, term:%" PRIu64 " error", pSyncIndexMgr->pSyncNode->vgId, peerHost,
         peerPort, term);
}

/*
 * Fetch the private term stored for the replica identified by pRaftId.
 *
 * Returns the value held in the first slot whose id matches pRaftId (per
 * syncUtilSameId). When no replica matches, returns -1 converted to
 * SyncTerm.
 * NOTE(review): syncIndexMgrSetTerm() logs terms with PRIu64, so SyncTerm is
 * presumably unsigned and the sentinel is then the maximum value -- confirm
 * against the typedef.
 *
 * An unknown peer is reported through the sentinel rather than an
 * assertion: the setters in this file note that a missing id may simply be
 * the result of a configuration change, and syncIndexMgrGetRecvTime()
 * handles the same situation by returning -1.
 */
SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
  for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
    if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) {
      return (pSyncIndexMgr->privateTerm)[i];
    }
  }

  // Peer not found (maybe config change): let the caller handle the sentinel.
  return -1;
}