syncIndexMgr.c 6.1 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncIndexMgr.h"
#include "syncUtil.h"

// SMatchIndex -----------------------------

// Allocate an index manager and bind it to pSyncNode.
//
// The new object is zero-filled, pointed at the node's replica id table and
// replica count, and then reset through syncIndexMgrClear().
//
// pSyncNode: node whose replicasId / replicaNum the manager refers to.
// Returns the newly allocated manager; syncIndexMgrDestroy() releases it.
SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) {
  SSyncIndexMgr *pMgr = taosMemoryMalloc(sizeof(*pMgr));
  ASSERT(pMgr != NULL);
  memset(pMgr, 0, sizeof(*pMgr));

  // The manager keeps a pointer to the node's replica table, not a copy.
  pMgr->pSyncNode = pSyncNode;
  pMgr->replicaNum = pSyncNode->replicaNum;
  pMgr->replicas = &(pSyncNode->replicasId);

  syncIndexMgrClear(pMgr);
  return pMgr;
}

M
Minghao Li 已提交
34 35 36 37 38 39 40
// Re-bind an existing index manager to pSyncNode and reset its per-replica
// state: the replica table pointer, replica count and owning node are
// refreshed, then syncIndexMgrClear() wipes the stored indexes and terms.
void syncIndexMgrUpdate(SSyncIndexMgr *pSyncIndexMgr, SSyncNode *pSyncNode) {
  pSyncIndexMgr->pSyncNode = pSyncNode;
  pSyncIndexMgr->replicaNum = pSyncNode->replicaNum;
  pSyncIndexMgr->replicas = &(pSyncNode->replicasId);

  syncIndexMgrClear(pSyncIndexMgr);
}

M
Minghao Li 已提交
41 42
// Release an index manager. A NULL argument is accepted and ignored.
void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr) {
  if (pSyncIndexMgr == NULL) {
    return;
  }

  taosMemoryFree(pSyncIndexMgr);
}

// Zero the stored per-replica state (the index and privateTerm members).
// The size passed to memset is sizeof the member itself, so the whole member
// is cleared rather than only the first replicaNum entries.
void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr) {
  memset(pSyncIndexMgr->privateTerm, 0, sizeof(pSyncIndexMgr->privateTerm));
  memset(pSyncIndexMgr->index, 0, sizeof(pSyncIndexMgr->index));
}

// Store `index` in the slot belonging to the replica identified by pRaftId.
//
// The replica table is scanned in order; the first entry that
// syncUtilSameId() reports as equal to pRaftId is updated. If no entry
// matches, nothing is stored and an error is logged.
void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index) {
  for (int slot = 0; slot < pSyncIndexMgr->replicaNum; ++slot) {
    const SRaftId *pReplica = &((*(pSyncIndexMgr->replicas))[slot]);
    if (!syncUtilSameId(pReplica, pRaftId)) {
      continue;
    }

    pSyncIndexMgr->index[slot] = index;
    return;
  }

  // No matching replica (maybe a config change): report instead of asserting.
  uint16_t peerPort;
  char     peerHost[128];
  syncUtilU642Addr(pRaftId->addr, peerHost, sizeof(peerHost), &peerPort);
  sError("vgId:%d index mgr set for %s:%d, index:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, peerHost,
         peerPort, index);
}

// Look up the index stored for the replica identified by pRaftId.
//
// Returns the stored value for the first replica table entry that
// syncUtilSameId() reports as equal to pRaftId, or SYNC_INDEX_INVALID when
// no entry matches.
SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
  for (int slot = 0; slot < pSyncIndexMgr->replicaNum; ++slot) {
    const SRaftId *pReplica = &((*(pSyncIndexMgr->replicas))[slot]);
    if (syncUtilSameId(pReplica, pRaftId)) {
      return pSyncIndexMgr->index[slot];
    }
  }

  return SYNC_INDEX_INVALID;
}

// Build a JSON description of an index manager.
//
// The returned object has a single member "pSyncIndexMgr". When
// pSyncIndexMgr is non-NULL that member holds:
//   "replicaNum"  - the replica count,
//   "replicas"    - one syncUtilRaftId2Json() entry per replica,
//   "index"       - the stored index of each replica,
//   "privateTerm" - the stored private term of each replica,
//   "pSyncNode"   - the owning node pointer formatted with %p.
// When pSyncIndexMgr is NULL the member is an empty object.
//
// The caller owns the returned tree and releases it with cJSON_Delete().
cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
  char   u64buf[128] = {0};
  cJSON *pRoot = cJSON_CreateObject();

  if (pSyncIndexMgr != NULL) {
    cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncIndexMgr->replicaNum);

    cJSON *pReplicas = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
    for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
      cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pSyncIndexMgr->replicas))[i]));
    }

    // Append the numbers straight into JSON arrays. This avoids the
    // temporary int buffer (whose allocation could fail and was never
    // checked) and the narrowing of the stored values to int.
    cJSON *pIndex = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "index", pIndex);
    for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
      cJSON_AddItemToArray(pIndex, cJSON_CreateNumber((double)pSyncIndexMgr->index[i]));
    }

    cJSON *pTerm = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "privateTerm", pTerm);
    for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
      cJSON_AddItemToArray(pTerm, cJSON_CreateNumber((double)pSyncIndexMgr->privateTerm[i]));
    }

    // %p requires a void pointer argument.
    snprintf(u64buf, sizeof(u64buf), "%p", (void *)pSyncIndexMgr->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
  }

  cJSON *pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "pSyncIndexMgr", pRoot);
  return pJson;
}

// Serialize an index manager to text: build its JSON tree with
// syncIndexMgr2Json(), print it with cJSON_Print(), and free the tree.
// Returns whatever cJSON_Print() produced; the callers in this file release
// it with taosMemoryFree().
char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
  cJSON *pTree = syncIndexMgr2Json(pSyncIndexMgr);
  char  *pText = cJSON_Print(pTree);

  cJSON_Delete(pTree);
  return pText;
}

// for debug -------------------
// Debug helper: print the serialized index manager and its length to stdout,
// then flush all output streams. Does nothing if serialization yields NULL.
void syncIndexMgrPrint(SSyncIndexMgr *pObj) {
  char *serialized = syncIndexMgr2Str(pObj);
  if (serialized == NULL) {
    return;  // nothing to print; strlen(NULL) would be undefined
  }

  // strlen() returns size_t; cast so the argument matches PRIu64.
  printf("syncIndexMgrPrint | len:%" PRIu64 " | %s \n", (uint64_t)strlen(serialized), serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

// Debug helper: like syncIndexMgrPrint(), but the caller-supplied label `s`
// is printed ahead of the serialized text. Does nothing if serialization
// yields NULL.
void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj) {
  char *serialized = syncIndexMgr2Str(pObj);
  if (serialized == NULL) {
    return;  // nothing to print; strlen(NULL) would be undefined
  }

  // strlen() returns size_t; cast so the argument matches PRIu64.
  printf("syncIndexMgrPrint2 | len:%" PRIu64 " | %s | %s \n", (uint64_t)strlen(serialized), s, serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

// Write the serialized index manager and its length through sTrace().
// Does nothing if serialization yields NULL.
void syncIndexMgrLog(SSyncIndexMgr *pObj) {
  char *serialized = syncIndexMgr2Str(pObj);
  if (serialized == NULL) {
    return;  // nothing to log; strlen(NULL) would be undefined
  }

  // strlen() returns size_t; cast so the argument matches PRIu64.
  sTrace("syncIndexMgrLog | len:%" PRIu64 " | %s", (uint64_t)strlen(serialized), serialized);
  taosMemoryFree(serialized);
}

// Write the serialized index manager through sTrace(), prefixed with the
// caller-supplied label `s`. Serialization only happens when gRaftDetailLog
// is set. Does nothing if serialization yields NULL.
void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj) {
  if (!gRaftDetailLog) {
    return;
  }

  char *serialized = syncIndexMgr2Str(pObj);
  if (serialized == NULL) {
    return;  // nothing to log; strlen(NULL) would be undefined
  }

  // strlen() returns size_t; cast so the argument matches PRIu64.
  sTrace("syncIndexMgrLog2 | len:%" PRIu64 " | %s | %s", (uint64_t)strlen(serialized), s, serialized);
  taosMemoryFree(serialized);
}

// Store `term` in the privateTerm slot of the replica identified by pRaftId.
//
// The replica table is scanned in order; the first entry that
// syncUtilSameId() reports as equal to pRaftId is updated. If no entry
// matches, nothing is stored and an error is logged.
void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term) {
  for (int slot = 0; slot < pSyncIndexMgr->replicaNum; ++slot) {
    const SRaftId *pReplica = &((*(pSyncIndexMgr->replicas))[slot]);
    if (!syncUtilSameId(pReplica, pRaftId)) {
      continue;
    }

    pSyncIndexMgr->privateTerm[slot] = term;
    return;
  }

  // No matching replica (maybe a config change): report instead of asserting.
  uint16_t peerPort;
  char     peerHost[128];
  syncUtilU642Addr(pRaftId->addr, peerHost, sizeof(peerHost), &peerPort);
  sError("vgId:%d index mgr set for %s:%d, term:%" PRIu64 " error", pSyncIndexMgr->pSyncNode->vgId, peerHost,
         peerPort, term);
}

// Look up the private term stored for the replica identified by pRaftId.
//
// Returns the stored value for the first replica table entry that
// syncUtilSameId() reports as equal to pRaftId. An unknown replica is
// treated as a programming error and trips ASSERT(0); if execution continues
// past the assertion, (SyncTerm)-1 is returned as a sentinel.
SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
  for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
    if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) {
      return (pSyncIndexMgr->privateTerm)[i];
    }
  }

  ASSERT(0);
  // Without this return the function would fall off the end of a non-void
  // function whenever ASSERT does not abort, handing the caller an
  // indeterminate value.
  return (SyncTerm)-1;
}