syncIndexMgr.c 8.3 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncIndexMgr.h"
#include "syncUtil.h"

// SMatchIndex -----------------------------

// Allocates a zero-filled SSyncIndexMgr and binds it to pSyncNode.
// The manager keeps a pointer to the node's replicasId member (not a copy),
// the node's replicaNum, and a back-pointer to the node itself; its per-replica
// state is then reset through syncIndexMgrClear().
// Returns the new manager, or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY
// when the allocation fails.
SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) {
  SSyncIndexMgr *pMgr = taosMemoryMalloc(sizeof(*pMgr));
  if (pMgr != NULL) {
    memset(pMgr, 0, sizeof(*pMgr));

    pMgr->pSyncNode = pSyncNode;
    pMgr->replicaNum = pSyncNode->replicaNum;
    pMgr->replicas = &(pSyncNode->replicasId);

    // Must come after replicaNum is assigned: the reset loops over replicaNum.
    syncIndexMgrClear(pMgr);
    return pMgr;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}

M
Minghao Li 已提交
37 38 39 40 41 42 43
// Re-binds an existing index manager to pSyncNode: refreshes the back-pointer,
// the replica count and the pointer to the node's replicasId member, then
// resets all per-replica state via syncIndexMgrClear().
void syncIndexMgrUpdate(SSyncIndexMgr *pSyncIndexMgr, SSyncNode *pSyncNode) {
  pSyncIndexMgr->pSyncNode = pSyncNode;
  pSyncIndexMgr->replicaNum = pSyncNode->replicaNum;
  pSyncIndexMgr->replicas = &(pSyncNode->replicasId);

  // Runs last because the reset iterates over the freshly assigned replicaNum.
  syncIndexMgrClear(pSyncIndexMgr);
}

M
Minghao Li 已提交
44 45
// Releases the manager itself. A NULL argument is accepted and ignored.
// Nothing reachable through the manager (replicas, pSyncNode) is freed here.
void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr) {
  if (pSyncIndexMgr == NULL) {
    return;
  }
  taosMemoryFree(pSyncIndexMgr);
}

// Resets the per-replica bookkeeping of the manager.
// The index and privateTerm members are zeroed over their full sizeof();
// startTimeArr and recvTimeArr are zeroed only for the first replicaNum slots.
void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr) {
  memset(pSyncIndexMgr->privateTerm, 0, sizeof(pSyncIndexMgr->privateTerm));
  memset(pSyncIndexMgr->index, 0, sizeof(pSyncIndexMgr->index));

  int slot = 0;
  while (slot < pSyncIndexMgr->replicaNum) {
    pSyncIndexMgr->recvTimeArr[slot] = 0;
    pSyncIndexMgr->startTimeArr[slot] = 0;
    ++slot;
  }
}

// Stores `index` in the slot of the replica whose id matches pRaftId
// (as decided by syncUtilSameId). Only the first matching slot is written.
// When no replica matches (the original author notes this may happen around a
// config change) nothing is stored and an error is logged with the host/port
// decoded from pRaftId->addr.
void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index) {
  int slot = 0;
  while (slot < pSyncIndexMgr->replicaNum) {
    if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[slot]), pRaftId)) {
      pSyncIndexMgr->index[slot] = index;
      return;
    }
    ++slot;
  }

  // Unknown replica: report it instead of asserting.
  uint16_t port;
  char     host[128];
  syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
  sError("vgId:%d, index mgr set for %s:%d, index:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port,
         index);
}

// Looks up the index recorded for the replica whose id matches pRaftId.
// Returns SYNC_INDEX_INVALID when pSyncIndexMgr is NULL or when no replica
// matches; otherwise returns the value stored in the first matching slot.
SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
  SyncIndex found = SYNC_INDEX_INVALID;

  if (pSyncIndexMgr != NULL) {
    for (int slot = 0; slot < pSyncIndexMgr->replicaNum; ++slot) {
      if (!syncUtilSameId(&((*(pSyncIndexMgr->replicas))[slot]), pRaftId)) {
        continue;
      }
      found = pSyncIndexMgr->index[slot];
      break;
    }
  }

  return found;
}

// Builds a JSON description of the manager for debugging.
// The result is an object of the form {"pSyncIndexMgr": {...}}; the inner
// object is left empty when pSyncIndexMgr is NULL. Otherwise it carries, in
// this order: "replicaNum", "replicas" (one syncUtilRaftId2Json() entry per
// replica), "index", "privateTerm" (one number per replica) and "pSyncNode"
// (the node pointer formatted with %p).
// The caller owns the returned tree and releases it with cJSON_Delete().
//
// The per-replica numbers are appended one by one with cJSON_CreateNumber()
// rather than copied through a temporary int array: this removes an unchecked
// allocation and avoids narrowing the 64-bit index/term values to int before
// they are serialized. cJSON stores numbers as double, so values are exact up
// to 2^53.
cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
  char   u64buf[128] = {0};
  cJSON *pRoot = cJSON_CreateObject();

  if (pSyncIndexMgr != NULL) {
    cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncIndexMgr->replicaNum);

    cJSON *pReplicas = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
    for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
      cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pSyncIndexMgr->replicas))[i]));
    }

    cJSON *pIndex = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "index", pIndex);
    if (pIndex != NULL) {
      for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
        cJSON_AddItemToArray(pIndex, cJSON_CreateNumber((double)pSyncIndexMgr->index[i]));
      }
    }

    cJSON *pTerm = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "privateTerm", pTerm);
    if (pTerm != NULL) {
      for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
        cJSON_AddItemToArray(pTerm, cJSON_CreateNumber((double)pSyncIndexMgr->privateTerm[i]));
      }
    }

    // %p requires a void * argument.
    snprintf(u64buf, sizeof(u64buf), "%p", (void *)pSyncIndexMgr->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
  }

  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "pSyncIndexMgr", pRoot);
  return pJson;
}

// Renders the manager as formatted JSON text: builds the tree with
// syncIndexMgr2Json(), prints it with cJSON_Print() and deletes the tree.
// Returns whatever cJSON_Print() produced; the caller releases the string
// (other functions in this file do so with taosMemoryFree()).
char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
  cJSON *pTree = syncIndexMgr2Json(pSyncIndexMgr);
  char  *text = cJSON_Print(pTree);

  cJSON_Delete(pTree);
  return text;
}

148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
// Records `startTime` in the startTimeArr slot of the replica whose id matches
// pRaftId (per syncUtilSameId). Only the first matching slot is written.
// If no replica matches, nothing is stored and an error is logged with the
// host/port decoded from pRaftId->addr.
void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime) {
  for (int slot = 0; slot < pSyncIndexMgr->replicaNum; ++slot) {
    if (!syncUtilSameId(&((*(pSyncIndexMgr->replicas))[slot]), pRaftId)) {
      continue;
    }
    pSyncIndexMgr->startTimeArr[slot] = startTime;
    return;
  }

  // Unknown replica (the original note says: maybe config change); log it.
  uint16_t port;
  char     host[128];
  syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
  sError("vgId:%d, index mgr set for %s:%d, start-time:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port,
         startTime);
}

// Returns the start time recorded for the replica whose id matches pRaftId.
// When no replica matches, ASSERT(0) is reached; if execution continues past
// it, -1 is returned.
int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
  int slot = 0;
  while (slot < pSyncIndexMgr->replicaNum) {
    if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[slot]), pRaftId)) {
      return pSyncIndexMgr->startTimeArr[slot];
    }
    ++slot;
  }

  ASSERT(0);
  return -1;
}

// Records `recvTime` in the recvTimeArr slot of the replica whose id matches
// pRaftId (per syncUtilSameId). Only the first matching slot is written.
// If no replica matches, nothing is stored and an error is logged with the
// host/port decoded from pRaftId->addr.
void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime) {
  for (int slot = 0; slot < pSyncIndexMgr->replicaNum; ++slot) {
    if (!syncUtilSameId(&((*(pSyncIndexMgr->replicas))[slot]), pRaftId)) {
      continue;
    }
    pSyncIndexMgr->recvTimeArr[slot] = recvTime;
    return;
  }

  // Unknown replica (the original note says: maybe config change); log it.
  uint16_t port;
  char     host[128];
  syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
  sError("vgId:%d, index mgr set for %s:%d, recv-time:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port,
         recvTime);
}

// Returns the receive time recorded for the replica whose id matches pRaftId.
// When no replica matches, ASSERT(0) is reached; if execution continues past
// it, -1 is returned.
int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
  int slot = 0;
  while (slot < pSyncIndexMgr->replicaNum) {
    if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[slot]), pRaftId)) {
      return pSyncIndexMgr->recvTimeArr[slot];
    }
    ++slot;
  }

  ASSERT(0);
  return -1;
}

M
Minghao Li 已提交
204 205 206
// for debug -------------------
// Debug helper: prints the JSON form of pObj, preceded by its length, to
// stdout and flushes all output streams.
// Nothing is printed when serialization yields NULL (cJSON_Print() returns
// NULL on failure); previously that NULL was passed straight to strlen().
void syncIndexMgrPrint(SSyncIndexMgr *pObj) {
  char *serialized = syncIndexMgr2Str(pObj);
  if (serialized == NULL) {
    return;
  }

  printf("syncIndexMgrPrint | len:%" PRIu64 " | %s \n", (uint64_t)strlen(serialized), serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

// Debug helper: like syncIndexMgrPrint(), with the caller-supplied label `s`
// printed between the length and the JSON text.
// Nothing is printed when serialization yields NULL (cJSON_Print() returns
// NULL on failure); previously that NULL was passed straight to strlen().
void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj) {
  char *serialized = syncIndexMgr2Str(pObj);
  if (serialized == NULL) {
    return;
  }

  printf("syncIndexMgrPrint2 | len:%" PRIu64 " | %s | %s \n", (uint64_t)strlen(serialized), s, serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

// Debug helper: writes the JSON form of pObj, preceded by its length, to the
// trace log via sTrace.
// Nothing is logged when serialization yields NULL (cJSON_Print() returns
// NULL on failure); previously that NULL was passed straight to strlen().
void syncIndexMgrLog(SSyncIndexMgr *pObj) {
  char *serialized = syncIndexMgr2Str(pObj);
  if (serialized == NULL) {
    return;
  }

  sTrace("syncIndexMgrLog | len:%" PRIu64 " | %s", (uint64_t)strlen(serialized), serialized);
  taosMemoryFree(serialized);
}

// Debug helper: like syncIndexMgrLog(), with the caller-supplied label `s`
// logged between the length and the JSON text. Does nothing, and skips the
// serialization entirely, unless gRaftDetailLog is set.
// Nothing is logged when serialization yields NULL (cJSON_Print() returns
// NULL on failure); previously that NULL was passed straight to strlen().
void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj) {
  if (!gRaftDetailLog) {
    return;
  }

  char *serialized = syncIndexMgr2Str(pObj);
  if (serialized == NULL) {
    return;
  }

  sTrace("syncIndexMgrLog2 | len:%" PRIu64 " | %s | %s", (uint64_t)strlen(serialized), s, serialized);
  taosMemoryFree(serialized);
}

// Records `term` in the privateTerm slot of the replica whose id matches
// pRaftId (per syncUtilSameId). Only the first matching slot is written.
// If no replica matches, nothing is stored and an error is logged with the
// host/port decoded from pRaftId->addr.
void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term) {
  int slot = 0;
  while (slot < pSyncIndexMgr->replicaNum) {
    if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[slot]), pRaftId)) {
      pSyncIndexMgr->privateTerm[slot] = term;
      return;
    }
    ++slot;
  }

  // Unknown replica (the original note says: maybe config change); log it.
  uint16_t port;
  char     host[128];
  syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
  sError("vgId:%d, index mgr set for %s:%d, term:%" PRIu64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port, term);
}

// Returns the private term recorded for the replica whose id matches pRaftId.
// When no replica matches, ASSERT(0) is reached; if execution continues past
// it, -1 (converted to SyncTerm) is returned.
SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
  for (int slot = 0; slot < pSyncIndexMgr->replicaNum; ++slot) {
    if (!syncUtilSameId(&((*(pSyncIndexMgr->replicas))[slot]), pRaftId)) {
      continue;
    }
    return pSyncIndexMgr->privateTerm[slot];
  }

  ASSERT(0);
  return -1;
}