dndMnode.c 16.8 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "dndMnode.h"
#include "dndDnode.h"
#include "dndTransport.h"
S
Shengliang Guan 已提交
20
#include "dndWorker.h"
S
Shengliang Guan 已提交
21

S
Shengliang Guan 已提交
22
// Queue callback registered for the mnode read, write and sync workers; defined later in this file.
static void dndProcessMnodeQueue(SDnode *pDnode, SMnodeMsg *pMsg);
S
Shengliang Guan 已提交
23 24

/*
 * Take a reference on the local mnode.
 *
 * Returns the mnode handle with pMgmt->refCount incremented when the mnode is
 * deployed, not dropped, and its handle is set; each successful call must be
 * paired with dndReleaseMnode(). Otherwise returns NULL, leaves the counter
 * untouched and sets terrno to TSDB_CODE_DND_MNODE_NOT_DEPLOYED.
 */
static SMnode *dndAcquireMnode(SDnode *pDnode) {
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
  SMnode     *pMnode = NULL;
  int32_t     refCount = 0;

  taosRLockLatch(&pMgmt->latch);
  // dndOpenMnode() raises 'deployed' before it stores the handle, so pMnode can
  // still be NULL here. A reference counted in that window would never be
  // returned, because dndReleaseMnode() ignores NULL handles.
  if (pMgmt->deployed && !pMgmt->dropped && pMgmt->pMnode != NULL) {
    refCount = atomic_add_fetch_32(&pMgmt->refCount, 1);
    pMnode = pMgmt->pMnode;
  } else {
    terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
  }
  taosRUnLockLatch(&pMgmt->latch);

  if (pMnode != NULL) {
    dTrace("acquire mnode, refCount:%d", refCount);
  }

  return pMnode;
}

/*
 * Give back a reference obtained from dndAcquireMnode().
 * A NULL pMnode leaves the reference counter unchanged and logs nothing.
 */
static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) {
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
  const bool  held = (pMnode != NULL);
  int32_t     remaining = 0;

  taosRLockLatch(&pMgmt->latch);
  if (held) {
    remaining = atomic_sub_fetch_32(&pMgmt->refCount, 1);
  }
  taosRUnLockLatch(&pMgmt->latch);

  if (held) {
    dTrace("release mnode, refCount:%d", remaining);
  }
}

/*
 * Load <dnode dir>/mnode.json into pDnode->mmgmt.
 *
 * Fills deployed, dropped, replica and replicas[]. A file that cannot be
 * opened is treated as "no mnode configured" and yields 0. At most 4096 bytes
 * of the file are read. Any other failure (empty file, bad JSON, missing or
 * mistyped field, replica count outside 1..TSDB_MAX_REPLICA, allocation
 * failure) returns a non-zero code.
 *
 * Returns 0 on success; terrno is set to the returned code in all cases.
 *
 * NOTE(review): selfIndex is neither stored in nor restored from the file;
 * confirm the open path does not depend on it.
 */
static int32_t dndReadMnodeFile(SDnode *pDnode) {
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
  int32_t     code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR;
  int32_t     len = 0;
  int32_t     maxLen = 4096;
  char       *content = NULL;
  cJSON      *root = NULL;
  FILE       *fp = NULL;

  char file[PATH_MAX + 20];
  snprintf(file, sizeof(file), "%s/mnode.json", pDnode->dir.dnode);

  content = calloc(1, maxLen + 1);
  if (content == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to read %s since out of memory", file);
    goto PARSE_MNODE_OVER;
  }

  fp = fopen(file, "r");
  if (fp == NULL) {
    dDebug("file %s not exist", file);
    code = 0;
    goto PARSE_MNODE_OVER;
  }

  len = (int32_t)fread(content, 1, maxLen, fp);
  if (len <= 0) {
    dError("failed to read %s since content is null", file);
    goto PARSE_MNODE_OVER;
  }

  content[len] = 0;
  root = cJSON_Parse(content);
  if (root == NULL) {
    dError("failed to read %s since invalid json format", file);
    goto PARSE_MNODE_OVER;
  }

  cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
  if (!deployed || deployed->type != cJSON_Number) {
    dError("failed to read %s since deployed not found", file);
    goto PARSE_MNODE_OVER;
  }
  pMgmt->deployed = deployed->valueint;

  cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
  if (!dropped || dropped->type != cJSON_Number) {
    dError("failed to read %s since dropped not found", file);
    goto PARSE_MNODE_OVER;
  }
  pMgmt->dropped = dropped->valueint;

  cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes");
  if (!mnodes || mnodes->type != cJSON_Array) {
    dError("failed to read %s since nodes not found", file);
    goto PARSE_MNODE_OVER;
  }

  pMgmt->replica = cJSON_GetArraySize(mnodes);
  if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) {
    dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica);
    goto PARSE_MNODE_OVER;
  }

  for (int32_t i = 0; i < pMgmt->replica; ++i) {
    cJSON *node = cJSON_GetArrayItem(mnodes, i);
    if (node == NULL) {
      // Report a missing entry as an error instead of returning success with
      // only part of replicas[] filled in.
      dError("failed to read %s since mnode %d not found", file, i);
      goto PARSE_MNODE_OVER;
    }

    SReplica *pReplica = &pMgmt->replicas[i];

    cJSON *id = cJSON_GetObjectItem(node, "id");
    if (!id || id->type != cJSON_Number) {
      dError("failed to read %s since id not found", file);
      goto PARSE_MNODE_OVER;
    }
    pReplica->id = id->valueint;

    cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn");
    if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
      dError("failed to read %s since fqdn not found", file);
      goto PARSE_MNODE_OVER;
    }
    tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN);

    cJSON *port = cJSON_GetObjectItem(node, "port");
    if (!port || port->type != cJSON_Number) {
      dError("failed to read %s since port not found", file);
      goto PARSE_MNODE_OVER;
    }
    pReplica->port = port->valueint;
  }

  code = 0;
  dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped);

PARSE_MNODE_OVER:
  free(content);
  if (root != NULL) cJSON_Delete(root);
  if (fp != NULL) fclose(fp);

  terrno = code;
  return code;
}

/*
 * Persist pDnode->mmgmt (deployed, dropped, replicas[]) as JSON.
 *
 * The content is written to <dnode dir>/mnode.json.bak, flushed and synced,
 * then renamed over <dnode dir>/mnode.json.
 *
 * Returns 0 on success. On failure returns -1 with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY or TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR.
 *
 * NOTE(review): when pMgmt->replica is 0 the "mnodes" array is opened but
 * never closed, so the emitted text is not valid JSON.
 */
static int32_t dndWriteMnodeFile(SDnode *pDnode) {
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;

  char file[PATH_MAX + 20];
  snprintf(file, sizeof(file), "%s/mnode.json.bak", pDnode->dir.dnode);

  int32_t len = 0;
  int32_t maxLen = 4096;
  char   *content = calloc(1, maxLen + 1);
  if (content == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to write %s since %s", file, terrstr());
    return -1;
  }

  len += snprintf(content + len, maxLen - len, "{\n  \"deployed\": %d,\n  \"dropped\": %d,\n  \"mnodes\": [{\n",
                  pMgmt->deployed, pMgmt->dropped);

  for (int32_t i = 0; i < pMgmt->replica; ++i) {
    // snprintf returns the untruncated length, so len can pass maxLen; stop
    // before 'maxLen - len' turns negative and is used as a buffer size.
    if (len >= maxLen) break;

    SReplica   *pReplica = &pMgmt->replicas[i];
    const char *tail = (i < pMgmt->replica - 1) ? "  },{\n" : "  }]\n";
    len += snprintf(content + len, maxLen - len, "    \"id\": %d,\n    \"fqdn\": \"%s\",\n    \"port\": %u\n%s",
                    pReplica->id, pReplica->fqdn, pReplica->port, tail);
  }

  if (len < maxLen) {
    len += snprintf(content + len, maxLen - len, "}\n");
  }

  if (len >= maxLen) {
    free(content);
    terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR;
    dError("failed to write %s since content is too long", file);
    return -1;
  }

  FILE *fp = fopen(file, "w");
  if (fp == NULL) {
    free(content);
    terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR;
    dError("failed to write %s since %s", file, terrstr());
    return -1;
  }

  size_t written = fwrite(content, 1, (size_t)len, fp);
  free(content);

  // Push the stdio buffer down to the descriptor before syncing it; without
  // this the sync runs before any data has left user space.
  if (written != (size_t)len || fflush(fp) != 0) {
    fclose(fp);
    terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR;
    dError("failed to write %s since %s", file, terrstr());
    return -1;
  }

  taosFsyncFile(fileno(fp));
  fclose(fp);

  char realfile[PATH_MAX + 20];
  snprintf(realfile, sizeof(realfile), "%s/mnode.json", pDnode->dir.dnode);

  if (taosRenameFile(file, realfile) != 0) {
    terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR;
    dError("failed to rename %s since %s", file, terrstr());
    return -1;
  }

  dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped);
  return 0;
}

/*
 * Start the three single-threaded mnode workers (read, write, sync), all of
 * which dispatch to dndProcessMnodeQueue().
 *
 * Returns 0 when all three are running. On failure returns -1 with terrno
 * from the failing dndInitWorker() call, after cleaning up any worker that
 * had already been started so the caller has nothing to undo.
 */
static int32_t dndStartMnodeWorker(SDnode *pDnode) {
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
  int32_t     code = 0;

  if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, dndProcessMnodeQueue) != 0) {
    dError("failed to start mnode read worker since %s", terrstr());
    return -1;
  }

  if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, dndProcessMnodeQueue) != 0) {
    dError("failed to start mnode write worker since %s", terrstr());
    code = terrno;  // keep the original cause; cleanup may overwrite terrno
    dndCleanupWorker(&pMgmt->readWorker);
    terrno = code;
    return -1;
  }

  if (dndInitWorker(pDnode, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, dndProcessMnodeQueue) != 0) {
    dError("failed to start mnode sync worker since %s", terrstr());
    code = terrno;  // keep the original cause; cleanup may overwrite terrno
    dndCleanupWorker(&pMgmt->writeWorker);
    dndCleanupWorker(&pMgmt->readWorker);
    terrno = code;
    return -1;
  }

  return 0;
}

/*
 * Mark the mnode as not deployed, wait for outstanding references to drain,
 * then clean up the read, write and sync workers.
 */
static void dndStopMnodeWorker(SDnode *pDnode) {
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;

  // Clear the flag first: dndAcquireMnode() refuses new references once
  // 'deployed' is zero.
  taosWLockLatch(&pMgmt->latch);
  pMgmt->deployed = 0;
  taosWUnLockLatch(&pMgmt->latch);

  // NOTE(review): the loop exits while one reference may still be held;
  // confirm whether the threshold is meant to be 1 or 0.
  for (;;) {
    if (pMgmt->refCount <= 1) break;
    taosMsleep(10);
  }

  SDnodeWorker *workers[] = {&pMgmt->readWorker, &pMgmt->writeWorker, &pMgmt->syncWorker};
  for (int32_t i = 0; i < 3; ++i) {
    dndCleanupWorker(workers[i]);
  }
}

/*
 * Decide whether this dnode should deploy an mnode on its own.
 * True only when the dnode has no dnode id (<= 0), no cluster id (<= 0) and
 * its local endpoint string equals the configured first endpoint.
 */
static bool dndNeedDeployMnode(SDnode *pDnode) {
  const bool hasIdentity = dndGetDnodeId(pDnode) > 0 || dndGetClusterId(pDnode) > 0;
  if (hasIdentity) {
    return false;
  }

  return strcmp(pDnode->opt.localEp, pDnode->opt.firstEp) == 0;
}

S
Shengliang Guan 已提交
261 262 263 264 265 266 267
/*
 * Fill the parts of an SMnodeOpt that are the same for every way of starting
 * the mnode: the owning dnode, the message-sending callbacks, the current
 * dnode/cluster ids and the configuration values copied from pDnode->opt.
 * Replica information is left for the caller to set.
 */
static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
  // Owner and outbound message callbacks.
  pOption->pDnode = pDnode;
  pOption->sendMsgToDnodeFp = dndSendMsgToDnode;
  pOption->sendMsgToMnodeFp = dndSendMsgToMnode;
  pOption->sendRedirectMsgFp = dndSendRedirectMsg;

  // Identity as currently known by the dnode.
  pOption->dnodeId = dndGetDnodeId(pDnode);
  pOption->clusterId = dndGetClusterId(pDnode);

  // Configuration copied from the dnode options.
  pOption->cfg.sver = pDnode->opt.sver;
  pOption->cfg.enableTelem = pDnode->opt.enableTelem;
  pOption->cfg.statusInterval = pDnode->opt.statusInterval;
  pOption->cfg.shellActivityTimer = pDnode->opt.shellActivityTimer;

  pOption->cfg.timezone = pDnode->opt.timezone;
  pOption->cfg.charset = pDnode->opt.charset;
  pOption->cfg.locale = pDnode->opt.locale;

  pOption->cfg.gitinfo = pDnode->opt.gitinfo;
  pOption->cfg.buildinfo = pDnode->opt.buildinfo;
}

/*
 * Build the options for a first-time, single-replica deployment: this dnode
 * is replica 0 with id 1, using its own fqdn and server port. The resulting
 * replica set is also recorded in pDnode->mmgmt.
 */
static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) {
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
  SReplica   *pSelf = &pOption->replicas[0];

  dndInitMnodeOption(pDnode, pOption);

  pOption->replica = 1;
  pOption->selfIndex = 0;

  pSelf->id = 1;
  pSelf->port = pDnode->opt.serverPort;
  memcpy(pSelf->fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN);

  // Mirror the replica set into the management state.
  pMgmt->selfIndex = pOption->selfIndex;
  pMgmt->replica = pOption->replica;
  memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
}

/*
 * Build the options for reopening an mnode: common fields plus the replica
 * set currently held in pDnode->mmgmt.
 */
static void dndBuildMnodeOpenOption(SDnode *pDnode, SMnodeOpt *pOption) {
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;

  dndInitMnodeOption(pDnode, pOption);

  pOption->replica = pMgmt->replica;
  pOption->selfIndex = pMgmt->selfIndex;
  memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
}

S
Shengliang Guan 已提交
302
/*
 * Build mnode options from a create/alter request.
 *
 * Copies the replica list from pMsg into pOption and finds this dnode in it
 * by id. On success the replica set is also recorded in pDnode->mmgmt.
 *
 * Returns 0 on success. Returns -1 with terrno set to
 * TSDB_CODE_DND_MNODE_ID_NOT_FOUND when the replica count is outside
 * 1..TSDB_MAX_REPLICA or when no replica carries this dnode's id; in that
 * case pDnode->mmgmt is left unchanged.
 */
static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SCreateMnodeInMsg *pMsg) {
  // Also fills pOption->dnodeId and pOption->clusterId.
  dndInitMnodeOption(pDnode, pOption);

  // The count comes from the request; it must be bounded before it is used
  // to index replicas[], which this file copies as TSDB_MAX_REPLICA entries.
  // An empty list already ended in ID_NOT_FOUND, so the same code is reused.
  if (pMsg->replica <= 0 || pMsg->replica > TSDB_MAX_REPLICA) {
    terrno = TSDB_CODE_DND_MNODE_ID_NOT_FOUND;
    dError("failed to build mnode options since replica %d is invalid", pMsg->replica);
    return -1;
  }

  pOption->replica = pMsg->replica;
  pOption->selfIndex = -1;
  for (int32_t i = 0; i < pMsg->replica; ++i) {
    SReplica *pReplica = &pOption->replicas[i];
    pReplica->id = pMsg->replicas[i].id;
    pReplica->port = pMsg->replicas[i].port;
    memcpy(pReplica->fqdn, pMsg->replicas[i].fqdn, TSDB_FQDN_LEN);
    if (pReplica->id == pOption->dnodeId) {
      pOption->selfIndex = i;
    }
  }

  if (pOption->selfIndex == -1) {
    terrno = TSDB_CODE_DND_MNODE_ID_NOT_FOUND;
    dError("failed to build mnode options since %s", terrstr());
    return -1;
  }

  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
  pMgmt->selfIndex = pOption->selfIndex;
  pMgmt->replica = pOption->replica;
  memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
  return 0;
}

S
Shengliang Guan 已提交
332
/*
 * Open the mnode in pDnode->dir.mnode, start its workers, persist the
 * "deployed" state and finally store the handle in pDnode->mmgmt.
 *
 * Returns 0 on success, -1 on failure. If the workers cannot be started or
 * the state file cannot be written, the mnode is closed and mndDestroy() is
 * called on its directory.
 *
 * NOTE(review): the failure tail also runs when an existing mnode is being
 * reopened; confirm that calling mndDestroy() there is intended.
 */
static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) {
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;

  SMnode *pMnode = mndOpen(pDnode->dir.mnode, pOption);
  if (pMnode == NULL) {
    dError("failed to open mnode since %s", terrstr());
    return -1;
  }

  if (dndStartMnodeWorker(pDnode) != 0) {
    dError("failed to start mnode worker since %s", terrstr());
    goto OPEN_MNODE_FAILED;
  }

  // dndWriteMnodeFile() serializes pMgmt->deployed, so set it before writing.
  pMgmt->deployed = 1;
  if (dndWriteMnodeFile(pDnode) != 0) {
    dError("failed to write mnode file since %s", terrstr());
    pMgmt->deployed = 0;
    dndStopMnodeWorker(pDnode);
    goto OPEN_MNODE_FAILED;
  }

  taosWLockLatch(&pMgmt->latch);
  pMgmt->pMnode = pMnode;
  pMgmt->deployed = 1;
  taosWUnLockLatch(&pMgmt->latch);

  dInfo("mnode open successfully");
  return 0;

OPEN_MNODE_FAILED:
  mndClose(pMnode);
  mndDestroy(pDnode->dir.mnode);
  return -1;
}

S
Shengliang Guan 已提交
367
/*
 * Apply new options to the running mnode via mndAlter().
 * Returns 0 on success, -1 when the mnode cannot be acquired or mndAlter()
 * fails. The acquired reference is always released before returning.
 */
static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption) {
  SMnode *pMnode = dndAcquireMnode(pDnode);
  if (pMnode == NULL) {
    dError("failed to alter mnode since %s", terrstr());
    return -1;
  }

  int32_t ret = 0;
  if (mndAlter(pMnode, pOption) != 0) {
    dError("failed to alter mnode since %s", terrstr());
    ret = -1;
  }

  dndReleaseMnode(pDnode, pMnode);
  return ret;
}

/*
 * Drop the local mnode.
 *
 * Marks the mnode as dropped and persists that; then stops the workers,
 * persists deployed = 0, closes the mnode and calls mndDestroy() on its
 * directory.
 *
 * Returns 0 once the dropped flag has been persisted, even if the second
 * write of the state file fails (that failure is only logged). Returns -1
 * when the mnode cannot be acquired or the dropped flag cannot be persisted;
 * in the latter case the flag is rolled back.
 */
static int32_t dndDropMnode(SDnode *pDnode) {
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;

  SMnode *pMnode = dndAcquireMnode(pDnode);
  if (pMnode == NULL) {
    dError("failed to drop mnode since %s", terrstr());
    return -1;
  }

  // 'dropped' is read under the read latch in dndAcquireMnode(); modifying it
  // needs the exclusive write latch, not the shared one.
  taosWLockLatch(&pMgmt->latch);
  pMgmt->dropped = 1;
  taosWUnLockLatch(&pMgmt->latch);

  if (dndWriteMnodeFile(pDnode) != 0) {
    taosWLockLatch(&pMgmt->latch);
    pMgmt->dropped = 0;
    taosWUnLockLatch(&pMgmt->latch);

    dndReleaseMnode(pDnode, pMnode);
    dError("failed to drop mnode since %s", terrstr());
    return -1;
  }

  // Give up our own reference before waiting for the others to drain.
  dndReleaseMnode(pDnode, pMnode);
  dndStopMnodeWorker(pDnode);

  pMgmt->deployed = 0;
  if (dndWriteMnodeFile(pDnode) != 0) {
    // The dropped flag is already on disk, so the drop itself still proceeds.
    dError("failed to write mnode file since %s", terrstr());
  }

  mndClose(pMnode);
  pMgmt->pMnode = NULL;
  mndDestroy(pDnode->dir.mnode);

  return 0;
}

S
Shengliang Guan 已提交
420 421
/*
 * Convert a create/alter mnode request to host byte order, in place.
 *
 * Swaps dnodeId and, for each listed replica, its id and port. Returns the
 * message payload (pRpcMsg->pCont) typed as SCreateMnodeInMsg. The replica
 * count itself is not validated here; callers must still reject counts
 * outside the supported range.
 */
static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) {
  SCreateMnodeInMsg *pMsg = pRpcMsg->pCont;
  pMsg->dnodeId = htonl(pMsg->dnodeId);

  // The count comes from the request, so cap the loop rather than trust it.
  // Assumes pMsg->replicas holds TSDB_MAX_REPLICA entries, as the other
  // replica arrays copied in this file do -- TODO confirm against the
  // message definition.
  int32_t replica = pMsg->replica;
  if (replica > TSDB_MAX_REPLICA) {
    replica = TSDB_MAX_REPLICA;
  }

  for (int32_t i = 0; i < replica; ++i) {
    pMsg->replicas[i].id = htonl(pMsg->replicas[i].id);
    pMsg->replicas[i].port = htons(pMsg->replicas[i].port);
  }

  return pMsg;
}

S
Shengliang Guan 已提交
431
/*
 * Handle a "create mnode" request addressed to this dnode.
 * Returns -1 with terrno = TSDB_CODE_DND_MNODE_ID_INVALID when the request
 * names another dnode, -1 when the options cannot be built, otherwise the
 * result of dndOpenMnode().
 */
int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
  SCreateMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);

  if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
    terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
    return -1;
  }

  SMnodeOpt option = {0};
  if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) {
    return -1;
  }

  return dndOpenMnode(pDnode, &option);
}

S
Shengliang Guan 已提交
447
/*
 * Handle an "alter mnode" request addressed to this dnode.
 * Returns -1 with terrno = TSDB_CODE_DND_MNODE_ID_INVALID when the request
 * names another dnode, -1 when building or applying the options fails,
 * otherwise the result of persisting the new state with dndWriteMnodeFile().
 */
int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
  SAlterMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);

  if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
    terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
    return -1;
  }

  SMnodeOpt option = {0};
  if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0 || dndAlterMnode(pDnode, &option) != 0) {
    return -1;
  }

  return dndWriteMnodeFile(pDnode);
}

S
Shengliang Guan 已提交
467
/*
 * Handle a "drop mnode" request addressed to this dnode.
 * The dnodeId in the message is byte-swapped in place. Returns -1 with
 * terrno = TSDB_CODE_DND_MNODE_ID_INVALID when the request names another
 * dnode, otherwise the result of dndDropMnode().
 */
int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
  SDropMnodeInMsg *pMsg = pRpcMsg->pCont;
  pMsg->dnodeId = htonl(pMsg->dnodeId);

  if (pMsg->dnodeId == dndGetDnodeId(pDnode)) {
    return dndDropMnode(pDnode);
  }

  terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
  return -1;
}

S
Shengliang Guan 已提交
479
/*
 * Worker callback: process one queued mnode message.
 * When the mnode cannot be acquired, a response carrying the current terrno
 * is sent instead. The message is cleaned up in both cases.
 */
static void dndProcessMnodeQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
  SMnode *pMnode = dndAcquireMnode(pDnode);

  if (pMnode == NULL) {
    mndSendRsp(pMsg, terrno);
  } else {
    mndProcessMsg(pMsg);
    dndReleaseMnode(pDnode, pMnode);
  }

  mndCleanupMsg(pMsg);
}

S
Shengliang Guan 已提交
493 494
/*
 * Wrap an incoming RPC message as an mnode message and put it on pWorker.
 *
 * On failure (mnode not deployed, message allocation failed, or enqueue
 * failed) a response carrying the error code is sent when the message type
 * has its lowest bit set, and the RPC payload is freed here. On success the
 * message now belongs to the worker queue.
 */
static void dndWriteMnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpcMsg) {
  int32_t code = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;

  SMnode *pMnode = dndAcquireMnode(pDnode);
  if (pMnode != NULL) {
    SMnodeMsg *pMsg = mndInitMsg(pMnode, pRpcMsg);
    if (pMsg == NULL) {
      // Nothing was allocated, so there is nothing to clean up.
      code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      code = dndWriteMsgToWorker(pWorker, pMsg, 0);
      if (code != 0) {
        mndCleanupMsg(pMsg);
      }
    }

    dndReleaseMnode(pDnode, pMnode);
  }

  if (code != 0) {
    // NOTE(review): presumably odd message types are requests that expect a
    // reply -- confirm against the message type definitions.
    if (pRpcMsg->msgType & 1u) {
      SRpcMsg rsp = {.handle = pRpcMsg->handle, .ahandle = pRpcMsg->ahandle, .code = code};
      rpcSendResponse(&rsp);
    }
    rpcFreeCont(pRpcMsg->pCont);
  }
}

S
Shengliang Guan 已提交
520 521
/* Route an mnode write message to the mnode write worker; pEpSet is not used. */
void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SDnodeWorker *pWorker = &pDnode->mmgmt.writeWorker;
  dndWriteMnodeMsgToWorker(pDnode, pWorker, pMsg);
}

S
Shengliang Guan 已提交
524 525
/* Route an mnode sync message to the mnode sync worker; pEpSet is not used. */
void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SDnodeWorker *pWorker = &pDnode->mmgmt.syncWorker;
  dndWriteMnodeMsgToWorker(pDnode, pWorker, pMsg);
}

S
Shengliang Guan 已提交
528 529
/* Route an mnode read message to the mnode read worker; pEpSet is not used. */
void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SDnodeWorker *pWorker = &pDnode->mmgmt.readWorker;
  dndWriteMnodeMsgToWorker(pDnode, pWorker, pMsg);
}

/*
 * Initialize mnode management for this dnode.
 *
 * Reads the persisted state, then:
 *   - dropped:      calls mndDestroy() on the mnode directory and returns 0;
 *   - deployed:     reopens the mnode with the persisted replica set;
 *   - not deployed: deploys a new mnode if dndNeedDeployMnode() says so,
 *                   otherwise returns 0 without starting anything.
 *
 * Returns -1 when the state file cannot be read, otherwise 0 or the result
 * of dndOpenMnode().
 */
int32_t dndInitMnode(SDnode *pDnode) {
  dInfo("dnode-mnode start to init");
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
  taosInitRWLatch(&pMgmt->latch);

  if (dndReadMnodeFile(pDnode) != 0) {
    return -1;
  }

  if (pMgmt->dropped) {
    dInfo("mnode has been deployed and needs to be deleted");
    mndDestroy(pDnode->dir.mnode);
    return 0;
  }

  SMnodeOpt option = {0};
  if (pMgmt->deployed) {
    dInfo("start to open mnode");
    dndBuildMnodeOpenOption(pDnode, &option);
  } else {
    if (!dndNeedDeployMnode(pDnode)) {
      dDebug("mnode does not need to be deployed");
      return 0;
    }

    dInfo("start to deploy mnode");
    dndBuildMnodeDeployOption(pDnode, &option);
  }

  return dndOpenMnode(pDnode, &option);
}

/*
 * Shut down mnode management: if an mnode handle is present, stop its
 * workers, close it and clear the handle. Does nothing else otherwise.
 */
void dndCleanupMnode(SDnode *pDnode) {
  dInfo("dnode-mnode start to clean up");

  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
  if (pMgmt->pMnode != NULL) {
    dndStopMnodeWorker(pDnode);
    mndClose(pMgmt->pMnode);
    pMgmt->pMnode = NULL;
  }

  dInfo("dnode-mnode is cleaned up");
}

/*
 * Look up authentication data for a user through the local mnode.
 * Returns the result of mndRetriveAuth(). When the mnode cannot be acquired,
 * returns -1 with terrno set to TSDB_CODE_APP_NOT_READY.
 */
int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
  SMnode *pMnode = dndAcquireMnode(pDnode);

  if (pMnode != NULL) {
    int32_t code = mndRetriveAuth(pMnode, user, spi, encrypt, secret, ckey);
    dndReleaseMnode(pDnode, pMnode);
    return code;
  }

  terrno = TSDB_CODE_APP_NOT_READY;
  dTrace("failed to get user auth since %s", terrstr());
  return -1;
}