dndMnode.c 18.2 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "dndMnode.h"
S
Shengliang Guan 已提交
18
#include "dndMgmt.h"
S
Shengliang Guan 已提交
19
#include "dndTransport.h"
S
Shengliang Guan 已提交
20
#include "dndWorker.h"
S
Shengliang Guan 已提交
21

S
Shengliang Guan 已提交
22
static void dndWriteMnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpcMsg);
S
Shengliang Guan 已提交
23
static void dndProcessMnodeQueue(SDnode *pDnode, SMnodeMsg *pMsg);
S
Shengliang Guan 已提交
24 25

/*
 * Take a counted reference on the mnode managed by this dnode.
 *
 * Under the read latch, the mnode is handed out only while it is marked
 * deployed and not dropped; in that case refCount is atomically incremented.
 * Otherwise terrno is set to TSDB_CODE_DND_MNODE_NOT_DEPLOYED.
 *
 * Returns pMgmt->pMnode on success (may itself be NULL), or NULL on failure.
 * A non-NULL result must be handed back through dndReleaseMnode().
 */
static SMnode *dndAcquireMnode(SDnode *pDnode) {
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
  SMnode     *pResult = NULL;
  int32_t     numRefs = 0;

  taosRLockLatch(&pMgmt->latch);
  if (!pMgmt->deployed || pMgmt->dropped) {
    terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
  } else {
    numRefs = atomic_add_fetch_32(&pMgmt->refCount, 1);
    pResult = pMgmt->pMnode;
  }
  taosRUnLockLatch(&pMgmt->latch);

  if (pResult == NULL) return NULL;

  dTrace("acquire mnode, refCount:%d", numRefs);
  return pResult;
}

/*
 * Drop a reference previously obtained from dndAcquireMnode().
 *
 * A NULL pMnode is ignored, so callers may release unconditionally.
 * Otherwise refCount is atomically decremented under the read latch and the
 * resulting count is traced.
 */
static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) {
  if (pMnode != NULL) {
    SMnodeMgmt *pMgmt = &pDnode->mmgmt;
    int32_t     remaining = 0;

    taosRLockLatch(&pMgmt->latch);
    remaining = atomic_sub_fetch_32(&pMgmt->refCount, 1);
    taosRUnLockLatch(&pMgmt->latch);

    dTrace("release mnode, refCount:%d", remaining);
  }
}

/*
 * Load "<dnode dir>/mnode.json" into pDnode->mmgmt.
 *
 * The file is expected to hold the numeric fields "deployed" and "dropped"
 * plus an "mnodes" array whose entries each carry "id", "fqdn" and "port".
 * Values are stored into pMgmt as they are parsed, so a failure part-way
 * through may leave pMgmt partially updated.
 *
 * Returns 0 when the file does not exist or was parsed successfully,
 * TSDB_CODE_OUT_OF_MEMORY when the read buffer cannot be allocated, and
 * TSDB_CODE_DND_MNODE_READ_FILE_ERROR for any read/format problem.
 * terrno is always set to the returned code.
 */
static int32_t dndReadMnodeFile(SDnode *pDnode) {
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
  int32_t     code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR;
  int32_t     len = 0;
  int32_t     maxLen = 4096;
  char       *content = NULL;
  cJSON      *root = NULL;
  FILE       *fp = NULL;

  char file[PATH_MAX + 20];
  snprintf(file, PATH_MAX + 20, "%s/mnode.json", pDnode->dir.dnode);

  fp = fopen(file, "r");
  if (fp == NULL) {
    // A missing file is not an error: nothing has been persisted yet.
    dDebug("file %s not exist", file);
    code = 0;
    goto PARSE_MNODE_OVER;
  }

  // Allocate only once the file is known to exist, and never read into NULL.
  content = calloc(1, maxLen + 1);
  if (content == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to read %s since out of memory", file);
    goto PARSE_MNODE_OVER;
  }

  len = (int32_t)fread(content, 1, maxLen, fp);
  if (len <= 0) {
    dError("failed to read %s since content is null", file);
    goto PARSE_MNODE_OVER;
  }

  content[len] = 0;
  root = cJSON_Parse(content);
  if (root == NULL) {
    dError("failed to read %s since invalid json format", file);
    goto PARSE_MNODE_OVER;
  }

  cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
  if (!deployed || deployed->type != cJSON_Number) {
    dError("failed to read %s since deployed not found", file);
    goto PARSE_MNODE_OVER;
  }
  pMgmt->deployed = deployed->valueint;

  cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
  if (!dropped || dropped->type != cJSON_Number) {
    dError("failed to read %s since dropped not found", file);
    goto PARSE_MNODE_OVER;
  }
  pMgmt->dropped = dropped->valueint;

  cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes");
  if (!mnodes || mnodes->type != cJSON_Array) {
    dError("failed to read %s since nodes not found", file);
    goto PARSE_MNODE_OVER;
  }

  pMgmt->replica = cJSON_GetArraySize(mnodes);
  if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) {
    dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica);
    goto PARSE_MNODE_OVER;
  }

  for (int32_t i = 0; i < pMgmt->replica; ++i) {
    cJSON *node = cJSON_GetArrayItem(mnodes, i);
    if (node == NULL) break;

    SReplica *pReplica = &pMgmt->replicas[i];

    cJSON *id = cJSON_GetObjectItem(node, "id");
    if (!id || id->type != cJSON_Number) {
      dError("failed to read %s since id not found", file);
      goto PARSE_MNODE_OVER;
    }
    pReplica->id = id->valueint;

    cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn");
    if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
      dError("failed to read %s since fqdn not found", file);
      goto PARSE_MNODE_OVER;
    }
    tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN);

    cJSON *port = cJSON_GetObjectItem(node, "port");
    if (!port || port->type != cJSON_Number) {
      dError("failed to read %s since port not found", file);
      goto PARSE_MNODE_OVER;
    }
    pReplica->port = port->valueint;
  }

  code = 0;
  dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped);

PARSE_MNODE_OVER:
  free(content);
  if (root != NULL) cJSON_Delete(root);
  if (fp != NULL) fclose(fp);

  terrno = code;
  return code;
}

/*
 * Persist the mnode management state (deployed, dropped and the replica
 * list) as JSON.
 *
 * The content is first written to "<dnode dir>/mnode.json.bak", synced, and
 * then renamed over "<dnode dir>/mnode.json". The rename is skipped when the
 * write was short, so a truncated file never replaces the existing one.
 *
 * Returns 0 on success. On failure returns -1 with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY (buffer allocation) or
 * TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR (open, write or rename).
 */
static int32_t dndWriteMnodeFile(SDnode *pDnode) {
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;

  char file[PATH_MAX + 20];
  snprintf(file, PATH_MAX + 20, "%s/mnode.json.bak", pDnode->dir.dnode);

  int32_t len = 0;
  int32_t maxLen = 4096;

  // Allocate before opening with "w" so an OOM does not leave an empty .bak behind.
  char *content = calloc(1, maxLen + 1);
  if (content == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to write %s since %s", file, terrstr());
    return -1;
  }

  FILE *fp = fopen(file, "w");
  if (fp == NULL) {
    free(content);
    terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR;
    dError("failed to write %s since %s", file, terrstr());
    return -1;
  }

  len += snprintf(content + len, maxLen - len, "{\n");
  len += snprintf(content + len, maxLen - len, "  \"deployed\": %d,\n", pMgmt->deployed);

  len += snprintf(content + len, maxLen - len, "  \"dropped\": %d,\n", pMgmt->dropped);
  len += snprintf(content + len, maxLen - len, "  \"mnodes\": [{\n");
  for (int32_t i = 0; i < pMgmt->replica; ++i) {
    SReplica *pReplica = &pMgmt->replicas[i];
    len += snprintf(content + len, maxLen - len, "    \"id\": %d,\n", pReplica->id);
    len += snprintf(content + len, maxLen - len, "    \"fqdn\": \"%s\",\n", pReplica->fqdn);
    len += snprintf(content + len, maxLen - len, "    \"port\": %u\n", pReplica->port);
    if (i < pMgmt->replica - 1) {
      len += snprintf(content + len, maxLen - len, "  },{\n");
    } else {
      len += snprintf(content + len, maxLen - len, "  }]\n");
    }
  }
  len += snprintf(content + len, maxLen - len, "}\n");

  size_t written = fwrite(content, 1, (size_t)len, fp);
  taosFsyncFile(fileno(fp));
  fclose(fp);
  free(content);

  if (written != (size_t)len) {
    terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR;
    dError("failed to write %s since %s", file, terrstr());
    return -1;
  }

  char realfile[PATH_MAX + 20];
  snprintf(realfile, PATH_MAX + 20, "%s/mnode.json", pDnode->dir.dnode);

  if (taosRenameFile(file, realfile) != 0) {
    terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR;
    dError("failed to rename %s since %s", file, terrstr());
    return -1;
  }

  dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped);
  return 0;
}

/*
 * Initialise the three mnode workers (read, write, sync), all of which
 * dispatch to dndProcessMnodeQueue.
 *
 * If a later worker fails to initialise, the workers that were already set
 * up are cleaned up again so that a failed start leaves nothing behind.
 * terrno from the failing dndInitWorker() call is preserved across cleanup.
 *
 * Returns 0 on success, -1 on failure.
 */
static int32_t dndStartMnodeWorker(SDnode *pDnode) {
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;

  if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, dndProcessMnodeQueue) != 0) {
    dError("failed to start mnode read worker since %s", terrstr());
    return -1;
  }

  if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, dndProcessMnodeQueue) != 0) {
    dError("failed to start mnode write worker since %s", terrstr());
    int32_t code = terrno;
    dndCleanupWorker(&pMgmt->readWorker);
    terrno = code;
    return -1;
  }

  if (dndInitWorker(pDnode, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, dndProcessMnodeQueue) != 0) {
    dError("failed to start mnode sync worker since %s", terrstr());
    int32_t code = terrno;
    dndCleanupWorker(&pMgmt->writeWorker);
    dndCleanupWorker(&pMgmt->readWorker);
    terrno = code;
    return -1;
  }

  return 0;
}

/*
 * Stop the mnode workers.
 *
 * Clears the deployed flag under the write latch so dndAcquireMnode() stops
 * handing out new references, polls every 10 ms until refCount is no longer
 * greater than 1, and then cleans up the read, write and sync workers.
 */
static void dndStopMnodeWorker(SDnode *pDnode) {
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;

  taosWLockLatch(&pMgmt->latch);
  pMgmt->deployed = 0;
  taosWUnLockLatch(&pMgmt->latch);

  // NOTE(review): a single remaining reference is tolerated here, presumably
  // the one still held by the caller on the drop path; confirm.
  for (;;) {
    if (pMgmt->refCount <= 1) break;
    taosMsleep(10);
  }

  dndCleanupWorker(&pMgmt->readWorker);
  dndCleanupWorker(&pMgmt->writeWorker);
  dndCleanupWorker(&pMgmt->syncWorker);
}

/*
 * Decide whether this dnode should deploy an mnode on its own.
 *
 * Returns true only when the dnode has no positive dnode id, no positive
 * cluster id, and its local endpoint string equals the configured first
 * endpoint.
 */
static bool dndNeedDeployMnode(SDnode *pDnode) {
  if (dndGetDnodeId(pDnode) > 0) return false;
  if (dndGetClusterId(pDnode) > 0) return false;

  return strcmp(pDnode->opt.localEp, pDnode->opt.firstEp) == 0;
}

S
Shengliang Guan 已提交
257 258 259 260
/*
 * Callback installed as SMnodeOpt.putReqToMWriteQFp: forwards an RPC message
 * to the mnode write worker.
 *
 * dndWriteMnodeMsgToWorker() returns nothing and handles its own failures
 * (it sends the error response and frees the content), so this wrapper
 * always returns 0. The original fell off the end of a non-void function,
 * which is undefined behaviour as soon as a caller reads the result.
 */
static int32_t dndPutMsgToMWriteQ(SDnode *pDnode, SRpcMsg *pRpcMsg) {
  dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pRpcMsg);
  return 0;
}

S
Shengliang Guan 已提交
261 262
/*
 * Fill the parts of an SMnodeOpt that are common to every way of opening an
 * mnode: the owning dnode, its dnode/cluster ids, the configuration values
 * copied from pDnode->opt, and the dnode callbacks.
 * Replica information is left for the caller to fill in.
 */
static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
  // Identity of the owning dnode.
  pOption->pDnode = pDnode;
  pOption->dnodeId = dndGetDnodeId(pDnode);
  pOption->clusterId = dndGetClusterId(pDnode);

  // Configuration values taken over from the dnode options.
  pOption->cfg.sver = pDnode->opt.sver;
  pOption->cfg.enableTelem = pDnode->opt.enableTelem;
  pOption->cfg.statusInterval = pDnode->opt.statusInterval;
  pOption->cfg.shellActivityTimer = pDnode->opt.shellActivityTimer;
  pOption->cfg.timezone = pDnode->opt.timezone;
  pOption->cfg.charset = pDnode->opt.charset;
  pOption->cfg.locale = pDnode->opt.locale;
  pOption->cfg.gitinfo = pDnode->opt.gitinfo;
  pOption->cfg.buildinfo = pDnode->opt.buildinfo;

  // Callbacks provided by the dnode.
  pOption->sendReqToDnodeFp = dndSendReqToDnode;
  pOption->sendReqToMnodeFp = dndSendReqToMnode;
  pOption->sendRedirectRspFp = dndSendRedirectRsp;
  pOption->putReqToMWriteQFp = dndPutMsgToMWriteQ;
}

/*
 * Build the options for deploying a brand-new, single-replica mnode on this
 * dnode: replica count 1, self index 0, and one replica with id 1 that uses
 * the dnode's own server port and local fqdn.
 * The resulting replica layout is also mirrored into pDnode->mmgmt.
 */
static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) {
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;

  dndInitMnodeOption(pDnode, pOption);

  pOption->replica = 1;
  pOption->selfIndex = 0;
  pOption->replicas[0].id = 1;
  pOption->replicas[0].port = pDnode->opt.serverPort;
  memcpy(pOption->replicas[0].fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN);

  // Keep the management state in step with the options just built.
  pMgmt->selfIndex = pOption->selfIndex;
  pMgmt->replica = pOption->replica;
  memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
}

/*
 * Build the options for re-opening an existing mnode: the common fields
 * from dndInitMnodeOption() plus the self index, replica count and replica
 * array currently held in pDnode->mmgmt.
 */
static void dndBuildMnodeOpenOption(SDnode *pDnode, SMnodeOpt *pOption) {
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;

  dndInitMnodeOption(pDnode, pOption);

  pOption->replica = pMgmt->replica;
  pOption->selfIndex = pMgmt->selfIndex;
  memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
}

S
Shengliang Guan 已提交
303
/*
 * Build mnode options from a create/alter request.
 *
 * Copies the replica list from the request into pOption and locates this
 * dnode in it (the replica whose id equals the local dnode id). On success
 * the replica layout is also mirrored into pDnode->mmgmt.
 *
 * The replica count comes from the request, so it is range-checked against
 * TSDB_MAX_REPLICA before it is used to index pOption->replicas.
 *
 * Returns 0 on success. Returns -1 with terrno set to
 * TSDB_CODE_DND_MNODE_INVALID_OPTION when the replica count is out of range
 * or this dnode is not part of the replica list; pDnode->mmgmt is left
 * untouched in that case.
 */
static int32_t dndBuildMnodeOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) {
  // Also fills in pOption->dnodeId and pOption->clusterId.
  dndInitMnodeOption(pDnode, pOption);

  if (pCreate->replica <= 0 || pCreate->replica > TSDB_MAX_REPLICA) {
    terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
    dError("failed to build mnode options since %s", terrstr());
    return -1;
  }

  pOption->replica = pCreate->replica;
  pOption->selfIndex = -1;
  for (int32_t i = 0; i < pCreate->replica; ++i) {
    SReplica *pReplica = &pOption->replicas[i];
    pReplica->id = pCreate->replicas[i].id;
    pReplica->port = pCreate->replicas[i].port;
    memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN);
    if (pReplica->id == pOption->dnodeId) {
      pOption->selfIndex = i;
    }
  }

  if (pOption->selfIndex == -1) {
    // Set terrno first so the log line reports this failure, not a stale error.
    terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
    dError("failed to build mnode options since %s", terrstr());
    return -1;
  }

  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
  pMgmt->selfIndex = pOption->selfIndex;
  pMgmt->replica = pOption->replica;
  memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
  return 0;
}

S
Shengliang Guan 已提交
332
/*
 * Open the mnode with the given options and bring it into service.
 *
 * Steps, in order: open the mnode, start its workers, mark it deployed and
 * persist that to the mnode file, then publish the handle in pMgmt under
 * the write latch.
 *
 * If starting the workers or writing the file fails, the mnode is closed and
 * mndDestroy() is called on its directory; on a file-write failure the
 * deployed flag is reset and the workers are stopped first.
 *
 * Returns 0 on success, -1 on failure.
 */
static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) {
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
  SMnode     *pOpened = mndOpen(pDnode->dir.mnode, pOption);

  if (pOpened == NULL) {
    dError("failed to open mnode since %s", terrstr());
    return -1;
  }

  if (dndStartMnodeWorker(pDnode) != 0) {
    dError("failed to start mnode worker since %s", terrstr());
    goto OPEN_MNODE_FAILED;
  }

  // The flag must already be set when the file is written so that it is persisted as 1.
  pMgmt->deployed = 1;
  if (dndWriteMnodeFile(pDnode) != 0) {
    dError("failed to write mnode file since %s", terrstr());
    pMgmt->deployed = 0;
    dndStopMnodeWorker(pDnode);
    goto OPEN_MNODE_FAILED;
  }

  taosWLockLatch(&pMgmt->latch);
  pMgmt->pMnode = pOpened;
  pMgmt->deployed = 1;
  taosWUnLockLatch(&pMgmt->latch);

  dInfo("mnode open successfully");
  return 0;

OPEN_MNODE_FAILED:
  mndClose(pOpened);
  mndDestroy(pDnode->dir.mnode);
  return -1;
}

S
Shengliang Guan 已提交
367
/*
 * Apply new options to the running mnode via mndAlter().
 *
 * A reference on the mnode is held for the duration of the call.
 * Returns 0 on success, -1 if the mnode cannot be acquired or mndAlter()
 * reports a failure.
 */
static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption) {
  int32_t code = 0;

  SMnode *pMnode = dndAcquireMnode(pDnode);
  if (pMnode == NULL) {
    dError("failed to alter mnode since %s", terrstr());
    return -1;
  }

  if (mndAlter(pMnode, pOption) != 0) {
    dError("failed to alter mnode since %s", terrstr());
    code = -1;
  }

  dndReleaseMnode(pDnode, pMnode);
  return code;
}

/*
 * Drop the mnode hosted on this dnode.
 *
 * Steps, in order:
 *   1. mark the mnode dropped and persist that; if persisting fails the
 *      flag is rolled back and -1 is returned;
 *   2. stop the workers, clear the deployed flag and persist again;
 *   3. close the mnode, clear pMgmt->pMnode and destroy its directory.
 *
 * The dropped flag is modified under the write latch, because it is shared
 * state that dndAcquireMnode() reads under the read latch.
 *
 * Returns 0 on success, -1 if the mnode is not available or the dropped
 * state could not be persisted.
 */
static int32_t dndDropMnode(SDnode *pDnode) {
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;

  SMnode *pMnode = dndAcquireMnode(pDnode);
  if (pMnode == NULL) {
    dError("failed to drop mnode since %s", terrstr());
    return -1;
  }

  taosWLockLatch(&pMgmt->latch);
  pMgmt->dropped = 1;
  taosWUnLockLatch(&pMgmt->latch);

  if (dndWriteMnodeFile(pDnode) != 0) {
    taosWLockLatch(&pMgmt->latch);
    pMgmt->dropped = 0;
    taosWUnLockLatch(&pMgmt->latch);

    dndReleaseMnode(pDnode, pMnode);
    dError("failed to drop mnode since %s", terrstr());
    return -1;
  }

  // The pointer stays usable below: the mnode is only closed by this function.
  dndReleaseMnode(pDnode, pMnode);
  dndStopMnodeWorker(pDnode);
  pMgmt->deployed = 0;

  // Not fatal: dropped=1 is already on disk, and dndInitMnode() checks the
  // dropped flag before it looks at deployed.
  if (dndWriteMnodeFile(pDnode) != 0) {
    dError("failed to write mnode file since %s", terrstr());
  }

  mndClose(pMnode);
  pMgmt->pMnode = NULL;
  mndDestroy(pDnode->dir.mnode);

  return 0;
}

S
Shengliang Guan 已提交
420 421
/*
 * Convert a create-mnode request in place and return it.
 *
 * The request is the RPC message content itself (pReq->pCont). The dnode id
 * and each replica id are passed through htonl(), each replica port through
 * htons(); pCreate->replica entries are converted.
 */
static SDCreateMnodeReq *dndParseCreateMnodeReq(SRpcMsg *pReq) {
  SDCreateMnodeReq *pCreate = pReq->pCont;

  pCreate->dnodeId = htonl(pCreate->dnodeId);

  int32_t idx = 0;
  while (idx < pCreate->replica) {
    pCreate->replicas[idx].id = htonl(pCreate->replicas[idx].id);
    pCreate->replicas[idx].port = htons(pCreate->replicas[idx].port);
    ++idx;
  }

  return pCreate;
}

S
Shengliang Guan 已提交
431
/*
 * Handle a create-mnode request addressed to this dnode.
 *
 * The request is rejected with TSDB_CODE_DND_MNODE_INVALID_OPTION when its
 * replica count is not greater than 1, when it targets a different dnode
 * id, or when the options cannot be built from it; and with
 * TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED when an mnode can already be
 * acquired. Otherwise the mnode is opened with the requested options.
 *
 * Returns the result of dndOpenMnode() on the success path, -1 (with terrno
 * set) on rejection.
 */
int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SDCreateMnodeReq *pCreate = dndParseCreateMnodeReq(pReq);
  SMnodeOpt         option = {0};
  SMnode           *pMnode = NULL;

  bool reqValid = (pCreate->replica > 1) && (pCreate->dnodeId == dndGetDnodeId(pDnode));
  if (!reqValid) {
    terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
    goto CREATE_MNODE_FAILED;
  }

  if (dndBuildMnodeOptionFromReq(pDnode, &option, pCreate) != 0) {
    terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
    goto CREATE_MNODE_FAILED;
  }

  // Being able to acquire the mnode means one is already deployed here.
  pMnode = dndAcquireMnode(pDnode);
  if (pMnode != NULL) {
    dndReleaseMnode(pDnode, pMnode);
    terrno = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED;
    goto CREATE_MNODE_FAILED;
  }

  dDebug("start to create mnode");
  return dndOpenMnode(pDnode, &option);

CREATE_MNODE_FAILED:
  dError("failed to create mnode since %s", terrstr());
  return -1;
}

S
Shengliang Guan 已提交
459
/*
 * Handle an alter-mnode request addressed to this dnode.
 *
 * The request is rejected with TSDB_CODE_DND_MNODE_INVALID_OPTION when it
 * targets a different dnode id or the options cannot be built from it, and
 * with TSDB_CODE_DND_MNODE_NOT_DEPLOYED when no mnode can be acquired.
 * Otherwise the options are applied through dndAlterMnode() while a
 * reference on the mnode is held.
 *
 * Returns the result of dndAlterMnode(), or -1 (with terrno set) on
 * rejection.
 */
int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SDAlterMnodeReq *pAlter = dndParseCreateMnodeReq(pReq);
  SMnodeOpt        option = {0};
  SMnode          *pMnode = NULL;

  if (pAlter->dnodeId != dndGetDnodeId(pDnode)) {
    terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
    goto ALTER_MNODE_FAILED;
  }

  if (dndBuildMnodeOptionFromReq(pDnode, &option, pAlter) != 0) {
    terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
    goto ALTER_MNODE_FAILED;
  }

  pMnode = dndAcquireMnode(pDnode);
  if (pMnode == NULL) {
    terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
    goto ALTER_MNODE_FAILED;
  }

  dDebug("start to alter mnode");
  int32_t code = dndAlterMnode(pDnode, &option);
  dndReleaseMnode(pDnode, pMnode);

  return code;

ALTER_MNODE_FAILED:
  dError("failed to alter mnode since %s", terrstr());
  return -1;
}

S
Shengliang Guan 已提交
489
/*
 * Handle a drop-mnode request addressed to this dnode.
 *
 * The dnode id in the request (pReq->pCont) is converted in place with
 * htonl(). The request is rejected with TSDB_CODE_DND_MNODE_INVALID_OPTION
 * when it targets a different dnode id, and with
 * TSDB_CODE_DND_MNODE_NOT_DEPLOYED when no mnode can be acquired.
 * Otherwise dndDropMnode() is run while a reference on the mnode is held.
 *
 * Returns the result of dndDropMnode(), or -1 (with terrno set) on
 * rejection.
 */
int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SDDropMnodeReq *pDrop = pReq->pCont;
  SMnode         *pMnode = NULL;

  pDrop->dnodeId = htonl(pDrop->dnodeId);

  if (pDrop->dnodeId != dndGetDnodeId(pDnode)) {
    terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
    goto DROP_MNODE_FAILED;
  }

  pMnode = dndAcquireMnode(pDnode);
  if (pMnode == NULL) {
    terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
    goto DROP_MNODE_FAILED;
  }

  dDebug("start to drop mnode");
  int32_t code = dndDropMnode(pDnode);
  dndReleaseMnode(pDnode, pMnode);

  return code;

DROP_MNODE_FAILED:
  dError("failed to drop mnode since %s", terrstr());
  return -1;
}

S
Shengliang Guan 已提交
513
/*
 * Worker callback for the mnode read/write/sync queues.
 *
 * If the mnode can be acquired, the message is processed while the
 * reference is held; otherwise a response carrying the current terrno is
 * sent. The message is cleaned up in both cases.
 */
static void dndProcessMnodeQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
  SMnode *pMnode = dndAcquireMnode(pDnode);

  if (pMnode == NULL) {
    mndSendRsp(pMsg, terrno);
  } else {
    mndProcessMsg(pMsg);
    dndReleaseMnode(pDnode, pMnode);
  }

  mndCleanupMsg(pMsg);
}

S
Shengliang Guan 已提交
527 528
/*
 * Wrap an RPC message into an mnode message and queue it on pWorker.
 *
 * On any failure (mnode not deployed, message allocation failed, or the
 * worker refused the message) the error is reported back to the sender when
 * the low bit of pRpcMsg->msgType is set, and the RPC content is freed.
 *
 * The mnode message is only cleaned up when it was actually created; the
 * original passed a NULL message to mndCleanupMsg() when mndInitMsg()
 * failed.
 */
static void dndWriteMnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpcMsg) {
  int32_t code = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;

  SMnode *pMnode = dndAcquireMnode(pDnode);
  if (pMnode != NULL) {
    SMnodeMsg *pMsg = mndInitMsg(pMnode, pRpcMsg);
    if (pMsg == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      code = dndWriteMsgToWorker(pWorker, pMsg, 0);
      if (code != 0) code = terrno;
    }

    // NOTE(review): if mndCleanupMsg() also frees the RPC content, the
    // rpcFreeCont() below would free it a second time; confirm against mnode.
    if (code != 0 && pMsg != NULL) {
      mndCleanupMsg(pMsg);
    }
  }
  // Safe even when the acquire failed: dndReleaseMnode() ignores NULL.
  dndReleaseMnode(pDnode, pMnode);

  if (code != 0) {
    if (pRpcMsg->msgType & 1u) {
      SRpcMsg rsp = {.handle = pRpcMsg->handle, .ahandle = pRpcMsg->ahandle, .code = code};
      rpcSendResponse(&rsp);
    }
    rpcFreeCont(pRpcMsg->pCont);
  }
}

S
Shengliang Guan 已提交
555 556
/*
 * Route an incoming RPC message to the mnode write worker.
 * pEpSet is not used by this function.
 */
void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SDnodeWorker *pWorker = &pDnode->mmgmt.writeWorker;
  dndWriteMnodeMsgToWorker(pDnode, pWorker, pMsg);
}

S
Shengliang Guan 已提交
559 560
/*
 * Route an incoming RPC message to the mnode sync worker.
 * pEpSet is not used by this function.
 */
void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SDnodeWorker *pWorker = &pDnode->mmgmt.syncWorker;
  dndWriteMnodeMsgToWorker(pDnode, pWorker, pMsg);
}

S
Shengliang Guan 已提交
563 564
/*
 * Route an incoming RPC message to the mnode read worker.
 * pEpSet is not used by this function.
 */
void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SDnodeWorker *pWorker = &pDnode->mmgmt.readWorker;
  dndWriteMnodeMsgToWorker(pDnode, pWorker, pMsg);
}

/*
 * Initialise mnode management for this dnode.
 *
 * Reads the persisted mnode file and then:
 *   - if the mnode is marked dropped, destroys its directory and returns 0;
 *   - if it is marked deployed, re-opens it with the persisted replicas;
 *   - otherwise deploys a new single-replica mnode when
 *     dndNeedDeployMnode() says so, or returns 0 without doing anything.
 *
 * Returns 0 on success, -1 if the mnode file cannot be read, or the result
 * of dndOpenMnode() when an mnode is opened.
 */
int32_t dndInitMnode(SDnode *pDnode) {
  dInfo("dnode-mnode start to init");
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
  taosInitRWLatch(&pMgmt->latch);

  if (dndReadMnodeFile(pDnode) != 0) return -1;

  if (pMgmt->dropped) {
    dInfo("mnode has been deployed and needs to be deleted");
    mndDestroy(pDnode->dir.mnode);
    return 0;
  }

  SMnodeOpt option = {0};
  if (pMgmt->deployed) {
    dInfo("start to open mnode");
    dndBuildMnodeOpenOption(pDnode, &option);
  } else {
    if (!dndNeedDeployMnode(pDnode)) {
      dDebug("mnode does not need to be deployed");
      return 0;
    }

    dInfo("start to deploy mnode");
    dndBuildMnodeDeployOption(pDnode, &option);
  }

  return dndOpenMnode(pDnode, &option);
}

/*
 * Shut down mnode management: if an mnode handle is present, stop the
 * workers, close the mnode and clear the handle.
 */
void dndCleanupMnode(SDnode *pDnode) {
  dInfo("dnode-mnode start to clean up");

  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
  if (pMgmt->pMnode != NULL) {
    dndStopMnodeWorker(pDnode);
    mndClose(pMgmt->pMnode);
    pMgmt->pMnode = NULL;
  }

  dInfo("dnode-mnode is cleaned up");
}

/*
 * Look up authentication data for a user through the mnode.
 *
 * All arguments after pDnode are forwarded unchanged to mndRetriveAuth().
 * Returns its result, or -1 with terrno set to TSDB_CODE_APP_NOT_READY when
 * the mnode cannot be acquired.
 */
int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
  SMnode *pMnode = dndAcquireMnode(pDnode);

  if (pMnode != NULL) {
    int32_t code = mndRetriveAuth(pMnode, user, spi, encrypt, secret, ckey);
    dndReleaseMnode(pDnode, pMnode);
    return code;
  }

  terrno = TSDB_CODE_APP_NOT_READY;
  dTrace("failed to get user auth since %s", terrstr());
  return -1;
}