syncIndexMgr.c 6.0 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncIndexMgr.h"
#include "syncUtil.h"

// SMatchIndex -----------------------------

SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) {
wafwerar's avatar
wafwerar 已提交
22
  SSyncIndexMgr *pSyncIndexMgr = taosMemoryMalloc(sizeof(SSyncIndexMgr));
M
Minghao Li 已提交
23 24 25 26 27 28 29 30 31 32 33
  assert(pSyncIndexMgr != NULL);
  memset(pSyncIndexMgr, 0, sizeof(SSyncIndexMgr));

  pSyncIndexMgr->replicas = &(pSyncNode->replicasId);
  pSyncIndexMgr->replicaNum = pSyncNode->replicaNum;
  pSyncIndexMgr->pSyncNode = pSyncNode;
  syncIndexMgrClear(pSyncIndexMgr);

  return pSyncIndexMgr;
}

M
Minghao Li 已提交
34 35 36 37 38 39 40
void syncIndexMgrUpdate(SSyncIndexMgr *pSyncIndexMgr, SSyncNode *pSyncNode) {
  pSyncIndexMgr->replicas = &(pSyncNode->replicasId);
  pSyncIndexMgr->replicaNum = pSyncNode->replicaNum;
  pSyncIndexMgr->pSyncNode = pSyncNode;
  syncIndexMgrClear(pSyncIndexMgr);
}

M
Minghao Li 已提交
41 42
void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr) {
  if (pSyncIndexMgr != NULL) {
wafwerar's avatar
wafwerar 已提交
43
    taosMemoryFree(pSyncIndexMgr);
M
Minghao Li 已提交
44 45 46 47 48
  }
}

void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr) {
  memset(pSyncIndexMgr->index, 0, sizeof(pSyncIndexMgr->index));
M
Minghao Li 已提交
49
  memset(pSyncIndexMgr->privateTerm, 0, sizeof(pSyncIndexMgr->privateTerm));
M
Minghao Li 已提交
50 51 52 53 54 55 56 57 58 59 60 61 62 63
  /*
  for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
    pSyncIndexMgr->index[i] = 0;
  }
  */
}

void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index) {
  for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
    if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) {
      (pSyncIndexMgr->index)[i] = index;
      return;
    }
  }
64 65

  // maybe config change
66 67 68 69 70 71
  // assert(0);

  char     host[128];
  uint16_t port;
  syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
  sError("vgId:%d index mgr set for %s:%d, index:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port, index);
M
Minghao Li 已提交
72 73 74 75 76 77 78 79 80 81 82 83 84
}

SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
  for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
    if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) {
      SyncIndex idx = (pSyncIndexMgr->index)[i];
      return idx;
    }
  }
  assert(0);
}

cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
85
  char   u64buf[128] = {0};
M
Minghao Li 已提交
86 87
  cJSON *pRoot = cJSON_CreateObject();

M
Minghao Li 已提交
88 89 90 91 92 93 94
  if (pSyncIndexMgr != NULL) {
    cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncIndexMgr->replicaNum);
    cJSON *pReplicas = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
    for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
      cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pSyncIndexMgr->replicas))[i]));
    }
M
Minghao Li 已提交
95 96 97 98 99 100 101 102 103

    {
      int *arr = (int *)taosMemoryMalloc(sizeof(int) * pSyncIndexMgr->replicaNum);
      for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
        arr[i] = pSyncIndexMgr->index[i];
      }
      cJSON *pIndex = cJSON_CreateIntArray(arr, pSyncIndexMgr->replicaNum);
      taosMemoryFree(arr);
      cJSON_AddItemToObject(pRoot, "index", pIndex);
M
Minghao Li 已提交
104
    }
M
Minghao Li 已提交
105 106 107 108 109 110 111 112 113 114 115

    {
      int *arr = (int *)taosMemoryMalloc(sizeof(int) * pSyncIndexMgr->replicaNum);
      for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
        arr[i] = pSyncIndexMgr->privateTerm[i];
      }
      cJSON *pIndex = cJSON_CreateIntArray(arr, pSyncIndexMgr->replicaNum);
      taosMemoryFree(arr);
      cJSON_AddItemToObject(pRoot, "privateTerm", pIndex);
    }

M
Minghao Li 已提交
116 117
    snprintf(u64buf, sizeof(u64buf), "%p", pSyncIndexMgr->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
M
Minghao Li 已提交
118 119 120 121 122 123 124 125 126
  }

  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "pSyncIndexMgr", pRoot);
  return pJson;
}

char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
  cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr);
127
  char  *serialized = cJSON_Print(pJson);
M
Minghao Li 已提交
128 129
  cJSON_Delete(pJson);
  return serialized;
M
Minghao Li 已提交
130 131 132 133 134 135 136
}

// for debug -------------------
void syncIndexMgrPrint(SSyncIndexMgr *pObj) {
  char *serialized = syncIndexMgr2Str(pObj);
  printf("syncIndexMgrPrint | len:%lu | %s \n", strlen(serialized), serialized);
  fflush(NULL);
wafwerar's avatar
wafwerar 已提交
137
  taosMemoryFree(serialized);
M
Minghao Li 已提交
138 139 140 141 142 143
}

void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj) {
  char *serialized = syncIndexMgr2Str(pObj);
  printf("syncIndexMgrPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
  fflush(NULL);
wafwerar's avatar
wafwerar 已提交
144
  taosMemoryFree(serialized);
M
Minghao Li 已提交
145 146 147 148 149
}

void syncIndexMgrLog(SSyncIndexMgr *pObj) {
  char *serialized = syncIndexMgr2Str(pObj);
  sTrace("syncIndexMgrLog | len:%lu | %s", strlen(serialized), serialized);
wafwerar's avatar
wafwerar 已提交
150
  taosMemoryFree(serialized);
M
Minghao Li 已提交
151 152 153
}

void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj) {
M
Minghao Li 已提交
154 155 156 157 158
  if (gRaftDetailLog) {
    char *serialized = syncIndexMgr2Str(pObj);
    sTrace("syncIndexMgrLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
    taosMemoryFree(serialized);
  }
M
Minghao Li 已提交
159 160 161 162 163 164 165 166 167 168 169
}

void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term) {
  for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
    if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) {
      (pSyncIndexMgr->privateTerm)[i] = term;
      return;
    }
  }

  // maybe config change
170 171 172 173 174
  // assert(0);
  char     host[128];
  uint16_t port;
  syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
  sError("vgId:%d index mgr set for %s:%d, term:%lu error", pSyncIndexMgr->pSyncNode->vgId, host, port, term);
M
Minghao Li 已提交
175 176 177 178 179 180 181 182 183 184
}

SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
  for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
    if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) {
      SyncTerm term = (pSyncIndexMgr->privateTerm)[i];
      return term;
    }
  }
  assert(0);
M
Minghao Li 已提交
185
}