dmEps.c 10.7 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmUtil.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19 20 21
/* Debug-log every entry of pData->dnodeEps. */
static void dmPrintEps(SDnodeData *pData);
/* Return true when `ep` ("fqdn:port") differs from the endpoint recorded for `dnodeId` in pData->dnodeHash. */
static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep);
/* Install `dnodeEps` as pData's endpoint list and rebuild pData->mnodeEps and pData->dnodeHash from it. */
static void dmResetEps(SDnodeData *pData, SArray *dnodeEps);
S
shm  
Shengliang Guan 已提交
22

S
Shengliang Guan 已提交
23
/*
 * Look up the endpoint recorded for `dnodeId` in pData->dnodeHash and copy it
 * into whichever optional outputs are non-NULL:
 *   pPort - the port number
 *   pFqdn - the fqdn, truncated to TSDB_FQDN_LEN bytes including the NUL
 *   pEp   - "fqdn:port", truncated to TSDB_EP_LEN bytes including the NUL
 * When the dnode is not in the hash, no output is touched.
 * The lookup and copies happen while holding pData->lock for reading.
 */
static void dmGetDnodeEp(SDnodeData *pData, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
  taosThreadRwlockRdlock(&pData->lock);

  const SDnodeEp *pFound = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
  if (pFound == NULL) {
    taosThreadRwlockUnlock(&pData->lock);
    return;
  }

  const SEp *pSrc = &pFound->ep;
  if (pPort != NULL) *pPort = pSrc->port;
  if (pFqdn != NULL) tstrncpy(pFqdn, pSrc->fqdn, TSDB_FQDN_LEN);
  if (pEp != NULL) snprintf(pEp, TSDB_EP_LEN, "%s:%u", pSrc->fqdn, pSrc->port);

  taosThreadRwlockUnlock(&pData->lock);
}

S
Shengliang Guan 已提交
42
/*
 * Load the dnode identity (dnodeId, clusterId, dropped) and the dnode
 * endpoint list from <tsDataDir>/dnode/dnode.json into pData.
 *
 * A missing file is not treated as an error. After reading/parsing (whether
 * it succeeded or not), if no endpoint was loaded the list falls back to a
 * single mnode entry built from tsFirst, and pData->mnodeEps /
 * pData->dnodeHash are rebuilt from the resulting list via dmResetEps().
 *
 * Returns:
 *   0                              on success or when the file does not exist;
 *   TSDB_CODE_INVALID_JSON_FORMAT  when the file cannot be read or parsed;
 *   TSDB_CODE_OUT_OF_MEMORY        when an allocation fails;
 *   -1                             when tsLocalEp differs from the endpoint
 *                                  recorded for this dnode (terrno untouched).
 * On the non -1 paths terrno is set to the returned code.
 */
int32_t dmReadEps(SDnodeData *pData) {
  int32_t   code = TSDB_CODE_INVALID_JSON_FORMAT;
  int32_t   len = 0;
  int32_t   maxLen = 256 * 1024;
  char     *content = NULL;
  cJSON    *root = NULL;
  char      file[PATH_MAX] = {0};
  TdFilePtr pFile = NULL;

  pData->dnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
  if (pData->dnodeEps == NULL) {
    // Without the array there is nowhere to put even the tsFirst fallback
    // below, so report the allocation failure instead of continuing.
    dError("failed to calloc dnodeEp array since %s", strerror(errno));
    code = TSDB_CODE_OUT_OF_MEMORY;
    terrno = code;
    return code;
  }

  snprintf(file, sizeof(file), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);
  pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    // No persisted file yet: fall back to tsFirst below.
    code = 0;
    goto _OVER;
  }

  // Allocated only once the file is known to exist; +1 leaves room for the NUL.
  content = taosMemoryCalloc(1, maxLen + 1);
  if (content == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    terrno = code;
    dError("failed to read %s since %s", file, terrstr());
    goto _OVER;
  }

  len = (int32_t)taosReadFile(pFile, content, maxLen);
  if (len <= 0) {
    dError("failed to read %s since content is null", file);
    goto _OVER;
  }

  content[len] = 0;
  root = cJSON_Parse(content);
  if (root == NULL) {
    dError("failed to read %s since invalid json format", file);
    goto _OVER;
  }

  cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
  if (!dnodeId || dnodeId->type != cJSON_Number) {
    dError("failed to read %s since dnodeId not found", file);
    goto _OVER;
  }
  pData->dnodeId = dnodeId->valueint;

  // clusterId is stored as a JSON string (see dmWriteEps) to keep all 64 bits.
  cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
  if (!clusterId || clusterId->type != cJSON_String) {
    dError("failed to read %s since clusterId not found", file);
    goto _OVER;
  }
  pData->clusterId = atoll(clusterId->valuestring);

  cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
  if (!dropped || dropped->type != cJSON_Number) {
    dError("failed to read %s since dropped not found", file);
    goto _OVER;
  }
  pData->dropped = dropped->valueint;

  cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes");
  if (!dnodes || dnodes->type != cJSON_Array) {
    dError("failed to read %s since dnodes not found", file);
    goto _OVER;
  }

  int32_t numOfDnodes = cJSON_GetArraySize(dnodes);
  if (numOfDnodes <= 0) {
    dError("failed to read %s since numOfDnodes:%d invalid", file, numOfDnodes);
    goto _OVER;
  }

  for (int32_t i = 0; i < numOfDnodes; ++i) {
    cJSON *node = cJSON_GetArrayItem(dnodes, i);
    if (node == NULL) break;

    SDnodeEp dnodeEp = {0};

    cJSON *did = cJSON_GetObjectItem(node, "id");
    if (!did || did->type != cJSON_Number) {
      dError("failed to read %s since dnodeId not found", file);
      goto _OVER;
    }
    dnodeEp.id = did->valueint;

    cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn");
    if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
      dError("failed to read %s since dnodeFqdn not found", file);
      goto _OVER;
    }
    tstrncpy(dnodeEp.ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);

    cJSON *dnodePort = cJSON_GetObjectItem(node, "port");
    if (!dnodePort || dnodePort->type != cJSON_Number) {
      dError("failed to read %s since dnodePort not found", file);
      goto _OVER;
    }
    dnodeEp.ep.port = dnodePort->valueint;

    cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode");
    if (!isMnode || isMnode->type != cJSON_Number) {
      dError("failed to read %s since isMnode not found", file);
      goto _OVER;
    }
    dnodeEp.isMnode = isMnode->valueint;

    taosArrayPush(pData->dnodeEps, &dnodeEp);
  }

  code = 0;
  dDebug("succcessed to read file %s", file);
  dmPrintEps(pData);

_OVER:
  if (content != NULL) taosMemoryFree(content);
  if (root != NULL) cJSON_Delete(root);
  if (pFile != NULL) taosCloseFile(&pFile);

  // Nothing loaded (no file, or parsing failed before the first entry):
  // seed the list with tsFirst as the only known mnode.
  if (taosArrayGetSize(pData->dnodeEps) == 0) {
    SDnodeEp dnodeEp = {0};
    dnodeEp.isMnode = 1;
    taosGetFqdnPortFromEp(tsFirst, &dnodeEp.ep);
    taosArrayPush(pData->dnodeEps, &dnodeEp);
  }

  dmResetEps(pData, pData->dnodeEps);

  if (dmIsEpChanged(pData, pData->dnodeId, tsLocalEp)) {
    dError("localEp %s different with %s and need reconfigured", tsLocalEp, file);
    return -1;
  }

  terrno = code;
  return code;
}

S
Shengliang Guan 已提交
176
/*
 * Serialize pData's dnode identity (dnodeId, clusterId, dropped) and its
 * dnode endpoint list as JSON and persist it to <tsDataDir>/dnode/dnode.json.
 *
 * The content is first written and fsync'ed to dnode.json.bak. Only after
 * both succeed is it renamed over dnode.json, so a failed or short write
 * never replaces the real file. On success pData->updateTime is refreshed
 * and 0 is returned. On failure terrno is set, an error is logged and -1 is
 * returned.
 */
int32_t dmWriteEps(SDnodeData *pData) {
  int32_t   code = -1;
  int32_t   len = 0;
  char     *content = NULL;
  TdFilePtr pFile = NULL;
  char      file[PATH_MAX] = {0};
  char      realfile[PATH_MAX] = {0};

  snprintf(file, sizeof(file), "%s%sdnode%sdnode.json.bak", tsDataDir, TD_DIRSEP, TD_DIRSEP);
  snprintf(realfile, sizeof(realfile), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);

  int32_t numOfEps = (int32_t)taosArrayGetSize(pData->dnodeEps);

  // Size the buffer so the snprintf chain below can never run past it:
  // every dnode entry is < 128 bytes of fixed text plus its fqdn
  // (< TSDB_FQDN_LEN), and the enclosing object is < 1024 bytes.
  int32_t maxLen = 256 * 1024;
  int32_t needLen = 1024 + numOfEps * (TSDB_FQDN_LEN + 128);
  if (needLen > maxLen) maxLen = needLen;

  content = taosMemoryCalloc(1, maxLen + 1);
  if (content == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to write %s since %s", file, terrstr());
    goto _OVER;
  }

  len += snprintf(content + len, maxLen - len, "{\n");
  len += snprintf(content + len, maxLen - len, "  \"dnodeId\": %d,\n", pData->dnodeId);
  len += snprintf(content + len, maxLen - len, "  \"clusterId\": \"%" PRId64 "\",\n", pData->clusterId);
  len += snprintf(content + len, maxLen - len, "  \"dropped\": %d,\n", pData->dropped);
  len += snprintf(content + len, maxLen - len, "  \"dnodes\": [");
  for (int32_t i = 0; i < numOfEps; ++i) {
    SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, i);
    len += snprintf(content + len, maxLen - len, "{\n");
    len += snprintf(content + len, maxLen - len, "    \"id\": %d,\n", pDnodeEp->id);
    len += snprintf(content + len, maxLen - len, "    \"fqdn\": \"%s\",\n", pDnodeEp->ep.fqdn);
    len += snprintf(content + len, maxLen - len, "    \"port\": %u,\n", pDnodeEp->ep.port);
    len += snprintf(content + len, maxLen - len, "    \"isMnode\": %d\n", pDnodeEp->isMnode);
    len += snprintf(content + len, maxLen - len, "  }%s", (i < numOfEps - 1) ? "," : "");
  }
  // The array is closed even when it is empty, so the file is always valid JSON.
  len += snprintf(content + len, maxLen - len, "]\n}\n");

  pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to write %s since %s", file, strerror(errno));
    goto _OVER;
  }

  if (taosWriteFile(pFile, content, len) != len) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to write %s since %s", file, terrstr());
    goto _OVER;
  }

  if (taosFsyncFile(pFile) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to fsync %s since %s", file, terrstr());
    goto _OVER;
  }

  taosCloseFile(&pFile);
  pFile = NULL;

  if (taosRenameFile(file, realfile) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to rename %s since %s", file, terrstr());
    goto _OVER;
  }

  pData->updateTime = taosGetTimestampMs();
  dDebug("successed to write %s", realfile);
  code = 0;

_OVER:
  if (pFile != NULL) taosCloseFile(&pFile);
  if (content != NULL) taosMemoryFree(content);
  return code;
}

S
Shengliang Guan 已提交
231
/*
 * Adopt `eps` as the dnode endpoint list when it differs from the current
 * one, either in length or in the raw bytes of its entries. On a change,
 * the derived state is rebuilt via dmResetEps() and the list is persisted
 * via dmWriteEps(); the write result is not checked here. An empty `eps` is
 * ignored. The comparison and update run with pData->lock held for writing.
 */
void dmUpdateEps(SDnodeData *pData, SArray *eps) {
  int32_t newCount = taosArrayGetSize(eps);
  if (newCount <= 0) return;

  taosThreadRwlockWrlock(&pData->lock);

  int32_t oldCount = (int32_t)taosArrayGetSize(pData->dnodeEps);
  bool    differs = (newCount != oldCount);
  if (!differs) {
    // Same length: fall back to a byte-wise comparison of the entries.
    int32_t bytes = newCount * sizeof(SDnodeEp);
    differs = (memcmp(pData->dnodeEps->pData, eps->pData, bytes) != 0);
  }

  if (differs) {
    dmResetEps(pData, eps);
    dmWriteEps(pData);
  }

  taosThreadRwlockUnlock(&pData->lock);
}

S
Shengliang Guan 已提交
252 253 254 255
/*
 * Install `dnodeEps` as pData's dnode endpoint list and rebuild the state
 * derived from it:
 *  - pData->dnodeEps becomes a private copy of `dnodeEps`, unless it already
 *    is that array;
 *  - pData->mnodeEps is rebuilt from the entries flagged isMnode, keeping at
 *    most TSDB_MAX_REPLICA of them, with inUse reset to 0;
 *  - every entry is (re)inserted into pData->dnodeHash, keyed by dnode id.
 *    Ids that are no longer in the list are not removed from the hash.
 * If the copy cannot be made, nothing is changed.
 * This function does not take pData->lock; dmUpdateEps calls it with the
 * write lock held.
 */
static void dmResetEps(SDnodeData *pData, SArray *dnodeEps) {
  if (pData->dnodeEps != dnodeEps) {
    SArray *pCopy = taosArrayDup(dnodeEps);
    if (pCopy == NULL) {
      // Keep the previous list and its derived state consistent, rather
      // than leaving dnodeEps NULL while mnodeEps/dnodeHash describe a
      // different list.
      dError("failed to dup dnode ep list, keep the old one");
      return;
    }
    SArray *pOld = pData->dnodeEps;
    pData->dnodeEps = pCopy;
    taosArrayDestroy(pOld);
  }

  pData->mnodeEps.inUse = 0;
  pData->mnodeEps.numOfEps = 0;

  int32_t mIndex = 0;
  int32_t numOfEps = (int32_t)taosArrayGetSize(dnodeEps);

  for (int32_t i = 0; i < numOfEps; i++) {
    SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i);
    if (!pDnodeEp->isMnode) continue;
    if (mIndex >= TSDB_MAX_REPLICA) continue;
    pData->mnodeEps.numOfEps++;
    pData->mnodeEps.eps[mIndex] = pDnodeEp->ep;
    mIndex++;
  }

  for (int32_t i = 0; i < numOfEps; i++) {
    SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i);
    taosHashPut(pData->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp));
  }

  dmPrintEps(pData);
}

S
Shengliang Guan 已提交
283 284
/*
 * Emit the current dnode endpoint list (pData->dnodeEps) at debug level: one
 * line with the count, then one line per entry. Does not take pData->lock.
 */
static void dmPrintEps(SDnodeData *pData) {
  int32_t total = (int32_t)taosArrayGetSize(pData->dnodeEps);
  dDebug("print dnode ep list, num:%d", total);

  int32_t idx = 0;
  while (idx < total) {
    const SDnodeEp *pItem = taosArrayGet(pData->dnodeEps, idx);
    dDebug("dnode:%d, fqdn:%s port:%u is_mnode:%d", pItem->id, pItem->ep.fqdn, pItem->ep.port, pItem->isMnode);
    ++idx;
  }
}

S
Shengliang Guan 已提交
292
/*
 * Report whether `ep` ("fqdn:port") differs from the endpoint recorded in
 * pData->dnodeHash for `dnodeId`. The result is false for dnodeId 0 and for
 * ids that are not in the hash. A detected difference is also logged as an
 * error. The lookup is done with pData->lock held for reading.
 */
static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) {
  if (dnodeId == 0) return false;

  bool mismatch = false;
  taosThreadRwlockRdlock(&pData->lock);

  const SDnodeEp *pKnown = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
  if (pKnown != NULL) {
    char recorded[TSDB_EP_LEN + 1] = {0};
    snprintf(recorded, TSDB_EP_LEN, "%s:%u", pKnown->ep.fqdn, pKnown->ep.port);

    mismatch = (strcmp(ep, recorded) != 0);
    if (mismatch) {
      dError("dnode:%d, localEp %s different from %s", dnodeId, ep, recorded);
    }
  }

  taosThreadRwlockUnlock(&pData->lock);
  return mismatch;
}
S
Shengliang Guan 已提交
310 311

/*
 * Copy the current mnode endpoint set (pData->mnodeEps) into *pEpSet. The
 * snapshot is taken while holding pData->lock for reading.
 */
void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
  SEpSet snapshot;

  taosThreadRwlockRdlock(&pData->lock);
  snapshot = pData->mnodeEps;
  taosThreadRwlockUnlock(&pData->lock);

  *pEpSet = snapshot;
}

317 318 319 320 321 322 323 324 325 326 327
/*
 * Fill *pEpSet with the current mnode set for redirecting `pMsg`, and
 * debug-log it. If an entry matches this dnode's own endpoint
 * (tsLocalFqdn:tsServerPort), inUse is set to the entry after it, wrapping
 * around to 0.
 */
void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet) {
  dmGetMnodeEpSet(pData, pEpSet);
  dDebug("msg:%p, is redirected, num:%d use:%d", pMsg, pEpSet->numOfEps, pEpSet->inUse);

  int32_t total = pEpSet->numOfEps;
  for (int32_t idx = 0; idx < total; ++idx) {
    const SEp *pCur = &pEpSet->eps[idx];
    dDebug("mnode index:%d %s:%u", idx, pCur->fqdn, pCur->port);

    bool isSelf = (pCur->port == tsServerPort) && (strcmp(pCur->fqdn, tsLocalFqdn) == 0);
    if (isSelf) {
      pEpSet->inUse = (idx + 1) % total;
    }
  }
}

S
Shengliang Guan 已提交
328
/*
 * Replace the cached mnode endpoint set with *pEpSet while holding
 * pData->lock for writing, then log the new set at info level.
 */
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
  taosThreadRwlockWrlock(&pData->lock);
  pData->mnodeEps = *pEpSet;
  taosThreadRwlockUnlock(&pData->lock);

  dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);

  int32_t idx = 0;
  while (idx < pEpSet->numOfEps) {
    const SEp *pCur = &pEpSet->eps[idx];
    dInfo("mnode index:%d %s:%u", idx, pCur->fqdn, pCur->port);
    ++idx;
  }
}