dmEps.c 9.8 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19 20 21
static void dmPrintEps(SDnodeMgmt *pMgmt);
static bool dmIsEpChanged(SDnodeMgmt *pMgmt, int32_t dnodeId, const char *ep);
static void dmResetEps(SDnodeMgmt *pMgmt, SArray *dnodeEps);
S
shm  
Shengliang Guan 已提交
22

S
Shengliang Guan 已提交
23 24
static void dmGetDnodeEp(SDnodeMgmt *pMgmt, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
  taosRLockLatch(&pMgmt->data.latch);
S
Shengliang Guan 已提交
25

S
Shengliang Guan 已提交
26
  SDnodeEp *pDnodeEp = taosHashGet(pMgmt->data.dnodeHash, &dnodeId, sizeof(int32_t));
S
Shengliang Guan 已提交
27 28 29 30 31 32 33 34 35 36 37 38
  if (pDnodeEp != NULL) {
    if (pPort != NULL) {
      *pPort = pDnodeEp->ep.port;
    }
    if (pFqdn != NULL) {
      tstrncpy(pFqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN);
    }
    if (pEp != NULL) {
      snprintf(pEp, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
    }
  }

S
Shengliang Guan 已提交
39
  taosRUnLockLatch(&pMgmt->data.latch);
S
Shengliang Guan 已提交
40 41
}

S
Shengliang Guan 已提交
42
int32_t dmReadEps(SDnodeMgmt *pMgmt) {
43
  int32_t   code = TSDB_CODE_INVALID_JSON_FORMAT;
S
shm  
Shengliang Guan 已提交
44 45
  int32_t   len = 0;
  int32_t   maxLen = 256 * 1024;
wafwerar's avatar
wafwerar 已提交
46
  char     *content = taosMemoryCalloc(1, maxLen + 1);
S
shm  
Shengliang Guan 已提交
47
  cJSON    *root = NULL;
S
Shengliang Guan 已提交
48
  char      file[PATH_MAX] = {0};
S
shm  
Shengliang Guan 已提交
49
  TdFilePtr pFile = NULL;
S
shm  
Shengliang Guan 已提交
50

S
Shengliang Guan 已提交
51 52
  pMgmt->data.dnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
  if (pMgmt->data.dnodeEps == NULL) {
S
shm  
Shengliang Guan 已提交
53
    dError("failed to calloc dnodeEp array since %s", strerror(errno));
S
Shengliang Guan 已提交
54
    goto _OVER;
S
shm  
Shengliang Guan 已提交
55 56
  }

S
Shengliang Guan 已提交
57
  snprintf(file, sizeof(file), "%s%sdnode.json", pMgmt->path, TD_DIRSEP);
S
shm  
Shengliang Guan 已提交
58
  pFile = taosOpenFile(file, TD_FILE_READ);
S
shm  
Shengliang Guan 已提交
59 60
  if (pFile == NULL) {
    code = 0;
S
Shengliang Guan 已提交
61
    goto _OVER;
S
shm  
Shengliang Guan 已提交
62 63 64 65 66
  }

  len = (int32_t)taosReadFile(pFile, content, maxLen);
  if (len <= 0) {
    dError("failed to read %s since content is null", file);
S
Shengliang Guan 已提交
67
    goto _OVER;
S
shm  
Shengliang Guan 已提交
68 69 70 71 72 73
  }

  content[len] = 0;
  root = cJSON_Parse(content);
  if (root == NULL) {
    dError("failed to read %s since invalid json format", file);
S
Shengliang Guan 已提交
74
    goto _OVER;
S
shm  
Shengliang Guan 已提交
75 76 77 78 79
  }

  cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
  if (!dnodeId || dnodeId->type != cJSON_Number) {
    dError("failed to read %s since dnodeId not found", file);
S
Shengliang Guan 已提交
80
    goto _OVER;
S
shm  
Shengliang Guan 已提交
81
  }
S
Shengliang Guan 已提交
82
  pMgmt->data.dnodeId = dnodeId->valueint;
S
shm  
Shengliang Guan 已提交
83 84 85 86

  cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
  if (!clusterId || clusterId->type != cJSON_String) {
    dError("failed to read %s since clusterId not found", file);
S
Shengliang Guan 已提交
87
    goto _OVER;
S
shm  
Shengliang Guan 已提交
88
  }
S
Shengliang Guan 已提交
89
  pMgmt->data.clusterId = atoll(clusterId->valuestring);
S
shm  
Shengliang Guan 已提交
90 91 92 93

  cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
  if (!dropped || dropped->type != cJSON_Number) {
    dError("failed to read %s since dropped not found", file);
S
Shengliang Guan 已提交
94
    goto _OVER;
S
shm  
Shengliang Guan 已提交
95
  }
S
Shengliang Guan 已提交
96
  pMgmt->data.dropped = dropped->valueint;
S
shm  
Shengliang Guan 已提交
97 98 99 100

  cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes");
  if (!dnodes || dnodes->type != cJSON_Array) {
    dError("failed to read %s since dnodes not found", file);
S
Shengliang Guan 已提交
101
    goto _OVER;
S
shm  
Shengliang Guan 已提交
102 103 104 105 106
  }

  int32_t numOfDnodes = cJSON_GetArraySize(dnodes);
  if (numOfDnodes <= 0) {
    dError("failed to read %s since numOfDnodes:%d invalid", file, numOfDnodes);
S
Shengliang Guan 已提交
107
    goto _OVER;
S
shm  
Shengliang Guan 已提交
108 109 110 111 112 113 114 115 116 117 118
  }

  for (int32_t i = 0; i < numOfDnodes; ++i) {
    cJSON *node = cJSON_GetArrayItem(dnodes, i);
    if (node == NULL) break;

    SDnodeEp dnodeEp = {0};

    cJSON *did = cJSON_GetObjectItem(node, "id");
    if (!did || did->type != cJSON_Number) {
      dError("failed to read %s since dnodeId not found", file);
S
Shengliang Guan 已提交
119
      goto _OVER;
S
shm  
Shengliang Guan 已提交
120 121
    }

122
    dnodeEp.id = did->valueint;
S
shm  
Shengliang Guan 已提交
123 124 125 126

    cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn");
    if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
      dError("failed to read %s since dnodeFqdn not found", file);
S
Shengliang Guan 已提交
127
      goto _OVER;
S
shm  
Shengliang Guan 已提交
128 129 130 131 132 133
    }
    tstrncpy(dnodeEp.ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);

    cJSON *dnodePort = cJSON_GetObjectItem(node, "port");
    if (!dnodePort || dnodePort->type != cJSON_Number) {
      dError("failed to read %s since dnodePort not found", file);
S
Shengliang Guan 已提交
134
      goto _OVER;
S
shm  
Shengliang Guan 已提交
135 136 137 138 139 140 141
    }

    dnodeEp.ep.port = dnodePort->valueint;

    cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode");
    if (!isMnode || isMnode->type != cJSON_Number) {
      dError("failed to read %s since isMnode not found", file);
S
Shengliang Guan 已提交
142
      goto _OVER;
S
shm  
Shengliang Guan 已提交
143 144 145
    }
    dnodeEp.isMnode = isMnode->valueint;

S
Shengliang Guan 已提交
146
    taosArrayPush(pMgmt->data.dnodeEps, &dnodeEp);
S
shm  
Shengliang Guan 已提交
147 148 149
  }

  code = 0;
S
shm  
Shengliang Guan 已提交
150
  dDebug("succcessed to read file %s", file);
S
Shengliang Guan 已提交
151
  dmPrintEps(pMgmt);
S
shm  
Shengliang Guan 已提交
152

S
Shengliang Guan 已提交
153
_OVER:
wafwerar's avatar
wafwerar 已提交
154
  if (content != NULL) taosMemoryFree(content);
S
shm  
Shengliang Guan 已提交
155 156 157
  if (root != NULL) cJSON_Delete(root);
  if (pFile != NULL) taosCloseFile(&pFile);

S
Shengliang Guan 已提交
158
  if (taosArrayGetSize(pMgmt->data.dnodeEps) == 0) {
S
shm  
Shengliang Guan 已提交
159 160
    SDnodeEp dnodeEp = {0};
    dnodeEp.isMnode = 1;
S
Shengliang Guan 已提交
161 162
    taosGetFqdnPortFromEp(pMgmt->data.firstEp, &dnodeEp.ep);
    taosArrayPush(pMgmt->data.dnodeEps, &dnodeEp);
S
shm  
Shengliang Guan 已提交
163 164
  }

S
Shengliang Guan 已提交
165
  dmResetEps(pMgmt, pMgmt->data.dnodeEps);
S
shm  
Shengliang Guan 已提交
166

S
Shengliang Guan 已提交
167 168
  if (dmIsEpChanged(pMgmt, pMgmt->data.dnodeId, pMgmt->data.localEp)) {
    dError("localEp %s different with %s and need reconfigured", pMgmt->data.localEp, file);
169 170 171
    return -1;
  }

S
shm  
Shengliang Guan 已提交
172 173
  terrno = code;
  return code;
S
shm  
Shengliang Guan 已提交
174 175
}

S
Shengliang Guan 已提交
176
int32_t dmWriteEps(SDnodeMgmt *pMgmt) {
S
Shengliang Guan 已提交
177
  char file[PATH_MAX] = {0};
S
Shengliang Guan 已提交
178
  char realfile[PATH_MAX] = {0};
S
Shengliang Guan 已提交
179 180
  snprintf(file, sizeof(file), "%s%sdnode.json.bak", pMgmt->path, TD_DIRSEP);
  snprintf(realfile, sizeof(realfile), "%s%sdnode.json", pMgmt->path, TD_DIRSEP);
S
shm  
Shengliang Guan 已提交
181

182
  TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
S
shm  
Shengliang Guan 已提交
183 184 185 186 187 188 189 190
  if (pFile == NULL) {
    dError("failed to write %s since %s", file, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  int32_t len = 0;
  int32_t maxLen = 256 * 1024;
wafwerar's avatar
wafwerar 已提交
191
  char   *content = taosMemoryCalloc(1, maxLen + 1);
S
shm  
Shengliang Guan 已提交
192 193

  len += snprintf(content + len, maxLen - len, "{\n");
S
Shengliang Guan 已提交
194 195 196
  len += snprintf(content + len, maxLen - len, "  \"dnodeId\": %d,\n", pMgmt->data.dnodeId);
  len += snprintf(content + len, maxLen - len, "  \"clusterId\": \"%" PRId64 "\",\n", pMgmt->data.clusterId);
  len += snprintf(content + len, maxLen - len, "  \"dropped\": %d,\n", pMgmt->data.dropped);
S
shm  
Shengliang Guan 已提交
197 198
  len += snprintf(content + len, maxLen - len, "  \"dnodes\": [{\n");

S
Shengliang Guan 已提交
199
  int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->data.dnodeEps);
S
shm  
Shengliang Guan 已提交
200
  for (int32_t i = 0; i < numOfEps; ++i) {
S
Shengliang Guan 已提交
201
    SDnodeEp *pDnodeEp = taosArrayGet(pMgmt->data.dnodeEps, i);
S
shm  
Shengliang Guan 已提交
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
    len += snprintf(content + len, maxLen - len, "    \"id\": %d,\n", pDnodeEp->id);
    len += snprintf(content + len, maxLen - len, "    \"fqdn\": \"%s\",\n", pDnodeEp->ep.fqdn);
    len += snprintf(content + len, maxLen - len, "    \"port\": %u,\n", pDnodeEp->ep.port);
    len += snprintf(content + len, maxLen - len, "    \"isMnode\": %d\n", pDnodeEp->isMnode);
    if (i < numOfEps - 1) {
      len += snprintf(content + len, maxLen - len, "  },{\n");
    } else {
      len += snprintf(content + len, maxLen - len, "  }]\n");
    }
  }
  len += snprintf(content + len, maxLen - len, "}\n");

  taosWriteFile(pFile, content, len);
  taosFsyncFile(pFile);
  taosCloseFile(&pFile);
wafwerar's avatar
wafwerar 已提交
217
  taosMemoryFree(content);
S
shm  
Shengliang Guan 已提交
218 219

  if (taosRenameFile(file, realfile) != 0) {
S
shm  
Shengliang Guan 已提交
220
    terrno = TAOS_SYSTEM_ERROR(errno);
S
shm  
Shengliang Guan 已提交
221 222 223 224
    dError("failed to rename %s since %s", file, terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
225
  pMgmt->data.updateTime = taosGetTimestampMs();
S
Shengliang Guan 已提交
226
  dDebug("successed to write %s", realfile);
S
shm  
Shengliang Guan 已提交
227 228 229
  return 0;
}

S
Shengliang Guan 已提交
230
void dmUpdateEps(SDnodeMgmt *pMgmt, SArray *eps) {
S
Shengliang Guan 已提交
231
  int32_t numOfEps = taosArrayGetSize(eps);
S
shm  
Shengliang Guan 已提交
232 233
  if (numOfEps <= 0) return;

S
Shengliang Guan 已提交
234
  taosWLockLatch(&pMgmt->data.latch);
S
shm  
Shengliang Guan 已提交
235

S
Shengliang Guan 已提交
236
  int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pMgmt->data.dnodeEps);
S
shm  
Shengliang Guan 已提交
237
  if (numOfEps != numOfEpsOld) {
S
Shengliang Guan 已提交
238 239
    dmResetEps(pMgmt, eps);
    dmWriteEps(pMgmt);
S
shm  
Shengliang Guan 已提交
240 241
  } else {
    int32_t size = numOfEps * sizeof(SDnodeEp);
S
Shengliang Guan 已提交
242 243 244
    if (memcmp(pMgmt->data.dnodeEps->pData, eps->pData, size) != 0) {
      dmResetEps(pMgmt, eps);
      dmWriteEps(pMgmt);
S
shm  
Shengliang Guan 已提交
245 246 247
    }
  }

S
Shengliang Guan 已提交
248
  taosWUnLockLatch(&pMgmt->data.latch);
S
shm  
Shengliang Guan 已提交
249 250
}

S
Shengliang Guan 已提交
251 252 253 254
static void dmResetEps(SDnodeMgmt *pMgmt, SArray *dnodeEps) {
  if (pMgmt->data.dnodeEps != dnodeEps) {
    SArray *tmp = pMgmt->data.dnodeEps;
    pMgmt->data.dnodeEps = taosArrayDup(dnodeEps);
S
shm  
Shengliang Guan 已提交
255 256 257
    taosArrayDestroy(tmp);
  }

S
Shengliang Guan 已提交
258 259
  pMgmt->data.mnodeEps.inUse = 0;
  pMgmt->data.mnodeEps.numOfEps = 0;
S
shm  
Shengliang Guan 已提交
260 261

  int32_t mIndex = 0;
S
shm  
Shengliang Guan 已提交
262
  int32_t numOfEps = (int32_t)taosArrayGetSize(dnodeEps);
S
shm  
Shengliang Guan 已提交
263 264

  for (int32_t i = 0; i < numOfEps; i++) {
S
shm  
Shengliang Guan 已提交
265
    SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i);
S
shm  
Shengliang Guan 已提交
266 267
    if (!pDnodeEp->isMnode) continue;
    if (mIndex >= TSDB_MAX_REPLICA) continue;
S
Shengliang Guan 已提交
268
    pMgmt->data.mnodeEps.numOfEps++;
S
shm  
Shengliang Guan 已提交
269

S
Shengliang Guan 已提交
270
    pMgmt->data.mnodeEps.eps[mIndex] = pDnodeEp->ep;
S
shm  
Shengliang Guan 已提交
271 272 273 274
    mIndex++;
  }

  for (int32_t i = 0; i < numOfEps; i++) {
S
shm  
Shengliang Guan 已提交
275
    SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i);
S
Shengliang Guan 已提交
276
    taosHashPut(pMgmt->data.dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp));
S
shm  
Shengliang Guan 已提交
277 278
  }

S
Shengliang Guan 已提交
279
  dmPrintEps(pMgmt);
S
shm  
Shengliang Guan 已提交
280 281
}

S
Shengliang Guan 已提交
282 283
static void dmPrintEps(SDnodeMgmt *pMgmt) {
  int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->data.dnodeEps);
S
shm  
Shengliang Guan 已提交
284 285
  dDebug("print dnode ep list, num:%d", numOfEps);
  for (int32_t i = 0; i < numOfEps; i++) {
S
Shengliang Guan 已提交
286
    SDnodeEp *pEp = taosArrayGet(pMgmt->data.dnodeEps, i);
S
shm  
Shengliang Guan 已提交
287 288 289 290
    dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->ep.fqdn, pEp->ep.port, pEp->isMnode);
  }
}

S
Shengliang Guan 已提交
291
static bool dmIsEpChanged(SDnodeMgmt *pMgmt, int32_t dnodeId, const char *ep) {
S
shm  
Shengliang Guan 已提交
292
  bool changed = false;
293
  if (dnodeId == 0) return changed;
S
Shengliang Guan 已提交
294
  taosRLockLatch(&pMgmt->data.latch);
S
shm  
Shengliang Guan 已提交
295

S
Shengliang Guan 已提交
296
  SDnodeEp *pDnodeEp = taosHashGet(pMgmt->data.dnodeHash, &dnodeId, sizeof(int32_t));
S
shm  
Shengliang Guan 已提交
297
  if (pDnodeEp != NULL) {
298
    char epstr[TSDB_EP_LEN + 1] = {0};
S
shm  
Shengliang Guan 已提交
299
    snprintf(epstr, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
300 301
    changed = (strcmp(ep, epstr) != 0);
    if (changed) {
302
      dError("dnode:%d, localEp %s different from %s", dnodeId, ep, epstr);
303
    }
S
shm  
Shengliang Guan 已提交
304 305
  }

S
Shengliang Guan 已提交
306
  taosRUnLockLatch(&pMgmt->data.latch);
S
shm  
Shengliang Guan 已提交
307 308
  return changed;
}