dmEps.c 9.9 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmImp.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19 20 21
static void dmPrintEps(SDnode *pDnode);
static bool dmIsEpChanged(SDnode *pDnode, int32_t dnodeId, const char *ep);
static void dmResetEps(SDnode *pDnode, SArray *dnodeEps);
S
shm  
Shengliang Guan 已提交
22

S
Shengliang Guan 已提交
23 24
static void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
  taosRLockLatch(&pDnode->data.latch);
S
Shengliang Guan 已提交
25

S
Shengliang Guan 已提交
26
  SDnodeEp *pDnodeEp = taosHashGet(pDnode->data.dnodeHash, &dnodeId, sizeof(int32_t));
S
Shengliang Guan 已提交
27 28 29 30 31 32 33 34 35 36 37 38
  if (pDnodeEp != NULL) {
    if (pPort != NULL) {
      *pPort = pDnodeEp->ep.port;
    }
    if (pFqdn != NULL) {
      tstrncpy(pFqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN);
    }
    if (pEp != NULL) {
      snprintf(pEp, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
    }
  }

S
Shengliang Guan 已提交
39
  taosRUnLockLatch(&pDnode->data.latch);
S
Shengliang Guan 已提交
40 41
}

S
Shengliang Guan 已提交
42
int32_t dmReadEps(SDnode *pDnode) {
43
  int32_t   code = TSDB_CODE_INVALID_JSON_FORMAT;
S
shm  
Shengliang Guan 已提交
44 45
  int32_t   len = 0;
  int32_t   maxLen = 256 * 1024;
wafwerar's avatar
wafwerar 已提交
46
  char     *content = taosMemoryCalloc(1, maxLen + 1);
S
shm  
Shengliang Guan 已提交
47
  cJSON    *root = NULL;
S
Shengliang Guan 已提交
48
  char      file[PATH_MAX] = {0};
S
shm  
Shengliang Guan 已提交
49
  TdFilePtr pFile = NULL;
S
shm  
Shengliang Guan 已提交
50

S
Shengliang Guan 已提交
51 52
  pDnode->data.dnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
  if (pDnode->data.dnodeEps == NULL) {
S
shm  
Shengliang Guan 已提交
53 54 55 56
    dError("failed to calloc dnodeEp array since %s", strerror(errno));
    goto PRASE_DNODE_OVER;
  }

S
Shengliang Guan 已提交
57
  snprintf(file, sizeof(file), "%s%sdnode.json", pDnode->wrappers[DNODE].path, TD_DIRSEP);
S
shm  
Shengliang Guan 已提交
58
  pFile = taosOpenFile(file, TD_FILE_READ);
S
shm  
Shengliang Guan 已提交
59
  if (pFile == NULL) {
S
Shengliang Guan 已提交
60
    // dDebug("file %s not exist", file);
S
shm  
Shengliang Guan 已提交
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
    code = 0;
    goto PRASE_DNODE_OVER;
  }

  len = (int32_t)taosReadFile(pFile, content, maxLen);
  if (len <= 0) {
    dError("failed to read %s since content is null", file);
    goto PRASE_DNODE_OVER;
  }

  content[len] = 0;
  root = cJSON_Parse(content);
  if (root == NULL) {
    dError("failed to read %s since invalid json format", file);
    goto PRASE_DNODE_OVER;
  }

  cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
  if (!dnodeId || dnodeId->type != cJSON_Number) {
    dError("failed to read %s since dnodeId not found", file);
    goto PRASE_DNODE_OVER;
  }
S
Shengliang Guan 已提交
83
  pDnode->data.dnodeId = dnodeId->valueint;
S
shm  
Shengliang Guan 已提交
84 85 86 87 88 89

  cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
  if (!clusterId || clusterId->type != cJSON_String) {
    dError("failed to read %s since clusterId not found", file);
    goto PRASE_DNODE_OVER;
  }
S
Shengliang Guan 已提交
90
  pDnode->data.clusterId = atoll(clusterId->valuestring);
S
shm  
Shengliang Guan 已提交
91 92 93 94 95 96

  cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
  if (!dropped || dropped->type != cJSON_Number) {
    dError("failed to read %s since dropped not found", file);
    goto PRASE_DNODE_OVER;
  }
S
Shengliang Guan 已提交
97
  pDnode->data.dropped = dropped->valueint;
S
shm  
Shengliang Guan 已提交
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146

  cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes");
  if (!dnodes || dnodes->type != cJSON_Array) {
    dError("failed to read %s since dnodes not found", file);
    goto PRASE_DNODE_OVER;
  }

  int32_t numOfDnodes = cJSON_GetArraySize(dnodes);
  if (numOfDnodes <= 0) {
    dError("failed to read %s since numOfDnodes:%d invalid", file, numOfDnodes);
    goto PRASE_DNODE_OVER;
  }

  for (int32_t i = 0; i < numOfDnodes; ++i) {
    cJSON *node = cJSON_GetArrayItem(dnodes, i);
    if (node == NULL) break;

    SDnodeEp dnodeEp = {0};

    cJSON *did = cJSON_GetObjectItem(node, "id");
    if (!did || did->type != cJSON_Number) {
      dError("failed to read %s since dnodeId not found", file);
      goto PRASE_DNODE_OVER;
    }

    dnodeEp.id = dnodeId->valueint;

    cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn");
    if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
      dError("failed to read %s since dnodeFqdn not found", file);
      goto PRASE_DNODE_OVER;
    }
    tstrncpy(dnodeEp.ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);

    cJSON *dnodePort = cJSON_GetObjectItem(node, "port");
    if (!dnodePort || dnodePort->type != cJSON_Number) {
      dError("failed to read %s since dnodePort not found", file);
      goto PRASE_DNODE_OVER;
    }

    dnodeEp.ep.port = dnodePort->valueint;

    cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode");
    if (!isMnode || isMnode->type != cJSON_Number) {
      dError("failed to read %s since isMnode not found", file);
      goto PRASE_DNODE_OVER;
    }
    dnodeEp.isMnode = isMnode->valueint;

S
Shengliang Guan 已提交
147
    taosArrayPush(pDnode->data.dnodeEps, &dnodeEp);
S
shm  
Shengliang Guan 已提交
148 149 150
  }

  code = 0;
S
shm  
Shengliang Guan 已提交
151
  dDebug("succcessed to read file %s", file);
S
Shengliang Guan 已提交
152
  dmPrintEps(pDnode);
S
shm  
Shengliang Guan 已提交
153 154

PRASE_DNODE_OVER:
wafwerar's avatar
wafwerar 已提交
155
  if (content != NULL) taosMemoryFree(content);
S
shm  
Shengliang Guan 已提交
156 157 158
  if (root != NULL) cJSON_Delete(root);
  if (pFile != NULL) taosCloseFile(&pFile);

S
Shengliang Guan 已提交
159
  if (dmIsEpChanged(pDnode, pDnode->data.dnodeId, pDnode->data.localEp)) {
S
Shengliang Guan 已提交
160
    dError("localEp %s different with %s and need reconfigured", pDnode->data.localEp, file);
S
shm  
Shengliang Guan 已提交
161 162 163
    return -1;
  }

S
Shengliang Guan 已提交
164
  if (taosArrayGetSize(pDnode->data.dnodeEps) == 0) {
S
shm  
Shengliang Guan 已提交
165 166
    SDnodeEp dnodeEp = {0};
    dnodeEp.isMnode = 1;
S
Shengliang Guan 已提交
167
    taosGetFqdnPortFromEp(pDnode->data.firstEp, &dnodeEp.ep);
S
Shengliang Guan 已提交
168
    taosArrayPush(pDnode->data.dnodeEps, &dnodeEp);
S
shm  
Shengliang Guan 已提交
169 170
  }

S
Shengliang Guan 已提交
171
  dmResetEps(pDnode, pDnode->data.dnodeEps);
S
shm  
Shengliang Guan 已提交
172

S
shm  
Shengliang Guan 已提交
173 174
  terrno = code;
  return code;
S
shm  
Shengliang Guan 已提交
175 176
}

S
Shengliang Guan 已提交
177
int32_t dmWriteEps(SDnode *pDnode) {
S
Shengliang Guan 已提交
178 179 180 181
  char file[PATH_MAX] = {0};
  char realfile[PATH_MAX];
  snprintf(file, sizeof(file), "%s%sdnode.json.bak", pDnode->wrappers[DNODE].path, TD_DIRSEP);
  snprintf(realfile, sizeof(realfile), "%s%sdnode.json", pDnode->wrappers[DNODE].path, TD_DIRSEP);
S
shm  
Shengliang Guan 已提交
182

183
  TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
S
shm  
Shengliang Guan 已提交
184 185 186 187 188 189 190 191
  if (pFile == NULL) {
    dError("failed to write %s since %s", file, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  int32_t len = 0;
  int32_t maxLen = 256 * 1024;
wafwerar's avatar
wafwerar 已提交
192
  char   *content = taosMemoryCalloc(1, maxLen + 1);
S
shm  
Shengliang Guan 已提交
193 194

  len += snprintf(content + len, maxLen - len, "{\n");
S
Shengliang Guan 已提交
195 196 197
  len += snprintf(content + len, maxLen - len, "  \"dnodeId\": %d,\n", pDnode->data.dnodeId);
  len += snprintf(content + len, maxLen - len, "  \"clusterId\": \"%" PRId64 "\",\n", pDnode->data.clusterId);
  len += snprintf(content + len, maxLen - len, "  \"dropped\": %d,\n", pDnode->data.dropped);
S
shm  
Shengliang Guan 已提交
198 199
  len += snprintf(content + len, maxLen - len, "  \"dnodes\": [{\n");

S
Shengliang Guan 已提交
200
  int32_t numOfEps = (int32_t)taosArrayGetSize(pDnode->data.dnodeEps);
S
shm  
Shengliang Guan 已提交
201
  for (int32_t i = 0; i < numOfEps; ++i) {
S
Shengliang Guan 已提交
202
    SDnodeEp *pDnodeEp = taosArrayGet(pDnode->data.dnodeEps, i);
S
shm  
Shengliang Guan 已提交
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
    len += snprintf(content + len, maxLen - len, "    \"id\": %d,\n", pDnodeEp->id);
    len += snprintf(content + len, maxLen - len, "    \"fqdn\": \"%s\",\n", pDnodeEp->ep.fqdn);
    len += snprintf(content + len, maxLen - len, "    \"port\": %u,\n", pDnodeEp->ep.port);
    len += snprintf(content + len, maxLen - len, "    \"isMnode\": %d\n", pDnodeEp->isMnode);
    if (i < numOfEps - 1) {
      len += snprintf(content + len, maxLen - len, "  },{\n");
    } else {
      len += snprintf(content + len, maxLen - len, "  }]\n");
    }
  }
  len += snprintf(content + len, maxLen - len, "}\n");

  taosWriteFile(pFile, content, len);
  taosFsyncFile(pFile);
  taosCloseFile(&pFile);
wafwerar's avatar
wafwerar 已提交
218
  taosMemoryFree(content);
S
shm  
Shengliang Guan 已提交
219 220

  if (taosRenameFile(file, realfile) != 0) {
S
shm  
Shengliang Guan 已提交
221
    terrno = TAOS_SYSTEM_ERROR(errno);
S
shm  
Shengliang Guan 已提交
222 223 224 225
    dError("failed to rename %s since %s", file, terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
226
  pDnode->data.updateTime = taosGetTimestampMs();
S
Shengliang Guan 已提交
227
  dDebug("successed to write %s", realfile);
S
shm  
Shengliang Guan 已提交
228 229 230
  return 0;
}

S
Shengliang Guan 已提交
231 232
void dmUpdateEps(SDnode *pDnode, SArray *eps) {
  int32_t numOfEps = taosArrayGetSize(eps);
S
shm  
Shengliang Guan 已提交
233 234
  if (numOfEps <= 0) return;

S
Shengliang Guan 已提交
235
  taosWLockLatch(&pDnode->data.latch);
S
shm  
Shengliang Guan 已提交
236

S
Shengliang Guan 已提交
237
  int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pDnode->data.dnodeEps);
S
shm  
Shengliang Guan 已提交
238
  if (numOfEps != numOfEpsOld) {
S
Shengliang Guan 已提交
239 240
    dmResetEps(pDnode, eps);
    dmWriteEps(pDnode);
S
shm  
Shengliang Guan 已提交
241 242
  } else {
    int32_t size = numOfEps * sizeof(SDnodeEp);
S
Shengliang Guan 已提交
243 244 245
    if (memcmp(pDnode->data.dnodeEps->pData, eps->pData, size) != 0) {
      dmResetEps(pDnode, eps);
      dmWriteEps(pDnode);
S
shm  
Shengliang Guan 已提交
246 247 248
    }
  }

S
Shengliang Guan 已提交
249
  taosWUnLockLatch(&pDnode->data.latch);
S
shm  
Shengliang Guan 已提交
250 251
}

S
Shengliang Guan 已提交
252 253 254 255
static void dmResetEps(SDnode *pDnode, SArray *dnodeEps) {
  if (pDnode->data.dnodeEps != dnodeEps) {
    SArray *tmp = pDnode->data.dnodeEps;
    pDnode->data.dnodeEps = taosArrayDup(dnodeEps);
S
shm  
Shengliang Guan 已提交
256 257 258
    taosArrayDestroy(tmp);
  }

S
Shengliang Guan 已提交
259 260
  pDnode->data.mnodeEps.inUse = 0;
  pDnode->data.mnodeEps.numOfEps = 0;
S
shm  
Shengliang Guan 已提交
261 262

  int32_t mIndex = 0;
S
shm  
Shengliang Guan 已提交
263
  int32_t numOfEps = (int32_t)taosArrayGetSize(dnodeEps);
S
shm  
Shengliang Guan 已提交
264 265

  for (int32_t i = 0; i < numOfEps; i++) {
S
shm  
Shengliang Guan 已提交
266
    SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i);
S
shm  
Shengliang Guan 已提交
267 268
    if (!pDnodeEp->isMnode) continue;
    if (mIndex >= TSDB_MAX_REPLICA) continue;
S
Shengliang Guan 已提交
269
    pDnode->data.mnodeEps.numOfEps++;
S
shm  
Shengliang Guan 已提交
270

S
Shengliang Guan 已提交
271
    pDnode->data.mnodeEps.eps[mIndex] = pDnodeEp->ep;
S
shm  
Shengliang Guan 已提交
272 273 274 275
    mIndex++;
  }

  for (int32_t i = 0; i < numOfEps; i++) {
S
shm  
Shengliang Guan 已提交
276
    SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i);
S
Shengliang Guan 已提交
277
    taosHashPut(pDnode->data.dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp));
S
shm  
Shengliang Guan 已提交
278 279
  }

S
Shengliang Guan 已提交
280
  dmPrintEps(pDnode);
S
shm  
Shengliang Guan 已提交
281 282
}

S
Shengliang Guan 已提交
283 284
static void dmPrintEps(SDnode *pDnode) {
  int32_t numOfEps = (int32_t)taosArrayGetSize(pDnode->data.dnodeEps);
S
shm  
Shengliang Guan 已提交
285 286
  dDebug("print dnode ep list, num:%d", numOfEps);
  for (int32_t i = 0; i < numOfEps; i++) {
S
Shengliang Guan 已提交
287
    SDnodeEp *pEp = taosArrayGet(pDnode->data.dnodeEps, i);
S
shm  
Shengliang Guan 已提交
288 289 290 291
    dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->ep.fqdn, pEp->ep.port, pEp->isMnode);
  }
}

S
Shengliang Guan 已提交
292
static bool dmIsEpChanged(SDnode *pDnode, int32_t dnodeId, const char *ep) {
S
shm  
Shengliang Guan 已提交
293
  bool changed = false;
S
Shengliang Guan 已提交
294
  taosRLockLatch(&pDnode->data.latch);
S
shm  
Shengliang Guan 已提交
295

S
Shengliang Guan 已提交
296
  SDnodeEp *pDnodeEp = taosHashGet(pDnode->data.dnodeHash, &dnodeId, sizeof(int32_t));
S
shm  
Shengliang Guan 已提交
297 298 299
  if (pDnodeEp != NULL) {
    char epstr[TSDB_EP_LEN + 1];
    snprintf(epstr, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
S
shm  
Shengliang Guan 已提交
300
    changed = strcmp(ep, epstr) != 0;
S
shm  
Shengliang Guan 已提交
301 302
  }

S
Shengliang Guan 已提交
303
  taosRUnLockLatch(&pDnode->data.latch);
S
shm  
Shengliang Guan 已提交
304 305
  return changed;
}