mndMain.c 27.9 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndAcct.h"
#include "mndCluster.h"
L
Liu Jicong 已提交
19
#include "mndConsumer.h"
S
Shengliang Guan 已提交
20 21 22
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
L
Liu Jicong 已提交
23
#include "mndGrant.h"
dengyihao's avatar
dengyihao 已提交
24
#include "mndIndex.h"
S
monitor  
Shengliang Guan 已提交
25
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
26
#include "mndMnode.h"
L
Liu Jicong 已提交
27
#include "mndPerfSchema.h"
M
Minghao Li 已提交
28
#include "mndPrivilege.h"
S
Shengliang Guan 已提交
29
#include "mndProfile.h"
S
Shengliang Guan 已提交
30
#include "mndQnode.h"
L
Liu Jicong 已提交
31
#include "mndQuery.h"
S
Shengliang Guan 已提交
32
#include "mndShow.h"
S
sma  
Shengliang Guan 已提交
33
#include "mndSma.h"
S
Shengliang Guan 已提交
34
#include "mndSnode.h"
S
Shengliang Guan 已提交
35
#include "mndStb.h"
L
Liu Jicong 已提交
36
#include "mndStream.h"
L
Liu Jicong 已提交
37
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
38 39
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
40
#include "mndTopic.h"
S
Shengliang Guan 已提交
41 42 43
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
44

45 46 47 48
static inline int32_t mndAcquireRpc(SMnode *pMnode) {
  int32_t code = 0;
  taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
49
    terrno = TSDB_CODE_APP_IS_STOPPING;
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
    code = -1;
  } else if (!mndIsLeader(pMnode)) {
    code = -1;
  } else {
#if 1
    atomic_add_fetch_32(&pMnode->rpcRef, 1);
#else
    int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1);
    mTrace("mnode rpc is acquired, ref:%d", ref);
#endif
  }
  taosThreadRwlockUnlock(&pMnode->lock);
  return code;
}

static inline void mndReleaseRpc(SMnode *pMnode) {
  taosThreadRwlockRdlock(&pMnode->lock);
#if 1
  atomic_sub_fetch_32(&pMnode->rpcRef, 1);
#else
  int32_t ref = atomic_sub_fetch_32(&pMnode->rpcRef, 1);
  mTrace("mnode rpc is released, ref:%d", ref);
#endif
  taosThreadRwlockUnlock(&pMnode->lock);
}

S
Shengliang Guan 已提交
76 77 78 79 80 81 82 83 84 85 86 87 88
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq timerReq = {0};

  int32_t contLen = tSerializeSMTimerMsg(NULL, 0, &timerReq);
  if (contLen <= 0) return NULL;
  void *pReq = rpcMallocCont(contLen);
  if (pReq == NULL) return NULL;

  tSerializeSMTimerMsg(pReq, contLen, &timerReq);
  *pContLen = contLen;
  return pReq;
}

89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
static void *mndBuildCheckpointTickMsg(int32_t *pContLen, int64_t sec) {
  SMStreamTickReq timerReq = {
      .tick = sec,
  };

  int32_t contLen = tSerializeSMStreamTickMsg(NULL, 0, &timerReq);
  if (contLen <= 0) return NULL;
  void *pReq = rpcMallocCont(contLen);
  if (pReq == NULL) return NULL;

  tSerializeSMStreamTickMsg(pReq, contLen, &timerReq);
  *pContLen = contLen;
  return pReq;
}

S
Shengliang Guan 已提交
104
static void mndPullupTrans(SMnode *pMnode) {
105
  mTrace("pullup trans msg");
S
Shengliang Guan 已提交
106
  int32_t contLen = 0;
M
Minghao Li 已提交
107
  void   *pReq = mndBuildTimerMsg(&contLen);
S
Shengliang Guan 已提交
108 109 110 111
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  }
S
Shengliang Guan 已提交
112 113
}

114
static void mndPullupTtl(SMnode *pMnode) {
115
  mTrace("pullup ttl");
wmmhello's avatar
wmmhello 已提交
116
  int32_t contLen = 0;
M
Minghao Li 已提交
117
  void   *pReq = mndBuildTimerMsg(&contLen);
wmmhello's avatar
wmmhello 已提交
118
  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
wmmhello's avatar
wmmhello 已提交
119 120 121
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}

S
Shengliang Guan 已提交
122
static void mndCalMqRebalance(SMnode *pMnode) {
123
  mTrace("calc mq rebalance");
S
Shengliang Guan 已提交
124
  int32_t contLen = 0;
M
Minghao Li 已提交
125
  void   *pReq = mndBuildTimerMsg(&contLen);
S
Shengliang Guan 已提交
126
  if (pReq != NULL) {
127
    SRpcMsg rpcMsg = { .msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen };
128 129 130 131
    tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
  }
}

5
54liuyao 已提交
132
#if 0
133 134 135 136 137 138 139 140 141
static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) {
  int32_t contLen = 0;
  void   *pReq = mndBuildCheckpointTickMsg(&contLen, sec);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {
        .msgType = TDMT_MND_STREAM_CHECKPOINT_TIMER,
        .pCont = pReq,
        .contLen = contLen,
    };
S
Shengliang Guan 已提交
142 143
    tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
  }
S
Shengliang Guan 已提交
144
}
5
54liuyao 已提交
145
#endif
L
Liu Jicong 已提交
146

S
Shengliang Guan 已提交
147
static void mndPullupTelem(SMnode *pMnode) {
148
  mTrace("pullup telem msg");
S
Shengliang Guan 已提交
149
  int32_t contLen = 0;
M
Minghao Li 已提交
150
  void   *pReq = mndBuildTimerMsg(&contLen);
S
Shengliang Guan 已提交
151 152 153 154
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
  }
S
Shengliang Guan 已提交
155 156
}

157
static void mndPullupGrant(SMnode *pMnode) {
158
  mTrace("pullup grant msg");
C
Cary Xu 已提交
159 160 161 162 163 164 165 166 167
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {
        .msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9527};
    tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
  }
}

168
static void mndIncreaseUpTime(SMnode *pMnode) {
169
  mTrace("increate uptime");
170 171 172 173 174 175 176 177 178
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {
        .msgType = TDMT_MND_UPTIME_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9528};
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  }
}

179 180 181 182 183 184 185 186 187 188 189
static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs) {
  SSdb *pSdb = pMnode->pSdb;

  void *pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    bool roleChanged = false;
    for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
190 191 192 193 194 195 196 197 198 199
      SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
      if (pGid->dnodeId == dnodeId) {
        if (pGid->syncState != TAOS_SYNC_STATE_OFFLINE) {
          mInfo(
              "vgId:%d, state changed by offline check, old state:%s restored:%d canRead:%d new state:error restored:0 "
              "canRead:0",
              pVgroup->vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead);
          pGid->syncState = TAOS_SYNC_STATE_OFFLINE;
          pGid->syncRestore = 0;
          pGid->syncCanRead = 0;
200 201 202 203 204 205 206 207
          roleChanged = true;
        }
        break;
      }
    }

    if (roleChanged) {
      SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
208
      if (pDb != NULL && pDb->stateTs != curMs) {
209 210 211 212 213 214 215 216 217 218 219 220
        mInfo("db:%s, stateTs changed by offline check, old newTs:%" PRId64 " newTs:%" PRId64, pDb->name, pDb->stateTs,
              curMs);
        pDb->stateTs = curMs;
      }
      mndReleaseDb(pMnode, pDb);
    }

    sdbRelease(pSdb, pVgroup);
  }
}

static void mndCheckDnodeOffline(SMnode *pMnode) {
221
  mTrace("check dnode offline");
222 223
  if (mndAcquireRpc(pMnode) != 0) return;

224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
  SSdb   *pSdb = pMnode->pSdb;
  int64_t curMs = taosGetTimestampMs();

  void *pIter = NULL;
  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;

    bool online = mndIsDnodeOnline(pDnode, curMs);
    if (!online) {
      mInfo("dnode:%d, in offline state", pDnode->id);
      mndSetVgroupOffline(pMnode, pDnode->id, curMs);
    }

    sdbRelease(pSdb, pDnode);
  }
241 242

  mndReleaseRpc(pMnode);
243 244
}

S
Shengliang Guan 已提交
245
static void *mndThreadFp(void *param) {
S
Shengliang Guan 已提交
246
  SMnode *pMnode = param;
S
Shengliang Guan 已提交
247 248 249 250 251
  int64_t lastTime = 0;
  setThreadName("mnode-timer");

  while (1) {
    lastTime++;
K
kailixu 已提交
252
    taosMsleep(100);
S
Shengliang Guan 已提交
253
    if (mndGetStop(pMnode)) break;
254
    if (lastTime % 10 != 0) continue;
S
Shengliang Guan 已提交
255

256 257
    int64_t sec = lastTime / 10;
    if (sec % tsTtlPushInterval == 0) {
258
      mndPullupTtl(pMnode);
wmmhello's avatar
wmmhello 已提交
259 260
    }

261
    if (sec % tsTransPullupInterval == 0) {
S
Shengliang Guan 已提交
262 263 264
      mndPullupTrans(pMnode);
    }

265
    if (sec % tsMqRebalanceInterval == 0) {
S
Shengliang Guan 已提交
266 267 268
      mndCalMqRebalance(pMnode);
    }

269 270 271 272 273 274
#if 0
    if (sec % tsStreamCheckpointTickInterval == 0) {
      mndStreamCheckpointTick(pMnode, sec);
    }
#endif

S
Shengliang Guan 已提交
275
    if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) {
S
Shengliang Guan 已提交
276 277
      mndPullupTelem(pMnode);
    }
C
Cary Xu 已提交
278

279
    if (sec % tsGrantHBInterval == 0) {
280
      mndPullupGrant(pMnode);
C
Cary Xu 已提交
281
    }
282

283
    if (sec % tsUptimeInterval == 0) {
284 285
      mndIncreaseUpTime(pMnode);
    }
286 287 288

    if (sec % (tsStatusInterval * 5) == 0) {
      mndCheckDnodeOffline(pMnode);
S
Shengliang Guan 已提交
289 290 291
    }

    if (sec % (MNODE_TIMEOUT_SEC / 2) == 0) {
292
      mndSyncCheckTimeout(pMnode);
293
    }
S
Shengliang Guan 已提交
294 295
  }

S
Shengliang Guan 已提交
296
  return NULL;
L
Liu Jicong 已提交
297 298
}

299
static int32_t mndInitTimer(SMnode *pMnode) {
S
Shengliang Guan 已提交
300 301 302 303 304
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode) != 0) {
    mError("failed to create timer thread since %s", strerror(errno));
S
shm  
Shengliang Guan 已提交
305 306
    return -1;
  }
L
Liu Jicong 已提交
307

S
Shengliang Guan 已提交
308 309
  taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("mnode-timer", "initialized");
S
Shengliang Guan 已提交
310 311 312
  return 0;
}

313
static void mndCleanupTimer(SMnode *pMnode) {
S
Shengliang Guan 已提交
314 315
  if (taosCheckPthreadValid(pMnode->thread)) {
    taosThreadJoin(pMnode->thread, NULL);
316
    taosThreadClear(&pMnode->thread);
S
Shengliang Guan 已提交
317 318 319
  }
}

S
Shengliang Guan 已提交
320
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
321
  pMnode->path = taosStrdup(path);
322 323
  if (pMnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
324
    return -1;
325 326 327 328
  }

  if (taosMkDir(pMnode->path) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
329
    return -1;
330
  }
331 332

  return 0;
333
}
S
Shengliang Guan 已提交
334

335 336 337 338 339 340 341 342
static int32_t mndInitWal(SMnode *pMnode) {
  char path[PATH_MAX + 20] = {0};
  snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
  SWalCfg cfg = {
      .vgId = 1,
      .fsyncPeriod = 0,
      .rollPeriod = -1,
      .segSize = -1,
343 344
      .retentionPeriod = 0,
      .retentionSize = 0,
345 346 347 348 349
      .level = TAOS_WAL_FSYNC,
  };

  pMnode->pWal = walOpen(path, &cfg);
  if (pMnode->pWal == NULL) {
350
    mError("failed to open wal since %s. wal:%s", terrstr(), path);
351 352 353 354 355 356 357 358 359 360 361 362 363
    return -1;
  }

  return 0;
}

static void mndCloseWal(SMnode *pMnode) {
  if (pMnode->pWal != NULL) {
    walClose(pMnode->pWal);
    pMnode->pWal = NULL;
  }
}

364 365 366
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt opt = {0};
  opt.path = pMnode->path;
S
Shengliang Guan 已提交
367
  opt.pMnode = pMnode;
368
  opt.pWal = pMnode->pWal;
S
Shengliang Guan 已提交
369

S
Shengliang Guan 已提交
370
  pMnode->pSdb = sdbInit(&opt);
371
  if (pMnode->pSdb == NULL) {
S
Shengliang Guan 已提交
372 373 374 375 376 377
    return -1;
  }

  return 0;
}

378
static int32_t mndOpenSdb(SMnode *pMnode) {
D
dapan1121 已提交
379
  int32_t code = 0;
380
  if (!pMnode->deploy) {
D
dapan1121 已提交
381
    code = sdbReadFile(pMnode->pSdb);
382
  }
D
dapan1121 已提交
383 384 385

  atomic_store_64(&pMnode->applied, pMnode->pSdb->commitIndex);
  return code;
386
}
387 388 389

static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb) {
S
Shengliang Guan 已提交
390
    sdbCleanup(pMnode->pSdb);
391 392 393 394
    pMnode->pSdb = NULL;
  }
}

395 396 397 398 399
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep step = {0};
  step.name = name;
  step.initFp = initFp;
  step.cleanupFp = cleanupFp;
S
Shengliang Guan 已提交
400
  if (taosArrayPush(pMnode->pSteps, &step) == NULL) {
401 402 403 404
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
405 406 407
  return 0;
}

408
static int32_t mndInitSteps(SMnode *pMnode) {
409
  if (mndAllocStep(pMnode, "mnode-wal", mndInitWal, mndCloseWal) != 0) return -1;
S
Shengliang Guan 已提交
410 411 412 413
  if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1;
S
Shengliang Guan 已提交
414
  if (mndAllocStep(pMnode, "mnode-qnode", mndInitQnode, mndCleanupQnode) != 0) return -1;
415
  if (mndAllocStep(pMnode, "mnode-snode", mndInitSnode, mndCleanupSnode) != 0) return -1;
S
Shengliang Guan 已提交
416
  if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1;
S
Shengliang Guan 已提交
417
  if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
wafwerar's avatar
wafwerar 已提交
418
  if (mndAllocStep(pMnode, "mnode-grant", mndInitGrant, mndCleanupGrant) != 0) return -1;
419
  if (mndAllocStep(pMnode, "mnode-privilege", mndInitPrivilege, mndCleanupPrivilege) != 0) return -1;
S
Shengliang Guan 已提交
420
  if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
L
Liu Jicong 已提交
421
  if (mndAllocStep(pMnode, "mnode-stream", mndInitStream, mndCleanupStream) != 0) return -1;
S
Shengliang Guan 已提交
422
  if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
L
Liu Jicong 已提交
423 424
  if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
S
Shengliang Guan 已提交
425
  if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
S
Shengliang Guan 已提交
426
  if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
427
  if (mndAllocStep(pMnode, "mnode-sma", mndInitSma, mndCleanupSma) != 0) return -1;
dengyihao's avatar
dengyihao 已提交
428
  if (mndAllocStep(pMnode, "mnode-idx", mndInitIdx, mndCleanupIdx) != 0) return -1;
D
dapan1121 已提交
429
  if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1;
D
dapan1121 已提交
430
  if (mndAllocStep(pMnode, "mnode-perfs", mndInitPerfs, mndCleanupPerfs) != 0) return -1;
S
Shengliang Guan 已提交
431
  if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
S
Shengliang Guan 已提交
432
  if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
433
  if (mndAllocStep(pMnode, "mnode-sdb", mndOpenSdb, NULL) != 0) return -1;
S
Shengliang Guan 已提交
434 435
  if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
D
dapan1121 已提交
436
  if (mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery) != 0) return -1;
S
Shengliang Guan 已提交
437 438
  if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1;
S
Shengliang Guan 已提交
439 440 441 442

  return 0;
}

443
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
S
Shengliang Guan 已提交
444 445
  if (pMnode->pSteps == NULL) return;

446
  if (pos == -1) {
447
    pos = taosArrayGetSize(pMnode->pSteps) - 1;
S
Shengliang Guan 已提交
448 449
  }

450
  for (int32_t s = pos; s >= 0; s--) {
451
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, s);
452
    mInfo("%s will cleanup", pStep->name);
453 454 455
    if (pStep->cleanupFp != NULL) {
      (*pStep->cleanupFp)(pMnode);
    }
S
Shengliang Guan 已提交
456 457
  }

S
Shengliang Guan 已提交
458
  taosArrayClear(pMnode->pSteps);
S
Shengliang Guan 已提交
459
  taosArrayDestroy(pMnode->pSteps);
S
Shengliang Guan 已提交
460
  pMnode->pSteps = NULL;
461
}
S
Shengliang Guan 已提交
462

463
static int32_t mndExecSteps(SMnode *pMnode) {
S
Shengliang Guan 已提交
464
  int32_t size = taosArrayGetSize(pMnode->pSteps);
465
  for (int32_t pos = 0; pos < size; pos++) {
S
Shengliang Guan 已提交
466
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, pos);
467
    if (pStep->initFp == NULL) continue;
S
Shengliang Guan 已提交
468

S
Shengliang Guan 已提交
469
    if ((*pStep->initFp)(pMnode) != 0) {
S
Shengliang Guan 已提交
470
      int32_t code = terrno;
S
Shengliang Guan 已提交
471
      mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
472
      mndCleanupSteps(pMnode, pos);
S
Shengliang Guan 已提交
473
      terrno = code;
S
Shengliang Guan 已提交
474
      return -1;
S
Shengliang Guan 已提交
475
    } else {
476
      mInfo("%s is initialized", pStep->name);
S
Shengliang Guan 已提交
477
      tmsgReportStartup(pStep->name, "initialized");
S
Shengliang Guan 已提交
478 479
    }
  }
S
Shengliang Guan 已提交
480

S
shm  
Shengliang Guan 已提交
481
  pMnode->clusterId = mndGetClusterId(pMnode);
S
Shengliang Guan 已提交
482
  return 0;
483
}
S
Shengliang Guan 已提交
484

S
shm  
Shengliang Guan 已提交
485
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
S
Shengliang Guan 已提交
486
  pMnode->msgCb = pOption->msgCb;
487
  pMnode->selfDnodeId = pOption->dnodeId;
488 489
  pMnode->syncMgmt.selfIndex = pOption->selfIndex;
  pMnode->syncMgmt.numOfReplicas = pOption->numOfReplicas;
C
cadem 已提交
490 491
  pMnode->syncMgmt.numOfTotalReplicas = pOption->numOfTotalReplicas;
  pMnode->syncMgmt.lastIndex = pOption->lastIndex;
492
  memcpy(pMnode->syncMgmt.replicas, pOption->replicas, sizeof(pOption->replicas));
C
cadem 已提交
493
  memcpy(pMnode->syncMgmt.nodeRoles, pOption->nodeRoles, sizeof(pOption->nodeRoles));
L
Liu Jicong 已提交
494
}
495

496
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
497
  mInfo("start to open mnode in %s", path);
S
Shengliang Guan 已提交
498

wafwerar's avatar
wafwerar 已提交
499
  SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
500 501 502 503 504
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
505
  memset(pMnode, 0, sizeof(SMnode));
S
Shengliang Guan 已提交
506

S
Shengliang Guan 已提交
507 508
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
S
shm  
Shengliang Guan 已提交
509
  mndSetOptions(pMnode, pOption);
S
Shengliang Guan 已提交
510

511
  pMnode->deploy = pOption->deploy;
S
Shengliang Guan 已提交
512 513
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
wafwerar's avatar
wafwerar 已提交
514
    taosMemoryFree(pMnode);
S
Shengliang Guan 已提交
515 516 517 518
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
519

S
Shengliang Guan 已提交
520
  int32_t code = mndCreateDir(pMnode, path);
521
  if (code != 0) {
S
Shengliang Guan 已提交
522 523
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
524 525 526 527 528
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

529
  code = mndInitSteps(pMnode);
530
  if (code != 0) {
S
Shengliang Guan 已提交
531 532
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
533 534 535 536 537 538 539
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
540 541
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
542 543 544 545
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
546

S
Shengliang Guan 已提交
547
  mInfo("mnode open successfully");
S
Shengliang Guan 已提交
548 549
  return pMnode;
}
S
Shengliang Guan 已提交
550

551 552 553
void mndPreClose(SMnode *pMnode) {
  if (pMnode != NULL) {
    syncLeaderTransfer(pMnode->syncMgmt.sync);
M
Minghao Li 已提交
554
    syncPreStop(pMnode->syncMgmt.sync);
555
    sdbWriteFile(pMnode->pSdb, 0);
556 557 558
  }
}

559
void mndClose(SMnode *pMnode) {
S
Shengliang Guan 已提交
560
  if (pMnode != NULL) {
561
    mInfo("start to close mnode");
S
Shengliang Guan 已提交
562
    mndCleanupSteps(pMnode, -1);
wafwerar's avatar
wafwerar 已提交
563 564
    taosMemoryFreeClear(pMnode->path);
    taosMemoryFreeClear(pMnode);
565
    mInfo("mnode is closed");
S
Shengliang Guan 已提交
566
  }
567
}
S
Shengliang Guan 已提交
568

569
int32_t mndStart(SMnode *pMnode) {
570
  mndSyncStart(pMnode);
S
Shengliang Guan 已提交
571
  if (pMnode->deploy) {
S
Shengliang Guan 已提交
572 573 574 575
    if (sdbDeploy(pMnode->pSdb) != 0) {
      mError("failed to deploy sdb while start mnode");
      return -1;
    }
576
    mndSetRestored(pMnode, true);
577
  }
M
Minghao Li 已提交
578

C
Cary Xu 已提交
579
  grantReset(pMnode, TSDB_GRANT_ALL, 0);
C
Cary Xu 已提交
580

M
Minghao Li 已提交
581 582 583
  return mndInitTimer(pMnode);
}

C
cadem 已提交
584 585 586 587 588
int32_t mndIsCatchUp(SMnode *pMnode) {
  int64_t rid = pMnode->syncMgmt.sync;
  return syncIsCatchUp(rid);
}

C
cadem 已提交
589 590 591 592 593
ESyncRole mndGetRole(SMnode *pMnode){
  int64_t rid = pMnode->syncMgmt.sync;
  return syncGetRole(rid);
}

594
void mndStop(SMnode *pMnode) {
S
Shengliang Guan 已提交
595
  mndSetStop(pMnode);
596
  mndSyncStop(pMnode);
S
Shengliang Guan 已提交
597
  mndCleanupTimer(pMnode);
M
Minghao Li 已提交
598 599
}

M
Minghao Li 已提交
600
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
M
Minghao Li 已提交
601
  SMnode    *pMnode = pMsg->info.node;
602
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
M
Minghao Li 已提交
603

S
Shengliang Guan 已提交
604 605
  const STraceId *trace = &pMsg->info.traceId;
  mGTrace("vgId:1, sync msg:%p will be processed, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
M
Minghao Li 已提交
606

S
Shengliang Guan 已提交
607
  int32_t code = syncProcessMsg(pMgmt->sync, pMsg);
M
Minghao Li 已提交
608
  if (code != 0) {
S
Shengliang Guan 已提交
609
    mGError("vgId:1, failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
M
Minghao Li 已提交
610
  }
S
Shengliang Guan 已提交
611

612
  return code;
M
Minghao Li 已提交
613 614
}

S
Shengliang Guan 已提交
615
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
616
  if (!IsReq(pMsg)) return 0;
dengyihao's avatar
dengyihao 已提交
617 618
  if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY ||
      pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT ||
S
Shengliang Guan 已提交
619
      pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK) {
D
dapan1121 已提交
620 621
    return 0;
  }
622

623 624 625 626 627 628 629 630 631
  SMnode *pMnode = pMsg->info.node;
  taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    taosThreadRwlockUnlock(&pMnode->lock);
    terrno = TSDB_CODE_APP_IS_STOPPING;
    return -1;
  }

  terrno = 0;
632
  SSyncState state = syncGetState(pMnode->syncMgmt.sync);
633 634 635 636 637 638 639 640 641 642
  if (terrno != 0) {
    taosThreadRwlockUnlock(&pMnode->lock);
    return -1;
  }

  if (state.state != TAOS_SYNC_STATE_LEADER) {
    taosThreadRwlockUnlock(&pMnode->lock);
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    goto _OVER;
  }
643

644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660
  if (!state.restored || !pMnode->restored) {
    taosThreadRwlockUnlock(&pMnode->lock);
    terrno = TSDB_CODE_SYN_RESTORING;
    goto _OVER;
  }

#if 1
  atomic_add_fetch_32(&pMnode->rpcRef, 1);
#else
  int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1);
  mTrace("mnode rpc is acquired, ref:%d", ref);
#endif

  taosThreadRwlockUnlock(&pMnode->lock);
  return 0;

_OVER:
L
Liu Jicong 已提交
661
  if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
662 663
      pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
      pMsg->msgType == TDMT_MND_UPTIME_TIMER) {
664
    mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
C
cadem 已提交
665
           pMnode->stopped, state.restored, syncStr(state.state));
S
Shengliang Guan 已提交
666 667
    return -1;
  }
S
Shengliang Guan 已提交
668

669 670
  const STraceId *trace = &pMsg->info.traceId;
  SEpSet          epSet = {0};
671
  int32_t         tmpCode = terrno;
672
  mndGetMnodeEpSet(pMnode, &epSet);
673
  terrno = tmpCode;
674

675
  mGDebug(
676
      "msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d "
677
      "role:%s, redirect numOfEps:%d inUse:%d, type:%s",
678
      pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, state.restored,
679
      syncStr(state.restored), epSet.numOfEps, epSet.inUse, TMSG_INFO(pMsg->msgType));
S
Shengliang Guan 已提交
680

681
  if (epSet.numOfEps <= 0) return -1;
S
Shengliang Guan 已提交
682

683 684 685 686 687 688 689 690 691 692
  for (int32_t i = 0; i < epSet.numOfEps; ++i) {
    mDebug("mnode index:%d, ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
  }

  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
  pMsg->info.rsp = rpcMallocCont(contLen);
  if (pMsg->info.rsp != NULL) {
    tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
    pMsg->info.hasEpSet = 1;
    pMsg->info.rspLen = contLen;
S
Shengliang Guan 已提交
693
  }
S
Shengliang Guan 已提交
694 695

  return -1;
696 697
}

S
Shengliang Guan 已提交
698
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
M
Minghao Li 已提交
699
  SMnode         *pMnode = pMsg->info.node;
S
Shengliang Guan 已提交
700 701
  const STraceId *trace = &pMsg->info.traceId;

S
Shengliang Guan 已提交
702
  MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
S
Shengliang Guan 已提交
703
  if (fp == NULL) {
S
Shengliang Guan 已提交
704
    mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
S
shm  
Shengliang Guan 已提交
705 706
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    return -1;
S
Shengliang Guan 已提交
707 708
  }

S
Shengliang Guan 已提交
709 710
  if (mndCheckMnodeState(pMsg) != 0) return -1;

dengyihao's avatar
dengyihao 已提交
711
  mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
S
shm  
Shengliang Guan 已提交
712
  int32_t code = (*fp)(pMsg);
713
  mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
714

S
Shengliang Guan 已提交
715
  if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
S
Shengliang Guan 已提交
716
    mGTrace("msg:%p, won't response immediately since in progress", pMsg);
717
  } else if (code == 0) {
S
Shengliang Guan 已提交
718
    mGTrace("msg:%p, successfully processed", pMsg);
719
  } else {
720 721 722
    if (code == -1) {
      code = terrno;
    }
723
    mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, tstrerror(code), pMsg->info.ahandle,
S
Shengliang Guan 已提交
724
            TMSG_INFO(pMsg->msgType));
S
Shengliang Guan 已提交
725
  }
S
Shengliang Guan 已提交
726

S
shm  
Shengliang Guan 已提交
727
  return code;
S
Shengliang Guan 已提交
728 729
}

730 731
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  tmsg_t type = TMSG_INDEX(msgType);
S
Shengliang Guan 已提交
732
  if (type < TDMT_MAX) {
733
    pMnode->msgFp[type] = fp;
S
Shengliang Guan 已提交
734 735 736
  }
}

D
dapan1121 已提交
737
// Note: uid 0 is reserved
738
int64_t mndGenerateUid(const char *name, int32_t len) {
S
monitor  
Shengliang Guan 已提交
739
  int32_t hashval = MurmurHash3_32(name, len);
D
dapan1121 已提交
740
  do {
L
Liu Jicong 已提交
741
    int64_t us = taosGetTimestampUs();
S
sma  
Shengliang Guan 已提交
742 743
    int64_t x = (us & 0x000000FFFFFFFFFF) << 24;
    int64_t uuid = x + ((hashval & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
D
dapan1121 已提交
744
    if (uuid) {
L
Liu Jicong 已提交
745
      return llabs(uuid);
D
dapan1121 已提交
746 747
    }
  } while (true);
L
Liu Jicong 已提交
748
}
S
Shengliang Guan 已提交
749 750

int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
S
Shengliang Guan 已提交
751
                          SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
752
  if (mndAcquireRpc(pMnode) != 0) return -1;
S
Shengliang Guan 已提交
753

M
Minghao Li 已提交
754
  SSdb   *pSdb = pMnode->pSdb;
S
Shengliang Guan 已提交
755 756 757 758 759
  int64_t ms = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
S
Shengliang Guan 已提交
760 761 762
  pStbInfo->stbs = taosArrayInit(sdbGetSize(pSdb, SDB_STB), sizeof(SMonStbDesc));
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL ||
      pStbInfo->stbs == NULL) {
763
    mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
764 765 766 767
    return -1;
  }

  // cluster info
wmmhello's avatar
wmmhello 已提交
768
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
S
Shengliang Guan 已提交
769 770
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);
771 772
  pClusterInfo->dbs_total = sdbGetSize(pSdb, SDB_DB);
  pClusterInfo->stbs_total = sdbGetSize(pSdb, SDB_STB);
773 774
  pClusterInfo->topics_toal = sdbGetSize(pSdb, SDB_TOPIC);
  pClusterInfo->streams_total = sdbGetSize(pSdb, SDB_STREAM);
S
Shengliang Guan 已提交
775 776 777 778 779 780 781 782 783 784

  void *pIter = NULL;
  while (1) {
    SDnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonDnodeDesc desc = {0};
    desc.dnode_id = pObj->id;
    tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
S
Shengliang Guan 已提交
785
    if (mndIsDnodeOnline(pObj, ms)) {
S
Shengliang Guan 已提交
786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803
      tstrncpy(desc.status, "ready", sizeof(desc.status));
    } else {
      tstrncpy(desc.status, "offline", sizeof(desc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &desc);
    sdbRelease(pSdb, pObj);
  }

  pIter = NULL;
  while (1) {
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonMnodeDesc desc = {0};
    desc.mnode_id = pObj->id;
    tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));

804
    if (pObj->id == pMnode->selfDnodeId) {
S
Shengliang Guan 已提交
805 806
      pClusterInfo->first_ep_dnode_id = pObj->id;
      tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
K
kailixu 已提交
807
      pClusterInfo->master_uptime = (float)mndGetClusterUpTime(pMnode) / 86400.0f;
808
      // pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
809 810
      tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
    } else {
811
      tstrncpy(desc.role, syncStr(pObj->syncState), sizeof(desc.role));
S
Shengliang Guan 已提交
812
    }
813 814
    taosArrayPush(pClusterInfo->mnodes, &desc);
    sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
815 816 817 818 819 820 821 822 823 824
  }

  // vgroup info
  pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pClusterInfo->vgroups_total++;
825
    pClusterInfo->tbs_total += pVgroup->numOfTables;
S
Shengliang Guan 已提交
826 827 828

    SMonVgroupDesc desc = {0};
    desc.vgroup_id = pVgroup->vgId;
829 830 831 832 833

    SName name = {0};
    tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name, desc.database_name);

S
Shengliang Guan 已提交
834 835 836 837
    desc.tables_num = pVgroup->numOfTables;
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
    tstrncpy(desc.status, "unsynced", sizeof(desc.status));
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
M
Minghao Li 已提交
838
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[i];
S
Shengliang Guan 已提交
839 840
      SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
      pVnDesc->dnode_id = pVgid->dnodeId;
841 842
      tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->syncState), sizeof(pVnDesc->vnode_role));
      if (pVgid->syncState == TAOS_SYNC_STATE_LEADER) {
S
Shengliang Guan 已提交
843 844 845
        tstrncpy(desc.status, "ready", sizeof(desc.status));
        pClusterInfo->vgroups_alive++;
      }
S
Shengliang Guan 已提交
846
      if (pVgid->syncState != TAOS_SYNC_STATE_ERROR && pVgid->syncState != TAOS_SYNC_STATE_OFFLINE) {
S
Shengliang Guan 已提交
847 848 849 850 851 852 853 854 855
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &desc);
    sdbRelease(pSdb, pVgroup);
  }

S
Shengliang Guan 已提交
856 857 858 859 860 861 862
  // stb info
  pIter = NULL;
  while (1) {
    SStbObj *pStb = NULL;
    pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
    if (pIter == NULL) break;

S
Shengliang Guan 已提交
863
    SMonStbDesc desc = {0};
S
Shengliang Guan 已提交
864 865 866 867 868 869 870 871 872 873 874 875 876

    SName name1 = {0};
    tNameFromString(&name1, pStb->db, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name1, desc.database_name);

    SName name2 = {0};
    tNameFromString(&name2, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tstrncpy(desc.stb_name, tNameGetTableName(&name2), TSDB_TABLE_NAME_LEN);

    taosArrayPush(pStbInfo->stbs, &desc);
    sdbRelease(pSdb, pStb);
  }

S
Shengliang Guan 已提交
877
  // grant info
D
dapan1121 已提交
878
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 1000;
S
Shengliang Guan 已提交
879 880 881 882 883 884
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  }

885
  mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
886
  return 0;
L
Liu Jicong 已提交
887
}
S
Shengliang Guan 已提交
888 889

int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
890 891 892
  SSyncState state = syncGetState(pMnode->syncMgmt.sync);
  pLoad->syncState = state.state;
  pLoad->syncRestore = state.restored;
S
Shungang Li 已提交
893 894 895 896
  pLoad->syncTerm = state.term;
  pLoad->roleTimeMs = state.roleTimeMs;
  mTrace("mnode current syncState is %s, syncRestore:%d, syncTerm:%" PRId64 " ,roleTimeMs:%" PRId64,
         syncStr(pLoad->syncState), pLoad->syncRestore, pLoad->syncTerm, pLoad->roleTimeMs);
S
Shengliang Guan 已提交
897
  return 0;
L
fix  
Liu Jicong 已提交
898
}
S
Shengliang Guan 已提交
899

900
void mndSetRestored(SMnode *pMnode, bool restored) {
S
Shengliang Guan 已提交
901 902 903 904
  if (restored) {
    taosThreadRwlockWrlock(&pMnode->lock);
    pMnode->restored = true;
    taosThreadRwlockUnlock(&pMnode->lock);
S
Shengliang Guan 已提交
905
    mInfo("mnode set restored:%d", restored);
S
Shengliang Guan 已提交
906 907 908 909
  } else {
    taosThreadRwlockWrlock(&pMnode->lock);
    pMnode->restored = false;
    taosThreadRwlockUnlock(&pMnode->lock);
S
Shengliang Guan 已提交
910
    mInfo("mnode set restored:%d", restored);
S
Shengliang Guan 已提交
911 912 913 914 915 916 917 918 919 920 921 922 923
    while (1) {
      if (pMnode->rpcRef <= 0) break;
      taosMsleep(3);
    }
  }
}

bool mndGetRestored(SMnode *pMnode) { return pMnode->restored; }

void mndSetStop(SMnode *pMnode) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->stopped = true;
  taosThreadRwlockUnlock(&pMnode->lock);
S
Shengliang Guan 已提交
924
  mInfo("mnode set stopped");
S
Shengliang Guan 已提交
925 926 927
}

bool mndGetStop(SMnode *pMnode) { return pMnode->stopped; }