mndMain.c 29.0 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndAcct.h"
S
Shengliang Guan 已提交
18
#include "mndBnode.h"
S
Shengliang Guan 已提交
19
#include "mndCluster.h"
L
Liu Jicong 已提交
20
#include "mndConsumer.h"
S
Shengliang Guan 已提交
21 22 23
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
L
Liu Jicong 已提交
24
#include "mndGrant.h"
S
monitor  
Shengliang Guan 已提交
25
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
26
#include "mndMnode.h"
L
Liu Jicong 已提交
27
#include "mndOffset.h"
L
Liu Jicong 已提交
28
#include "mndPerfSchema.h"
M
Minghao Li 已提交
29
#include "mndPrivilege.h"
S
Shengliang Guan 已提交
30
#include "mndProfile.h"
S
Shengliang Guan 已提交
31
#include "mndQnode.h"
L
Liu Jicong 已提交
32
#include "mndQuery.h"
S
Shengliang Guan 已提交
33
#include "mndShow.h"
S
sma  
Shengliang Guan 已提交
34
#include "mndSma.h"
S
Shengliang Guan 已提交
35
#include "mndSnode.h"
S
Shengliang Guan 已提交
36
#include "mndStb.h"
L
Liu Jicong 已提交
37
#include "mndStream.h"
L
Liu Jicong 已提交
38
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
39 40
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
41
#include "mndTopic.h"
S
Shengliang Guan 已提交
42 43 44
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
45

46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
/*
 * Try to take one RPC reference on the mnode.
 *
 * Runs under the read side of pMnode->lock:
 *   - mnode stopped     -> terrno = TSDB_CODE_APP_NOT_READY, returns -1
 *   - mndIsLeader false -> returns -1 (terrno is not assigned in this function)
 *   - otherwise         -> pMnode->rpcRef is atomically incremented, returns 0
 *
 * The counterpart that drops the reference is mndReleaseRpc().
 */
static inline int32_t mndAcquireRpc(SMnode *pMnode) {
  int32_t result = -1;

  taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    terrno = TSDB_CODE_APP_NOT_READY;
  } else if (mndIsLeader(pMnode)) {
    atomic_add_fetch_32(&pMnode->rpcRef, 1);
    result = 0;
  }
  taosThreadRwlockUnlock(&pMnode->lock);

  return result;
}

/*
 * Drop one RPC reference: atomically decrements pMnode->rpcRef while holding
 * the read side of pMnode->lock. Counterpart of mndAcquireRpc().
 */
static inline void mndReleaseRpc(SMnode *pMnode) {
  taosThreadRwlockRdlock(&pMnode->lock);
  (void)atomic_sub_fetch_32(&pMnode->rpcRef, 1);
  taosThreadRwlockUnlock(&pMnode->lock);
}

S
Shengliang Guan 已提交
77 78 79 80 81 82 83 84 85 86 87 88 89
/*
 * Build the serialized body shared by all mnode timer messages.
 *
 * pContLen: receives the body length; written only on success.
 * Returns a buffer obtained from rpcMallocCont(), or NULL when the size query
 * yields a non-positive length or the allocation fails.
 */
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq req = {0};

  // Called with a NULL buffer first; the returned value is used as the buffer size.
  const int32_t len = tSerializeSMTimerMsg(NULL, 0, &req);
  if (len <= 0) {
    return NULL;
  }

  void *pCont = rpcMallocCont(len);
  if (pCont != NULL) {
    tSerializeSMTimerMsg(pCont, len, &req);
    *pContLen = len;
  }
  return pCont;
}

S
Shengliang Guan 已提交
90 91
/*
 * Post a TDMT_MND_TRANS_TIMER message to the mnode WRITE_QUEUE.
 * Nothing is queued when the timer body cannot be built.
 */
static void mndPullupTrans(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) {
    return;
  }

  SRpcMsg msg = {
      .msgType = TDMT_MND_TRANS_TIMER,
      .pCont = pCont,
      .contLen = len,
  };
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &msg);
}

99
/*
 * Post a TDMT_MND_TTL_TIMER message to the mnode WRITE_QUEUE.
 *
 * Nothing is queued when the timer body cannot be built. Without this check a
 * failed mndBuildTimerMsg() would enqueue a message with pCont == NULL and
 * contLen == 0; the other timer senders in this file already skip that case.
 */
static void mndPullupTtl(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  }
}

S
Shengliang Guan 已提交
106 107
/*
 * Post a TDMT_MND_MQ_TIMER message to the mnode READ_QUEUE.
 * Nothing is queued when the timer body cannot be built.
 */
static void mndCalMqRebalance(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) {
    return;
  }

  SRpcMsg msg = {
      .msgType = TDMT_MND_MQ_TIMER,
      .pCont = pCont,
      .contLen = len,
  };
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}
L
Liu Jicong 已提交
114

S
Shengliang Guan 已提交
115 116
/*
 * Post a TDMT_MND_TELEM_TIMER message to the mnode READ_QUEUE.
 * Nothing is queued when the timer body cannot be built.
 */
static void mndPullupTelem(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) {
    return;
  }

  SRpcMsg msg = {
      .msgType = TDMT_MND_TELEM_TIMER,
      .pCont = pCont,
      .contLen = len,
  };
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}

124
/*
 * Post a TDMT_MND_GRANT_HB_TIMER message to the mnode READ_QUEUE.
 * Nothing is queued when the timer body cannot be built.
 * The message carries info.ahandle = 0x9527; what consumes that marker is not
 * visible in this file.
 */
static void mndPullupGrant(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) {
    return;
  }

  SRpcMsg msg = {
      .msgType = TDMT_MND_GRANT_HB_TIMER,
      .pCont = pCont,
      .contLen = len,
      .info.ahandle = (void *)0x9527,
  };
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}

134 135 136 137 138 139 140 141 142 143
/*
 * Post a TDMT_MND_UPTIME_TIMER message to the mnode WRITE_QUEUE.
 * Nothing is queued when the timer body cannot be built.
 * The message carries info.ahandle = 0x9528; what consumes that marker is not
 * visible in this file.
 */
static void mndIncreaseUpTime(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) {
    return;
  }

  SRpcMsg msg = {
      .msgType = TDMT_MND_UPTIME_TIMER,
      .pCont = pCont,
      .contLen = len,
      .info.ahandle = (void *)0x9528,
  };
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &msg);
}

S
Shengliang Guan 已提交
144
static void *mndThreadFp(void *param) {
S
Shengliang Guan 已提交
145
  SMnode *pMnode = param;
S
Shengliang Guan 已提交
146 147 148 149 150 151
  int64_t lastTime = 0;
  setThreadName("mnode-timer");

  while (1) {
    lastTime++;
    taosMsleep(100);
S
Shengliang Guan 已提交
152
    if (mndGetStop(pMnode)) break;
153
    if (lastTime % 10 != 0) continue;
S
Shengliang Guan 已提交
154

155 156
    int64_t sec = lastTime / 10;
    if (sec % tsTtlPushInterval == 0) {
157
      mndPullupTtl(pMnode);
wmmhello's avatar
wmmhello 已提交
158 159
    }

160
    if (sec % tsTransPullupInterval == 0) {
S
Shengliang Guan 已提交
161 162 163
      mndPullupTrans(pMnode);
    }

164
    if (sec % tsMqRebalanceInterval == 0) {
S
Shengliang Guan 已提交
165 166 167
      mndCalMqRebalance(pMnode);
    }

S
Shengliang Guan 已提交
168
    if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) {
S
Shengliang Guan 已提交
169 170
      mndPullupTelem(pMnode);
    }
C
Cary Xu 已提交
171

172
    if (sec % tsGrantHBInterval == 0) {
173
      mndPullupGrant(pMnode);
C
Cary Xu 已提交
174
    }
175

176
    if (sec % tsUptimeInterval == 0) {
177 178
      mndIncreaseUpTime(pMnode);
    }
S
Shengliang Guan 已提交
179 180
  }

S
Shengliang Guan 已提交
181
  return NULL;
L
Liu Jicong 已提交
182 183
}

184
/*
 * Start the joinable "mnode-timer" thread running mndThreadFp().
 *
 * The thread attribute object is destroyed on both the success and the
 * failure path; previously the failure path returned without destroying it.
 *
 * Returns 0 on success, -1 when the thread cannot be created.
 */
static int32_t mndInitTimer(SMnode *pMnode) {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  int32_t code = 0;
  if (taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode) != 0) {
    // Log before any other call so errno still describes the failed create.
    mError("failed to create timer thread since %s", strerror(errno));
    code = -1;
  }

  taosThreadAttrDestroy(&thAttr);
  if (code != 0) {
    return code;
  }

  tmsgReportStartup("mnode-timer", "initialized");
  return 0;
}

198
/*
 * Join the timer thread, if pMnode->thread is a valid handle, and then clear
 * the handle. Does nothing otherwise.
 */
static void mndCleanupTimer(SMnode *pMnode) {
  if (!taosCheckPthreadValid(pMnode->thread)) {
    return;
  }

  taosThreadJoin(pMnode->thread, NULL);
  taosThreadClear(&pMnode->thread);
}

S
Shengliang Guan 已提交
205
/*
 * Remember a private copy of `path` in pMnode->path and create that directory.
 *
 * Returns 0 on success. On failure returns -1 with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY (copy failed) or TAOS_SYSTEM_ERROR(errno) (mkdir
 * failed). The copy is left in pMnode->path on the mkdir failure path;
 * mndClose() releases it.
 */
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  char *ownedPath = strdup(path);
  pMnode->path = ownedPath;

  if (ownedPath == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosMkDir(ownedPath) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}
S
Shengliang Guan 已提交
219

220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
/*
 * Open the mnode write-ahead log located at "<pMnode->path><TD_DIRSEP>wal" and
 * store the handle in pMnode->pWal.
 *
 * Returns 0 on success, -1 when walOpen() fails.
 */
static int32_t mndInitWal(SMnode *pMnode) {
  char walPath[PATH_MAX + 20] = {0};
  snprintf(walPath, sizeof(walPath), "%s%swal", pMnode->path, TD_DIRSEP);

  SWalCfg walCfg = {0};
  walCfg.vgId = 1;
  walCfg.fsyncPeriod = 0;
  walCfg.rollPeriod = -1;
  walCfg.segSize = -1;
  walCfg.retentionPeriod = -1;
  walCfg.retentionSize = -1;
  walCfg.level = TAOS_WAL_FSYNC;

  pMnode->pWal = walOpen(walPath, &walCfg);
  if (pMnode->pWal != NULL) {
    return 0;
  }

  mError("failed to open wal since %s. wal:%s", terrstr(), walPath);
  return -1;
}

/* Close the mnode WAL if it is open and reset pMnode->pWal to NULL. */
static void mndCloseWal(SMnode *pMnode) {
  if (pMnode->pWal == NULL) {
    return;
  }

  walClose(pMnode->pWal);
  pMnode->pWal = NULL;
}

249 250 251
/*
 * Create the sdb instance for this mnode, handing it the mnode path, the mnode
 * itself and the already opened WAL. The handle is stored in pMnode->pSdb.
 *
 * Returns 0 on success, -1 when sdbInit() returns NULL.
 */
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt sdbOpt = {0};
  sdbOpt.path = pMnode->path;
  sdbOpt.pMnode = pMnode;
  sdbOpt.pWal = pMnode->pWal;

  pMnode->pSdb = sdbInit(&sdbOpt);
  return (pMnode->pSdb != NULL) ? 0 : -1;
}

263 264 265 266 267 268 269
/*
 * Load sdb content from file unless this mnode is being deployed.
 * Returns 0 in the deploy case, otherwise the result of sdbReadFile().
 */
static int32_t mndOpenSdb(SMnode *pMnode) {
  if (pMnode->deploy) {
    return 0;
  }
  return sdbReadFile(pMnode->pSdb);
}
270 271 272

/* Release the sdb instance if present and reset pMnode->pSdb to NULL. */
static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb == NULL) {
    return;
  }

  sdbCleanup(pMnode->pSdb);
  pMnode->pSdb = NULL;
}

278 279 280 281 282
/*
 * Append one init/cleanup step to pMnode->pSteps.
 *
 * name:      label used in log and startup reports (the pointer is stored, not copied)
 * initFp:    called by mndExecSteps(); may be NULL
 * cleanupFp: called by mndCleanupSteps(); may be NULL
 *
 * Returns 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the push fails.
 */
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep entry = {0};
  entry.name = name;
  entry.initFp = initFp;
  entry.cleanupFp = cleanupFp;

  if (taosArrayPush(pMnode->pSteps, &entry) != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

291
/*
 * Register every mnode sub-module as an init/cleanup step, in start-up order.
 *
 * mndExecSteps() runs the init callbacks in this order and mndCleanupSteps()
 * runs the cleanup callbacks in reverse. "mnode-sdb" appears twice on purpose:
 * the first entry creates the sdb, the second (no cleanup) reads its file once
 * the table modules above it are registered.
 *
 * Returns 0 on success, -1 as soon as one registration fails.
 */
static int32_t mndInitSteps(SMnode *pMnode) {
  static const struct {
    char        *name;
    MndInitFp    initFp;
    MndCleanupFp cleanupFp;
  } stepTable[] = {
      {"mnode-wal", mndInitWal, mndCloseWal},
      {"mnode-sdb", mndInitSdb, mndCleanupSdb},
      {"mnode-trans", mndInitTrans, mndCleanupTrans},
      {"mnode-cluster", mndInitCluster, mndCleanupCluster},
      {"mnode-mnode", mndInitMnode, mndCleanupMnode},
      {"mnode-qnode", mndInitQnode, mndCleanupQnode},
      {"mnode-snode", mndInitSnode, mndCleanupSnode},
      {"mnode-bnode", mndInitBnode, mndCleanupBnode},
      {"mnode-dnode", mndInitDnode, mndCleanupDnode},
      {"mnode-user", mndInitUser, mndCleanupUser},
      {"mnode-grant", mndInitGrant, mndCleanupGrant},
      {"mnode-privilege", mndInitPrivilege, mndCleanupPrivilege},
      {"mnode-acct", mndInitAcct, mndCleanupAcct},
      {"mnode-stream", mndInitStream, mndCleanupStream},
      {"mnode-topic", mndInitTopic, mndCleanupTopic},
      {"mnode-consumer", mndInitConsumer, mndCleanupConsumer},
      {"mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe},
      {"mnode-offset", mndInitOffset, mndCleanupOffset},
      {"mnode-vgroup", mndInitVgroup, mndCleanupVgroup},
      {"mnode-stb", mndInitStb, mndCleanupStb},
      {"mnode-sma", mndInitSma, mndCleanupSma},
      {"mnode-infos", mndInitInfos, mndCleanupInfos},
      {"mnode-perfs", mndInitPerfs, mndCleanupPerfs},
      {"mnode-db", mndInitDb, mndCleanupDb},
      {"mnode-func", mndInitFunc, mndCleanupFunc},
      {"mnode-sdb", mndOpenSdb, NULL},
      {"mnode-profile", mndInitProfile, mndCleanupProfile},
      {"mnode-show", mndInitShow, mndCleanupShow},
      {"mnode-query", mndInitQuery, mndCleanupQuery},
      {"mnode-sync", mndInitSync, mndCleanupSync},
      {"mnode-telem", mndInitTelem, mndCleanupTelem},
  };

  const size_t numSteps = sizeof(stepTable) / sizeof(stepTable[0]);
  for (size_t i = 0; i < numSteps; ++i) {
    if (mndAllocStep(pMnode, stepTable[i].name, stepTable[i].initFp, stepTable[i].cleanupFp) != 0) {
      return -1;
    }
  }

  return 0;
}

327
/*
 * Run step cleanup callbacks in reverse order, from index `pos` down to 0,
 * then destroy the step array and set pMnode->pSteps to NULL.
 *
 * pos: index of the last step to clean up; -1 selects the last registered step.
 * Steps without a cleanup callback are only logged. No-op when pSteps is NULL.
 */
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) {
    return;
  }

  int32_t idx = pos;
  if (idx == -1) {
    idx = taosArrayGetSize(pMnode->pSteps) - 1;
  }

  while (idx >= 0) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    mInfo("%s will cleanup", pStep->name);
    if (pStep->cleanupFp != NULL) {
      (*pStep->cleanupFp)(pMnode);
    }
    idx--;
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
S
Shengliang Guan 已提交
346

347
/*
 * Run the init callback of every registered step in registration order.
 *
 * On the first failure, mndCleanupSteps() is invoked for steps [0, pos]
 * (the failing step included), the terrno observed right after the failure is
 * restored and -1 is returned. On success the cluster id is cached in
 * pMnode->clusterId and 0 is returned.
 */
static int32_t mndExecSteps(SMnode *pMnode) {
  const int32_t numSteps = taosArrayGetSize(pMnode->pSteps);

  for (int32_t pos = 0; pos < numSteps; pos++) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, pos);
    if (pStep->initFp == NULL) {
      continue;
    }

    if ((*pStep->initFp)(pMnode) == 0) {
      mInfo("%s is initialized", pStep->name);
      tmsgReportStartup(pStep->name, "initialized");
      continue;
    }

    // Cleanup may overwrite terrno; keep the code that describes the failure.
    const int32_t failCode = terrno;
    mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
    mndCleanupSteps(pMnode, pos);
    terrno = failCode;
    return -1;
  }

  pMnode->clusterId = mndGetClusterId(pMnode);
  return 0;
}
S
Shengliang Guan 已提交
368

S
shm  
Shengliang Guan 已提交
369
/*
 * Copy the message callbacks, the dnode id and the replica layout
 * (selfIndex, numOfReplicas, replicas[]) from pOption into pMnode.
 */
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  pMnode->msgCb = pOption->msgCb;
  pMnode->selfDnodeId = pOption->dnodeId;

  pSync->selfIndex = pOption->selfIndex;
  pSync->numOfReplicas = pOption->numOfReplicas;
  memcpy(pSync->replicas, pOption->replicas, sizeof(pOption->replicas));
}
376

377
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
378
  mInfo("start to open mnode in %s", path);
S
Shengliang Guan 已提交
379

wafwerar's avatar
wafwerar 已提交
380
  SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
381 382 383 384 385 386
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
387 388
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
S
shm  
Shengliang Guan 已提交
389
  mndSetOptions(pMnode, pOption);
S
Shengliang Guan 已提交
390

391
  pMnode->deploy = pOption->deploy;
S
Shengliang Guan 已提交
392 393
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
wafwerar's avatar
wafwerar 已提交
394
    taosMemoryFree(pMnode);
S
Shengliang Guan 已提交
395 396 397 398
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
399

S
Shengliang Guan 已提交
400
  int32_t code = mndCreateDir(pMnode, path);
401
  if (code != 0) {
S
Shengliang Guan 已提交
402 403
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
404 405 406 407 408
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

409
  code = mndInitSteps(pMnode);
410
  if (code != 0) {
S
Shengliang Guan 已提交
411 412
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
413 414 415 416 417 418 419
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
420 421
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
422 423 424 425
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
426

427
  mInfo("mnode open successfully ");
S
Shengliang Guan 已提交
428 429
  return pMnode;
}
S
Shengliang Guan 已提交
430

431 432
/*
 * First stage of shutdown: reset syncMgmt.leaderTransferFinish to 0 and ask
 * the sync module to transfer leadership. The code that would wait for the
 * transfer to finish is compiled out (#if 0), so this returns immediately.
 * No-op when pMnode is NULL.
 */
void mndPreClose(SMnode *pMnode) {
  if (pMnode == NULL) {
    return;
  }

  atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 0);
  syncLeaderTransfer(pMnode->syncMgmt.sync);

#if 0
  mInfo("vgId:1, mnode start leader transfer");
  // wait for leader transfer finish
  while (!atomic_load_8(&(pMnode->syncMgmt.leaderTransferFinish))) {
    taosMsleep(10);
    mInfo("vgId:1, mnode waiting for leader transfer");
  }
  mInfo("vgId:1, mnode finish leader transfer");
#endif
}

448
/*
 * Tear down an mnode: run every step cleanup in reverse order, then free the
 * path string and the SMnode itself. No-op when pMnode is NULL.
 */
void mndClose(SMnode *pMnode) {
  if (pMnode == NULL) {
    return;
  }

  mInfo("start to close mnode");
  mndCleanupSteps(pMnode, -1);
  taosMemoryFreeClear(pMnode->path);
  taosMemoryFreeClear(pMnode);
  mInfo("mnode is closed");
}
S
Shengliang Guan 已提交
457

458
/*
 * Start an opened mnode: start sync, deploy the sdb when this is a fresh
 * deployment (marking the mnode restored afterwards), reset grants, and
 * finally launch the timer thread.
 *
 * Returns 0 on success, -1 when sdb deployment or timer creation fails.
 */
int32_t mndStart(SMnode *pMnode) {
  mndSyncStart(pMnode);

  if (pMnode->deploy) {
    const int32_t deployCode = sdbDeploy(pMnode->pSdb);
    if (deployCode != 0) {
      mError("failed to deploy sdb while start mnode");
      return -1;
    }
    mndSetRestored(pMnode, true);
  }

  grantReset(pMnode, TSDB_GRANT_ALL, 0);

  const int32_t timerCode = mndInitTimer(pMnode);
  return timerCode;
}

473
/*
 * Stop a running mnode. The order matters: the stop flag is raised first so
 * the timer loop (which polls mndGetStop()) can exit, sync is stopped next,
 * and the timer thread is joined last.
 */
void mndStop(SMnode *pMnode) {
  mndSetStop(pMnode);

  mndSyncStop(pMnode);

  // Joins the timer thread; it leaves its loop once it observes the stop flag.
  mndCleanupTimer(pMnode);
}

M
Minghao Li 已提交
479
/*
 * Dispatch one sync (replication) message to the matching sync-node callback.
 *
 * The message is decoded with the matching sync*FromRpcMsg2() helper, handed
 * to the callback and destroyed again. When the node strategy is
 * SYNC_STRATEGY_STANDARD_SNAPSHOT, the vote / append-entries messages use the
 * *Snapshot* callback variants and the two snapshot messages are accepted;
 * with any other strategy the plain variants are used and the snapshot
 * messages are rejected as invalid. TDMT_SYNC_SET_MNODE_STANDBY is answered
 * directly with a response carrying the result code.
 *
 * Returns the callback's code (0 on success). Returns -1 when the sync
 * environment is not started, the sync node cannot be acquired, or the message
 * type is not handled. terrno is set to TSDB_CODE_SYN_INTERNAL_ERROR on every
 * non-zero result.
 */
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
  SMnode    *pMnode = pMsg->info.node;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  if (!syncEnvIsStart()) {
    mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
  if (pSyncNode == NULL) {
    mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  // ToDo: ugly! use function pointer
  const bool useSnapshot = (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_STANDARD_SNAPSHOT);
  int32_t    code = 0;

  if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
    SyncTimeout *pTimeout = syncTimeoutFromRpcMsg2(pMsg);
    code = syncNodeOnTimeoutCb(pSyncNode, pTimeout);
    syncTimeoutDestroy(pTimeout);

  } else if (pMsg->msgType == TDMT_SYNC_PING) {
    SyncPing *pPing = syncPingFromRpcMsg2(pMsg);
    code = syncNodeOnPingCb(pSyncNode, pPing);
    syncPingDestroy(pPing);

  } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
    SyncPingReply *pPingReply = syncPingReplyFromRpcMsg2(pMsg);
    code = syncNodeOnPingReplyCb(pSyncNode, pPingReply);
    syncPingReplyDestroy(pPingReply);

  } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    SyncClientRequest *pClientReq = syncClientRequestFromRpcMsg2(pMsg);
    code = syncNodeOnClientRequestCb(pSyncNode, pClientReq, NULL);
    syncClientRequestDestroy(pClientReq);

  } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
    SyncRequestVote *pVote = syncRequestVoteFromRpcMsg2(pMsg);
    if (useSnapshot) {
      code = syncNodeOnRequestVoteSnapshotCb(pSyncNode, pVote);
    } else {
      code = syncNodeOnRequestVoteCb(pSyncNode, pVote);
    }
    syncRequestVoteDestroy(pVote);

  } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
    SyncRequestVoteReply *pVoteReply = syncRequestVoteReplyFromRpcMsg2(pMsg);
    if (useSnapshot) {
      code = syncNodeOnRequestVoteReplySnapshotCb(pSyncNode, pVoteReply);
    } else {
      code = syncNodeOnRequestVoteReplyCb(pSyncNode, pVoteReply);
    }
    syncRequestVoteReplyDestroy(pVoteReply);

  } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
    SyncAppendEntries *pAppend = syncAppendEntriesFromRpcMsg2(pMsg);
    if (useSnapshot) {
      code = syncNodeOnAppendEntriesSnapshotCb(pSyncNode, pAppend);
    } else {
      code = syncNodeOnAppendEntriesCb(pSyncNode, pAppend);
    }
    syncAppendEntriesDestroy(pAppend);

  } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
    SyncAppendEntriesReply *pAppendReply = syncAppendEntriesReplyFromRpcMsg2(pMsg);
    if (useSnapshot) {
      code = syncNodeOnAppendEntriesReplySnapshotCb(pSyncNode, pAppendReply);
    } else {
      code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pAppendReply);
    }
    syncAppendEntriesReplyDestroy(pAppendReply);

  } else if (useSnapshot && pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
    SyncSnapshotSend *pSnapSend = syncSnapshotSendFromRpcMsg2(pMsg);
    code = syncNodeOnSnapshotSendCb(pSyncNode, pSnapSend);
    syncSnapshotSendDestroy(pSnapSend);

  } else if (useSnapshot && pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
    SyncSnapshotRsp *pSnapRsp = syncSnapshotRspFromRpcMsg2(pMsg);
    code = syncNodeOnSnapshotRspCb(pSyncNode, pSnapRsp);
    syncSnapshotRspDestroy(pSnapRsp);

  } else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) {
    code = syncSetStandby(pMgmt->sync);
    SRpcMsg rsp = {.code = code, .info = pMsg->info};
    tmsgSendRsp(&rsp);

  } else {
    mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
    code = -1;
  }

  syncNodeRelease(pSyncNode);

  if (code != 0) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  return code;
}

S
Shengliang Guan 已提交
598
/*
 * Decide whether this mnode may process pMsg right now.
 *
 * Returns 0 when processing may continue:
 *   - the message is not a request, or
 *   - it is one of the TDMT_SCH_* query/fetch/drop-task types listed below, or
 *   - mndAcquireRpc() succeeded (an RPC reference is then held).
 *
 * Otherwise returns -1:
 *   - for the listed internal timer messages, after a trace log only;
 *   - for other requests, with terrno = TSDB_CODE_RPC_REDIRECT and the mnode
 *     endpoint set serialized into pMsg->info.rsp when endpoints are known,
 *     terrno = TSDB_CODE_OUT_OF_MEMORY when that buffer cannot be allocated,
 *     or terrno = TSDB_CODE_APP_NOT_READY when no endpoint is known.
 */
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
  if (!IsReq(pMsg)) return 0;

  const bool isSchMsg = pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY ||
                        pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT ||
                        pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH ||
                        pMsg->msgType == TDMT_SCH_DROP_TASK;
  if (isSchMsg) return 0;

  if (mndAcquireRpc(pMsg->info.node) == 0) return 0;

  SMnode     *pMnode = pMsg->info.node;
  const char *role = syncGetMyRoleStr(pMnode->syncMgmt.sync);
  bool        restored = syncIsRestoreFinish(pMnode->syncMgmt.sync);

  const bool isTimerMsg = pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
                          pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
                          pMsg->msgType == TDMT_MND_UPTIME_TIMER;
  if (isTimerMsg) {
    mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
           pMnode->stopped, restored, role);
    return -1;
  }

  const STraceId *trace = &pMsg->info.traceId;
  SEpSet          epSet = {0};
  mndGetMnodeEpSet(pMnode, &epSet);

  mDebug(
      "msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d "
      "role:%s, redirect numOfEps:%d inUse:%d",
      pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, restored, role, epSet.numOfEps,
      epSet.inUse);

  if (epSet.numOfEps <= 0) {
    terrno = TSDB_CODE_APP_NOT_READY;
    return -1;
  }

  for (int32_t i = 0; i < epSet.numOfEps; ++i) {
    mDebug("mnode index:%d, ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
  }

  // Attach the endpoint set so the caller can be redirected.
  int32_t rspLen = tSerializeSEpSet(NULL, 0, &epSet);
  pMsg->info.rsp = rpcMallocCont(rspLen);
  pMsg->info.hasEpSet = 1;
  if (pMsg->info.rsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  tSerializeSEpSet(pMsg->info.rsp, rspLen, &epSet);
  pMsg->info.rspLen = rspLen;
  terrno = TSDB_CODE_RPC_REDIRECT;
  return -1;
}

S
Shengliang Guan 已提交
650
/*
 * Reject requests that carry no body.
 *
 * Returns 0 for non-request messages and for requests whose pCont is non-NULL
 * and whose contLen is non-zero; otherwise logs the message and returns -1
 * with terrno = TSDB_CODE_INVALID_MSG_LEN.
 */
static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
  if (!IsReq(pMsg)) {
    return 0;
  }

  const bool hasBody = (pMsg->contLen != 0) && (pMsg->pCont != NULL);
  if (hasBody) {
    return 0;
  }

  // `trace` is declared exactly as in the original so the mG* log macro sees the same local.
  const STraceId *trace = &pMsg->info.traceId;
  mGError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen,
          pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  terrno = TSDB_CODE_INVALID_MSG_LEN;
  return -1;
}

S
Shengliang Guan 已提交
661
/*
 * Entry point for RPC messages addressed to the mnode.
 *
 * Looks up the handler registered for the message type, validates the message
 * body and the mnode state, runs the handler and logs the outcome.
 *
 * Returns the handler's code (0, TSDB_CODE_ACTION_IN_PROGRESS, or an error),
 * or -1 with terrno set when no handler exists or a pre-check fails.
 *
 * The handler index is range-checked against TDMT_MAX before msgFp[] is read,
 * using the same bound that mndSetMsgHandle() applies when storing handlers.
 * An out-of-range index is reported like a missing handler
 * (TSDB_CODE_MSG_NOT_PROCESSED).
 *
 * NOTE(review): mndCheckMnodeState() returns 0 without calling mndAcquireRpc()
 * for non-request messages and for the TDMT_SCH_* types, yet mndReleaseRpc()
 * runs unconditionally below. Confirm the rpcRef accounting is intended.
 */
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
  const STraceId *trace = &pMsg->info.traceId;

  const int64_t msgIndex = (int64_t)TMSG_INDEX(pMsg->msgType);
  MndMsgFp      fp = (msgIndex >= 0 && msgIndex < TDMT_MAX) ? pMnode->msgFp[msgIndex] : NULL;
  if (fp == NULL) {
    mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    return -1;
  }

  if (mndCheckMsgContent(pMsg) != 0) return -1;
  if (mndCheckMnodeState(pMsg) != 0) return -1;

  mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  int32_t code = (*fp)(pMsg);
  mndReleaseRpc(pMnode);

  if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
    mGTrace("msg:%p, won't response immediately since in progress", pMsg);
  } else if (code == 0) {
    mGTrace("msg:%p, successfully processed", pMsg);
  } else {
    mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
            TMSG_INFO(pMsg->msgType));
  }

  return code;
}

691 692
/*
 * Register `fp` as the handler for `msgType`. The slot is TMSG_INDEX(msgType);
 * types whose index is not below TDMT_MAX are silently ignored.
 */
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  const tmsg_t slot = TMSG_INDEX(msgType);
  if (slot >= TDMT_MAX) {
    return;
  }

  pMnode->msgFp[slot] = fp;
}

D
dapan1121 已提交
698
// Note: uid 0 is reserved
//
// Generates a positive, non-zero 64-bit uid with the following bit layout:
//   bits 24..63 : low 40 bits of the current timestamp in microseconds
//   bits  8..23 : low 16 bits of MurmurHash3_32(name, len)
//   bits  0..7  : low 8 bits of taosRand()
// If the composed value has the top bit set, its two's-complement magnitude
// is returned instead, so the result is always > 0.
//
// The value is composed in unsigned arithmetic on purpose: shifting the
// 40-bit timestamp left by 24 can reach bit 63, which is undefined behaviour
// for a signed int64_t, and llabs() is undefined for INT64_MIN. The two
// values that cannot yield a positive uid (0 and the bit pattern of
// INT64_MIN) are skipped and a new candidate is drawn.
int64_t mndGenerateUid(const char *name, int32_t len) {
  const uint64_t signBit = (uint64_t)1 << 63;
  const uint64_t hashBits = ((uint64_t)(uint32_t)MurmurHash3_32(name, len) & 0xFFFFu) << 8;

  for (;;) {
    const uint64_t timeBits = ((uint64_t)taosGetTimestampUs() & 0x000000FFFFFFFFFFull) << 24;
    const uint64_t randBits = (uint64_t)taosRand() & 0xFFu;
    const uint64_t raw = timeBits + hashBits + randBits;

    if (raw == 0 || raw == signBit) {
      continue;  // 0 is reserved; INT64_MIN has no positive counterpart
    }

    // Equivalent to llabs() on the two's-complement interpretation of raw,
    // but computed without leaving the unsigned domain.
    return (int64_t)((raw & signBit) != 0 ? (0 - raw) : raw);
  }
}
S
Shengliang Guan 已提交
710 711

int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
S
Shengliang Guan 已提交
712
                          SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
713
  if (mndAcquireRpc(pMnode) != 0) return -1;
S
Shengliang Guan 已提交
714

M
Minghao Li 已提交
715
  SSdb   *pSdb = pMnode->pSdb;
S
Shengliang Guan 已提交
716 717 718 719 720
  int64_t ms = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
S
Shengliang Guan 已提交
721 722 723
  pStbInfo->stbs = taosArrayInit(sdbGetSize(pSdb, SDB_STB), sizeof(SMonStbDesc));
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL ||
      pStbInfo->stbs == NULL) {
724
    mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
725 726 727 728
    return -1;
  }

  // cluster info
wmmhello's avatar
wmmhello 已提交
729
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
S
Shengliang Guan 已提交
730 731
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);
732 733
  pClusterInfo->dbs_total = sdbGetSize(pSdb, SDB_DB);
  pClusterInfo->stbs_total = sdbGetSize(pSdb, SDB_STB);
S
Shengliang Guan 已提交
734 735 736 737 738 739 740 741 742 743

  void *pIter = NULL;
  while (1) {
    SDnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonDnodeDesc desc = {0};
    desc.dnode_id = pObj->id;
    tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
S
Shengliang Guan 已提交
744
    if (mndIsDnodeOnline(pObj, ms)) {
S
Shengliang Guan 已提交
745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762
      tstrncpy(desc.status, "ready", sizeof(desc.status));
    } else {
      tstrncpy(desc.status, "offline", sizeof(desc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &desc);
    sdbRelease(pSdb, pObj);
  }

  pIter = NULL;
  while (1) {
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonMnodeDesc desc = {0};
    desc.mnode_id = pObj->id;
    tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));

763
    if (pObj->id == pMnode->selfDnodeId) {
S
Shengliang Guan 已提交
764 765
      pClusterInfo->first_ep_dnode_id = pObj->id;
      tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
766 767
      pClusterInfo->master_uptime = mndGetClusterUpTime(pMnode);
      // pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
768 769 770
      tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
    } else {
      tstrncpy(desc.role, syncStr(pObj->state), sizeof(desc.role));
S
Shengliang Guan 已提交
771
    }
772 773
    taosArrayPush(pClusterInfo->mnodes, &desc);
    sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
774 775 776 777 778 779 780 781 782 783
  }

  // vgroup info
  pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pClusterInfo->vgroups_total++;
784
    pClusterInfo->tbs_total += pVgroup->numOfTables;
S
Shengliang Guan 已提交
785 786 787

    SMonVgroupDesc desc = {0};
    desc.vgroup_id = pVgroup->vgId;
788 789 790 791 792

    SName name = {0};
    tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name, desc.database_name);

S
Shengliang Guan 已提交
793 794 795 796
    desc.tables_num = pVgroup->numOfTables;
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
    tstrncpy(desc.status, "unsynced", sizeof(desc.status));
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
M
Minghao Li 已提交
797
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[i];
S
Shengliang Guan 已提交
798 799
      SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
      pVnDesc->dnode_id = pVgid->dnodeId;
S
Shengliang Guan 已提交
800
      tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->role), sizeof(pVnDesc->vnode_role));
S
Shengliang Guan 已提交
801 802 803 804
      if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
        tstrncpy(desc.status, "ready", sizeof(desc.status));
        pClusterInfo->vgroups_alive++;
      }
S
Shengliang Guan 已提交
805
      if (pVgid->role != TAOS_SYNC_STATE_ERROR) {
S
Shengliang Guan 已提交
806 807 808 809 810 811 812 813 814
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &desc);
    sdbRelease(pSdb, pVgroup);
  }

S
Shengliang Guan 已提交
815 816 817 818 819 820 821
  // stb info
  pIter = NULL;
  while (1) {
    SStbObj *pStb = NULL;
    pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
    if (pIter == NULL) break;

S
Shengliang Guan 已提交
822
    SMonStbDesc desc = {0};
S
Shengliang Guan 已提交
823 824 825 826 827 828 829 830 831 832 833 834 835

    SName name1 = {0};
    tNameFromString(&name1, pStb->db, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name1, desc.database_name);

    SName name2 = {0};
    tNameFromString(&name2, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tstrncpy(desc.stb_name, tNameGetTableName(&name2), TSDB_TABLE_NAME_LEN);

    taosArrayPush(pStbInfo->stbs, &desc);
    sdbRelease(pSdb, pStb);
  }

S
Shengliang Guan 已提交
836 837 838 839 840 841 842 843
  // grant info
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  }

844
  mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
845
  return 0;
L
Liu Jicong 已提交
846
}
S
Shengliang Guan 已提交
847 848

// Stores the role returned by syncGetMyRole() for this mnode's sync handle in
// pLoad->syncState and traces it. Always returns 0.
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  pLoad->syncState = syncGetMyRole(pMnode->syncMgmt.sync);

  mTrace("mnode current syncstate is %s", syncStr(pLoad->syncState));

  return 0;
}
S
Shengliang Guan 已提交
853

854
// Updates pMnode->restored under the mnode write lock and traces the new value.
// When the flag is being cleared, the call additionally blocks, polling every
// 3 ms, until pMnode->rpcRef is no longer positive.
void mndSetRestored(SMnode *pMnode, bool restored) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->restored = restored;
  taosThreadRwlockUnlock(&pMnode->lock);
  mTrace("mnode set restored:%d", restored);

  if (restored) {
    return;
  }

  // rpcRef is read here without holding the lock.
  while (pMnode->rpcRef > 0) {
    taosMsleep(3);
  }
}

// Returns the current value of pMnode->restored (read without taking the lock).
bool mndGetRestored(SMnode *pMnode) {
  return pMnode->restored;
}

// Sets pMnode->stopped to true under the mnode write lock, then traces it.
// There is no counterpart in this function for clearing the flag.
void mndSetStop(SMnode *pMnode) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->stopped = true;
  taosThreadRwlockUnlock(&pMnode->lock);

  mTrace("mnode set stopped");
}

// Returns the current value of pMnode->stopped (read without taking the lock).
bool mndGetStop(SMnode *pMnode) {
  return pMnode->stopped;
}