mndMain.c 28.4 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndAcct.h"
S
Shengliang Guan 已提交
18
#include "mndBnode.h"
S
Shengliang Guan 已提交
19
#include "mndCluster.h"
L
Liu Jicong 已提交
20
#include "mndConsumer.h"
S
Shengliang Guan 已提交
21 22 23
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
L
Liu Jicong 已提交
24
#include "mndGrant.h"
S
monitor  
Shengliang Guan 已提交
25
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
26
#include "mndMnode.h"
L
Liu Jicong 已提交
27
#include "mndOffset.h"
L
Liu Jicong 已提交
28
#include "mndPerfSchema.h"
M
Minghao Li 已提交
29
#include "mndPrivilege.h"
S
Shengliang Guan 已提交
30
#include "mndProfile.h"
S
Shengliang Guan 已提交
31
#include "mndQnode.h"
L
Liu Jicong 已提交
32
#include "mndQuery.h"
S
Shengliang Guan 已提交
33
#include "mndShow.h"
S
sma  
Shengliang Guan 已提交
34
#include "mndSma.h"
S
Shengliang Guan 已提交
35
#include "mndSnode.h"
S
Shengliang Guan 已提交
36
#include "mndStb.h"
L
Liu Jicong 已提交
37
#include "mndStream.h"
L
Liu Jicong 已提交
38
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
39 40
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
41
#include "mndTopic.h"
S
Shengliang Guan 已提交
42 43 44
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
45

S
Shengliang Guan 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq timerReq = {0};

  int32_t contLen = tSerializeSMTimerMsg(NULL, 0, &timerReq);
  if (contLen <= 0) return NULL;
  void *pReq = rpcMallocCont(contLen);
  if (pReq == NULL) return NULL;

  tSerializeSMTimerMsg(pReq, contLen, &timerReq);
  *pContLen = contLen;
  return pReq;
}

S
Shengliang Guan 已提交
59 60
static void mndPullupTrans(SMnode *pMnode) {
  int32_t contLen = 0;
M
Minghao Li 已提交
61
  void   *pReq = mndBuildTimerMsg(&contLen);
S
Shengliang Guan 已提交
62 63 64 65
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  }
S
Shengliang Guan 已提交
66 67
}

68
static void mndPullupTtl(SMnode *pMnode) {
wmmhello's avatar
wmmhello 已提交
69
  int32_t contLen = 0;
M
Minghao Li 已提交
70
  void   *pReq = mndBuildTimerMsg(&contLen);
wmmhello's avatar
wmmhello 已提交
71
  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
wmmhello's avatar
wmmhello 已提交
72 73 74
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}

S
Shengliang Guan 已提交
75 76
static void mndCalMqRebalance(SMnode *pMnode) {
  int32_t contLen = 0;
M
Minghao Li 已提交
77
  void   *pReq = mndBuildTimerMsg(&contLen);
S
Shengliang Guan 已提交
78 79 80 81
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
  }
S
Shengliang Guan 已提交
82
}
L
Liu Jicong 已提交
83

S
Shengliang Guan 已提交
84 85
static void mndPullupTelem(SMnode *pMnode) {
  int32_t contLen = 0;
M
Minghao Li 已提交
86
  void   *pReq = mndBuildTimerMsg(&contLen);
S
Shengliang Guan 已提交
87 88 89 90
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
  }
S
Shengliang Guan 已提交
91 92
}

93
static void mndPullupGrant(SMnode *pMnode) {
C
Cary Xu 已提交
94 95 96 97 98 99 100 101 102
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {
        .msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9527};
    tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
  }
}

103 104 105 106 107 108 109 110 111 112
static void mndIncreaseUpTime(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {
        .msgType = TDMT_MND_UPTIME_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9528};
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  }
}

S
Shengliang Guan 已提交
113
static void *mndThreadFp(void *param) {
S
Shengliang Guan 已提交
114
  SMnode *pMnode = param;
S
Shengliang Guan 已提交
115 116 117 118 119 120
  int64_t lastTime = 0;
  setThreadName("mnode-timer");

  while (1) {
    lastTime++;
    taosMsleep(100);
S
Shengliang Guan 已提交
121
    if (mndGetStop(pMnode)) break;
122
    if (lastTime % 10 != 0) continue;
S
Shengliang Guan 已提交
123

124 125 126
    int64_t sec = lastTime / 10;

    if (sec % tsTtlPushInterval == 0) {
127
      mndPullupTtl(pMnode);
wmmhello's avatar
wmmhello 已提交
128 129
    }

130
    if (sec % tsTransPullupInterval * 10 == 0) {
S
Shengliang Guan 已提交
131 132 133
      mndPullupTrans(pMnode);
    }

134
    if (sec % tsMqRebalanceInterval * 10 == 0) {
S
Shengliang Guan 已提交
135 136 137
      mndCalMqRebalance(pMnode);
    }

138
    if (sec % tsTelemInterval * 10 == (MIN(60, (tsTelemInterval - 1)))) {
S
Shengliang Guan 已提交
139 140
      mndPullupTelem(pMnode);
    }
C
Cary Xu 已提交
141

142
    if (sec % tsGrantHBInterval == 0) {
143
      mndPullupGrant(pMnode);
C
Cary Xu 已提交
144
    }
145

146
    if (sec % tsUptimeInterval == 0) {
147 148
      mndIncreaseUpTime(pMnode);
    }
S
Shengliang Guan 已提交
149 150
  }

S
Shengliang Guan 已提交
151
  return NULL;
L
Liu Jicong 已提交
152 153
}

154
static int32_t mndInitTimer(SMnode *pMnode) {
S
Shengliang Guan 已提交
155 156 157 158 159
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode) != 0) {
    mError("failed to create timer thread since %s", strerror(errno));
S
shm  
Shengliang Guan 已提交
160 161
    return -1;
  }
L
Liu Jicong 已提交
162

S
Shengliang Guan 已提交
163 164
  taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("mnode-timer", "initialized");
S
Shengliang Guan 已提交
165 166 167
  return 0;
}

168
static void mndCleanupTimer(SMnode *pMnode) {
S
Shengliang Guan 已提交
169 170
  if (taosCheckPthreadValid(pMnode->thread)) {
    taosThreadJoin(pMnode->thread, NULL);
171
    taosThreadClear(&pMnode->thread);
S
Shengliang Guan 已提交
172 173 174
  }
}

S
Shengliang Guan 已提交
175
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
176 177 178
  pMnode->path = strdup(path);
  if (pMnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
179
    return -1;
180 181 182 183
  }

  if (taosMkDir(pMnode->path) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
184
    return -1;
185
  }
186 187

  return 0;
188
}
S
Shengliang Guan 已提交
189

190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
static int32_t mndInitWal(SMnode *pMnode) {
  char path[PATH_MAX + 20] = {0};
  snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
  SWalCfg cfg = {
      .vgId = 1,
      .fsyncPeriod = 0,
      .rollPeriod = -1,
      .segSize = -1,
      .retentionPeriod = -1,
      .retentionSize = -1,
      .level = TAOS_WAL_FSYNC,
  };

  pMnode->pWal = walOpen(path, &cfg);
  if (pMnode->pWal == NULL) {
    mError("failed to open wal since %s", terrstr());
    return -1;
  }

  return 0;
}

static void mndCloseWal(SMnode *pMnode) {
  if (pMnode->pWal != NULL) {
    walClose(pMnode->pWal);
    pMnode->pWal = NULL;
  }
}

219 220 221
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt opt = {0};
  opt.path = pMnode->path;
S
Shengliang Guan 已提交
222
  opt.pMnode = pMnode;
223
  opt.pWal = pMnode->pWal;
S
Shengliang Guan 已提交
224

S
Shengliang Guan 已提交
225
  pMnode->pSdb = sdbInit(&opt);
226
  if (pMnode->pSdb == NULL) {
S
Shengliang Guan 已提交
227 228 229 230 231 232
    return -1;
  }

  return 0;
}

233 234 235 236 237 238 239
static int32_t mndOpenSdb(SMnode *pMnode) {
  if (!pMnode->deploy) {
    return sdbReadFile(pMnode->pSdb);
  } else {
    return 0;
  }
}
240 241 242

static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb) {
S
Shengliang Guan 已提交
243
    sdbCleanup(pMnode->pSdb);
244 245 246 247
    pMnode->pSdb = NULL;
  }
}

248 249 250 251 252
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep step = {0};
  step.name = name;
  step.initFp = initFp;
  step.cleanupFp = cleanupFp;
S
Shengliang Guan 已提交
253
  if (taosArrayPush(pMnode->pSteps, &step) == NULL) {
254 255 256 257
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
258 259 260
  return 0;
}

261
static int32_t mndInitSteps(SMnode *pMnode) {
262
  if (mndAllocStep(pMnode, "mnode-wal", mndInitWal, mndCloseWal) != 0) return -1;
S
Shengliang Guan 已提交
263 264 265 266
  if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1;
S
Shengliang Guan 已提交
267
  if (mndAllocStep(pMnode, "mnode-qnode", mndInitQnode, mndCleanupQnode) != 0) return -1;
268 269
  if (mndAllocStep(pMnode, "mnode-snode", mndInitSnode, mndCleanupSnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-bnode", mndInitBnode, mndCleanupBnode) != 0) return -1;
S
Shengliang Guan 已提交
270
  if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1;
S
Shengliang Guan 已提交
271
  if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
wafwerar's avatar
wafwerar 已提交
272
  if (mndAllocStep(pMnode, "mnode-grant", mndInitGrant, mndCleanupGrant) != 0) return -1;
273
  if (mndAllocStep(pMnode, "mnode-privilege", mndInitPrivilege, mndCleanupPrivilege) != 0) return -1;
S
Shengliang Guan 已提交
274
  if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
L
Liu Jicong 已提交
275
  if (mndAllocStep(pMnode, "mnode-stream", mndInitStream, mndCleanupStream) != 0) return -1;
S
Shengliang Guan 已提交
276
  if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
L
Liu Jicong 已提交
277 278
  if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
L
Liu Jicong 已提交
279
  if (mndAllocStep(pMnode, "mnode-offset", mndInitOffset, mndCleanupOffset) != 0) return -1;
S
Shengliang Guan 已提交
280
  if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
S
Shengliang Guan 已提交
281
  if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
282
  if (mndAllocStep(pMnode, "mnode-sma", mndInitSma, mndCleanupSma) != 0) return -1;
D
dapan1121 已提交
283
  if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1;
D
dapan1121 已提交
284
  if (mndAllocStep(pMnode, "mnode-perfs", mndInitPerfs, mndCleanupPerfs) != 0) return -1;
S
Shengliang Guan 已提交
285
  if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
S
Shengliang Guan 已提交
286
  if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
287
  if (mndAllocStep(pMnode, "mnode-sdb", mndOpenSdb, NULL) != 0) return -1;
S
Shengliang Guan 已提交
288 289
  if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
D
dapan1121 已提交
290
  if (mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery) != 0) return -1;
S
Shengliang Guan 已提交
291 292
  if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1;
S
Shengliang Guan 已提交
293 294 295 296

  return 0;
}

297
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
S
Shengliang Guan 已提交
298 299
  if (pMnode->pSteps == NULL) return;

300
  if (pos == -1) {
301
    pos = taosArrayGetSize(pMnode->pSteps) - 1;
S
Shengliang Guan 已提交
302 303
  }

304
  for (int32_t s = pos; s >= 0; s--) {
305
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, s);
306
    mInfo("%s will cleanup", pStep->name);
307 308 309
    if (pStep->cleanupFp != NULL) {
      (*pStep->cleanupFp)(pMnode);
    }
S
Shengliang Guan 已提交
310 311
  }

S
Shengliang Guan 已提交
312
  taosArrayClear(pMnode->pSteps);
S
Shengliang Guan 已提交
313
  taosArrayDestroy(pMnode->pSteps);
S
Shengliang Guan 已提交
314
  pMnode->pSteps = NULL;
315
}
S
Shengliang Guan 已提交
316

317
static int32_t mndExecSteps(SMnode *pMnode) {
S
Shengliang Guan 已提交
318
  int32_t size = taosArrayGetSize(pMnode->pSteps);
319
  for (int32_t pos = 0; pos < size; pos++) {
S
Shengliang Guan 已提交
320
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, pos);
321
    if (pStep->initFp == NULL) continue;
S
Shengliang Guan 已提交
322

S
Shengliang Guan 已提交
323
    if ((*pStep->initFp)(pMnode) != 0) {
S
Shengliang Guan 已提交
324
      int32_t code = terrno;
S
Shengliang Guan 已提交
325
      mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
326
      mndCleanupSteps(pMnode, pos);
S
Shengliang Guan 已提交
327
      terrno = code;
S
Shengliang Guan 已提交
328
      return -1;
S
Shengliang Guan 已提交
329
    } else {
330
      mInfo("%s is initialized", pStep->name);
S
Shengliang Guan 已提交
331
      tmsgReportStartup(pStep->name, "initialized");
S
Shengliang Guan 已提交
332 333
    }
  }
S
Shengliang Guan 已提交
334

S
shm  
Shengliang Guan 已提交
335
  pMnode->clusterId = mndGetClusterId(pMnode);
S
Shengliang Guan 已提交
336
  return 0;
337
}
S
Shengliang Guan 已提交
338

S
shm  
Shengliang Guan 已提交
339
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
S
Shengliang Guan 已提交
340
  pMnode->msgCb = pOption->msgCb;
341
  pMnode->selfDnodeId = pOption->dnodeId;
S
Shengliang Guan 已提交
342
  pMnode->syncMgmt.replica = pOption->replica;
S
Shengliang Guan 已提交
343
  pMnode->syncMgmt.standby = pOption->standby;
L
Liu Jicong 已提交
344
}
345

346
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
347
  mInfo("start to open mnode in %s", path);
S
Shengliang Guan 已提交
348

wafwerar's avatar
wafwerar 已提交
349
  SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
350 351 352 353 354 355
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
356 357
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
S
shm  
Shengliang Guan 已提交
358
  mndSetOptions(pMnode, pOption);
S
Shengliang Guan 已提交
359

360
  pMnode->deploy = pOption->deploy;
S
Shengliang Guan 已提交
361 362
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
wafwerar's avatar
wafwerar 已提交
363
    taosMemoryFree(pMnode);
S
Shengliang Guan 已提交
364 365 366 367
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
368

S
Shengliang Guan 已提交
369
  int32_t code = mndCreateDir(pMnode, path);
370
  if (code != 0) {
S
Shengliang Guan 已提交
371 372
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
373 374 375 376 377
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

378
  code = mndInitSteps(pMnode);
379
  if (code != 0) {
S
Shengliang Guan 已提交
380 381
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
382 383 384 385 386 387 388
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
389 390
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
391 392 393 394
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
395

396
  mInfo("mnode open successfully ");
S
Shengliang Guan 已提交
397 398
  return pMnode;
}
S
Shengliang Guan 已提交
399

400 401
void mndPreClose(SMnode *pMnode) {
  if (pMnode != NULL) {
402
    atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 0);
403
    syncLeaderTransfer(pMnode->syncMgmt.sync);
404

405 406 407 408 409 410 411 412 413
#if 0
    mInfo("vgId:1, mnode start leader transfer");
    // wait for leader transfer finish
    while (!atomic_load_8(&(pMnode->syncMgmt.leaderTransferFinish))) {
      taosMsleep(10);
      mInfo("vgId:1, mnode waiting for leader transfer");
    }
    mInfo("vgId:1, mnode finish leader transfer");
#endif
414 415 416
  }
}

417
void mndClose(SMnode *pMnode) {
S
Shengliang Guan 已提交
418
  if (pMnode != NULL) {
419
    mInfo("start to close mnode");
S
Shengliang Guan 已提交
420
    mndCleanupSteps(pMnode, -1);
wafwerar's avatar
wafwerar 已提交
421 422
    taosMemoryFreeClear(pMnode->path);
    taosMemoryFreeClear(pMnode);
423
    mInfo("mnode is closed");
S
Shengliang Guan 已提交
424
  }
425
}
S
Shengliang Guan 已提交
426

427
int32_t mndStart(SMnode *pMnode) {
428
  mndSyncStart(pMnode);
S
Shengliang Guan 已提交
429
  if (pMnode->deploy) {
S
Shengliang Guan 已提交
430 431 432 433 434
    if (sdbDeploy(pMnode->pSdb) != 0) {
      mError("failed to deploy sdb while start mnode");
      return -1;
    }
    mndSetRestore(pMnode, true);
435
  }
M
Minghao Li 已提交
436

C
Cary Xu 已提交
437
  grantReset(pMnode, TSDB_GRANT_ALL, 0);
C
Cary Xu 已提交
438

M
Minghao Li 已提交
439 440 441
  return mndInitTimer(pMnode);
}

442
void mndStop(SMnode *pMnode) {
S
Shengliang Guan 已提交
443
  mndSetStop(pMnode);
444
  mndSyncStop(pMnode);
S
Shengliang Guan 已提交
445
  mndCleanupTimer(pMnode);
M
Minghao Li 已提交
446 447
}

M
Minghao Li 已提交
448
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
M
Minghao Li 已提交
449
  SMnode    *pMnode = pMsg->info.node;
450
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
M
Minghao Li 已提交
451
  int32_t    code = 0;
M
Minghao Li 已提交
452

453 454
  if (!syncEnvIsStart()) {
    mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
M
Minghao Li 已提交
455 456
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
457
  }
M
Minghao Li 已提交
458

459 460 461
  SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
  if (pSyncNode == NULL) {
    mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
M
Minghao Li 已提交
462 463
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
S
Shengliang Guan 已提交
464
  }
465

466
  // ToDo: ugly! use function pointer
M
Minghao Li 已提交
467
  if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_STANDARD_SNAPSHOT) {
468
    if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
M
Minghao Li 已提交
469 470 471
      SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
      code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
      syncTimeoutDestroy(pSyncMsg);
472
    } else if (pMsg->msgType == TDMT_SYNC_PING) {
M
Minghao Li 已提交
473 474 475
      SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
      code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
      syncPingDestroy(pSyncMsg);
476
    } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
M
Minghao Li 已提交
477 478 479
      SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
      code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
      syncPingReplyDestroy(pSyncMsg);
480
    } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
M
Minghao Li 已提交
481
      SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
482
      code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
M
Minghao Li 已提交
483
      syncClientRequestDestroy(pSyncMsg);
484
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
M
Minghao Li 已提交
485 486 487
      SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
      code = syncNodeOnRequestVoteSnapshotCb(pSyncNode, pSyncMsg);
      syncRequestVoteDestroy(pSyncMsg);
488
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
M
Minghao Li 已提交
489 490 491
      SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
      code = syncNodeOnRequestVoteReplySnapshotCb(pSyncNode, pSyncMsg);
      syncRequestVoteReplyDestroy(pSyncMsg);
492
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
M
Minghao Li 已提交
493 494 495
      SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
      code = syncNodeOnAppendEntriesSnapshotCb(pSyncNode, pSyncMsg);
      syncAppendEntriesDestroy(pSyncMsg);
496
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
M
Minghao Li 已提交
497 498 499
      SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
      code = syncNodeOnAppendEntriesReplySnapshotCb(pSyncNode, pSyncMsg);
      syncAppendEntriesReplyDestroy(pSyncMsg);
500
    } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
501 502 503
      SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
      code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg);
      syncSnapshotSendDestroy(pSyncMsg);
504
    } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
505 506 507
      SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
      code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg);
      syncSnapshotRspDestroy(pSyncMsg);
508
    } else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) {
509 510 511
      code = syncSetStandby(pMgmt->sync);
      SRpcMsg rsp = {.code = code, .info = pMsg->info};
      tmsgSendRsp(&rsp);
M
Minghao Li 已提交
512 513
    } else {
      mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
M
Minghao Li 已提交
514
      code = -1;
M
Minghao Li 已提交
515
    }
M
Minghao Li 已提交
516
  } else {
517
    if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
M
Minghao Li 已提交
518 519 520
      SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
      code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
      syncTimeoutDestroy(pSyncMsg);
521
    } else if (pMsg->msgType == TDMT_SYNC_PING) {
M
Minghao Li 已提交
522 523 524
      SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
      code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
      syncPingDestroy(pSyncMsg);
525
    } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
M
Minghao Li 已提交
526 527 528
      SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
      code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
      syncPingReplyDestroy(pSyncMsg);
529
    } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
M
Minghao Li 已提交
530
      SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
531
      code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
M
Minghao Li 已提交
532
      syncClientRequestDestroy(pSyncMsg);
533
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
M
Minghao Li 已提交
534 535 536
      SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
      code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
      syncRequestVoteDestroy(pSyncMsg);
537
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
M
Minghao Li 已提交
538 539 540
      SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
      code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
      syncRequestVoteReplyDestroy(pSyncMsg);
541
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
M
Minghao Li 已提交
542 543 544
      SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
      code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
      syncAppendEntriesDestroy(pSyncMsg);
545
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
M
Minghao Li 已提交
546 547 548
      SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
      code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
      syncAppendEntriesReplyDestroy(pSyncMsg);
549
    } else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) {
550 551 552
      code = syncSetStandby(pMgmt->sync);
      SRpcMsg rsp = {.code = code, .info = pMsg->info};
      tmsgSendRsp(&rsp);
M
Minghao Li 已提交
553 554
    } else {
      mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
M
Minghao Li 已提交
555
      code = -1;
M
Minghao Li 已提交
556
    }
M
Minghao Li 已提交
557 558
  }

M
Minghao Li 已提交
559 560
  syncNodeRelease(pSyncNode);

M
Minghao Li 已提交
561 562 563
  if (code != 0) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  }
564
  return code;
M
Minghao Li 已提交
565 566
}

S
Shengliang Guan 已提交
567
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
568
  if (!IsReq(pMsg)) return 0;
dengyihao's avatar
dengyihao 已提交
569 570
  if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY ||
      pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT ||
S
Shengliang Guan 已提交
571
      pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK) {
D
dapan1121 已提交
572 573
    return 0;
  }
S
Shengliang Guan 已提交
574
  if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
S
Shengliang Guan 已提交
575
  if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
576 577
      pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
      pMsg->msgType == TDMT_MND_UPTIME_TIMER) {
S
Shengliang Guan 已提交
578 579
    return -1;
  }
S
Shengliang Guan 已提交
580

S
Shengliang Guan 已提交
581 582
  SEpSet epSet = {0};
  mndGetMnodeEpSet(pMsg->info.node, &epSet);
583

S
Shengliang Guan 已提交
584 585 586 587
  const STraceId *trace = &pMsg->info.traceId;
  mError("msg:%p, failed to check mnode state since %s, type:%s, numOfMnodes:%d inUse:%d", pMsg, terrstr(),
         TMSG_INFO(pMsg->msgType), epSet.numOfEps, epSet.inUse);

S
Shengliang Guan 已提交
588 589 590 591 592 593 594
  if (epSet.numOfEps > 0) {
    for (int32_t i = 0; i < epSet.numOfEps; ++i) {
      mInfo("mnode index:%d, ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
    }

    int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
    pMsg->info.rsp = rpcMallocCont(contLen);
dengyihao's avatar
dengyihao 已提交
595
    pMsg->info.hasEpSet = 1;
S
Shengliang Guan 已提交
596 597 598 599 600 601 602
    if (pMsg->info.rsp != NULL) {
      tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
      pMsg->info.rspLen = contLen;
      terrno = TSDB_CODE_RPC_REDIRECT;
    } else {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
    }
S
Shengliang Guan 已提交
603
  } else {
S
Shengliang Guan 已提交
604
    terrno = TSDB_CODE_APP_NOT_READY;
S
Shengliang Guan 已提交
605
  }
S
Shengliang Guan 已提交
606 607

  return -1;
608 609
}

S
Shengliang Guan 已提交
610
static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
611 612
  if (!IsReq(pMsg)) return 0;
  if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0;
S
Shengliang Guan 已提交
613

S
Shengliang Guan 已提交
614 615
  const STraceId *trace = &pMsg->info.traceId;
  mGError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen,
S
Shengliang Guan 已提交
616
          pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
617 618 619 620
  terrno = TSDB_CODE_INVALID_MSG_LEN;
  return -1;
}

S
Shengliang Guan 已提交
621
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
M
Minghao Li 已提交
622
  SMnode         *pMnode = pMsg->info.node;
S
Shengliang Guan 已提交
623 624
  const STraceId *trace = &pMsg->info.traceId;

S
Shengliang Guan 已提交
625
  MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
S
Shengliang Guan 已提交
626
  if (fp == NULL) {
S
Shengliang Guan 已提交
627
    mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
S
shm  
Shengliang Guan 已提交
628 629
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    return -1;
S
Shengliang Guan 已提交
630 631
  }

S
Shengliang Guan 已提交
632 633 634
  if (mndCheckMsgContent(pMsg) != 0) return -1;
  if (mndCheckMnodeState(pMsg) != 0) return -1;

dengyihao's avatar
dengyihao 已提交
635
  mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
S
shm  
Shengliang Guan 已提交
636
  int32_t code = (*fp)(pMsg);
S
Shengliang Guan 已提交
637 638
  mndReleaseRpcRef(pMnode);

S
Shengliang Guan 已提交
639
  if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
S
Shengliang Guan 已提交
640
    mGTrace("msg:%p, won't response immediately since in progress", pMsg);
641
  } else if (code == 0) {
S
Shengliang Guan 已提交
642
    mGTrace("msg:%p, successfully processed", pMsg);
643
  } else {
S
Shengliang Guan 已提交
644 645
    mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
            TMSG_INFO(pMsg->msgType));
S
Shengliang Guan 已提交
646
  }
S
Shengliang Guan 已提交
647

S
shm  
Shengliang Guan 已提交
648
  return code;
S
Shengliang Guan 已提交
649 650
}

651 652 653 654
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  tmsg_t type = TMSG_INDEX(msgType);
  if (type >= 0 && type < TDMT_MAX) {
    pMnode->msgFp[type] = fp;
S
Shengliang Guan 已提交
655 656 657
  }
}

D
dapan1121 已提交
658
// Note: uid 0 is reserved
659
int64_t mndGenerateUid(const char *name, int32_t len) {
S
monitor  
Shengliang Guan 已提交
660
  int32_t hashval = MurmurHash3_32(name, len);
D
dapan1121 已提交
661
  do {
L
Liu Jicong 已提交
662
    int64_t us = taosGetTimestampUs();
S
sma  
Shengliang Guan 已提交
663 664
    int64_t x = (us & 0x000000FFFFFFFFFF) << 24;
    int64_t uuid = x + ((hashval & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
D
dapan1121 已提交
665
    if (uuid) {
L
Liu Jicong 已提交
666
      return llabs(uuid);
D
dapan1121 已提交
667 668
    }
  } while (true);
L
Liu Jicong 已提交
669
}
S
Shengliang Guan 已提交
670 671

int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
S
Shengliang Guan 已提交
672
                          SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
S
Shengliang Guan 已提交
673
  if (mndAcquireRpcRef(pMnode) != 0) return -1;
S
Shengliang Guan 已提交
674

M
Minghao Li 已提交
675
  SSdb   *pSdb = pMnode->pSdb;
S
Shengliang Guan 已提交
676 677 678 679 680
  int64_t ms = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
S
Shengliang Guan 已提交
681 682 683
  pStbInfo->stbs = taosArrayInit(sdbGetSize(pSdb, SDB_STB), sizeof(SMonStbDesc));
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL ||
      pStbInfo->stbs == NULL) {
S
Shengliang Guan 已提交
684
    mndReleaseRpcRef(pMnode);
S
Shengliang Guan 已提交
685 686 687 688
    return -1;
  }

  // cluster info
wmmhello's avatar
wmmhello 已提交
689
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
S
Shengliang Guan 已提交
690 691
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);
692 693
  pClusterInfo->dbs_total = sdbGetSize(pSdb, SDB_DB);
  pClusterInfo->stbs_total = sdbGetSize(pSdb, SDB_STB);
S
Shengliang Guan 已提交
694 695 696 697 698 699 700 701 702 703

  void *pIter = NULL;
  while (1) {
    SDnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonDnodeDesc desc = {0};
    desc.dnode_id = pObj->id;
    tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
S
Shengliang Guan 已提交
704
    if (mndIsDnodeOnline(pObj, ms)) {
S
Shengliang Guan 已提交
705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722
      tstrncpy(desc.status, "ready", sizeof(desc.status));
    } else {
      tstrncpy(desc.status, "offline", sizeof(desc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &desc);
    sdbRelease(pSdb, pObj);
  }

  pIter = NULL;
  while (1) {
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonMnodeDesc desc = {0};
    desc.mnode_id = pObj->id;
    tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));

723
    if (pObj->id == pMnode->selfDnodeId) {
S
Shengliang Guan 已提交
724 725
      pClusterInfo->first_ep_dnode_id = pObj->id;
      tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
726 727
      pClusterInfo->master_uptime = mndGetClusterUpTime(pMnode);
      // pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
728 729 730
      tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
    } else {
      tstrncpy(desc.role, syncStr(pObj->state), sizeof(desc.role));
S
Shengliang Guan 已提交
731
    }
732 733
    taosArrayPush(pClusterInfo->mnodes, &desc);
    sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
734 735 736 737 738 739 740 741 742 743
  }

  // vgroup info
  pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pClusterInfo->vgroups_total++;
744
    pClusterInfo->tbs_total += pVgroup->numOfTables;
S
Shengliang Guan 已提交
745 746 747

    SMonVgroupDesc desc = {0};
    desc.vgroup_id = pVgroup->vgId;
748 749 750 751 752

    SName name = {0};
    tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name, desc.database_name);

S
Shengliang Guan 已提交
753 754 755 756
    desc.tables_num = pVgroup->numOfTables;
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
    tstrncpy(desc.status, "unsynced", sizeof(desc.status));
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
M
Minghao Li 已提交
757
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[i];
S
Shengliang Guan 已提交
758 759
      SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
      pVnDesc->dnode_id = pVgid->dnodeId;
S
Shengliang Guan 已提交
760
      tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->role), sizeof(pVnDesc->vnode_role));
S
Shengliang Guan 已提交
761 762 763 764
      if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
        tstrncpy(desc.status, "ready", sizeof(desc.status));
        pClusterInfo->vgroups_alive++;
      }
S
Shengliang Guan 已提交
765
      if (pVgid->role != TAOS_SYNC_STATE_ERROR) {
S
Shengliang Guan 已提交
766 767 768 769 770 771 772 773 774
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &desc);
    sdbRelease(pSdb, pVgroup);
  }

S
Shengliang Guan 已提交
775 776 777 778 779 780 781
  // stb info
  pIter = NULL;
  while (1) {
    SStbObj *pStb = NULL;
    pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
    if (pIter == NULL) break;

S
Shengliang Guan 已提交
782
    SMonStbDesc desc = {0};
S
Shengliang Guan 已提交
783 784 785 786 787 788 789 790 791 792 793 794 795

    SName name1 = {0};
    tNameFromString(&name1, pStb->db, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name1, desc.database_name);

    SName name2 = {0};
    tNameFromString(&name2, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tstrncpy(desc.stb_name, tNameGetTableName(&name2), TSDB_TABLE_NAME_LEN);

    taosArrayPush(pStbInfo->stbs, &desc);
    sdbRelease(pSdb, pStb);
  }

S
Shengliang Guan 已提交
796 797 798 799 800 801 802 803
  // grant info
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  }

S
Shengliang Guan 已提交
804
  mndReleaseRpcRef(pMnode);
S
Shengliang Guan 已提交
805
  return 0;
L
Liu Jicong 已提交
806
}
S
Shengliang Guan 已提交
807 808

int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
809
  pLoad->syncState = syncGetMyRole(pMnode->syncMgmt.sync);
S
Shengliang Guan 已提交
810
  mTrace("mnode current syncstate is %s", syncStr(pLoad->syncState));
S
Shengliang Guan 已提交
811
  return 0;
L
fix  
Liu Jicong 已提交
812
}
S
Shengliang Guan 已提交
813 814 815 816 817 818 819 820 821 822 823

int32_t mndAcquireRpcRef(SMnode *pMnode) {
  int32_t code = 0;
  taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    terrno = TSDB_CODE_APP_NOT_READY;
    code = -1;
  } else if (!mndIsMaster(pMnode)) {
    code = -1;
  } else {
    int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1);
S
Shengliang Guan 已提交
824
    // mTrace("mnode rpc is acquired, ref:%d", ref);
S
Shengliang Guan 已提交
825 826 827 828 829 830 831 832
  }
  taosThreadRwlockUnlock(&pMnode->lock);
  return code;
}

void mndReleaseRpcRef(SMnode *pMnode) {
  taosThreadRwlockRdlock(&pMnode->lock);
  int32_t ref = atomic_sub_fetch_32(&pMnode->rpcRef, 1);
S
Shengliang Guan 已提交
833
  // mTrace("mnode rpc is released, ref:%d", ref);
S
Shengliang Guan 已提交
834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864
  taosThreadRwlockUnlock(&pMnode->lock);
}

void mndSetRestore(SMnode *pMnode, bool restored) {
  if (restored) {
    taosThreadRwlockWrlock(&pMnode->lock);
    pMnode->restored = true;
    taosThreadRwlockUnlock(&pMnode->lock);
    mTrace("mnode set restored:%d", restored);
  } else {
    taosThreadRwlockWrlock(&pMnode->lock);
    pMnode->restored = false;
    taosThreadRwlockUnlock(&pMnode->lock);
    mTrace("mnode set restored:%d", restored);
    while (1) {
      if (pMnode->rpcRef <= 0) break;
      taosMsleep(3);
    }
  }
}

bool mndGetRestored(SMnode *pMnode) { return pMnode->restored; }

void mndSetStop(SMnode *pMnode) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->stopped = true;
  taosThreadRwlockUnlock(&pMnode->lock);
  mTrace("mnode set stopped");
}

bool mndGetStop(SMnode *pMnode) { return pMnode->stopped; }