mndMain.c 28.8 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndAcct.h"
S
Shengliang Guan 已提交
18
#include "mndBnode.h"
S
Shengliang Guan 已提交
19
#include "mndCluster.h"
L
Liu Jicong 已提交
20
#include "mndConsumer.h"
S
Shengliang Guan 已提交
21 22 23
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
L
Liu Jicong 已提交
24
#include "mndGrant.h"
S
monitor  
Shengliang Guan 已提交
25
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
26
#include "mndMnode.h"
L
Liu Jicong 已提交
27
#include "mndOffset.h"
L
Liu Jicong 已提交
28
#include "mndPerfSchema.h"
M
Minghao Li 已提交
29
#include "mndPrivilege.h"
S
Shengliang Guan 已提交
30
#include "mndProfile.h"
S
Shengliang Guan 已提交
31
#include "mndQnode.h"
L
Liu Jicong 已提交
32
#include "mndQuery.h"
S
Shengliang Guan 已提交
33
#include "mndShow.h"
S
sma  
Shengliang Guan 已提交
34
#include "mndSma.h"
S
Shengliang Guan 已提交
35
#include "mndSnode.h"
S
Shengliang Guan 已提交
36
#include "mndStb.h"
L
Liu Jicong 已提交
37
#include "mndStream.h"
L
Liu Jicong 已提交
38
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
39 40
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
41
#include "mndTopic.h"
S
Shengliang Guan 已提交
42 43 44
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
45

46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
static inline int32_t mndAcquireRpc(SMnode *pMnode) {
  int32_t code = 0;
  taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    terrno = TSDB_CODE_APP_NOT_READY;
    code = -1;
  } else if (!mndIsLeader(pMnode)) {
    code = -1;
  } else {
#if 1
    atomic_add_fetch_32(&pMnode->rpcRef, 1);
#else
    int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1);
    mTrace("mnode rpc is acquired, ref:%d", ref);
#endif
  }
  taosThreadRwlockUnlock(&pMnode->lock);
  return code;
}

static inline void mndReleaseRpc(SMnode *pMnode) {
  taosThreadRwlockRdlock(&pMnode->lock);
#if 1
  atomic_sub_fetch_32(&pMnode->rpcRef, 1);
#else
  int32_t ref = atomic_sub_fetch_32(&pMnode->rpcRef, 1);
  mTrace("mnode rpc is released, ref:%d", ref);
#endif
  taosThreadRwlockUnlock(&pMnode->lock);
}

S
Shengliang Guan 已提交
77 78 79 80 81 82 83 84 85 86 87 88 89
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq timerReq = {0};

  int32_t contLen = tSerializeSMTimerMsg(NULL, 0, &timerReq);
  if (contLen <= 0) return NULL;
  void *pReq = rpcMallocCont(contLen);
  if (pReq == NULL) return NULL;

  tSerializeSMTimerMsg(pReq, contLen, &timerReq);
  *pContLen = contLen;
  return pReq;
}

S
Shengliang Guan 已提交
90 91
static void mndPullupTrans(SMnode *pMnode) {
  int32_t contLen = 0;
M
Minghao Li 已提交
92
  void   *pReq = mndBuildTimerMsg(&contLen);
S
Shengliang Guan 已提交
93 94 95 96
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  }
S
Shengliang Guan 已提交
97 98
}

99
static void mndPullupTtl(SMnode *pMnode) {
wmmhello's avatar
wmmhello 已提交
100
  int32_t contLen = 0;
M
Minghao Li 已提交
101
  void   *pReq = mndBuildTimerMsg(&contLen);
wmmhello's avatar
wmmhello 已提交
102
  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
wmmhello's avatar
wmmhello 已提交
103 104 105
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}

S
Shengliang Guan 已提交
106 107
static void mndCalMqRebalance(SMnode *pMnode) {
  int32_t contLen = 0;
M
Minghao Li 已提交
108
  void   *pReq = mndBuildTimerMsg(&contLen);
S
Shengliang Guan 已提交
109 110 111 112
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
  }
S
Shengliang Guan 已提交
113
}
L
Liu Jicong 已提交
114

S
Shengliang Guan 已提交
115 116
static void mndPullupTelem(SMnode *pMnode) {
  int32_t contLen = 0;
M
Minghao Li 已提交
117
  void   *pReq = mndBuildTimerMsg(&contLen);
S
Shengliang Guan 已提交
118 119 120 121
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
  }
S
Shengliang Guan 已提交
122 123
}

124
static void mndPullupGrant(SMnode *pMnode) {
C
Cary Xu 已提交
125 126 127 128 129 130 131 132 133
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {
        .msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9527};
    tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
  }
}

134 135 136 137 138 139 140 141 142 143
static void mndIncreaseUpTime(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {
        .msgType = TDMT_MND_UPTIME_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9528};
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  }
}

S
Shengliang Guan 已提交
144
static void *mndThreadFp(void *param) {
S
Shengliang Guan 已提交
145
  SMnode *pMnode = param;
S
Shengliang Guan 已提交
146 147 148 149 150 151
  int64_t lastTime = 0;
  setThreadName("mnode-timer");

  while (1) {
    lastTime++;
    taosMsleep(100);
S
Shengliang Guan 已提交
152
    if (mndGetStop(pMnode)) break;
153
    if (lastTime % 10 != 0) continue;
S
Shengliang Guan 已提交
154

155 156
    int64_t sec = lastTime / 10;
    if (sec % tsTtlPushInterval == 0) {
157
      mndPullupTtl(pMnode);
wmmhello's avatar
wmmhello 已提交
158 159
    }

160
    if (sec % tsTransPullupInterval == 0) {
S
Shengliang Guan 已提交
161 162 163
      mndPullupTrans(pMnode);
    }

164
    if (sec % tsMqRebalanceInterval == 0) {
S
Shengliang Guan 已提交
165 166 167
      mndCalMqRebalance(pMnode);
    }

S
Shengliang Guan 已提交
168
    if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) {
S
Shengliang Guan 已提交
169 170
      mndPullupTelem(pMnode);
    }
C
Cary Xu 已提交
171

172
    if (sec % tsGrantHBInterval == 0) {
173
      mndPullupGrant(pMnode);
C
Cary Xu 已提交
174
    }
175

176
    if (sec % tsUptimeInterval == 0) {
177 178
      mndIncreaseUpTime(pMnode);
    }
S
Shengliang Guan 已提交
179 180
  }

S
Shengliang Guan 已提交
181
  return NULL;
L
Liu Jicong 已提交
182 183
}

184
static int32_t mndInitTimer(SMnode *pMnode) {
S
Shengliang Guan 已提交
185 186 187 188 189
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode) != 0) {
    mError("failed to create timer thread since %s", strerror(errno));
S
shm  
Shengliang Guan 已提交
190 191
    return -1;
  }
L
Liu Jicong 已提交
192

S
Shengliang Guan 已提交
193 194
  taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("mnode-timer", "initialized");
S
Shengliang Guan 已提交
195 196 197
  return 0;
}

198
static void mndCleanupTimer(SMnode *pMnode) {
S
Shengliang Guan 已提交
199 200
  if (taosCheckPthreadValid(pMnode->thread)) {
    taosThreadJoin(pMnode->thread, NULL);
201
    taosThreadClear(&pMnode->thread);
S
Shengliang Guan 已提交
202 203 204
  }
}

S
Shengliang Guan 已提交
205
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
206 207 208
  pMnode->path = strdup(path);
  if (pMnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
209
    return -1;
210 211 212 213
  }

  if (taosMkDir(pMnode->path) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
214
    return -1;
215
  }
216 217

  return 0;
218
}
S
Shengliang Guan 已提交
219

220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248
static int32_t mndInitWal(SMnode *pMnode) {
  char path[PATH_MAX + 20] = {0};
  snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
  SWalCfg cfg = {
      .vgId = 1,
      .fsyncPeriod = 0,
      .rollPeriod = -1,
      .segSize = -1,
      .retentionPeriod = -1,
      .retentionSize = -1,
      .level = TAOS_WAL_FSYNC,
  };

  pMnode->pWal = walOpen(path, &cfg);
  if (pMnode->pWal == NULL) {
    mError("failed to open wal since %s", terrstr());
    return -1;
  }

  return 0;
}

static void mndCloseWal(SMnode *pMnode) {
  if (pMnode->pWal != NULL) {
    walClose(pMnode->pWal);
    pMnode->pWal = NULL;
  }
}

249 250 251
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt opt = {0};
  opt.path = pMnode->path;
S
Shengliang Guan 已提交
252
  opt.pMnode = pMnode;
253
  opt.pWal = pMnode->pWal;
S
Shengliang Guan 已提交
254

S
Shengliang Guan 已提交
255
  pMnode->pSdb = sdbInit(&opt);
256
  if (pMnode->pSdb == NULL) {
S
Shengliang Guan 已提交
257 258 259 260 261 262
    return -1;
  }

  return 0;
}

263 264 265 266 267 268 269
static int32_t mndOpenSdb(SMnode *pMnode) {
  if (!pMnode->deploy) {
    return sdbReadFile(pMnode->pSdb);
  } else {
    return 0;
  }
}
270 271 272

static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb) {
S
Shengliang Guan 已提交
273
    sdbCleanup(pMnode->pSdb);
274 275 276 277
    pMnode->pSdb = NULL;
  }
}

278 279 280 281 282
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep step = {0};
  step.name = name;
  step.initFp = initFp;
  step.cleanupFp = cleanupFp;
S
Shengliang Guan 已提交
283
  if (taosArrayPush(pMnode->pSteps, &step) == NULL) {
284 285 286 287
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
288 289 290
  return 0;
}

291
static int32_t mndInitSteps(SMnode *pMnode) {
292
  if (mndAllocStep(pMnode, "mnode-wal", mndInitWal, mndCloseWal) != 0) return -1;
S
Shengliang Guan 已提交
293 294 295 296
  if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1;
S
Shengliang Guan 已提交
297
  if (mndAllocStep(pMnode, "mnode-qnode", mndInitQnode, mndCleanupQnode) != 0) return -1;
298 299
  if (mndAllocStep(pMnode, "mnode-snode", mndInitSnode, mndCleanupSnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-bnode", mndInitBnode, mndCleanupBnode) != 0) return -1;
S
Shengliang Guan 已提交
300
  if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1;
S
Shengliang Guan 已提交
301
  if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
wafwerar's avatar
wafwerar 已提交
302
  if (mndAllocStep(pMnode, "mnode-grant", mndInitGrant, mndCleanupGrant) != 0) return -1;
303
  if (mndAllocStep(pMnode, "mnode-privilege", mndInitPrivilege, mndCleanupPrivilege) != 0) return -1;
S
Shengliang Guan 已提交
304
  if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
L
Liu Jicong 已提交
305
  if (mndAllocStep(pMnode, "mnode-stream", mndInitStream, mndCleanupStream) != 0) return -1;
S
Shengliang Guan 已提交
306
  if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
L
Liu Jicong 已提交
307 308
  if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
L
Liu Jicong 已提交
309
  if (mndAllocStep(pMnode, "mnode-offset", mndInitOffset, mndCleanupOffset) != 0) return -1;
S
Shengliang Guan 已提交
310
  if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
S
Shengliang Guan 已提交
311
  if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
312
  if (mndAllocStep(pMnode, "mnode-sma", mndInitSma, mndCleanupSma) != 0) return -1;
D
dapan1121 已提交
313
  if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1;
D
dapan1121 已提交
314
  if (mndAllocStep(pMnode, "mnode-perfs", mndInitPerfs, mndCleanupPerfs) != 0) return -1;
S
Shengliang Guan 已提交
315
  if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
S
Shengliang Guan 已提交
316
  if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
317
  if (mndAllocStep(pMnode, "mnode-sdb", mndOpenSdb, NULL) != 0) return -1;
S
Shengliang Guan 已提交
318 319
  if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
D
dapan1121 已提交
320
  if (mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery) != 0) return -1;
S
Shengliang Guan 已提交
321 322
  if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1;
S
Shengliang Guan 已提交
323 324 325 326

  return 0;
}

327
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
S
Shengliang Guan 已提交
328 329
  if (pMnode->pSteps == NULL) return;

330
  if (pos == -1) {
331
    pos = taosArrayGetSize(pMnode->pSteps) - 1;
S
Shengliang Guan 已提交
332 333
  }

334
  for (int32_t s = pos; s >= 0; s--) {
335
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, s);
336
    mInfo("%s will cleanup", pStep->name);
337 338 339
    if (pStep->cleanupFp != NULL) {
      (*pStep->cleanupFp)(pMnode);
    }
S
Shengliang Guan 已提交
340 341
  }

S
Shengliang Guan 已提交
342
  taosArrayClear(pMnode->pSteps);
S
Shengliang Guan 已提交
343
  taosArrayDestroy(pMnode->pSteps);
S
Shengliang Guan 已提交
344
  pMnode->pSteps = NULL;
345
}
S
Shengliang Guan 已提交
346

347
static int32_t mndExecSteps(SMnode *pMnode) {
S
Shengliang Guan 已提交
348
  int32_t size = taosArrayGetSize(pMnode->pSteps);
349
  for (int32_t pos = 0; pos < size; pos++) {
S
Shengliang Guan 已提交
350
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, pos);
351
    if (pStep->initFp == NULL) continue;
S
Shengliang Guan 已提交
352

S
Shengliang Guan 已提交
353
    if ((*pStep->initFp)(pMnode) != 0) {
S
Shengliang Guan 已提交
354
      int32_t code = terrno;
S
Shengliang Guan 已提交
355
      mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
356
      mndCleanupSteps(pMnode, pos);
S
Shengliang Guan 已提交
357
      terrno = code;
S
Shengliang Guan 已提交
358
      return -1;
S
Shengliang Guan 已提交
359
    } else {
360
      mInfo("%s is initialized", pStep->name);
S
Shengliang Guan 已提交
361
      tmsgReportStartup(pStep->name, "initialized");
S
Shengliang Guan 已提交
362 363
    }
  }
S
Shengliang Guan 已提交
364

S
shm  
Shengliang Guan 已提交
365
  pMnode->clusterId = mndGetClusterId(pMnode);
S
Shengliang Guan 已提交
366
  return 0;
367
}
S
Shengliang Guan 已提交
368

S
shm  
Shengliang Guan 已提交
369
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
S
Shengliang Guan 已提交
370
  pMnode->msgCb = pOption->msgCb;
371
  pMnode->selfDnodeId = pOption->dnodeId;
372 373 374
  pMnode->syncMgmt.selfIndex = pOption->selfIndex;
  pMnode->syncMgmt.numOfReplicas = pOption->numOfReplicas;
  memcpy(pMnode->syncMgmt.replicas, pOption->replicas, sizeof(pOption->replicas));
L
Liu Jicong 已提交
375
}
376

377
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
378
  mInfo("start to open mnode in %s", path);
S
Shengliang Guan 已提交
379

wafwerar's avatar
wafwerar 已提交
380
  SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
381 382 383 384 385 386
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
387 388
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
S
shm  
Shengliang Guan 已提交
389
  mndSetOptions(pMnode, pOption);
S
Shengliang Guan 已提交
390

391
  pMnode->deploy = pOption->deploy;
S
Shengliang Guan 已提交
392 393
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
wafwerar's avatar
wafwerar 已提交
394
    taosMemoryFree(pMnode);
S
Shengliang Guan 已提交
395 396 397 398
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
399

S
Shengliang Guan 已提交
400
  int32_t code = mndCreateDir(pMnode, path);
401
  if (code != 0) {
S
Shengliang Guan 已提交
402 403
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
404 405 406 407 408
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

409
  code = mndInitSteps(pMnode);
410
  if (code != 0) {
S
Shengliang Guan 已提交
411 412
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
413 414 415 416 417 418 419
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
420 421
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
422 423 424 425
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
426

427
  mInfo("mnode open successfully ");
S
Shengliang Guan 已提交
428 429
  return pMnode;
}
S
Shengliang Guan 已提交
430

431 432
void mndPreClose(SMnode *pMnode) {
  if (pMnode != NULL) {
433
    atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 0);
434
    syncLeaderTransfer(pMnode->syncMgmt.sync);
435

436 437 438 439 440 441 442 443 444
#if 0
    mInfo("vgId:1, mnode start leader transfer");
    // wait for leader transfer finish
    while (!atomic_load_8(&(pMnode->syncMgmt.leaderTransferFinish))) {
      taosMsleep(10);
      mInfo("vgId:1, mnode waiting for leader transfer");
    }
    mInfo("vgId:1, mnode finish leader transfer");
#endif
445 446 447
  }
}

448
void mndClose(SMnode *pMnode) {
S
Shengliang Guan 已提交
449
  if (pMnode != NULL) {
450
    mInfo("start to close mnode");
S
Shengliang Guan 已提交
451
    mndCleanupSteps(pMnode, -1);
wafwerar's avatar
wafwerar 已提交
452 453
    taosMemoryFreeClear(pMnode->path);
    taosMemoryFreeClear(pMnode);
454
    mInfo("mnode is closed");
S
Shengliang Guan 已提交
455
  }
456
}
S
Shengliang Guan 已提交
457

458
int32_t mndStart(SMnode *pMnode) {
459
  mndSyncStart(pMnode);
S
Shengliang Guan 已提交
460
  if (pMnode->deploy) {
S
Shengliang Guan 已提交
461 462 463 464
    if (sdbDeploy(pMnode->pSdb) != 0) {
      mError("failed to deploy sdb while start mnode");
      return -1;
    }
465
    mndSetRestored(pMnode, true);
466
  }
M
Minghao Li 已提交
467

C
Cary Xu 已提交
468
  grantReset(pMnode, TSDB_GRANT_ALL, 0);
C
Cary Xu 已提交
469

M
Minghao Li 已提交
470 471 472
  return mndInitTimer(pMnode);
}

473
void mndStop(SMnode *pMnode) {
S
Shengliang Guan 已提交
474
  mndSetStop(pMnode);
475
  mndSyncStop(pMnode);
S
Shengliang Guan 已提交
476
  mndCleanupTimer(pMnode);
M
Minghao Li 已提交
477 478
}

M
Minghao Li 已提交
479
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
M
Minghao Li 已提交
480
  SMnode    *pMnode = pMsg->info.node;
481
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
M
Minghao Li 已提交
482
  int32_t    code = 0;
M
Minghao Li 已提交
483

484 485
  if (!syncEnvIsStart()) {
    mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
M
Minghao Li 已提交
486 487
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
488
  }
M
Minghao Li 已提交
489

490 491 492
  SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
  if (pSyncNode == NULL) {
    mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
M
Minghao Li 已提交
493 494
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
S
Shengliang Guan 已提交
495
  }
496

497
  // ToDo: ugly! use function pointer
M
Minghao Li 已提交
498
  if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_STANDARD_SNAPSHOT) {
499
    if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
M
Minghao Li 已提交
500 501 502
      SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
      code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
      syncTimeoutDestroy(pSyncMsg);
503
    } else if (pMsg->msgType == TDMT_SYNC_PING) {
M
Minghao Li 已提交
504 505 506
      SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
      code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
      syncPingDestroy(pSyncMsg);
507
    } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
M
Minghao Li 已提交
508 509 510
      SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
      code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
      syncPingReplyDestroy(pSyncMsg);
511
    } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
M
Minghao Li 已提交
512
      SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
513
      code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
M
Minghao Li 已提交
514
      syncClientRequestDestroy(pSyncMsg);
515
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
M
Minghao Li 已提交
516 517 518
      SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
      code = syncNodeOnRequestVoteSnapshotCb(pSyncNode, pSyncMsg);
      syncRequestVoteDestroy(pSyncMsg);
519
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
M
Minghao Li 已提交
520 521 522
      SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
      code = syncNodeOnRequestVoteReplySnapshotCb(pSyncNode, pSyncMsg);
      syncRequestVoteReplyDestroy(pSyncMsg);
523
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
M
Minghao Li 已提交
524 525 526
      SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
      code = syncNodeOnAppendEntriesSnapshotCb(pSyncNode, pSyncMsg);
      syncAppendEntriesDestroy(pSyncMsg);
527
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
M
Minghao Li 已提交
528 529 530
      SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
      code = syncNodeOnAppendEntriesReplySnapshotCb(pSyncNode, pSyncMsg);
      syncAppendEntriesReplyDestroy(pSyncMsg);
531
    } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
532 533 534
      SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
      code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg);
      syncSnapshotSendDestroy(pSyncMsg);
535
    } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
536 537 538
      SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
      code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg);
      syncSnapshotRspDestroy(pSyncMsg);
539
    } else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) {
540 541 542
      code = syncSetStandby(pMgmt->sync);
      SRpcMsg rsp = {.code = code, .info = pMsg->info};
      tmsgSendRsp(&rsp);
M
Minghao Li 已提交
543 544
    } else {
      mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
M
Minghao Li 已提交
545
      code = -1;
M
Minghao Li 已提交
546
    }
M
Minghao Li 已提交
547
  } else {
548
    if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
M
Minghao Li 已提交
549 550 551
      SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
      code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
      syncTimeoutDestroy(pSyncMsg);
552
    } else if (pMsg->msgType == TDMT_SYNC_PING) {
M
Minghao Li 已提交
553 554 555
      SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
      code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
      syncPingDestroy(pSyncMsg);
556
    } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
M
Minghao Li 已提交
557 558 559
      SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
      code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
      syncPingReplyDestroy(pSyncMsg);
560
    } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
M
Minghao Li 已提交
561
      SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
562
      code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
M
Minghao Li 已提交
563
      syncClientRequestDestroy(pSyncMsg);
564
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
M
Minghao Li 已提交
565 566 567
      SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
      code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
      syncRequestVoteDestroy(pSyncMsg);
568
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
M
Minghao Li 已提交
569 570 571
      SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
      code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
      syncRequestVoteReplyDestroy(pSyncMsg);
572
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
M
Minghao Li 已提交
573 574 575
      SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
      code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
      syncAppendEntriesDestroy(pSyncMsg);
576
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
M
Minghao Li 已提交
577 578 579
      SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
      code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
      syncAppendEntriesReplyDestroy(pSyncMsg);
580
    } else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) {
581 582 583
      code = syncSetStandby(pMgmt->sync);
      SRpcMsg rsp = {.code = code, .info = pMsg->info};
      tmsgSendRsp(&rsp);
M
Minghao Li 已提交
584 585
    } else {
      mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
M
Minghao Li 已提交
586
      code = -1;
M
Minghao Li 已提交
587
    }
M
Minghao Li 已提交
588 589
  }

M
Minghao Li 已提交
590 591
  syncNodeRelease(pSyncNode);

M
Minghao Li 已提交
592 593 594
  if (code != 0) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  }
595
  return code;
M
Minghao Li 已提交
596 597
}

S
Shengliang Guan 已提交
598
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
599
  if (!IsReq(pMsg)) return 0;
dengyihao's avatar
dengyihao 已提交
600 601
  if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY ||
      pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT ||
S
Shengliang Guan 已提交
602
      pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK) {
D
dapan1121 已提交
603 604
    return 0;
  }
605
  if (mndAcquireRpc(pMsg->info.node) == 0) return 0;
S
Shengliang Guan 已提交
606
  if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
607 608
      pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
      pMsg->msgType == TDMT_MND_UPTIME_TIMER) {
S
Shengliang Guan 已提交
609 610
    return -1;
  }
S
Shengliang Guan 已提交
611

612 613 614
  SEpSet  epSet = {0};
  SMnode *pMnode = pMsg->info.node;
  mndGetMnodeEpSet(pMnode, &epSet);
615

S
Shengliang Guan 已提交
616
  const STraceId *trace = &pMsg->info.traceId;
617 618 619 620 621
  mDebug(
      "msg:%p, failed to check mnode state since %s, mnode restored:%d stopped:%d, sync restored:%d role:%s type:%s "
      "numOfEps:%d inUse:%d",
      pMsg, terrstr(), pMnode->restored, pMnode->stopped, syncIsRestoreFinish(pMnode->syncMgmt.sync),
      syncGetMyRoleStr(pMnode->syncMgmt.sync), TMSG_INFO(pMsg->msgType), epSet.numOfEps, epSet.inUse);
S
Shengliang Guan 已提交
622

S
Shengliang Guan 已提交
623 624
  if (epSet.numOfEps > 0) {
    for (int32_t i = 0; i < epSet.numOfEps; ++i) {
625
      mDebug("mnode index:%d, ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
S
Shengliang Guan 已提交
626 627 628 629
    }

    int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
    pMsg->info.rsp = rpcMallocCont(contLen);
dengyihao's avatar
dengyihao 已提交
630
    pMsg->info.hasEpSet = 1;
S
Shengliang Guan 已提交
631 632 633 634 635 636 637
    if (pMsg->info.rsp != NULL) {
      tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
      pMsg->info.rspLen = contLen;
      terrno = TSDB_CODE_RPC_REDIRECT;
    } else {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
    }
S
Shengliang Guan 已提交
638
  } else {
S
Shengliang Guan 已提交
639
    terrno = TSDB_CODE_APP_NOT_READY;
S
Shengliang Guan 已提交
640
  }
S
Shengliang Guan 已提交
641 642

  return -1;
643 644
}

S
Shengliang Guan 已提交
645
static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
646 647
  if (!IsReq(pMsg)) return 0;
  if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0;
S
Shengliang Guan 已提交
648

S
Shengliang Guan 已提交
649 650
  const STraceId *trace = &pMsg->info.traceId;
  mGError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen,
S
Shengliang Guan 已提交
651
          pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
652 653 654 655
  terrno = TSDB_CODE_INVALID_MSG_LEN;
  return -1;
}

S
Shengliang Guan 已提交
656
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
M
Minghao Li 已提交
657
  SMnode         *pMnode = pMsg->info.node;
S
Shengliang Guan 已提交
658 659
  const STraceId *trace = &pMsg->info.traceId;

S
Shengliang Guan 已提交
660
  MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
S
Shengliang Guan 已提交
661
  if (fp == NULL) {
S
Shengliang Guan 已提交
662
    mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
S
shm  
Shengliang Guan 已提交
663 664
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    return -1;
S
Shengliang Guan 已提交
665 666
  }

S
Shengliang Guan 已提交
667 668 669
  if (mndCheckMsgContent(pMsg) != 0) return -1;
  if (mndCheckMnodeState(pMsg) != 0) return -1;

dengyihao's avatar
dengyihao 已提交
670
  mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
S
shm  
Shengliang Guan 已提交
671
  int32_t code = (*fp)(pMsg);
672
  mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
673

S
Shengliang Guan 已提交
674
  if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
S
Shengliang Guan 已提交
675
    mGTrace("msg:%p, won't response immediately since in progress", pMsg);
676
  } else if (code == 0) {
S
Shengliang Guan 已提交
677
    mGTrace("msg:%p, successfully processed", pMsg);
678
  } else {
S
Shengliang Guan 已提交
679 680
    mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
            TMSG_INFO(pMsg->msgType));
S
Shengliang Guan 已提交
681
  }
S
Shengliang Guan 已提交
682

S
shm  
Shengliang Guan 已提交
683
  return code;
S
Shengliang Guan 已提交
684 685
}

686 687
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  tmsg_t type = TMSG_INDEX(msgType);
S
Shengliang Guan 已提交
688
  if (type < TDMT_MAX) {
689
    pMnode->msgFp[type] = fp;
S
Shengliang Guan 已提交
690 691 692
  }
}

D
dapan1121 已提交
693
// Note: uid 0 is reserved
694
int64_t mndGenerateUid(const char *name, int32_t len) {
S
monitor  
Shengliang Guan 已提交
695
  int32_t hashval = MurmurHash3_32(name, len);
D
dapan1121 已提交
696
  do {
L
Liu Jicong 已提交
697
    int64_t us = taosGetTimestampUs();
S
sma  
Shengliang Guan 已提交
698 699
    int64_t x = (us & 0x000000FFFFFFFFFF) << 24;
    int64_t uuid = x + ((hashval & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
D
dapan1121 已提交
700
    if (uuid) {
L
Liu Jicong 已提交
701
      return llabs(uuid);
D
dapan1121 已提交
702 703
    }
  } while (true);
L
Liu Jicong 已提交
704
}
S
Shengliang Guan 已提交
705 706

int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
S
Shengliang Guan 已提交
707
                          SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
708
  if (mndAcquireRpc(pMnode) != 0) return -1;
S
Shengliang Guan 已提交
709

M
Minghao Li 已提交
710
  SSdb   *pSdb = pMnode->pSdb;
S
Shengliang Guan 已提交
711 712 713 714 715
  int64_t ms = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
S
Shengliang Guan 已提交
716 717 718
  pStbInfo->stbs = taosArrayInit(sdbGetSize(pSdb, SDB_STB), sizeof(SMonStbDesc));
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL ||
      pStbInfo->stbs == NULL) {
719
    mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
720 721 722 723
    return -1;
  }

  // cluster info
wmmhello's avatar
wmmhello 已提交
724
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
S
Shengliang Guan 已提交
725 726
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);
727 728
  pClusterInfo->dbs_total = sdbGetSize(pSdb, SDB_DB);
  pClusterInfo->stbs_total = sdbGetSize(pSdb, SDB_STB);
S
Shengliang Guan 已提交
729 730 731 732 733 734 735 736 737 738

  void *pIter = NULL;
  while (1) {
    SDnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonDnodeDesc desc = {0};
    desc.dnode_id = pObj->id;
    tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
S
Shengliang Guan 已提交
739
    if (mndIsDnodeOnline(pObj, ms)) {
S
Shengliang Guan 已提交
740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757
      tstrncpy(desc.status, "ready", sizeof(desc.status));
    } else {
      tstrncpy(desc.status, "offline", sizeof(desc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &desc);
    sdbRelease(pSdb, pObj);
  }

  pIter = NULL;
  while (1) {
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonMnodeDesc desc = {0};
    desc.mnode_id = pObj->id;
    tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));

758
    if (pObj->id == pMnode->selfDnodeId) {
S
Shengliang Guan 已提交
759 760
      pClusterInfo->first_ep_dnode_id = pObj->id;
      tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
761 762
      pClusterInfo->master_uptime = mndGetClusterUpTime(pMnode);
      // pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
763 764 765
      tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
    } else {
      tstrncpy(desc.role, syncStr(pObj->state), sizeof(desc.role));
S
Shengliang Guan 已提交
766
    }
767 768
    taosArrayPush(pClusterInfo->mnodes, &desc);
    sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
769 770 771 772 773 774 775 776 777 778
  }

  // vgroup info
  pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pClusterInfo->vgroups_total++;
779
    pClusterInfo->tbs_total += pVgroup->numOfTables;
S
Shengliang Guan 已提交
780 781 782

    SMonVgroupDesc desc = {0};
    desc.vgroup_id = pVgroup->vgId;
783 784 785 786 787

    SName name = {0};
    tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name, desc.database_name);

S
Shengliang Guan 已提交
788 789 790 791
    desc.tables_num = pVgroup->numOfTables;
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
    tstrncpy(desc.status, "unsynced", sizeof(desc.status));
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
M
Minghao Li 已提交
792
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[i];
S
Shengliang Guan 已提交
793 794
      SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
      pVnDesc->dnode_id = pVgid->dnodeId;
S
Shengliang Guan 已提交
795
      tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->role), sizeof(pVnDesc->vnode_role));
S
Shengliang Guan 已提交
796 797 798 799
      if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
        tstrncpy(desc.status, "ready", sizeof(desc.status));
        pClusterInfo->vgroups_alive++;
      }
S
Shengliang Guan 已提交
800
      if (pVgid->role != TAOS_SYNC_STATE_ERROR) {
S
Shengliang Guan 已提交
801 802 803 804 805 806 807 808 809
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &desc);
    sdbRelease(pSdb, pVgroup);
  }

S
Shengliang Guan 已提交
810 811 812 813 814 815 816
  // stb info
  pIter = NULL;
  while (1) {
    SStbObj *pStb = NULL;
    pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
    if (pIter == NULL) break;

S
Shengliang Guan 已提交
817
    SMonStbDesc desc = {0};
S
Shengliang Guan 已提交
818 819 820 821 822 823 824 825 826 827 828 829 830

    SName name1 = {0};
    tNameFromString(&name1, pStb->db, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name1, desc.database_name);

    SName name2 = {0};
    tNameFromString(&name2, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tstrncpy(desc.stb_name, tNameGetTableName(&name2), TSDB_TABLE_NAME_LEN);

    taosArrayPush(pStbInfo->stbs, &desc);
    sdbRelease(pSdb, pStb);
  }

S
Shengliang Guan 已提交
831 832 833 834 835 836 837 838
  // grant info
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  }

839
  mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
840
  return 0;
L
Liu Jicong 已提交
841
}
S
Shengliang Guan 已提交
842 843

int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
844
  pLoad->syncState = syncGetMyRole(pMnode->syncMgmt.sync);
S
Shengliang Guan 已提交
845
  mTrace("mnode current syncstate is %s", syncStr(pLoad->syncState));
S
Shengliang Guan 已提交
846
  return 0;
L
fix  
Liu Jicong 已提交
847
}
S
Shengliang Guan 已提交
848

849
void mndSetRestored(SMnode *pMnode, bool restored) {
S
Shengliang Guan 已提交
850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876
  if (restored) {
    taosThreadRwlockWrlock(&pMnode->lock);
    pMnode->restored = true;
    taosThreadRwlockUnlock(&pMnode->lock);
    mTrace("mnode set restored:%d", restored);
  } else {
    taosThreadRwlockWrlock(&pMnode->lock);
    pMnode->restored = false;
    taosThreadRwlockUnlock(&pMnode->lock);
    mTrace("mnode set restored:%d", restored);
    while (1) {
      if (pMnode->rpcRef <= 0) break;
      taosMsleep(3);
    }
  }
}

bool mndGetRestored(SMnode *pMnode) { return pMnode->restored; }

void mndSetStop(SMnode *pMnode) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->stopped = true;
  taosThreadRwlockUnlock(&pMnode->lock);
  mTrace("mnode set stopped");
}

bool mndGetStop(SMnode *pMnode) { return pMnode->stopped; }