mndMain.c 25.7 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndAcct.h"
#include "mndCluster.h"
L
Liu Jicong 已提交
19
#include "mndConsumer.h"
S
Shengliang Guan 已提交
20 21 22
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
L
Liu Jicong 已提交
23
#include "mndGrant.h"
S
monitor  
Shengliang Guan 已提交
24
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
25
#include "mndMnode.h"
L
Liu Jicong 已提交
26
#include "mndPerfSchema.h"
M
Minghao Li 已提交
27
#include "mndPrivilege.h"
S
Shengliang Guan 已提交
28
#include "mndProfile.h"
S
Shengliang Guan 已提交
29
#include "mndQnode.h"
L
Liu Jicong 已提交
30
#include "mndQuery.h"
S
Shengliang Guan 已提交
31
#include "mndShow.h"
S
sma  
Shengliang Guan 已提交
32
#include "mndSma.h"
S
Shengliang Guan 已提交
33
#include "mndSnode.h"
S
Shengliang Guan 已提交
34
#include "mndStb.h"
L
Liu Jicong 已提交
35
#include "mndStream.h"
L
Liu Jicong 已提交
36
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
37 38
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
39
#include "mndTopic.h"
S
Shengliang Guan 已提交
40 41 42
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
43

44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
// Try to take one RPC reference on the mnode.
// Holds the read lock while checking state. Returns 0 and increments rpcRef
// when the mnode is not stopped and mndIsLeader() reports true; returns -1
// otherwise. terrno is set to TSDB_CODE_APP_NOT_READY only for the stopped case.
static inline int32_t mndAcquireRpc(SMnode *pMnode) {
  int32_t result = -1;

  taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    terrno = TSDB_CODE_APP_NOT_READY;
  } else if (mndIsLeader(pMnode)) {
    atomic_add_fetch_32(&pMnode->rpcRef, 1);
    result = 0;
  }
  taosThreadRwlockUnlock(&pMnode->lock);

  return result;
}

// Drop one RPC reference: decrements rpcRef atomically while holding the
// mnode read lock.
static inline void mndReleaseRpc(SMnode *pMnode) {
  taosThreadRwlockRdlock(&pMnode->lock);
  (void)atomic_sub_fetch_32(&pMnode->rpcRef, 1);
  taosThreadRwlockUnlock(&pMnode->lock);
}

S
Shengliang Guan 已提交
75 76 77 78 79 80 81 82 83 84 85 86 87
// Serialize an all-zero SMTimerReq into a buffer obtained from rpcMallocCont().
// The first tSerializeSMTimerMsg() call (NULL buffer) yields the length that is
// then used for the allocation. On success the buffer is returned and its
// length is stored in *pContLen; on failure NULL is returned and *pContLen is
// left untouched.
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq req = {0};

  const int32_t size = tSerializeSMTimerMsg(NULL, 0, &req);
  if (size <= 0) {
    return NULL;
  }

  void *pBuf = rpcMallocCont(size);
  if (pBuf != NULL) {
    tSerializeSMTimerMsg(pBuf, size, &req);
    *pContLen = size;
  }
  return pBuf;
}

S
Shengliang Guan 已提交
88 89
// Post a TDMT_MND_TRANS_TIMER message to the mnode write queue.
// Nothing is queued when the timer message cannot be built.
static void mndPullupTrans(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) {
    return;
  }

  SRpcMsg msg = {0};
  msg.msgType = TDMT_MND_TRANS_TIMER;
  msg.pCont = pCont;
  msg.contLen = len;
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &msg);
}

97
// Post a TDMT_MND_TTL_TIMER message to the mnode write queue.
// Like the other pull-up helpers in this file, nothing is queued when the
// timer message cannot be built; otherwise a message with a NULL body and a
// zero length would be pushed to the queue.
static void mndPullupTtl(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  }
}

S
Shengliang Guan 已提交
104 105
// Post a TDMT_MND_TMQ_TIMER message to the mnode read queue.
// Nothing is queued when the timer message cannot be built.
static void mndCalMqRebalance(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) {
    return;
  }

  SRpcMsg msg = {0};
  msg.msgType = TDMT_MND_TMQ_TIMER;
  msg.pCont = pCont;
  msg.contLen = len;
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}
L
Liu Jicong 已提交
112

S
Shengliang Guan 已提交
113 114
// Post a TDMT_MND_TELEM_TIMER message to the mnode read queue.
// Nothing is queued when the timer message cannot be built.
static void mndPullupTelem(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) {
    return;
  }

  SRpcMsg msg = {0};
  msg.msgType = TDMT_MND_TELEM_TIMER;
  msg.pCont = pCont;
  msg.contLen = len;
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}

122
// Post a TDMT_MND_GRANT_HB_TIMER message to the mnode read queue.
// The message carries the fixed ahandle value 0x9527. Nothing is queued when
// the timer message cannot be built.
static void mndPullupGrant(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) {
    return;
  }

  SRpcMsg msg = {0};
  msg.msgType = TDMT_MND_GRANT_HB_TIMER;
  msg.pCont = pCont;
  msg.contLen = len;
  msg.info.ahandle = (void *)0x9527;
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}

132 133 134 135 136 137 138 139 140 141
// Post a TDMT_MND_UPTIME_TIMER message to the mnode write queue.
// The message carries the fixed ahandle value 0x9528. Nothing is queued when
// the timer message cannot be built.
static void mndIncreaseUpTime(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) {
    return;
  }

  SRpcMsg msg = {0};
  msg.msgType = TDMT_MND_UPTIME_TIMER;
  msg.pCont = pCont;
  msg.contLen = len;
  msg.info.ahandle = (void *)0x9528;
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &msg);
}

142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
// Walk every vgroup in the sdb and, for the replica hosted on `dnodeId`,
// force its sync state to TAOS_SYNC_STATE_ERROR and clear syncRestore.
// When a replica state was changed, the owning db's stateTs is updated to
// `curMs` (if the db can be acquired and its stateTs differs).
static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs) {
  SSdb *pSdb = pMnode->pSdb;
  void *pCursor = NULL;

  for (;;) {
    SVgObj *pVgroup = NULL;
    pCursor = sdbFetch(pSdb, SDB_VGROUP, pCursor, (void **)&pVgroup);
    if (pCursor == NULL) {
      break;
    }

    bool stateChanged = false;
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      if (pVgroup->vnodeGid[i].dnodeId != dnodeId) {
        continue;
      }

      // Only the first replica that lives on this dnode is examined.
      if (pVgroup->vnodeGid[i].syncState != TAOS_SYNC_STATE_ERROR) {
        mInfo("vgId:%d, state changed by offline check, old state:%s restored:%d new state:error restored:0",
              pVgroup->vgId, syncStr(pVgroup->vnodeGid[i].syncState), pVgroup->vnodeGid[i].syncRestore);
        pVgroup->vnodeGid[i].syncState = TAOS_SYNC_STATE_ERROR;
        pVgroup->vnodeGid[i].syncRestore = 0;
        stateChanged = true;
      }
      break;
    }

    if (stateChanged) {
      SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (pDb != NULL && pDb->stateTs != curMs) {
        mInfo("db:%s, stateTs changed by offline check, old newTs:%" PRId64 " newTs:%" PRId64, pDb->name, pDb->stateTs,
              curMs);
        pDb->stateTs = curMs;
      }
      // Called even when pDb is NULL, exactly as before.
      mndReleaseDb(pMnode, pDb);
    }

    sdbRelease(pSdb, pVgroup);
  }
}

// Iterate over all dnodes in the sdb; for each one that mndIsDnodeOnline()
// reports as not online at the current timestamp, mark its vgroup replicas
// offline via mndSetVgroupOffline().
static void mndCheckDnodeOffline(SMnode *pMnode) {
  SSdb         *pSdb = pMnode->pSdb;
  const int64_t nowMs = taosGetTimestampMs();
  void         *pCursor = NULL;

  for (;;) {
    SDnodeObj *pDnode = NULL;
    pCursor = sdbFetch(pSdb, SDB_DNODE, pCursor, (void **)&pDnode);
    if (pCursor == NULL) {
      break;
    }

    if (!mndIsDnodeOnline(pDnode, nowMs)) {
      mInfo("dnode:%d, in offline state", pDnode->id);
      mndSetVgroupOffline(pMnode, pDnode->id, nowMs);
    }

    sdbRelease(pSdb, pDnode);
  }
}

S
Shengliang Guan 已提交
199
static void *mndThreadFp(void *param) {
S
Shengliang Guan 已提交
200
  SMnode *pMnode = param;
S
Shengliang Guan 已提交
201 202 203 204 205 206
  int64_t lastTime = 0;
  setThreadName("mnode-timer");

  while (1) {
    lastTime++;
    taosMsleep(100);
S
Shengliang Guan 已提交
207
    if (mndGetStop(pMnode)) break;
208
    if (lastTime % 10 != 0) continue;
S
Shengliang Guan 已提交
209

210 211
    int64_t sec = lastTime / 10;
    if (sec % tsTtlPushInterval == 0) {
212
      mndPullupTtl(pMnode);
wmmhello's avatar
wmmhello 已提交
213 214
    }

215
    if (sec % tsTransPullupInterval == 0) {
S
Shengliang Guan 已提交
216 217 218
      mndPullupTrans(pMnode);
    }

219
    if (sec % tsMqRebalanceInterval == 0) {
S
Shengliang Guan 已提交
220 221 222
      mndCalMqRebalance(pMnode);
    }

S
Shengliang Guan 已提交
223
    if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) {
S
Shengliang Guan 已提交
224 225
      mndPullupTelem(pMnode);
    }
C
Cary Xu 已提交
226

227
    if (sec % tsGrantHBInterval == 0) {
228
      mndPullupGrant(pMnode);
C
Cary Xu 已提交
229
    }
230

231
    if (sec % tsUptimeInterval == 0) {
232 233
      mndIncreaseUpTime(pMnode);
    }
234 235 236 237

    if (sec % (tsStatusInterval * 5) == 0) {
      mndCheckDnodeOffline(pMnode);
    }
S
Shengliang Guan 已提交
238 239
  }

S
Shengliang Guan 已提交
240
  return NULL;
L
Liu Jicong 已提交
241 242
}

243
// Start the joinable timer thread running mndThreadFp and store its handle in
// pMnode->thread. Returns 0 on success, -1 if the thread cannot be created.
// The thread attribute object is destroyed on both the success and the failure
// path.
static int32_t mndInitTimer(SMnode *pMnode) {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode) != 0) {
    // Log first so errno is read before any further library call.
    mError("failed to create timer thread since %s", strerror(errno));
    taosThreadAttrDestroy(&thAttr);
    return -1;
  }

  taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("mnode-timer", "initialized");
  return 0;
}

257
// Join the timer thread and clear its handle, but only when
// taosCheckPthreadValid() accepts the stored handle.
static void mndCleanupTimer(SMnode *pMnode) {
  if (!taosCheckPthreadValid(pMnode->thread)) {
    return;
  }

  taosThreadJoin(pMnode->thread, NULL);
  taosThreadClear(&pMnode->thread);
}

S
Shengliang Guan 已提交
264
// Store a heap copy of `path` in pMnode->path and create that directory.
// Returns 0 on success. On failure returns -1 with terrno set to
// TSDB_CODE_OUT_OF_MEMORY (copy failed) or TAOS_SYSTEM_ERROR(errno) (mkdir
// failed); in the latter case pMnode->path stays assigned.
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  char *dirCopy = strdup(path);
  pMnode->path = dirCopy;
  if (dirCopy == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosMkDir(pMnode->path) == 0) {
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(errno);
  return -1;
}
S
Shengliang Guan 已提交
278

279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
// Open the WAL located in "<mnode path><TD_DIRSEP>wal" with a fixed
// configuration and store the handle in pMnode->pWal.
// Returns 0 on success, -1 when walOpen() fails.
static int32_t mndInitWal(SMnode *pMnode) {
  char walPath[PATH_MAX + 20] = {0};
  snprintf(walPath, sizeof(walPath), "%s%swal", pMnode->path, TD_DIRSEP);

  SWalCfg walCfg = {0};
  walCfg.vgId = 1;
  walCfg.fsyncPeriod = 0;
  walCfg.rollPeriod = -1;
  walCfg.segSize = -1;
  walCfg.retentionPeriod = -1;
  walCfg.retentionSize = -1;
  walCfg.level = TAOS_WAL_FSYNC;

  pMnode->pWal = walOpen(walPath, &walCfg);
  if (pMnode->pWal != NULL) {
    return 0;
  }

  mError("failed to open wal since %s. wal:%s", terrstr(), walPath);
  return -1;
}

// Close the WAL handle if one is open and reset pMnode->pWal to NULL.
static void mndCloseWal(SMnode *pMnode) {
  if (pMnode->pWal == NULL) {
    return;
  }

  walClose(pMnode->pWal);
  pMnode->pWal = NULL;
}

308 309 310
// Create the sdb instance from the mnode's path, WAL and sync handle and
// store it in pMnode->pSdb. Returns 0 on success, -1 when sdbInit() fails.
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt sdbOpt = {
      .path = pMnode->path,
      .pMnode = pMnode,
      .pWal = pMnode->pWal,
      .sync = pMnode->syncMgmt.sync,
  };

  pMnode->pSdb = sdbInit(&sdbOpt);
  return (pMnode->pSdb == NULL) ? -1 : 0;
}

323 324 325 326 327 328 329
// Load the sdb from file unless the mnode is being deployed, in which case
// there is nothing to read and 0 is returned.
static int32_t mndOpenSdb(SMnode *pMnode) {
  if (pMnode->deploy) {
    return 0;
  }
  return sdbReadFile(pMnode->pSdb);
}
330 331 332

// Release the sdb instance if present and reset pMnode->pSdb to NULL.
static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb == NULL) {
    return;
  }

  sdbCleanup(pMnode->pSdb);
  pMnode->pSdb = NULL;
}

338 339 340 341 342
// Append one init/cleanup step to pMnode->pSteps.
// Returns 0 on success; on push failure sets terrno to
// TSDB_CODE_OUT_OF_MEMORY and returns -1.
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep entry = {0};
  entry.name = name;
  entry.initFp = initFp;
  entry.cleanupFp = cleanupFp;

  if (taosArrayPush(pMnode->pSteps, &entry) != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

351
// Register every mnode init/cleanup step, in execution order, into
// pMnode->pSteps. Returns 0 when all steps were registered, -1 as soon as one
// registration fails.
static int32_t mndInitSteps(SMnode *pMnode) {
  // Table order is the registration order. "mnode-sdb" appears twice on
  // purpose: once to create the sdb and once (without cleanup) to open it.
  const struct {
    char        *name;
    MndInitFp    initFp;
    MndCleanupFp cleanupFp;
  } stepTable[] = {
      {"mnode-wal", mndInitWal, mndCloseWal},
      {"mnode-sdb", mndInitSdb, mndCleanupSdb},
      {"mnode-trans", mndInitTrans, mndCleanupTrans},
      {"mnode-cluster", mndInitCluster, mndCleanupCluster},
      {"mnode-mnode", mndInitMnode, mndCleanupMnode},
      {"mnode-qnode", mndInitQnode, mndCleanupQnode},
      {"mnode-snode", mndInitSnode, mndCleanupSnode},
      {"mnode-dnode", mndInitDnode, mndCleanupDnode},
      {"mnode-user", mndInitUser, mndCleanupUser},
      {"mnode-grant", mndInitGrant, mndCleanupGrant},
      {"mnode-privilege", mndInitPrivilege, mndCleanupPrivilege},
      {"mnode-acct", mndInitAcct, mndCleanupAcct},
      {"mnode-stream", mndInitStream, mndCleanupStream},
      {"mnode-topic", mndInitTopic, mndCleanupTopic},
      {"mnode-consumer", mndInitConsumer, mndCleanupConsumer},
      {"mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe},
      {"mnode-vgroup", mndInitVgroup, mndCleanupVgroup},
      {"mnode-stb", mndInitStb, mndCleanupStb},
      {"mnode-sma", mndInitSma, mndCleanupSma},
      {"mnode-infos", mndInitInfos, mndCleanupInfos},
      {"mnode-perfs", mndInitPerfs, mndCleanupPerfs},
      {"mnode-db", mndInitDb, mndCleanupDb},
      {"mnode-func", mndInitFunc, mndCleanupFunc},
      {"mnode-sdb", mndOpenSdb, NULL},
      {"mnode-profile", mndInitProfile, mndCleanupProfile},
      {"mnode-show", mndInitShow, mndCleanupShow},
      {"mnode-query", mndInitQuery, mndCleanupQuery},
      {"mnode-sync", mndInitSync, mndCleanupSync},
      {"mnode-telem", mndInitTelem, mndCleanupTelem},
  };

  const size_t numOfSteps = sizeof(stepTable) / sizeof(stepTable[0]);
  for (size_t i = 0; i < numOfSteps; ++i) {
    if (mndAllocStep(pMnode, stepTable[i].name, stepTable[i].initFp, stepTable[i].cleanupFp) != 0) {
      return -1;
    }
  }

  return 0;
}

385
// Run the cleanup callbacks of steps [pos .. 0] in reverse order, then
// destroy the step array and set pMnode->pSteps to NULL.
// pos == -1 means "start from the last registered step". Does nothing when
// the step array is already NULL.
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) {
    return;
  }

  int32_t last = pos;
  if (last == -1) {
    last = taosArrayGetSize(pMnode->pSteps) - 1;
  }

  for (int32_t idx = last; idx >= 0; --idx) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    mInfo("%s will cleanup", pStep->name);
    if (pStep->cleanupFp == NULL) {
      continue;
    }
    (*pStep->cleanupFp)(pMnode);
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
S
Shengliang Guan 已提交
404

405
// Run the init callback of every registered step in order.
// On the first failure, steps [failed .. 0] are cleaned up via
// mndCleanupSteps(), terrno is restored to the failing step's value and -1 is
// returned. On success the cluster id is cached in pMnode->clusterId and 0 is
// returned.
static int32_t mndExecSteps(SMnode *pMnode) {
  const int32_t numOfSteps = taosArrayGetSize(pMnode->pSteps);

  for (int32_t idx = 0; idx < numOfSteps; ++idx) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    if (pStep->initFp == NULL) {
      continue;
    }

    if ((*pStep->initFp)(pMnode) == 0) {
      mInfo("%s is initialized", pStep->name);
      tmsgReportStartup(pStep->name, "initialized");
      continue;
    }

    // Preserve the failure code across the cleanup calls.
    int32_t savedErr = terrno;
    mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
    mndCleanupSteps(pMnode, idx);
    terrno = savedErr;
    return -1;
  }

  pMnode->clusterId = mndGetClusterId(pMnode);
  return 0;
}
S
Shengliang Guan 已提交
426

S
shm  
Shengliang Guan 已提交
427
// Copy the message callbacks, dnode id and replica configuration from the
// open options into the mnode object.
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  pMnode->msgCb = pOption->msgCb;
  pMnode->selfDnodeId = pOption->dnodeId;

  pMgmt->selfIndex = pOption->selfIndex;
  pMgmt->numOfReplicas = pOption->numOfReplicas;
  memcpy(pMgmt->replicas, pOption->replicas, sizeof(pOption->replicas));
}
434

435
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
436
  mInfo("start to open mnode in %s", path);
S
Shengliang Guan 已提交
437

wafwerar's avatar
wafwerar 已提交
438
  SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
439 440 441 442 443
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
444
  memset(pMnode, 0, sizeof(SMnode));
S
Shengliang Guan 已提交
445

S
Shengliang Guan 已提交
446 447
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
S
shm  
Shengliang Guan 已提交
448
  mndSetOptions(pMnode, pOption);
S
Shengliang Guan 已提交
449

450
  pMnode->deploy = pOption->deploy;
S
Shengliang Guan 已提交
451 452
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
wafwerar's avatar
wafwerar 已提交
453
    taosMemoryFree(pMnode);
S
Shengliang Guan 已提交
454 455 456 457
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
458

S
Shengliang Guan 已提交
459
  int32_t code = mndCreateDir(pMnode, path);
460
  if (code != 0) {
S
Shengliang Guan 已提交
461 462
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
463 464 465 466 467
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

468
  code = mndInitSteps(pMnode);
469
  if (code != 0) {
S
Shengliang Guan 已提交
470 471
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
472 473 474 475 476 477 478
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
479 480
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
481 482 483 484
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
485

S
Shengliang Guan 已提交
486
  mInfo("mnode open successfully");
S
Shengliang Guan 已提交
487 488
  return pMnode;
}
S
Shengliang Guan 已提交
489

490 491 492
// Prepare the mnode for shutdown: request a leader transfer, call
// syncPreStop(), then poll every 300 (taosMsleep units) until no snapshot is
// being received and then until none is being sent. No-op for a NULL mnode.
void mndPreClose(SMnode *pMnode) {
  if (pMnode == NULL) {
    return;
  }

  syncLeaderTransfer(pMnode->syncMgmt.sync);
  syncPreStop(pMnode->syncMgmt.sync);

  while (syncSnapshotRecving(pMnode->syncMgmt.sync)) {
    mInfo("vgId:1, snapshot is recving");
    taosMsleep(300);
  }

  while (syncSnapshotSending(pMnode->syncMgmt.sync)) {
    mInfo("vgId:1, snapshot is sending");
    taosMsleep(300);
  }
}

506
// Tear down an mnode: run all step cleanups, then free the path string and
// the mnode object itself. No-op for a NULL mnode.
void mndClose(SMnode *pMnode) {
  if (pMnode == NULL) {
    return;
  }

  mInfo("start to close mnode");
  mndCleanupSteps(pMnode, -1);
  taosMemoryFreeClear(pMnode->path);
  taosMemoryFreeClear(pMnode);
  mInfo("mnode is closed");
}
S
Shengliang Guan 已提交
515

516
// Start a previously opened mnode: start sync, deploy the sdb when this is a
// fresh deployment (marking the mnode restored), reset grants, then launch
// the timer thread. Returns -1 if sdb deployment fails, otherwise the result
// of mndInitTimer().
int32_t mndStart(SMnode *pMnode) {
  mndSyncStart(pMnode);

  if (pMnode->deploy) {
    if (sdbDeploy(pMnode->pSdb) != 0) {
      mError("failed to deploy sdb while start mnode");
      return -1;
    }
    mndSetRestored(pMnode, true);
  }

  grantReset(pMnode, TSDB_GRANT_ALL, 0);

  const int32_t timerCode = mndInitTimer(pMnode);
  return timerCode;
}

531
// Stop a running mnode, in order: raise the stopped flag, stop sync, then
// join the timer thread (which exits once it observes the stopped flag).
void mndStop(SMnode *pMnode) {
  mndSetStop(pMnode);

  mndSyncStop(pMnode);

  mndCleanupTimer(pMnode);
}

M
Minghao Li 已提交
537
// Hand a sync message to syncProcessMsg() using the mnode's sync handle and
// return its result, logging an error when the result is non-zero.
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
  SMnode    *pMnode = pMsg->info.node;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  // `trace` is presumably consumed by the mG* logging macros.
  const STraceId *trace = &pMsg->info.traceId;
  mGTrace("vgId:1, sync msg:%p will be processed, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  const int32_t rc = syncProcessMsg(pMgmt->sync, pMsg);
  if (rc == 0) {
    return rc;
  }

  mGError("vgId:1, failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
  return rc;
}

S
Shengliang Guan 已提交
552
// Decide whether this mnode may process `pMsg`.
// Returns 0 when processing may continue:
//   - the message is not a request, or is one of the scheduler query/fetch
//     types listed below (no RPC reference is taken in these cases), or
//   - mndAcquireRpc() succeeded (one RPC reference is now held).
// Returns -1 otherwise. Timer messages are dropped with a trace log. For any
// other request the mnode ep set is serialized into pMsg->info.rsp and terrno
// is set to TSDB_CODE_RPC_REDIRECT (or TSDB_CODE_OUT_OF_MEMORY if the response
// cannot be allocated); with an empty ep set terrno is
// TSDB_CODE_APP_NOT_READY.
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
  if (!IsReq(pMsg)) return 0;
  if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY ||
      pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT ||
      pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK) {
    return 0;
  }
  if (mndAcquireRpc(pMsg->info.node) == 0) return 0;

  SMnode    *pMnode = pMsg->info.node;
  SSyncState state = syncGetState(pMnode->syncMgmt.sync);

  if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
      pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
      pMsg->msgType == TDMT_MND_UPTIME_TIMER) {
    // The role string comes from state.state, not from the restored flag.
    mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
           pMnode->stopped, state.restored, syncStr(state.state));
    return -1;
  }

  const STraceId *trace = &pMsg->info.traceId;
  SEpSet          epSet = {0};
  mndGetMnodeEpSet(pMnode, &epSet);

  mDebug(
      "msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d "
      "role:%s, redirect numOfEps:%d inUse:%d",
      pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, state.restored,
      syncStr(state.state), epSet.numOfEps, epSet.inUse);

  if (epSet.numOfEps > 0) {
    for (int32_t i = 0; i < epSet.numOfEps; ++i) {
      mDebug("mnode index:%d, ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
    }

    // First call computes the serialized length, second one fills the buffer.
    int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
    pMsg->info.rsp = rpcMallocCont(contLen);
    if (pMsg->info.rsp != NULL) {
      tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
      pMsg->info.hasEpSet = 1;
      pMsg->info.rspLen = contLen;
      terrno = TSDB_CODE_RPC_REDIRECT;
    } else {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
    }
  } else {
    terrno = TSDB_CODE_APP_NOT_READY;
  }

  return -1;
}

S
Shengliang Guan 已提交
604
// Validate that a request carries a body.
// Returns 0 for non-request messages and for requests with a non-NULL body
// and non-zero length; otherwise logs, sets terrno to
// TSDB_CODE_INVALID_MSG_LEN and returns -1.
static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
  if (!IsReq(pMsg)) {
    return 0;
  }

  const bool hasBody = (pMsg->contLen != 0 && pMsg->pCont != NULL);
  if (hasBody) {
    return 0;
  }

  // `trace` is presumably consumed by the mG* logging macros.
  const STraceId *trace = &pMsg->info.traceId;
  mGError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen,
          pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  terrno = TSDB_CODE_INVALID_MSG_LEN;
  return -1;
}

S
Shengliang Guan 已提交
615
// Dispatch an RPC message to the handler registered for its type.
// Returns -1 with terrno = TSDB_CODE_MSG_NOT_PROCESSED when no handler is
// registered (or the type index falls outside the handler table), -1 when the
// content or mnode-state checks fail, and otherwise the handler's return code.
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
  const STraceId *trace = &pMsg->info.traceId;

  // Same bound as mndSetMsgHandle(): indexes >= TDMT_MAX can never hold a
  // registered handler, so treat them as "no handler" instead of reading
  // outside the table.
  tmsg_t   msgIdx = TMSG_INDEX(pMsg->msgType);
  MndMsgFp fp = NULL;
  if (msgIdx < TDMT_MAX) {
    fp = pMnode->msgFp[msgIdx];
  }
  if (fp == NULL) {
    mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    return -1;
  }

  if (mndCheckMsgContent(pMsg) != 0) return -1;
  if (mndCheckMnodeState(pMsg) != 0) return -1;

  mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  int32_t code = (*fp)(pMsg);
  // NOTE(review): the release is unconditional, while mndCheckMnodeState() can
  // return 0 without having acquired a reference — confirm this is intended.
  mndReleaseRpc(pMnode);

  if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
    mGTrace("msg:%p, won't response immediately since in progress", pMsg);
  } else if (code == 0) {
    mGTrace("msg:%p, successfully processed", pMsg);
  } else {
    mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
            TMSG_INFO(pMsg->msgType));
  }

  return code;
}

645 646
// Register `fp` as the handler for `msgType`. Types whose table index is not
// below TDMT_MAX are silently ignored.
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  const tmsg_t slot = TMSG_INDEX(msgType);
  if (!(slot < TDMT_MAX)) {
    return;
  }
  pMnode->msgFp[slot] = fp;
}

D
dapan1121 已提交
652
// Note: uid 0 is reserved
653
// Build a positive, non-zero 64-bit uid from three bit fields:
//   bits 24..63: low 40 bits of the current microsecond timestamp
//   bits  8..23: low 16 bits of MurmurHash3_32(name, len)
//   bits  0..7 : low 8 bits of taosRand()
// The fields are combined in unsigned arithmetic so that shifting into the
// top bit is well defined; the absolute value of the signed result is
// returned. Candidates equal to 0 (reserved) or INT64_MIN (whose absolute
// value is not representable) are discarded and regenerated.
int64_t mndGenerateUid(const char *name, int32_t len) {
  const uint32_t hashval = (uint32_t)MurmurHash3_32(name, len);

  for (;;) {
    const uint64_t us = (uint64_t)taosGetTimestampUs();
    const uint64_t timeBits = (us & 0x000000FFFFFFFFFFULL) << 24;
    const uint64_t hashBits = (uint64_t)(hashval & 0xFFFFu) << 8;
    const uint64_t randBits = (uint64_t)(taosRand() & 0xFFu);

    const int64_t uuid = (int64_t)(timeBits + hashBits + randBits);
    if (uuid == 0 || uuid == INT64_MIN) {
      continue;
    }
    return llabs(uuid);
  }
}
S
Shengliang Guan 已提交
664 665

int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
S
Shengliang Guan 已提交
666
                          SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
667
  if (mndAcquireRpc(pMnode) != 0) return -1;
S
Shengliang Guan 已提交
668

M
Minghao Li 已提交
669
  SSdb   *pSdb = pMnode->pSdb;
S
Shengliang Guan 已提交
670 671 672 673 674
  int64_t ms = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
S
Shengliang Guan 已提交
675 676 677
  pStbInfo->stbs = taosArrayInit(sdbGetSize(pSdb, SDB_STB), sizeof(SMonStbDesc));
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL ||
      pStbInfo->stbs == NULL) {
678
    mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
679 680 681 682
    return -1;
  }

  // cluster info
wmmhello's avatar
wmmhello 已提交
683
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
S
Shengliang Guan 已提交
684 685
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);
686 687
  pClusterInfo->dbs_total = sdbGetSize(pSdb, SDB_DB);
  pClusterInfo->stbs_total = sdbGetSize(pSdb, SDB_STB);
S
Shengliang Guan 已提交
688 689 690 691 692 693 694 695 696 697

  void *pIter = NULL;
  while (1) {
    SDnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonDnodeDesc desc = {0};
    desc.dnode_id = pObj->id;
    tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
S
Shengliang Guan 已提交
698
    if (mndIsDnodeOnline(pObj, ms)) {
S
Shengliang Guan 已提交
699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716
      tstrncpy(desc.status, "ready", sizeof(desc.status));
    } else {
      tstrncpy(desc.status, "offline", sizeof(desc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &desc);
    sdbRelease(pSdb, pObj);
  }

  pIter = NULL;
  while (1) {
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonMnodeDesc desc = {0};
    desc.mnode_id = pObj->id;
    tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));

717
    if (pObj->id == pMnode->selfDnodeId) {
S
Shengliang Guan 已提交
718 719
      pClusterInfo->first_ep_dnode_id = pObj->id;
      tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
720 721
      pClusterInfo->master_uptime = mndGetClusterUpTime(pMnode);
      // pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
722 723
      tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
    } else {
724
      tstrncpy(desc.role, syncStr(pObj->syncState), sizeof(desc.role));
S
Shengliang Guan 已提交
725
    }
726 727
    taosArrayPush(pClusterInfo->mnodes, &desc);
    sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
728 729 730 731 732 733 734 735 736 737
  }

  // vgroup info
  pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pClusterInfo->vgroups_total++;
738
    pClusterInfo->tbs_total += pVgroup->numOfTables;
S
Shengliang Guan 已提交
739 740 741

    SMonVgroupDesc desc = {0};
    desc.vgroup_id = pVgroup->vgId;
742 743 744 745 746

    SName name = {0};
    tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name, desc.database_name);

S
Shengliang Guan 已提交
747 748 749 750
    desc.tables_num = pVgroup->numOfTables;
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
    tstrncpy(desc.status, "unsynced", sizeof(desc.status));
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
M
Minghao Li 已提交
751
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[i];
S
Shengliang Guan 已提交
752 753
      SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
      pVnDesc->dnode_id = pVgid->dnodeId;
754 755
      tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->syncState), sizeof(pVnDesc->vnode_role));
      if (pVgid->syncState == TAOS_SYNC_STATE_LEADER) {
S
Shengliang Guan 已提交
756 757 758
        tstrncpy(desc.status, "ready", sizeof(desc.status));
        pClusterInfo->vgroups_alive++;
      }
759
      if (pVgid->syncState != TAOS_SYNC_STATE_ERROR) {
S
Shengliang Guan 已提交
760 761 762 763 764 765 766 767 768
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &desc);
    sdbRelease(pSdb, pVgroup);
  }

S
Shengliang Guan 已提交
769 770 771 772 773 774 775
  // stb info
  pIter = NULL;
  while (1) {
    SStbObj *pStb = NULL;
    pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
    if (pIter == NULL) break;

S
Shengliang Guan 已提交
776
    SMonStbDesc desc = {0};
S
Shengliang Guan 已提交
777 778 779 780 781 782 783 784 785 786 787 788 789

    SName name1 = {0};
    tNameFromString(&name1, pStb->db, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name1, desc.database_name);

    SName name2 = {0};
    tNameFromString(&name2, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tstrncpy(desc.stb_name, tNameGetTableName(&name2), TSDB_TABLE_NAME_LEN);

    taosArrayPush(pStbInfo->stbs, &desc);
    sdbRelease(pSdb, pStb);
  }

S
Shengliang Guan 已提交
790 791 792 793 794 795 796 797
  // grant info
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  }

798
  mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
799
  return 0;
L
Liu Jicong 已提交
800
}
S
Shengliang Guan 已提交
801 802

// Copy the current sync state and restored flag of the mnode's sync instance
// into `pLoad`. Always returns 0.
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  const SSyncState syncState = syncGetState(pMnode->syncMgmt.sync);

  pLoad->syncState = syncState.state;
  pLoad->syncRestore = syncState.restored;

  mTrace("mnode current syncState is %s, syncRestore:%d", syncStr(pLoad->syncState), pLoad->syncRestore);
  return 0;
}
S
Shengliang Guan 已提交
809

810
// Update pMnode->restored under the write lock and log the new value.
// When clearing the flag, additionally poll (taosMsleep(3) per pass) until
// rpcRef is no longer positive before returning.
void mndSetRestored(SMnode *pMnode, bool restored) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->restored = restored;
  taosThreadRwlockUnlock(&pMnode->lock);
  mInfo("mnode set restored:%d", restored);

  if (restored) {
    return;
  }

  while (pMnode->rpcRef > 0) {
    taosMsleep(3);
  }
}

// Return the current value of pMnode->restored (read without locking).
bool mndGetRestored(SMnode *pMnode) {
  const bool isRestored = pMnode->restored;
  return isRestored;
}

// Raise pMnode->stopped under the write lock, then log the transition.
void mndSetStop(SMnode *pMnode) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->stopped = true;
  taosThreadRwlockUnlock(&pMnode->lock);

  mInfo("mnode set stopped");
}

// Return the current value of pMnode->stopped (read without locking).
bool mndGetStop(SMnode *pMnode) {
  const bool isStopped = pMnode->stopped;
  return isStopped;
}