syncIndexMgr.c 8.1 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncIndexMgr.h"
#include "syncUtil.h"

// SMatchIndex -----------------------------

// Allocate an index manager, zero it, and bind it to pSyncNode.
// The manager keeps a pointer to pSyncNode->replicasId (no copy is made) and
// takes its replica count from the node. Allocation failure is handled by
// ASSERT only. The returned object is released with syncIndexMgrDestroy().
SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) {
  SSyncIndexMgr *pMgr = taosMemoryMalloc(sizeof(*pMgr));
  ASSERT(pMgr != NULL);
  memset(pMgr, 0, sizeof(*pMgr));

  pMgr->pSyncNode = pSyncNode;
  pMgr->replicaNum = pSyncNode->replicaNum;
  pMgr->replicas = &(pSyncNode->replicasId);

  // replicaNum must be set before clearing: the clear loop is bounded by it.
  syncIndexMgrClear(pMgr);
  return pMgr;
}

M
Minghao Li 已提交
34 35 36 37 38 39 40
// Re-bind an existing manager to pSyncNode and reset all per-replica state.
// Every tracked value is discarded by the trailing syncIndexMgrClear() call.
void syncIndexMgrUpdate(SSyncIndexMgr *pSyncIndexMgr, SSyncNode *pSyncNode) {
  pSyncIndexMgr->pSyncNode = pSyncNode;
  pSyncIndexMgr->replicaNum = pSyncNode->replicaNum;
  pSyncIndexMgr->replicas = &(pSyncNode->replicasId);

  // Clear after replicaNum is updated so the new replica count bounds the reset.
  syncIndexMgrClear(pSyncIndexMgr);
}

M
Minghao Li 已提交
41 42
// Release a manager obtained from syncIndexMgrCreate(). NULL is accepted and ignored.
void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr) {
  if (pSyncIndexMgr == NULL) {
    return;
  }
  taosMemoryFree(pSyncIndexMgr);
}

// Reset all tracked per-replica values to zero.
// index and privateTerm are wiped with memset over sizeof(member); the two
// timestamp arrays are only reset for the first replicaNum slots.
void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr) {
  memset(pSyncIndexMgr->privateTerm, 0, sizeof(pSyncIndexMgr->privateTerm));
  memset(pSyncIndexMgr->index, 0, sizeof(pSyncIndexMgr->index));

  int slot = 0;
  while (slot < pSyncIndexMgr->replicaNum) {
    pSyncIndexMgr->recvTimeArr[slot] = 0;
    pSyncIndexMgr->startTimeArr[slot] = 0;
    ++slot;
  }
}

// Store `index` for the replica whose id equals *pRaftId.
// If no replica matches, nothing is stored and an error is logged; the
// original note attributes such a miss to a possible config change.
void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index) {
  for (int slot = 0; slot < pSyncIndexMgr->replicaNum; ++slot) {
    if (!syncUtilSameId(&((*(pSyncIndexMgr->replicas))[slot]), pRaftId)) {
      continue;
    }
    pSyncIndexMgr->index[slot] = index;
    return;
  }

  // No matching replica: report which peer address was requested.
  uint16_t peerPort;
  char     peerHost[128];
  syncUtilU642Addr(pRaftId->addr, peerHost, sizeof(peerHost), &peerPort);
  sError("vgId:%d, index mgr set for %s:%d, index:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, peerHost,
         peerPort, index);
}

// Look up the index stored for the replica whose id equals *pRaftId.
// Returns SYNC_INDEX_INVALID when no replica matches.
SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
  for (int slot = 0; slot < pSyncIndexMgr->replicaNum; ++slot) {
    if (!syncUtilSameId(&((*(pSyncIndexMgr->replicas))[slot]), pRaftId)) {
      continue;
    }
    return pSyncIndexMgr->index[slot];
  }

  return SYNC_INDEX_INVALID;
}

// Serialize an index manager into a cJSON tree (debug helper).
//
// The result has the shape { "pSyncIndexMgr": { ... } }. When pSyncIndexMgr is
// NULL the inner object is empty; otherwise it carries "replicaNum",
// "replicas", "index", "privateTerm" and the pSyncNode pointer formatted as text.
//
// The "index" and "privateTerm" arrays are built element by element with
// cJSON_CreateNumber(). The previous version copied the values into a
// temporary int array (whose allocation was never checked) and used
// cJSON_CreateIntArray(), which truncated values that do not fit in an int.
//
// The caller owns the returned tree (syncIndexMgr2Str releases it with cJSON_Delete).
cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
  char   u64buf[128] = {0};
  cJSON *pRoot = cJSON_CreateObject();

  if (pSyncIndexMgr != NULL) {
    cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncIndexMgr->replicaNum);

    cJSON *pReplicas = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
    for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
      cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pSyncIndexMgr->replicas))[i]));
    }

    // One JSON number per replica; no scratch buffer, no narrowing to int.
    cJSON *pIndex = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "index", pIndex);
    for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
      cJSON_AddItemToArray(pIndex, cJSON_CreateNumber((double)pSyncIndexMgr->index[i]));
    }

    cJSON *pPrivateTerm = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "privateTerm", pPrivateTerm);
    for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
      cJSON_AddItemToArray(pPrivateTerm, cJSON_CreateNumber((double)pSyncIndexMgr->privateTerm[i]));
    }

    snprintf(u64buf, sizeof(u64buf), "%p", pSyncIndexMgr->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
  }

  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "pSyncIndexMgr", pRoot);
  return pJson;
}

// Render an index manager as JSON text via syncIndexMgr2Json() and cJSON_Print().
// The intermediate tree is deleted here; the returned string belongs to the
// caller (callers in this file release it with taosMemoryFree).
char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
  cJSON *pTree = syncIndexMgr2Json(pSyncIndexMgr);
  char  *pText = cJSON_Print(pTree);

  cJSON_Delete(pTree);
  return pText;
}

141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
// Store `startTime` for the replica whose id equals *pRaftId.
// If no replica matches, nothing is stored and an error is logged; the
// original note attributes such a miss to a possible config change.
void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime) {
  for (int slot = 0; slot < pSyncIndexMgr->replicaNum; ++slot) {
    if (!syncUtilSameId(&((*(pSyncIndexMgr->replicas))[slot]), pRaftId)) {
      continue;
    }
    pSyncIndexMgr->startTimeArr[slot] = startTime;
    return;
  }

  // No matching replica: report which peer address was requested.
  uint16_t peerPort;
  char     peerHost[128];
  syncUtilU642Addr(pRaftId->addr, peerHost, sizeof(peerHost), &peerPort);
  sError("vgId:%d, index mgr set for %s:%d, start-time:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, peerHost,
         peerPort, startTime);
}

// Look up the start time stored for the replica whose id equals *pRaftId.
// A miss triggers ASSERT(0); the trailing -1 is returned only if ASSERT does
// not stop execution.
// NOTE(review): the matching setter tolerates an unknown id ("maybe config
// change") while this getter asserts - confirm the asymmetry is intended.
int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
  for (int slot = 0; slot < pSyncIndexMgr->replicaNum; ++slot) {
    if (!syncUtilSameId(&((*(pSyncIndexMgr->replicas))[slot]), pRaftId)) {
      continue;
    }
    return pSyncIndexMgr->startTimeArr[slot];
  }

  ASSERT(0);
  return -1;
}

// Store `recvTime` for the replica whose id equals *pRaftId.
// If no replica matches, nothing is stored and an error is logged; the
// original note attributes such a miss to a possible config change.
void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime) {
  for (int slot = 0; slot < pSyncIndexMgr->replicaNum; ++slot) {
    if (!syncUtilSameId(&((*(pSyncIndexMgr->replicas))[slot]), pRaftId)) {
      continue;
    }
    pSyncIndexMgr->recvTimeArr[slot] = recvTime;
    return;
  }

  // No matching replica: report which peer address was requested.
  uint16_t peerPort;
  char     peerHost[128];
  syncUtilU642Addr(pRaftId->addr, peerHost, sizeof(peerHost), &peerPort);
  sError("vgId:%d, index mgr set for %s:%d, recv-time:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, peerHost,
         peerPort, recvTime);
}

// Look up the receive time stored for the replica whose id equals *pRaftId.
// A miss triggers ASSERT(0); the trailing -1 is returned only if ASSERT does
// not stop execution.
// NOTE(review): the matching setter tolerates an unknown id ("maybe config
// change") while this getter asserts - confirm the asymmetry is intended.
int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
  for (int slot = 0; slot < pSyncIndexMgr->replicaNum; ++slot) {
    if (!syncUtilSameId(&((*(pSyncIndexMgr->replicas))[slot]), pRaftId)) {
      continue;
    }
    return pSyncIndexMgr->recvTimeArr[slot];
  }

  ASSERT(0);
  return -1;
}

M
Minghao Li 已提交
197 198 199
// for debug -------------------
// Debug helper: print the JSON form of pObj and its length to stdout.
// Nothing is printed when serialization yields NULL (previously strlen() and
// "%s" were applied to the NULL pointer).
void syncIndexMgrPrint(SSyncIndexMgr *pObj) {
  char *serialized = syncIndexMgr2Str(pObj);
  if (serialized == NULL) {
    return;
  }

  // strlen() returns size_t; cast so the argument matches the PRIu64 conversion.
  printf("syncIndexMgrPrint | len:%" PRIu64 " | %s \n", (uint64_t)strlen(serialized), serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

// Debug helper: print the caller-supplied tag `s` followed by the JSON form of
// pObj to stdout. Nothing is printed when serialization yields NULL
// (previously strlen() and "%s" were applied to the NULL pointer).
void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj) {
  char *serialized = syncIndexMgr2Str(pObj);
  if (serialized == NULL) {
    return;
  }

  // strlen() returns size_t; cast so the argument matches the PRIu64 conversion.
  printf("syncIndexMgrPrint2 | len:%" PRIu64 " | %s | %s \n", (uint64_t)strlen(serialized), s, serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

// Debug helper: write the JSON form of pObj and its length to the trace log.
// Nothing is logged when serialization yields NULL (previously strlen() and
// "%s" were applied to the NULL pointer).
void syncIndexMgrLog(SSyncIndexMgr *pObj) {
  char *serialized = syncIndexMgr2Str(pObj);
  if (serialized == NULL) {
    return;
  }

  // strlen() returns size_t; cast so the argument matches the PRIu64 conversion.
  sTrace("syncIndexMgrLog | len:%" PRIu64 " | %s", (uint64_t)strlen(serialized), serialized);
  taosMemoryFree(serialized);
}

// Debug helper: when gRaftDetailLog is set, write the caller-supplied tag `s`
// and the JSON form of pObj to the trace log. Serialization is skipped
// entirely when the flag is off, and nothing is logged when it yields NULL
// (previously strlen() and "%s" were applied to the NULL pointer).
void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj) {
  if (!gRaftDetailLog) {
    return;
  }

  char *serialized = syncIndexMgr2Str(pObj);
  if (serialized == NULL) {
    return;
  }

  // strlen() returns size_t; cast so the argument matches the PRIu64 conversion.
  sTrace("syncIndexMgrLog2 | len:%" PRIu64 " | %s | %s", (uint64_t)strlen(serialized), s, serialized);
  taosMemoryFree(serialized);
}

// Store `term` as the private term of the replica whose id equals *pRaftId.
// If no replica matches, nothing is stored and an error is logged; the
// original note attributes such a miss to a possible config change.
void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term) {
  for (int slot = 0; slot < pSyncIndexMgr->replicaNum; ++slot) {
    if (!syncUtilSameId(&((*(pSyncIndexMgr->replicas))[slot]), pRaftId)) {
      continue;
    }
    pSyncIndexMgr->privateTerm[slot] = term;
    return;
  }

  // No matching replica: report which peer address was requested.
  uint16_t peerPort;
  char     peerHost[128];
  syncUtilU642Addr(pRaftId->addr, peerHost, sizeof(peerHost), &peerPort);
  sError("vgId:%d, index mgr set for %s:%d, term:%" PRIu64 " error", pSyncIndexMgr->pSyncNode->vgId, peerHost,
         peerPort, term);
}

// Look up the private term stored for the replica whose id equals *pRaftId.
// A miss triggers ASSERT(0); the trailing -1 (converted to SyncTerm) is
// returned only if ASSERT does not stop execution.
// NOTE(review): the matching setter tolerates an unknown id ("maybe config
// change") while this getter asserts - confirm the asymmetry is intended.
SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
  for (int slot = 0; slot < pSyncIndexMgr->replicaNum; ++slot) {
    if (!syncUtilSameId(&((*(pSyncIndexMgr->replicas))[slot]), pRaftId)) {
      continue;
    }
    return pSyncIndexMgr->privateTerm[slot];
  }

  ASSERT(0);
  return -1;
}