dndMnode.c 26.3 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "dndMnode.h"
#include "dndDnode.h"
#include "dndTransport.h"

/* Worker pools: one init/cleanup pair per message class. */
static int32_t dndInitMnodeReadWorker(SDnode *pDnode);
static int32_t dndInitMnodeWriteWorker(SDnode *pDnode);
static int32_t dndInitMnodeSyncWorker(SDnode *pDnode);
static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode);
static void dndCleanupMnodeReadWorker(SDnode *pDnode);
static void dndCleanupMnodeWriteWorker(SDnode *pDnode);
static void dndCleanupMnodeSyncWorker(SDnode *pDnode);
static void dndCleanupMnodeMgmtWorker(SDnode *pDnode);

/* Queues attached to the worker pools above. */
static int32_t dndAllocMnodeReadQueue(SDnode *pDnode);
static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode);
static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode);
static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode);
static void dndFreeMnodeReadQueue(SDnode *pDnode);
static void dndFreeMnodeWriteQueue(SDnode *pDnode);
static void dndFreeMnodeSyncQueue(SDnode *pDnode);
static void dndFreeMnodeMgmtQueue(SDnode *pDnode);

/* Queue consumers (invoked by the worker pools). */
static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg);

/* Queue producers (entry points for incoming RPC messages). */
static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg);
void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);

/* Start/stop of the read, write and sync workers as a group. */
static int32_t dndStartMnodeWorker(SDnode *pDnode);
static void dndStopMnodeWorker(SDnode *pDnode);

/* Reference-counted access to the mnode handle. */
static SMnode *dndAcquireMnode(SDnode *pDnode);
static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode);

/* Persistence of the deployed/dropped/replica state (pMgmt->file). */
static int32_t dndReadMnodeFile(SDnode *pDnode);
static int32_t dndWriteMnodeFile(SDnode *pDnode);

/* Mnode lifecycle operations. */
static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption);
static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption);
static int32_t dndDropMnode(SDnode *pDnode);

/* Handlers for the create/alter/drop management requests. */
static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);

/*
 * Take a reference on the mnode.
 *
 * Returns pMgmt->pMnode and bumps pMgmt->refCount when the mnode is marked
 * deployed and not dropped; otherwise sets terrno to
 * TSDB_CODE_DND_MNODE_NOT_DEPLOYED and returns NULL. A non-NULL result must
 * be handed back through dndReleaseMnode().
 */
static SMnode *dndAcquireMnode(SDnode *pDnode) {
  SMnodeMgmt *mgmt = &pDnode->mmgmt;
  SMnode     *acquired = NULL;
  int32_t     refs = 0;

  taosRLockLatch(&mgmt->latch);
  if (!mgmt->deployed || mgmt->dropped) {
    terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
  } else {
    refs = atomic_add_fetch_32(&mgmt->refCount, 1);
    acquired = mgmt->pMnode;
  }
  taosRUnLockLatch(&mgmt->latch);

  if (acquired == NULL) {
    return NULL;
  }

  dTrace("acquire mnode, refCount:%d", refs);
  return acquired;
}

/*
 * Drop a reference previously obtained from dndAcquireMnode().
 *
 * A NULL pMnode leaves the reference count untouched (the latch is still
 * taken and released, as before).
 */
static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) {
  SMnodeMgmt *mgmt = &pDnode->mmgmt;
  int32_t     refs = 0;

  taosRLockLatch(&mgmt->latch);
  if (pMnode != NULL) {
    refs = atomic_sub_fetch_32(&mgmt->refCount, 1);
  }
  taosRUnLockLatch(&mgmt->latch);

  if (pMnode == NULL) {
    return;
  }

  dTrace("release mnode, refCount:%d", refs);
}

/*
 * Load the persisted mnode state from pMgmt->file (JSON).
 *
 * Populates pMgmt->deployed, pMgmt->dropped, pMgmt->replica and
 * pMgmt->replicas[] from the "deployed", "dropped" and "mnodes" members.
 *
 * Returns 0 on success or when the file cannot be opened (treated as
 * "nothing persisted yet"); otherwise returns a non-zero error code
 * (TSDB_CODE_DND_MNODE_READ_FILE_ERROR for unreadable/invalid content,
 * TSDB_CODE_OUT_OF_MEMORY when the read buffer cannot be allocated).
 * terrno is always set to the returned value.
 */
static int32_t dndReadMnodeFile(SDnode *pDnode) {
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
  int32_t     code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR;
  int32_t     len = 0;
  int32_t     maxLen = 4096;
  char       *content = NULL;
  cJSON      *root = NULL;

  FILE *fp = fopen(pMgmt->file, "r");
  if (fp == NULL) {
    dDebug("file %s not exist", pMgmt->file);
    code = 0;
    goto PARSE_MNODE_OVER;
  }

  /* Allocate only once the file is known to exist, and never read into NULL. */
  content = calloc(1, maxLen + 1);
  if (content == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to read %s since out of memory", pMgmt->file);
    goto PARSE_MNODE_OVER;
  }

  len = (int32_t)fread(content, 1, maxLen, fp);
  if (len <= 0) {
    dError("failed to read %s since content is null", pMgmt->file);
    goto PARSE_MNODE_OVER;
  }

  content[len] = 0;
  root = cJSON_Parse(content);
  if (root == NULL) {
    dError("failed to read %s since invalid json format", pMgmt->file);
    goto PARSE_MNODE_OVER;
  }

  cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
  if (!deployed || deployed->type != cJSON_Number) {
    dError("failed to read %s since deployed not found", pMgmt->file);
    goto PARSE_MNODE_OVER;
  }
  pMgmt->deployed = deployed->valueint;

  cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
  if (!dropped || dropped->type != cJSON_Number) {
    dError("failed to read %s since dropped not found", pMgmt->file);
    goto PARSE_MNODE_OVER;
  }
  pMgmt->dropped = dropped->valueint;

  cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes");
  if (!mnodes || mnodes->type != cJSON_Array) {
    dError("failed to read %s since nodes not found", pMgmt->file);
    goto PARSE_MNODE_OVER;
  }

  /* The replica count is the array length; it must fit pMgmt->replicas[]. */
  pMgmt->replica = cJSON_GetArraySize(mnodes);
  if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) {
    dError("failed to read %s since mnodes size %d invalid", pMgmt->file, pMgmt->replica);
    goto PARSE_MNODE_OVER;
  }

  for (int32_t i = 0; i < pMgmt->replica; ++i) {
    cJSON *node = cJSON_GetArrayItem(mnodes, i);
    if (node == NULL) break;

    SReplica *pReplica = &pMgmt->replicas[i];

    cJSON *id = cJSON_GetObjectItem(node, "id");
    if (!id || id->type != cJSON_Number) {
      dError("failed to read %s since id not found", pMgmt->file);
      goto PARSE_MNODE_OVER;
    }
    pReplica->id = id->valueint;

    cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn");
    if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
      dError("failed to read %s since fqdn not found", pMgmt->file);
      goto PARSE_MNODE_OVER;
    }
    tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN);

    cJSON *port = cJSON_GetObjectItem(node, "port");
    if (!port || port->type != cJSON_Number) {
      dError("failed to read %s since port not found", pMgmt->file);
      goto PARSE_MNODE_OVER;
    }
    pReplica->port = port->valueint;
  }

  code = 0;
  dDebug("succcessed to read file %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped);

PARSE_MNODE_OVER:
  free(content); /* free(NULL) is a no-op */
  if (root != NULL) cJSON_Delete(root);
  if (fp != NULL) fclose(fp);

  terrno = code;
  return code;
}

/*
 * Persist the mnode state (deployed, dropped, replica list) as JSON.
 *
 * The content is written to "<pMgmt->file>.bak", flushed and synced, and the
 * .bak file is then renamed over pMgmt->file. The rename is skipped when the
 * write itself failed so that an incomplete file never replaces the previous
 * one.
 *
 * Returns 0 on success, -1 on failure with terrno set
 * (TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR or TSDB_CODE_OUT_OF_MEMORY).
 */
static int32_t dndWriteMnodeFile(SDnode *pDnode) {
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;

  char file[PATH_MAX + 20] = {0};
  snprintf(file, sizeof(file), "%s.bak", pMgmt->file);

  FILE *fp = fopen(file, "w");
  if (fp == NULL) {
    terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR;
    dError("failed to write %s since %s", file, terrstr());
    return -1;
  }

  int32_t len = 0;
  int32_t maxLen = 4096;
  char   *content = calloc(1, maxLen + 1);
  if (content == NULL) {
    fclose(fp);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to write %s since %s", file, terrstr());
    return -1;
  }

  len += snprintf(content + len, maxLen - len, "{\n");
  len += snprintf(content + len, maxLen - len, "  \"deployed\": %d,\n", pMgmt->deployed);
  len += snprintf(content + len, maxLen - len, "  \"dropped\": %d,\n", pMgmt->dropped);
  len += snprintf(content + len, maxLen - len, "  \"mnodes\": [{\n");
  for (int32_t i = 0; i < pMgmt->replica; ++i) {
    SReplica *pReplica = &pMgmt->replicas[i];
    len += snprintf(content + len, maxLen - len, "    \"id\": %d,\n", pReplica->id);
    len += snprintf(content + len, maxLen - len, "    \"fqdn\": \"%s\",\n", pReplica->fqdn);
    len += snprintf(content + len, maxLen - len, "    \"port\": %u\n", pReplica->port);
    if (i < pMgmt->replica - 1) {
      len += snprintf(content + len, maxLen - len, "  },{\n");
    } else {
      len += snprintf(content + len, maxLen - len, "  }]\n");
    }
  }
  len += snprintf(content + len, maxLen - len, "}\n");

  int32_t failed = 0;
  if (fwrite(content, 1, (size_t)len, fp) != (size_t)len) failed = 1;
  /* Push the stdio buffer to the descriptor first; syncing before the flush
   * would sync nothing. */
  if (fflush(fp) != 0) failed = 1;
  taosFsyncFile(fileno(fp));
  if (fclose(fp) != 0) failed = 1;
  free(content);

  if (failed) {
    terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR;
    dError("failed to write %s since %s", file, terrstr());
    return -1;
  }

  if (taosRenameFile(file, pMgmt->file) != 0) {
    terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR;
    dError("failed to rename %s since %s", pMgmt->file, terrstr());
    return -1;
  }

  dInfo("successed to write %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped);
  return 0;
}

/*
 * Bring up the read, write and sync worker pools, then allocate their queues.
 *
 * Steps run in that order and stop at the first failure, which is logged.
 * Returns 0 when every step succeeded, -1 otherwise. Nothing is rolled back
 * here on failure.
 */
static int32_t dndStartMnodeWorker(SDnode *pDnode) {
  if (dndInitMnodeReadWorker(pDnode) != 0) {
    dError("failed to start mnode read worker since %s", terrstr());
  } else if (dndInitMnodeWriteWorker(pDnode) != 0) {
    dError("failed to start mnode write worker since %s", terrstr());
  } else if (dndInitMnodeSyncWorker(pDnode) != 0) {
    dError("failed to start mnode sync worker since %s", terrstr());
  } else if (dndAllocMnodeReadQueue(pDnode) != 0) {
    dError("failed to alloc mnode read queue since %s", terrstr());
  } else if (dndAllocMnodeWriteQueue(pDnode) != 0) {
    dError("failed to alloc mnode write queue since %s", terrstr());
  } else if (dndAllocMnodeSyncQueue(pDnode) != 0) {
    dError("failed to alloc mnode sync queue since %s", terrstr());
  } else {
    return 0;
  }

  return -1;
}

/*
 * Stop the read/write/sync workers.
 *
 * Clears pMgmt->deployed under the write latch so that dndAcquireMnode()
 * starts failing, polls every 10 ms until the reference count and the three
 * queues have drained, then tears down the worker pools and frees the queues.
 */
static void dndStopMnodeWorker(SDnode *pDnode) {
  SMnodeMgmt *mgmt = &pDnode->mmgmt;

  taosWLockLatch(&mgmt->latch);
  mgmt->deployed = 0;
  taosWUnLockLatch(&mgmt->latch);

  /* NOTE(review): the loop exits once refCount <= 1, not 0 - confirm that one
   * outstanding reference is really expected here. */
  while (mgmt->refCount > 1) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(mgmt->pReadQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(mgmt->pWriteQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(mgmt->pSyncQ)) {
    taosMsleep(10);
  }

  dndCleanupMnodeReadWorker(pDnode);
  dndCleanupMnodeWriteWorker(pDnode);
  dndCleanupMnodeSyncWorker(pDnode);

  dndFreeMnodeReadQueue(pDnode);
  dndFreeMnodeWriteQueue(pDnode);
  dndFreeMnodeSyncQueue(pDnode);
}

/*
 * Decide whether this dnode should deploy an mnode on its own.
 *
 * True only when the dnode id is not positive, the cluster id is not
 * positive, and the local endpoint equals the configured first endpoint.
 */
static bool dndNeedDeployMnode(SDnode *pDnode) {
  return dndGetDnodeId(pDnode) <= 0 &&
         dndGetClusterId(pDnode) <= 0 &&
         strcmp(pDnode->opt.localEp, pDnode->opt.firstEp) == 0;
}

S
Shengliang Guan 已提交
315 316 317 318 319 320 321
/*
 * Fill the parts of an SMnodeOpt that do not depend on the replica set:
 * the owning dnode, the message-sending callbacks, the dnode/cluster ids and
 * the configuration values copied from pDnode->opt.
 */
static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
  /* Owner and callbacks. */
  pOption->pDnode = pDnode;
  pOption->sendMsgToDnodeFp = dndSendMsgToDnode;
  pOption->sendMsgToMnodeFp = dndSendMsgToMnode;
  pOption->sendRedirectMsgFp = dndSendRedirectMsg;

  /* Identity. */
  pOption->dnodeId = dndGetDnodeId(pDnode);
  pOption->clusterId = dndGetClusterId(pDnode);

  /* Configuration mirrored from the dnode options. */
  pOption->cfg.sver = pDnode->opt.sver;
  pOption->cfg.enableTelem = pDnode->opt.enableTelem;
  pOption->cfg.statusInterval = pDnode->opt.statusInterval;
  pOption->cfg.shellActivityTimer = pDnode->opt.shellActivityTimer;
  pOption->cfg.timezone = pDnode->opt.timezone;
  pOption->cfg.charset = pDnode->opt.charset;
  pOption->cfg.locale = pDnode->opt.locale;
  pOption->cfg.gitinfo = pDnode->opt.gitinfo;
  pOption->cfg.buildinfo = pDnode->opt.buildinfo;
}

/*
 * Build the options for a first-time, single-replica deployment: replica 0
 * gets id 1 and this dnode's server port and local FQDN. The resulting
 * replica layout is also recorded in pDnode->mmgmt.
 */
static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) {
  SMnodeMgmt *mgmt = &pDnode->mmgmt;

  dndInitMnodeOption(pDnode, pOption);

  pOption->replica = 1;
  pOption->selfIndex = 0;
  pOption->replicas[0].id = 1;
  pOption->replicas[0].port = pDnode->opt.serverPort;
  memcpy(pOption->replicas[0].fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN);

  mgmt->selfIndex = pOption->selfIndex;
  mgmt->replica = pOption->replica;
  memcpy(&mgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
}

/*
 * Build the options for re-opening an already deployed mnode, taking the
 * replica layout from pDnode->mmgmt.
 */
static void dndBuildMnodeOpenOption(SDnode *pDnode, SMnodeOpt *pOption) {
  const SMnodeMgmt *mgmt = &pDnode->mmgmt;

  dndInitMnodeOption(pDnode, pOption);

  pOption->selfIndex = mgmt->selfIndex;
  pOption->replica = mgmt->replica;
  memcpy(&pOption->replicas, mgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
}

S
Shengliang Guan 已提交
356
/*
 * Build mnode options from a create/alter request.
 *
 * Copies the replica list from pMsg into pOption, locates this dnode in it
 * (selfIndex) and, on success, records the layout in pDnode->mmgmt.
 *
 * Returns 0 on success. Returns -1 with terrno set to
 * TSDB_CODE_DND_MNODE_ID_NOT_FOUND when the replica count is outside
 * 1..TSDB_MAX_REPLICA or when this dnode's id is not among the replicas.
 */
static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SCreateMnodeInMsg *pMsg) {
  /* Also fills pOption->dnodeId and pOption->clusterId. */
  dndInitMnodeOption(pDnode, pOption);

  /* The count comes from the request; it must not index past replicas[],
   * which holds TSDB_MAX_REPLICA entries. */
  if (pMsg->replica <= 0 || pMsg->replica > TSDB_MAX_REPLICA) {
    terrno = TSDB_CODE_DND_MNODE_ID_NOT_FOUND;
    dError("failed to build mnode options since replica %d invalid", pMsg->replica);
    return -1;
  }

  pOption->replica = pMsg->replica;
  pOption->selfIndex = -1;
  for (int32_t i = 0; i < pMsg->replica; ++i) {
    SReplica *pReplica = &pOption->replicas[i];
    pReplica->id = pMsg->replicas[i].id;
    pReplica->port = pMsg->replicas[i].port;
    memcpy(pReplica->fqdn, pMsg->replicas[i].fqdn, TSDB_FQDN_LEN);
    if (pReplica->id == pOption->dnodeId) {
      pOption->selfIndex = i;
    }
  }

  if (pOption->selfIndex == -1) {
    terrno = TSDB_CODE_DND_MNODE_ID_NOT_FOUND;
    dError("failed to build mnode options since %s", terrstr());
    return -1;
  }

  SMnodeMgmt *pMgmt = &pDnode->mmgmt;
  pMgmt->selfIndex = pOption->selfIndex;
  pMgmt->replica = pOption->replica;
  memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
  return 0;
}

S
Shengliang Guan 已提交
386
/*
 * Open the mnode, persist the "deployed" state, start its workers and
 * publish the handle in pDnode->mmgmt.
 *
 * If persisting the state or starting the workers fails, the mnode is closed
 * and destroyed again and the error code observed at the point of failure is
 * restored into terrno. Returns 0 on success, -1 on failure.
 */
static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) {
  SMnodeMgmt *mgmt = &pDnode->mmgmt;
  int32_t     stopWorkers = 0;
  int32_t     savedErr = 0;

  SMnode *mnode = mndOpen(pDnode->dir.mnode, pOption);
  if (mnode == NULL) {
    dError("failed to open mnode since %s", terrstr());
    return -1;
  }

  /* Must be set before the state file is written so the file records it. */
  mgmt->deployed = 1;

  if (dndWriteMnodeFile(pDnode) != 0) {
    dError("failed to write mnode file since %s", terrstr());
    goto OPEN_MNODE_ROLLBACK;
  }

  /* From here on a failure also has to stop whatever workers came up. */
  stopWorkers = 1;
  if (dndStartMnodeWorker(pDnode) != 0) {
    dError("failed to start mnode worker since %s", terrstr());
    goto OPEN_MNODE_ROLLBACK;
  }

  taosWLockLatch(&mgmt->latch);
  mgmt->pMnode = mnode;
  mgmt->deployed = 1;
  taosWUnLockLatch(&mgmt->latch);

  dInfo("mnode open successfully");
  return 0;

OPEN_MNODE_ROLLBACK:
  savedErr = terrno; /* the cleanup calls below may overwrite terrno */
  mgmt->deployed = 0;
  if (stopWorkers) {
    dndStopMnodeWorker(pDnode);
  }
  mndClose(mnode);
  mndDestroy(pDnode->dir.mnode);
  terrno = savedErr;
  return -1;
}

S
Shengliang Guan 已提交
428
/*
 * Apply new options to the running mnode via mndAlter().
 *
 * Returns 0 on success, -1 when the mnode cannot be acquired or mndAlter()
 * fails. The reference taken here is always released before returning.
 */
static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption) {
  SMnode *mnode = dndAcquireMnode(pDnode);
  if (mnode == NULL) {
    dError("failed to alter mnode since %s", terrstr());
    return -1;
  }

  int32_t rc = 0;
  if (mndAlter(mnode, pOption) != 0) {
    dError("failed to alter mnode since %s", terrstr());
    rc = -1;
  }

  dndReleaseMnode(pDnode, mnode);
  return rc;
}

/*
 * Drop the mnode hosted on this dnode.
 *
 * Marks the mnode as dropped and persists that state first; only then are
 * the workers stopped, the state file rewritten, and the mnode closed and
 * destroyed. If the first persist fails, the dropped flag is reverted and -1
 * is returned. Returns -1 as well when no mnode can be acquired.
 */
static int32_t dndDropMnode(SDnode *pDnode) {
  SMnodeMgmt *pMgmt = &pDnode->mmgmt;

  SMnode *pMnode = dndAcquireMnode(pDnode);
  if (pMnode == NULL) {
    dError("failed to drop mnode since %s", terrstr());
    return -1;
  }

  /* The flag is read by dndAcquireMnode() under the read latch, so it has
   * to be modified under the write latch. */
  taosWLockLatch(&pMgmt->latch);
  pMgmt->dropped = 1;
  taosWUnLockLatch(&pMgmt->latch);

  if (dndWriteMnodeFile(pDnode) != 0) {
    taosWLockLatch(&pMgmt->latch);
    pMgmt->dropped = 0;
    taosWUnLockLatch(&pMgmt->latch);

    dndReleaseMnode(pDnode, pMnode);
    dError("failed to drop mnode since %s", terrstr());
    return -1;
  }

  dndReleaseMnode(pDnode, pMnode);
  dndStopMnodeWorker(pDnode);

  /* dndStopMnodeWorker() cleared pMgmt->deployed; record that as well. The
   * drop itself is already persisted, so a failure here is only reported. */
  if (dndWriteMnodeFile(pDnode) != 0) {
    dError("failed to write mnode file since %s", terrstr());
  }

  mndClose(pMnode);
  pMgmt->pMnode = NULL;
  mndDestroy(pDnode->dir.mnode);

  return 0;
}

S
Shengliang Guan 已提交
480 481
/*
 * Byte-swap the integer fields of a create-mnode request in place (dnodeId
 * and, for each replica, id and port) and return the message body.
 *
 * NOTE(review): pMsg->replica is used as the loop bound without being
 * range-checked here - confirm that callers cannot receive a count larger
 * than the replicas[] array of the message.
 */
static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) {
  SCreateMnodeInMsg *msg = pRpcMsg->pCont;

  msg->dnodeId = htonl(msg->dnodeId);

  int32_t idx = 0;
  while (idx < msg->replica) {
    msg->replicas[idx].id = htonl(msg->replicas[idx].id);
    msg->replicas[idx].port = htons(msg->replicas[idx].port);
    ++idx;
  }

  return msg;
}

/*
 * Handle a create-mnode request: reject it with
 * TSDB_CODE_DND_MNODE_ID_INVALID when it is addressed to another dnode,
 * otherwise build the options from the message and open the mnode.
 * Returns 0 on success, -1 on failure (terrno set).
 */
static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
  SCreateMnodeInMsg *req = dndParseCreateMnodeMsg(pRpcMsg);

  if (req->dnodeId != dndGetDnodeId(pDnode)) {
    terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
    return -1;
  }

  SMnodeOpt option = {0};
  if (dndBuildMnodeOptionFromMsg(pDnode, &option, req) != 0) {
    return -1;
  }

  return dndOpenMnode(pDnode, &option);
}

/*
 * Handle an alter-mnode request: reject it with
 * TSDB_CODE_DND_MNODE_ID_INVALID when it is addressed to another dnode,
 * otherwise build the options from the message, alter the running mnode and
 * persist the new state. Returns 0 on success, -1 on failure.
 */
static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
  SAlterMnodeInMsg *req = dndParseCreateMnodeMsg(pRpcMsg);

  if (req->dnodeId != dndGetDnodeId(pDnode)) {
    terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
    return -1;
  }

  SMnodeOpt option = {0};
  if (dndBuildMnodeOptionFromMsg(pDnode, &option, req) != 0 || dndAlterMnode(pDnode, &option) != 0) {
    return -1;
  }

  return dndWriteMnodeFile(pDnode);
}

/*
 * Handle a drop-mnode request: byte-swap the target dnode id in place,
 * reject the request with TSDB_CODE_DND_MNODE_ID_INVALID when it is
 * addressed to another dnode, otherwise drop the mnode.
 */
static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
  SDropMnodeInMsg *req = pRpcMsg->pCont;
  req->dnodeId = htonl(req->dnodeId);

  if (req->dnodeId == dndGetDnodeId(pDnode)) {
    return dndDropMnode(pDnode);
  }

  terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
  return -1;
}

/*
 * Consumer of the management queue: dispatch create/alter/drop requests to
 * their handlers. When the low bit of msgType is set, a response carrying 0
 * or terrno is sent back. The message content and the queue item are always
 * freed.
 */
static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
  int32_t result = 0;

  if (pMsg->msgType == TDMT_DND_CREATE_MNODE) {
    result = dndProcessCreateMnodeReq(pDnode, pMsg);
  } else if (pMsg->msgType == TDMT_DND_ALTER_MNODE) {
    result = dndProcessAlterMnodeReq(pDnode, pMsg);
  } else if (pMsg->msgType == TDMT_DND_DROP_MNODE) {
    result = dndProcessDropMnodeReq(pDnode, pMsg);
  } else {
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    result = -1;
  }

  if (pMsg->msgType & 1u) {
    /* Handlers report failure as -1; the actual error code lives in terrno. */
    if (result != 0) {
      result = terrno;
    }
    SRpcMsg rsp = {.code = result, .handle = pMsg->handle};
    rpcSendResponse(&rsp);
  }

  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}

/*
 * Consumer of the read queue: process the message while holding a reference
 * on the mnode, or answer with terrno when the mnode cannot be acquired.
 * The message is cleaned up in both cases.
 */
static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
  SMnode *mnode = dndAcquireMnode(pDnode);

  if (mnode == NULL) {
    mndSendRsp(pMsg, terrno);
  } else {
    mndProcessReadMsg(pMsg);
    dndReleaseMnode(pDnode, mnode);
  }

  mndCleanupMsg(pMsg);
}

/*
 * Consumer of the write queue: process the message while holding a
 * reference on the mnode, or answer with terrno when the mnode cannot be
 * acquired. The message is cleaned up in both cases.
 */
static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
  SMnode *mnode = dndAcquireMnode(pDnode);

  if (mnode == NULL) {
    mndSendRsp(pMsg, terrno);
  } else {
    mndProcessWriteMsg(pMsg);
    dndReleaseMnode(pDnode, mnode);
  }

  mndCleanupMsg(pMsg);
}

/*
 * Consumer of the sync queue: process the message while holding a reference
 * on the mnode, or answer with terrno when the mnode cannot be acquired.
 * The message is cleaned up in both cases.
 */
static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
  SMnode *mnode = dndAcquireMnode(pDnode);

  if (mnode == NULL) {
    mndSendRsp(pMsg, terrno);
  } else {
    mndProcessSyncMsg(pMsg);
    dndReleaseMnode(pDnode, mnode);
  }

  mndCleanupMsg(pMsg);
}

/*
 * Wrap an RPC message into an SMnodeMsg and enqueue it on pQueue.
 *
 * Returns 0 on success. On failure returns -1 with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY; a message that was created but could not be
 * enqueued is cleaned up here.
 */
static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg) {
  SMnodeMsg *wrapped = mndInitMsg(pMnode, pRpcMsg);

  if (wrapped != NULL) {
    if (taosWriteQitem(pQueue, wrapped) == 0) {
      return 0;
    }
    mndCleanupMsg(wrapped);
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

/*
 * Entry point for management messages: copy the RPC message into a queue
 * item and put it on the management queue.
 *
 * If the item cannot be allocated or enqueued, a TSDB_CODE_OUT_OF_MEMORY
 * response is sent (only when the low bit of msgType is set) and the message
 * content and the item are freed. The message is queued whether or not an
 * mnode could be acquired.
 */
void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) {
  SMnodeMgmt *mgmt = &pDnode->mmgmt;
  SMnode     *mnode = dndAcquireMnode(pDnode);
  int32_t     queued = 0;

  SRpcMsg *item = taosAllocateQitem(sizeof(SRpcMsg));
  if (item != NULL) {
    *item = *pRpcMsg;
    queued = (taosWriteQitem(mgmt->pMgmtQ, item) == 0);
  }

  if (!queued) {
    if (pRpcMsg->msgType & 1u) {
      SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY};
      rpcSendResponse(&rsp);
    }
    rpcFreeCont(pRpcMsg->pCont);
    taosFreeQitem(item);
  }

  dndReleaseMnode(pDnode, mnode);
}

/*
 * Entry point for write messages: enqueue the message on the write queue.
 *
 * When no mnode can be acquired or the enqueue fails, a response carrying
 * terrno is sent (only when the low bit of msgType is set) and the message
 * content is freed and cleared.
 */
void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SMnodeMgmt *mgmt = &pDnode->mmgmt;
  SMnode     *mnode = dndAcquireMnode(pDnode);
  int32_t     rejected = 1;

  if (mnode != NULL) {
    rejected = (dndWriteMnodeMsgToQueue(mnode, mgmt->pWriteQ, pMsg) != 0);
  }

  if (rejected) {
    if (pMsg->msgType & 1u) {
      SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
      rpcSendResponse(&rsp);
    }
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }

  dndReleaseMnode(pDnode, mnode);
}

/*
 * Entry point for sync messages: enqueue the message on the sync queue.
 *
 * When no mnode can be acquired or the enqueue fails, a response carrying
 * terrno is sent (only when the low bit of msgType is set) and the message
 * content is freed and cleared.
 */
void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SMnodeMgmt *mgmt = &pDnode->mmgmt;
  SMnode     *mnode = dndAcquireMnode(pDnode);
  int32_t     rejected = 1;

  if (mnode != NULL) {
    rejected = (dndWriteMnodeMsgToQueue(mnode, mgmt->pSyncQ, pMsg) != 0);
  }

  if (rejected) {
    if (pMsg->msgType & 1u) {
      SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
      rpcSendResponse(&rsp);
    }
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }

  dndReleaseMnode(pDnode, mnode);
}

/*
 * Entry point for read messages: enqueue the message on the read queue.
 *
 * When no mnode can be acquired or the enqueue fails, a response carrying
 * terrno is sent (only when the low bit of msgType is set) and the message
 * content is freed and cleared.
 */
void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SMnodeMgmt *mgmt = &pDnode->mmgmt;
  SMnode     *mnode = dndAcquireMnode(pDnode);
  int32_t     rejected = 1;

  if (mnode != NULL) {
    rejected = (dndWriteMnodeMsgToQueue(mnode, mgmt->pReadQ, pMsg) != 0);
  }

  if (rejected) {
    if (pMsg->msgType & 1u) {
      SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
      rpcSendResponse(&rsp);
    }
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }

  dndReleaseMnode(pDnode, mnode);
}

/*
 * Allocate the management queue on the mgmt worker pool, with
 * dndProcessMnodeMgmtQueue as its consumer. Returns 0 on success, -1 with
 * terrno = TSDB_CODE_OUT_OF_MEMORY on failure.
 */
static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode) {
  SMnodeMgmt *mgmt = &pDnode->mmgmt;

  mgmt->pMgmtQ = tWorkerAllocQueue(&mgmt->mgmtPool, pDnode, (FProcessItem)dndProcessMnodeMgmtQueue);
  if (mgmt->pMgmtQ != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

/* Return the management queue to its worker pool and clear the pointer. */
static void dndFreeMnodeMgmtQueue(SDnode *pDnode) {
  SMnodeMgmt *mgmt = &pDnode->mmgmt;

  tWorkerFreeQueue(&mgmt->mgmtPool, mgmt->pMgmtQ);
  mgmt->pMgmtQ = NULL;
}

/*
 * Configure and start the "mnode-mgmt" worker pool (min 1, max 1).
 * Returns 0 on success, -1 with terrno = TSDB_CODE_OUT_OF_MEMORY on failure.
 */
static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode) {
  SWorkerPool *pool = &pDnode->mmgmt.mgmtPool;

  pool->name = "mnode-mgmt";
  pool->min = 1;
  pool->max = 1;

  if (tWorkerInit(pool) == 0) {
    dDebug("mnode mgmt worker is initialized");
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

/* Tear down the mgmt worker pool. */
static void dndCleanupMnodeMgmtWorker(SDnode *pDnode) {
  tWorkerCleanup(&pDnode->mmgmt.mgmtPool);
  dDebug("mnode mgmt worker is closed");
}

/*
 * Allocate the read queue on the read worker pool, with
 * dndProcessMnodeReadQueue as its consumer. Returns 0 on success, -1 with
 * terrno = TSDB_CODE_OUT_OF_MEMORY on failure.
 */
static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) {
  SMnodeMgmt *mgmt = &pDnode->mmgmt;

  mgmt->pReadQ = tWorkerAllocQueue(&mgmt->readPool, pDnode, (FProcessItem)dndProcessMnodeReadQueue);
  if (mgmt->pReadQ != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

/* Return the read queue to its worker pool and clear the pointer. */
static void dndFreeMnodeReadQueue(SDnode *pDnode) {
  SMnodeMgmt *mgmt = &pDnode->mmgmt;

  tWorkerFreeQueue(&mgmt->readPool, mgmt->pReadQ);
  mgmt->pReadQ = NULL;
}

/*
 * Configure and start the "mnode-read" worker pool (min 0, max 1).
 * Returns 0 on success, -1 with terrno = TSDB_CODE_OUT_OF_MEMORY on failure.
 */
static int32_t dndInitMnodeReadWorker(SDnode *pDnode) {
  SWorkerPool *pool = &pDnode->mmgmt.readPool;

  pool->name = "mnode-read";
  pool->min = 0;
  pool->max = 1;

  if (tWorkerInit(pool) == 0) {
    dDebug("mnode read worker is initialized");
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

/* Tear down the read worker pool. */
static void dndCleanupMnodeReadWorker(SDnode *pDnode) {
  tWorkerCleanup(&pDnode->mmgmt.readPool);
  dDebug("mnode read worker is closed");
}

/*
 * Allocate the write queue on the write worker pool, with
 * dndProcessMnodeWriteQueue as its consumer. Returns 0 on success, -1 with
 * terrno = TSDB_CODE_OUT_OF_MEMORY on failure.
 */
static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode) {
  SMnodeMgmt *mgmt = &pDnode->mmgmt;

  mgmt->pWriteQ = tWorkerAllocQueue(&mgmt->writePool, pDnode, (FProcessItem)dndProcessMnodeWriteQueue);
  if (mgmt->pWriteQ != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

/* Return the write queue to its worker pool and clear the pointer. */
static void dndFreeMnodeWriteQueue(SDnode *pDnode) {
  SMnodeMgmt *mgmt = &pDnode->mmgmt;

  tWorkerFreeQueue(&mgmt->writePool, mgmt->pWriteQ);
  mgmt->pWriteQ = NULL;
}

/*
 * Configure and start the "mnode-write" worker pool (min 0, max 1).
 * Returns 0 on success, -1 with terrno = TSDB_CODE_OUT_OF_MEMORY on failure.
 */
static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) {
  SWorkerPool *pool = &pDnode->mmgmt.writePool;

  pool->name = "mnode-write";
  pool->min = 0;
  pool->max = 1;

  if (tWorkerInit(pool) == 0) {
    dDebug("mnode write worker is initialized");
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

/* Tear down the write worker pool. */
static void dndCleanupMnodeWriteWorker(SDnode *pDnode) {
  tWorkerCleanup(&pDnode->mmgmt.writePool);
  dDebug("mnode write worker is closed");
}

/*
 * Allocate the sync queue on the sync worker pool, with
 * dndProcessMnodeSyncQueue as its consumer. Returns 0 on success, -1 with
 * terrno = TSDB_CODE_OUT_OF_MEMORY on failure.
 */
static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode) {
  SMnodeMgmt *mgmt = &pDnode->mmgmt;

  mgmt->pSyncQ = tWorkerAllocQueue(&mgmt->syncPool, pDnode, (FProcessItem)dndProcessMnodeSyncQueue);
  if (mgmt->pSyncQ != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

/* Return the sync queue to its worker pool and clear the pointer. */
static void dndFreeMnodeSyncQueue(SDnode *pDnode) {
  SMnodeMgmt *mgmt = &pDnode->mmgmt;

  tWorkerFreeQueue(&mgmt->syncPool, mgmt->pSyncQ);
  mgmt->pSyncQ = NULL;
}

/*
 * Configure and start the "mnode-sync" worker pool (min 0, max 1).
 * Returns 0 on success, -1 with terrno = TSDB_CODE_OUT_OF_MEMORY on failure.
 */
static int32_t dndInitMnodeSyncWorker(SDnode *pDnode) {
  SWorkerPool *pool = &pDnode->mmgmt.syncPool;

  pool->name = "mnode-sync";
  pool->min = 0;
  pool->max = 1;

  if (tWorkerInit(pool) == 0) {
    dDebug("mnode sync worker is initialized");
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

/* Tear down the sync worker pool. */
static void dndCleanupMnodeSyncWorker(SDnode *pDnode) {
  tWorkerCleanup(&pDnode->mmgmt.syncPool);
  dDebug("mnode sync worker is closed");
}

/*
 * Initialise the mnode management state of a dnode.
 *
 * Sets up the latch, the management worker and its queue, derives the state
 * file path ("<dnode dir>/mnode.json") and loads it. Depending on the loaded
 * state:
 *   - dropped:      destroy the mnode directory and return 0;
 *   - deployed:     re-open the mnode with the persisted replica layout;
 *   - not deployed: deploy a new single-replica mnode if
 *                   dndNeedDeployMnode() says so, otherwise return 0.
 *
 * Returns 0 on success, -1 on failure.
 */
int32_t dndInitMnode(SDnode *pDnode) {
  dInfo("dnode-mnode start to init");

  SMnodeMgmt *mgmt = &pDnode->mmgmt;
  taosInitRWLatch(&mgmt->latch);

  if (dndInitMnodeMgmtWorker(pDnode) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (dndAllocMnodeMgmtQueue(pDnode) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  char statePath[PATH_MAX];
  snprintf(statePath, sizeof(statePath), "%s/mnode.json", pDnode->dir.dnode);
  mgmt->file = strdup(statePath);
  if (mgmt->file == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (dndReadMnodeFile(pDnode) != 0) {
    return -1;
  }

  if (mgmt->dropped) {
    dInfo("mnode has been deployed and needs to be deleted");
    mndDestroy(pDnode->dir.mnode);
    return 0;
  }

  SMnodeOpt option = {0};
  if (mgmt->deployed) {
    dInfo("start to open mnode");
    dndBuildMnodeOpenOption(pDnode, &option);
  } else {
    if (!dndNeedDeployMnode(pDnode)) {
      dDebug("mnode does not need to be deployed");
      return 0;
    }
    dInfo("start to deploy mnode");
    dndBuildMnodeDeployOption(pDnode, &option);
  }

  return dndOpenMnode(pDnode, &option);
}

/*
 * Release everything dndInitMnode() set up: stop the read/write/sync workers
 * if an mnode is open, tear down the management worker and queue, free the
 * state file path and close the mnode handle.
 */
void dndCleanupMnode(SDnode *pDnode) {
  SMnodeMgmt *mgmt = &pDnode->mmgmt;

  dInfo("dnode-mnode start to clean up");

  if (mgmt->pMnode) {
    dndStopMnodeWorker(pDnode);
  }

  dndCleanupMnodeMgmtWorker(pDnode);
  dndFreeMnodeMgmtQueue(pDnode);

  tfree(mgmt->file);
  mndClose(mgmt->pMnode);

  dInfo("dnode-mnode is cleaned up");
}

/*
 * Look up a user's authentication data through the local mnode.
 *
 * Returns the result of mndRetriveAuth(), or -1 with terrno =
 * TSDB_CODE_APP_NOT_READY when no mnode can be acquired.
 */
int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
  SMnode *mnode = dndAcquireMnode(pDnode);

  if (mnode != NULL) {
    int32_t result = mndRetriveAuth(mnode, user, spi, encrypt, secret, ckey);
    dndReleaseMnode(pDnode, mnode);
    return result;
  }

  terrno = TSDB_CODE_APP_NOT_READY;
  dTrace("failed to get user auth since %s", terrstr());
  return -1;
}