dmEps.c 10.9 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmUtil.h"
H
Haojun Liao 已提交
18
#include "tmisce.h"
S
shm  
Shengliang Guan 已提交
19

S
Shengliang Guan 已提交
20 21 22
// Forward declarations of file-local helpers that are defined further down
// in this file but called before their definitions.
static void dmPrintEps(SDnodeData *pData);
static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep);
static void dmResetEps(SDnodeData *pData, SArray *dnodeEps);
S
shm  
Shengliang Guan 已提交
23

S
Shengliang Guan 已提交
24
/*
 * Look up a dnode by id in pData->dnodeHash and copy its endpoint details
 * into the caller-supplied outputs.
 *
 * pEp    - optional; receives "fqdn:port", written with a TSDB_EP_LEN bound
 * pFqdn  - optional; receives the fqdn, copied with a TSDB_FQDN_LEN bound
 * pPort  - optional; receives the port
 *
 * Any output may be NULL and is then skipped. When the id is not present in
 * the hash, none of the outputs are touched. The lookup and the copies are
 * performed while holding the read lock on pData->lock.
 */
static void dmGetDnodeEp(SDnodeData *pData, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
  taosThreadRwlockRdlock(&pData->lock);

  SDnodeEp *pFound = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
  if (pFound == NULL) {
    // unknown dnode: leave the caller's buffers as they were
    taosThreadRwlockUnlock(&pData->lock);
    return;
  }

  if (pEp != NULL) {
    snprintf(pEp, TSDB_EP_LEN, "%s:%u", pFound->ep.fqdn, pFound->ep.port);
  }
  if (pFqdn != NULL) {
    tstrncpy(pFqdn, pFound->ep.fqdn, TSDB_FQDN_LEN);
  }
  if (pPort != NULL) {
    *pPort = pFound->ep.port;
  }

  taosThreadRwlockUnlock(&pData->lock);
}

S
Shengliang Guan 已提交
43
/*
 * Load the persisted dnode list from <tsDataDir>/dnode/dnode.json into pData.
 *
 * On a well-formed file this fills pData->dnodeId, dnodeVer, clusterId,
 * dropped and pData->dnodeEps. If the file cannot be opened the function
 * treats that as "nothing persisted yet" (code 0). Whenever the resulting
 * list is empty, a single entry built from tsFirst and flagged as mnode is
 * used instead. The list is then applied through dmResetEps().
 *
 * Returns:
 *   0                              on success (terrno is set to 0)
 *   TSDB_CODE_INVALID_JSON_FORMAT  when the file is unreadable or malformed,
 *                                  or when a required allocation fails; this
 *                                  is the function's only failure code
 *                                  (terrno is set to the same value)
 *   -1                             when the endpoint stored for pData->dnodeId
 *                                  differs from tsLocalEp (terrno not updated)
 */
int32_t dmReadEps(SDnodeData *pData) {
  int32_t   code = TSDB_CODE_INVALID_JSON_FORMAT;
  int32_t   len = 0;
  int32_t   maxLen = 256 * 1024;
  char     *content = NULL;
  cJSON    *root = NULL;
  char      file[PATH_MAX] = {0};
  TdFilePtr pFile = NULL;

  pData->dnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
  if (pData->dnodeEps == NULL) {
    dError("failed to calloc dnodeEp array since %s", strerror(errno));
    // Without the array there is nowhere to store either the parsed list or
    // the tsFirst fallback, so bail out instead of pushing into a NULL array.
    terrno = code;
    return code;
  }

  snprintf(file, sizeof(file), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);
  pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    // no persisted list: not an error, the fallback below takes over
    code = 0;
    goto _OVER;
  }

  // The read buffer is only needed once the file is known to be open.
  content = taosMemoryCalloc(1, maxLen + 1);
  if (content == NULL) {
    dError("failed to read %s since %s", file, strerror(errno));
    goto _OVER;
  }

  len = (int32_t)taosReadFile(pFile, content, maxLen);
  if (len <= 0) {
    dError("failed to read %s since content is null", file);
    goto _OVER;
  }

  content[len] = 0;
  root = cJSON_Parse(content);
  if (root == NULL) {
    dError("failed to read %s since invalid json format", file);
    goto _OVER;
  }

  cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
  if (!dnodeId || dnodeId->type != cJSON_Number) {
    dError("failed to read %s since dnodeId not found", file);
    goto _OVER;
  }
  pData->dnodeId = dnodeId->valueint;

  // dnodeVer and clusterId are stored as strings holding 64-bit integers
  cJSON *dnodeVer = cJSON_GetObjectItem(root, "dnodeVer");
  if (!dnodeVer || dnodeVer->type != cJSON_String) {
    dError("failed to read %s since dnodeVer not found", file);
    goto _OVER;
  }
  pData->dnodeVer = atoll(dnodeVer->valuestring);

  cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
  if (!clusterId || clusterId->type != cJSON_String) {
    dError("failed to read %s since clusterId not found", file);
    goto _OVER;
  }
  pData->clusterId = atoll(clusterId->valuestring);

  cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
  if (!dropped || dropped->type != cJSON_Number) {
    dError("failed to read %s since dropped not found", file);
    goto _OVER;
  }
  pData->dropped = dropped->valueint;

  cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes");
  if (!dnodes || dnodes->type != cJSON_Array) {
    dError("failed to read %s since dnodes not found", file);
    goto _OVER;
  }

  int32_t numOfDnodes = cJSON_GetArraySize(dnodes);
  if (numOfDnodes <= 0) {
    dError("failed to read %s since numOfDnodes:%d invalid", file, numOfDnodes);
    goto _OVER;
  }

  for (int32_t i = 0; i < numOfDnodes; ++i) {
    cJSON *node = cJSON_GetArrayItem(dnodes, i);
    if (node == NULL) break;

    SDnodeEp dnodeEp = {0};

    cJSON *did = cJSON_GetObjectItem(node, "id");
    if (!did || did->type != cJSON_Number) {
      dError("failed to read %s since dnodeId not found", file);
      goto _OVER;
    }

    dnodeEp.id = did->valueint;

    cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn");
    if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
      dError("failed to read %s since dnodeFqdn not found", file);
      goto _OVER;
    }
    tstrncpy(dnodeEp.ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);

    cJSON *dnodePort = cJSON_GetObjectItem(node, "port");
    if (!dnodePort || dnodePort->type != cJSON_Number) {
      dError("failed to read %s since dnodePort not found", file);
      goto _OVER;
    }

    dnodeEp.ep.port = dnodePort->valueint;

    cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode");
    if (!isMnode || isMnode->type != cJSON_Number) {
      dError("failed to read %s since isMnode not found", file);
      goto _OVER;
    }
    dnodeEp.isMnode = isMnode->valueint;

    // NOTE(review): relies on taosArrayPush returning NULL when the element
    // could not be appended -- confirm against the array API header.
    if (taosArrayPush(pData->dnodeEps, &dnodeEp) == NULL) {
      dError("failed to read %s since %s", file, strerror(errno));
      goto _OVER;
    }
  }

  code = 0;
  dDebug("succcessed to read file %s", file);

_OVER:
  if (content != NULL) taosMemoryFree(content);
  if (root != NULL) cJSON_Delete(root);
  if (pFile != NULL) taosCloseFile(&pFile);

  // Nothing usable was loaded: seed the list with the configured first endpoint.
  if (taosArrayGetSize(pData->dnodeEps) == 0) {
    SDnodeEp dnodeEp = {0};
    dnodeEp.isMnode = 1;
    taosGetFqdnPortFromEp(tsFirst, &dnodeEp.ep);
    taosArrayPush(pData->dnodeEps, &dnodeEp);
  }

  dDebug("reset dnode list on startup");
  dmResetEps(pData, pData->dnodeEps);

  if (dmIsEpChanged(pData, pData->dnodeId, tsLocalEp)) {
    dError("localEp %s different with %s and need reconfigured", tsLocalEp, file);
    return -1;
  }

  terrno = code;
  return code;
}

S
Shengliang Guan 已提交
184
/*
 * Render pData (dnodeId, dnodeVer, clusterId, dropped and every entry of
 * pData->dnodeEps) as JSON text into content, which holds at least maxLen
 * bytes. Returns the number of bytes produced, or -1 if the text would not
 * fit in maxLen bytes or snprintf reported an error.
 *
 * Every snprintf result is checked before it is added to the running length,
 * so the remaining-size argument can never go negative.
 */
static int32_t dmFormatEps(SDnodeData *pData, char *content, int32_t maxLen) {
  int32_t len = 0;
  int32_t n = snprintf(content, maxLen,
                       "{\n"
                       "  \"dnodeId\": %d,\n"
                       "  \"dnodeVer\": \"%" PRId64 "\",\n"
                       "  \"clusterId\": \"%" PRId64 "\",\n"
                       "  \"dropped\": %d,\n"
                       "  \"dnodes\": [{\n",
                       pData->dnodeId, pData->dnodeVer, pData->clusterId, pData->dropped);
  if (n < 0 || n >= maxLen) return -1;
  len += n;

  int32_t numOfEps = (int32_t)taosArrayGetSize(pData->dnodeEps);
  for (int32_t i = 0; i < numOfEps; ++i) {
    SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, i);
    // the last element closes the array, the others open the next object
    const char *tail = (i < numOfEps - 1) ? "  },{\n" : "  }]\n";

    n = snprintf(content + len, maxLen - len,
                 "    \"id\": %d,\n"
                 "    \"fqdn\": \"%s\",\n"
                 "    \"port\": %u,\n"
                 "    \"isMnode\": %d\n"
                 "%s",
                 pDnodeEp->id, pDnodeEp->ep.fqdn, pDnodeEp->ep.port, pDnodeEp->isMnode, tail);
    if (n < 0 || n >= maxLen - len) return -1;
    len += n;
  }

  n = snprintf(content + len, maxLen - len, "}\n");
  if (n < 0 || n >= maxLen - len) return -1;
  len += n;

  return len;
}

/*
 * Persist the dnode list to <tsDataDir>/dnode/dnode.json.
 *
 * The JSON text is first written to dnode.json.bak and flushed, and only then
 * renamed over dnode.json. If formatting, writing or flushing fails, the
 * rename is skipped so the existing dnode.json is left as it was.
 *
 * Returns 0 on success, in which case pData->updateTime is refreshed.
 * Returns -1 on failure with terrno set: a system error derived from errno
 * for allocation and file failures, or from ERANGE when the list does not
 * fit the 256 KiB buffer.
 */
int32_t dmWriteEps(SDnodeData *pData) {
  int32_t   code = -1;
  int32_t   len = 0;
  int32_t   maxLen = 256 * 1024;
  char     *content = NULL;
  TdFilePtr pFile = NULL;
  char      file[PATH_MAX] = {0};
  char      realfile[PATH_MAX] = {0};

  snprintf(file, sizeof(file), "%s%sdnode%sdnode.json.bak", tsDataDir, TD_DIRSEP, TD_DIRSEP);
  snprintf(realfile, sizeof(realfile), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);

  // Build the full text before touching the file system.
  content = taosMemoryCalloc(1, maxLen + 1);
  if (content == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to write %s since %s", file, terrstr());
    goto _OVER;
  }

  len = dmFormatEps(pData, content, maxLen);
  if (len < 0) {
    terrno = TAOS_SYSTEM_ERROR(ERANGE);
    dError("failed to write %s since dnode list exceeds %d bytes", file, maxLen);
    goto _OVER;
  }

  pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to write %s since %s", file, terrstr());
    goto _OVER;
  }

  // NOTE(review): assumes taosWriteFile returns the byte count on success and
  // taosFsyncFile returns 0 on success -- confirm against the file API header.
  if (taosWriteFile(pFile, content, len) != len) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to write %s since %s", file, terrstr());
    goto _OVER;
  }

  if (taosFsyncFile(pFile) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to fsync %s since %s", file, terrstr());
    goto _OVER;
  }

  // close the temporary file before renaming it over the real one
  taosCloseFile(&pFile);
  pFile = NULL;

  if (taosRenameFile(file, realfile) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to rename %s since %s", file, terrstr());
    goto _OVER;
  }

  pData->updateTime = taosGetTimestampMs();
  dDebug("successed to write %s, dnodeVer:%" PRId64, realfile, pData->dnodeVer);
  code = 0;

_OVER:
  if (pFile != NULL) taosCloseFile(&pFile);
  if (content != NULL) taosMemoryFree(content);
  return code;
}

S
Shengliang Guan 已提交
240
/*
 * Install a dnode list received from the mnode and persist it.
 *
 * While holding the write lock on pData->lock, the list is applied with
 * dmResetEps() and then written out with dmWriteEps(). The result of
 * dmWriteEps() is not propagated; this function returns nothing.
 */
void dmUpdateEps(SDnodeData *pData, SArray *eps) {
  taosThreadRwlockWrlock(&pData->lock);
  {
    // logged before the reset, so this is the dnodeVer held at entry
    dDebug("new dnode list get from mnode, dnodeVer:%" PRId64, pData->dnodeVer);

    dmResetEps(pData, eps);
    dmWriteEps(pData);
  }
  taosThreadRwlockUnlock(&pData->lock);
}

S
Shengliang Guan 已提交
248 249 250 251
/*
 * Make dnodeEps the current dnode list of pData and rebuild the derived state.
 *
 * - If dnodeEps is not already pData->dnodeEps, pData->dnodeEps is replaced by
 *   a duplicate of it and the previous array is destroyed.
 * - pData->mnodeEps is rebuilt from the entries flagged isMnode, in list
 *   order, keeping at most TSDB_MAX_REPLICA of them; inUse is reset to 0.
 * - Every entry is put into pData->dnodeHash keyed by its id. Entries already
 *   in the hash are not removed here.
 * - The resulting list is dumped through dmPrintEps().
 *
 * This function takes no lock itself.
 */
static void dmResetEps(SDnodeData *pData, SArray *dnodeEps) {
  if (dnodeEps != pData->dnodeEps) {
    SArray *pPrevious = pData->dnodeEps;
    pData->dnodeEps = taosArrayDup(dnodeEps);
    taosArrayDestroy(pPrevious);
  }

  pData->mnodeEps.inUse = 0;
  pData->mnodeEps.numOfEps = 0;

  int32_t total = (int32_t)taosArrayGetSize(dnodeEps);
  int32_t slot = 0;  // next free position in mnodeEps.eps

  for (int32_t idx = 0; idx < total; ++idx) {
    SDnodeEp *pItem = taosArrayGet(dnodeEps, idx);
    if (pItem->isMnode && slot < TSDB_MAX_REPLICA) {
      pData->mnodeEps.numOfEps++;
      pData->mnodeEps.eps[slot] = pItem->ep;
      slot++;
    }
  }

  for (int32_t idx = 0; idx < total; ++idx) {
    SDnodeEp *pItem = taosArrayGet(dnodeEps, idx);
    taosHashPut(pData->dnodeHash, &pItem->id, sizeof(int32_t), pItem, sizeof(SDnodeEp));
  }

  dmPrintEps(pData);
}

S
Shengliang Guan 已提交
279 280
/*
 * Emit the current dnode list at debug level: one line with the entry count,
 * then one line per entry of pData->dnodeEps (id, fqdn, port, isMnode).
 */
static void dmPrintEps(SDnodeData *pData) {
  int32_t count = (int32_t)taosArrayGetSize(pData->dnodeEps);
  dDebug("print dnode list, num:%d", count);

  int32_t idx = 0;
  while (idx < count) {
    SDnodeEp *pItem = taosArrayGet(pData->dnodeEps, idx);
    dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pItem->id, pItem->ep.fqdn, pItem->ep.port, pItem->isMnode);
    idx++;
  }
}

S
Shengliang Guan 已提交
288
/*
 * Tell whether the endpoint recorded for dnodeId differs from ep.
 *
 * The recorded endpoint is looked up in pData->dnodeHash under the read lock,
 * rendered as "fqdn:port" and compared to ep with strcmp(). A mismatch is
 * logged as an error.
 *
 * Returns true only on a mismatch. Returns false when dnodeId is 0 (no lookup
 * is done and no lock is taken), when the id is not in the hash, or when the
 * strings are equal.
 */
static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) {
  if (dnodeId == 0) return false;

  bool differs = false;
  taosThreadRwlockRdlock(&pData->lock);

  SDnodeEp *pStored = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
  if (pStored != NULL) {
    char stored[TSDB_EP_LEN + 1] = {0};
    snprintf(stored, TSDB_EP_LEN, "%s:%u", pStored->ep.fqdn, pStored->ep.port);

    if (strcmp(ep, stored) != 0) {
      differs = true;
      dError("dnode:%d, localEp %s different from %s", dnodeId, ep, stored);
    }
  }

  taosThreadRwlockUnlock(&pData->lock);
  return differs;
}
S
Shengliang Guan 已提交
306 307

/*
 * Copy the current mnode endpoint set (pData->mnodeEps) into *pEpSet.
 * The copy is made while holding the read lock on pData->lock.
 */
void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
  taosThreadRwlockRdlock(&pData->lock);

  const SEpSet *pCurrent = &pData->mnodeEps;
  *pEpSet = *pCurrent;

  taosThreadRwlockUnlock(&pData->lock);
}

313 314
/*
 * Fetch the mnode endpoint set for a redirect reply.
 *
 * *pEpSet is filled via dmGetMnodeEpSet(). Each endpoint is traced, and if an
 * endpoint equals this node (tsLocalFqdn and tsServerPort), inUse is moved to
 * the entry after it, wrapping around at numOfEps. If several endpoints
 * match, the last match determines inUse. pMsg is only used for the trace
 * line (its info.handle).
 */
void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet) {
  dmGetMnodeEpSet(pData, pEpSet);
  dTrace("msg is redirected, handle:%p num:%d use:%d", pMsg->info.handle, pEpSet->numOfEps, pEpSet->inUse);

  for (int32_t idx = 0; idx < pEpSet->numOfEps; ++idx) {
    dTrace("mnode index:%d %s:%u", idx, pEpSet->eps[idx].fqdn, pEpSet->eps[idx].port);

    if (pEpSet->eps[idx].port != tsServerPort) continue;
    if (strcmp(pEpSet->eps[idx].fqdn, tsLocalFqdn) != 0) continue;

    // this entry is the local node: point the caller at the next one
    pEpSet->inUse = (idx + 1) % pEpSet->numOfEps;
  }
}

S
Shengliang Guan 已提交
324
/*
 * Replace the stored mnode endpoint set with *pEpSet if it differs.
 *
 * The comparison (bytewise, via memcmp over sizeof(SEpSet)) and the
 * assignment both happen under the write lock on pData->lock, so the stored
 * set is never read while another thread may be writing it. When the sets are
 * identical nothing is changed and nothing is logged. Otherwise the new set
 * is stored and logged at info level after the lock has been released.
 *
 * The caller must not already hold pData->lock.
 */
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
  taosThreadRwlockWrlock(&pData->lock);
  if (memcmp(pEpSet, &pData->mnodeEps, sizeof(SEpSet)) == 0) {
    taosThreadRwlockUnlock(&pData->lock);
    return;
  }
  pData->mnodeEps = *pEpSet;
  taosThreadRwlockUnlock(&pData->lock);

  // logging reads only the caller's copy, so it needs no lock
  dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
  for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
    dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
  }
}