mndMain.c 23.6 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndAcct.h"
#include "mndCluster.h"
L
Liu Jicong 已提交
19
#include "mndConsumer.h"
S
Shengliang Guan 已提交
20 21 22
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
L
Liu Jicong 已提交
23
#include "mndGrant.h"
S
monitor  
Shengliang Guan 已提交
24
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
25
#include "mndMnode.h"
L
Liu Jicong 已提交
26
#include "mndPerfSchema.h"
M
Minghao Li 已提交
27
#include "mndPrivilege.h"
S
Shengliang Guan 已提交
28
#include "mndProfile.h"
S
Shengliang Guan 已提交
29
#include "mndQnode.h"
L
Liu Jicong 已提交
30
#include "mndQuery.h"
S
Shengliang Guan 已提交
31
#include "mndShow.h"
S
sma  
Shengliang Guan 已提交
32
#include "mndSma.h"
S
Shengliang Guan 已提交
33
#include "mndSnode.h"
S
Shengliang Guan 已提交
34
#include "mndStb.h"
L
Liu Jicong 已提交
35
#include "mndStream.h"
L
Liu Jicong 已提交
36
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
37 38
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
39
#include "mndTopic.h"
S
Shengliang Guan 已提交
40 41 42
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
43

44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
/*
 * Take one reference on pMnode->rpcRef under the mnode read lock.
 * Returns 0 on success; the caller must later balance it with mndReleaseRpc().
 * Returns -1 without taking a reference when:
 *   - the mnode is stopped (terrno = TSDB_CODE_APP_NOT_READY), or
 *   - mndIsLeader() reports false (terrno is left to mndIsLeader(); not visible here).
 */
static inline int32_t mndAcquireRpc(SMnode *pMnode) {
  int32_t ret = -1;

  taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    terrno = TSDB_CODE_APP_NOT_READY;
  } else if (mndIsLeader(pMnode)) {
    atomic_add_fetch_32(&pMnode->rpcRef, 1);
    ret = 0;
  }
  taosThreadRwlockUnlock(&pMnode->lock);

  return ret;
}

/*
 * Drop one reference previously taken by mndAcquireRpc().
 * The decrement happens under the same read lock mndAcquireRpc() uses.
 */
static inline void mndReleaseRpc(SMnode *pMnode) {
  taosThreadRwlockRdlock(&pMnode->lock);
  (void)atomic_sub_fetch_32(&pMnode->rpcRef, 1);
  taosThreadRwlockUnlock(&pMnode->lock);
}

S
Shengliang Guan 已提交
75 76 77 78 79 80 81 82 83 84 85 86 87
/*
 * Serialize an empty SMTimerReq into a buffer obtained from rpcMallocCont().
 * On success returns the buffer and writes its length to *pContLen.
 * On failure (non-positive serialized size or allocation failure) returns NULL
 * and leaves *pContLen untouched.
 */
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq req = {0};

  int32_t len = tSerializeSMTimerMsg(NULL, 0, &req);
  if (len <= 0) {
    return NULL;
  }

  void *pBuf = rpcMallocCont(len);
  if (pBuf != NULL) {
    tSerializeSMTimerMsg(pBuf, len, &req);
    *pContLen = len;
  }
  return pBuf;
}

S
Shengliang Guan 已提交
88 89
/*
 * Put a TDMT_MND_TRANS_TIMER message on the mnode WRITE_QUEUE.
 * Nothing is queued if the timer payload cannot be built.
 */
static void mndPullupTrans(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) return;

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}

97
/*
 * Put a TDMT_MND_TTL_TIMER message on the mnode WRITE_QUEUE.
 * Like the other timer pull-ups in this file, nothing is queued when the timer
 * payload cannot be built, so a message with a NULL pCont is never enqueued.
 */
static void mndPullupTtl(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  }
}

S
Shengliang Guan 已提交
104 105
/*
 * Put a TDMT_MND_TMQ_TIMER message on the mnode READ_QUEUE.
 * Nothing is queued if the timer payload cannot be built.
 */
static void mndCalMqRebalance(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) return;

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
L
Liu Jicong 已提交
112

S
Shengliang Guan 已提交
113 114
/*
 * Put a TDMT_MND_TELEM_TIMER message on the mnode READ_QUEUE.
 * Nothing is queued if the timer payload cannot be built.
 */
static void mndPullupTelem(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) return;

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}

122
/*
 * Put a TDMT_MND_GRANT_HB_TIMER message on the mnode READ_QUEUE.
 * The message carries the fixed ahandle 0x9527.
 * Nothing is queued if the timer payload cannot be built.
 */
static void mndPullupGrant(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) return;

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9527};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}

132 133 134 135 136 137 138 139 140 141
/*
 * Put a TDMT_MND_UPTIME_TIMER message on the mnode WRITE_QUEUE.
 * The message carries the fixed ahandle 0x9528.
 * Nothing is queued if the timer payload cannot be built.
 */
static void mndIncreaseUpTime(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) return;

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_UPTIME_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9528};
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}

S
Shengliang Guan 已提交
142
static void *mndThreadFp(void *param) {
S
Shengliang Guan 已提交
143
  SMnode *pMnode = param;
S
Shengliang Guan 已提交
144 145 146 147 148 149
  int64_t lastTime = 0;
  setThreadName("mnode-timer");

  while (1) {
    lastTime++;
    taosMsleep(100);
S
Shengliang Guan 已提交
150
    if (mndGetStop(pMnode)) break;
151
    if (lastTime % 10 != 0) continue;
S
Shengliang Guan 已提交
152

153 154
    int64_t sec = lastTime / 10;
    if (sec % tsTtlPushInterval == 0) {
155
      mndPullupTtl(pMnode);
wmmhello's avatar
wmmhello 已提交
156 157
    }

158
    if (sec % tsTransPullupInterval == 0) {
S
Shengliang Guan 已提交
159 160 161
      mndPullupTrans(pMnode);
    }

162
    if (sec % tsMqRebalanceInterval == 0) {
S
Shengliang Guan 已提交
163 164 165
      mndCalMqRebalance(pMnode);
    }

S
Shengliang Guan 已提交
166
    if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) {
S
Shengliang Guan 已提交
167 168
      mndPullupTelem(pMnode);
    }
C
Cary Xu 已提交
169

170
    if (sec % tsGrantHBInterval == 0) {
171
      mndPullupGrant(pMnode);
C
Cary Xu 已提交
172
    }
173

174
    if (sec % tsUptimeInterval == 0) {
175 176
      mndIncreaseUpTime(pMnode);
    }
S
Shengliang Guan 已提交
177 178
  }

S
Shengliang Guan 已提交
179
  return NULL;
L
Liu Jicong 已提交
180 181
}

182
/*
 * Start the joinable "mnode-timer" thread running mndThreadFp.
 * mndCleanupTimer() later joins it.
 * Returns 0 on success. On failure returns -1 with terrno set from errno.
 * The thread attribute object is destroyed on both paths (previously leaked on failure).
 */
static int32_t mndInitTimer(SMnode *pMnode) {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode) != 0) {
    // Record errno before any further call can overwrite it.
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to create timer thread since %s", strerror(errno));
    taosThreadAttrDestroy(&thAttr);
    return -1;
  }

  taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("mnode-timer", "initialized");
  return 0;
}

196
/*
 * Join the timer thread if it was started, then reset the thread handle.
 * Does nothing when taosCheckPthreadValid() reports the handle is invalid.
 */
static void mndCleanupTimer(SMnode *pMnode) {
  if (!taosCheckPthreadValid(pMnode->thread)) return;

  taosThreadJoin(pMnode->thread, NULL);
  taosThreadClear(&pMnode->thread);
}

S
Shengliang Guan 已提交
203
/*
 * Store a private copy of `path` in pMnode->path and create that directory.
 * Returns 0 on success, or -1 with terrno set:
 *   - TSDB_CODE_OUT_OF_MEMORY when the copy fails;
 *   - TAOS_SYSTEM_ERROR(errno) when taosMkDir fails.
 * The copy stays in pMnode->path even when mkdir fails; mndClose() frees it.
 */
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  pMnode->path = strdup(path);
  if (pMnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int32_t ret = taosMkDir(pMnode->path);
  if (ret == 0) {
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(errno);
  return -1;
}
S
Shengliang Guan 已提交
217

218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
/*
 * Open the mnode WAL (vgId 1, TAOS_WAL_FSYNC level) under "<pMnode->path>/wal".
 * Returns 0 on success. Returns -1 after logging when walOpen() returns NULL.
 */
static int32_t mndInitWal(SMnode *pMnode) {
  SWalCfg cfg = {
      .vgId = 1,
      .fsyncPeriod = 0,
      .rollPeriod = -1,
      .segSize = -1,
      .retentionPeriod = -1,
      .retentionSize = -1,
      .level = TAOS_WAL_FSYNC,
  };

  char path[PATH_MAX + 20] = {0};
  snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);

  pMnode->pWal = walOpen(path, &cfg);
  if (pMnode->pWal != NULL) {
    return 0;
  }

  mError("failed to open wal since %s. wal:%s", terrstr(), path);
  return -1;
}

/* Close the mnode WAL if it is open and clear the handle; safe to call twice. */
static void mndCloseWal(SMnode *pMnode) {
  if (pMnode->pWal == NULL) return;

  walClose(pMnode->pWal);
  pMnode->pWal = NULL;
}

247 248 249
/*
 * Create the sdb instance from the mnode's path, WAL and sync handle.
 * Returns 0 on success, or -1 when sdbInit() returns NULL.
 */
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt opt = {
      .path = pMnode->path,
      .pMnode = pMnode,
      .pWal = pMnode->pWal,
      .sync = pMnode->syncMgmt.sync,
  };

  pMnode->pSdb = sdbInit(&opt);
  return (pMnode->pSdb != NULL) ? 0 : -1;
}

262 263 264 265 266 267 268
/*
 * Load the sdb file unless this mnode is being deployed.
 * In the deploy case mndStart() calls sdbDeploy() instead.
 * Returns sdbReadFile()'s result, or 0 when deploying.
 */
static int32_t mndOpenSdb(SMnode *pMnode) {
  return pMnode->deploy ? 0 : sdbReadFile(pMnode->pSdb);
}
269 270 271

/* Release the sdb instance if one exists and clear the pointer. */
static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb == NULL) return;

  sdbCleanup(pMnode->pSdb);
  pMnode->pSdb = NULL;
}

277 278 279 280 281
/*
 * Append one {name, initFp, cleanupFp} step to pMnode->pSteps.
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the push fails.
 */
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep step = {.name = name, .initFp = initFp, .cleanupFp = cleanupFp};

  if (taosArrayPush(pMnode->pSteps, &step) != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

290
/*
 * Register every mnode init/cleanup step in pMnode->pSteps.
 * mndExecSteps() runs them in table order; mndCleanupSteps() walks them in reverse.
 * The second "mnode-sdb" entry (mndOpenSdb, no cleanup) is deliberately placed
 * after the module steps above it.
 * Returns 0, or -1 as soon as one registration fails (terrno from mndAllocStep).
 */
static int32_t mndInitSteps(SMnode *pMnode) {
  static const struct {
    char        *name;
    MndInitFp    initFp;
    MndCleanupFp cleanupFp;
  } steps[] = {
      {"mnode-wal", mndInitWal, mndCloseWal},
      {"mnode-sdb", mndInitSdb, mndCleanupSdb},
      {"mnode-trans", mndInitTrans, mndCleanupTrans},
      {"mnode-cluster", mndInitCluster, mndCleanupCluster},
      {"mnode-mnode", mndInitMnode, mndCleanupMnode},
      {"mnode-qnode", mndInitQnode, mndCleanupQnode},
      {"mnode-snode", mndInitSnode, mndCleanupSnode},
      {"mnode-dnode", mndInitDnode, mndCleanupDnode},
      {"mnode-user", mndInitUser, mndCleanupUser},
      {"mnode-grant", mndInitGrant, mndCleanupGrant},
      {"mnode-privilege", mndInitPrivilege, mndCleanupPrivilege},
      {"mnode-acct", mndInitAcct, mndCleanupAcct},
      {"mnode-stream", mndInitStream, mndCleanupStream},
      {"mnode-topic", mndInitTopic, mndCleanupTopic},
      {"mnode-consumer", mndInitConsumer, mndCleanupConsumer},
      {"mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe},
      {"mnode-vgroup", mndInitVgroup, mndCleanupVgroup},
      {"mnode-stb", mndInitStb, mndCleanupStb},
      {"mnode-sma", mndInitSma, mndCleanupSma},
      {"mnode-infos", mndInitInfos, mndCleanupInfos},
      {"mnode-perfs", mndInitPerfs, mndCleanupPerfs},
      {"mnode-db", mndInitDb, mndCleanupDb},
      {"mnode-func", mndInitFunc, mndCleanupFunc},
      {"mnode-sdb", mndOpenSdb, NULL},
      {"mnode-profile", mndInitProfile, mndCleanupProfile},
      {"mnode-show", mndInitShow, mndCleanupShow},
      {"mnode-query", mndInitQuery, mndCleanupQuery},
      {"mnode-sync", mndInitSync, mndCleanupSync},
      {"mnode-telem", mndInitTelem, mndCleanupTelem},
  };

  for (size_t i = 0; i < sizeof(steps) / sizeof(steps[0]); ++i) {
    if (mndAllocStep(pMnode, steps[i].name, steps[i].initFp, steps[i].cleanupFp) != 0) {
      return -1;
    }
  }

  return 0;
}

324
/*
 * Run the cleanup callbacks of registered steps from index `pos` down to 0,
 * then destroy the step array and set pMnode->pSteps to NULL.
 * pos == -1 means "start from the last registered step".
 * Steps without a cleanupFp are only logged. Does nothing when pSteps is NULL.
 */
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) return;

  int32_t start = pos;
  if (start == -1) {
    start = taosArrayGetSize(pMnode->pSteps) - 1;
  }

  for (int32_t idx = start; idx >= 0; --idx) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    mInfo("%s will cleanup", pStep->name);
    if (pStep->cleanupFp == NULL) continue;
    (*pStep->cleanupFp)(pMnode);
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
S
Shengliang Guan 已提交
343

344
/*
 * Run every registered step's initFp in order; steps with a NULL initFp are skipped.
 * On the first failure, clean up steps 0..failedIndex (including the failing one),
 * restore the failing step's terrno and return -1.
 * On success, cache the cluster id in pMnode->clusterId and return 0.
 */
static int32_t mndExecSteps(SMnode *pMnode) {
  int32_t numOfSteps = taosArrayGetSize(pMnode->pSteps);

  for (int32_t i = 0; i < numOfSteps; ++i) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, i);
    if (pStep->initFp == NULL) continue;

    if ((*pStep->initFp)(pMnode) == 0) {
      mInfo("%s is initialized", pStep->name);
      tmsgReportStartup(pStep->name, "initialized");
      continue;
    }

    // mndCleanupSteps() may overwrite terrno, so keep the original failure code.
    int32_t code = terrno;
    mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
    mndCleanupSteps(pMnode, i);
    terrno = code;
    return -1;
  }

  pMnode->clusterId = mndGetClusterId(pMnode);
  return 0;
}
S
Shengliang Guan 已提交
365

S
shm  
Shengliang Guan 已提交
366
/*
 * Copy the caller-supplied options into the mnode: message callbacks,
 * own dnode id, and the sync replica layout.
 */
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  pMnode->msgCb = pOption->msgCb;
  pMnode->selfDnodeId = pOption->dnodeId;

  pMgmt->selfIndex = pOption->selfIndex;
  pMgmt->numOfReplicas = pOption->numOfReplicas;
  memcpy(pMgmt->replicas, pOption->replicas, sizeof(pOption->replicas));
}
373

374
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
375
  mInfo("start to open mnode in %s", path);
S
Shengliang Guan 已提交
376

wafwerar's avatar
wafwerar 已提交
377
  SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
378 379 380 381 382
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
383
  memset(pMnode, 0, sizeof(SMnode));
S
Shengliang Guan 已提交
384

S
Shengliang Guan 已提交
385 386
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
S
shm  
Shengliang Guan 已提交
387
  mndSetOptions(pMnode, pOption);
S
Shengliang Guan 已提交
388

389
  pMnode->deploy = pOption->deploy;
S
Shengliang Guan 已提交
390 391
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
wafwerar's avatar
wafwerar 已提交
392
    taosMemoryFree(pMnode);
S
Shengliang Guan 已提交
393 394 395 396
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
397

S
Shengliang Guan 已提交
398
  int32_t code = mndCreateDir(pMnode, path);
399
  if (code != 0) {
S
Shengliang Guan 已提交
400 401
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
402 403 404 405 406
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

407
  code = mndInitSteps(pMnode);
408
  if (code != 0) {
S
Shengliang Guan 已提交
409 410
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
411 412 413 414 415 416 417
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
418 419
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
420 421 422 423
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
424

S
Shengliang Guan 已提交
425
  mInfo("mnode open successfully");
S
Shengliang Guan 已提交
426 427
  return pMnode;
}
S
Shengliang Guan 已提交
428

429 430 431
/*
 * First phase of shutdown: call syncLeaderTransfer() and then syncPreStop()
 * on the mnode's sync instance. A NULL mnode is ignored.
 */
void mndPreClose(SMnode *pMnode) {
  if (pMnode == NULL) return;

  syncLeaderTransfer(pMnode->syncMgmt.sync);
  syncPreStop(pMnode->syncMgmt.sync);
}

436
/*
 * Tear down an mnode: run every registered cleanup step in reverse order,
 * then free the path copy and the SMnode itself. A NULL mnode is ignored.
 */
void mndClose(SMnode *pMnode) {
  if (pMnode == NULL) return;

  mInfo("start to close mnode");
  mndCleanupSteps(pMnode, -1);
  taosMemoryFreeClear(pMnode->path);
  taosMemoryFreeClear(pMnode);
  mInfo("mnode is closed");
}
S
Shengliang Guan 已提交
445

446
/*
 * Start the mnode.
 *   1. Start sync.
 *   2. When deploying, call sdbDeploy() and mark the mnode restored.
 *   3. Reset all grants.
 *   4. Launch the timer thread.
 * Returns -1 if sdbDeploy() fails; otherwise returns mndInitTimer()'s result.
 */
int32_t mndStart(SMnode *pMnode) {
  mndSyncStart(pMnode);

  if (pMnode->deploy) {
    int32_t deployCode = sdbDeploy(pMnode->pSdb);
    if (deployCode != 0) {
      mError("failed to deploy sdb while start mnode");
      return -1;
    }
    mndSetRestored(pMnode, true);
  }

  grantReset(pMnode, TSDB_GRANT_ALL, 0);

  return mndInitTimer(pMnode);
}

461
/*
 * Stop the mnode.
 * Raise the stop flag first so mndThreadFp's loop (which polls mndGetStop())
 * can exit, stop sync, then join the timer thread.
 */
void mndStop(SMnode *pMnode) {
  mndSetStop(pMnode);     // timer loop observes this via mndGetStop()
  mndSyncStop(pMnode);
  mndCleanupTimer(pMnode); // joins the timer thread
}

M
Minghao Li 已提交
467
/*
 * Hand a sync-layer message to the mnode's sync instance.
 * Returns syncProcessMsg()'s result; a non-zero result is logged with the message type.
 */
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
  const STraceId *trace = &pMsg->info.traceId;  // consumed by the mG* log macros

  mGTrace("vgId:1, sync msg:%p will be processed, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = syncProcessMsg(pMnode->syncMgmt.sync, pMsg);
  if (code == 0) {
    return 0;
  }

  mGError("vgId:1, failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
  return code;
}

S
Shengliang Guan 已提交
482
/*
 * Return true for messages that bypass the mnode-state gate in mndCheckMnodeState():
 *   - anything that is not a request (IsReq() is false), and
 *   - scheduler query/fetch/heartbeat/drop-task traffic.
 * No RPC reference is acquired for these, so none may be released afterwards.
 */
static bool mndIsRpcRefExempt(SRpcMsg *pMsg) {
  if (!IsReq(pMsg)) return true;

  return pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY ||
         pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT ||
         pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH ||
         pMsg->msgType == TDMT_SCH_DROP_TASK;
}

/*
 * Gate a message on the mnode's state.
 *
 * Returns 0 when the message may be processed: either it is exempt (see
 * mndIsRpcRefExempt()), or mndAcquireRpc() succeeded and the caller now owns
 * one RPC reference.
 *
 * Returns -1 otherwise:
 *   - internal timer messages are dropped with only a trace log;
 *   - other requests get the mnode epset serialized into pMsg->info.rsp with
 *     terrno = TSDB_CODE_RPC_REDIRECT (TSDB_CODE_OUT_OF_MEMORY if that buffer
 *     cannot be allocated), or terrno = TSDB_CODE_APP_NOT_READY when the epset
 *     is empty.
 */
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
  if (mndIsRpcRefExempt(pMsg)) return 0;
  if (mndAcquireRpc(pMsg->info.node) == 0) return 0;

  SMnode    *pMnode = pMsg->info.node;
  SSyncState state = syncGetState(pMnode->syncMgmt.sync);

  // The role is state.state; state.restored is only a flag and is logged separately.
  if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
      pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
      pMsg->msgType == TDMT_MND_UPTIME_TIMER) {
    mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
           pMnode->stopped, state.restored, syncStr(state.state));
    return -1;
  }

  const STraceId *trace = &pMsg->info.traceId;
  SEpSet          epSet = {0};
  mndGetMnodeEpSet(pMnode, &epSet);

  mDebug(
      "msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d "
      "role:%s, redirect numOfEps:%d inUse:%d",
      pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, state.restored,
      syncStr(state.state), epSet.numOfEps, epSet.inUse);

  if (epSet.numOfEps > 0) {
    for (int32_t i = 0; i < epSet.numOfEps; ++i) {
      mDebug("mnode index:%d, ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
    }

    int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
    pMsg->info.rsp = rpcMallocCont(contLen);
    pMsg->info.hasEpSet = 1;
    if (pMsg->info.rsp != NULL) {
      tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
      pMsg->info.rspLen = contLen;
      terrno = TSDB_CODE_RPC_REDIRECT;
    } else {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
    }
  } else {
    terrno = TSDB_CODE_APP_NOT_READY;
  }

  return -1;
}

/*
 * Reject requests with an empty body.
 * Returns 0 for non-requests and for requests with a non-NULL pCont and
 * non-zero contLen; otherwise logs and returns -1 with
 * terrno = TSDB_CODE_INVALID_MSG_LEN.
 */
static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
  if (!IsReq(pMsg)) return 0;
  if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0;

  const STraceId *trace = &pMsg->info.traceId;
  mGError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen,
          pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  terrno = TSDB_CODE_INVALID_MSG_LEN;
  return -1;
}

/*
 * Dispatch an RPC message to its registered mnode handler.
 *
 * Returns -1 (terrno set) when:
 *   - no handler is registered, or the type index is outside the msgFp table,
 *     checked the same way as mndSetMsgHandle();
 *   - the body is empty;
 *   - the mnode-state gate rejects the message.
 * Otherwise returns the handler's code (TSDB_CODE_ACTION_IN_PROGRESS means the
 * response is sent later).
 *
 * The RPC reference is released only when mndCheckMnodeState() actually took
 * one, so rpcRef stays balanced for exempt messages.
 */
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
  const STraceId *trace = &pMsg->info.traceId;

  tmsg_t   idx = TMSG_INDEX(pMsg->msgType);
  MndMsgFp fp = (idx < TDMT_MAX) ? pMnode->msgFp[idx] : NULL;
  if (fp == NULL) {
    mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    return -1;
  }

  if (mndCheckMsgContent(pMsg) != 0) return -1;

  // Decide before the handler runs, in case it changes the message.
  bool holdsRpcRef = !mndIsRpcRefExempt(pMsg);
  if (mndCheckMnodeState(pMsg) != 0) return -1;

  mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  int32_t code = (*fp)(pMsg);
  if (holdsRpcRef) {
    mndReleaseRpc(pMnode);
  }

  if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
    mGTrace("msg:%p, won't response immediately since in progress", pMsg);
  } else if (code == 0) {
    mGTrace("msg:%p, successfully processed", pMsg);
  } else {
    mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
            TMSG_INFO(pMsg->msgType));
  }

  return code;
}

575 576
/*
 * Register `fp` as the handler for `msgType`.
 * Types whose TMSG_INDEX is not below TDMT_MAX are silently ignored.
 */
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  tmsg_t idx = TMSG_INDEX(msgType);
  if (idx >= TDMT_MAX) return;

  pMnode->msgFp[idx] = fp;
}

D
dapan1121 已提交
582
// Note: uid 0 is reserved
/*
 * Generate a positive 64-bit uid for `name` (len bytes).
 *
 * Bit layout before llabs():
 *   - bits 24..63: low 40 bits of taosGetTimestampUs();
 *   - bits  8..23: low 16 bits of MurmurHash3_32(name, len);
 *   - bits  0..7:  low 8 bits of taosRand().
 *
 * The value is assembled in uint64_t because shifting a 40-bit value left by 24
 * can reach the sign bit, which is undefined for signed operands. The values 0
 * and INT64_MIN (for which llabs() is undefined) are regenerated.
 */
int64_t mndGenerateUid(const char *name, int32_t len) {
  uint32_t hashval = (uint32_t)MurmurHash3_32(name, len);
  do {
    uint64_t us = (uint64_t)taosGetTimestampUs();
    uint64_t bits = ((us & 0x000000FFFFFFFFFFull) << 24) + ((uint64_t)(hashval & 0xFFFFu) << 8) +
                    ((uint64_t)taosRand() & 0xFFu);
    int64_t uuid = (int64_t)bits;
    if (uuid != 0 && uuid != INT64_MIN) {
      return llabs(uuid);
    }
  } while (true);
}
S
Shengliang Guan 已提交
594 595

int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
S
Shengliang Guan 已提交
596
                          SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
597
  if (mndAcquireRpc(pMnode) != 0) return -1;
S
Shengliang Guan 已提交
598

M
Minghao Li 已提交
599
  SSdb   *pSdb = pMnode->pSdb;
S
Shengliang Guan 已提交
600 601 602 603 604
  int64_t ms = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
S
Shengliang Guan 已提交
605 606 607
  pStbInfo->stbs = taosArrayInit(sdbGetSize(pSdb, SDB_STB), sizeof(SMonStbDesc));
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL ||
      pStbInfo->stbs == NULL) {
608
    mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
609 610 611 612
    return -1;
  }

  // cluster info
wmmhello's avatar
wmmhello 已提交
613
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
S
Shengliang Guan 已提交
614 615
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);
616 617
  pClusterInfo->dbs_total = sdbGetSize(pSdb, SDB_DB);
  pClusterInfo->stbs_total = sdbGetSize(pSdb, SDB_STB);
S
Shengliang Guan 已提交
618 619 620 621 622 623 624 625 626 627

  void *pIter = NULL;
  while (1) {
    SDnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonDnodeDesc desc = {0};
    desc.dnode_id = pObj->id;
    tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
S
Shengliang Guan 已提交
628
    if (mndIsDnodeOnline(pObj, ms)) {
S
Shengliang Guan 已提交
629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646
      tstrncpy(desc.status, "ready", sizeof(desc.status));
    } else {
      tstrncpy(desc.status, "offline", sizeof(desc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &desc);
    sdbRelease(pSdb, pObj);
  }

  pIter = NULL;
  while (1) {
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonMnodeDesc desc = {0};
    desc.mnode_id = pObj->id;
    tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));

647
    if (pObj->id == pMnode->selfDnodeId) {
S
Shengliang Guan 已提交
648 649
      pClusterInfo->first_ep_dnode_id = pObj->id;
      tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
650 651
      pClusterInfo->master_uptime = mndGetClusterUpTime(pMnode);
      // pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
652 653
      tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
    } else {
654
      tstrncpy(desc.role, syncStr(pObj->syncState), sizeof(desc.role));
S
Shengliang Guan 已提交
655
    }
656 657
    taosArrayPush(pClusterInfo->mnodes, &desc);
    sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
658 659 660 661 662 663 664 665 666 667
  }

  // vgroup info
  pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pClusterInfo->vgroups_total++;
668
    pClusterInfo->tbs_total += pVgroup->numOfTables;
S
Shengliang Guan 已提交
669 670 671

    SMonVgroupDesc desc = {0};
    desc.vgroup_id = pVgroup->vgId;
672 673 674 675 676

    SName name = {0};
    tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name, desc.database_name);

S
Shengliang Guan 已提交
677 678 679 680
    desc.tables_num = pVgroup->numOfTables;
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
    tstrncpy(desc.status, "unsynced", sizeof(desc.status));
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
M
Minghao Li 已提交
681
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[i];
S
Shengliang Guan 已提交
682 683
      SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
      pVnDesc->dnode_id = pVgid->dnodeId;
684 685
      tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->syncState), sizeof(pVnDesc->vnode_role));
      if (pVgid->syncState == TAOS_SYNC_STATE_LEADER) {
S
Shengliang Guan 已提交
686 687 688
        tstrncpy(desc.status, "ready", sizeof(desc.status));
        pClusterInfo->vgroups_alive++;
      }
689
      if (pVgid->syncState != TAOS_SYNC_STATE_ERROR) {
S
Shengliang Guan 已提交
690 691 692 693 694 695 696 697 698
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &desc);
    sdbRelease(pSdb, pVgroup);
  }

S
Shengliang Guan 已提交
699 700 701 702 703 704 705
  // stb info
  pIter = NULL;
  while (1) {
    SStbObj *pStb = NULL;
    pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
    if (pIter == NULL) break;

S
Shengliang Guan 已提交
706
    SMonStbDesc desc = {0};
S
Shengliang Guan 已提交
707 708 709 710 711 712 713 714 715 716 717 718 719

    SName name1 = {0};
    tNameFromString(&name1, pStb->db, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name1, desc.database_name);

    SName name2 = {0};
    tNameFromString(&name2, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tstrncpy(desc.stb_name, tNameGetTableName(&name2), TSDB_TABLE_NAME_LEN);

    taosArrayPush(pStbInfo->stbs, &desc);
    sdbRelease(pSdb, pStb);
  }

S
Shengliang Guan 已提交
720 721 722 723 724 725 726 727
  // grant info
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  }

728
  mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
729
  return 0;
L
Liu Jicong 已提交
730
}
S
Shengliang Guan 已提交
731 732

/*
 * Report the mnode's current sync role and restore flag into *pLoad.
 * Always returns 0.
 */
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  SSyncState cur = syncGetState(pMnode->syncMgmt.sync);

  pLoad->syncState = cur.state;
  pLoad->syncRestore = cur.restored;
  mTrace("mnode current syncState is %s, syncRestore:%d", syncStr(pLoad->syncState), pLoad->syncRestore);

  return 0;
}
S
Shengliang Guan 已提交
739

740
/*
 * Set pMnode->restored under the write lock.
 *
 * When clearing the flag, also wait (polling every 3 ms) until every RPC
 * reference taken through mndAcquireRpc() has been released. rpcRef is updated
 * with atomic operations elsewhere, so it is read with an atomic load here to
 * avoid a data race.
 */
void mndSetRestored(SMnode *pMnode, bool restored) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->restored = restored;
  taosThreadRwlockUnlock(&pMnode->lock);
  mTrace("mnode set restored:%d", restored);

  if (restored) return;

  while (atomic_load_32(&pMnode->rpcRef) > 0) {
    taosMsleep(3);
  }
}

/* Return pMnode->restored (read without taking pMnode->lock). */
bool mndGetRestored(SMnode *pMnode) {
  return pMnode->restored;
}

/*
 * Mark the mnode as stopped under the write lock.
 * After this, mndAcquireRpc() fails and the timer thread loop exits on its next tick.
 */
void mndSetStop(SMnode *pMnode) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->stopped = true;  // observed by mndAcquireRpc() and mndGetStop()
  taosThreadRwlockUnlock(&pMnode->lock);

  mTrace("mnode set stopped");
}

/* Return pMnode->stopped (read without taking pMnode->lock). */
bool mndGetStop(SMnode *pMnode) {
  return pMnode->stopped;
}