mndMain.c 27.8 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndAcct.h"
S
Shengliang Guan 已提交
18
#include "mndBnode.h"
S
Shengliang Guan 已提交
19
#include "mndCluster.h"
L
Liu Jicong 已提交
20
#include "mndConsumer.h"
S
Shengliang Guan 已提交
21 22 23
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
L
Liu Jicong 已提交
24
#include "mndGrant.h"
S
monitor  
Shengliang Guan 已提交
25
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
26
#include "mndMnode.h"
L
Liu Jicong 已提交
27
#include "mndOffset.h"
L
Liu Jicong 已提交
28
#include "mndPerfSchema.h"
M
Minghao Li 已提交
29
#include "mndPrivilege.h"
S
Shengliang Guan 已提交
30
#include "mndProfile.h"
S
Shengliang Guan 已提交
31
#include "mndQnode.h"
L
Liu Jicong 已提交
32
#include "mndQuery.h"
S
Shengliang Guan 已提交
33
#include "mndShow.h"
S
sma  
Shengliang Guan 已提交
34
#include "mndSma.h"
S
Shengliang Guan 已提交
35
#include "mndSnode.h"
S
Shengliang Guan 已提交
36
#include "mndStb.h"
L
Liu Jicong 已提交
37
#include "mndStream.h"
L
Liu Jicong 已提交
38
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
39 40
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
41
#include "mndTopic.h"
S
Shengliang Guan 已提交
42 43 44
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
45

S
Shengliang Guan 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58
/*
 * Build the serialized payload shared by all periodic timer messages.
 *
 * An all-zero SMTimerReq is serialized into a buffer obtained from
 * rpcMallocCont(). On success the buffer is returned and its length is
 * written to *pContLen. NULL is returned (and *pContLen is left untouched)
 * when the size probe reports a non-positive length or the allocation fails.
 */
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq req = {0};

  // First pass with a NULL buffer only computes the encoded size.
  int32_t len = tSerializeSMTimerMsg(NULL, 0, &req);
  if (len > 0) {
    void *pBuf = rpcMallocCont(len);
    if (pBuf != NULL) {
      tSerializeSMTimerMsg(pBuf, len, &req);
      *pContLen = len;
      return pBuf;
    }
  }

  return NULL;
}

S
Shengliang Guan 已提交
59 60
/*
 * Post a TDMT_MND_TRANS_TIMER message to the mnode write queue.
 * Nothing is enqueued when the timer payload cannot be built.
 */
static void mndPullupTrans(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  }
}

wmmhello's avatar
wmmhello 已提交
68
/*
 * Post a TDMT_MND_TTL_TIMER message to the mnode write queue.
 *
 * Like the other timer helpers in this file, nothing is enqueued when the
 * timer payload cannot be built; previously a NULL payload with contLen 0
 * was pushed to the queue in that case.
 */
static void mndTtlTimer(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  }
}

S
Shengliang Guan 已提交
75 76
/*
 * Post a TDMT_MND_MQ_TIMER message to the mnode read queue.
 * Nothing is enqueued when the timer payload cannot be built.
 */
static void mndCalMqRebalance(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
  }
}
L
Liu Jicong 已提交
83

S
Shengliang Guan 已提交
84 85
/*
 * Post a TDMT_MND_TELEM_TIMER message to the mnode read queue.
 * Nothing is enqueued when the timer payload cannot be built.
 */
static void mndPullupTelem(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
  }
}

C
Cary Xu 已提交
93 94 95 96 97 98 99 100 101 102
/*
 * Post a TDMT_MND_GRANT_HB_TIMER message to the mnode read queue.
 * The message carries the fixed application handle 0x9527; nothing is
 * enqueued when the timer payload cannot be built.
 */
static void mndGrantHeartBeat(SMnode *pMnode) {
  int32_t len = 0;
  void   *pBuf = mndBuildTimerMsg(&len);
  if (pBuf == NULL) return;

  SRpcMsg msg = {0};
  msg.msgType = TDMT_MND_GRANT_HB_TIMER;
  msg.pCont = pBuf;
  msg.contLen = len;
  msg.info.ahandle = (void *)0x9527;
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}

S
Shengliang Guan 已提交
103
static void *mndThreadFp(void *param) {
S
Shengliang Guan 已提交
104
  SMnode *pMnode = param;
S
Shengliang Guan 已提交
105 106 107 108 109 110
  int64_t lastTime = 0;
  setThreadName("mnode-timer");

  while (1) {
    lastTime++;
    taosMsleep(100);
S
Shengliang Guan 已提交
111
    if (mndGetStop(pMnode)) break;
S
Shengliang Guan 已提交
112

wmmhello's avatar
wmmhello 已提交
113
    if (lastTime % (tsTtlPushInterval * 10) == 1) {
wmmhello's avatar
wmmhello 已提交
114 115 116
      mndTtlTimer(pMnode);
    }

117
    if (lastTime % (tsTransPullupInterval * 10) == 10) {
S
Shengliang Guan 已提交
118 119 120 121 122 123 124
      mndPullupTrans(pMnode);
    }

    if (lastTime % (tsMqRebalanceInterval * 10) == 0) {
      mndCalMqRebalance(pMnode);
    }

125
    if (lastTime % (tsTelemInterval * 10) == 30) {
S
Shengliang Guan 已提交
126 127
      mndPullupTelem(pMnode);
    }
C
Cary Xu 已提交
128 129 130 131

    if (lastTime % (tsGrantHBInterval * 10) == 0) {
      mndGrantHeartBeat(pMnode);
    }
S
Shengliang Guan 已提交
132 133
  }

S
Shengliang Guan 已提交
134
  return NULL;
L
Liu Jicong 已提交
135 136
}

137
/*
 * Start the joinable "mnode-timer" thread running mndThreadFp().
 *
 * Returns 0 on success and -1 when the thread cannot be created. The thread
 * attribute object is destroyed on both paths; previously it was left
 * initialized when thread creation failed.
 */
static int32_t mndInitTimer(SMnode *pMnode) {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode) != 0) {
    // Log first so that errno is reported before any further call can change it.
    mError("failed to create timer thread since %s", strerror(errno));
    taosThreadAttrDestroy(&thAttr);
    return -1;
  }

  taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("mnode-timer", "initialized");
  return 0;
}

151
/*
 * Join the timer thread, if pMnode->thread holds a valid thread handle,
 * and then clear the handle.
 */
static void mndCleanupTimer(SMnode *pMnode) {
  if (taosCheckPthreadValid(pMnode->thread)) {
    taosThreadJoin(pMnode->thread, NULL);
    taosThreadClear(&pMnode->thread);
  }
}

S
Shengliang Guan 已提交
158
/*
 * Store a private copy of `path` in pMnode->path and create that directory.
 *
 * Returns 0 on success. Returns -1 with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when the copy cannot be allocated, or to the
 * translated errno when taosMkDir() fails. On the latter failure the copied
 * path stays in pMnode->path.
 */
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  pMnode->path = strdup(path);
  if (pMnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosMkDir(pMnode->path) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}
S
Shengliang Guan 已提交
172

173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
/*
 * Open the mnode write-ahead log located in "<pMnode->path><TD_DIRSEP>wal"
 * and store the handle in pMnode->pWal.
 *
 * Returns 0 on success, -1 (after logging) when walOpen() fails.
 */
static int32_t mndInitWal(SMnode *pMnode) {
  char walDir[PATH_MAX + 20] = {0};
  snprintf(walDir, sizeof(walDir), "%s%swal", pMnode->path, TD_DIRSEP);

  SWalCfg walCfg = {0};
  walCfg.vgId = 1;
  walCfg.fsyncPeriod = 0;
  walCfg.rollPeriod = -1;
  walCfg.segSize = -1;
  walCfg.retentionPeriod = -1;
  walCfg.retentionSize = -1;
  walCfg.level = TAOS_WAL_FSYNC;

  pMnode->pWal = walOpen(walDir, &walCfg);
  if (pMnode->pWal != NULL) {
    return 0;
  }

  mError("failed to open wal since %s", terrstr());
  return -1;
}

/* Close the mnode WAL if it is open and reset pMnode->pWal to NULL. */
static void mndCloseWal(SMnode *pMnode) {
  if (pMnode->pWal == NULL) return;

  walClose(pMnode->pWal);
  pMnode->pWal = NULL;
}

202 203 204
/*
 * Create the sdb instance for this mnode, handing it the mnode's path,
 * back-pointer and WAL handle. The result is stored in pMnode->pSdb.
 *
 * Returns 0 on success, -1 when sdbInit() returns NULL.
 */
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt opt = {0};
  opt.path = pMnode->path;
  opt.pMnode = pMnode;
  opt.pWal = pMnode->pWal;

  pMnode->pSdb = sdbInit(&opt);
  if (pMnode->pSdb == NULL) {
    return -1;
  }

  return 0;
}

216 217 218 219 220 221 222
/*
 * Load the sdb from its file unless this mnode is being deployed, in which
 * case nothing is read and 0 is returned. Otherwise the result of
 * sdbReadFile() is returned unchanged.
 */
static int32_t mndOpenSdb(SMnode *pMnode) {
  if (pMnode->deploy) return 0;

  return sdbReadFile(pMnode->pSdb);
}
223 224 225

/* Release the sdb instance if one exists and reset pMnode->pSdb to NULL. */
static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb) {
    sdbCleanup(pMnode->pSdb);
    pMnode->pSdb = NULL;
  }
}

231 232 233 234 235
/*
 * Append one start-up step (name, init callback, cleanup callback) to
 * pMnode->pSteps.
 *
 * The `name` pointer is stored as-is, not copied. Returns 0 on success, or
 * -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the array push fails.
 */
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep step = {0};
  step.name = name;
  step.initFp = initFp;
  step.cleanupFp = cleanupFp;
  if (taosArrayPush(pMnode->pSteps, &step) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

244
/*
 * Register every mnode start-up step, in execution order.
 *
 * mndExecSteps() runs the init callbacks in this order and mndCleanupSteps()
 * runs the cleanup callbacks in reverse, so the order of the lines below is
 * significant. Returns 0 on success, -1 as soon as one registration fails.
 */
static int32_t mndInitSteps(SMnode *pMnode) {
  if (mndAllocStep(pMnode, "mnode-wal", mndInitWal, mndCloseWal) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-qnode", mndInitQnode, mndCleanupQnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-snode", mndInitSnode, mndCleanupSnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-bnode", mndInitBnode, mndCleanupBnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-grant", mndInitGrant, mndCleanupGrant) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-privilege", mndInitPrivilege, mndCleanupPrivilege) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-stream", mndInitStream, mndCleanupStream) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-offset", mndInitOffset, mndCleanupOffset) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-sma", mndInitSma, mndCleanupSma) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-perfs", mndInitPerfs, mndCleanupPerfs) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
  // Second "mnode-sdb" step: reads the sdb file (see mndOpenSdb); it has no cleanup callback.
  if (mndAllocStep(pMnode, "mnode-sdb", mndOpenSdb, NULL) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1;

  return 0;
}

280
/*
 * Run the cleanup callbacks of steps [0, pos] in reverse order, then destroy
 * the step array and set pMnode->pSteps to NULL.
 *
 * pos == -1 means "start from the last registered step". Steps without a
 * cleanup callback are skipped. Does nothing when pMnode->pSteps is NULL.
 */
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) return;

  if (pos == -1) {
    pos = taosArrayGetSize(pMnode->pSteps) - 1;
  }

  for (int32_t s = pos; s >= 0; s--) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, s);
    mDebug("%s will cleanup", pStep->name);
    if (pStep->cleanupFp != NULL) {
      (*pStep->cleanupFp)(pMnode);
    }
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
S
Shengliang Guan 已提交
299

300
/*
 * Run the init callback of every registered step in order.
 *
 * When a step fails, the steps from the failing one back to the first are
 * cleaned up via mndCleanupSteps() (which also destroys the step array),
 * terrno is restored to the value set by the failing step, and -1 is
 * returned. On success the cluster id is cached in pMnode->clusterId and 0
 * is returned.
 */
static int32_t mndExecSteps(SMnode *pMnode) {
  int32_t size = taosArrayGetSize(pMnode->pSteps);
  for (int32_t pos = 0; pos < size; pos++) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, pos);
    if (pStep->initFp == NULL) continue;

    if ((*pStep->initFp)(pMnode) != 0) {
      // Save terrno: the cleanup callbacks may overwrite it.
      int32_t code = terrno;
      mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
      mndCleanupSteps(pMnode, pos);
      terrno = code;
      return -1;
    } else {
      mDebug("%s is initialized", pStep->name);
      tmsgReportStartup(pStep->name, "initialized");
    }
  }

  pMnode->clusterId = mndGetClusterId(pMnode);
  return 0;
}
S
Shengliang Guan 已提交
321

S
shm  
Shengliang Guan 已提交
322
/*
 * Copy the message callbacks, the local dnode id and the sync replica /
 * standby settings from the open options into the mnode object.
 */
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  pMnode->msgCb = pOption->msgCb;
  pMnode->selfDnodeId = pOption->dnodeId;
  pMnode->syncMgmt.replica = pOption->replica;
  pMnode->syncMgmt.standby = pOption->standby;
}
328

329
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
S
Shengliang Guan 已提交
330 331
  mDebug("start to open mnode in %s", path);

wafwerar's avatar
wafwerar 已提交
332
  SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
333 334 335 336 337 338
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
339 340
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
S
shm  
Shengliang Guan 已提交
341
  mndSetOptions(pMnode, pOption);
S
Shengliang Guan 已提交
342

343
  pMnode->deploy = pOption->deploy;
S
Shengliang Guan 已提交
344 345
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
wafwerar's avatar
wafwerar 已提交
346
    taosMemoryFree(pMnode);
S
Shengliang Guan 已提交
347 348 349 350
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
351

S
Shengliang Guan 已提交
352
  int32_t code = mndCreateDir(pMnode, path);
353
  if (code != 0) {
S
Shengliang Guan 已提交
354 355
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
356 357 358 359 360
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

361
  code = mndInitSteps(pMnode);
362
  if (code != 0) {
S
Shengliang Guan 已提交
363 364
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
365 366 367 368 369 370 371
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
372 373
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
374 375 376 377
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
378

S
Shengliang Guan 已提交
379
  mDebug("mnode open successfully ");
S
Shengliang Guan 已提交
380 381
  return pMnode;
}
S
Shengliang Guan 已提交
382

383 384
/*
 * Prepare the mnode for closing: reset the leaderTransferFinish flag and
 * request a sync leader transfer. The function does not wait for the
 * transfer to finish (the waiting loop below is disabled). No-op when
 * pMnode is NULL.
 */
void mndPreClose(SMnode *pMnode) {
  if (pMnode != NULL) {
    atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 0);
    syncLeaderTransfer(pMnode->syncMgmt.sync);

    /*
        mDebug("vgId:1, mnode start leader transfer");
        // wait for leader transfer finish
        while (!atomic_load_8(&(pMnode->syncMgmt.leaderTransferFinish))) {
          taosMsleep(10);
          mDebug("vgId:1, mnode waiting for leader transfer");
        }
        mDebug("vgId:1, mnode finish leader transfer");
    */
  }
}

400
/*
 * Tear down an mnode: run the cleanup callbacks of all registered steps in
 * reverse order, then free the stored path and the mnode object itself.
 * No-op when pMnode is NULL. The pointer must not be used afterwards.
 */
void mndClose(SMnode *pMnode) {
  if (pMnode != NULL) {
    mDebug("start to close mnode");
    mndCleanupSteps(pMnode, -1);
    taosMemoryFreeClear(pMnode->path);
    taosMemoryFreeClear(pMnode);
    mDebug("mnode is closed");
  }
}
S
Shengliang Guan 已提交
409

410
/*
 * Start an opened mnode: start sync, deploy the sdb when this is a fresh
 * deployment (and mark the mnode as restored), reset all grants and finally
 * launch the timer thread.
 *
 * Returns 0 on success, -1 when sdb deployment or timer creation fails.
 */
int32_t mndStart(SMnode *pMnode) {
  mndSyncStart(pMnode);
  if (pMnode->deploy) {
    if (sdbDeploy(pMnode->pSdb) != 0) {
      mError("failed to deploy sdb while start mnode");
      return -1;
    }
    mndSetRestore(pMnode, true);
  }

  grantReset(pMnode, TSDB_GRANT_ALL, 0);

  return mndInitTimer(pMnode);
}

425
/*
 * Stop a running mnode: raise the stop flag (which makes the timer thread
 * leave its loop), stop sync, then join the timer thread.
 */
void mndStop(SMnode *pMnode) {
  mndSetStop(pMnode);
  mndSyncStop(pMnode);
  mndCleanupTimer(pMnode);
}

M
Minghao Li 已提交
431
/*
 * Dispatch one sync-layer RPC message to the matching sync node callback.
 *
 * The message is decoded into its typed form, handed to the callback and the
 * decoded copy is destroyed again. Two dispatch tables exist: one for sync
 * nodes using SYNC_STRATEGY_STANDARD_SNAPSHOT (snapshot-aware vote/append
 * callbacks plus the snapshot send/rsp messages) and one for all other
 * strategies. TDMT_SYNC_SET_MNODE_STANDBY is answered immediately through
 * tmsgSendRsp().
 *
 * Returns the callback's code. -1 is returned with terrno =
 * TSDB_CODE_SYN_INTERNAL_ERROR when the sync environment is stopped or the
 * sync node cannot be acquired; for an unknown message type or any non-zero
 * callback code, terrno is set to the same value.
 */
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
  SMnode    *pMnode = pMsg->info.node;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  int32_t    code = 0;

  if (!syncEnvIsStart()) {
    mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
  if (pSyncNode == NULL) {
    mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  // ToDo: ugly! use function pointer
  if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_STANDARD_SNAPSHOT) {
    if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
      SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
      code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
      syncTimeoutDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_PING) {
      SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
      code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
      syncPingDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
      SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
      code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
      syncPingReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
      SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
      code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
      syncClientRequestDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
      SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
      code = syncNodeOnRequestVoteSnapshotCb(pSyncNode, pSyncMsg);
      syncRequestVoteDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
      SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
      code = syncNodeOnRequestVoteReplySnapshotCb(pSyncNode, pSyncMsg);
      syncRequestVoteReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
      SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
      code = syncNodeOnAppendEntriesSnapshotCb(pSyncNode, pSyncMsg);
      syncAppendEntriesDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
      SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
      code = syncNodeOnAppendEntriesReplySnapshotCb(pSyncNode, pSyncMsg);
      syncAppendEntriesReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
      SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
      code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg);
      syncSnapshotSendDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
      SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
      code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg);
      syncSnapshotRspDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) {
      code = syncSetStandby(pMgmt->sync);
      SRpcMsg rsp = {.code = code, .info = pMsg->info};
      tmsgSendRsp(&rsp);
    } else {
      mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
      code = -1;
    }
  } else {
    if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
      SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
      code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
      syncTimeoutDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_PING) {
      SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
      code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
      syncPingDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
      SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
      code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
      syncPingReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
      SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
      code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
      syncClientRequestDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
      SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
      code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
      syncRequestVoteDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
      SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
      code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
      syncRequestVoteReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
      SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
      code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
      syncAppendEntriesDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
      SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
      code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
      syncAppendEntriesReplyDestroy(pSyncMsg);
    } else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) {
      code = syncSetStandby(pMgmt->sync);
      SRpcMsg rsp = {.code = code, .info = pMsg->info};
      tmsgSendRsp(&rsp);
    } else {
      mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
      code = -1;
    }
  }

  syncNodeRelease(pSyncNode);

  if (code != 0) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  return code;
}

S
Shengliang Guan 已提交
550
/*
 * Decide whether this mnode may process the given message right now.
 *
 * Returns 0 when the message is not a request, when it is one of the
 * TDMT_SCH_* query/fetch/drop messages listed below, or when
 * mndAcquireRpcRef() succeeds. Otherwise returns -1:
 *  - internal timer messages are rejected without further action;
 *  - for all other requests the current mnode endpoint set is logged and,
 *    if it is not empty, serialized into pMsg->info.rsp with terrno =
 *    TSDB_CODE_RPC_REDIRECT (TSDB_CODE_OUT_OF_MEMORY if the response buffer
 *    cannot be allocated); with an empty set terrno =
 *    TSDB_CODE_APP_NOT_READY.
 */
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
  if (!IsReq(pMsg)) return 0;
  if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY ||
      pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT ||
      pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK) {
    return 0;
  }
  if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
  if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
      pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER) {
    return -1;
  }

  SEpSet epSet = {0};
  mndGetMnodeEpSet(pMsg->info.node, &epSet);

  const STraceId *trace = &pMsg->info.traceId;
  mError("msg:%p, failed to check mnode state since %s, type:%s, numOfMnodes:%d inUse:%d", pMsg, terrstr(),
         TMSG_INFO(pMsg->msgType), epSet.numOfEps, epSet.inUse);

  if (epSet.numOfEps > 0) {
    for (int32_t i = 0; i < epSet.numOfEps; ++i) {
      mInfo("mnode index:%d, ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
    }

    // Size probe first, then serialize into a buffer attached to the message as its response.
    int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
    pMsg->info.rsp = rpcMallocCont(contLen);
    pMsg->info.hasEpSet = 1;
    if (pMsg->info.rsp != NULL) {
      tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
      pMsg->info.rspLen = contLen;
      terrno = TSDB_CODE_RPC_REDIRECT;
    } else {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
    }
  } else {
    terrno = TSDB_CODE_APP_NOT_READY;
  }

  return -1;
}

S
Shengliang Guan 已提交
592
/*
 * Validate that a request carries a payload.
 *
 * Returns 0 for non-request messages and for requests with a non-NULL
 * pCont and a non-zero contLen. Otherwise logs the message and returns -1
 * with terrno = TSDB_CODE_INVALID_MSG_LEN.
 */
static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
  if (!IsReq(pMsg)) return 0;
  if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0;

  const STraceId *trace = &pMsg->info.traceId;
  mGError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen,
          pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  terrno = TSDB_CODE_INVALID_MSG_LEN;
  return -1;
}

S
Shengliang Guan 已提交
603
/*
 * Entry point for RPC messages addressed to the mnode.
 *
 * Looks up the handler registered for the message type, validates the
 * payload and the mnode state, invokes the handler and releases the RPC
 * reference afterwards. The handler's return code is logged and returned
 * unchanged; TSDB_CODE_ACTION_IN_PROGRESS is logged as "won't response
 * immediately".
 *
 * Returns -1 with terrno = TSDB_CODE_MSG_NOT_PROCESSED when no handler is
 * registered, and -1 when either pre-check fails (terrno set by the check).
 *
 * NOTE(review): mndCheckMnodeState() returns 0 without calling
 * mndAcquireRpcRef() for non-request and TDMT_SCH_* messages, yet
 * mndReleaseRpcRef() is called unconditionally here — confirm the reference
 * accounting is intended.
 */
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
  const STraceId *trace = &pMsg->info.traceId;

  MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
  if (fp == NULL) {
    mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    return -1;
  }

  if (mndCheckMsgContent(pMsg) != 0) return -1;
  if (mndCheckMnodeState(pMsg) != 0) return -1;

  mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  int32_t code = (*fp)(pMsg);
  mndReleaseRpcRef(pMnode);

  if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
    mGTrace("msg:%p, won't response immediately since in progress", pMsg);
  } else if (code == 0) {
    mGTrace("msg:%p, successfully processed", pMsg);
  } else {
    mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
            TMSG_INFO(pMsg->msgType));
  }

  return code;
}

633 634 635 636
/*
 * Register `fp` as the handler for `msgType`.
 * The registration is silently ignored when the type's index falls outside
 * [0, TDMT_MAX).
 */
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  tmsg_t type = TMSG_INDEX(msgType);
  if (type >= 0 && type < TDMT_MAX) {
    pMnode->msgFp[type] = fp;
  }
}

D
dapan1121 已提交
640
// Generate a positive 64-bit uid. Note: uid 0 is reserved.
//
// The value is assembled from three non-overlapping bit fields:
//   bits 24..63  low 40 bits of the current timestamp in microseconds
//   bits  8..23  low 16 bits of MurmurHash3_32(name, len)
//   bits  0..7   low 8 bits of taosRand()
// and the absolute value of the signed interpretation is returned.
//
// The fields are combined in uint64_t: shifting the 40-bit timestamp left by
// 24 reaches the sign bit, which is undefined behaviour on a signed int64_t.
// A candidate equal to 0 (reserved) or INT64_MIN (llabs() is undefined for
// it) is discarded and a new one is drawn.
int64_t mndGenerateUid(const char *name, int32_t len) {
  int32_t        hashval = MurmurHash3_32(name, len);
  const uint64_t hashBits = ((uint64_t)hashval & 0xFFFFu) << 8;

  do {
    int64_t        us = taosGetTimestampUs();
    const uint64_t tsBits = ((uint64_t)us & 0x000000FFFFFFFFFFULL) << 24;
    const uint64_t randBits = (uint64_t)taosRand() & 0xFFu;

    int64_t uuid = (int64_t)(tsBits + hashBits + randBits);
    if (uuid != 0 && uuid != INT64_MIN) {
      return llabs(uuid);
    }
  } while (true);
}
S
Shengliang Guan 已提交
652 653

int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
S
Shengliang Guan 已提交
654
                          SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
S
Shengliang Guan 已提交
655
  if (mndAcquireRpcRef(pMnode) != 0) return -1;
S
Shengliang Guan 已提交
656

M
Minghao Li 已提交
657
  SSdb   *pSdb = pMnode->pSdb;
S
Shengliang Guan 已提交
658 659 660 661 662
  int64_t ms = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
S
Shengliang Guan 已提交
663 664 665
  pStbInfo->stbs = taosArrayInit(sdbGetSize(pSdb, SDB_STB), sizeof(SMonStbDesc));
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL ||
      pStbInfo->stbs == NULL) {
S
Shengliang Guan 已提交
666
    mndReleaseRpcRef(pMnode);
S
Shengliang Guan 已提交
667 668 669 670
    return -1;
  }

  // cluster info
wmmhello's avatar
wmmhello 已提交
671
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
S
Shengliang Guan 已提交
672 673
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);
674 675
  pClusterInfo->dbs_total = sdbGetSize(pSdb, SDB_DB);
  pClusterInfo->stbs_total = sdbGetSize(pSdb, SDB_STB);
S
Shengliang Guan 已提交
676 677 678 679 680 681 682 683 684 685

  void *pIter = NULL;
  while (1) {
    SDnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonDnodeDesc desc = {0};
    desc.dnode_id = pObj->id;
    tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
S
Shengliang Guan 已提交
686
    if (mndIsDnodeOnline(pObj, ms)) {
S
Shengliang Guan 已提交
687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704
      tstrncpy(desc.status, "ready", sizeof(desc.status));
    } else {
      tstrncpy(desc.status, "offline", sizeof(desc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &desc);
    sdbRelease(pSdb, pObj);
  }

  pIter = NULL;
  while (1) {
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonMnodeDesc desc = {0};
    desc.mnode_id = pObj->id;
    tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));

705
    if (pObj->id == pMnode->selfDnodeId) {
S
Shengliang Guan 已提交
706 707
      pClusterInfo->first_ep_dnode_id = pObj->id;
      tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
708 709 710 711
      pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
      tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
    } else {
      tstrncpy(desc.role, syncStr(pObj->state), sizeof(desc.role));
S
Shengliang Guan 已提交
712
    }
713 714
    taosArrayPush(pClusterInfo->mnodes, &desc);
    sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
715 716 717 718 719 720 721 722 723 724
  }

  // vgroup info
  pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pClusterInfo->vgroups_total++;
725
    pClusterInfo->tbs_total += pVgroup->numOfTables;
S
Shengliang Guan 已提交
726 727 728

    SMonVgroupDesc desc = {0};
    desc.vgroup_id = pVgroup->vgId;
729 730 731 732 733

    SName name = {0};
    tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name, desc.database_name);

S
Shengliang Guan 已提交
734 735 736 737
    desc.tables_num = pVgroup->numOfTables;
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
    tstrncpy(desc.status, "unsynced", sizeof(desc.status));
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
M
Minghao Li 已提交
738
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[i];
S
Shengliang Guan 已提交
739 740
      SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
      pVnDesc->dnode_id = pVgid->dnodeId;
S
Shengliang Guan 已提交
741
      tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->role), sizeof(pVnDesc->vnode_role));
S
Shengliang Guan 已提交
742 743 744 745
      if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
        tstrncpy(desc.status, "ready", sizeof(desc.status));
        pClusterInfo->vgroups_alive++;
      }
S
Shengliang Guan 已提交
746
      if (pVgid->role != TAOS_SYNC_STATE_ERROR) {
S
Shengliang Guan 已提交
747 748 749 750 751 752 753 754 755
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &desc);
    sdbRelease(pSdb, pVgroup);
  }

S
Shengliang Guan 已提交
756 757 758 759 760 761 762
  // stb info
  pIter = NULL;
  while (1) {
    SStbObj *pStb = NULL;
    pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
    if (pIter == NULL) break;

S
Shengliang Guan 已提交
763
    SMonStbDesc desc = {0};
S
Shengliang Guan 已提交
764 765 766 767 768 769 770 771 772 773 774 775 776

    SName name1 = {0};
    tNameFromString(&name1, pStb->db, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name1, desc.database_name);

    SName name2 = {0};
    tNameFromString(&name2, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tstrncpy(desc.stb_name, tNameGetTableName(&name2), TSDB_TABLE_NAME_LEN);

    taosArrayPush(pStbInfo->stbs, &desc);
    sdbRelease(pSdb, pStb);
  }

S
Shengliang Guan 已提交
777 778 779 780 781 782 783 784
  // grant info
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  }

S
Shengliang Guan 已提交
785
  mndReleaseRpcRef(pMnode);
S
Shengliang Guan 已提交
786
  return 0;
L
Liu Jicong 已提交
787
}
S
Shengliang Guan 已提交
788 789

// Report the mnode's load: stores the role returned by syncGetMyRole() for
// this mnode's sync handle in pLoad->syncState and traces it.
// Always returns 0.
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  pLoad->syncState = syncGetMyRole(pMnode->syncMgmt.sync);

  mTrace("mnode current syncstate is %s", syncStr(pLoad->syncState));

  return 0;
}
S
Shengliang Guan 已提交
794 795 796 797 798 799 800 801 802 803 804

// Take one rpc reference on the mnode.
//
// Under the read lock: fails with terrno = TSDB_CODE_APP_NOT_READY when the
// mnode is stopped, fails when mndIsMaster() returns false (terrno is not set
// here for that case), otherwise atomically increments pMnode->rpcRef.
// Returns 0 when the reference was taken, -1 otherwise.
int32_t mndAcquireRpcRef(SMnode *pMnode) {
  int32_t code = -1;

  taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    terrno = TSDB_CODE_APP_NOT_READY;
  } else if (mndIsMaster(pMnode)) {
    (void)atomic_add_fetch_32(&pMnode->rpcRef, 1);
    code = 0;
  }
  taosThreadRwlockUnlock(&pMnode->lock);

  return code;
}

// Drop one rpc reference: atomically decrements pMnode->rpcRef while holding
// the mnode's read lock.
void mndReleaseRpcRef(SMnode *pMnode) {
  taosThreadRwlockRdlock(&pMnode->lock);
  (void)atomic_sub_fetch_32(&pMnode->rpcRef, 1);
  taosThreadRwlockUnlock(&pMnode->lock);
}

// Set the mnode's `restored` flag under the write lock and trace the change.
//
// When the flag is cleared, this call additionally blocks, polling every 3 ms,
// until pMnode->rpcRef is no longer positive. The counter is read directly in
// that loop, without the lock.
void mndSetRestore(SMnode *pMnode, bool restored) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->restored = restored;
  taosThreadRwlockUnlock(&pMnode->lock);
  mTrace("mnode set restored:%d", restored);

  if (restored) {
    return;
  }

  while (pMnode->rpcRef > 0) {
    taosMsleep(3);
  }
}

// Return the mnode's current `restored` flag (read without taking pMnode->lock).
bool mndGetRestored(SMnode *pMnode) {
  return pMnode->restored;
}

// Mark the mnode as stopped: sets pMnode->stopped under the write lock, then
// traces the change. There is no path here that clears the flag again.
void mndSetStop(SMnode *pMnode) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->stopped = true;
  taosThreadRwlockUnlock(&pMnode->lock);

  mTrace("mnode set stopped");
}

// Return the mnode's current `stopped` flag (read without taking pMnode->lock).
bool mndGetStop(SMnode *pMnode) {
  return pMnode->stopped;
}