mndMain.c 24.0 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndAcct.h"
#include "mndCluster.h"
L
Liu Jicong 已提交
19
#include "mndConsumer.h"
S
Shengliang Guan 已提交
20 21 22
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
L
Liu Jicong 已提交
23
#include "mndGrant.h"
S
monitor  
Shengliang Guan 已提交
24
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
25
#include "mndMnode.h"
L
Liu Jicong 已提交
26
#include "mndPerfSchema.h"
M
Minghao Li 已提交
27
#include "mndPrivilege.h"
S
Shengliang Guan 已提交
28
#include "mndProfile.h"
S
Shengliang Guan 已提交
29
#include "mndQnode.h"
L
Liu Jicong 已提交
30
#include "mndQuery.h"
S
Shengliang Guan 已提交
31
#include "mndShow.h"
S
sma  
Shengliang Guan 已提交
32
#include "mndSma.h"
S
Shengliang Guan 已提交
33
#include "mndSnode.h"
S
Shengliang Guan 已提交
34
#include "mndStb.h"
L
Liu Jicong 已提交
35
#include "mndStream.h"
L
Liu Jicong 已提交
36
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
37 38
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
39
#include "mndTopic.h"
S
Shengliang Guan 已提交
40 41 42
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
43

44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
// Take one RPC reference on the mnode while holding the read lock.
// Returns 0 when the reference was taken. Returns -1 when the mnode is stopped
// (terrno is set to TSDB_CODE_APP_NOT_READY) or when mndIsLeader() is false
// (this function does not assign terrno itself in that case).
static inline int32_t mndAcquireRpc(SMnode *pMnode) {
  int32_t ret = -1;

  taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    terrno = TSDB_CODE_APP_NOT_READY;
  } else if (mndIsLeader(pMnode)) {
#if 1
    atomic_add_fetch_32(&pMnode->rpcRef, 1);
#else
    int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1);
    mTrace("mnode rpc is acquired, ref:%d", ref);
#endif
    ret = 0;
  }
  taosThreadRwlockUnlock(&pMnode->lock);

  return ret;
}

// Drop one RPC reference. The atomic decrement is performed while the read
// lock is held, mirroring mndAcquireRpc().
static inline void mndReleaseRpc(SMnode *pMnode) {
  taosThreadRwlockRdlock(&pMnode->lock);
#if 0
  int32_t ref = atomic_sub_fetch_32(&pMnode->rpcRef, 1);
  mTrace("mnode rpc is released, ref:%d", ref);
#else
  atomic_sub_fetch_32(&pMnode->rpcRef, 1);
#endif
  taosThreadRwlockUnlock(&pMnode->lock);
}

S
Shengliang Guan 已提交
75 76 77 78 79 80 81 82 83 84 85 86 87
// Serialize a zero-initialized SMTimerReq into a buffer from rpcMallocCont().
// On success the buffer is returned and its length is stored in *pContLen.
// Returns NULL, leaving *pContLen untouched, when the computed length is not
// positive or the allocation fails.
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq req = {0};

  // First pass with a NULL buffer only computes the encoded length.
  int32_t len = tSerializeSMTimerMsg(NULL, 0, &req);
  if (len <= 0) {
    return NULL;
  }

  void *pBuf = rpcMallocCont(len);
  if (pBuf != NULL) {
    tSerializeSMTimerMsg(pBuf, len, &req);
    *pContLen = len;
  }
  return pBuf;
}

S
Shengliang Guan 已提交
88 89
// Put a TDMT_MND_TRANS_TIMER message on the mnode's WRITE_QUEUE.
// Nothing is queued when the timer message cannot be built.
static void mndPullupTrans(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) return;

  SRpcMsg msg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pCont, .contLen = len};
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &msg);
}

97
// Put a TDMT_MND_TTL_TIMER message on the mnode's WRITE_QUEUE.
// Nothing is queued when the timer message cannot be built, matching the other
// mndPullup*() helpers; previously a message with a NULL body and zero length
// was queued in that case.
static void mndPullupTtl(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  }
}

S
Shengliang Guan 已提交
104 105
// Put a TDMT_MND_MQ_TIMER message on the mnode's READ_QUEUE.
// Nothing is queued when the timer message cannot be built.
static void mndCalMqRebalance(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) return;

  SRpcMsg msg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pCont, .contLen = len};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}
L
Liu Jicong 已提交
112

S
Shengliang Guan 已提交
113 114
// Put a TDMT_MND_TELEM_TIMER message on the mnode's READ_QUEUE.
// Nothing is queued when the timer message cannot be built.
static void mndPullupTelem(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) return;

  SRpcMsg msg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pCont, .contLen = len};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}

122
// Put a TDMT_MND_GRANT_HB_TIMER message on the mnode's READ_QUEUE.
// The message carries the fixed ahandle value 0x9527. Nothing is queued when
// the timer message cannot be built.
static void mndPullupGrant(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) return;

  SRpcMsg msg = {
      .msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pCont, .contLen = len, .info.ahandle = (void *)0x9527};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}

132 133 134 135 136 137 138 139 140 141
// Put a TDMT_MND_UPTIME_TIMER message on the mnode's WRITE_QUEUE.
// The message carries the fixed ahandle value 0x9528. Nothing is queued when
// the timer message cannot be built.
static void mndIncreaseUpTime(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) return;

  SRpcMsg msg = {
      .msgType = TDMT_MND_UPTIME_TIMER, .pCont = pCont, .contLen = len, .info.ahandle = (void *)0x9528};
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &msg);
}

S
Shengliang Guan 已提交
142
static void *mndThreadFp(void *param) {
S
Shengliang Guan 已提交
143
  SMnode *pMnode = param;
S
Shengliang Guan 已提交
144 145 146 147 148 149
  int64_t lastTime = 0;
  setThreadName("mnode-timer");

  while (1) {
    lastTime++;
    taosMsleep(100);
S
Shengliang Guan 已提交
150
    if (mndGetStop(pMnode)) break;
151
    if (lastTime % 10 != 0) continue;
S
Shengliang Guan 已提交
152

153 154
    int64_t sec = lastTime / 10;
    if (sec % tsTtlPushInterval == 0) {
155
      mndPullupTtl(pMnode);
wmmhello's avatar
wmmhello 已提交
156 157
    }

158
    if (sec % tsTransPullupInterval == 0) {
S
Shengliang Guan 已提交
159 160 161
      mndPullupTrans(pMnode);
    }

162
    if (sec % tsMqRebalanceInterval == 0) {
S
Shengliang Guan 已提交
163 164 165
      mndCalMqRebalance(pMnode);
    }

S
Shengliang Guan 已提交
166
    if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) {
S
Shengliang Guan 已提交
167 168
      mndPullupTelem(pMnode);
    }
C
Cary Xu 已提交
169

170
    if (sec % tsGrantHBInterval == 0) {
171
      mndPullupGrant(pMnode);
C
Cary Xu 已提交
172
    }
173

174
    if (sec % tsUptimeInterval == 0) {
175 176
      mndIncreaseUpTime(pMnode);
    }
S
Shengliang Guan 已提交
177 178
  }

S
Shengliang Guan 已提交
179
  return NULL;
L
Liu Jicong 已提交
180 181
}

182
// Start the joinable "mnode-timer" thread running mndThreadFp().
// Returns 0 on success and -1 when the thread cannot be created. The thread
// attribute object is destroyed on both paths; previously it was left
// undestroyed on failure.
static int32_t mndInitTimer(SMnode *pMnode) {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  int32_t ret = taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode);
  if (ret != 0) {
    // Log before destroying the attribute so errno still reflects the failure.
    mError("failed to create timer thread since %s", strerror(errno));
  }
  taosThreadAttrDestroy(&thAttr);
  if (ret != 0) {
    return -1;
  }

  tmsgReportStartup("mnode-timer", "initialized");
  return 0;
}

196
// Join the timer thread and clear its handle, if the handle is valid.
static void mndCleanupTimer(SMnode *pMnode) {
  if (!taosCheckPthreadValid(pMnode->thread)) return;

  taosThreadJoin(pMnode->thread, NULL);
  taosThreadClear(&pMnode->thread);
}

S
Shengliang Guan 已提交
203
// Store a heap copy of `path` in pMnode->path and create that directory.
// Returns 0 on success. Returns -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY
// when the copy fails, or to TAOS_SYSTEM_ERROR(errno) when taosMkDir() fails;
// in the latter case pMnode->path stays assigned.
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  char *dup = strdup(path);
  pMnode->path = dup;
  if (dup == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosMkDir(pMnode->path) == 0) {
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(errno);
  return -1;
}
S
Shengliang Guan 已提交
217

218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
// Open the WAL stored in "<pMnode->path><TD_DIRSEP>wal" and keep the handle in
// pMnode->pWal. Returns 0 on success, -1 when walOpen() returns NULL.
static int32_t mndInitWal(SMnode *pMnode) {
  char walPath[PATH_MAX + 20] = {0};
  snprintf(walPath, sizeof(walPath), "%s%swal", pMnode->path, TD_DIRSEP);

  SWalCfg walCfg = {0};
  walCfg.vgId = 1;
  walCfg.fsyncPeriod = 0;
  walCfg.rollPeriod = -1;
  walCfg.segSize = -1;
  walCfg.retentionPeriod = -1;
  walCfg.retentionSize = -1;
  walCfg.level = TAOS_WAL_FSYNC;

  pMnode->pWal = walOpen(walPath, &walCfg);
  if (pMnode->pWal != NULL) {
    return 0;
  }

  mError("failed to open wal since %s. wal:%s", terrstr(), walPath);
  return -1;
}

// Close the WAL handle, if any, and reset pMnode->pWal to NULL.
static void mndCloseWal(SMnode *pMnode) {
  if (pMnode->pWal == NULL) return;

  walClose(pMnode->pWal);
  pMnode->pWal = NULL;
}

247 248 249
// Create the sdb instance from the mnode's path, WAL and sync handle and keep
// it in pMnode->pSdb. Returns 0 on success, -1 when sdbInit() returns NULL.
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt sdbOpt = {
      .path = pMnode->path,
      .pMnode = pMnode,
      .pWal = pMnode->pWal,
      .sync = pMnode->syncMgmt.sync,
  };

  pMnode->pSdb = sdbInit(&sdbOpt);
  return (pMnode->pSdb == NULL) ? -1 : 0;
}

262 263 264 265 266 267 268
// Load the sdb file unless the mnode is being deployed.
// Returns 0 in deploy mode, otherwise the result of sdbReadFile().
static int32_t mndOpenSdb(SMnode *pMnode) {
  if (pMnode->deploy) {
    return 0;
  }
  return sdbReadFile(pMnode->pSdb);
}
269 270 271

// Release the sdb instance, if any, and reset pMnode->pSdb to NULL.
static void mndCleanupSdb(SMnode *pMnode) {
  if (!pMnode->pSdb) return;

  sdbCleanup(pMnode->pSdb);
  pMnode->pSdb = NULL;
}

277 278 279 280 281
// Append one startup step (name plus init/cleanup callbacks) to pMnode->pSteps.
// Returns 0 on success; returns -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY
// when taosArrayPush() fails.
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep newStep = {.name = name, .initFp = initFp, .cleanupFp = cleanupFp};

  if (taosArrayPush(pMnode->pSteps, &newStep) != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

290
// Register every mnode startup step, in execution order, via mndAllocStep().
// Returns 0 when all steps were registered and -1 as soon as one registration
// fails. "mnode-sdb" appears twice on purpose: the first entry creates the sdb
// (mndInitSdb) and the second, which has no cleanup callback, opens it
// (mndOpenSdb) after the preceding modules have been initialized.
static int32_t mndInitSteps(SMnode *pMnode) {
  static const struct {
    char        *name;
    MndInitFp    initFp;
    MndCleanupFp cleanupFp;
  } stepTable[] = {
      {"mnode-wal", mndInitWal, mndCloseWal},
      {"mnode-sdb", mndInitSdb, mndCleanupSdb},
      {"mnode-trans", mndInitTrans, mndCleanupTrans},
      {"mnode-cluster", mndInitCluster, mndCleanupCluster},
      {"mnode-mnode", mndInitMnode, mndCleanupMnode},
      {"mnode-qnode", mndInitQnode, mndCleanupQnode},
      {"mnode-snode", mndInitSnode, mndCleanupSnode},
      {"mnode-dnode", mndInitDnode, mndCleanupDnode},
      {"mnode-user", mndInitUser, mndCleanupUser},
      {"mnode-grant", mndInitGrant, mndCleanupGrant},
      {"mnode-privilege", mndInitPrivilege, mndCleanupPrivilege},
      {"mnode-acct", mndInitAcct, mndCleanupAcct},
      {"mnode-stream", mndInitStream, mndCleanupStream},
      {"mnode-topic", mndInitTopic, mndCleanupTopic},
      {"mnode-consumer", mndInitConsumer, mndCleanupConsumer},
      {"mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe},
      {"mnode-vgroup", mndInitVgroup, mndCleanupVgroup},
      {"mnode-stb", mndInitStb, mndCleanupStb},
      {"mnode-sma", mndInitSma, mndCleanupSma},
      {"mnode-infos", mndInitInfos, mndCleanupInfos},
      {"mnode-perfs", mndInitPerfs, mndCleanupPerfs},
      {"mnode-db", mndInitDb, mndCleanupDb},
      {"mnode-func", mndInitFunc, mndCleanupFunc},
      {"mnode-sdb", mndOpenSdb, NULL},
      {"mnode-profile", mndInitProfile, mndCleanupProfile},
      {"mnode-show", mndInitShow, mndCleanupShow},
      {"mnode-query", mndInitQuery, mndCleanupQuery},
      {"mnode-sync", mndInitSync, mndCleanupSync},
      {"mnode-telem", mndInitTelem, mndCleanupTelem},
  };

  for (size_t i = 0; i < sizeof(stepTable) / sizeof(stepTable[0]); ++i) {
    if (mndAllocStep(pMnode, stepTable[i].name, stepTable[i].initFp, stepTable[i].cleanupFp) != 0) {
      return -1;
    }
  }

  return 0;
}

324
// Run the cleanup callbacks of steps [0, pos] in reverse order, then destroy
// the step array and reset pMnode->pSteps to NULL. Passing pos == -1 selects
// the last registered step. Does nothing when pMnode->pSteps is already NULL.
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) return;

  int32_t idx = pos;
  if (idx == -1) {
    idx = taosArrayGetSize(pMnode->pSteps) - 1;
  }

  while (idx >= 0) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    mInfo("%s will cleanup", pStep->name);
    if (pStep->cleanupFp != NULL) {
      (*pStep->cleanupFp)(pMnode);
    }
    idx--;
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
S
Shengliang Guan 已提交
343

344
// Run the init callback of every registered step in order.
// On the first failure the steps up to and including the failing one are
// cleaned up via mndCleanupSteps(), terrno is restored to the failure code and
// -1 is returned. On success pMnode->clusterId is refreshed and 0 is returned.
static int32_t mndExecSteps(SMnode *pMnode) {
  int32_t numOfSteps = taosArrayGetSize(pMnode->pSteps);

  for (int32_t i = 0; i < numOfSteps; i++) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, i);
    if (pStep->initFp == NULL) continue;

    if ((*pStep->initFp)(pMnode) == 0) {
      mInfo("%s is initialized", pStep->name);
      tmsgReportStartup(pStep->name, "initialized");
      continue;
    }

    // Preserve the failure code: the cleanup below may overwrite terrno.
    int32_t code = terrno;
    mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
    mndCleanupSteps(pMnode, i);
    terrno = code;
    return -1;
  }

  pMnode->clusterId = mndGetClusterId(pMnode);
  return 0;
}
S
Shengliang Guan 已提交
365

S
shm  
Shengliang Guan 已提交
366
// Copy the message callbacks, dnode id and replica configuration from
// `pOption` into the mnode.
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  pMnode->msgCb = pOption->msgCb;
  pMnode->selfDnodeId = pOption->dnodeId;

  pMgmt->selfIndex = pOption->selfIndex;
  pMgmt->numOfReplicas = pOption->numOfReplicas;
  memcpy(pMgmt->replicas, pOption->replicas, sizeof(pOption->replicas));
}
373

374
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
375
  mInfo("start to open mnode in %s", path);
S
Shengliang Guan 已提交
376

wafwerar's avatar
wafwerar 已提交
377
  SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
378 379 380 381 382
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
383
  memset(pMnode, 0, sizeof(SMnode));
S
Shengliang Guan 已提交
384

S
Shengliang Guan 已提交
385 386
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
S
shm  
Shengliang Guan 已提交
387
  mndSetOptions(pMnode, pOption);
S
Shengliang Guan 已提交
388

389
  pMnode->deploy = pOption->deploy;
S
Shengliang Guan 已提交
390 391
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
wafwerar's avatar
wafwerar 已提交
392
    taosMemoryFree(pMnode);
S
Shengliang Guan 已提交
393 394 395 396
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
397

S
Shengliang Guan 已提交
398
  int32_t code = mndCreateDir(pMnode, path);
399
  if (code != 0) {
S
Shengliang Guan 已提交
400 401
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
402 403 404 405 406
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

407
  code = mndInitSteps(pMnode);
408
  if (code != 0) {
S
Shengliang Guan 已提交
409 410
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
411 412 413 414 415 416 417
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
418 419
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
420 421 422 423
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
424

S
Shengliang Guan 已提交
425
  mInfo("mnode open successfully");
S
Shengliang Guan 已提交
426 427
  return pMnode;
}
S
Shengliang Guan 已提交
428

429 430
// Reset the leaderTransferFinish flag and request a sync leader transfer ahead
// of closing. A NULL mnode is ignored. The wait-for-completion loop below is
// compiled out.
void mndPreClose(SMnode *pMnode) {
  if (pMnode == NULL) return;

  atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 0);
  syncLeaderTransfer(pMnode->syncMgmt.sync);

#if 0
  mInfo("vgId:1, mnode start leader transfer");
  // wait for leader transfer finish
  while (!atomic_load_8(&(pMnode->syncMgmt.leaderTransferFinish))) {
    taosMsleep(10);
    mInfo("vgId:1, mnode waiting for leader transfer");
  }
  mInfo("vgId:1, mnode finish leader transfer");
#endif
}

446
// Clean up all registered steps and free the mnode together with its path
// string. A NULL mnode is ignored.
void mndClose(SMnode *pMnode) {
  if (pMnode == NULL) return;

  mInfo("start to close mnode");
  mndCleanupSteps(pMnode, -1);
  taosMemoryFreeClear(pMnode->path);
  taosMemoryFreeClear(pMnode);
  mInfo("mnode is closed");
}
S
Shengliang Guan 已提交
455

456
// Start sync, deploy the sdb when the mnode is in deploy mode (marking the
// mnode restored afterwards), reset grants and launch the timer thread.
// Returns -1 when sdbDeploy() fails, otherwise the result of mndInitTimer().
int32_t mndStart(SMnode *pMnode) {
  mndSyncStart(pMnode);

  if (pMnode->deploy) {
    int32_t deployCode = sdbDeploy(pMnode->pSdb);
    if (deployCode != 0) {
      mError("failed to deploy sdb while start mnode");
      return -1;
    }
    mndSetRestored(pMnode, true);
  }

  grantReset(pMnode, TSDB_GRANT_ALL, 0);

  int32_t timerCode = mndInitTimer(pMnode);
  return timerCode;
}

471
// Stop the mnode in three steps: raise the stop flag, call mndSyncStop(), then
// join the timer thread. The timer thread polls the stop flag, so the flag is
// set before the join.
void mndStop(SMnode *pMnode) {
  mndSetStop(pMnode);
  mndSyncStop(pMnode);
  mndCleanupTimer(pMnode);
}

M
Minghao Li 已提交
477
// Hand a sync message to syncProcessMsg() using the mnode's sync handle.
// Returns the code from syncProcessMsg(); a non-zero code is logged.
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
  SSyncMgmt      *pMgmt = &pMnode->syncMgmt;
  const STraceId *trace = &pMsg->info.traceId;

  mGTrace("vgId:1, sync msg:%p will be processed, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = syncProcessMsg(pMgmt->sync, pMsg);
  if (code == 0) {
    return code;
  }

  mGError("vgId:1, failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
  return code;
}

S
Shengliang Guan 已提交
492
// Decide whether a message may be processed by this mnode.
// Returns 0 for non-request messages and for the TDMT_SCH_* types listed below
// (no RPC reference is taken for these), and 0 after mndAcquireRpc() succeeds.
// Otherwise returns -1: the listed timer messages are only traced; for all
// other requests an endpoint-set redirect response is attached when the mnode
// ep set is non-empty (terrno becomes TSDB_CODE_RPC_REDIRECT, or
// TSDB_CODE_OUT_OF_MEMORY if the response buffer cannot be allocated), and
// terrno becomes TSDB_CODE_APP_NOT_READY when the ep set is empty.
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
  if (!IsReq(pMsg)) return 0;

  const tmsg_t msgType = pMsg->msgType;
  const bool   isSchMsg = msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY ||
                        msgType == TDMT_SCH_QUERY_CONTINUE || msgType == TDMT_SCH_QUERY_HEARTBEAT ||
                        msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH ||
                        msgType == TDMT_SCH_DROP_TASK;
  if (isSchMsg) return 0;

  SMnode *pMnode = pMsg->info.node;
  if (mndAcquireRpc(pMnode) == 0) return 0;

  const char *role = syncGetMyRoleStr(pMnode->syncMgmt.sync);
  bool        restored = syncIsRestoreFinish(pMnode->syncMgmt.sync);

  const bool isTimerMsg = msgType == TDMT_MND_MQ_TIMER || msgType == TDMT_MND_TELEM_TIMER ||
                          msgType == TDMT_MND_TRANS_TIMER || msgType == TDMT_MND_TTL_TIMER ||
                          msgType == TDMT_MND_UPTIME_TIMER;
  if (isTimerMsg) {
    mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
           pMnode->stopped, restored, role);
    return -1;
  }

  const STraceId *trace = &pMsg->info.traceId;
  SEpSet          epSet = {0};
  mndGetMnodeEpSet(pMnode, &epSet);

  mDebug(
      "msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d "
      "role:%s, redirect numOfEps:%d inUse:%d",
      pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, restored, role, epSet.numOfEps,
      epSet.inUse);

  if (epSet.numOfEps <= 0) {
    terrno = TSDB_CODE_APP_NOT_READY;
    return -1;
  }

  for (int32_t ep = 0; ep < epSet.numOfEps; ++ep) {
    mDebug("mnode index:%d, ep:%s:%u", ep, epSet.eps[ep].fqdn, epSet.eps[ep].port);
  }

  // First pass with a NULL buffer only computes the encoded length.
  int32_t rspLen = tSerializeSEpSet(NULL, 0, &epSet);
  pMsg->info.rsp = rpcMallocCont(rspLen);
  pMsg->info.hasEpSet = 1;
  if (pMsg->info.rsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  tSerializeSEpSet(pMsg->info.rsp, rspLen, &epSet);
  pMsg->info.rspLen = rspLen;
  terrno = TSDB_CODE_RPC_REDIRECT;
  return -1;
}

S
Shengliang Guan 已提交
544
// Reject request messages that have no body.
// Returns 0 for non-request messages and for requests with a non-NULL body of
// non-zero length; otherwise logs, sets terrno to TSDB_CODE_INVALID_MSG_LEN and
// returns -1.
static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
  if (!IsReq(pMsg)) return 0;

  bool emptyBody = (pMsg->contLen == 0 || pMsg->pCont == NULL);
  if (!emptyBody) return 0;

  const STraceId *trace = &pMsg->info.traceId;
  mGError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen,
          pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  terrno = TSDB_CODE_INVALID_MSG_LEN;
  return -1;
}

S
Shengliang Guan 已提交
555
// Dispatch an RPC message to the handler registered for its type.
// Returns -1 with terrno set to TSDB_CODE_MSG_NOT_PROCESSED when no handler is
// registered (or the type index is out of range), -1 when the content or
// mnode-state check fails, and otherwise the handler's return code.
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
  const STraceId *trace = &pMsg->info.traceId;

  // Bounds-check the table index exactly as mndSetMsgHandle() does when it
  // stores handlers, so an unexpected message type cannot read past msgFp[].
  tmsg_t   type = TMSG_INDEX(pMsg->msgType);
  MndMsgFp fp = (type < TDMT_MAX) ? pMnode->msgFp[type] : NULL;
  if (fp == NULL) {
    mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    return -1;
  }

  if (mndCheckMsgContent(pMsg) != 0) return -1;
  if (mndCheckMnodeState(pMsg) != 0) return -1;

  mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  int32_t code = (*fp)(pMsg);
  // NOTE(review): mndCheckMnodeState() returns 0 without calling
  // mndAcquireRpc() for non-request and TDMT_SCH_* messages, yet the reference
  // is released unconditionally here, so rpcRef can drop below zero on those
  // paths. Confirm whether that is intended.
  mndReleaseRpc(pMnode);

  if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
    mGTrace("msg:%p, won't response immediately since in progress", pMsg);
  } else if (code == 0) {
    mGTrace("msg:%p, successfully processed", pMsg);
  } else {
    mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
            TMSG_INFO(pMsg->msgType));
  }

  return code;
}

585 586
// Register `fp` as the handler for `msgType`. The request is silently ignored
// when the derived table index is not below TDMT_MAX.
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  tmsg_t slot = TMSG_INDEX(msgType);
  if (slot >= TDMT_MAX) return;

  pMnode->msgFp[slot] = fp;
}

D
dapan1121 已提交
592
// Generate a positive 64-bit uid from the current time, a hash of `name` and a
// random byte. Note: uid 0 is reserved, so a zero candidate is never returned.
//
// Bit layout of the candidate, high to low:
//   bits 24..63 : low 40 bits of the microsecond timestamp
//   bits  8..23 : low 16 bits of MurmurHash3_32(name, len)
//   bits  0..7  : low 8 bits of taosRand()
// The candidate is assembled in unsigned arithmetic so the shift cannot
// overflow a signed type, and INT64_MIN is rejected because llabs() is
// undefined for it; in both rejected cases a new candidate is drawn.
int64_t mndGenerateUid(const char *name, int32_t len) {
  int32_t hashval = MurmurHash3_32(name, len);
  do {
    int64_t  us = taosGetTimestampUs();
    uint64_t timeBits = ((uint64_t)us & 0x000000FFFFFFFFFFULL) << 24;
    uint64_t hashBits = ((uint64_t)hashval & 0xFFFFULL) << 8;
    uint64_t randBits = (uint64_t)taosRand() & 0xFFULL;
    int64_t  uuid = (int64_t)(timeBits + hashBits + randBits);
    if (uuid != 0 && uuid != INT64_MIN) {
      return llabs(uuid);
    }
  } while (true);
}
S
Shengliang Guan 已提交
604 605

int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
S
Shengliang Guan 已提交
606
                          SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
607
  if (mndAcquireRpc(pMnode) != 0) return -1;
S
Shengliang Guan 已提交
608

M
Minghao Li 已提交
609
  SSdb   *pSdb = pMnode->pSdb;
S
Shengliang Guan 已提交
610 611 612 613 614
  int64_t ms = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
S
Shengliang Guan 已提交
615 616 617
  pStbInfo->stbs = taosArrayInit(sdbGetSize(pSdb, SDB_STB), sizeof(SMonStbDesc));
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL ||
      pStbInfo->stbs == NULL) {
618
    mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
619 620 621 622
    return -1;
  }

  // cluster info
wmmhello's avatar
wmmhello 已提交
623
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
S
Shengliang Guan 已提交
624 625
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);
626 627
  pClusterInfo->dbs_total = sdbGetSize(pSdb, SDB_DB);
  pClusterInfo->stbs_total = sdbGetSize(pSdb, SDB_STB);
S
Shengliang Guan 已提交
628 629 630 631 632 633 634 635 636 637

  void *pIter = NULL;
  while (1) {
    SDnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonDnodeDesc desc = {0};
    desc.dnode_id = pObj->id;
    tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
S
Shengliang Guan 已提交
638
    if (mndIsDnodeOnline(pObj, ms)) {
S
Shengliang Guan 已提交
639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656
      tstrncpy(desc.status, "ready", sizeof(desc.status));
    } else {
      tstrncpy(desc.status, "offline", sizeof(desc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &desc);
    sdbRelease(pSdb, pObj);
  }

  pIter = NULL;
  while (1) {
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonMnodeDesc desc = {0};
    desc.mnode_id = pObj->id;
    tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));

657
    if (pObj->id == pMnode->selfDnodeId) {
S
Shengliang Guan 已提交
658 659
      pClusterInfo->first_ep_dnode_id = pObj->id;
      tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
660 661
      pClusterInfo->master_uptime = mndGetClusterUpTime(pMnode);
      // pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
662 663
      tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
    } else {
664
      tstrncpy(desc.role, syncStr(pObj->syncState), sizeof(desc.role));
S
Shengliang Guan 已提交
665
    }
666 667
    taosArrayPush(pClusterInfo->mnodes, &desc);
    sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
668 669 670 671 672 673 674 675 676 677
  }

  // vgroup info
  pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pClusterInfo->vgroups_total++;
678
    pClusterInfo->tbs_total += pVgroup->numOfTables;
S
Shengliang Guan 已提交
679 680 681

    SMonVgroupDesc desc = {0};
    desc.vgroup_id = pVgroup->vgId;
682 683 684 685 686

    SName name = {0};
    tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name, desc.database_name);

S
Shengliang Guan 已提交
687 688 689 690
    desc.tables_num = pVgroup->numOfTables;
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
    tstrncpy(desc.status, "unsynced", sizeof(desc.status));
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
M
Minghao Li 已提交
691
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[i];
S
Shengliang Guan 已提交
692 693
      SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
      pVnDesc->dnode_id = pVgid->dnodeId;
694 695
      tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->syncState), sizeof(pVnDesc->vnode_role));
      if (pVgid->syncState == TAOS_SYNC_STATE_LEADER) {
S
Shengliang Guan 已提交
696 697 698
        tstrncpy(desc.status, "ready", sizeof(desc.status));
        pClusterInfo->vgroups_alive++;
      }
699
      if (pVgid->syncState != TAOS_SYNC_STATE_ERROR) {
S
Shengliang Guan 已提交
700 701 702 703 704 705 706 707 708
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &desc);
    sdbRelease(pSdb, pVgroup);
  }

S
Shengliang Guan 已提交
709 710 711 712 713 714 715
  // stb info
  pIter = NULL;
  while (1) {
    SStbObj *pStb = NULL;
    pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
    if (pIter == NULL) break;

S
Shengliang Guan 已提交
716
    SMonStbDesc desc = {0};
S
Shengliang Guan 已提交
717 718 719 720 721 722 723 724 725 726 727 728 729

    SName name1 = {0};
    tNameFromString(&name1, pStb->db, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name1, desc.database_name);

    SName name2 = {0};
    tNameFromString(&name2, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tstrncpy(desc.stb_name, tNameGetTableName(&name2), TSDB_TABLE_NAME_LEN);

    taosArrayPush(pStbInfo->stbs, &desc);
    sdbRelease(pSdb, pStb);
  }

S
Shengliang Guan 已提交
730 731 732 733 734 735 736 737
  // grant info
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  }

738
  mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
739
  return 0;
L
Liu Jicong 已提交
740
}
S
Shengliang Guan 已提交
741 742

// Report the mnode's current sync role and restored flag in `pLoad`.
// Always returns 0.
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  pLoad->syncState = syncGetMyRole(pMgmt->sync);
  pLoad->syncRestore = pMnode->restored;

  mTrace("mnode current syncState is %s, syncRestore:%d", syncStr(pLoad->syncState), pLoad->syncRestore);
  return 0;
}
S
Shengliang Guan 已提交
748

749
// Update the restored flag under the write lock. When the flag is cleared,
// additionally poll every 3 ms until rpcRef is no longer positive.
// NOTE(review): rpcRef is modified with atomic_*_32 helpers elsewhere in this
// file but read here with a plain load — confirm that is acceptable.
void mndSetRestored(SMnode *pMnode, bool restored) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->restored = restored;
  taosThreadRwlockUnlock(&pMnode->lock);
  mTrace("mnode set restored:%d", restored);

  if (restored) return;

  while (pMnode->rpcRef > 0) {
    taosMsleep(3);
  }
}

// Return the mnode's restored flag (read without taking the lock).
bool mndGetRestored(SMnode *pMnode) {
  return pMnode->restored;
}

// Raise the stopped flag under the write lock. mndAcquireRpc() checks this
// flag under the read lock, so new acquisitions fail afterwards.
void mndSetStop(SMnode *pMnode) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->stopped = true;
  taosThreadRwlockUnlock(&pMnode->lock);

  mTrace("mnode set stopped");
}

// Return the mnode's stopped flag (read without taking the lock).
bool mndGetStop(SMnode *pMnode) {
  return pMnode->stopped;
}