dnodeConfig.c 12.7 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dnodeConfig.h"
S
Shengliang Guan 已提交
18 19 20 21
#include "cJSON.h"
#include "thash.h"

static struct {
S
Shengliang Guan 已提交
22 23 24 25 26 27 28 29
  int32_t         dnodeId;
  int32_t         dropped;
  int64_t         clusterId;
  SDnodeEps      *dnodeEps;
  SHashObj       *dnodeHash;
  SRpcEpSet       mnodeEpSetForShell;
  SRpcEpSet       mnodeEpSetForPeer;
  char            file[PATH_MAX + 20];
S
Shengliang Guan 已提交
30
  pthread_mutex_t mutex;
S
Shengliang Guan 已提交
31
} tsConfig;
S
Shengliang Guan 已提交
32

S
Shengliang Guan 已提交
33
/*
 * Copies the mnode endpoint set used for dnode-to-dnode (peer) traffic into
 * `*epSet`. The snapshot is taken under tsConfig.mutex.
 */
void dnodeGetEpSetForPeer(SRpcEpSet *epSet) {
  SRpcEpSet snapshot;

  pthread_mutex_lock(&tsConfig.mutex);
  snapshot = tsConfig.mnodeEpSetForPeer;
  pthread_mutex_unlock(&tsConfig.mutex);

  *epSet = snapshot;
}

/*
 * Copies the mnode endpoint set used for shell (client) traffic into `*epSet`.
 * The snapshot is taken under tsConfig.mutex.
 */
static void dnodeGetEpSetForShell(SRpcEpSet *epSet) {
  SRpcEpSet snapshot;

  pthread_mutex_lock(&tsConfig.mutex);
  snapshot = tsConfig.mnodeEpSetForShell;
  pthread_mutex_unlock(&tsConfig.mutex);

  *epSet = snapshot;
}

/*
 * Replaces the cached mnode endpoint sets with `ep`.
 *
 * `ep` carries peer ports (it is stored as-is in mnodeEpSetForPeer); the shell
 * set is derived from it by subtracting TSDB_PORT_DNODEDNODE from each port.
 * The derivation works on a local copy, so the caller's `ep` is not modified.
 * A NULL or empty `ep` is logged and ignored.
 */
void dnodeUpdateMnodeEps(SRpcEpSet *ep) {
  if (ep == NULL || ep->numOfEps <= 0) {
    dError("mnode is changed, but content is invalid, discard it");
    return;
  }

  pthread_mutex_lock(&tsConfig.mutex);

  dInfo("mnode is changed, num:%d use:%d", ep->numOfEps, ep->inUse);

  tsConfig.mnodeEpSetForPeer = *ep;

  SRpcEpSet shellEpSet = *ep;
  for (int32_t i = 0; i < shellEpSet.numOfEps; ++i) {
    shellEpSet.port[i] -= TSDB_PORT_DNODEDNODE;
    dInfo("mnode index:%d %s:%u", i, shellEpSet.fqdn[i], shellEpSet.port[i]);
  }
  tsConfig.mnodeEpSetForShell = shellEpSet;

  pthread_mutex_unlock(&tsConfig.mutex);
}

/*
 * Replies to `rpcMsg` with a redirect carrying the mnode endpoint set: the
 * shell set when `forShell` is true, the peer set otherwise.
 *
 * If this dnode itself (tsLocalFqdn plus the port it would use in the chosen
 * set) appears in the set, inUse is moved to the following entry. All ports are
 * converted with htons() before rpcSendRedirectRsp is called.
 */
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
  SRpcConnInfo connInfo = {0};
  rpcGetConnInfo(rpcMsg->handle, &connInfo);

  SRpcEpSet redirect = {0};
  if (forShell) {
    dnodeGetEpSetForShell(&redirect);
  } else {
    dnodeGetEpSetForPeer(&redirect);
  }

  dDebug("msg:%s will be redirected, num:%d use:%d", taosMsg[rpcMsg->msgType], redirect.numOfEps, redirect.inUse);

  // Port under which this dnode would be listed in the selected set.
  int32_t selfPort = forShell ? tsServerPort : tsServerPort + TSDB_PORT_DNODEDNODE;

  for (int32_t idx = 0; idx < redirect.numOfEps; ++idx) {
    dDebug("mnode index:%d %s:%d", idx, redirect.fqdn[idx], redirect.port[idx]);

    bool isSelf = (strcmp(redirect.fqdn[idx], tsLocalFqdn) == 0) && (redirect.port[idx] == selfPort);
    if (isSelf) {
      redirect.inUse = (idx + 1) % redirect.numOfEps;
      dDebug("mnode index:%d %s:%d set inUse to %d", idx, redirect.fqdn[idx], redirect.port[idx], redirect.inUse);
    }

    redirect.port[idx] = htons(redirect.port[idx]);
  }

  rpcSendRedirectRsp(rpcMsg->handle, &redirect);
}

static void dnodePrintEps() {
S
Shengliang Guan 已提交
95 96 97
  dDebug("print dnode list, num:%d", tsConfig.dnodeEps->dnodeNum);
  for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; i++) {
    SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
S
Shengliang Guan 已提交
98 99 100 101 102 103 104 105 106
    dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", ep->dnodeId, ep->dnodeFqdn, ep->dnodePort, ep->isMnode);
  }
}

/*
 * Installs `data` as the cached dnode list and rebuilds everything derived from
 * it: the shell and peer mnode endpoint sets and dnodeHash.
 *
 * `data` may be tsConfig.dnodeEps itself; otherwise its contents are copied and
 * the caller keeps ownership of `data`. A NULL `data` is ignored. A NULL
 * tsConfig.dnodeEps (no list loaded yet) is handled by allocating one.
 *
 * Peer ports are shell ports plus TSDB_PORT_DNODEDNODE. This matches
 * dnodeUpdateMnodeEps (which subtracts it) and dnodeSendRedirectMsg.
 */
static void dnodeResetEps(SDnodeEps *data) {
  if (data == NULL) {
    dDebug("dnode list is null, no need to reset eps");
    return;
  }

  int32_t size = sizeof(SDnodeEps) + data->dnodeNum * sizeof(SDnodeEp);

  // Reallocate when no list exists yet or the new one has more entries;
  // otherwise the current buffer is large enough and is reused.
  if (tsConfig.dnodeEps == NULL || data->dnodeNum > tsConfig.dnodeEps->dnodeNum) {
    SDnodeEps *tmp = calloc(1, size);
    if (tmp == NULL) return;

    tfree(tsConfig.dnodeEps);
    tsConfig.dnodeEps = tmp;
  }

  if (tsConfig.dnodeEps != data) {
    memcpy(tsConfig.dnodeEps, data, size);
  }

  SRpcEpSet *shellEpSet = &tsConfig.mnodeEpSetForShell;
  SRpcEpSet *peerEpSet = &tsConfig.mnodeEpSetForPeer;
  shellEpSet->inUse = 0;
  peerEpSet->inUse = 0;

  int32_t numOfMnodes = 0;
  for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum && numOfMnodes < TSDB_MAX_REPLICA; i++) {
    SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
    if (!ep->isMnode) continue;

    strcpy(shellEpSet->fqdn[numOfMnodes], ep->dnodeFqdn);
    strcpy(peerEpSet->fqdn[numOfMnodes], ep->dnodeFqdn);
    shellEpSet->port[numOfMnodes] = ep->dnodePort;
    peerEpSet->port[numOfMnodes] = ep->dnodePort + TSDB_PORT_DNODEDNODE;
    numOfMnodes++;
  }

  // Publish the new mnode count; when the list names no mnode at all the
  // previous counts are left as they were.
  if (numOfMnodes > 0) {
    shellEpSet->numOfEps = numOfMnodes;
    peerEpSet->numOfEps = numOfMnodes;
  }

  for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; ++i) {
    SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
    taosHashPut(tsConfig.dnodeHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp));
  }

  dnodePrintEps();
}

static bool dnodeIsDnodeEpChanged(int32_t dnodeId, char *epstr) {
  bool changed = false;

S
Shengliang Guan 已提交
144
  pthread_mutex_lock(&tsConfig.mutex);
S
Shengliang Guan 已提交
145

S
Shengliang Guan 已提交
146
  SDnodeEp *ep = taosHashGet(tsConfig.dnodeHash, &dnodeId, sizeof(int32_t));
S
Shengliang Guan 已提交
147 148 149 150 151 152 153
  if (ep != NULL) {
    char epSaved[TSDB_EP_LEN + 1];
    snprintf(epSaved, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort);
    changed = strcmp(epstr, epSaved) != 0;
    tstrncpy(epstr, epSaved, TSDB_EP_LEN);
  }

S
Shengliang Guan 已提交
154
  pthread_mutex_unlock(&tsConfig.mutex);
S
Shengliang Guan 已提交
155 156 157 158 159 160 161 162 163 164 165

  return changed;
}

static int32_t dnodeReadEps() {
  int32_t len = 0;
  int32_t maxLen = 30000;
  char   *content = calloc(1, maxLen + 1);
  cJSON  *root = NULL;
  FILE   *fp = NULL;

S
Shengliang Guan 已提交
166
  fp = fopen(tsConfig.file, "r");
S
Shengliang Guan 已提交
167
  if (!fp) {
S
Shengliang Guan 已提交
168
    dDebug("file %s not exist", tsConfig.file);
S
Shengliang Guan 已提交
169 170 171 172 173
    goto PRASE_EPS_OVER;
  }

  len = (int32_t)fread(content, 1, maxLen, fp);
  if (len <= 0) {
S
Shengliang Guan 已提交
174
    dError("failed to read %s since content is null", tsConfig.file);
S
Shengliang Guan 已提交
175 176 177 178 179 180
    goto PRASE_EPS_OVER;
  }

  content[len] = 0;
  root = cJSON_Parse(content);
  if (root == NULL) {
S
Shengliang Guan 已提交
181
    dError("failed to read %s since invalid json format", tsConfig.file);
S
Shengliang Guan 已提交
182 183 184 185 186
    goto PRASE_EPS_OVER;
  }

  cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
  if (!dnodeId || dnodeId->type != cJSON_String) {
S
Shengliang Guan 已提交
187
    dError("failed to read %s since dnodeId not found", tsConfig.file);
S
Shengliang Guan 已提交
188 189
    goto PRASE_EPS_OVER;
  }
S
Shengliang Guan 已提交
190
  tsConfig.dnodeId = atoi(dnodeId->valuestring);
S
Shengliang Guan 已提交
191 192 193

  cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
  if (!dropped || dropped->type != cJSON_String) {
S
Shengliang Guan 已提交
194
    dError("failed to read %s since dropped not found", tsConfig.file);
S
Shengliang Guan 已提交
195 196
    goto PRASE_EPS_OVER;
  }
S
Shengliang Guan 已提交
197
  tsConfig.dropped = atoi(dropped->valuestring);
S
Shengliang Guan 已提交
198 199 200

  cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
  if (!clusterId || clusterId->type != cJSON_String) {
S
Shengliang Guan 已提交
201
    dError("failed to read %s since clusterId not found", tsConfig.file);
S
Shengliang Guan 已提交
202 203
    goto PRASE_EPS_OVER;
  }
S
Shengliang Guan 已提交
204
  tsConfig.clusterId = atoll(clusterId->valuestring);
S
Shengliang Guan 已提交
205 206 207

  cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos");
  if (!dnodeInfos || dnodeInfos->type != cJSON_Array) {
S
Shengliang Guan 已提交
208
    dError("failed to read %s since dnodeInfos not found", tsConfig.file);
S
Shengliang Guan 已提交
209 210 211 212 213
    goto PRASE_EPS_OVER;
  }

  int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos);
  if (dnodeInfosSize <= 0) {
S
Shengliang Guan 已提交
214
    dError("failed to read %s since dnodeInfos size:%d invalid", tsConfig.file, dnodeInfosSize);
S
Shengliang Guan 已提交
215 216 217
    goto PRASE_EPS_OVER;
  }

S
Shengliang Guan 已提交
218 219
  tsConfig.dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps));
  if (tsConfig.dnodeEps == NULL) {
S
Shengliang Guan 已提交
220 221 222
    dError("failed to calloc dnodeEpList since %s", strerror(errno));
    goto PRASE_EPS_OVER;
  }
S
Shengliang Guan 已提交
223
  tsConfig.dnodeEps->dnodeNum = dnodeInfosSize;
S
Shengliang Guan 已提交
224 225 226 227 228

  for (int32_t i = 0; i < dnodeInfosSize; ++i) {
    cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i);
    if (dnodeInfo == NULL) break;

S
Shengliang Guan 已提交
229
    SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
S
Shengliang Guan 已提交
230 231 232

    cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId");
    if (!dnodeId || dnodeId->type != cJSON_String) {
S
Shengliang Guan 已提交
233
      dError("failed to read %s, dnodeId not found", tsConfig.file);
S
Shengliang Guan 已提交
234 235 236 237 238 239
      goto PRASE_EPS_OVER;
    }
    ep->dnodeId = atoi(dnodeId->valuestring);

    cJSON *isMnode = cJSON_GetObjectItem(dnodeInfo, "isMnode");
    if (!isMnode || isMnode->type != cJSON_String) {
S
Shengliang Guan 已提交
240
      dError("failed to read %s, isMnode not found", tsConfig.file);
S
Shengliang Guan 已提交
241 242 243 244 245 246
      goto PRASE_EPS_OVER;
    }
    ep->isMnode = atoi(isMnode->valuestring);

    cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn");
    if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
S
Shengliang Guan 已提交
247
      dError("failed to read %s, dnodeFqdn not found", tsConfig.file);
S
Shengliang Guan 已提交
248 249 250 251 252 253
      goto PRASE_EPS_OVER;
    }
    tstrncpy(ep->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);

    cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort");
    if (!dnodePort || dnodePort->type != cJSON_String) {
S
Shengliang Guan 已提交
254
      dError("failed to read %s, dnodePort not found", tsConfig.file);
S
Shengliang Guan 已提交
255 256 257 258 259
      goto PRASE_EPS_OVER;
    }
    ep->dnodePort = atoi(dnodePort->valuestring);
  }

S
Shengliang Guan 已提交
260
  dInfo("succcessed to read file %s", tsConfig.file);
S
Shengliang Guan 已提交
261 262 263 264 265 266 267
  dnodePrintEps();

PRASE_EPS_OVER:
  if (content != NULL) free(content);
  if (root != NULL) cJSON_Delete(root);
  if (fp != NULL) fclose(fp);

S
Shengliang Guan 已提交
268 269
  if (dnodeIsDnodeEpChanged(tsConfig.dnodeId, tsLocalEp)) {
    dError("dnode:%d, localEp %s different with dnodeEps.json and need reconfigured", tsConfig.dnodeId, tsLocalEp);
S
Shengliang Guan 已提交
270 271 272
    return -1;
  }

S
Shengliang Guan 已提交
273
  dnodeResetEps(tsConfig.dnodeEps);
S
Shengliang Guan 已提交
274 275 276 277 278 279

  terrno = 0;
  return 0;
}

static int32_t dnodeWriteEps() {
S
Shengliang Guan 已提交
280
  FILE *fp = fopen(tsConfig.file, "w");
S
Shengliang Guan 已提交
281
  if (!fp) {
S
Shengliang Guan 已提交
282
    dError("failed to write %s since %s", tsConfig.file, strerror(errno));
S
Shengliang Guan 已提交
283 284 285 286 287 288 289 290
    return -1;
  }

  int32_t len = 0;
  int32_t maxLen = 30000;
  char   *content = calloc(1, maxLen + 1);

  len += snprintf(content + len, maxLen - len, "{\n");
S
Shengliang Guan 已提交
291 292 293
  len += snprintf(content + len, maxLen - len, "  \"dnodeId\": \"%d\",\n", tsConfig.dnodeId);
  len += snprintf(content + len, maxLen - len, "  \"dropped\": \"%d\",\n", tsConfig.dropped);
  len += snprintf(content + len, maxLen - len, "  \"clusterId\": \"%" PRId64 "\",\n", tsConfig.clusterId);
S
Shengliang Guan 已提交
294
  len += snprintf(content + len, maxLen - len, "  \"dnodeInfos\": [{\n");
S
Shengliang Guan 已提交
295 296
  for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; ++i) {
    SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
S
Shengliang Guan 已提交
297 298 299 300
    len += snprintf(content + len, maxLen - len, "    \"dnodeId\": \"%d\",\n", ep->dnodeId);
    len += snprintf(content + len, maxLen - len, "    \"isMnode\": \"%d\",\n", ep->isMnode);
    len += snprintf(content + len, maxLen - len, "    \"dnodeFqdn\": \"%s\",\n", ep->dnodeFqdn);
    len += snprintf(content + len, maxLen - len, "    \"dnodePort\": \"%u\"\n", ep->dnodePort);
S
Shengliang Guan 已提交
301
    if (i < tsConfig.dnodeEps->dnodeNum - 1) {
S
Shengliang Guan 已提交
302 303 304 305 306 307 308 309 310 311 312 313 314
      len += snprintf(content + len, maxLen - len, "  },{\n");
    } else {
      len += snprintf(content + len, maxLen - len, "  }]\n");
    }
  }
  len += snprintf(content + len, maxLen - len, "}\n");

  fwrite(content, 1, len, fp);
  taosFsyncFile(fileno(fp));
  fclose(fp);
  free(content);
  terrno = 0;

S
Shengliang Guan 已提交
315
  dInfo("successed to write %s", tsConfig.file);
S
Shengliang Guan 已提交
316 317 318
  return 0;
}

S
Shengliang Guan 已提交
319 320 321 322 323 324 325
int32_t dnodeInitConfig() {
  tsConfig.dnodeId = 0;
  tsConfig.dropped = 0;
  tsConfig.clusterId = 0;
  tsConfig.dnodeEps = NULL;
  snprintf(tsConfig.file, sizeof(tsConfig.file), "%s/dnodeEps.json", tsDnodeDir);
  pthread_mutex_init(&tsConfig.mutex, NULL);
S
Shengliang Guan 已提交
326

S
Shengliang Guan 已提交
327 328
  tsConfig.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (tsConfig.dnodeHash == NULL) return -1;
S
Shengliang Guan 已提交
329 330 331 332 333 334 335 336 337

  int32_t ret = dnodeReadEps();
  if (ret == 0) {
    dInfo("dnode eps is initialized");
  }

  return ret;
}

S
Shengliang Guan 已提交
338 339
/*
 * Releases the dnode list and the dnodeId hash (under the mutex), then
 * destroys the mutex itself.
 */
void dnodeCleanupConfig() {
  pthread_mutex_lock(&tsConfig.mutex);

  free(tsConfig.dnodeEps);  // free(NULL) is a no-op
  tsConfig.dnodeEps = NULL;

  if (tsConfig.dnodeHash != NULL) {
    taosHashCleanup(tsConfig.dnodeHash);
    tsConfig.dnodeHash = NULL;
  }

  pthread_mutex_unlock(&tsConfig.mutex);
  pthread_mutex_destroy(&tsConfig.mutex);
}

/*
 * Applies a dnode list received from elsewhere. When it differs from the cached
 * list (or no list is cached yet), the cache is rebuilt and dnodeEps.json is
 * rewritten. NULL or empty input is ignored.
 *
 * The comparison is a byte-wise memcmp of the whole SDnodeEps payload, so any
 * byte difference (including padding) counts as a change.
 */
void dnodeUpdateDnodeEps(SDnodeEps *data) {
  if (data == NULL || data->dnodeNum <= 0) return;

  pthread_mutex_lock(&tsConfig.mutex);

  int32_t size = data->dnodeNum * sizeof(SDnodeEp) + sizeof(SDnodeEps);

  // Short-circuit order matters: memcmp only runs when both lists exist and
  // have the same length, so it never reads past the cached buffer.
  bool changed = tsConfig.dnodeEps == NULL || data->dnodeNum != tsConfig.dnodeEps->dnodeNum ||
                 memcmp(tsConfig.dnodeEps, data, size) != 0;

  if (changed) {
    dnodeResetEps(data);
    dnodeWriteEps();
  }

  pthread_mutex_unlock(&tsConfig.mutex);
}

void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) {
S
Shengliang Guan 已提交
375
  pthread_mutex_lock(&tsConfig.mutex);
S
Shengliang Guan 已提交
376

S
Shengliang Guan 已提交
377
  SDnodeEp *ep = taosHashGet(tsConfig.dnodeHash, &dnodeId, sizeof(int32_t));
S
Shengliang Guan 已提交
378 379 380 381 382 383
  if (ep != NULL) {
    if (port) *port = ep->dnodePort;
    if (fqdn) tstrncpy(fqdn, ep->dnodeFqdn, TSDB_FQDN_LEN);
    if (epstr) snprintf(epstr, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort);
  }

S
Shengliang Guan 已提交
384
  pthread_mutex_unlock(&tsConfig.mutex);
S
Shengliang Guan 已提交
385 386 387
}

/*
 * Records the dnodeId, clusterId and dropped flag from `data` and persists them
 * via dnodeWriteEps (called while holding tsConfig.mutex).
 *
 * Once a non-zero dnodeId is known, the update is applied only when
 * `data->dropped` is set. Note that this check reads tsConfig.dnodeId before
 * the mutex is taken.
 */
void dnodeUpdateCfg(SDnodeCfg *data) {
  bool alreadyAssigned = (tsConfig.dnodeId != 0);
  if (alreadyAssigned && !data->dropped) {
    return;
  }

  pthread_mutex_lock(&tsConfig.mutex);

  tsConfig.dnodeId = data->dnodeId;
  tsConfig.clusterId = data->clusterId;
  tsConfig.dropped = data->dropped;
  dInfo("dnodeId is set to %d, clusterId is set to %" PRId64, data->dnodeId, data->clusterId);

  dnodeWriteEps();

  pthread_mutex_unlock(&tsConfig.mutex);
}

/* Returns this dnode's id as read under tsConfig.mutex (0 until one is known). */
int32_t dnodeGetDnodeId() {
  pthread_mutex_lock(&tsConfig.mutex);
  int32_t id = tsConfig.dnodeId;
  pthread_mutex_unlock(&tsConfig.mutex);

  return id;
}

/* Returns the cluster id as read under tsConfig.mutex (0 until one is known). */
int64_t dnodeGetClusterId() {
  pthread_mutex_lock(&tsConfig.mutex);
  int64_t id = tsConfig.clusterId;
  pthread_mutex_unlock(&tsConfig.mutex);

  return id;
}