mndMain.c 27.8 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndAcct.h"
#include "mndCluster.h"
L
Liu Jicong 已提交
19
#include "mndConsumer.h"
S
Shengliang Guan 已提交
20 21 22
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
L
Liu Jicong 已提交
23
#include "mndGrant.h"
S
monitor  
Shengliang Guan 已提交
24
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
25
#include "mndMnode.h"
L
Liu Jicong 已提交
26
#include "mndPerfSchema.h"
M
Minghao Li 已提交
27
#include "mndPrivilege.h"
S
Shengliang Guan 已提交
28
#include "mndProfile.h"
S
Shengliang Guan 已提交
29
#include "mndQnode.h"
L
Liu Jicong 已提交
30
#include "mndQuery.h"
S
Shengliang Guan 已提交
31
#include "mndShow.h"
S
sma  
Shengliang Guan 已提交
32
#include "mndSma.h"
S
Shengliang Guan 已提交
33
#include "mndSnode.h"
S
Shengliang Guan 已提交
34
#include "mndStb.h"
L
Liu Jicong 已提交
35
#include "mndStream.h"
L
Liu Jicong 已提交
36
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
37 38
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
39
#include "mndTopic.h"
S
Shengliang Guan 已提交
40 41 42
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
43

44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
// Take one RPC reference on the mnode if it can currently serve requests.
//
// Runs under the read side of pMnode->lock:
//   - mnode stopped      -> terrno = TSDB_CODE_APP_NOT_READY, returns -1
//   - mndIsLeader false  -> returns -1 without touching terrno here
//                           (presumably mndIsLeader sets it -- TODO confirm)
//   - otherwise          -> rpcRef is atomically incremented, returns 0
static inline int32_t mndAcquireRpc(SMnode *pMnode) {
  int32_t ret = -1;

  taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    terrno = TSDB_CODE_APP_NOT_READY;
  } else if (mndIsLeader(pMnode)) {
    (void)atomic_add_fetch_32(&pMnode->rpcRef, 1);
    ret = 0;
  }
  taosThreadRwlockUnlock(&pMnode->lock);

  return ret;
}

// Drop one RPC reference: atomically decrements rpcRef while holding the read
// side of pMnode->lock.
static inline void mndReleaseRpc(SMnode *pMnode) {
  taosThreadRwlockRdlock(&pMnode->lock);
  (void)atomic_sub_fetch_32(&pMnode->rpcRef, 1);
  taosThreadRwlockUnlock(&pMnode->lock);
}

S
Shengliang Guan 已提交
75 76 77 78 79 80 81 82 83 84 85 86 87
// Build the body shared by all mnode timer messages.
//
// Serializes a zero-initialized SMTimerReq into a buffer obtained from
// rpcMallocCont. On success the buffer is returned and *pContLen receives its
// length. Returns NULL (leaving *pContLen untouched) when the size query yields
// a non-positive length or the allocation fails.
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq req = {0};

  // Called with a NULL buffer first; the result is used as the buffer size.
  const int32_t len = tSerializeSMTimerMsg(NULL, 0, &req);
  if (len <= 0) {
    return NULL;
  }

  void *pBuf = rpcMallocCont(len);
  if (pBuf != NULL) {
    tSerializeSMTimerMsg(pBuf, len, &req);
    *pContLen = len;
  }
  return pBuf;
}

S
Shengliang Guan 已提交
88 89
// Post a TDMT_MND_TRANS_TIMER message to the mnode write queue.
// Nothing is posted when the timer message body cannot be built.
static void mndPullupTrans(SMnode *pMnode) {
  int32_t bodyLen = 0;
  void   *pBody = mndBuildTimerMsg(&bodyLen);
  if (pBody == NULL) return;

  SRpcMsg msg = {0};
  msg.msgType = TDMT_MND_TRANS_TIMER;
  msg.pCont = pBody;
  msg.contLen = bodyLen;
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &msg);
}

97
// Post a TDMT_MND_TTL_TIMER message to the mnode write queue.
// Nothing is posted when the timer message body cannot be built, matching the
// other timer helpers in this file (mndPullupTrans, mndPullupTelem, ...).
static void mndPullupTtl(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) return;  // never enqueue a message with a NULL body

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}

S
Shengliang Guan 已提交
104 105
// Post a TDMT_MND_MQ_TIMER message to the mnode read queue.
// Nothing is posted when the timer message body cannot be built.
static void mndCalMqRebalance(SMnode *pMnode) {
  int32_t bodyLen = 0;
  void   *pBody = mndBuildTimerMsg(&bodyLen);
  if (pBody == NULL) return;

  SRpcMsg msg = {0};
  msg.msgType = TDMT_MND_MQ_TIMER;
  msg.pCont = pBody;
  msg.contLen = bodyLen;
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}
L
Liu Jicong 已提交
112

S
Shengliang Guan 已提交
113 114
// Post a TDMT_MND_TELEM_TIMER message to the mnode read queue.
// Nothing is posted when the timer message body cannot be built.
static void mndPullupTelem(SMnode *pMnode) {
  int32_t bodyLen = 0;
  void   *pBody = mndBuildTimerMsg(&bodyLen);
  if (pBody == NULL) return;

  SRpcMsg msg = {0};
  msg.msgType = TDMT_MND_TELEM_TIMER;
  msg.pCont = pBody;
  msg.contLen = bodyLen;
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}

122
// Post a TDMT_MND_GRANT_HB_TIMER message to the mnode read queue.
// The message carries the fixed ahandle value 0x9527. Nothing is posted when
// the timer message body cannot be built.
static void mndPullupGrant(SMnode *pMnode) {
  int32_t bodyLen = 0;
  void   *pBody = mndBuildTimerMsg(&bodyLen);
  if (pBody == NULL) return;

  SRpcMsg msg = {0};
  msg.msgType = TDMT_MND_GRANT_HB_TIMER;
  msg.pCont = pBody;
  msg.contLen = bodyLen;
  msg.info.ahandle = (void *)0x9527;
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}

132 133 134 135 136 137 138 139 140 141
// Post a TDMT_MND_UPTIME_TIMER message to the mnode write queue.
// The message carries the fixed ahandle value 0x9528. Nothing is posted when
// the timer message body cannot be built.
static void mndIncreaseUpTime(SMnode *pMnode) {
  int32_t bodyLen = 0;
  void   *pBody = mndBuildTimerMsg(&bodyLen);
  if (pBody == NULL) return;

  SRpcMsg msg = {0};
  msg.msgType = TDMT_MND_UPTIME_TIMER;
  msg.pCont = pBody;
  msg.contLen = bodyLen;
  msg.info.ahandle = (void *)0x9528;
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &msg);
}

S
Shengliang Guan 已提交
142
static void *mndThreadFp(void *param) {
S
Shengliang Guan 已提交
143
  SMnode *pMnode = param;
S
Shengliang Guan 已提交
144 145 146 147 148 149
  int64_t lastTime = 0;
  setThreadName("mnode-timer");

  while (1) {
    lastTime++;
    taosMsleep(100);
S
Shengliang Guan 已提交
150
    if (mndGetStop(pMnode)) break;
151
    if (lastTime % 10 != 0) continue;
S
Shengliang Guan 已提交
152

153 154
    int64_t sec = lastTime / 10;
    if (sec % tsTtlPushInterval == 0) {
155
      mndPullupTtl(pMnode);
wmmhello's avatar
wmmhello 已提交
156 157
    }

158
    if (sec % tsTransPullupInterval == 0) {
S
Shengliang Guan 已提交
159 160 161
      mndPullupTrans(pMnode);
    }

162
    if (sec % tsMqRebalanceInterval == 0) {
S
Shengliang Guan 已提交
163 164 165
      mndCalMqRebalance(pMnode);
    }

S
Shengliang Guan 已提交
166
    if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) {
S
Shengliang Guan 已提交
167 168
      mndPullupTelem(pMnode);
    }
C
Cary Xu 已提交
169

170
    if (sec % tsGrantHBInterval == 0) {
171
      mndPullupGrant(pMnode);
C
Cary Xu 已提交
172
    }
173

174
    if (sec % tsUptimeInterval == 0) {
175 176
      mndIncreaseUpTime(pMnode);
    }
S
Shengliang Guan 已提交
177 178
  }

S
Shengliang Guan 已提交
179
  return NULL;
L
Liu Jicong 已提交
180 181
}

182
// Start the joinable "mnode-timer" thread running mndThreadFp.
//
// Returns 0 after the thread has been created and the startup step has been
// reported, or -1 when thread creation fails. The thread attribute object is
// destroyed on both paths.
static int32_t mndInitTimer(SMnode *pMnode) {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode) != 0) {
    // Log before destroying the attribute so errno still describes the failure.
    mError("failed to create timer thread since %s", strerror(errno));
    taosThreadAttrDestroy(&thAttr);
    return -1;
  }

  taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("mnode-timer", "initialized");
  return 0;
}

196
// Join the timer thread and clear its handle. Does nothing when
// taosCheckPthreadValid() reports that pMnode->thread is not valid.
static void mndCleanupTimer(SMnode *pMnode) {
  if (!taosCheckPthreadValid(pMnode->thread)) return;

  taosThreadJoin(pMnode->thread, NULL);
  taosThreadClear(&pMnode->thread);
}

S
Shengliang Guan 已提交
203
// Store a heap copy of path in pMnode->path and create that directory.
//
// Returns 0 on success. Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when
// the copy cannot be allocated, or with terrno derived from errno when
// taosMkDir fails. In the latter case pMnode->path stays set.
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  pMnode->path = strdup(path);

  if (pMnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  const int32_t mkRet = taosMkDir(pMnode->path);
  if (mkRet != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}
S
Shengliang Guan 已提交
217

218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
// Open the mnode WAL located in "<pMnode->path><TD_DIRSEP>wal" and store the
// handle in pMnode->pWal. Returns 0 on success, -1 (after logging) when
// walOpen returns NULL.
static int32_t mndInitWal(SMnode *pMnode) {
  char walDir[PATH_MAX + 20] = {0};
  snprintf(walDir, sizeof(walDir), "%s%swal", pMnode->path, TD_DIRSEP);

  SWalCfg walCfg = {0};
  walCfg.vgId = 1;
  walCfg.fsyncPeriod = 0;
  walCfg.rollPeriod = -1;
  walCfg.segSize = -1;
  walCfg.retentionPeriod = -1;
  walCfg.retentionSize = -1;
  walCfg.level = TAOS_WAL_FSYNC;

  pMnode->pWal = walOpen(walDir, &walCfg);
  if (pMnode->pWal != NULL) {
    return 0;
  }

  mError("failed to open wal since %s. wal:%s", terrstr(), walDir);
  return -1;
}

// Close the WAL handle, if any, and reset pMnode->pWal to NULL.
static void mndCloseWal(SMnode *pMnode) {
  if (pMnode->pWal == NULL) return;

  walClose(pMnode->pWal);
  pMnode->pWal = NULL;
}

247 248 249
// Create the sdb instance for this mnode from its path, WAL handle and sync
// id, storing it in pMnode->pSdb. Returns 0 on success, -1 when sdbInit
// returns NULL.
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt sdbOpt = {
      .path = pMnode->path,
      .pMnode = pMnode,
      .pWal = pMnode->pWal,
      .sync = pMnode->syncMgmt.sync,
  };

  pMnode->pSdb = sdbInit(&sdbOpt);
  return (pMnode->pSdb != NULL) ? 0 : -1;
}

262 263 264 265 266 267 268
// Load the sdb from its file unless this mnode is being deployed, in which
// case there is nothing to read and 0 is returned. Otherwise returns the
// result of sdbReadFile.
static int32_t mndOpenSdb(SMnode *pMnode) {
  if (pMnode->deploy) {
    return 0;
  }
  return sdbReadFile(pMnode->pSdb);
}
269 270 271

// Release the sdb instance, if any, and reset pMnode->pSdb to NULL.
static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb == NULL) return;

  sdbCleanup(pMnode->pSdb);
  pMnode->pSdb = NULL;
}

277 278 279 280 281
// Append one init/cleanup step to pMnode->pSteps.
// Returns 0 on success; on push failure sets terrno to
// TSDB_CODE_OUT_OF_MEMORY and returns -1.
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep newStep = {.name = name, .initFp = initFp, .cleanupFp = cleanupFp};

  if (taosArrayPush(pMnode->pSteps, &newStep) != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

290
// Register every mnode startup step, in execution order, in pMnode->pSteps.
//
// Each entry pairs an init function with the cleanup function that undoes it.
// The second "mnode-sdb" entry (mndOpenSdb) has no cleanup of its own.
// Returns 0 when all steps were registered, -1 as soon as one registration
// fails.
static int32_t mndInitSteps(SMnode *pMnode) {
  static const struct {
    char        *name;
    MndInitFp    initFp;
    MndCleanupFp cleanupFp;
  } steps[] = {
      {"mnode-wal", mndInitWal, mndCloseWal},
      {"mnode-sdb", mndInitSdb, mndCleanupSdb},
      {"mnode-trans", mndInitTrans, mndCleanupTrans},
      {"mnode-cluster", mndInitCluster, mndCleanupCluster},
      {"mnode-mnode", mndInitMnode, mndCleanupMnode},
      {"mnode-qnode", mndInitQnode, mndCleanupQnode},
      {"mnode-snode", mndInitSnode, mndCleanupSnode},
      {"mnode-dnode", mndInitDnode, mndCleanupDnode},
      {"mnode-user", mndInitUser, mndCleanupUser},
      {"mnode-grant", mndInitGrant, mndCleanupGrant},
      {"mnode-privilege", mndInitPrivilege, mndCleanupPrivilege},
      {"mnode-acct", mndInitAcct, mndCleanupAcct},
      {"mnode-stream", mndInitStream, mndCleanupStream},
      {"mnode-topic", mndInitTopic, mndCleanupTopic},
      {"mnode-consumer", mndInitConsumer, mndCleanupConsumer},
      {"mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe},
      {"mnode-vgroup", mndInitVgroup, mndCleanupVgroup},
      {"mnode-stb", mndInitStb, mndCleanupStb},
      {"mnode-sma", mndInitSma, mndCleanupSma},
      {"mnode-infos", mndInitInfos, mndCleanupInfos},
      {"mnode-perfs", mndInitPerfs, mndCleanupPerfs},
      {"mnode-db", mndInitDb, mndCleanupDb},
      {"mnode-func", mndInitFunc, mndCleanupFunc},
      {"mnode-sdb", mndOpenSdb, NULL},
      {"mnode-profile", mndInitProfile, mndCleanupProfile},
      {"mnode-show", mndInitShow, mndCleanupShow},
      {"mnode-query", mndInitQuery, mndCleanupQuery},
      {"mnode-sync", mndInitSync, mndCleanupSync},
      {"mnode-telem", mndInitTelem, mndCleanupTelem},
  };

  for (size_t i = 0; i < sizeof(steps) / sizeof(steps[0]); i++) {
    if (mndAllocStep(pMnode, steps[i].name, steps[i].initFp, steps[i].cleanupFp) != 0) return -1;
  }

  return 0;
}

324
// Run the cleanup callbacks of steps [0..pos] in reverse order, then destroy
// the step array and set pMnode->pSteps to NULL.
//
// pos == -1 means "start from the last registered step". Steps without a
// cleanup callback are only logged. Does nothing when pSteps is already NULL.
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) return;

  if (pos == -1) {
    pos = taosArrayGetSize(pMnode->pSteps) - 1;
  }

  int32_t idx = pos;
  while (idx >= 0) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    mInfo("%s will cleanup", pStep->name);
    if (pStep->cleanupFp != NULL) {
      (*pStep->cleanupFp)(pMnode);
    }
    idx--;
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
S
Shengliang Guan 已提交
343

344
// Run the init function of every registered step in order.
//
// Steps without an init function are skipped. On the first failure the steps
// from the failing one back to the first are cleaned up (which also destroys
// the step array), terrno is restored to the failure's value and -1 is
// returned. After all steps succeed the cluster id is cached in
// pMnode->clusterId and 0 is returned.
static int32_t mndExecSteps(SMnode *pMnode) {
  const int32_t numSteps = taosArrayGetSize(pMnode->pSteps);

  for (int32_t idx = 0; idx < numSteps; idx++) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    if (pStep->initFp == NULL) continue;

    if ((*pStep->initFp)(pMnode) == 0) {
      mInfo("%s is initialized", pStep->name);
      tmsgReportStartup(pStep->name, "initialized");
      continue;
    }

    // Keep the failure reason; terrno is written back after the cleanup pass.
    const int32_t savedErr = terrno;
    mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
    mndCleanupSteps(pMnode, idx);
    terrno = savedErr;
    return -1;
  }

  pMnode->clusterId = mndGetClusterId(pMnode);
  return 0;
}
S
Shengliang Guan 已提交
365

S
shm  
Shengliang Guan 已提交
366
// Copy the message callbacks, own dnode id and replica layout from pOption
// into the mnode and its sync management state.
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  pMnode->msgCb = pOption->msgCb;
  pMnode->selfDnodeId = pOption->dnodeId;

  pMgmt->selfIndex = pOption->selfIndex;
  pMgmt->numOfReplicas = pOption->numOfReplicas;
  memcpy(pMgmt->replicas, pOption->replicas, sizeof(pOption->replicas));
}
373

374
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
375
  mInfo("start to open mnode in %s", path);
S
Shengliang Guan 已提交
376

wafwerar's avatar
wafwerar 已提交
377
  SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
378 379 380 381 382
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
383
  memset(pMnode, 0, sizeof(SMnode));
S
Shengliang Guan 已提交
384

S
Shengliang Guan 已提交
385 386
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
S
shm  
Shengliang Guan 已提交
387
  mndSetOptions(pMnode, pOption);
S
Shengliang Guan 已提交
388

389
  pMnode->deploy = pOption->deploy;
S
Shengliang Guan 已提交
390 391
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
wafwerar's avatar
wafwerar 已提交
392
    taosMemoryFree(pMnode);
S
Shengliang Guan 已提交
393 394 395 396
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
397

S
Shengliang Guan 已提交
398
  int32_t code = mndCreateDir(pMnode, path);
399
  if (code != 0) {
S
Shengliang Guan 已提交
400 401
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
402 403 404 405 406
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

407
  code = mndInitSteps(pMnode);
408
  if (code != 0) {
S
Shengliang Guan 已提交
409 410
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
411 412 413 414 415 416 417
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
418 419
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
420 421 422 423
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
424

S
Shengliang Guan 已提交
425
  mInfo("mnode open successfully");
S
Shengliang Guan 已提交
426 427
  return pMnode;
}
S
Shengliang Guan 已提交
428

429 430
// Prepare for shutdown: reset syncMgmt.leaderTransferFinish to 0 and call
// syncLeaderTransfer(). Returns immediately afterwards without waiting on that
// flag. A NULL pMnode is ignored.
void mndPreClose(SMnode *pMnode) {
  if (pMnode == NULL) return;

  atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 0);
  syncLeaderTransfer(pMnode->syncMgmt.sync);
}

446
// Tear down an mnode: run all step cleanups from last to first, then free the
// path string and the SMnode itself. A NULL pMnode is ignored.
void mndClose(SMnode *pMnode) {
  if (pMnode == NULL) return;

  mInfo("start to close mnode");
  mndCleanupSteps(pMnode, -1);
  taosMemoryFreeClear(pMnode->path);
  taosMemoryFreeClear(pMnode);
  mInfo("mnode is closed");
}
S
Shengliang Guan 已提交
455

456
// Start a previously opened mnode.
//
// Starts sync; in deploy mode additionally deploys the sdb and marks the
// mnode as restored; resets all grants; finally starts the timer thread.
// Returns -1 when the sdb deployment fails, otherwise the result of
// mndInitTimer().
int32_t mndStart(SMnode *pMnode) {
  mndSyncStart(pMnode);

  if (pMnode->deploy) {
    const int32_t deployRet = sdbDeploy(pMnode->pSdb);
    if (deployRet != 0) {
      mError("failed to deploy sdb while start mnode");
      return -1;
    }
    mndSetRestored(pMnode, true);
  }

  grantReset(pMnode, TSDB_GRANT_ALL, 0);

  return mndInitTimer(pMnode);
}

471
// Stop a running mnode: raise the stop flag first, then stop sync, and
// finally join the timer thread.
void mndStop(SMnode *pMnode) {
  mndSetStop(pMnode);
  mndSyncStop(pMnode);
  mndCleanupTimer(pMnode);
}

477 478 479 480 481 482 483
// Handle a sync control message (heartbeat or heartbeat reply) addressed to
// the mnode's sync node.
//
// Returns the sync handler's result. Returns -1 with terrno set to
// TSDB_CODE_SYN_INTERNAL_ERROR when sync is not initialized, the sync node
// cannot be acquired, the handler fails, or the message type is not one of the
// two handled here (the same policy mndProcessSyncMsg applies to unknown
// types).
int32_t mndProcessSyncCtrlMsg(SRpcMsg *pMsg) {
  SMnode    *pMnode = pMsg->info.node;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  int32_t    code = 0;

  mInfo("vgId:%d, process sync ctrl msg", 1);

  if (!syncIsInit()) {
    mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
  if (pSyncNode == NULL) {
    mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) {
    SyncHeartbeat *pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg);
    code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg);
    syncHeartbeatDestroy(pSyncMsg);

  } else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
    SyncHeartbeatReply *pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg);
    code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg);
    syncHeartbeatReplyDestroy(pSyncMsg);

  } else {
    // Do not report success for a message nobody handled.
    mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
    code = -1;
  }

  syncNodeRelease(pSyncNode);

  if (code != 0) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  return code;
}

M
Minghao Li 已提交
516
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
M
Minghao Li 已提交
517
  SMnode    *pMnode = pMsg->info.node;
518
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
M
Minghao Li 已提交
519
  int32_t    code = 0;
M
Minghao Li 已提交
520

S
Shengliang Guan 已提交
521
  if (!syncIsInit()) {
522
    mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
M
Minghao Li 已提交
523 524
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
525
  }
M
Minghao Li 已提交
526

527 528 529
  SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
  if (pSyncNode == NULL) {
    mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
M
Minghao Li 已提交
530 531
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
S
Shengliang Guan 已提交
532
  }
533

534 535
  if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
    SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
536
    code = syncNodeOnTimer(pSyncNode, pSyncMsg);
537 538 539 540
    syncTimeoutDestroy(pSyncMsg);

  } else if (pMsg->msgType == TDMT_SYNC_PING) {
    SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
M
Minghao Li 已提交
541
    code = syncNodeOnPing(pSyncNode, pSyncMsg);
542 543 544 545
    syncPingDestroy(pSyncMsg);

  } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
    SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
M
Minghao Li 已提交
546
    code = syncNodeOnPingReply(pSyncNode, pSyncMsg);
547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573
    syncPingReplyDestroy(pSyncMsg);

  } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
    code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, NULL);
    syncClientRequestDestroy(pSyncMsg);

  } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
    SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
    code = syncNodeOnRequestVote(pSyncNode, pSyncMsg);
    syncRequestVoteDestroy(pSyncMsg);

  } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
    SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
    code = syncNodeOnRequestVoteReply(pSyncNode, pSyncMsg);
    syncRequestVoteReplyDestroy(pSyncMsg);

  } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
    SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
    code = syncNodeOnAppendEntries(pSyncNode, pSyncMsg);
    syncAppendEntriesDestroy(pSyncMsg);

  } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
    SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
    code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg);
    syncAppendEntriesReplyDestroy(pSyncMsg);

M
Minghao Li 已提交
574 575 576 577 578 579 580 581 582 583
  } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
    SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
    code = syncNodeOnSnapshot(pSyncNode, pSyncMsg);
    syncSnapshotSendDestroy(pSyncMsg);

  } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
    SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
    code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
    syncSnapshotRspDestroy(pSyncMsg);

M
Minghao Li 已提交
584 585 586 587 588
  } else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) {
    SyncLocalCmd *pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg);
    code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg);
    syncLocalCmdDestroy(pSyncMsg);

M
Minghao Li 已提交
589
  } else {
590 591
    mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
    code = -1;
M
Minghao Li 已提交
592 593
  }

M
Minghao Li 已提交
594 595
  syncNodeRelease(pSyncNode);

M
Minghao Li 已提交
596 597 598
  if (code != 0) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  }
599
  return code;
M
Minghao Li 已提交
600 601
}

S
Shengliang Guan 已提交
602
// Decide whether this mnode may process pMsg right now.
//
// Returns 0 for non-request messages and for the scheduler (TDMT_SCH_*)
// message types listed below without taking an RPC reference, and 0 for any
// other request once mndAcquireRpc() succeeds. Otherwise returns -1:
//   - the timer messages listed below are only traced and dropped;
//   - other requests get the mnode endpoint set serialized into
//     pMsg->info.rsp with terrno = TSDB_CODE_RPC_REDIRECT
//     (TSDB_CODE_OUT_OF_MEMORY if the response buffer cannot be allocated),
//     or terrno = TSDB_CODE_APP_NOT_READY when no endpoint is known.
// NOTE(review): TDMT_MND_GRANT_HB_TIMER is not in the timer list and therefore
// takes the redirect path -- confirm this is intended.
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
  if (!IsReq(pMsg)) return 0;

  const bool isSchMsg = pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY ||
                        pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT ||
                        pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH ||
                        pMsg->msgType == TDMT_SCH_DROP_TASK;
  if (isSchMsg) return 0;

  if (mndAcquireRpc(pMsg->info.node) == 0) return 0;

  SMnode     *pMnode = pMsg->info.node;
  const char *role = syncGetMyRoleStr(pMnode->syncMgmt.sync);
  bool        restored = syncIsRestoreFinish(pMnode->syncMgmt.sync);

  const bool isTimerMsg = pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
                          pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
                          pMsg->msgType == TDMT_MND_UPTIME_TIMER;
  if (isTimerMsg) {
    mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
           pMnode->stopped, restored, role);
    return -1;
  }

  const STraceId *trace = &pMsg->info.traceId;
  SEpSet          epSet = {0};
  mndGetMnodeEpSet(pMnode, &epSet);

  // Logged before terrno is overwritten below, so terrstr() still reports the
  // reason the acquire failed.
  mDebug(
      "msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d "
      "role:%s, redirect numOfEps:%d inUse:%d",
      pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, restored, role, epSet.numOfEps,
      epSet.inUse);

  if (epSet.numOfEps <= 0) {
    terrno = TSDB_CODE_APP_NOT_READY;
    return -1;
  }

  for (int32_t ep = 0; ep < epSet.numOfEps; ++ep) {
    mDebug("mnode index:%d, ep:%s:%u", ep, epSet.eps[ep].fqdn, epSet.eps[ep].port);
  }

  const int32_t rspLen = tSerializeSEpSet(NULL, 0, &epSet);
  pMsg->info.rsp = rpcMallocCont(rspLen);
  pMsg->info.hasEpSet = 1;
  if (pMsg->info.rsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  tSerializeSEpSet(pMsg->info.rsp, rspLen, &epSet);
  pMsg->info.rspLen = rspLen;
  terrno = TSDB_CODE_RPC_REDIRECT;
  return -1;
}

S
Shengliang Guan 已提交
654
// Verify that a request message actually carries a body.
//
// Non-request messages always pass. A request with a zero length or a NULL
// content pointer is logged, terrno is set to TSDB_CODE_INVALID_MSG_LEN and -1
// is returned. Otherwise returns 0.
static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
  const bool missingBody = IsReq(pMsg) && (pMsg->contLen == 0 || pMsg->pCont == NULL);
  if (!missingBody) {
    return 0;
  }

  const STraceId *trace = &pMsg->info.traceId;
  mGError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen,
          pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  terrno = TSDB_CODE_INVALID_MSG_LEN;
  return -1;
}

S
Shengliang Guan 已提交
665
// Dispatch an RPC message to the handler registered for its type.
//
// Returns -1 with terrno = TSDB_CODE_MSG_NOT_PROCESSED when no handler is
// registered, and -1 when the content or mnode-state checks fail. Otherwise
// runs the handler, releases one RPC reference, logs the outcome and returns
// the handler's code unchanged.
// NOTE(review): mndCheckMnodeState() can return 0 without taking an RPC
// reference (non-request and TDMT_SCH_* messages), yet mndReleaseRpc() runs
// unconditionally below -- confirm the reference count stays balanced.
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
  const STraceId *trace = &pMsg->info.traceId;

  MndMsgFp handler = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
  if (handler == NULL) {
    mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    return -1;
  }

  if (mndCheckMsgContent(pMsg) != 0) {
    return -1;
  }
  if (mndCheckMnodeState(pMsg) != 0) {
    return -1;
  }

  mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  const int32_t code = (*handler)(pMsg);
  mndReleaseRpc(pMnode);

  if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
    mGTrace("msg:%p, won't response immediately since in progress", pMsg);
    return code;
  }
  if (code == 0) {
    mGTrace("msg:%p, successfully processed", pMsg);
    return code;
  }

  mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
          TMSG_INFO(pMsg->msgType));
  return code;
}

695 696
// Register fp as the handler for msgType. The request is silently ignored
// when the type's table index is not below TDMT_MAX.
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  const tmsg_t slot = TMSG_INDEX(msgType);
  if (slot >= TDMT_MAX) return;

  pMnode->msgFp[slot] = fp;
}

D
dapan1121 已提交
702
// Generates a non-zero, non-negative 64-bit uid for `name` (`len` bytes).
//
// Note: uid 0 is reserved
//
// The value is composed, before the final llabs(), as:
//   bits 24..63 : low 40 bits of the current timestamp in microseconds
//   bits  8..23 : low 16 bits of MurmurHash3_32(name, len)
//   bits  0..7  : low 8 bits of taosRand()
//
// The composition is done in uint64_t because the 40-bit timestamp shifted
// left by 24 can reach bit 63, which is undefined for a signed operand.
// INT64_MIN is rejected together with 0 because llabs(INT64_MIN) is undefined;
// in both cases a fresh timestamp/random value is drawn.
int64_t mndGenerateUid(const char *name, int32_t len) {
  int32_t hashval = MurmurHash3_32(name, len);
  do {
    uint64_t us = (uint64_t)taosGetTimestampUs();
    uint64_t bits = (us & 0x000000FFFFFFFFFFULL) << 24;
    bits += ((uint64_t)hashval & 0xFFFFu) << 8;
    bits += (uint64_t)taosRand() & 0xFFu;

    int64_t uuid = (int64_t)bits;
    if (uuid != 0 && uuid != INT64_MIN) {
      return llabs(uuid);
    }
  } while (true);
}
S
Shengliang Guan 已提交
714 715

int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
S
Shengliang Guan 已提交
716
                          SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
717
  if (mndAcquireRpc(pMnode) != 0) return -1;
S
Shengliang Guan 已提交
718

M
Minghao Li 已提交
719
  SSdb   *pSdb = pMnode->pSdb;
S
Shengliang Guan 已提交
720 721 722 723 724
  int64_t ms = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
S
Shengliang Guan 已提交
725 726 727
  pStbInfo->stbs = taosArrayInit(sdbGetSize(pSdb, SDB_STB), sizeof(SMonStbDesc));
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL ||
      pStbInfo->stbs == NULL) {
728
    mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
729 730 731 732
    return -1;
  }

  // cluster info
wmmhello's avatar
wmmhello 已提交
733
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
S
Shengliang Guan 已提交
734 735
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);
736 737
  pClusterInfo->dbs_total = sdbGetSize(pSdb, SDB_DB);
  pClusterInfo->stbs_total = sdbGetSize(pSdb, SDB_STB);
S
Shengliang Guan 已提交
738 739 740 741 742 743 744 745 746 747

  void *pIter = NULL;
  while (1) {
    SDnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonDnodeDesc desc = {0};
    desc.dnode_id = pObj->id;
    tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
S
Shengliang Guan 已提交
748
    if (mndIsDnodeOnline(pObj, ms)) {
S
Shengliang Guan 已提交
749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766
      tstrncpy(desc.status, "ready", sizeof(desc.status));
    } else {
      tstrncpy(desc.status, "offline", sizeof(desc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &desc);
    sdbRelease(pSdb, pObj);
  }

  pIter = NULL;
  while (1) {
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonMnodeDesc desc = {0};
    desc.mnode_id = pObj->id;
    tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));

767
    if (pObj->id == pMnode->selfDnodeId) {
S
Shengliang Guan 已提交
768 769
      pClusterInfo->first_ep_dnode_id = pObj->id;
      tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
770 771
      pClusterInfo->master_uptime = mndGetClusterUpTime(pMnode);
      // pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
772 773
      tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
    } else {
774
      tstrncpy(desc.role, syncStr(pObj->syncState), sizeof(desc.role));
S
Shengliang Guan 已提交
775
    }
776 777
    taosArrayPush(pClusterInfo->mnodes, &desc);
    sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
778 779 780 781 782 783 784 785 786 787
  }

  // vgroup info
  pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pClusterInfo->vgroups_total++;
788
    pClusterInfo->tbs_total += pVgroup->numOfTables;
S
Shengliang Guan 已提交
789 790 791

    SMonVgroupDesc desc = {0};
    desc.vgroup_id = pVgroup->vgId;
792 793 794 795 796

    SName name = {0};
    tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name, desc.database_name);

S
Shengliang Guan 已提交
797 798 799 800
    desc.tables_num = pVgroup->numOfTables;
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
    tstrncpy(desc.status, "unsynced", sizeof(desc.status));
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
M
Minghao Li 已提交
801
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[i];
S
Shengliang Guan 已提交
802 803
      SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
      pVnDesc->dnode_id = pVgid->dnodeId;
804 805
      tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->syncState), sizeof(pVnDesc->vnode_role));
      if (pVgid->syncState == TAOS_SYNC_STATE_LEADER) {
S
Shengliang Guan 已提交
806 807 808
        tstrncpy(desc.status, "ready", sizeof(desc.status));
        pClusterInfo->vgroups_alive++;
      }
809
      if (pVgid->syncState != TAOS_SYNC_STATE_ERROR) {
S
Shengliang Guan 已提交
810 811 812 813 814 815 816 817 818
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &desc);
    sdbRelease(pSdb, pVgroup);
  }

S
Shengliang Guan 已提交
819 820 821 822 823 824 825
  // stb info
  pIter = NULL;
  while (1) {
    SStbObj *pStb = NULL;
    pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
    if (pIter == NULL) break;

S
Shengliang Guan 已提交
826
    SMonStbDesc desc = {0};
S
Shengliang Guan 已提交
827 828 829 830 831 832 833 834 835 836 837 838 839

    SName name1 = {0};
    tNameFromString(&name1, pStb->db, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name1, desc.database_name);

    SName name2 = {0};
    tNameFromString(&name2, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tstrncpy(desc.stb_name, tNameGetTableName(&name2), TSDB_TABLE_NAME_LEN);

    taosArrayPush(pStbInfo->stbs, &desc);
    sdbRelease(pSdb, pStb);
  }

S
Shengliang Guan 已提交
840 841 842 843 844 845 846 847
  // grant info
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  }

848
  mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
849
  return 0;
L
Liu Jicong 已提交
850
}
S
Shengliang Guan 已提交
851 852

// Reports this mnode's load: the role returned by syncGetMyRole() for the
// mnode's sync instance and the current value of the `restored` flag.
// Always returns 0.
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  pLoad->syncState = syncGetMyRole(pMnode->syncMgmt.sync);
  pLoad->syncRestore = pMnode->restored;

  mTrace("mnode current syncState is %s, syncRestore:%d", syncStr(pLoad->syncState), pLoad->syncRestore);

  return 0;
}
S
Shengliang Guan 已提交
858

859
// Sets pMnode->restored under the mnode write lock.
// When the flag is cleared, the call additionally blocks, polling every 3 ms,
// until pMnode->rpcRef has dropped to zero or below.
void mndSetRestored(SMnode *pMnode, bool restored) {
  const bool flag = restored ? true : false;

  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->restored = flag;
  taosThreadRwlockUnlock(&pMnode->lock);
  mTrace("mnode set restored:%d", restored);

  if (!flag) {
    // rpcRef is read here without holding pMnode->lock.
    while (pMnode->rpcRef > 0) {
      taosMsleep(3);
    }
  }
}

// Returns the current value of pMnode->restored (read without taking
// pMnode->lock).
bool mndGetRestored(SMnode *pMnode) {
  return pMnode->restored;
}

// Marks the mnode as stopped; the flag is written under the mnode write lock.
void mndSetStop(SMnode *pMnode) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->stopped = true;
  taosThreadRwlockUnlock(&pMnode->lock);

  mTrace("mnode set stopped");
}

// Returns the current value of pMnode->stopped (read without taking
// pMnode->lock).
bool mndGetStop(SMnode *pMnode) {
  return pMnode->stopped;
}