dndMnode.c 27.9 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "dndMnode.h"
#include "dndDnode.h"
#include "dndTransport.h"

static int32_t dndInitMnodeReadWorker(SDnode *pDnode);
static int32_t dndInitMnodeWriteWorker(SDnode *pDnode);
static int32_t dndInitMnodeSyncWorker(SDnode *pDnode);
static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode);
static void    dndCleanupMnodeReadWorker(SDnode *pDnode);
static void    dndCleanupMnodeWriteWorker(SDnode *pDnode);
static void    dndCleanupMnodeSyncWorker(SDnode *pDnode);
static void    dndCleanupMnodeMgmtWorker(SDnode *pDnode);
static int32_t dndAllocMnodeReadQueue(SDnode *pDnode);
static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode);
static int32_t dndAllocMnodeApplyQueue(SDnode *pDnode);
static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode);
static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode);
static void    dndFreeMnodeReadQueue(SDnode *pDnode);
static void    dndFreeMnodeWriteQueue(SDnode *pDnode);
static void    dndFreeMnodeApplyQueue(SDnode *pDnode);
static void    dndFreeMnodeSyncQueue(SDnode *pDnode);
static void    dndFreeMnodeMgmtQueue(SDnode *pDnode);

static void    dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static void    dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static void    dndProcessMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static void    dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static void    dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg);
static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg);
void           dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void           dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void           dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void           dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t dndPutMsgIntoMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg);

static int32_t dndStartMnodeWorker(SDnode *pDnode);
static void    dndStopMnodeWorker(SDnode *pDnode);

static SMnode *dndAcquireMnode(SDnode *pDnode);
static void    dndReleaseMnode(SDnode *pDnode, SMnode *pMnode);

static int32_t dndReadMnodeFile(SDnode *pDnode);
static int32_t dndWriteMnodeFile(SDnode *pDnode);

S
Shengliang Guan 已提交
61 62
static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption);
static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption);
S
Shengliang Guan 已提交
63 64 65 66 67 68 69
static int32_t dndDropMnode(SDnode *pDnode);

static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);

static SMnode *dndAcquireMnode(SDnode *pDnode) {
S
Shengliang Guan 已提交
70
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
71 72 73 74 75 76 77 78 79 80 81 82
  SMnode     *pMnode = NULL;
  int32_t     refCount = 0;

  taosRLockLatch(&pMgmt->latch);
  if (pMgmt->deployed && !pMgmt->dropped) {
    refCount = atomic_add_fetch_32(&pMgmt->refCount, 1);
    pMnode = pMgmt->pMnode;
  } else {
    terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
  }
  taosRUnLockLatch(&pMgmt->latch);

83 84 85
  if (pMnode != NULL) {
    dTrace("acquire mnode, refCount:%d", refCount);
  }
S
Shengliang Guan 已提交
86 87 88 89
  return pMnode;
}

static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) {
S
Shengliang Guan 已提交
90
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
91 92 93 94 95 96 97 98
  int32_t     refCount = 0;

  taosRLockLatch(&pMgmt->latch);
  if (pMnode != NULL) {
    refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
  }
  taosRUnLockLatch(&pMgmt->latch);

99 100 101
  if (pMnode != NULL) {
    dTrace("release mnode, refCount:%d", refCount);
  }
S
Shengliang Guan 已提交
102 103 104
}

static int32_t dndReadMnodeFile(SDnode *pDnode) {
S
Shengliang Guan 已提交
105
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
106 107
  int32_t     code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR;
  int32_t     len = 0;
108
  int32_t     maxLen = 4096;
S
Shengliang Guan 已提交
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
  char       *content = calloc(1, maxLen + 1);
  cJSON      *root = NULL;

  FILE *fp = fopen(pMgmt->file, "r");
  if (fp == NULL) {
    dDebug("file %s not exist", pMgmt->file);
    code = 0;
    goto PRASE_MNODE_OVER;
  }

  len = (int32_t)fread(content, 1, maxLen, fp);
  if (len <= 0) {
    dError("failed to read %s since content is null", pMgmt->file);
    goto PRASE_MNODE_OVER;
  }

  content[len] = 0;
  root = cJSON_Parse(content);
  if (root == NULL) {
    dError("failed to read %s since invalid json format", pMgmt->file);
    goto PRASE_MNODE_OVER;
  }

  cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
S
Shengliang Guan 已提交
133
  if (!deployed || deployed->type != cJSON_Number) {
S
Shengliang Guan 已提交
134 135 136
    dError("failed to read %s since deployed not found", pMgmt->file);
    goto PRASE_MNODE_OVER;
  }
S
Shengliang Guan 已提交
137
  pMgmt->deployed = deployed->valueint;
S
Shengliang Guan 已提交
138 139

  cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
S
Shengliang Guan 已提交
140
  if (!dropped || dropped->type != cJSON_Number) {
S
Shengliang Guan 已提交
141 142 143
    dError("failed to read %s since dropped not found", pMgmt->file);
    goto PRASE_MNODE_OVER;
  }
S
Shengliang Guan 已提交
144
  pMgmt->dropped = dropped->valueint;
S
Shengliang Guan 已提交
145

S
Shengliang Guan 已提交
146 147
  cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes");
  if (!mnodes || mnodes->type != cJSON_Array) {
148 149 150 151
    dError("failed to read %s since nodes not found", pMgmt->file);
    goto PRASE_MNODE_OVER;
  }

S
Shengliang Guan 已提交
152
  pMgmt->replica = cJSON_GetArraySize(mnodes);
153
  if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) {
S
Shengliang Guan 已提交
154
    dError("failed to read %s since mnodes size %d invalid", pMgmt->file, pMgmt->replica);
155 156 157 158
    goto PRASE_MNODE_OVER;
  }

  for (int32_t i = 0; i < pMgmt->replica; ++i) {
S
Shengliang Guan 已提交
159
    cJSON *node = cJSON_GetArrayItem(mnodes, i);
160 161 162 163 164
    if (node == NULL) break;

    SReplica *pReplica = &pMgmt->replicas[i];

    cJSON *id = cJSON_GetObjectItem(node, "id");
S
Shengliang Guan 已提交
165
    if (!id || id->type != cJSON_Number) {
166 167 168
      dError("failed to read %s since id not found", pMgmt->file);
      goto PRASE_MNODE_OVER;
    }
S
Shengliang Guan 已提交
169
    pReplica->id = id->valueint;
170 171 172 173 174 175 176 177 178

    cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn");
    if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
      dError("failed to read %s since fqdn not found", pMgmt->file);
      goto PRASE_MNODE_OVER;
    }
    tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN);

    cJSON *port = cJSON_GetObjectItem(node, "port");
S
Shengliang Guan 已提交
179
    if (!port || port->type != cJSON_Number) {
180 181 182
      dError("failed to read %s since port not found", pMgmt->file);
      goto PRASE_MNODE_OVER;
    }
S
Shengliang Guan 已提交
183
    pReplica->port = port->valueint;
184 185
  }

S
Shengliang Guan 已提交
186
  code = 0;
S
Shengliang Guan 已提交
187
  dDebug("succcessed to read file %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped);
S
Shengliang Guan 已提交
188 189 190 191 192 193 194 195 196 197 198

PRASE_MNODE_OVER:
  if (content != NULL) free(content);
  if (root != NULL) cJSON_Delete(root);
  if (fp != NULL) fclose(fp);

  terrno = code;
  return code;
}

static int32_t dndWriteMnodeFile(SDnode *pDnode) {
S
Shengliang Guan 已提交
199
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
200 201

  char file[PATH_MAX + 20] = {0};
S
Shengliang Guan 已提交
202 203 204
  snprintf(file, sizeof(file), "%s.bak", pMgmt->file);

  FILE *fp = fopen(file, "w");
S
Shengliang Guan 已提交
205
  if (fp == NULL) {
S
Shengliang Guan 已提交
206 207 208 209 210 211
    terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR;
    dError("failed to write %s since %s", file, terrstr());
    return -1;
  }

  int32_t len = 0;
212
  int32_t maxLen = 4096;
S
Shengliang Guan 已提交
213 214 215
  char   *content = calloc(1, maxLen + 1);

  len += snprintf(content + len, maxLen - len, "{\n");
S
Shengliang Guan 已提交
216
  len += snprintf(content + len, maxLen - len, "  \"deployed\": %d,\n", pMgmt->deployed);
217

S
Shengliang Guan 已提交
218 219
  len += snprintf(content + len, maxLen - len, "  \"dropped\": %d,\n", pMgmt->dropped);
  len += snprintf(content + len, maxLen - len, "  \"mnodes\": [{\n");
220 221
  for (int32_t i = 0; i < pMgmt->replica; ++i) {
    SReplica *pReplica = &pMgmt->replicas[i];
S
Shengliang Guan 已提交
222
    len += snprintf(content + len, maxLen - len, "    \"id\": %d,\n", pReplica->id);
223
    len += snprintf(content + len, maxLen - len, "    \"fqdn\": \"%s\",\n", pReplica->fqdn);
S
Shengliang Guan 已提交
224
    len += snprintf(content + len, maxLen - len, "    \"port\": %u\n", pReplica->port);
225 226 227 228 229 230
    if (i < pMgmt->replica - 1) {
      len += snprintf(content + len, maxLen - len, "  },{\n");
    } else {
      len += snprintf(content + len, maxLen - len, "  }]\n");
    }
  }
S
Shengliang Guan 已提交
231 232 233 234 235 236 237 238 239 240 241 242 243
  len += snprintf(content + len, maxLen - len, "}\n");

  fwrite(content, 1, len, fp);
  taosFsyncFile(fileno(fp));
  fclose(fp);
  free(content);

  if (taosRenameFile(file, pMgmt->file) != 0) {
    terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR;
    dError("failed to rename %s since %s", pMgmt->file, terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
244
  dInfo("successed to write %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped);
S
Shengliang Guan 已提交
245 246 247 248
  return 0;
}

static int32_t dndStartMnodeWorker(SDnode *pDnode) {
S
Shengliang Guan 已提交
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263
  if (dndInitMnodeReadWorker(pDnode) != 0) {
    dError("failed to start mnode read worker since %s", terrstr());
    return -1;
  }

  if (dndInitMnodeWriteWorker(pDnode) != 0) {
    dError("failed to start mnode write worker since %s", terrstr());
    return -1;
  }

  if (dndInitMnodeSyncWorker(pDnode) != 0) {
    dError("failed to start mnode sync worker since %s", terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287
  if (dndAllocMnodeReadQueue(pDnode) != 0) {
    dError("failed to alloc mnode read queue since %s", terrstr());
    return -1;
  }

  if (dndAllocMnodeWriteQueue(pDnode) != 0) {
    dError("failed to alloc mnode write queue since %s", terrstr());
    return -1;
  }

  if (dndAllocMnodeApplyQueue(pDnode) != 0) {
    dError("failed to alloc mnode apply queue since %s", terrstr());
    return -1;
  }

  if (dndAllocMnodeSyncQueue(pDnode) != 0) {
    dError("failed to alloc mnode sync queue since %s", terrstr());
    return -1;
  }

  return 0;
}

static void dndStopMnodeWorker(SDnode *pDnode) {
S
Shengliang Guan 已提交
288
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
289 290 291 292 293 294 295 296 297 298 299

  taosWLockLatch(&pMgmt->latch);
  pMgmt->deployed = 0;
  taosWUnLockLatch(&pMgmt->latch);

  while (pMgmt->refCount > 1) taosMsleep(10);
  while (!taosQueueEmpty(pMgmt->pReadQ)) taosMsleep(10);
  while (!taosQueueEmpty(pMgmt->pApplyQ)) taosMsleep(10);
  while (!taosQueueEmpty(pMgmt->pWriteQ)) taosMsleep(10);
  while (!taosQueueEmpty(pMgmt->pSyncQ)) taosMsleep(10);

300 301 302 303
  dndCleanupMnodeReadWorker(pDnode);
  dndCleanupMnodeWriteWorker(pDnode);
  dndCleanupMnodeSyncWorker(pDnode);

S
Shengliang Guan 已提交
304 305 306 307 308 309 310 311 312 313 314 315 316 317
  dndFreeMnodeReadQueue(pDnode);
  dndFreeMnodeWriteQueue(pDnode);
  dndFreeMnodeApplyQueue(pDnode);
  dndFreeMnodeSyncQueue(pDnode);
}

static bool dndNeedDeployMnode(SDnode *pDnode) {
  if (dndGetDnodeId(pDnode) > 0) {
    return false;
  }

  if (dndGetClusterId(pDnode) > 0) {
    return false;
  }
S
Shengliang Guan 已提交
318

S
Shengliang Guan 已提交
319 320 321 322 323 324 325
  if (strcmp(pDnode->opt.localEp, pDnode->opt.firstEp) != 0) {
    return false;
  }

  return true;
}

S
Shengliang Guan 已提交
326 327 328 329 330 331 332 333
static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
  pOption->pDnode = pDnode;
  pOption->sendMsgToDnodeFp = dndSendMsgToDnode;
  pOption->sendMsgToMnodeFp = dndSendMsgToMnode;
  pOption->sendRedirectMsgFp = dndSendRedirectMsg;
  pOption->putMsgToApplyMsgFp = dndPutMsgIntoMnodeApplyQueue;
  pOption->dnodeId = dndGetDnodeId(pDnode);
  pOption->clusterId = dndGetClusterId(pDnode);
S
Shengliang Guan 已提交
334 335 336 337 338 339 340 341 342
  pOption->cfg.sver = pDnode->opt.sver;
  pOption->cfg.enableTelem = pDnode->opt.enableTelem;
  pOption->cfg.statusInterval = pDnode->opt.statusInterval;
  pOption->cfg.shellActivityTimer = pDnode->opt.shellActivityTimer;
  pOption->cfg.timezone = pDnode->opt.timezone;
  pOption->cfg.charset = pDnode->opt.charset;
  pOption->cfg.locale = pDnode->opt.locale;
  pOption->cfg.gitinfo = pDnode->opt.gitinfo;
  pOption->cfg.buildinfo = pDnode->opt.buildinfo;
S
Shengliang Guan 已提交
343 344 345 346 347 348 349 350 351
}

static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) {
  dndInitMnodeOption(pDnode, pOption);
  pOption->replica = 1;
  pOption->selfIndex = 0;
  SReplica *pReplica = &pOption->replicas[0];
  pReplica->id = 1;
  pReplica->port = pDnode->opt.serverPort;
S
Shengliang Guan 已提交
352
  memcpy(pReplica->fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN);
353 354 355 356 357

  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
  pMgmt->selfIndex = pOption->selfIndex;
  pMgmt->replica = pOption->replica;
  memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
S
Shengliang Guan 已提交
358 359 360 361
}

static void dndBuildMnodeOpenOption(SDnode *pDnode, SMnodeOpt *pOption) {
  dndInitMnodeOption(pDnode, pOption);
362 363 364 365
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
  pOption->selfIndex = pMgmt->selfIndex;
  pOption->replica = pMgmt->replica;
  memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
S
Shengliang Guan 已提交
366 367
}

S
Shengliang Guan 已提交
368
static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SCreateMnodeInMsg *pMsg) {
S
Shengliang Guan 已提交
369 370 371 372 373 374
  dndInitMnodeOption(pDnode, pOption);
  pOption->dnodeId = dndGetDnodeId(pDnode);
  pOption->clusterId = dndGetClusterId(pDnode);

  pOption->replica = pMsg->replica;
  pOption->selfIndex = -1;
375 376 377 378
  for (int32_t i = 0; i < pMsg->replica; ++i) {
    SReplica *pReplica = &pOption->replicas[i];
    pReplica->id = pMsg->replicas[i].id;
    pReplica->port = pMsg->replicas[i].port;
S
Shengliang Guan 已提交
379
    memcpy(pReplica->fqdn, pMsg->replicas[i].fqdn, TSDB_FQDN_LEN);
S
Shengliang Guan 已提交
380
    if (pReplica->id == pOption->dnodeId) {
381
      pOption->selfIndex = i;
S
Shengliang Guan 已提交
382 383 384
    }
  }

S
Shengliang Guan 已提交
385
  if (pOption->selfIndex == -1) {
S
Shengliang Guan 已提交
386 387 388 389 390
    terrno = TSDB_CODE_DND_MNODE_ID_NOT_FOUND;
    dError("failed to build mnode options since %s", terrstr());
    return -1;
  }

391 392 393 394
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
  pMgmt->selfIndex = pOption->selfIndex;
  pMgmt->replica = pOption->replica;
  memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
S
Shengliang Guan 已提交
395 396 397
  return 0;
}

S
Shengliang Guan 已提交
398
static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) {
S
Shengliang Guan 已提交
399
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
400

401
  SMnode *pMnode = mndOpen(pDnode->dir.mnode, pOption);
S
Shengliang Guan 已提交
402 403
  if (pMnode == NULL) {
    dError("failed to open mnode since %s", terrstr());
S
Shengliang Guan 已提交
404 405 406 407 408 409 410
    return -1;
  }
  pMgmt->deployed = 1;

  int32_t code = dndWriteMnodeFile(pDnode);
  if (code != 0) {
    dError("failed to write mnode file since %s", terrstr());
S
Shengliang Guan 已提交
411
    code = terrno;
S
Shengliang Guan 已提交
412 413 414
    pMgmt->deployed = 0;
    mndClose(pMnode);
    mndDestroy(pDnode->dir.mnode);
S
Shengliang Guan 已提交
415
    terrno = code;
S
Shengliang Guan 已提交
416
    return -1;
S
Shengliang Guan 已提交
417 418
  }

S
Shengliang Guan 已提交
419 420 421
  code = dndStartMnodeWorker(pDnode);
  if (code != 0) {
    dError("failed to start mnode worker since %s", terrstr());
S
Shengliang Guan 已提交
422
    code = terrno;
S
Shengliang Guan 已提交
423
    pMgmt->deployed = 0;
S
Shengliang Guan 已提交
424
    dndStopMnodeWorker(pDnode);
425 426
    mndClose(pMnode);
    mndDestroy(pDnode->dir.mnode);
S
Shengliang Guan 已提交
427
    terrno = code;
S
Shengliang Guan 已提交
428
    return -1;
S
Shengliang Guan 已提交
429 430 431 432 433 434 435
  }

  taosWLockLatch(&pMgmt->latch);
  pMgmt->pMnode = pMnode;
  pMgmt->deployed = 1;
  taosWUnLockLatch(&pMgmt->latch);

S
Shengliang Guan 已提交
436
  dInfo("mnode open successfully");
S
Shengliang Guan 已提交
437 438 439
  return 0;
}

S
Shengliang Guan 已提交
440
static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption) {
S
Shengliang Guan 已提交
441
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
442 443 444 445 446 447 448

  SMnode *pMnode = dndAcquireMnode(pDnode);
  if (pMnode == NULL) {
    dError("failed to alter mnode since %s", terrstr());
    return -1;
  }

449
  if (mndAlter(pMnode, pOption) != 0) {
S
Shengliang Guan 已提交
450 451 452 453 454 455 456 457 458 459
    dError("failed to alter mnode since %s", terrstr());
    dndReleaseMnode(pDnode, pMnode);
    return -1;
  }

  dndReleaseMnode(pDnode, pMnode);
  return 0;
}

static int32_t dndDropMnode(SDnode *pDnode) {
S
Shengliang Guan 已提交
460
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481

  SMnode *pMnode = dndAcquireMnode(pDnode);
  if (pMnode == NULL) {
    dError("failed to drop mnode since %s", terrstr());
    return -1;
  }

  taosRLockLatch(&pMgmt->latch);
  pMgmt->dropped = 1;
  taosRUnLockLatch(&pMgmt->latch);

  if (dndWriteMnodeFile(pDnode) != 0) {
    taosRLockLatch(&pMgmt->latch);
    pMgmt->dropped = 0;
    taosRUnLockLatch(&pMgmt->latch);

    dndReleaseMnode(pDnode, pMnode);
    dError("failed to drop mnode since %s", terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
482
  dndReleaseMnode(pDnode, pMnode);
S
Shengliang Guan 已提交
483 484
  dndStopMnodeWorker(pDnode);
  dndWriteMnodeFile(pDnode);
485
  mndClose(pMnode);
S
Shengliang Guan 已提交
486
  pMgmt->pMnode = NULL;
487
  mndDestroy(pDnode->dir.mnode);
S
Shengliang Guan 已提交
488 489 490 491

  return 0;
}

S
Shengliang Guan 已提交
492 493
static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) {
  SCreateMnodeInMsg *pMsg = pRpcMsg->pCont;
S
Shengliang Guan 已提交
494 495 496 497 498 499 500 501 502 503
  pMsg->dnodeId = htonl(pMsg->dnodeId);
  for (int32_t i = 0; i < pMsg->replica; ++i) {
    pMsg->replicas[i].id = htonl(pMsg->replicas[i].id);
    pMsg->replicas[i].port = htons(pMsg->replicas[i].port);
  }

  return pMsg;
}

static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
S
Shengliang Guan 已提交
504
  SCreateMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);
S
Shengliang Guan 已提交
505 506 507 508 509

  if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
    terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
    return -1;
  } else {
S
Shengliang Guan 已提交
510 511
    SMnodeOpt option = {0};
    if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) {
S
Shengliang Guan 已提交
512 513
      return -1;
    }
514

S
Shengliang Guan 已提交
515 516 517 518 519
    return dndOpenMnode(pDnode, &option);
  }
}

static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
S
Shengliang Guan 已提交
520
  SAlterMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);
S
Shengliang Guan 已提交
521 522 523 524 525

  if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
    terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
    return -1;
  }
S
Shengliang Guan 已提交
526 527 528 529 530 531 532 533 534 535 536

  SMnodeOpt option = {0};
  if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) {
    return -1;
  }

  if (dndAlterMnode(pDnode, &option) != 0) {
    return -1;
  }

  return dndWriteMnodeFile(pDnode);
S
Shengliang Guan 已提交
537 538 539
}

static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
S
Shengliang Guan 已提交
540 541
  SDropMnodeInMsg *pMsg = pRpcMsg->pCont;
  pMsg->dnodeId = htonl(pMsg->dnodeId);
S
Shengliang Guan 已提交
542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564

  if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
    terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
    return -1;
  } else {
    return dndDropMnode(pDnode);
  }
}

static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
  int32_t code = 0;

  switch (pMsg->msgType) {
    case TSDB_MSG_TYPE_CREATE_MNODE_IN:
      code = dndProcessCreateMnodeReq(pDnode, pMsg);
      break;
    case TSDB_MSG_TYPE_ALTER_MNODE_IN:
      code = dndProcessAlterMnodeReq(pDnode, pMsg);
      break;
    case TSDB_MSG_TYPE_DROP_MNODE_IN:
      code = dndProcessDropMnodeReq(pDnode, pMsg);
      break;
    default:
S
Shengliang Guan 已提交
565 566
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
      code = -1;
S
Shengliang Guan 已提交
567 568 569
      break;
  }

S
Shengliang Guan 已提交
570
  if (pMsg->msgType & 1u) {
S
Shengliang Guan 已提交
571
    if (code != 0) code = terrno;
S
Shengliang Guan 已提交
572 573 574
    SRpcMsg rsp = {.code = code, .handle = pMsg->handle};
    rpcSendResponse(&rsp);
  }
S
Shengliang Guan 已提交
575 576 577 578 579
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}

static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
S
Shengliang Guan 已提交
580
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
581 582 583

  SMnode *pMnode = dndAcquireMnode(pDnode);
  if (pMnode != NULL) {
584
    mndProcessReadMsg(pMsg);
S
Shengliang Guan 已提交
585 586
    dndReleaseMnode(pDnode, pMnode);
  } else {
587
    mndSendRsp(pMsg, terrno);
S
Shengliang Guan 已提交
588 589
  }

590
  mndCleanupMsg(pMsg);
S
Shengliang Guan 已提交
591 592 593
}

static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
S
Shengliang Guan 已提交
594
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
595 596 597

  SMnode *pMnode = dndAcquireMnode(pDnode);
  if (pMnode != NULL) {
598
    mndProcessWriteMsg(pMsg);
S
Shengliang Guan 已提交
599 600
    dndReleaseMnode(pDnode, pMnode);
  } else {
601
    mndSendRsp(pMsg, terrno);
S
Shengliang Guan 已提交
602 603
  }

604
  mndCleanupMsg(pMsg);
S
Shengliang Guan 已提交
605 606 607
}

static void dndProcessMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
S
Shengliang Guan 已提交
608
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
609 610 611

  SMnode *pMnode = dndAcquireMnode(pDnode);
  if (pMnode != NULL) {
612
    mndProcessApplyMsg(pMsg);
S
Shengliang Guan 已提交
613 614
    dndReleaseMnode(pDnode, pMnode);
  } else {
615
    mndSendRsp(pMsg, terrno);
S
Shengliang Guan 已提交
616 617
  }

618
  mndCleanupMsg(pMsg);
S
Shengliang Guan 已提交
619 620 621
}

static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
S
Shengliang Guan 已提交
622
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
623 624 625

  SMnode *pMnode = dndAcquireMnode(pDnode);
  if (pMnode != NULL) {
626
    mndProcessSyncMsg(pMsg);
S
Shengliang Guan 已提交
627 628
    dndReleaseMnode(pDnode, pMnode);
  } else {
629
    mndSendRsp(pMsg, terrno);
S
Shengliang Guan 已提交
630 631
  }

632
  mndCleanupMsg(pMsg);
S
Shengliang Guan 已提交
633 634 635
}

static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg) {
636
  SMnodeMsg *pMsg = mndInitMsg(pMnode, pRpcMsg);
S
Shengliang Guan 已提交
637 638 639 640 641 642
  if (pMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosWriteQitem(pQueue, pMsg) != 0) {
643
    mndCleanupMsg(pMsg);
S
Shengliang Guan 已提交
644 645 646 647 648 649 650 651
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) {
S
Shengliang Guan 已提交
652
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
653 654 655
  SMnode     *pMnode = dndAcquireMnode(pDnode);

  SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg));
S
Shengliang Guan 已提交
656 657
  if (pMsg != NULL) *pMsg = *pRpcMsg;

S
Shengliang Guan 已提交
658
  if (pMsg == NULL || taosWriteQitem(pMgmt->pMgmtQ, pMsg) != 0) {
S
Shengliang Guan 已提交
659 660 661 662
    if (pRpcMsg->msgType & 1u) {
      SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY};
      rpcSendResponse(&rsp);
    }
S
Shengliang Guan 已提交
663 664 665
    rpcFreeCont(pRpcMsg->pCont);
    taosFreeQitem(pMsg);
  }
S
Shengliang Guan 已提交
666 667

  dndReleaseMnode(pDnode, pMnode);
S
Shengliang Guan 已提交
668 669 670
}

void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
S
Shengliang Guan 已提交
671
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
672 673
  SMnode     *pMnode = dndAcquireMnode(pDnode);
  if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pWriteQ, pMsg) != 0) {
S
Shengliang Guan 已提交
674 675 676 677
    if (pMsg->msgType & 1u) {
      SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
      rpcSendResponse(&rsp);
    }
S
Shengliang Guan 已提交
678
    rpcFreeCont(pMsg->pCont);
S
Shengliang Guan 已提交
679
    pMsg->pCont = NULL;
S
Shengliang Guan 已提交
680 681 682 683 684 685
  }

  dndReleaseMnode(pDnode, pMnode);
}

void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
S
Shengliang Guan 已提交
686
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
687 688
  SMnode     *pMnode = dndAcquireMnode(pDnode);
  if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pSyncQ, pMsg) != 0) {
S
Shengliang Guan 已提交
689 690 691 692
    if (pMsg->msgType & 1u) {
      SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
      rpcSendResponse(&rsp);
    }
S
Shengliang Guan 已提交
693
    rpcFreeCont(pMsg->pCont);
S
Shengliang Guan 已提交
694
    pMsg->pCont = NULL;
S
Shengliang Guan 已提交
695 696 697 698 699 700
  }

  dndReleaseMnode(pDnode, pMnode);
}

void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
S
Shengliang Guan 已提交
701
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
702
  SMnode     *pMnode = dndAcquireMnode(pDnode);
S
Shengliang Guan 已提交
703
  if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pReadQ, pMsg) != 0) {
S
Shengliang Guan 已提交
704 705 706 707
    if (pMsg->msgType & 1u) {
      SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
      rpcSendResponse(&rsp);
    }
S
Shengliang Guan 已提交
708
    rpcFreeCont(pMsg->pCont);
S
Shengliang Guan 已提交
709
    pMsg->pCont = NULL;
S
Shengliang Guan 已提交
710 711 712 713 714 715
  }

  dndReleaseMnode(pDnode, pMnode);
}

static int32_t dndPutMsgIntoMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
S
Shengliang Guan 已提交
716
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
717 718 719 720 721 722 723 724 725 726 727 728

  SMnode *pMnode = dndAcquireMnode(pDnode);
  if (pMnode == NULL) {
    return -1;
  }

  int32_t code = taosWriteQitem(pMgmt->pApplyQ, pMsg);
  dndReleaseMnode(pDnode, pMnode);
  return code;
}

static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode) {
S
Shengliang Guan 已提交
729
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
730
  pMgmt->pMgmtQ = tWorkerAllocQueue(&pMgmt->mgmtPool, pDnode, (FProcessItem)dndProcessMnodeMgmtQueue);
S
Shengliang Guan 已提交
731 732 733 734 735 736 737 738
  if (pMgmt->pMgmtQ == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

static void dndFreeMnodeMgmtQueue(SDnode *pDnode) {
S
Shengliang Guan 已提交
739
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
740 741 742 743 744
  tWorkerFreeQueue(&pMgmt->mgmtPool, pMgmt->pMgmtQ);
  pMgmt->pMgmtQ = NULL;
}

static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode) {
S
Shengliang Guan 已提交
745
  SMnodeMgmt  *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
746 747 748 749 750
  SWorkerPool *pPool = &pMgmt->mgmtPool;
  pPool->name = "mnode-mgmt";
  pPool->min = 1;
  pPool->max = 1;
  if (tWorkerInit(pPool) != 0) {
S
Shengliang Guan 已提交
751
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
752 753 754
    return -1;
  }

755
  dDebug("mnode mgmt worker is initialized");
S
Shengliang Guan 已提交
756 757 758 759
  return 0;
}

static void dndCleanupMnodeMgmtWorker(SDnode *pDnode) {
S
Shengliang Guan 已提交
760
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
761
  tWorkerCleanup(&pMgmt->mgmtPool);
762
  dDebug("mnode mgmt worker is closed");
S
Shengliang Guan 已提交
763 764 765
}

static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) {
S
Shengliang Guan 已提交
766
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
767
  pMgmt->pReadQ = tWorkerAllocQueue(&pMgmt->readPool, pDnode, (FProcessItem)dndProcessMnodeReadQueue);
S
Shengliang Guan 已提交
768 769 770 771
  if (pMgmt->pReadQ == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
S
Shengliang Guan 已提交
772

S
Shengliang Guan 已提交
773 774 775 776
  return 0;
}

static void dndFreeMnodeReadQueue(SDnode *pDnode) {
S
Shengliang Guan 已提交
777
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
778 779 780 781 782
  tWorkerFreeQueue(&pMgmt->readPool, pMgmt->pReadQ);
  pMgmt->pReadQ = NULL;
}

static int32_t dndInitMnodeReadWorker(SDnode *pDnode) {
S
Shengliang Guan 已提交
783
  SMnodeMgmt  *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
784 785 786 787 788
  SWorkerPool *pPool = &pMgmt->readPool;
  pPool->name = "mnode-read";
  pPool->min = 0;
  pPool->max = 1;
  if (tWorkerInit(pPool) != 0) {
S
Shengliang Guan 已提交
789
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
790 791 792
    return -1;
  }

793
  dDebug("mnode read worker is initialized");
S
Shengliang Guan 已提交
794 795 796 797
  return 0;
}

static void dndCleanupMnodeReadWorker(SDnode *pDnode) {
S
Shengliang Guan 已提交
798
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
799
  tWorkerCleanup(&pMgmt->readPool);
800
  dDebug("mnode read worker is closed");
S
Shengliang Guan 已提交
801 802 803
}

static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode) {
S
Shengliang Guan 已提交
804
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
805
  pMgmt->pWriteQ = tWorkerAllocQueue(&pMgmt->writePool, pDnode, (FProcessItem)dndProcessMnodeWriteQueue);
S
Shengliang Guan 已提交
806 807 808 809
  if (pMgmt->pWriteQ == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
S
Shengliang Guan 已提交
810

S
Shengliang Guan 已提交
811 812 813 814
  return 0;
}

static void dndFreeMnodeWriteQueue(SDnode *pDnode) {
S
Shengliang Guan 已提交
815
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
816 817 818 819 820
  tWorkerFreeQueue(&pMgmt->writePool, pMgmt->pWriteQ);
  pMgmt->pWriteQ = NULL;
}

static int32_t dndAllocMnodeApplyQueue(SDnode *pDnode) {
S
Shengliang Guan 已提交
821
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
822
  pMgmt->pApplyQ = tWorkerAllocQueue(&pMgmt->writePool, pDnode, (FProcessItem)dndProcessMnodeApplyQueue);
S
Shengliang Guan 已提交
823 824 825 826
  if (pMgmt->pApplyQ == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
S
Shengliang Guan 已提交
827

S
Shengliang Guan 已提交
828 829 830 831
  return 0;
}

static void dndFreeMnodeApplyQueue(SDnode *pDnode) {
S
Shengliang Guan 已提交
832
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
833 834 835 836 837
  tWorkerFreeQueue(&pMgmt->writePool, pMgmt->pApplyQ);
  pMgmt->pApplyQ = NULL;
}

static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) {
S
Shengliang Guan 已提交
838
  SMnodeMgmt  *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
839 840 841 842 843
  SWorkerPool *pPool = &pMgmt->writePool;
  pPool->name = "mnode-write";
  pPool->min = 0;
  pPool->max = 1;
  if (tWorkerInit(pPool) != 0) {
S
Shengliang Guan 已提交
844
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
845 846 847
    return -1;
  }

848
  dDebug("mnode write worker is initialized");
S
Shengliang Guan 已提交
849 850 851 852
  return 0;
}

static void dndCleanupMnodeWriteWorker(SDnode *pDnode) {
S
Shengliang Guan 已提交
853
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
854
  tWorkerCleanup(&pMgmt->writePool);
855
  dDebug("mnode write worker is closed");
S
Shengliang Guan 已提交
856 857 858
}

static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode) {
S
Shengliang Guan 已提交
859
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
860
  pMgmt->pSyncQ = tWorkerAllocQueue(&pMgmt->syncPool, pDnode, (FProcessItem)dndProcessMnodeSyncQueue);
S
Shengliang Guan 已提交
861 862 863 864
  if (pMgmt->pSyncQ == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
S
Shengliang Guan 已提交
865

S
Shengliang Guan 已提交
866 867 868 869
  return 0;
}

static void dndFreeMnodeSyncQueue(SDnode *pDnode) {
S
Shengliang Guan 已提交
870
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
871 872 873 874 875
  tWorkerFreeQueue(&pMgmt->syncPool, pMgmt->pSyncQ);
  pMgmt->pSyncQ = NULL;
}

static int32_t dndInitMnodeSyncWorker(SDnode *pDnode) {
S
Shengliang Guan 已提交
876
  SMnodeMgmt  *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
877 878 879 880
  SWorkerPool *pPool = &pMgmt->syncPool;
  pPool->name = "mnode-sync";
  pPool->min = 0;
  pPool->max = 1;
S
Shengliang Guan 已提交
881 882 883 884 885
  if (tWorkerInit(pPool) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

886
  dDebug("mnode sync worker is initialized");
S
Shengliang Guan 已提交
887
  return 0;
S
Shengliang Guan 已提交
888 889 890
}

static void dndCleanupMnodeSyncWorker(SDnode *pDnode) {
S
Shengliang Guan 已提交
891
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
892
  tWorkerCleanup(&pMgmt->syncPool);
893
  dDebug("mnode sync worker is closed");
S
Shengliang Guan 已提交
894 895 896 897
}

int32_t dndInitMnode(SDnode *pDnode) {
  dInfo("dnode-mnode start to init");
S
Shengliang Guan 已提交
898
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
899 900 901 902 903 904 905
  taosInitRWLatch(&pMgmt->latch);

  if (dndInitMnodeMgmtWorker(pDnode) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
906 907 908 909 910
  if (dndAllocMnodeMgmtQueue(pDnode) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
911 912 913 914 915 916 917 918 919 920 921 922 923 924
  char path[PATH_MAX];
  snprintf(path, PATH_MAX, "%s/mnode.json", pDnode->dir.dnode);
  pMgmt->file = strdup(path);
  if (pMgmt->file == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (dndReadMnodeFile(pDnode) != 0) {
    return -1;
  }

  if (pMgmt->dropped) {
    dInfo("mnode has been deployed and needs to be deleted");
925
    mndDestroy(pDnode->dir.mnode);
S
Shengliang Guan 已提交
926 927 928 929 930 931 932 933 934 935 936
    return 0;
  }

  if (!pMgmt->deployed) {
    bool needDeploy = dndNeedDeployMnode(pDnode);
    if (!needDeploy) {
      dDebug("mnode does not need to be deployed");
      return 0;
    }

    dInfo("start to deploy mnode");
S
Shengliang Guan 已提交
937 938 939
    SMnodeOpt option = {0};
    dndBuildMnodeDeployOption(pDnode, &option);
    return dndOpenMnode(pDnode, &option);
S
Shengliang Guan 已提交
940 941
  } else {
    dInfo("start to open mnode");
S
Shengliang Guan 已提交
942 943 944
    SMnodeOpt option = {0};
    dndBuildMnodeOpenOption(pDnode, &option);
    return dndOpenMnode(pDnode, &option);
S
Shengliang Guan 已提交
945 946 947 948
  }
}

void dndCleanupMnode(SDnode *pDnode) {
S
Shengliang Guan 已提交
949
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
950 951

  dInfo("dnode-mnode start to clean up");
S
Shengliang Guan 已提交
952
  if (pMgmt->pMnode) dndStopMnodeWorker(pDnode);
S
Shengliang Guan 已提交
953
  dndCleanupMnodeMgmtWorker(pDnode);
S
Shengliang Guan 已提交
954
  dndFreeMnodeMgmtQueue(pDnode);
S
Shengliang Guan 已提交
955
  tfree(pMgmt->file);
956
  mndClose(pMgmt->pMnode);
S
Shengliang Guan 已提交
957 958 959 960
  dInfo("dnode-mnode is cleaned up");
}

int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
S
Shengliang Guan 已提交
961
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
S
Shengliang Guan 已提交
962 963 964 965 966 967 968 969

  SMnode *pMnode = dndAcquireMnode(pDnode);
  if (pMnode == NULL) {
    terrno = TSDB_CODE_APP_NOT_READY;
    dTrace("failed to get user auth since %s", terrstr());
    return -1;
  }

970
  int32_t code = mndRetriveAuth(pMnode, user, spi, encrypt, secret, ckey);
S
Shengliang Guan 已提交
971 972 973
  dndReleaseMnode(pDnode, pMnode);
  return code;
}