mndMain.c 25.5 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndAcct.h"
#include "mndCluster.h"
L
Liu Jicong 已提交
19
#include "mndConsumer.h"
S
Shengliang Guan 已提交
20 21 22
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
L
Liu Jicong 已提交
23
#include "mndGrant.h"
S
monitor  
Shengliang Guan 已提交
24
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
25
#include "mndMnode.h"
L
Liu Jicong 已提交
26
#include "mndPerfSchema.h"
M
Minghao Li 已提交
27
#include "mndPrivilege.h"
S
Shengliang Guan 已提交
28
#include "mndProfile.h"
S
Shengliang Guan 已提交
29
#include "mndQnode.h"
L
Liu Jicong 已提交
30
#include "mndQuery.h"
S
Shengliang Guan 已提交
31
#include "mndShow.h"
S
sma  
Shengliang Guan 已提交
32
#include "mndSma.h"
S
Shengliang Guan 已提交
33
#include "mndSnode.h"
S
Shengliang Guan 已提交
34
#include "mndStb.h"
L
Liu Jicong 已提交
35
#include "mndStream.h"
L
Liu Jicong 已提交
36
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
37 38
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
39
#include "mndTopic.h"
S
Shengliang Guan 已提交
40 41 42
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
43

44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
/**
 * Take one RPC reference on the mnode if it is able to serve requests.
 *
 * While holding the read side of pMnode->lock, the `stopped` flag and
 * mndIsLeader() are checked; rpcRef is atomically incremented only when the
 * mnode is not stopped and is the leader.
 *
 * @param pMnode  mnode instance.
 * @return 0 when a reference was taken, -1 otherwise. terrno is assigned
 *         here (TSDB_CODE_APP_NOT_READY) only for the stopped case; on the
 *         non-leader path terrno is left as-is — presumably mndIsLeader()
 *         sets it, confirm against its definition.
 */
static inline int32_t mndAcquireRpc(SMnode *pMnode) {
  int32_t result = -1;

  taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    terrno = TSDB_CODE_APP_NOT_READY;
  } else if (mndIsLeader(pMnode)) {
    atomic_add_fetch_32(&pMnode->rpcRef, 1);
    result = 0;
  }
  taosThreadRwlockUnlock(&pMnode->lock);

  return result;
}

/**
 * Drop one RPC reference previously taken with mndAcquireRpc().
 *
 * rpcRef is atomically decremented while the read side of pMnode->lock is
 * held.
 *
 * @param pMnode  mnode instance.
 */
static inline void mndReleaseRpc(SMnode *pMnode) {
  taosThreadRwlockRdlock(&pMnode->lock);
  atomic_sub_fetch_32(&pMnode->rpcRef, 1);
  taosThreadRwlockUnlock(&pMnode->lock);
}

S
Shengliang Guan 已提交
75 76 77 78 79 80 81 82 83 84 85 86 87
/**
 * Serialize an empty SMTimerReq into a buffer obtained from rpcMallocCont().
 *
 * The request is first serialized with a NULL buffer to learn its size, then
 * serialized again into the allocated buffer.
 *
 * @param pContLen  receives the payload length; written only on success.
 * @return the allocated payload, or NULL when the computed size is not
 *         positive or the allocation fails.
 */
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq req = {0};

  const int32_t len = tSerializeSMTimerMsg(NULL, 0, &req);
  if (len <= 0) {
    return NULL;
  }

  void *pCont = rpcMallocCont(len);
  if (pCont != NULL) {
    tSerializeSMTimerMsg(pCont, len, &req);
    *pContLen = len;
  }

  return pCont;
}

S
Shengliang Guan 已提交
88 89
/**
 * Post a TDMT_MND_TRANS_TIMER message to the mnode write queue.
 *
 * Nothing is enqueued when the timer payload cannot be built.
 *
 * @param pMnode  mnode whose msgCb is used to enqueue the message.
 */
static void mndPullupTrans(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  }
}

97
/**
 * Post a TDMT_MND_TTL_TIMER message to the mnode write queue.
 *
 * Nothing is enqueued when the timer payload cannot be built, matching the
 * other pull-up helpers in this file; a message with a NULL body and zero
 * length would only be rejected later by the content check.
 *
 * @param pMnode  mnode whose msgCb is used to enqueue the message.
 */
static void mndPullupTtl(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  }
}

S
Shengliang Guan 已提交
104 105
/**
 * Post a TDMT_MND_TMQ_TIMER message to the mnode read queue.
 *
 * Nothing is enqueued when the timer payload cannot be built.
 *
 * @param pMnode  mnode whose msgCb is used to enqueue the message.
 */
static void mndCalMqRebalance(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
  }
}
L
Liu Jicong 已提交
112

S
Shengliang Guan 已提交
113 114
/**
 * Post a TDMT_MND_TELEM_TIMER message to the mnode read queue.
 *
 * Nothing is enqueued when the timer payload cannot be built.
 *
 * @param pMnode  mnode whose msgCb is used to enqueue the message.
 */
static void mndPullupTelem(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
  }
}

122
/**
 * Post a TDMT_MND_GRANT_HB_TIMER message to the mnode read queue.
 *
 * The message carries the fixed ahandle value 0x9527 (its meaning is not
 * visible in this file). Nothing is enqueued when the timer payload cannot
 * be built.
 *
 * @param pMnode  mnode whose msgCb is used to enqueue the message.
 */
static void mndPullupGrant(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {
        .msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9527};
    tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
  }
}

132 133 134 135 136 137 138 139 140 141
/**
 * Post a TDMT_MND_UPTIME_TIMER message to the mnode write queue.
 *
 * The message carries the fixed ahandle value 0x9528 (its meaning is not
 * visible in this file). Nothing is enqueued when the timer payload cannot
 * be built.
 *
 * @param pMnode  mnode whose msgCb is used to enqueue the message.
 */
static void mndIncreaseUpTime(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) {
    return;
  }

  SRpcMsg msg = {0};
  msg.msgType = TDMT_MND_UPTIME_TIMER;
  msg.pCont = pCont;
  msg.contLen = len;
  msg.info.ahandle = (void *)0x9528;
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &msg);
}

142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
/**
 * Flag the vnodes hosted on one dnode as being in sync-error state.
 *
 * Every vgroup in the sdb is visited. The first replica slot whose dnodeId
 * matches is switched to TAOS_SYNC_STATE_ERROR with syncRestore cleared,
 * unless it is already in that state. When a vgroup was changed, the owning
 * db (looked up by pVgroup->dbName) gets its stateTs set to curMs.
 *
 * @param pMnode   mnode instance that owns the sdb.
 * @param dnodeId  id of the dnode considered offline.
 * @param curMs    timestamp (ms) recorded as the db's new stateTs.
 */
static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs) {
  SSdb *pSdb = pMnode->pSdb;

  void *pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    bool roleChanged = false;
    for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
      if (pVgroup->vnodeGid[vg].dnodeId == dnodeId) {
        if (pVgroup->vnodeGid[vg].syncState != TAOS_SYNC_STATE_ERROR) {
          mInfo("vgId:%d, state changed by offline check, old state:%s restored:%d new state:error restored:0",
                pVgroup->vgId, syncStr(pVgroup->vnodeGid[vg].syncState), pVgroup->vnodeGid[vg].syncRestore);
          pVgroup->vnodeGid[vg].syncState = TAOS_SYNC_STATE_ERROR;
          pVgroup->vnodeGid[vg].syncRestore = 0;
          roleChanged = true;
        }
        // Only the first matching replica slot is examined per vgroup.
        break;
      }
    }

    if (roleChanged) {
      SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (pDb != NULL && pDb->stateTs != curMs) {
        mInfo("db:%s, stateTs changed by offline check, oldTs:%" PRId64 " newTs:%" PRId64, pDb->name, pDb->stateTs,
              curMs);
        pDb->stateTs = curMs;
      }
      // Called even when pDb is NULL, as before; presumably a no-op then — confirm in mndReleaseDb.
      mndReleaseDb(pMnode, pDb);
    }

    sdbRelease(pSdb, pVgroup);
  }
}

/**
 * Walk all dnodes in the sdb and, for each one that mndIsDnodeOnline()
 * reports as not online at the current time, mark its vnodes offline via
 * mndSetVgroupOffline().
 *
 * @param pMnode  mnode instance that owns the sdb.
 */
static void mndCheckDnodeOffline(SMnode *pMnode) {
  SSdb         *pSdb = pMnode->pSdb;
  const int64_t nowMs = taosGetTimestampMs();
  void         *pCursor = NULL;

  for (;;) {
    SDnodeObj *pDnode = NULL;
    pCursor = sdbFetch(pSdb, SDB_DNODE, pCursor, (void **)&pDnode);
    if (pCursor == NULL) {
      break;
    }

    if (!mndIsDnodeOnline(pDnode, nowMs)) {
      mInfo("dnode:%d, in offline state", pDnode->id);
      mndSetVgroupOffline(pMnode, pDnode->id, nowMs);
    }

    sdbRelease(pSdb, pDnode);
  }
}

S
Shengliang Guan 已提交
199
static void *mndThreadFp(void *param) {
S
Shengliang Guan 已提交
200
  SMnode *pMnode = param;
S
Shengliang Guan 已提交
201 202 203 204 205 206
  int64_t lastTime = 0;
  setThreadName("mnode-timer");

  while (1) {
    lastTime++;
    taosMsleep(100);
S
Shengliang Guan 已提交
207
    if (mndGetStop(pMnode)) break;
208
    if (lastTime % 10 != 0) continue;
S
Shengliang Guan 已提交
209

210 211
    int64_t sec = lastTime / 10;
    if (sec % tsTtlPushInterval == 0) {
212
      mndPullupTtl(pMnode);
wmmhello's avatar
wmmhello 已提交
213 214
    }

215
    if (sec % tsTransPullupInterval == 0) {
S
Shengliang Guan 已提交
216 217 218
      mndPullupTrans(pMnode);
    }

219
    if (sec % tsMqRebalanceInterval == 0) {
S
Shengliang Guan 已提交
220 221 222
      mndCalMqRebalance(pMnode);
    }

S
Shengliang Guan 已提交
223
    if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) {
S
Shengliang Guan 已提交
224 225
      mndPullupTelem(pMnode);
    }
C
Cary Xu 已提交
226

227
    if (sec % tsGrantHBInterval == 0) {
228
      mndPullupGrant(pMnode);
C
Cary Xu 已提交
229
    }
230

231
    if (sec % tsUptimeInterval == 0) {
232 233
      mndIncreaseUpTime(pMnode);
    }
234 235 236 237

    if (sec % (tsStatusInterval * 5) == 0) {
      mndCheckDnodeOffline(pMnode);
    }
S
Shengliang Guan 已提交
238 239
  }

S
Shengliang Guan 已提交
240
  return NULL;
L
Liu Jicong 已提交
241 242
}

243
/**
 * Start the joinable "mnode-timer" thread running mndThreadFp().
 *
 * The thread attribute object is destroyed on both the success and the
 * failure path.
 *
 * @param pMnode  mnode instance; its `thread` field receives the handle.
 * @return 0 on success, -1 when the thread could not be created.
 */
static int32_t mndInitTimer(SMnode *pMnode) {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode) != 0) {
    // Log first so that errno is read before any further library call.
    mError("failed to create timer thread since %s", strerror(errno));
    taosThreadAttrDestroy(&thAttr);
    return -1;
  }

  taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("mnode-timer", "initialized");
  return 0;
}

257
/**
 * Join the timer thread and clear its handle, if the handle is valid.
 *
 * @param pMnode  mnode instance owning the timer thread.
 */
static void mndCleanupTimer(SMnode *pMnode) {
  if (taosCheckPthreadValid(pMnode->thread)) {
    taosThreadJoin(pMnode->thread, NULL);
    taosThreadClear(&pMnode->thread);
  }
}

S
Shengliang Guan 已提交
264
/**
 * Remember the mnode data directory and create it on disk.
 *
 * pMnode->path receives a heap copy of `path`. The copy stays attached to
 * pMnode when directory creation fails; mndClose() releases it.
 *
 * @param pMnode  mnode instance.
 * @param path    directory to store and create.
 * @return 0 on success; -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY when
 *         the copy fails, or to TAOS_SYSTEM_ERROR(errno) when taosMkDir
 *         fails.
 */
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  pMnode->path = strdup(path);
  if (pMnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosMkDir(pMnode->path) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}
S
Shengliang Guan 已提交
278

279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
/**
 * Open the mnode write-ahead log located at "<pMnode->path><TD_DIRSEP>wal".
 *
 * The WAL is opened with vgId 1, fsync level TAOS_WAL_FSYNC, fsyncPeriod 0,
 * and -1 for the roll, segment and retention settings.
 *
 * @param pMnode  mnode instance; pWal receives the opened handle.
 * @return 0 on success, -1 when walOpen() fails.
 */
static int32_t mndInitWal(SMnode *pMnode) {
  char path[PATH_MAX + 20] = {0};
  snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
  SWalCfg cfg = {
      .vgId = 1,
      .fsyncPeriod = 0,
      .rollPeriod = -1,
      .segSize = -1,
      .retentionPeriod = -1,
      .retentionSize = -1,
      .level = TAOS_WAL_FSYNC,
  };

  pMnode->pWal = walOpen(path, &cfg);
  if (pMnode->pWal == NULL) {
    mError("failed to open wal since %s. wal:%s", terrstr(), path);
    return -1;
  }

  return 0;
}

/**
 * Close the mnode WAL if one is open and reset the handle to NULL.
 *
 * @param pMnode  mnode instance.
 */
static void mndCloseWal(SMnode *pMnode) {
  if (pMnode->pWal == NULL) {
    return;
  }

  walClose(pMnode->pWal);
  pMnode->pWal = NULL;
}

308 309 310
/**
 * Create the sdb instance for this mnode.
 *
 * The sdb options are filled from the mnode's path, WAL handle and sync
 * handle, plus a back-pointer to the mnode itself.
 *
 * @param pMnode  mnode instance; pSdb receives the new sdb.
 * @return 0 on success, -1 when sdbInit() returns NULL.
 */
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt opt = {0};
  opt.path = pMnode->path;
  opt.pMnode = pMnode;
  opt.pWal = pMnode->pWal;
  opt.sync = pMnode->syncMgmt.sync;

  pMnode->pSdb = sdbInit(&opt);
  if (pMnode->pSdb == NULL) {
    return -1;
  }

  return 0;
}

323 324 325 326 327 328 329
/**
 * Load the sdb from file unless this mnode is being deployed.
 *
 * @param pMnode  mnode instance.
 * @return 0 when pMnode->deploy is set (nothing is read); otherwise the
 *         result of sdbReadFile().
 */
static int32_t mndOpenSdb(SMnode *pMnode) {
  if (pMnode->deploy) {
    return 0;
  }

  return sdbReadFile(pMnode->pSdb);
}
330 331 332

/**
 * Tear down the sdb instance, if any, and reset the handle to NULL.
 *
 * @param pMnode  mnode instance.
 */
static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb) {
    sdbCleanup(pMnode->pSdb);
    pMnode->pSdb = NULL;
  }
}

338 339 340 341 342
/**
 * Append one init/cleanup step to pMnode->pSteps.
 *
 * @param pMnode     mnode instance owning the step array.
 * @param name       step name used in log and startup messages; the pointer
 *                   is stored, not copied.
 * @param initFp     function run by mndExecSteps(); may be NULL.
 * @param cleanupFp  function run by mndCleanupSteps(); may be NULL.
 * @return 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 *         array push fails.
 */
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep step = {0};
  step.name = name;
  step.initFp = initFp;
  step.cleanupFp = cleanupFp;
  if (taosArrayPush(pMnode->pSteps, &step) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

351
/**
 * Register every mnode start-up step, in execution order.
 *
 * The table order is significant: mndExecSteps() runs the init functions
 * front to back and mndCleanupSteps() runs the cleanup functions back to
 * front. "mnode-sdb" appears twice on purpose — once to create the sdb and
 * once (mndOpenSdb, no cleanup) to load it after the other modules have
 * been initialized.
 *
 * @param pMnode  mnode instance whose pSteps array is filled.
 * @return 0 on success, -1 as soon as one registration fails.
 */
static int32_t mndInitSteps(SMnode *pMnode) {
  static const struct {
    char        *name;
    MndInitFp    initFp;
    MndCleanupFp cleanupFp;
  } steps[] = {
      {"mnode-wal", mndInitWal, mndCloseWal},
      {"mnode-sdb", mndInitSdb, mndCleanupSdb},
      {"mnode-trans", mndInitTrans, mndCleanupTrans},
      {"mnode-cluster", mndInitCluster, mndCleanupCluster},
      {"mnode-mnode", mndInitMnode, mndCleanupMnode},
      {"mnode-qnode", mndInitQnode, mndCleanupQnode},
      {"mnode-snode", mndInitSnode, mndCleanupSnode},
      {"mnode-dnode", mndInitDnode, mndCleanupDnode},
      {"mnode-user", mndInitUser, mndCleanupUser},
      {"mnode-grant", mndInitGrant, mndCleanupGrant},
      {"mnode-privilege", mndInitPrivilege, mndCleanupPrivilege},
      {"mnode-acct", mndInitAcct, mndCleanupAcct},
      {"mnode-stream", mndInitStream, mndCleanupStream},
      {"mnode-topic", mndInitTopic, mndCleanupTopic},
      {"mnode-consumer", mndInitConsumer, mndCleanupConsumer},
      {"mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe},
      {"mnode-vgroup", mndInitVgroup, mndCleanupVgroup},
      {"mnode-stb", mndInitStb, mndCleanupStb},
      {"mnode-sma", mndInitSma, mndCleanupSma},
      {"mnode-infos", mndInitInfos, mndCleanupInfos},
      {"mnode-perfs", mndInitPerfs, mndCleanupPerfs},
      {"mnode-db", mndInitDb, mndCleanupDb},
      {"mnode-func", mndInitFunc, mndCleanupFunc},
      {"mnode-sdb", mndOpenSdb, NULL},
      {"mnode-profile", mndInitProfile, mndCleanupProfile},
      {"mnode-show", mndInitShow, mndCleanupShow},
      {"mnode-query", mndInitQuery, mndCleanupQuery},
      {"mnode-sync", mndInitSync, mndCleanupSync},
      {"mnode-telem", mndInitTelem, mndCleanupTelem},
  };

  const int32_t numOfSteps = (int32_t)(sizeof(steps) / sizeof(steps[0]));
  for (int32_t i = 0; i < numOfSteps; ++i) {
    if (mndAllocStep(pMnode, steps[i].name, steps[i].initFp, steps[i].cleanupFp) != 0) return -1;
  }

  return 0;
}

385
/**
 * Run step cleanup functions in reverse order and destroy the step array.
 *
 * @param pMnode  mnode instance; a NULL pSteps makes the call a no-op.
 * @param pos     index of the last step to clean up (inclusive); -1 means
 *                "start from the final registered step".
 *
 * After the call pMnode->pSteps is NULL.
 */
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) return;

  if (pos == -1) {
    pos = taosArrayGetSize(pMnode->pSteps) - 1;
  }

  for (int32_t s = pos; s >= 0; s--) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, s);
    mInfo("%s will cleanup", pStep->name);
    if (pStep->cleanupFp != NULL) {
      (*pStep->cleanupFp)(pMnode);
    }
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
S
Shengliang Guan 已提交
404

405
/**
 * Run the init function of every registered step, in order.
 *
 * On the first failure, cleanup runs from the failing step (inclusive) back
 * to the first one, and terrno is restored to the value observed right
 * after the failing init call. On success the cluster id is cached in
 * pMnode->clusterId.
 *
 * @param pMnode  mnode instance with a populated pSteps array.
 * @return 0 on success, -1 on the first failing step.
 */
static int32_t mndExecSteps(SMnode *pMnode) {
  int32_t size = taosArrayGetSize(pMnode->pSteps);
  for (int32_t pos = 0; pos < size; pos++) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, pos);
    if (pStep->initFp == NULL) continue;

    if ((*pStep->initFp)(pMnode) != 0) {
      // Save terrno: the cleanup calls below may overwrite it.
      int32_t code = terrno;
      mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
      // This destroys pSteps, so pStep must not be touched afterwards.
      mndCleanupSteps(pMnode, pos);
      terrno = code;
      return -1;
    } else {
      mInfo("%s is initialized", pStep->name);
      tmsgReportStartup(pStep->name, "initialized");
    }
  }

  pMnode->clusterId = mndGetClusterId(pMnode);
  return 0;
}
S
Shengliang Guan 已提交
426

S
shm  
Shengliang Guan 已提交
427
/**
 * Copy the caller-supplied options into the mnode.
 *
 * Copies the message callbacks, the local dnode id and the sync replica
 * configuration (self index, replica count, replica table).
 *
 * @param pMnode   mnode instance to fill.
 * @param pOption  options to copy from.
 */
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  pMnode->msgCb = pOption->msgCb;
  pMnode->selfDnodeId = pOption->dnodeId;
  pMnode->syncMgmt.selfIndex = pOption->selfIndex;
  pMnode->syncMgmt.numOfReplicas = pOption->numOfReplicas;
  memcpy(pMnode->syncMgmt.replicas, pOption->replicas, sizeof(pOption->replicas));
}
434

435
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
436
  mInfo("start to open mnode in %s", path);
S
Shengliang Guan 已提交
437

wafwerar's avatar
wafwerar 已提交
438
  SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
439 440 441 442 443
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
444
  memset(pMnode, 0, sizeof(SMnode));
S
Shengliang Guan 已提交
445

S
Shengliang Guan 已提交
446 447
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
S
shm  
Shengliang Guan 已提交
448
  mndSetOptions(pMnode, pOption);
S
Shengliang Guan 已提交
449

450
  pMnode->deploy = pOption->deploy;
S
Shengliang Guan 已提交
451 452
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
wafwerar's avatar
wafwerar 已提交
453
    taosMemoryFree(pMnode);
S
Shengliang Guan 已提交
454 455 456 457
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
458

S
Shengliang Guan 已提交
459
  int32_t code = mndCreateDir(pMnode, path);
460
  if (code != 0) {
S
Shengliang Guan 已提交
461 462
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
463 464 465 466 467
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

468
  code = mndInitSteps(pMnode);
469
  if (code != 0) {
S
Shengliang Guan 已提交
470 471
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
472 473 474 475 476 477 478
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
479 480
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
481 482 483 484
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
485

S
Shengliang Guan 已提交
486
  mInfo("mnode open successfully");
S
Shengliang Guan 已提交
487 488
  return pMnode;
}
S
Shengliang Guan 已提交
489

490 491 492
/**
 * Prepare the sync module for shutdown.
 *
 * Calls syncLeaderTransfer() and then syncPreStop() on the mnode's sync
 * handle. A NULL pMnode is ignored.
 *
 * @param pMnode  mnode instance, may be NULL.
 */
void mndPreClose(SMnode *pMnode) {
  if (pMnode != NULL) {
    syncLeaderTransfer(pMnode->syncMgmt.sync);
    syncPreStop(pMnode->syncMgmt.sync);
  }
}

497
/**
 * Release an mnode created by mndOpen().
 *
 * Runs all registered step cleanups, then frees the stored path and the
 * SMnode itself. A NULL pMnode is ignored.
 *
 * @param pMnode  mnode instance, may be NULL; invalid after the call.
 */
void mndClose(SMnode *pMnode) {
  if (pMnode != NULL) {
    mInfo("start to close mnode");
    mndCleanupSteps(pMnode, -1);
    taosMemoryFreeClear(pMnode->path);
    taosMemoryFreeClear(pMnode);
    mInfo("mnode is closed");
  }
}
S
Shengliang Guan 已提交
506

507
/**
 * Start an opened mnode.
 *
 * Starts sync; when the mnode is being deployed, deploys the sdb and marks
 * the mnode restored; resets grants; finally starts the timer thread.
 *
 * @param pMnode  opened mnode instance.
 * @return 0 on success; -1 when sdbDeploy() fails or the timer thread
 *         cannot be created.
 */
int32_t mndStart(SMnode *pMnode) {
  mndSyncStart(pMnode);
  if (pMnode->deploy) {
    if (sdbDeploy(pMnode->pSdb) != 0) {
      mError("failed to deploy sdb while start mnode");
      return -1;
    }
    mndSetRestored(pMnode, true);
  }

  grantReset(pMnode, TSDB_GRANT_ALL, 0);

  return mndInitTimer(pMnode);
}

522
/**
 * Stop a running mnode.
 *
 * Sets the stopped flag first (the timer loop polls it), then stops sync
 * and joins the timer thread.
 *
 * @param pMnode  mnode instance.
 */
void mndStop(SMnode *pMnode) {
  mndSetStop(pMnode);
  mndSyncStop(pMnode);
  mndCleanupTimer(pMnode);
}

M
Minghao Li 已提交
528
/**
 * Hand a sync message to the sync module of the mnode it targets.
 *
 * @param pMsg  incoming message; pMsg->info.node must be the SMnode.
 * @return the result of syncProcessMsg(); a non-zero result is logged.
 */
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
  SMnode    *pMnode = pMsg->info.node;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  // `trace` is not referenced by name below; presumably the mG* log macros pick it up — confirm.
  const STraceId *trace = &pMsg->info.traceId;
  mGTrace("vgId:1, sync msg:%p will be processed, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = syncProcessMsg(pMgmt->sync, pMsg);
  if (code != 0) {
    mGError("vgId:1, failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
  }

  return code;
}

S
Shengliang Guan 已提交
543
/**
 * Decide whether this mnode may process a message right now.
 *
 * Non-request messages and the TDMT_SCH_* query/fetch/drop messages are
 * always accepted without taking an RPC reference. For everything else an
 * RPC reference is taken via mndAcquireRpc(). When that fails:
 *   - internal timer messages are dropped quietly with -1;
 *   - other requests get a redirect response holding the mnode endpoint
 *     set (terrno = TSDB_CODE_RPC_REDIRECT), or TSDB_CODE_OUT_OF_MEMORY
 *     when the response buffer cannot be allocated, or
 *     TSDB_CODE_APP_NOT_READY when no endpoint is known.
 *
 * @param pMsg  incoming message; pMsg->info.node must be the SMnode.
 * @return 0 when the message may be processed, -1 otherwise.
 */
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
  if (!IsReq(pMsg)) return 0;
  if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY ||
      pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT ||
      pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK) {
    return 0;
  }
  if (mndAcquireRpc(pMsg->info.node) == 0) return 0;

  SMnode    *pMnode = pMsg->info.node;
  SSyncState state = syncGetState(pMnode->syncMgmt.sync);

  if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
      pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
      pMsg->msgType == TDMT_MND_UPTIME_TIMER) {
    // The role string is derived from the sync state, not from the restored flag.
    mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
           pMnode->stopped, state.restored, syncStr(state.state));
    return -1;
  }

  const STraceId *trace = &pMsg->info.traceId;
  SEpSet          epSet = {0};
  mndGetMnodeEpSet(pMnode, &epSet);

  mDebug(
      "msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d "
      "role:%s, redirect numOfEps:%d inUse:%d",
      pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, state.restored,
      syncStr(state.state), epSet.numOfEps, epSet.inUse);

  if (epSet.numOfEps > 0) {
    for (int32_t i = 0; i < epSet.numOfEps; ++i) {
      mDebug("mnode index:%d, ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
    }

    // First call computes the serialized size, second call fills the buffer.
    int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
    pMsg->info.rsp = rpcMallocCont(contLen);
    pMsg->info.hasEpSet = 1;
    if (pMsg->info.rsp != NULL) {
      tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
      pMsg->info.rspLen = contLen;
      terrno = TSDB_CODE_RPC_REDIRECT;
    } else {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
    }
  } else {
    terrno = TSDB_CODE_APP_NOT_READY;
  }

  return -1;
}

S
Shengliang Guan 已提交
595
/**
 * Reject request messages that carry no payload.
 *
 * Non-request messages always pass. A request passes when it has a non-zero
 * contLen and a non-NULL pCont.
 *
 * @param pMsg  incoming message.
 * @return 0 when acceptable; -1 with terrno = TSDB_CODE_INVALID_MSG_LEN
 *         otherwise.
 */
static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
  if (!IsReq(pMsg)) return 0;
  if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0;

  const STraceId *trace = &pMsg->info.traceId;
  mGError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen,
          pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  terrno = TSDB_CODE_INVALID_MSG_LEN;
  return -1;
}

S
Shengliang Guan 已提交
606
/**
 * Dispatch an RPC message to the handler registered for its type.
 *
 * The handler is looked up in pMnode->msgFp; the message is then validated
 * by mndCheckMsgContent() and mndCheckMnodeState() before the handler runs.
 *
 * @param pMsg  incoming message; pMsg->info.node must be the SMnode.
 * @return -1 when no handler is registered (terrno =
 *         TSDB_CODE_MSG_NOT_PROCESSED) or a check fails; otherwise the
 *         handler's return code (0, TSDB_CODE_ACTION_IN_PROGRESS, or an
 *         error).
 */
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
  const STraceId *trace = &pMsg->info.traceId;

  MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
  if (fp == NULL) {
    mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    return -1;
  }

  if (mndCheckMsgContent(pMsg) != 0) return -1;
  if (mndCheckMnodeState(pMsg) != 0) return -1;

  mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  int32_t code = (*fp)(pMsg);
  // NOTE(review): mndCheckMnodeState() returns 0 without calling mndAcquireRpc()
  // for non-request messages and for the TDMT_SCH_* types, yet the release
  // below is unconditional — confirm rpcRef stays balanced on those paths.
  mndReleaseRpc(pMnode);

  if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
    mGTrace("msg:%p, won't response immediately since in progress", pMsg);
  } else if (code == 0) {
    mGTrace("msg:%p, successfully processed", pMsg);
  } else {
    mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
            TMSG_INFO(pMsg->msgType));
  }

  return code;
}

636 637
/**
 * Register the handler for one message type.
 *
 * The slot index is TMSG_INDEX(msgType); indices not below TDMT_MAX are
 * silently ignored.
 *
 * @param pMnode   mnode instance owning the handler table.
 * @param msgType  message type to handle.
 * @param fp       handler function.
 */
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  tmsg_t type = TMSG_INDEX(msgType);
  if (type < TDMT_MAX) {
    pMnode->msgFp[type] = fp;
  }
}

D
dapan1121 已提交
643
// Note: uid 0 is reserved
644
/**
 * Generate a positive 64-bit uid for `name`. Uid 0 is reserved and never
 * returned.
 *
 * Bit layout of the raw value before the sign is removed:
 *   bits 24..63  low 40 bits of the current timestamp in microseconds
 *   bits  8..23  low 16 bits of MurmurHash3_32(name, len)
 *   bits  0..7   low 8 bits of taosRand()
 *
 * The value is assembled in unsigned arithmetic so that shifting the
 * timestamp into the top bit is well defined. A raw value that converts to
 * 0 or INT64_MIN (whose absolute value is not representable) is discarded
 * and a new one is drawn.
 *
 * @param name  bytes to hash.
 * @param len   number of bytes in `name`.
 * @return a uid greater than 0.
 */
int64_t mndGenerateUid(const char *name, int32_t len) {
  int32_t        hashval = MurmurHash3_32(name, len);
  const uint64_t hashBits = ((uint64_t)(uint32_t)hashval & 0xFFFFu) << 8;

  do {
    uint64_t us = (uint64_t)taosGetTimestampUs();
    uint64_t raw = ((us & 0x000000FFFFFFFFFFull) << 24) + hashBits + ((uint64_t)taosRand() & 0xFFu);
    int64_t  uuid = (int64_t)raw;
    if (uuid != 0 && uuid != INT64_MIN) {
      return llabs(uuid);
    }
  } while (true);
}
S
Shengliang Guan 已提交
655 656

int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
S
Shengliang Guan 已提交
657
                          SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
658
  if (mndAcquireRpc(pMnode) != 0) return -1;
S
Shengliang Guan 已提交
659

M
Minghao Li 已提交
660
  SSdb   *pSdb = pMnode->pSdb;
S
Shengliang Guan 已提交
661 662 663 664 665
  int64_t ms = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
S
Shengliang Guan 已提交
666 667 668
  pStbInfo->stbs = taosArrayInit(sdbGetSize(pSdb, SDB_STB), sizeof(SMonStbDesc));
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL ||
      pStbInfo->stbs == NULL) {
669
    mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
670 671 672 673
    return -1;
  }

  // cluster info
wmmhello's avatar
wmmhello 已提交
674
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
S
Shengliang Guan 已提交
675 676
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);
677 678
  pClusterInfo->dbs_total = sdbGetSize(pSdb, SDB_DB);
  pClusterInfo->stbs_total = sdbGetSize(pSdb, SDB_STB);
S
Shengliang Guan 已提交
679 680 681 682 683 684 685 686 687 688

  void *pIter = NULL;
  while (1) {
    SDnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonDnodeDesc desc = {0};
    desc.dnode_id = pObj->id;
    tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
S
Shengliang Guan 已提交
689
    if (mndIsDnodeOnline(pObj, ms)) {
S
Shengliang Guan 已提交
690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707
      tstrncpy(desc.status, "ready", sizeof(desc.status));
    } else {
      tstrncpy(desc.status, "offline", sizeof(desc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &desc);
    sdbRelease(pSdb, pObj);
  }

  pIter = NULL;
  while (1) {
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonMnodeDesc desc = {0};
    desc.mnode_id = pObj->id;
    tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));

708
    if (pObj->id == pMnode->selfDnodeId) {
S
Shengliang Guan 已提交
709 710
      pClusterInfo->first_ep_dnode_id = pObj->id;
      tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
711 712
      pClusterInfo->master_uptime = mndGetClusterUpTime(pMnode);
      // pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
713 714
      tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
    } else {
715
      tstrncpy(desc.role, syncStr(pObj->syncState), sizeof(desc.role));
S
Shengliang Guan 已提交
716
    }
717 718
    taosArrayPush(pClusterInfo->mnodes, &desc);
    sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
719 720 721 722 723 724 725 726 727 728
  }

  // vgroup info
  pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pClusterInfo->vgroups_total++;
729
    pClusterInfo->tbs_total += pVgroup->numOfTables;
S
Shengliang Guan 已提交
730 731 732

    SMonVgroupDesc desc = {0};
    desc.vgroup_id = pVgroup->vgId;
733 734 735 736 737

    SName name = {0};
    tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name, desc.database_name);

S
Shengliang Guan 已提交
738 739 740 741
    desc.tables_num = pVgroup->numOfTables;
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
    tstrncpy(desc.status, "unsynced", sizeof(desc.status));
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
M
Minghao Li 已提交
742
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[i];
S
Shengliang Guan 已提交
743 744
      SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
      pVnDesc->dnode_id = pVgid->dnodeId;
745 746
      tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->syncState), sizeof(pVnDesc->vnode_role));
      if (pVgid->syncState == TAOS_SYNC_STATE_LEADER) {
S
Shengliang Guan 已提交
747 748 749
        tstrncpy(desc.status, "ready", sizeof(desc.status));
        pClusterInfo->vgroups_alive++;
      }
750
      if (pVgid->syncState != TAOS_SYNC_STATE_ERROR) {
S
Shengliang Guan 已提交
751 752 753 754 755 756 757 758 759
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &desc);
    sdbRelease(pSdb, pVgroup);
  }

S
Shengliang Guan 已提交
760 761 762 763 764 765 766
  // stb info
  pIter = NULL;
  while (1) {
    SStbObj *pStb = NULL;
    pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
    if (pIter == NULL) break;

S
Shengliang Guan 已提交
767
    SMonStbDesc desc = {0};
S
Shengliang Guan 已提交
768 769 770 771 772 773 774 775 776 777 778 779 780

    SName name1 = {0};
    tNameFromString(&name1, pStb->db, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name1, desc.database_name);

    SName name2 = {0};
    tNameFromString(&name2, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tstrncpy(desc.stb_name, tNameGetTableName(&name2), TSDB_TABLE_NAME_LEN);

    taosArrayPush(pStbInfo->stbs, &desc);
    sdbRelease(pSdb, pStb);
  }

S
Shengliang Guan 已提交
781 782 783 784 785 786 787 788
  // grant info
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  }

789
  mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
790
  return 0;
L
Liu Jicong 已提交
791
}
S
Shengliang Guan 已提交
792 793

/**
 * Report the mnode's current sync state and restore flag.
 *
 * @param pMnode  mnode instance.
 * @param pLoad   receives syncState and syncRestore from syncGetState().
 * @return always 0.
 */
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  SSyncState state = syncGetState(pMnode->syncMgmt.sync);
  pLoad->syncState = state.state;
  pLoad->syncRestore = state.restored;
  mTrace("mnode current syncState is %s, syncRestore:%d", syncStr(pLoad->syncState), pLoad->syncRestore);
  return 0;
}
S
Shengliang Guan 已提交
800

801
/**
 * Set the mnode's restored flag under the write lock.
 *
 * When the flag is cleared, the call additionally blocks, polling every
 * 3 ms, until rpcRef is no longer positive.
 *
 * @param pMnode    mnode instance.
 * @param restored  new value of the flag.
 */
void mndSetRestored(SMnode *pMnode, bool restored) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->restored = restored;
  taosThreadRwlockUnlock(&pMnode->lock);
  mTrace("mnode set restored:%d", restored);

  if (!restored) {
    // Wait for in-flight RPC handlers to release their references.
    while (pMnode->rpcRef > 0) {
      taosMsleep(3);
    }
  }
}

/**
 * Read the mnode's restored flag (no lock is taken).
 *
 * @param pMnode  mnode instance.
 * @return current value of pMnode->restored.
 */
bool mndGetRestored(SMnode *pMnode) {
  return pMnode->restored;
}

/**
 * Mark the mnode as stopped while holding the write lock, then trace it.
 *
 * @param pMnode  mnode instance.
 */
void mndSetStop(SMnode *pMnode) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->stopped = true;
  taosThreadRwlockUnlock(&pMnode->lock);

  mTrace("mnode set stopped");
}

/**
 * Read the mnode's stopped flag (no lock is taken).
 *
 * @param pMnode  mnode instance.
 * @return current value of pMnode->stopped.
 */
bool mndGetStop(SMnode *pMnode) {
  return pMnode->stopped;
}