mndMain.c 27.9 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndAcct.h"
#include "mndCluster.h"
L
Liu Jicong 已提交
19
#include "mndConsumer.h"
S
Shengliang Guan 已提交
20 21 22
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
L
Liu Jicong 已提交
23
#include "mndGrant.h"
S
monitor  
Shengliang Guan 已提交
24
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
25
#include "mndMnode.h"
L
Liu Jicong 已提交
26
#include "mndOffset.h"
L
Liu Jicong 已提交
27
#include "mndPerfSchema.h"
M
Minghao Li 已提交
28
#include "mndPrivilege.h"
S
Shengliang Guan 已提交
29
#include "mndProfile.h"
S
Shengliang Guan 已提交
30
#include "mndQnode.h"
L
Liu Jicong 已提交
31
#include "mndQuery.h"
S
Shengliang Guan 已提交
32
#include "mndShow.h"
S
sma  
Shengliang Guan 已提交
33
#include "mndSma.h"
S
Shengliang Guan 已提交
34
#include "mndSnode.h"
S
Shengliang Guan 已提交
35
#include "mndStb.h"
L
Liu Jicong 已提交
36
#include "mndStream.h"
L
Liu Jicong 已提交
37
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
38 39
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
40
#include "mndTopic.h"
S
Shengliang Guan 已提交
41 42 43
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
44

45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
// Takes one RPC reference on the mnode before a request is processed.
// Returns 0 after incrementing pMnode->rpcRef. Returns -1 without taking a
// reference when the mnode is stopped (terrno = TSDB_CODE_APP_NOT_READY) or
// when mndIsLeader() reports false (terrno is not assigned in this function).
// Both the checks and the increment run under the read side of pMnode->lock.
static inline int32_t mndAcquireRpc(SMnode *pMnode) {
  int32_t result = -1;

  taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    terrno = TSDB_CODE_APP_NOT_READY;
  } else if (mndIsLeader(pMnode)) {
    atomic_add_fetch_32(&pMnode->rpcRef, 1);
    result = 0;
  }
  taosThreadRwlockUnlock(&pMnode->lock);

  return result;
}

// Drops one RPC reference previously taken with mndAcquireRpc().
// The decrement of pMnode->rpcRef runs under the read side of pMnode->lock.
static inline void mndReleaseRpc(SMnode *pMnode) {
  taosThreadRwlockRdlock(&pMnode->lock);
  atomic_sub_fetch_32(&pMnode->rpcRef, 1);
  taosThreadRwlockUnlock(&pMnode->lock);
}

S
Shengliang Guan 已提交
76 77 78 79 80 81 82 83 84 85 86 87 88
// Serializes a zero-initialized SMTimerReq into a buffer obtained from
// rpcMallocCont().
// On success returns the buffer and stores its length in *pContLen.
// Returns NULL (leaving *pContLen untouched) when the computed length is not
// positive or the allocation fails.
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq req = {0};

  // Called with a NULL buffer first; the return value is used as the buffer size.
  const int32_t encodedLen = tSerializeSMTimerMsg(NULL, 0, &req);
  if (encodedLen <= 0) {
    return NULL;
  }

  void *pBuf = rpcMallocCont(encodedLen);
  if (pBuf != NULL) {
    tSerializeSMTimerMsg(pBuf, encodedLen, &req);
    *pContLen = encodedLen;
  }
  return pBuf;
}

S
Shengliang Guan 已提交
89 90
// Posts a TDMT_MND_TRANS_TIMER message to the mnode write queue.
// Nothing is posted when mndBuildTimerMsg() returns NULL.
static void mndPullupTrans(SMnode *pMnode) {
  int32_t bodyLen = 0;
  void   *pBody = mndBuildTimerMsg(&bodyLen);
  if (pBody == NULL) return;

  SRpcMsg timerMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pBody, .contLen = bodyLen};
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &timerMsg);
}

98
// Posts a TDMT_MND_TTL_TIMER message to the mnode write queue.
// Nothing is posted when mndBuildTimerMsg() returns NULL.
static void mndPullupTtl(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  // Skip posting when no request body could be built, like the other timer
  // helpers in this file; otherwise a message with a NULL pCont and a zero
  // contLen would be put on the queue.
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  }
}

S
Shengliang Guan 已提交
105 106
// Posts a TDMT_MND_MQ_TIMER message to the mnode read queue.
// Nothing is posted when mndBuildTimerMsg() returns NULL.
static void mndCalMqRebalance(SMnode *pMnode) {
  int32_t bodyLen = 0;
  void   *pBody = mndBuildTimerMsg(&bodyLen);
  if (pBody == NULL) return;

  SRpcMsg timerMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pBody, .contLen = bodyLen};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &timerMsg);
}
L
Liu Jicong 已提交
113

S
Shengliang Guan 已提交
114 115
// Posts a TDMT_MND_TELEM_TIMER message to the mnode read queue.
// Nothing is posted when mndBuildTimerMsg() returns NULL.
static void mndPullupTelem(SMnode *pMnode) {
  int32_t bodyLen = 0;
  void   *pBody = mndBuildTimerMsg(&bodyLen);
  if (pBody == NULL) return;

  SRpcMsg timerMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pBody, .contLen = bodyLen};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &timerMsg);
}

123
// Posts a TDMT_MND_GRANT_HB_TIMER message to the mnode read queue.
// The message carries the fixed ahandle value 0x9527.
// Nothing is posted when mndBuildTimerMsg() returns NULL.
static void mndPullupGrant(SMnode *pMnode) {
  int32_t bodyLen = 0;
  void   *pBody = mndBuildTimerMsg(&bodyLen);
  if (pBody == NULL) return;

  SRpcMsg timerMsg = {
      .msgType = TDMT_MND_GRANT_HB_TIMER,
      .pCont = pBody,
      .contLen = bodyLen,
      .info.ahandle = (void *)0x9527,
  };
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &timerMsg);
}

133 134 135 136 137 138 139 140 141 142
// Posts a TDMT_MND_UPTIME_TIMER message to the mnode write queue.
// The message carries the fixed ahandle value 0x9528.
// Nothing is posted when mndBuildTimerMsg() returns NULL.
static void mndIncreaseUpTime(SMnode *pMnode) {
  int32_t bodyLen = 0;
  void   *pBody = mndBuildTimerMsg(&bodyLen);
  if (pBody == NULL) return;

  SRpcMsg timerMsg = {
      .msgType = TDMT_MND_UPTIME_TIMER,
      .pCont = pBody,
      .contLen = bodyLen,
      .info.ahandle = (void *)0x9528,
  };
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &timerMsg);
}

S
Shengliang Guan 已提交
143
static void *mndThreadFp(void *param) {
S
Shengliang Guan 已提交
144
  SMnode *pMnode = param;
S
Shengliang Guan 已提交
145 146 147 148 149 150
  int64_t lastTime = 0;
  setThreadName("mnode-timer");

  while (1) {
    lastTime++;
    taosMsleep(100);
S
Shengliang Guan 已提交
151
    if (mndGetStop(pMnode)) break;
152
    if (lastTime % 10 != 0) continue;
S
Shengliang Guan 已提交
153

154 155
    int64_t sec = lastTime / 10;
    if (sec % tsTtlPushInterval == 0) {
156
      mndPullupTtl(pMnode);
wmmhello's avatar
wmmhello 已提交
157 158
    }

159
    if (sec % tsTransPullupInterval == 0) {
S
Shengliang Guan 已提交
160 161 162
      mndPullupTrans(pMnode);
    }

163
    if (sec % tsMqRebalanceInterval == 0) {
S
Shengliang Guan 已提交
164 165 166
      mndCalMqRebalance(pMnode);
    }

S
Shengliang Guan 已提交
167
    if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) {
S
Shengliang Guan 已提交
168 169
      mndPullupTelem(pMnode);
    }
C
Cary Xu 已提交
170

171
    if (sec % tsGrantHBInterval == 0) {
172
      mndPullupGrant(pMnode);
C
Cary Xu 已提交
173
    }
174

175
    if (sec % tsUptimeInterval == 0) {
176 177
      mndIncreaseUpTime(pMnode);
    }
S
Shengliang Guan 已提交
178 179
  }

S
Shengliang Guan 已提交
180
  return NULL;
L
Liu Jicong 已提交
181 182
}

183
// Starts the joinable timer thread running mndThreadFp and stores its handle
// in pMnode->thread.
// Returns 0 on success, -1 when the thread cannot be created.
static int32_t mndInitTimer(SMnode *pMnode) {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  int32_t ret = taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode);
  if (ret != 0) {
    // Log first so errno is read before any further library call can change it.
    // NOTE(review): if taosThreadCreate wraps pthread_create, the error number
    // is its return value rather than errno - confirm which one to report.
    mError("failed to create timer thread since %s", strerror(errno));
  }

  // Release the attribute object on both paths; the failure path used to
  // return without destroying it.
  taosThreadAttrDestroy(&thAttr);
  if (ret != 0) {
    return -1;
  }

  tmsgReportStartup("mnode-timer", "initialized");
  return 0;
}

197
// Joins the timer thread and clears its handle.
// Does nothing when taosCheckPthreadValid() rejects pMnode->thread.
static void mndCleanupTimer(SMnode *pMnode) {
  if (!taosCheckPthreadValid(pMnode->thread)) return;

  taosThreadJoin(pMnode->thread, NULL);
  taosThreadClear(&pMnode->thread);
}

S
Shengliang Guan 已提交
204
// Stores a heap copy of `path` in pMnode->path and creates that directory.
// Returns 0 on success. Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when
// the copy fails, or with terrno derived from errno when taosMkDir() fails.
// The copy stays in pMnode->path even when directory creation fails.
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  char *pathCopy = strdup(path);
  pMnode->path = pathCopy;

  if (pathCopy == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosMkDir(pathCopy) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}
S
Shengliang Guan 已提交
218

219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
// Opens the WAL located in "<pMnode->path><TD_DIRSEP>wal" and stores the
// handle in pMnode->pWal.
// Returns 0 on success, -1 (after logging) when walOpen() returns NULL.
static int32_t mndInitWal(SMnode *pMnode) {
  char walPath[PATH_MAX + 20] = {0};
  snprintf(walPath, sizeof(walPath), "%s%swal", pMnode->path, TD_DIRSEP);

  SWalCfg walCfg = {0};
  walCfg.vgId = 1;
  walCfg.fsyncPeriod = 0;
  walCfg.rollPeriod = -1;
  walCfg.segSize = -1;
  walCfg.retentionPeriod = -1;
  walCfg.retentionSize = -1;
  walCfg.level = TAOS_WAL_FSYNC;

  pMnode->pWal = walOpen(walPath, &walCfg);
  if (pMnode->pWal != NULL) {
    return 0;
  }

  mError("failed to open wal since %s. wal:%s", terrstr(), walPath);
  return -1;
}

// Closes the WAL handle, if any, and resets pMnode->pWal to NULL.
static void mndCloseWal(SMnode *pMnode) {
  if (pMnode->pWal == NULL) return;

  walClose(pMnode->pWal);
  pMnode->pWal = NULL;
}

248 249 250
// Creates the sdb instance from the mnode's path, WAL handle and sync id and
// stores it in pMnode->pSdb.
// Returns 0 on success, -1 when sdbInit() returns NULL.
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt sdbOpt = {
      .path = pMnode->path,
      .pMnode = pMnode,
      .pWal = pMnode->pWal,
      .sync = pMnode->syncMgmt.sync,
  };

  pMnode->pSdb = sdbInit(&sdbOpt);
  return (pMnode->pSdb == NULL) ? -1 : 0;
}

263 264 265 266 267 268 269
// Reads the sdb file unless the mnode is being deployed.
// Returns 0 when pMnode->deploy is set, otherwise the result of sdbReadFile().
static int32_t mndOpenSdb(SMnode *pMnode) {
  if (pMnode->deploy) {
    return 0;
  }
  return sdbReadFile(pMnode->pSdb);
}
270 271 272

// Releases the sdb instance, if any, and resets pMnode->pSdb to NULL.
static void mndCleanupSdb(SMnode *pMnode) {
  if (!pMnode->pSdb) return;

  sdbCleanup(pMnode->pSdb);
  pMnode->pSdb = NULL;
}

278 279 280 281 282
// Appends one init/cleanup step to pMnode->pSteps.
// `name` is stored by pointer (not copied); either callback may be NULL.
// Returns 0 on success, -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
// push fails.
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep newStep = {0};
  newStep.cleanupFp = cleanupFp;
  newStep.initFp = initFp;
  newStep.name = name;

  if (taosArrayPush(pMnode->pSteps, &newStep) != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

291
// Registers every mnode startup step, in execution order, into pMnode->pSteps.
// mndExecSteps() later runs the init callbacks front to back and
// mndCleanupSteps() runs the cleanup callbacks back to front, so the order of
// this table is significant.
// Returns 0 when all steps are registered, -1 on the first failed registration.
static int32_t mndInitSteps(SMnode *pMnode) {
  static const struct {
    char        *name;
    MndInitFp    initFp;
    MndCleanupFp cleanupFp;
  } stepTable[] = {
      {"mnode-wal", mndInitWal, mndCloseWal},
      {"mnode-sdb", mndInitSdb, mndCleanupSdb},
      {"mnode-trans", mndInitTrans, mndCleanupTrans},
      {"mnode-cluster", mndInitCluster, mndCleanupCluster},
      {"mnode-mnode", mndInitMnode, mndCleanupMnode},
      {"mnode-qnode", mndInitQnode, mndCleanupQnode},
      {"mnode-snode", mndInitSnode, mndCleanupSnode},
      {"mnode-dnode", mndInitDnode, mndCleanupDnode},
      {"mnode-user", mndInitUser, mndCleanupUser},
      {"mnode-grant", mndInitGrant, mndCleanupGrant},
      {"mnode-privilege", mndInitPrivilege, mndCleanupPrivilege},
      {"mnode-acct", mndInitAcct, mndCleanupAcct},
      {"mnode-stream", mndInitStream, mndCleanupStream},
      {"mnode-topic", mndInitTopic, mndCleanupTopic},
      {"mnode-consumer", mndInitConsumer, mndCleanupConsumer},
      {"mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe},
      {"mnode-offset", mndInitOffset, mndCleanupOffset},
      {"mnode-vgroup", mndInitVgroup, mndCleanupVgroup},
      {"mnode-stb", mndInitStb, mndCleanupStb},
      {"mnode-sma", mndInitSma, mndCleanupSma},
      {"mnode-infos", mndInitInfos, mndCleanupInfos},
      {"mnode-perfs", mndInitPerfs, mndCleanupPerfs},
      {"mnode-db", mndInitDb, mndCleanupDb},
      {"mnode-func", mndInitFunc, mndCleanupFunc},
      // Second entry named "mnode-sdb": runs mndOpenSdb and has no cleanup.
      {"mnode-sdb", mndOpenSdb, NULL},
      {"mnode-profile", mndInitProfile, mndCleanupProfile},
      {"mnode-show", mndInitShow, mndCleanupShow},
      {"mnode-query", mndInitQuery, mndCleanupQuery},
      {"mnode-sync", mndInitSync, mndCleanupSync},
      {"mnode-telem", mndInitTelem, mndCleanupTelem},
  };

  const int32_t numOfSteps = (int32_t)(sizeof(stepTable) / sizeof(stepTable[0]));
  for (int32_t i = 0; i < numOfSteps; ++i) {
    if (mndAllocStep(pMnode, stepTable[i].name, stepTable[i].initFp, stepTable[i].cleanupFp) != 0) {
      return -1;
    }
  }

  return 0;
}

326
// Runs the cleanup callbacks of steps [pos .. 0] in reverse order, then
// destroys the step array and sets pMnode->pSteps to NULL.
// pos == -1 means "start from the last registered step".
// Does nothing when pMnode->pSteps is already NULL.
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) return;

  int32_t idx = pos;
  if (idx == -1) {
    idx = taosArrayGetSize(pMnode->pSteps) - 1;
  }

  while (idx >= 0) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    mInfo("%s will cleanup", pStep->name);
    if (pStep->cleanupFp != NULL) {
      (*pStep->cleanupFp)(pMnode);
    }
    idx--;
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
S
Shengliang Guan 已提交
345

346
// Runs the init callback of every registered step in order; steps without an
// init callback are skipped.
// On the first failing step, the steps up to and including it are cleaned up
// via mndCleanupSteps(), terrno is restored to the failure code, and -1 is
// returned. On success the cluster id is cached in pMnode->clusterId and 0 is
// returned.
static int32_t mndExecSteps(SMnode *pMnode) {
  const int32_t numOfSteps = taosArrayGetSize(pMnode->pSteps);

  for (int32_t idx = 0; idx < numOfSteps; ++idx) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    if (pStep->initFp == NULL) continue;

    if ((*pStep->initFp)(pMnode) == 0) {
      mInfo("%s is initialized", pStep->name);
      tmsgReportStartup(pStep->name, "initialized");
      continue;
    }

    // Keep the failure code: the cleanup below may overwrite terrno.
    int32_t savedErr = terrno;
    mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
    mndCleanupSteps(pMnode, idx);
    terrno = savedErr;
    return -1;
  }

  pMnode->clusterId = mndGetClusterId(pMnode);
  return 0;
}
S
Shengliang Guan 已提交
367

S
shm  
Shengliang Guan 已提交
368
// Copies the message callbacks, the dnode id and the replica configuration
// from pOption into pMnode.
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  pMnode->msgCb = pOption->msgCb;
  pMnode->selfDnodeId = pOption->dnodeId;

  pMgmt->selfIndex = pOption->selfIndex;
  pMgmt->numOfReplicas = pOption->numOfReplicas;
  // The copy size is taken from the source array in pOption.
  memcpy(pMgmt->replicas, pOption->replicas, sizeof(pOption->replicas));
}
375

376
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
377
  mInfo("start to open mnode in %s", path);
S
Shengliang Guan 已提交
378

wafwerar's avatar
wafwerar 已提交
379
  SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
380 381 382 383 384
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
385
  memset(pMnode, 0, sizeof(SMnode));
S
Shengliang Guan 已提交
386

S
Shengliang Guan 已提交
387 388
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
S
shm  
Shengliang Guan 已提交
389
  mndSetOptions(pMnode, pOption);
S
Shengliang Guan 已提交
390

391
  pMnode->deploy = pOption->deploy;
S
Shengliang Guan 已提交
392 393
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
wafwerar's avatar
wafwerar 已提交
394
    taosMemoryFree(pMnode);
S
Shengliang Guan 已提交
395 396 397 398
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
399

S
Shengliang Guan 已提交
400
  int32_t code = mndCreateDir(pMnode, path);
401
  if (code != 0) {
S
Shengliang Guan 已提交
402 403
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
404 405 406 407 408
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

409
  code = mndInitSteps(pMnode);
410
  if (code != 0) {
S
Shengliang Guan 已提交
411 412
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
413 414 415 416 417 418 419
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
420 421
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
422 423 424 425
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
426

427
  mInfo("mnode open successfully ");
S
Shengliang Guan 已提交
428 429
  return pMnode;
}
S
Shengliang Guan 已提交
430

431 432
// Called before closing the mnode: clears the leaderTransferFinish flag and
// asks sync to transfer leadership. It returns right after issuing the
// request and does not wait for the flag to be set. NULL is accepted.
void mndPreClose(SMnode *pMnode) {
  if (pMnode == NULL) return;

  atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 0);
  syncLeaderTransfer(pMnode->syncMgmt.sync);
}

448
// Runs the cleanup of all registered steps, then frees pMnode->path and the
// mnode object itself. NULL is accepted and ignored.
void mndClose(SMnode *pMnode) {
  if (pMnode == NULL) return;

  mInfo("start to close mnode");
  mndCleanupSteps(pMnode, -1);
  taosMemoryFreeClear(pMnode->path);
  taosMemoryFreeClear(pMnode);
  mInfo("mnode is closed");
}
S
Shengliang Guan 已提交
457

458
// Starts sync, deploys the sdb when this is a first deployment (marking the
// mnode restored afterwards), resets the grant state and launches the timer
// thread.
// Returns -1 when sdbDeploy() fails, otherwise the result of mndInitTimer().
int32_t mndStart(SMnode *pMnode) {
  mndSyncStart(pMnode);

  if (pMnode->deploy) {
    const int32_t deployCode = sdbDeploy(pMnode->pSdb);
    if (deployCode != 0) {
      mError("failed to deploy sdb while start mnode");
      return -1;
    }
    mndSetRestored(pMnode, true);
  }

  grantReset(pMnode, TSDB_GRANT_ALL, 0);

  return mndInitTimer(pMnode);
}

473
// Stops the mnode in three ordered stages: set the stop flag (which the timer
// thread polls through mndGetStop()), stop sync, then join the timer thread.
void mndStop(SMnode *pMnode) {
  mndSetStop(pMnode);

  mndSyncStop(pMnode);

  mndCleanupTimer(pMnode);
}

479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517
// Handles sync control messages (heartbeat and heartbeat reply) for the mnode
// referenced by pMsg->info.node.
// Returns the handler's code; any non-zero result also sets
// terrno = TSDB_CODE_SYN_INTERNAL_ERROR. Returns -1 when the sync environment
// is not started, the sync node cannot be acquired, or the message type is
// not one handled here.
int32_t mndProcessSyncCtrlMsg(SRpcMsg *pMsg) {
  SMnode    *pMnode = pMsg->info.node;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  int32_t    code = 0;

  mInfo("vgId:%d, process sync ctrl msg", 1);

  if (!syncEnvIsStart()) {
    mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
  if (pSyncNode == NULL) {
    mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) {
    SyncHeartbeat *pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg);
    code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg);
    syncHeartbeatDestroy(pSyncMsg);

  } else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
    SyncHeartbeatReply *pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg);
    code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg);
    syncHeartbeatReplyDestroy(pSyncMsg);

  } else {
    // Report unhandled types as failures, matching mndProcessSyncMsg, instead
    // of silently returning success for a message nobody processed.
    mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
    code = -1;
  }

  // The acquired sync node is released on every path that reaches here.
  syncNodeRelease(pSyncNode);

  if (code != 0) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  return code;
}

M
Minghao Li 已提交
518
// Dispatches one sync message to the matching sync-node handler for the mnode
// referenced by pMsg->info.node.
// Each case decodes the RPC message, runs the handler and destroys the decoded
// message. TDMT_SYNC_SET_MNODE_STANDBY additionally sends a response carrying
// the result code.
// Returns the handler's code; any non-zero result also sets
// terrno = TSDB_CODE_SYN_INTERNAL_ERROR. Returns -1 when the sync environment
// is not started, the sync node cannot be acquired, or the type is unknown.
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
  SMnode    *pMnode = pMsg->info.node;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  int32_t    code = 0;

  if (!syncEnvIsStart()) {
    mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
  if (pSyncNode == NULL) {
    mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  switch (pMsg->msgType) {
    case TDMT_SYNC_TIMEOUT: {
      SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
      code = syncNodeOnTimer(pSyncNode, pSyncMsg);
      syncTimeoutDestroy(pSyncMsg);
      break;
    }
    case TDMT_SYNC_PING: {
      SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
      code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
      syncPingDestroy(pSyncMsg);
      break;
    }
    case TDMT_SYNC_PING_REPLY: {
      SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
      code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
      syncPingReplyDestroy(pSyncMsg);
      break;
    }
    case TDMT_SYNC_CLIENT_REQUEST: {
      SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
      code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, NULL);
      syncClientRequestDestroy(pSyncMsg);
      break;
    }
    case TDMT_SYNC_REQUEST_VOTE: {
      SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
      code = syncNodeOnRequestVote(pSyncNode, pSyncMsg);
      syncRequestVoteDestroy(pSyncMsg);
      break;
    }
    case TDMT_SYNC_REQUEST_VOTE_REPLY: {
      SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
      code = syncNodeOnRequestVoteReply(pSyncNode, pSyncMsg);
      syncRequestVoteReplyDestroy(pSyncMsg);
      break;
    }
    case TDMT_SYNC_APPEND_ENTRIES: {
      SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
      code = syncNodeOnAppendEntries(pSyncNode, pSyncMsg);
      syncAppendEntriesDestroy(pSyncMsg);
      break;
    }
    case TDMT_SYNC_APPEND_ENTRIES_REPLY: {
      SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
      code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg);
      syncAppendEntriesReplyDestroy(pSyncMsg);
      break;
    }
    case TDMT_SYNC_SNAPSHOT_SEND: {
      SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
      code = syncNodeOnSnapshot(pSyncNode, pSyncMsg);
      syncSnapshotSendDestroy(pSyncMsg);
      break;
    }
    case TDMT_SYNC_SNAPSHOT_RSP: {
      SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
      code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
      syncSnapshotRspDestroy(pSyncMsg);
      break;
    }
    case TDMT_SYNC_SET_MNODE_STANDBY: {
      code = syncSetStandby(pMgmt->sync);
      SRpcMsg rsp = {.code = code, .info = pMsg->info};
      tmsgSendRsp(&rsp);
      break;
    }
    default:
      mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
      code = -1;
      break;
  }

  syncNodeRelease(pSyncNode);

  if (code != 0) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  return code;
}

S
Shengliang Guan 已提交
604
// Decides whether this mnode may process pMsg.
// Returns 0 without taking an RPC reference for non-request messages and for
// the TDMT_SCH_* query/fetch/drop types listed below. For any other request
// it returns 0 only after mndAcquireRpc() succeeded (a reference is then
// held). Otherwise it returns -1:
//  - for the listed internal timer messages, after a trace log only;
//  - for other requests, with terrno = TSDB_CODE_RPC_REDIRECT and the mnode
//    endpoint set serialized into pMsg->info.rsp, or TSDB_CODE_OUT_OF_MEMORY
//    when that buffer cannot be allocated, or TSDB_CODE_APP_NOT_READY when no
//    endpoints are known.
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
  if (!IsReq(pMsg)) return 0;

  switch (pMsg->msgType) {
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY:
    case TDMT_SCH_QUERY_CONTINUE:
    case TDMT_SCH_QUERY_HEARTBEAT:
    case TDMT_SCH_FETCH:
    case TDMT_SCH_MERGE_FETCH:
    case TDMT_SCH_DROP_TASK:
      return 0;
    default:
      break;
  }

  if (mndAcquireRpc(pMsg->info.node) == 0) return 0;

  SMnode     *pMnode = pMsg->info.node;
  const char *role = syncGetMyRoleStr(pMnode->syncMgmt.sync);
  bool        restored = syncIsRestoreFinish(pMnode->syncMgmt.sync);

  // Internal timer messages are dropped quietly: no redirect is prepared.
  // NOTE(review): TDMT_MND_GRANT_HB_TIMER is not in this list - confirm that
  // is intended.
  switch (pMsg->msgType) {
    case TDMT_MND_MQ_TIMER:
    case TDMT_MND_TELEM_TIMER:
    case TDMT_MND_TRANS_TIMER:
    case TDMT_MND_TTL_TIMER:
    case TDMT_MND_UPTIME_TIMER:
      mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
             pMnode->stopped, restored, role);
      return -1;
    default:
      break;
  }

  const STraceId *trace = &pMsg->info.traceId;
  SEpSet          epSet = {0};
  mndGetMnodeEpSet(pMnode, &epSet);

  mDebug(
      "msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d "
      "role:%s, redirect numOfEps:%d inUse:%d",
      pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, restored, role, epSet.numOfEps,
      epSet.inUse);

  if (epSet.numOfEps <= 0) {
    terrno = TSDB_CODE_APP_NOT_READY;
    return -1;
  }

  for (int32_t i = 0; i < epSet.numOfEps; ++i) {
    mDebug("mnode index:%d, ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
  }

  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
  pMsg->info.rsp = rpcMallocCont(contLen);
  pMsg->info.hasEpSet = 1;
  if (pMsg->info.rsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
  pMsg->info.rspLen = contLen;
  terrno = TSDB_CODE_RPC_REDIRECT;
  return -1;
}

S
Shengliang Guan 已提交
656
// Validates that a request carries a body.
// Returns 0 for non-request messages and for requests with a non-NULL pCont
// and a non-zero contLen. Otherwise logs the message and returns -1 with
// terrno = TSDB_CODE_INVALID_MSG_LEN.
static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
  if (!IsReq(pMsg)) return 0;

  const bool hasBody = (pMsg->contLen != 0) && (pMsg->pCont != NULL);
  if (hasBody) {
    return 0;
  }

  const STraceId *trace = &pMsg->info.traceId;
  mGError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen,
          pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  terrno = TSDB_CODE_INVALID_MSG_LEN;
  return -1;
}

S
Shengliang Guan 已提交
667
// Entry point for RPC messages addressed to the mnode in pMsg->info.node.
// Looks up the handler registered for the message type, validates the message
// content and the mnode state, runs the handler and logs the outcome.
// Returns -1 with terrno = TSDB_CODE_MSG_NOT_PROCESSED when no handler is
// registered, -1 when either check fails, otherwise the handler's code.
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
  const STraceId *trace = &pMsg->info.traceId;

  MndMsgFp handler = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
  if (handler == NULL) {
    mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    return -1;
  }

  // Content is checked first; the state check runs only when content is valid.
  if (mndCheckMsgContent(pMsg) != 0 || mndCheckMnodeState(pMsg) != 0) {
    return -1;
  }

  mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  int32_t code = (*handler)(pMsg);

  // NOTE(review): mndCheckMnodeState() returns 0 without calling
  // mndAcquireRpc() for non-request messages and TDMT_SCH_* types, yet the
  // reference is released unconditionally here - confirm rpcRef stays balanced.
  mndReleaseRpc(pMnode);

  if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
    mGTrace("msg:%p, won't response immediately since in progress", pMsg);
  } else if (code == 0) {
    mGTrace("msg:%p, successfully processed", pMsg);
  } else {
    mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
            TMSG_INFO(pMsg->msgType));
  }

  return code;
}

697 698
// Registers `fp` as the handler for `msgType` in pMnode->msgFp.
// The request is silently ignored when the computed index is not below
// TDMT_MAX.
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  const tmsg_t slot = TMSG_INDEX(msgType);
  if (slot >= TDMT_MAX) return;

  pMnode->msgFp[slot] = fp;
}

D
dapan1121 已提交
704
// Note: uid 0 is reserved
/*
 * Generate a non-zero, positive 64-bit id for the object called `name`.
 *
 * The raw 64-bit pattern is assembled as:
 *   bits 24..63  low 40 bits of taosGetTimestampUs()
 *   bits  8..23  low 16 bits of MurmurHash3_32(name, len)
 *   bits  0..7   low  8 bits of taosRand()
 * The returned value is the magnitude of that pattern read as a signed 64-bit
 * integer, so patterns with the top bit set come back negated.
 *
 * All arithmetic is done in uint64_t: shifting the 40-bit timestamp left by 24
 * reaches the sign bit, which is undefined behaviour on a signed operand. The
 * pattern 0x8000000000000000 (INT64_MIN) has no positive counterpart and is
 * retried, exactly like the reserved value 0.
 *
 * @param name  bytes to hash
 * @param len   number of bytes in `name`
 * @return      an id in the range [1, INT64_MAX]
 */
int64_t mndGenerateUid(const char *name, int32_t len) {
  // The hash does not change between retries, so compute its contribution once.
  const uint64_t hashBits = ((uint64_t)MurmurHash3_32(name, len) & 0xFFFFu) << 8;

  do {
    const uint64_t us = (uint64_t)taosGetTimestampUs();
    const uint64_t timeBits = (us & 0x000000FFFFFFFFFFull) << 24;
    const uint64_t randBits = (uint64_t)taosRand() & 0xFFu;

    // The three fields occupy disjoint bit ranges, so the sum cannot carry or wrap.
    const uint64_t raw = timeBits + hashBits + randBits;

    if (raw == 0 || raw == ((uint64_t)1 << 63)) {
      continue;  // 0 is reserved; 2^63 cannot be represented as a positive int64_t
    }

    if (raw > (uint64_t)INT64_MAX) {
      // Two's-complement magnitude, computed without leaving unsigned arithmetic:
      // 2^64 - raw lies in [1, INT64_MAX] here.
      return (int64_t)(0 - raw);
    }
    return (int64_t)raw;
  } while (true);
}
S
Shengliang Guan 已提交
716 717

int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
S
Shengliang Guan 已提交
718
                          SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
719
  if (mndAcquireRpc(pMnode) != 0) return -1;
S
Shengliang Guan 已提交
720

M
Minghao Li 已提交
721
  SSdb   *pSdb = pMnode->pSdb;
S
Shengliang Guan 已提交
722 723 724 725 726
  int64_t ms = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
S
Shengliang Guan 已提交
727 728 729
  pStbInfo->stbs = taosArrayInit(sdbGetSize(pSdb, SDB_STB), sizeof(SMonStbDesc));
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL ||
      pStbInfo->stbs == NULL) {
730
    mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
731 732 733 734
    return -1;
  }

  // cluster info
wmmhello's avatar
wmmhello 已提交
735
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
S
Shengliang Guan 已提交
736 737
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);
738 739
  pClusterInfo->dbs_total = sdbGetSize(pSdb, SDB_DB);
  pClusterInfo->stbs_total = sdbGetSize(pSdb, SDB_STB);
S
Shengliang Guan 已提交
740 741 742 743 744 745 746 747 748 749

  void *pIter = NULL;
  while (1) {
    SDnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonDnodeDesc desc = {0};
    desc.dnode_id = pObj->id;
    tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
S
Shengliang Guan 已提交
750
    if (mndIsDnodeOnline(pObj, ms)) {
S
Shengliang Guan 已提交
751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768
      tstrncpy(desc.status, "ready", sizeof(desc.status));
    } else {
      tstrncpy(desc.status, "offline", sizeof(desc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &desc);
    sdbRelease(pSdb, pObj);
  }

  pIter = NULL;
  while (1) {
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonMnodeDesc desc = {0};
    desc.mnode_id = pObj->id;
    tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));

769
    if (pObj->id == pMnode->selfDnodeId) {
S
Shengliang Guan 已提交
770 771
      pClusterInfo->first_ep_dnode_id = pObj->id;
      tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
772 773
      pClusterInfo->master_uptime = mndGetClusterUpTime(pMnode);
      // pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
774 775
      tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
    } else {
776
      tstrncpy(desc.role, syncStr(pObj->syncState), sizeof(desc.role));
S
Shengliang Guan 已提交
777
    }
778 779
    taosArrayPush(pClusterInfo->mnodes, &desc);
    sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
780 781 782 783 784 785 786 787 788 789
  }

  // vgroup info
  pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pClusterInfo->vgroups_total++;
790
    pClusterInfo->tbs_total += pVgroup->numOfTables;
S
Shengliang Guan 已提交
791 792 793

    SMonVgroupDesc desc = {0};
    desc.vgroup_id = pVgroup->vgId;
794 795 796 797 798

    SName name = {0};
    tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name, desc.database_name);

S
Shengliang Guan 已提交
799 800 801 802
    desc.tables_num = pVgroup->numOfTables;
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
    tstrncpy(desc.status, "unsynced", sizeof(desc.status));
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
M
Minghao Li 已提交
803
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[i];
S
Shengliang Guan 已提交
804 805
      SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
      pVnDesc->dnode_id = pVgid->dnodeId;
806 807
      tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->syncState), sizeof(pVnDesc->vnode_role));
      if (pVgid->syncState == TAOS_SYNC_STATE_LEADER) {
S
Shengliang Guan 已提交
808 809 810
        tstrncpy(desc.status, "ready", sizeof(desc.status));
        pClusterInfo->vgroups_alive++;
      }
811
      if (pVgid->syncState != TAOS_SYNC_STATE_ERROR) {
S
Shengliang Guan 已提交
812 813 814 815 816 817 818 819 820
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &desc);
    sdbRelease(pSdb, pVgroup);
  }

S
Shengliang Guan 已提交
821 822 823 824 825 826 827
  // stb info
  pIter = NULL;
  while (1) {
    SStbObj *pStb = NULL;
    pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
    if (pIter == NULL) break;

S
Shengliang Guan 已提交
828
    SMonStbDesc desc = {0};
S
Shengliang Guan 已提交
829 830 831 832 833 834 835 836 837 838 839 840 841

    SName name1 = {0};
    tNameFromString(&name1, pStb->db, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name1, desc.database_name);

    SName name2 = {0};
    tNameFromString(&name2, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tstrncpy(desc.stb_name, tNameGetTableName(&name2), TSDB_TABLE_NAME_LEN);

    taosArrayPush(pStbInfo->stbs, &desc);
    sdbRelease(pSdb, pStb);
  }

S
Shengliang Guan 已提交
842 843 844 845 846 847 848 849
  // grant info
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  }

850
  mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
851
  return 0;
L
Liu Jicong 已提交
852
}
S
Shengliang Guan 已提交
853 854

/*
 * Report this mnode's sync status into `pLoad`.
 *
 * syncState is taken from syncGetMyRole() on the mnode's sync handle and
 * syncRestore from pMnode->restored; both values are also written to the
 * trace log. Always returns 0.
 */
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  pLoad->syncState = syncGetMyRole(pMnode->syncMgmt.sync);
  pLoad->syncRestore = pMnode->restored;

  mTrace("mnode current syncState is %s, syncRestore:%d", syncStr(pLoad->syncState), pLoad->syncRestore);

  return 0;
}
S
Shengliang Guan 已提交
860

861
/*
 * Set or clear the mnode's `restored` flag.
 *
 * The flag is written while holding pMnode->lock for writing. When the flag is
 * being cleared, the call then blocks, polling pMnode->rpcRef with
 * taosMsleep(3) between checks, until rpcRef is no longer positive. The poll
 * happens after the lock has been released.
 */
void mndSetRestored(SMnode *pMnode, bool restored) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->restored = restored;
  taosThreadRwlockUnlock(&pMnode->lock);
  mTrace("mnode set restored:%d", restored);

  if (restored) {
    return;
  }

  // Wait for outstanding RPC references to be released before returning.
  while (pMnode->rpcRef > 0) {
    taosMsleep(3);
  }
}

/* Return the mnode's current `restored` flag (read without taking pMnode->lock). */
bool mndGetRestored(SMnode *pMnode) {
  return pMnode->restored;
}

/*
 * Mark the mnode as stopped.
 *
 * The `stopped` flag is set to true while holding pMnode->lock for writing;
 * the change is recorded in the trace log after the lock is released.
 */
void mndSetStop(SMnode *pMnode) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->stopped = true;
  taosThreadRwlockUnlock(&pMnode->lock);

  mTrace("mnode set stopped");
}

/* Return the mnode's current `stopped` flag (read without taking pMnode->lock). */
bool mndGetStop(SMnode *pMnode) {
  return pMnode->stopped;
}