mndMain.c 26.9 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndAcct.h"
#include "mndCluster.h"
L
Liu Jicong 已提交
19
#include "mndConsumer.h"
S
Shengliang Guan 已提交
20 21 22
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
L
Liu Jicong 已提交
23
#include "mndGrant.h"
S
monitor  
Shengliang Guan 已提交
24
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
25
#include "mndMnode.h"
L
Liu Jicong 已提交
26
#include "mndPerfSchema.h"
M
Minghao Li 已提交
27
#include "mndPrivilege.h"
S
Shengliang Guan 已提交
28
#include "mndProfile.h"
S
Shengliang Guan 已提交
29
#include "mndQnode.h"
L
Liu Jicong 已提交
30
#include "mndQuery.h"
S
Shengliang Guan 已提交
31
#include "mndShow.h"
S
sma  
Shengliang Guan 已提交
32
#include "mndSma.h"
S
Shengliang Guan 已提交
33
#include "mndSnode.h"
S
Shengliang Guan 已提交
34
#include "mndStb.h"
L
Liu Jicong 已提交
35
#include "mndStream.h"
L
Liu Jicong 已提交
36
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
37 38
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
39
#include "mndTopic.h"
S
Shengliang Guan 已提交
40 41 42
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
43

44 45 46 47
// Takes one RPC reference on the mnode while holding the read lock.
// Returns 0 after incrementing rpcRef. Returns -1 when the mnode is stopped
// (terrno is set to TSDB_CODE_APP_IS_STOPPING) or when mndIsLeader() is false.
static inline int32_t mndAcquireRpc(SMnode *pMnode) {
  int32_t result = -1;

  taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    terrno = TSDB_CODE_APP_IS_STOPPING;
  } else if (mndIsLeader(pMnode)) {
    atomic_add_fetch_32(&pMnode->rpcRef, 1);
    result = 0;
  }
  taosThreadRwlockUnlock(&pMnode->lock);

  return result;
}

// Drops one RPC reference (rpcRef) while holding the mnode read lock.
static inline void mndReleaseRpc(SMnode *pMnode) {
  taosThreadRwlockRdlock(&pMnode->lock);
  atomic_sub_fetch_32(&pMnode->rpcRef, 1);
  taosThreadRwlockUnlock(&pMnode->lock);
}

S
Shengliang Guan 已提交
75 76 77 78 79 80 81 82 83 84 85 86 87
// Serializes an all-zero SMTimerReq into a buffer obtained from rpcMallocCont().
// On success returns the buffer and stores its length in *pContLen.
// Returns NULL (leaving *pContLen untouched) when the computed length is not
// positive or the allocation fails.
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq req = {0};

  // First pass with a NULL buffer only computes the encoded size.
  const int32_t size = tSerializeSMTimerMsg(NULL, 0, &req);
  if (size <= 0) {
    return NULL;
  }

  void *pBuf = rpcMallocCont(size);
  if (pBuf != NULL) {
    tSerializeSMTimerMsg(pBuf, size, &req);
    *pContLen = size;
  }
  return pBuf;
}

88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
// Serializes an SMStreamTickReq whose tick field is `sec` into a buffer
// obtained from rpcMallocCont(). On success returns the buffer and stores its
// length in *pContLen. Returns NULL (leaving *pContLen untouched) when the
// computed length is not positive or the allocation fails.
static void *mndBuildCheckpointTickMsg(int32_t *pContLen, int64_t sec) {
  SMStreamTickReq req = {.tick = sec};

  // First pass with a NULL buffer only computes the encoded size.
  const int32_t size = tSerializeSMStreamTickMsg(NULL, 0, &req);
  if (size <= 0) {
    return NULL;
  }

  void *pBuf = rpcMallocCont(size);
  if (pBuf != NULL) {
    tSerializeSMStreamTickMsg(pBuf, size, &req);
    *pContLen = size;
  }
  return pBuf;
}

S
Shengliang Guan 已提交
103 104
// Posts a TDMT_MND_TRANS_TIMER message to the write queue.
// Does nothing when the timer payload cannot be built.
static void mndPullupTrans(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) return;

  SRpcMsg msg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pCont, .contLen = len};
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &msg);
}

112
// Posts a TDMT_MND_TTL_TIMER message to the write queue.
// Does nothing when the timer payload cannot be built, matching the other
// pull-up helpers in this file (previously a NULL payload could be queued).
static void mndPullupTtl(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  }
}

S
Shengliang Guan 已提交
119 120
// Posts a TDMT_MND_TMQ_TIMER message to the read queue.
// Does nothing when the timer payload cannot be built.
static void mndCalMqRebalance(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) return;

  SRpcMsg msg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pCont, .contLen = len};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}

// Posts a TDMT_MND_STREAM_CHECKPOINT_TIMER message carrying `sec` as its tick
// to the read queue. Does nothing when the payload cannot be built.
static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) {
  int32_t len = 0;
  void   *pCont = mndBuildCheckpointTickMsg(&len, sec);
  if (pCont == NULL) return;

  SRpcMsg msg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_TIMER, .pCont = pCont, .contLen = len};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}
L
Liu Jicong 已提交
144

S
Shengliang Guan 已提交
145 146
// Posts a TDMT_MND_TELEM_TIMER message to the read queue.
// Does nothing when the timer payload cannot be built.
static void mndPullupTelem(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) return;

  SRpcMsg msg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pCont, .contLen = len};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}

154
// Posts a TDMT_MND_GRANT_HB_TIMER message to the read queue, tagging it with
// the fixed ahandle value 0x9527. Does nothing when the payload cannot be built.
static void mndPullupGrant(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) return;

  SRpcMsg msg = {
      .msgType = TDMT_MND_GRANT_HB_TIMER,
      .pCont = pCont,
      .contLen = len,
      .info.ahandle = (void *)0x9527,
  };
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}

164 165 166 167 168 169 170 171 172 173
// Posts a TDMT_MND_UPTIME_TIMER message to the write queue, tagging it with
// the fixed ahandle value 0x9528. Does nothing when the payload cannot be built.
static void mndIncreaseUpTime(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) return;

  SRpcMsg msg = {
      .msgType = TDMT_MND_UPTIME_TIMER,
      .pCont = pCont,
      .contLen = len,
      .info.ahandle = (void *)0x9528,
  };
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &msg);
}

174 175 176 177 178 179 180 181 182 183 184
// Walks every vgroup in the sdb and, for the replica hosted on `dnodeId`,
// forces its sync state to TAOS_SYNC_STATE_OFFLINE (clearing syncRestore and
// syncCanRead). When a replica actually changed state, the owning db's stateTs
// is set to `curMs` if it differs.
static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs) {
  SSdb *pSdb = pMnode->pSdb;

  void *pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    bool roleChanged = false;
    for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
      SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
      if (pGid->dnodeId != dnodeId) continue;

      if (pGid->syncState != TAOS_SYNC_STATE_OFFLINE) {
        // The log now names the state that is really assigned below (offline).
        mInfo(
            "vgId:%d, state changed by offline check, old state:%s restored:%d canRead:%d new state:offline "
            "restored:0 canRead:0",
            pVgroup->vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead);
        pGid->syncState = TAOS_SYNC_STATE_OFFLINE;
        pGid->syncRestore = 0;
        pGid->syncCanRead = 0;
        roleChanged = true;
      }
      // Only the first replica matching the dnode is examined.
      break;
    }

    if (roleChanged) {
      SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      // Release only what was actually acquired; a failed acquire yields NULL.
      if (pDb != NULL) {
        if (pDb->stateTs != curMs) {
          mInfo("db:%s, stateTs changed by offline check, oldTs:%" PRId64 " newTs:%" PRId64, pDb->name, pDb->stateTs,
                curMs);
          pDb->stateTs = curMs;
        }
        mndReleaseDb(pMnode, pDb);
      }
    }

    sdbRelease(pSdb, pVgroup);
  }
}

// Iterates over all dnodes in the sdb; for each one that mndIsDnodeOnline()
// reports as not online at the current timestamp, marks its vgroup replicas
// offline via mndSetVgroupOffline().
static void mndCheckDnodeOffline(SMnode *pMnode) {
  SSdb         *pSdb = pMnode->pSdb;
  const int64_t nowMs = taosGetTimestampMs();

  void *pCursor = NULL;
  for (;;) {
    SDnodeObj *pDnode = NULL;
    pCursor = sdbFetch(pSdb, SDB_DNODE, pCursor, (void **)&pDnode);
    if (pCursor == NULL) break;

    if (!mndIsDnodeOnline(pDnode, nowMs)) {
      mInfo("dnode:%d, in offline state", pDnode->id);
      mndSetVgroupOffline(pMnode, pDnode->id, nowMs);
    }

    sdbRelease(pSdb, pDnode);
  }
}

S
Shengliang Guan 已提交
235
static void *mndThreadFp(void *param) {
S
Shengliang Guan 已提交
236
  SMnode *pMnode = param;
S
Shengliang Guan 已提交
237 238 239 240 241 242
  int64_t lastTime = 0;
  setThreadName("mnode-timer");

  while (1) {
    lastTime++;
    taosMsleep(100);
S
Shengliang Guan 已提交
243
    if (mndGetStop(pMnode)) break;
244
    if (lastTime % 10 != 0) continue;
S
Shengliang Guan 已提交
245

246 247
    int64_t sec = lastTime / 10;
    if (sec % tsTtlPushInterval == 0) {
248
      mndPullupTtl(pMnode);
wmmhello's avatar
wmmhello 已提交
249 250
    }

251
    if (sec % tsTransPullupInterval == 0) {
S
Shengliang Guan 已提交
252 253 254
      mndPullupTrans(pMnode);
    }

255
    if (sec % tsMqRebalanceInterval == 0) {
S
Shengliang Guan 已提交
256 257 258
      mndCalMqRebalance(pMnode);
    }

259 260 261 262 263 264
#if 0
    if (sec % tsStreamCheckpointTickInterval == 0) {
      mndStreamCheckpointTick(pMnode, sec);
    }
#endif

S
Shengliang Guan 已提交
265
    if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) {
S
Shengliang Guan 已提交
266 267
      mndPullupTelem(pMnode);
    }
C
Cary Xu 已提交
268

269
    if (sec % tsGrantHBInterval == 0) {
270
      mndPullupGrant(pMnode);
C
Cary Xu 已提交
271
    }
272

273
    if (sec % tsUptimeInterval == 0) {
274 275
      mndIncreaseUpTime(pMnode);
    }
276 277 278 279

    if (sec % (tsStatusInterval * 5) == 0) {
      mndCheckDnodeOffline(pMnode);
    }
S
Shengliang Guan 已提交
280 281
  }

S
Shengliang Guan 已提交
282
  return NULL;
L
Liu Jicong 已提交
283 284
}

285
// Starts the joinable timer thread running mndThreadFp and stores its handle
// in pMnode->thread. Returns 0 on success, -1 when the thread cannot be
// created. The thread attribute object is destroyed on both paths
// (previously it was left undestroyed when thread creation failed).
static int32_t mndInitTimer(SMnode *pMnode) {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  int32_t code = 0;
  if (taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode) != 0) {
    // Log before any further call so errno still belongs to the failed create.
    mError("failed to create timer thread since %s", strerror(errno));
    code = -1;
  }

  taosThreadAttrDestroy(&thAttr);
  if (code != 0) return code;

  tmsgReportStartup("mnode-timer", "initialized");
  return 0;
}

299
// Joins the timer thread and clears its handle, provided the handle passes
// taosCheckPthreadValid(); otherwise does nothing.
static void mndCleanupTimer(SMnode *pMnode) {
  if (!taosCheckPthreadValid(pMnode->thread)) return;

  taosThreadJoin(pMnode->thread, NULL);
  taosThreadClear(&pMnode->thread);
}

S
Shengliang Guan 已提交
306
// Stores a heap copy of `path` in pMnode->path and creates that directory.
// Returns 0 on success. Returns -1 with terrno set to
// TSDB_CODE_OUT_OF_MEMORY when the copy fails, or to TAOS_SYSTEM_ERROR(errno)
// when taosMkDir() fails. The copied path stays attached to pMnode either way.
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  pMnode->path = strdup(path);
  if (pMnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosMkDir(pMnode->path) == 0) {
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(errno);
  return -1;
}
S
Shengliang Guan 已提交
320

321 322 323 324 325 326 327 328 329 330 331 332 333 334 335
// Opens the WAL located at "<pMnode->path><TD_DIRSEP>wal" and stores the
// handle in pMnode->pWal. Returns 0 on success, -1 when walOpen() fails.
static int32_t mndInitWal(SMnode *pMnode) {
  char walPath[PATH_MAX + 20] = {0};
  snprintf(walPath, sizeof(walPath), "%s%swal", pMnode->path, TD_DIRSEP);

  SWalCfg walCfg = {0};
  walCfg.vgId = 1;
  walCfg.fsyncPeriod = 0;
  walCfg.rollPeriod = -1;
  walCfg.segSize = -1;
  walCfg.retentionPeriod = -1;
  walCfg.retentionSize = -1;
  walCfg.level = TAOS_WAL_FSYNC;

  pMnode->pWal = walOpen(walPath, &walCfg);
  if (pMnode->pWal != NULL) {
    return 0;
  }

  mError("failed to open wal since %s. wal:%s", terrstr(), walPath);
  return -1;
}

// Closes the WAL handle if one is open and resets pMnode->pWal to NULL.
static void mndCloseWal(SMnode *pMnode) {
  if (pMnode->pWal == NULL) return;

  walClose(pMnode->pWal);
  pMnode->pWal = NULL;
}

350 351 352
// Creates the sdb from the mnode's path, WAL handle and sync handle and
// stores it in pMnode->pSdb. Returns 0 on success, -1 when sdbInit() fails.
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt sdbOpt = {0};
  sdbOpt.pMnode = pMnode;
  sdbOpt.path = pMnode->path;
  sdbOpt.pWal = pMnode->pWal;
  sdbOpt.sync = pMnode->syncMgmt.sync;

  pMnode->pSdb = sdbInit(&sdbOpt);
  return (pMnode->pSdb == NULL) ? -1 : 0;
}

365 366 367 368 369 370 371
// Loads the sdb from file unless the mnode is being deployed, in which case
// nothing is read and 0 is returned. Otherwise returns sdbReadFile()'s result.
static int32_t mndOpenSdb(SMnode *pMnode) {
  if (pMnode->deploy) {
    return 0;
  }
  return sdbReadFile(pMnode->pSdb);
}
372 373 374

// Tears down the sdb if present and resets pMnode->pSdb to NULL.
static void mndCleanupSdb(SMnode *pMnode) {
  if (!pMnode->pSdb) return;

  sdbCleanup(pMnode->pSdb);
  pMnode->pSdb = NULL;
}

380 381 382 383 384
// Appends one named init/cleanup step to pMnode->pSteps.
// Returns 0 on success; on push failure sets terrno to
// TSDB_CODE_OUT_OF_MEMORY and returns -1.
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep entry = {.name = name, .initFp = initFp, .cleanupFp = cleanupFp};

  if (taosArrayPush(pMnode->pSteps, &entry) != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

393
// Registers every mnode start-up step, in execution order, via mndAllocStep().
// Returns 0 when all steps were registered, -1 on the first registration
// failure. Note that "mnode-sdb" appears twice: once to create the sdb and
// once (with no cleanup function) to open/read it after the tables that
// precede it in the list have been registered.
static int32_t mndInitSteps(SMnode *pMnode) {
  const struct {
    char        *name;
    MndInitFp    initFp;
    MndCleanupFp cleanupFp;
  } stepTable[] = {
      {"mnode-wal", mndInitWal, mndCloseWal},
      {"mnode-sdb", mndInitSdb, mndCleanupSdb},
      {"mnode-trans", mndInitTrans, mndCleanupTrans},
      {"mnode-cluster", mndInitCluster, mndCleanupCluster},
      {"mnode-mnode", mndInitMnode, mndCleanupMnode},
      {"mnode-qnode", mndInitQnode, mndCleanupQnode},
      {"mnode-snode", mndInitSnode, mndCleanupSnode},
      {"mnode-dnode", mndInitDnode, mndCleanupDnode},
      {"mnode-user", mndInitUser, mndCleanupUser},
      {"mnode-grant", mndInitGrant, mndCleanupGrant},
      {"mnode-privilege", mndInitPrivilege, mndCleanupPrivilege},
      {"mnode-acct", mndInitAcct, mndCleanupAcct},
      {"mnode-stream", mndInitStream, mndCleanupStream},
      {"mnode-topic", mndInitTopic, mndCleanupTopic},
      {"mnode-consumer", mndInitConsumer, mndCleanupConsumer},
      {"mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe},
      {"mnode-vgroup", mndInitVgroup, mndCleanupVgroup},
      {"mnode-stb", mndInitStb, mndCleanupStb},
      {"mnode-sma", mndInitSma, mndCleanupSma},
      {"mnode-infos", mndInitInfos, mndCleanupInfos},
      {"mnode-perfs", mndInitPerfs, mndCleanupPerfs},
      {"mnode-db", mndInitDb, mndCleanupDb},
      {"mnode-func", mndInitFunc, mndCleanupFunc},
      {"mnode-sdb", mndOpenSdb, NULL},
      {"mnode-profile", mndInitProfile, mndCleanupProfile},
      {"mnode-show", mndInitShow, mndCleanupShow},
      {"mnode-query", mndInitQuery, mndCleanupQuery},
      {"mnode-sync", mndInitSync, mndCleanupSync},
      {"mnode-telem", mndInitTelem, mndCleanupTelem},
  };
  const int32_t numOfSteps = (int32_t)(sizeof(stepTable) / sizeof(stepTable[0]));

  for (int32_t i = 0; i < numOfSteps; ++i) {
    if (mndAllocStep(pMnode, stepTable[i].name, stepTable[i].initFp, stepTable[i].cleanupFp) != 0) {
      return -1;
    }
  }

  return 0;
}

427
// Runs the cleanup function of steps [pos .. 0] in reverse order, then
// destroys the step array and sets pMnode->pSteps to NULL. Passing pos == -1
// means "start from the last registered step". No-op when pSteps is NULL.
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) return;

  int32_t idx = pos;
  if (idx == -1) {
    idx = taosArrayGetSize(pMnode->pSteps) - 1;
  }

  while (idx >= 0) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    mInfo("%s will cleanup", pStep->name);
    if (pStep->cleanupFp != NULL) {
      (*pStep->cleanupFp)(pMnode);
    }
    idx--;
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
S
Shengliang Guan 已提交
446

447
// Runs each registered step's init function in order (steps without one are
// skipped). On the first failure, cleans up from the failing step back to the
// first, restores the failing terrno and returns -1. On success caches the
// cluster id in pMnode->clusterId and returns 0.
static int32_t mndExecSteps(SMnode *pMnode) {
  const int32_t numOfSteps = taosArrayGetSize(pMnode->pSteps);

  for (int32_t idx = 0; idx < numOfSteps; ++idx) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    if (pStep->initFp == NULL) continue;

    if ((*pStep->initFp)(pMnode) == 0) {
      mInfo("%s is initialized", pStep->name);
      tmsgReportStartup(pStep->name, "initialized");
      continue;
    }

    // Cleanup may overwrite terrno, so keep the original failure code.
    const int32_t savedErr = terrno;
    mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
    mndCleanupSteps(pMnode, idx);
    terrno = savedErr;
    return -1;
  }

  pMnode->clusterId = mndGetClusterId(pMnode);
  return 0;
}
S
Shengliang Guan 已提交
468

S
shm  
Shengliang Guan 已提交
469
// Copies the message callbacks, the local dnode id and the replica layout
// (self index, replica count, replica array) from pOption into pMnode.
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  pMnode->msgCb = pOption->msgCb;
  pMnode->selfDnodeId = pOption->dnodeId;

  pSync->selfIndex = pOption->selfIndex;
  pSync->numOfReplicas = pOption->numOfReplicas;
  memcpy(pSync->replicas, pOption->replicas, sizeof(pOption->replicas));
}
476

477
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
478
  mInfo("start to open mnode in %s", path);
S
Shengliang Guan 已提交
479

wafwerar's avatar
wafwerar 已提交
480
  SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
481 482 483 484 485
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
486
  memset(pMnode, 0, sizeof(SMnode));
S
Shengliang Guan 已提交
487

S
Shengliang Guan 已提交
488 489
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
S
shm  
Shengliang Guan 已提交
490
  mndSetOptions(pMnode, pOption);
S
Shengliang Guan 已提交
491

492
  pMnode->deploy = pOption->deploy;
S
Shengliang Guan 已提交
493 494
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
wafwerar's avatar
wafwerar 已提交
495
    taosMemoryFree(pMnode);
S
Shengliang Guan 已提交
496 497 498 499
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
500

S
Shengliang Guan 已提交
501
  int32_t code = mndCreateDir(pMnode, path);
502
  if (code != 0) {
S
Shengliang Guan 已提交
503 504
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
505 506 507 508 509
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

510
  code = mndInitSteps(pMnode);
511
  if (code != 0) {
S
Shengliang Guan 已提交
512 513
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
514 515 516 517 518 519 520
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
521 522
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
523 524 525 526
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
527

S
Shengliang Guan 已提交
528
  mInfo("mnode open successfully");
S
Shengliang Guan 已提交
529 530
  return pMnode;
}
S
Shengliang Guan 已提交
531

532 533 534
// Called ahead of close: asks sync to transfer leadership and then to
// pre-stop. No-op for a NULL mnode.
void mndPreClose(SMnode *pMnode) {
  if (pMnode == NULL) return;

  syncLeaderTransfer(pMnode->syncMgmt.sync);
  syncPreStop(pMnode->syncMgmt.sync);
#if 0
  while (syncSnapshotRecving(pMnode->syncMgmt.sync)) {
    mInfo("vgId:1, snapshot is recving");
    taosMsleep(300);
  }
  while (syncSnapshotSending(pMnode->syncMgmt.sync)) {
    mInfo("vgId:1, snapshot is sending");
    taosMsleep(300);
  }
#endif
}

549
// Runs every remaining step cleanup, then frees the stored path and the
// mnode object itself. No-op for a NULL mnode.
void mndClose(SMnode *pMnode) {
  if (pMnode == NULL) return;

  mInfo("start to close mnode");
  mndCleanupSteps(pMnode, -1);
  taosMemoryFreeClear(pMnode->path);
  taosMemoryFreeClear(pMnode);
  mInfo("mnode is closed");
}
S
Shengliang Guan 已提交
558

559
// Starts sync; when deploying, also deploys the sdb and marks the mnode as
// restored. Then resets grants and launches the timer thread.
// Returns -1 if sdbDeploy() fails, otherwise the result of mndInitTimer().
int32_t mndStart(SMnode *pMnode) {
  mndSyncStart(pMnode);

  if (pMnode->deploy) {
    const int32_t deployCode = sdbDeploy(pMnode->pSdb);
    if (deployCode != 0) {
      mError("failed to deploy sdb while start mnode");
      return -1;
    }
    mndSetRestored(pMnode, true);
  }

  grantReset(pMnode, TSDB_GRANT_ALL, 0);

  return mndInitTimer(pMnode);
}

574
// Stops the mnode in three steps: raise the stopped flag, stop sync, then
// join the timer thread (which exits once it observes the stopped flag).
void mndStop(SMnode *pMnode) {
  mndSetStop(pMnode);
  mndSyncStop(pMnode);
  mndCleanupTimer(pMnode);
}

M
Minghao Li 已提交
580
// Hands a sync message to syncProcessMsg() for the mnode referenced by
// pMsg->info.node, logging an error on a non-zero result.
// Returns whatever syncProcessMsg() returned.
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;

  // `trace` is referenced by name inside the mG* logging macros.
  const STraceId *trace = &pMsg->info.traceId;
  mGTrace("vgId:1, sync msg:%p will be processed, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  const int32_t code = syncProcessMsg(pMnode->syncMgmt.sync, pMsg);
  if (code != 0) {
    mGError("vgId:1, failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
  }

  return code;
}

S
Shengliang Guan 已提交
595
// Decides whether a message may be processed by this mnode right now.
// Returns 0 when the message is not a request, is one of the TDMT_SCH_*
// types listed below, or when the mnode is a restored leader — in the last
// case an RPC reference (rpcRef) is taken first.
// Returns -1 with terrno set otherwise. For non-timer requests rejected
// because the node is not leader or still restoring, the mnode ep set is
// serialized into pMsg->info.rsp.
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
  if (!IsReq(pMsg)) return 0;
  if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY ||
      pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT ||
      pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK) {
    return 0;
  }

  SMnode *pMnode = pMsg->info.node;
  taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    taosThreadRwlockUnlock(&pMnode->lock);
    terrno = TSDB_CODE_APP_IS_STOPPING;
    return -1;
  }

  // syncGetState() reports failure through terrno, so clear it first.
  terrno = 0;
  SSyncState state = syncGetState(pMnode->syncMgmt.sync);
  if (terrno != 0) {
    taosThreadRwlockUnlock(&pMnode->lock);
    return -1;
  }

  if (state.state != TAOS_SYNC_STATE_LEADER) {
    taosThreadRwlockUnlock(&pMnode->lock);
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    goto _OVER;
  }

  if (!state.restored || !pMnode->restored) {
    taosThreadRwlockUnlock(&pMnode->lock);
    terrno = TSDB_CODE_SYN_RESTORING;
    goto _OVER;
  }

  // Accepted: take the RPC reference while still under the read lock.
  atomic_add_fetch_32(&pMnode->rpcRef, 1);

  taosThreadRwlockUnlock(&pMnode->lock);
  return 0;

_OVER:
  // Timer messages listed here are dropped without building a redirect reply.
  if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
      pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
      pMsg->msgType == TDMT_MND_UPTIME_TIMER) {
    // The role string comes from state.state (the restored flag was passed before).
    mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
           pMnode->stopped, state.restored, syncStr(state.state));
    return -1;
  }

  // `trace` is referenced by name inside the mG* logging macros.
  const STraceId *trace = &pMsg->info.traceId;
  SEpSet          epSet = {0};
  mndGetMnodeEpSet(pMnode, &epSet);

  mGDebug(
      "msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d "
      "role:%s, redirect numOfEps:%d inUse:%d",
      pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, state.restored,
      syncStr(state.state), epSet.numOfEps, epSet.inUse);

  if (epSet.numOfEps <= 0) return -1;

  for (int32_t i = 0; i < epSet.numOfEps; ++i) {
    mDebug("mnode index:%d, ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
  }

  // Attach the ep set to the reply; if allocation fails the reply simply
  // carries no ep set.
  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
  pMsg->info.rsp = rpcMallocCont(contLen);
  if (pMsg->info.rsp != NULL) {
    tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
    pMsg->info.hasEpSet = 1;
    pMsg->info.rspLen = contLen;
  }

  return -1;
}

S
Shengliang Guan 已提交
676
// Dispatches an RPC message to the handler registered for its type.
// Returns -1 with terrno = TSDB_CODE_MSG_NOT_PROCESSED when no handler is
// registered, -1 when mndCheckMnodeState() rejects the message, otherwise
// the handler's return code. The RPC reference taken by the state check is
// released right after the handler returns.
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  // `trace` is referenced by name inside the mG* logging macros.
  const STraceId *trace = &pMsg->info.traceId;

  MndMsgFp handler = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
  if (handler == NULL) {
    mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    return -1;
  }

  if (mndCheckMnodeState(pMsg) != 0) {
    return -1;
  }

  mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  const int32_t code = (*handler)(pMsg);
  mndReleaseRpc(pMnode);

  if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
    mGTrace("msg:%p, won't response immediately since in progress", pMsg);
    return code;
  }

  if (code == 0) {
    mGTrace("msg:%p, successfully processed", pMsg);
    return code;
  }

  mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
          TMSG_INFO(pMsg->msgType));
  return code;
}

705 706
// Registers `fp` as the handler for `msgType`. The request is silently
// ignored when the type's index is not below TDMT_MAX.
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  const tmsg_t slot = TMSG_INDEX(msgType);
  if (slot >= TDMT_MAX) return;

  pMnode->msgFp[slot] = fp;
}

D
dapan1121 已提交
712
// Generates a positive 64-bit uid from the current microsecond timestamp,
// a hash of `name` and a random byte:
//   bits 63..24  low 40 bits of taosGetTimestampUs()
//   bits 23..8   low 16 bits of MurmurHash3_32(name, len)
//   bits  7..0   low 8 bits of taosRand()
// If the combined pattern has bit 63 set, its two's-complement magnitude is
// returned (the same value llabs() would give). The loop retries for the two
// patterns that have no positive magnitude: 0 and 0x8000000000000000.
// Note: uid 0 is reserved.
//
// The arithmetic is done on unsigned values so that shifting into bit 63 and
// negating the minimum value are well defined.
int64_t mndGenerateUid(const char *name, int32_t len) {
  const uint32_t hashval = (uint32_t)MurmurHash3_32(name, len);

  for (;;) {
    const uint64_t us = (uint64_t)taosGetTimestampUs();
    uint64_t       bits = (us & 0x000000FFFFFFFFFFULL) << 24;
    bits += (uint64_t)(hashval & 0xFFFFu) << 8;
    bits += (uint64_t)taosRand() & 0xFFu;

    if ((bits >> 63) != 0) {
      bits = 0 - bits;  // magnitude of the negative two's-complement value
    }

    if (bits != 0 && (bits >> 63) == 0) {
      return (int64_t)bits;
    }
  }
}
S
Shengliang Guan 已提交
724 725

int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
S
Shengliang Guan 已提交
726
                          SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
727
  if (mndAcquireRpc(pMnode) != 0) return -1;
S
Shengliang Guan 已提交
728

M
Minghao Li 已提交
729
  SSdb   *pSdb = pMnode->pSdb;
S
Shengliang Guan 已提交
730 731 732 733 734
  int64_t ms = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
S
Shengliang Guan 已提交
735 736 737
  pStbInfo->stbs = taosArrayInit(sdbGetSize(pSdb, SDB_STB), sizeof(SMonStbDesc));
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL ||
      pStbInfo->stbs == NULL) {
738
    mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
739 740 741 742
    return -1;
  }

  // cluster info
wmmhello's avatar
wmmhello 已提交
743
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
S
Shengliang Guan 已提交
744 745
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);
746 747
  pClusterInfo->dbs_total = sdbGetSize(pSdb, SDB_DB);
  pClusterInfo->stbs_total = sdbGetSize(pSdb, SDB_STB);
S
Shengliang Guan 已提交
748 749 750 751 752 753 754 755 756 757

  void *pIter = NULL;
  while (1) {
    SDnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonDnodeDesc desc = {0};
    desc.dnode_id = pObj->id;
    tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
S
Shengliang Guan 已提交
758
    if (mndIsDnodeOnline(pObj, ms)) {
S
Shengliang Guan 已提交
759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776
      tstrncpy(desc.status, "ready", sizeof(desc.status));
    } else {
      tstrncpy(desc.status, "offline", sizeof(desc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &desc);
    sdbRelease(pSdb, pObj);
  }

  pIter = NULL;
  while (1) {
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonMnodeDesc desc = {0};
    desc.mnode_id = pObj->id;
    tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));

777
    if (pObj->id == pMnode->selfDnodeId) {
S
Shengliang Guan 已提交
778 779
      pClusterInfo->first_ep_dnode_id = pObj->id;
      tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
780 781
      pClusterInfo->master_uptime = mndGetClusterUpTime(pMnode);
      // pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
782 783
      tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
    } else {
784
      tstrncpy(desc.role, syncStr(pObj->syncState), sizeof(desc.role));
S
Shengliang Guan 已提交
785
    }
786 787
    taosArrayPush(pClusterInfo->mnodes, &desc);
    sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
788 789 790 791 792 793 794 795 796 797
  }

  // vgroup info
  pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pClusterInfo->vgroups_total++;
798
    pClusterInfo->tbs_total += pVgroup->numOfTables;
S
Shengliang Guan 已提交
799 800 801

    SMonVgroupDesc desc = {0};
    desc.vgroup_id = pVgroup->vgId;
802 803 804 805 806

    SName name = {0};
    tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name, desc.database_name);

S
Shengliang Guan 已提交
807 808 809 810
    desc.tables_num = pVgroup->numOfTables;
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
    tstrncpy(desc.status, "unsynced", sizeof(desc.status));
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
M
Minghao Li 已提交
811
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[i];
S
Shengliang Guan 已提交
812 813
      SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
      pVnDesc->dnode_id = pVgid->dnodeId;
814 815
      tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->syncState), sizeof(pVnDesc->vnode_role));
      if (pVgid->syncState == TAOS_SYNC_STATE_LEADER) {
S
Shengliang Guan 已提交
816 817 818
        tstrncpy(desc.status, "ready", sizeof(desc.status));
        pClusterInfo->vgroups_alive++;
      }
S
Shengliang Guan 已提交
819
      if (pVgid->syncState != TAOS_SYNC_STATE_ERROR && pVgid->syncState != TAOS_SYNC_STATE_OFFLINE) {
S
Shengliang Guan 已提交
820 821 822 823 824 825 826 827 828
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &desc);
    sdbRelease(pSdb, pVgroup);
  }

S
Shengliang Guan 已提交
829 830 831 832 833 834 835
  // stb info
  pIter = NULL;
  while (1) {
    SStbObj *pStb = NULL;
    pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
    if (pIter == NULL) break;

S
Shengliang Guan 已提交
836
    SMonStbDesc desc = {0};
S
Shengliang Guan 已提交
837 838 839 840 841 842 843 844 845 846 847 848 849

    SName name1 = {0};
    tNameFromString(&name1, pStb->db, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name1, desc.database_name);

    SName name2 = {0};
    tNameFromString(&name2, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tstrncpy(desc.stb_name, tNameGetTableName(&name2), TSDB_TABLE_NAME_LEN);

    taosArrayPush(pStbInfo->stbs, &desc);
    sdbRelease(pSdb, pStb);
  }

S
Shengliang Guan 已提交
850 851 852 853 854 855 856 857
  // grant info
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  }

858
  mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
859
  return 0;
L
Liu Jicong 已提交
860
}
S
Shengliang Guan 已提交
861 862

// Copies the current sync state and restored flag from syncGetState() into
// pLoad and traces them. Always returns 0.
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  const SSyncState syncState = syncGetState(pMnode->syncMgmt.sync);

  pLoad->syncState = syncState.state;
  pLoad->syncRestore = syncState.restored;
  mTrace("mnode current syncState is %s, syncRestore:%d", syncStr(pLoad->syncState), pLoad->syncRestore);

  return 0;
}
S
Shengliang Guan 已提交
869

870
// Updates the mnode's restored flag under the write lock and logs the new
// value. When clearing the flag, additionally polls every 3 ms until rpcRef
// is no longer positive.
void mndSetRestored(SMnode *pMnode, bool restored) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->restored = restored;
  taosThreadRwlockUnlock(&pMnode->lock);
  mInfo("mnode set restored:%d", restored);

  if (restored) return;

  while (pMnode->rpcRef > 0) {
    taosMsleep(3);
  }
}

// Returns the mnode's restored flag (read without taking the lock).
bool mndGetRestored(SMnode *pMnode) {
  const bool isRestored = pMnode->restored;
  return isRestored;
}

// Raises the mnode's stopped flag under the write lock and logs it.
void mndSetStop(SMnode *pMnode) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->stopped = true;
  taosThreadRwlockUnlock(&pMnode->lock);

  mInfo("mnode set stopped");
}

// Returns the mnode's stopped flag (read without taking the lock).
bool mndGetStop(SMnode *pMnode) {
  const bool isStopped = pMnode->stopped;
  return isStopped;
}