dmEps.c 10.1 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmImp.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19 20 21
/* Forward declarations of file-local helpers that are defined further down in this file. */
static void dmPrintEps(SDnode *pDnode);
static bool dmIsEpChanged(SDnode *pDnode, int32_t dnodeId, const char *ep);
static void dmResetEps(SDnode *pDnode, SArray *dnodeEps);
S
shm  
Shengliang Guan 已提交
22

S
Shengliang Guan 已提交
23 24
/*
 * Look up a dnode by id in pDnode->data.dnodeHash and copy out the requested parts of its endpoint.
 *
 * pEp   - optional; receives "fqdn:port", written with a TSDB_EP_LEN byte limit.
 * pFqdn - optional; receives the fqdn, copied with a TSDB_FQDN_LEN byte limit.
 * pPort - optional; receives the port.
 *
 * Any output pointer may be NULL to skip it. When the id is not in the hash, none of the
 * outputs is written. The lookup and the copies run under the read latch.
 */
static void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
  taosRLockLatch(&pDnode->data.latch);

  SDnodeEp *pFound = taosHashGet(pDnode->data.dnodeHash, &dnodeId, sizeof(int32_t));
  if (pFound == NULL) {
    // Unknown dnode: leave the caller's buffers untouched.
    taosRUnLockLatch(&pDnode->data.latch);
    return;
  }

  if (pPort != NULL) *pPort = pFound->ep.port;
  if (pFqdn != NULL) tstrncpy(pFqdn, pFound->ep.fqdn, TSDB_FQDN_LEN);
  if (pEp != NULL) snprintf(pEp, TSDB_EP_LEN, "%s:%u", pFound->ep.fqdn, pFound->ep.port);

  taosRUnLockLatch(&pDnode->data.latch);
}

S
Shengliang Guan 已提交
42
/*
 * Load the dnode endpoint list from "<dnode path>/dnode.json" into pDnode->data.
 *
 * Expected JSON layout:
 *   "dnodeId"   number
 *   "clusterId" string (parsed with atoll)
 *   "dropped"   number
 *   "dnodes"    non-empty array of { "id": number, "fqdn": string, "port": number, "isMnode": number }
 *
 * A missing file is not an error. Whenever no endpoint ends up in the list (missing file or a
 * failure before the first entry was parsed), the list is seeded with a single mnode entry
 * built from pDnode->data.firstEp. The list is then published through dmResetEps().
 *
 * Returns 0 on success, -1 when the stored endpoint of this dnode differs from
 * pDnode->data.localEp, otherwise the error code (also stored in terrno).
 */
int32_t dmReadEps(SDnode *pDnode) {
  int32_t   code = TSDB_CODE_INVALID_JSON_FORMAT;
  int32_t   len = 0;
  int32_t   maxLen = 256 * 1024;
  char     *content = taosMemoryCalloc(1, maxLen + 1);
  cJSON    *root = NULL;
  char      file[PATH_MAX] = {0};
  TdFilePtr pFile = NULL;

  // Build the path first so that every log message below can name the file.
  snprintf(file, sizeof(file), "%s%sdnode.json", pDnode->wrappers[DNODE].path, TD_DIRSEP);

  pDnode->data.dnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
  if (pDnode->data.dnodeEps == NULL) {
    dError("failed to calloc dnodeEp array since %s", strerror(errno));
    goto PARSE_DNODE_OVER;
  }

  // The read buffer is allocated in the declaration above; never hand a NULL buffer to the reader.
  if (content == NULL) {
    dError("failed to calloc buffer to read %s since %s", file, strerror(errno));
    goto PARSE_DNODE_OVER;
  }

  pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    // dDebug("file %s not exist", file);
    code = 0;
    goto PARSE_DNODE_OVER;
  }

  len = (int32_t)taosReadFile(pFile, content, maxLen);
  if (len <= 0) {
    dError("failed to read %s since content is null", file);
    goto PARSE_DNODE_OVER;
  }

  content[len] = 0;
  root = cJSON_Parse(content);
  if (root == NULL) {
    dError("failed to read %s since invalid json format", file);
    goto PARSE_DNODE_OVER;
  }

  cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
  if (!dnodeId || dnodeId->type != cJSON_Number) {
    dError("failed to read %s since dnodeId not found", file);
    goto PARSE_DNODE_OVER;
  }
  pDnode->data.dnodeId = dnodeId->valueint;

  cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
  if (!clusterId || clusterId->type != cJSON_String) {
    dError("failed to read %s since clusterId not found", file);
    goto PARSE_DNODE_OVER;
  }
  pDnode->data.clusterId = atoll(clusterId->valuestring);

  cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
  if (!dropped || dropped->type != cJSON_Number) {
    dError("failed to read %s since dropped not found", file);
    goto PARSE_DNODE_OVER;
  }
  pDnode->data.dropped = dropped->valueint;

  cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes");
  if (!dnodes || dnodes->type != cJSON_Array) {
    dError("failed to read %s since dnodes not found", file);
    goto PARSE_DNODE_OVER;
  }

  int32_t numOfDnodes = cJSON_GetArraySize(dnodes);
  if (numOfDnodes <= 0) {
    dError("failed to read %s since numOfDnodes:%d invalid", file, numOfDnodes);
    goto PARSE_DNODE_OVER;
  }

  for (int32_t i = 0; i < numOfDnodes; ++i) {
    cJSON *node = cJSON_GetArrayItem(dnodes, i);
    if (node == NULL) break;

    SDnodeEp dnodeEp = {0};

    cJSON *did = cJSON_GetObjectItem(node, "id");
    if (!did || did->type != cJSON_Number) {
      dError("failed to read %s since dnodeId not found", file);
      goto PARSE_DNODE_OVER;
    }

    dnodeEp.id = did->valueint;

    cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn");
    if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
      dError("failed to read %s since dnodeFqdn not found", file);
      goto PARSE_DNODE_OVER;
    }
    tstrncpy(dnodeEp.ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);

    cJSON *dnodePort = cJSON_GetObjectItem(node, "port");
    if (!dnodePort || dnodePort->type != cJSON_Number) {
      dError("failed to read %s since dnodePort not found", file);
      goto PARSE_DNODE_OVER;
    }

    dnodeEp.ep.port = dnodePort->valueint;

    cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode");
    if (!isMnode || isMnode->type != cJSON_Number) {
      dError("failed to read %s since isMnode not found", file);
      goto PARSE_DNODE_OVER;
    }
    dnodeEp.isMnode = isMnode->valueint;

    taosArrayPush(pDnode->data.dnodeEps, &dnodeEp);
  }

  code = 0;
  dDebug("succcessed to read file %s", file);
  dmPrintEps(pDnode);

PARSE_DNODE_OVER:
  if (content != NULL) taosMemoryFree(content);
  if (root != NULL) cJSON_Delete(root);
  if (pFile != NULL) taosCloseFile(&pFile);

  // Without an endpoint array there is nothing to seed or publish; stop here instead of
  // passing a NULL array to the push/reset helpers below.
  if (pDnode->data.dnodeEps == NULL) {
    terrno = code;
    return code;
  }

  // Nothing loaded: fall back to the configured first endpoint and treat it as an mnode.
  if (taosArrayGetSize(pDnode->data.dnodeEps) == 0) {
    SDnodeEp dnodeEp = {0};
    dnodeEp.isMnode = 1;
    taosGetFqdnPortFromEp(pDnode->data.firstEp, &dnodeEp.ep);
    taosArrayPush(pDnode->data.dnodeEps, &dnodeEp);
  }

  dmResetEps(pDnode, pDnode->data.dnodeEps);

  if (dmIsEpChanged(pDnode, pDnode->data.dnodeId, pDnode->data.localEp)) {
    dError("localEp %s different with %s and need reconfigured", pDnode->data.localEp, file);
    return -1;
  }

  terrno = code;
  return code;
}

S
Shengliang Guan 已提交
177
/*
 * Persist pDnode->data (dnodeId, clusterId, dropped and the endpoint list) as JSON.
 *
 * The text is written to "<dnode path>/dnode.json.bak", synced, and then renamed over
 * "<dnode path>/dnode.json". The rename only happens after the write and the sync both
 * succeeded, so a failed write leaves the existing dnode.json in place.
 *
 * Returns 0 on success and refreshes pDnode->data.updateTime; returns -1 with terrno set on
 * allocation failure, when the list does not fit the 256 KiB buffer, or on any file error.
 *
 * NOTE(review): with an empty endpoint list the "dnodes" array is never closed; the visible
 * loader always keeps at least one entry, so this is presumably unreachable - confirm.
 */
int32_t dmWriteEps(SDnode *pDnode) {
  char file[PATH_MAX] = {0};
  char realfile[PATH_MAX] = {0};
  snprintf(file, sizeof(file), "%s%sdnode.json.bak", pDnode->wrappers[DNODE].path, TD_DIRSEP);
  snprintf(realfile, sizeof(realfile), "%s%sdnode.json", pDnode->wrappers[DNODE].path, TD_DIRSEP);

  const int32_t maxLen = 256 * 1024;
  // Upper bound for the fixed text and the formatted integers of one entry plus the final "}\n".
  const int32_t entryOverhead = 128;

  char *content = taosMemoryCalloc(1, maxLen + 1);
  if (content == NULL) {
    terrno = TAOS_SYSTEM_ERROR(ENOMEM);
    dError("failed to write %s since %s", file, strerror(ENOMEM));
    return -1;
  }

  // Format everything in memory first; the header is far smaller than the buffer.
  int32_t len = 0;
  len += snprintf(content + len, maxLen - len, "{\n");
  len += snprintf(content + len, maxLen - len, "  \"dnodeId\": %d,\n", pDnode->data.dnodeId);
  len += snprintf(content + len, maxLen - len, "  \"clusterId\": \"%" PRId64 "\",\n", pDnode->data.clusterId);
  len += snprintf(content + len, maxLen - len, "  \"dropped\": %d,\n", pDnode->data.dropped);
  len += snprintf(content + len, maxLen - len, "  \"dnodes\": [{\n");

  int32_t numOfEps = (int32_t)taosArrayGetSize(pDnode->data.dnodeEps);
  for (int32_t i = 0; i < numOfEps; ++i) {
    SDnodeEp *pDnodeEp = taosArrayGet(pDnode->data.dnodeEps, i);

    // snprintf returns the untruncated length, so once an entry is cut short `len` would pass
    // `maxLen` and the next size argument would go negative. Refuse before that can happen.
    if (maxLen - len < (int32_t)strlen(pDnodeEp->ep.fqdn) + entryOverhead) {
      terrno = TAOS_SYSTEM_ERROR(ERANGE);
      dError("failed to write %s since dnode list exceeds %d bytes", file, maxLen);
      taosMemoryFree(content);
      return -1;
    }

    len += snprintf(content + len, maxLen - len, "    \"id\": %d,\n", pDnodeEp->id);
    len += snprintf(content + len, maxLen - len, "    \"fqdn\": \"%s\",\n", pDnodeEp->ep.fqdn);
    len += snprintf(content + len, maxLen - len, "    \"port\": %u,\n", pDnodeEp->ep.port);
    len += snprintf(content + len, maxLen - len, "    \"isMnode\": %d\n", pDnodeEp->isMnode);
    if (i < numOfEps - 1) {
      len += snprintf(content + len, maxLen - len, "  },{\n");
    } else {
      len += snprintf(content + len, maxLen - len, "  }]\n");
    }
  }
  len += snprintf(content + len, maxLen - len, "}\n");

  TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
    int32_t err = errno;  // capture before logging can overwrite it
    dError("failed to write %s since %s", file, strerror(err));
    terrno = TAOS_SYSTEM_ERROR(err);
    taosMemoryFree(content);
    return -1;
  }

  // Assumes taosWriteFile returns the number of bytes written (<= 0 on failure) and
  // taosFsyncFile returns a negative value on failure - TODO confirm against osFile.h.
  if (taosWriteFile(pFile, content, len) <= 0 || taosFsyncFile(pFile) < 0) {
    int32_t err = errno;
    dError("failed to write %s since %s", file, strerror(err));
    terrno = TAOS_SYSTEM_ERROR(err);
    taosCloseFile(&pFile);
    taosMemoryFree(content);
    return -1;
  }

  taosCloseFile(&pFile);
  taosMemoryFree(content);

  if (taosRenameFile(file, realfile) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to rename %s since %s", file, terrstr());
    return -1;
  }

  pDnode->data.updateTime = taosGetTimestampMs();
  dDebug("successed to write %s", realfile);
  return 0;
}

S
Shengliang Guan 已提交
231 232
/*
 * Replace the stored endpoint list with `eps` when it differs from the current one.
 *
 * An empty (or NULL-sized) `eps` is ignored. Otherwise, under the write latch, the lists are
 * considered different when their element counts differ or when their raw element bytes
 * differ; in that case the new list is installed via dmResetEps() and persisted via
 * dmWriteEps(). The result of dmWriteEps() is not inspected.
 */
void dmUpdateEps(SDnode *pDnode, SArray *eps) {
  int32_t newCount = taosArrayGetSize(eps);
  if (newCount <= 0) return;

  taosWLockLatch(&pDnode->data.latch);

  int32_t oldCount = (int32_t)taosArrayGetSize(pDnode->data.dnodeEps);
  bool    changed = (newCount != oldCount);
  if (!changed) {
    // Same count: fall back to a byte-wise comparison of the element storage.
    int32_t size = newCount * sizeof(SDnodeEp);
    changed = (memcmp(pDnode->data.dnodeEps->pData, eps->pData, size) != 0);
  }

  if (changed) {
    dmResetEps(pDnode, eps);
    dmWriteEps(pDnode);
  }

  taosWUnLockLatch(&pDnode->data.latch);
}

S
Shengliang Guan 已提交
252 253 254 255
/*
 * Install `dnodeEps` as the current endpoint list and rebuild the data derived from it.
 *
 * - If `dnodeEps` is not already the stored array, the stored array is replaced by a
 *   duplicate of it and the previous array is destroyed.
 * - pDnode->data.mnodeEps is rebuilt from the entries flagged isMnode, keeping at most
 *   TSDB_MAX_REPLICA of them in list order; inUse is reset to 0.
 * - Every entry is put into pDnode->data.dnodeHash keyed by its id.
 *
 * NOTE(review): the result of taosArrayDup() is not checked, and ids that are absent from
 * the new list are not removed from the hash here.
 */
static void dmResetEps(SDnode *pDnode, SArray *dnodeEps) {
  if (pDnode->data.dnodeEps != dnodeEps) {
    SArray *pOld = pDnode->data.dnodeEps;
    pDnode->data.dnodeEps = taosArrayDup(dnodeEps);
    taosArrayDestroy(pOld);
  }

  pDnode->data.mnodeEps.inUse = 0;
  pDnode->data.mnodeEps.numOfEps = 0;

  int32_t total = (int32_t)taosArrayGetSize(dnodeEps);
  int32_t slot = 0;

  // Collect the mnode endpoints; entries beyond the replica limit are skipped.
  for (int32_t n = 0; n < total; n++) {
    SDnodeEp *pItem = taosArrayGet(dnodeEps, n);
    if (pItem->isMnode && slot < TSDB_MAX_REPLICA) {
      pDnode->data.mnodeEps.numOfEps++;
      pDnode->data.mnodeEps.eps[slot] = pItem->ep;
      slot++;
    }
  }

  // Index every endpoint by dnode id.
  for (int32_t n = 0; n < total; n++) {
    SDnodeEp *pItem = taosArrayGet(dnodeEps, n);
    taosHashPut(pDnode->data.dnodeHash, &pItem->id, sizeof(int32_t), pItem, sizeof(SDnodeEp));
  }

  dmPrintEps(pDnode);
}

S
Shengliang Guan 已提交
283 284
/*
 * Emit the stored endpoint list at debug level: one header line with the entry count,
 * followed by one line per entry (id, fqdn, port, isMnode).
 */
static void dmPrintEps(SDnode *pDnode) {
  int32_t total = (int32_t)taosArrayGetSize(pDnode->data.dnodeEps);
  dDebug("print dnode ep list, num:%d", total);

  for (int32_t idx = 0; idx < total; idx++) {
    SDnodeEp *pItem = taosArrayGet(pDnode->data.dnodeEps, idx);
    dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pItem->id, pItem->ep.fqdn, pItem->ep.port, pItem->isMnode);
  }
}

S
Shengliang Guan 已提交
292
/*
 * Report whether `ep` differs from the "fqdn:port" stored for `dnodeId`.
 *
 * Returns false when dnodeId is 0 (presumably a dnode that has no id yet - confirm) or when
 * the id is not present in pDnode->data.dnodeHash. Otherwise the stored endpoint is
 * formatted under the read latch and compared with `ep`; a mismatch is logged as an error
 * and true is returned.
 */
static bool dmIsEpChanged(SDnode *pDnode, int32_t dnodeId, const char *ep) {
  if (dnodeId == 0) return false;

  bool differs = false;
  taosRLockLatch(&pDnode->data.latch);

  SDnodeEp *pStored = taosHashGet(pDnode->data.dnodeHash, &dnodeId, sizeof(int32_t));
  if (pStored != NULL) {
    char stored[TSDB_EP_LEN + 1] = {0};
    snprintf(stored, TSDB_EP_LEN, "%s:%u", pStored->ep.fqdn, pStored->ep.port);
    if (strcmp(ep, stored) != 0) {
      differs = true;
      dError("dnode:%d, localEp %s different from %s", dnodeId, ep, stored);
    }
  }

  taosRUnLockLatch(&pDnode->data.latch);
  return differs;
}