mndMain.c 26.3 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndAcct.h"
#include "mndCluster.h"
L
Liu Jicong 已提交
19
#include "mndConsumer.h"
S
Shengliang Guan 已提交
20 21 22
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
L
Liu Jicong 已提交
23
#include "mndGrant.h"
S
monitor  
Shengliang Guan 已提交
24
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
25
#include "mndMnode.h"
L
Liu Jicong 已提交
26
#include "mndPerfSchema.h"
M
Minghao Li 已提交
27
#include "mndPrivilege.h"
S
Shengliang Guan 已提交
28
#include "mndProfile.h"
S
Shengliang Guan 已提交
29
#include "mndQnode.h"
L
Liu Jicong 已提交
30
#include "mndQuery.h"
S
Shengliang Guan 已提交
31
#include "mndShow.h"
S
sma  
Shengliang Guan 已提交
32
#include "mndSma.h"
S
Shengliang Guan 已提交
33
#include "mndSnode.h"
S
Shengliang Guan 已提交
34
#include "mndStb.h"
L
Liu Jicong 已提交
35
#include "mndStream.h"
L
Liu Jicong 已提交
36
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
37 38
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
39
#include "mndTopic.h"
S
Shengliang Guan 已提交
40 41 42
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
43

44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
/*
 * Take one reference on pMnode->rpcRef so that in-flight requests can be
 * counted. The check and the increment happen under the read lock.
 *
 * Returns 0 when a reference was taken (the caller must later call
 * mndReleaseRpc()), -1 otherwise. When the mnode is stopped terrno is set to
 * TSDB_CODE_APP_NOT_READY; when it is not the leader terrno is not touched
 * here (presumably mndIsLeader() sets it — confirm).
 */
static inline int32_t mndAcquireRpc(SMnode *pMnode) {
  int32_t ret = -1;

  taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    terrno = TSDB_CODE_APP_NOT_READY;
  } else if (mndIsLeader(pMnode)) {
    (void)atomic_add_fetch_32(&pMnode->rpcRef, 1);
    ret = 0;
  }
  taosThreadRwlockUnlock(&pMnode->lock);

  return ret;
}

/*
 * Drop one reference previously taken by mndAcquireRpc(). Like the acquire
 * side, the decrement is done while holding the read lock.
 */
static inline void mndReleaseRpc(SMnode *pMnode) {
  taosThreadRwlockRdlock(&pMnode->lock);
  (void)atomic_sub_fetch_32(&pMnode->rpcRef, 1);
  taosThreadRwlockUnlock(&pMnode->lock);
}

S
Shengliang Guan 已提交
75 76 77 78 79 80 81 82 83 84 85 86 87
/*
 * Serialize an empty SMTimerReq into a buffer obtained from rpcMallocCont().
 *
 * On success returns the buffer and stores its length in *pContLen. Returns
 * NULL (leaving *pContLen untouched) if the size probe yields a non-positive
 * length or the allocation fails.
 */
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq req = {0};

  int32_t len = tSerializeSMTimerMsg(NULL, 0, &req);
  void   *pBuf = (len > 0) ? rpcMallocCont(len) : NULL;
  if (pBuf == NULL) {
    return NULL;
  }

  tSerializeSMTimerMsg(pBuf, len, &req);
  *pContLen = len;
  return pBuf;
}

88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
/*
 * Serialize an SMStreamTickReq carrying `sec` as its tick into a buffer
 * obtained from rpcMallocCont().
 *
 * On success returns the buffer and stores its length in *pContLen. Returns
 * NULL (leaving *pContLen untouched) if the size probe yields a non-positive
 * length or the allocation fails.
 */
static void *mndBuildCheckpointTickMsg(int32_t *pContLen, int64_t sec) {
  SMStreamTickReq tickReq = {.tick = sec};

  int32_t len = tSerializeSMStreamTickMsg(NULL, 0, &tickReq);
  void   *pBuf = (len > 0) ? rpcMallocCont(len) : NULL;
  if (pBuf == NULL) {
    return NULL;
  }

  tSerializeSMStreamTickMsg(pBuf, len, &tickReq);
  *pContLen = len;
  return pBuf;
}

S
Shengliang Guan 已提交
103 104
/*
 * Put a TDMT_MND_TRANS_TIMER message on the mnode write queue. Nothing is
 * queued if the timer message cannot be built.
 */
static void mndPullupTrans(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) return;

  SRpcMsg msg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pCont, .contLen = len};
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &msg);
}

112
/*
 * Put a TDMT_MND_TTL_TIMER message on the mnode write queue.
 *
 * Like the other pull-up helpers, nothing is queued when the timer message
 * cannot be built. A message with a NULL payload is never enqueued.
 */
static void mndPullupTtl(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) return;

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}

S
Shengliang Guan 已提交
119 120
/*
 * Put a TDMT_MND_TMQ_TIMER message on the mnode read queue. Nothing is queued
 * if the timer message cannot be built.
 */
static void mndCalMqRebalance(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) return;

  SRpcMsg msg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pCont, .contLen = len};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}

/*
 * Put a TDMT_MND_STREAM_CHECKPOINT_TIMER message carrying tick `sec` on the
 * mnode read queue. Nothing is queued if the message cannot be built.
 */
static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) {
  int32_t len = 0;
  void   *pCont = mndBuildCheckpointTickMsg(&len, sec);
  if (pCont == NULL) return;

  SRpcMsg msg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_TIMER, .pCont = pCont, .contLen = len};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}
L
Liu Jicong 已提交
144

S
Shengliang Guan 已提交
145 146
/*
 * Put a TDMT_MND_TELEM_TIMER message on the mnode read queue. Nothing is
 * queued if the timer message cannot be built.
 */
static void mndPullupTelem(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) return;

  SRpcMsg msg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pCont, .contLen = len};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}

154
/*
 * Put a TDMT_MND_GRANT_HB_TIMER message on the mnode read queue. Nothing is
 * queued if the timer message cannot be built.
 */
static void mndPullupGrant(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) return;

  SRpcMsg msg = {.msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pCont, .contLen = len};
  // NOTE(review): fixed ahandle value; its meaning is not visible in this file — confirm with the grant module.
  msg.info.ahandle = (void *)0x9527;
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}

164 165 166 167 168 169 170 171 172 173
/*
 * Put a TDMT_MND_UPTIME_TIMER message on the mnode write queue. Nothing is
 * queued if the timer message cannot be built.
 */
static void mndIncreaseUpTime(SMnode *pMnode) {
  int32_t len = 0;
  void   *pCont = mndBuildTimerMsg(&len);
  if (pCont == NULL) return;

  SRpcMsg msg = {.msgType = TDMT_MND_UPTIME_TIMER, .pCont = pCont, .contLen = len};
  // NOTE(review): fixed ahandle value; its meaning is not visible in this file — confirm.
  msg.info.ahandle = (void *)0x9528;
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &msg);
}

174 175 176 177 178 179 180 181 182 183 184
/*
 * Mark the replicas hosted on dnode `dnodeId` as offline.
 *
 * For every vgroup, the first replica whose dnodeId matches is inspected. If
 * its syncState is not already TAOS_SYNC_STATE_OFFLINE, the replica is set to
 * OFFLINE with syncRestore/syncCanRead cleared. The owning database's stateTs
 * is then set to curMs, unless it already equals curMs. Objects are modified
 * in place through the pointers returned by sdbFetch()/mndAcquireDb().
 */
static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs) {
  SSdb *pSdb = pMnode->pSdb;

  void *pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    bool roleChanged = false;
    for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
      SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
      if (pGid->dnodeId != dnodeId) continue;

      if (pGid->syncState != TAOS_SYNC_STATE_OFFLINE) {
        // The replica is set to OFFLINE below, so log that state (not "error").
        mInfo(
            "vgId:%d, state changed by offline check, old state:%s restored:%d canRead:%d new state:offline restored:0 "
            "canRead:0",
            pVgroup->vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead);
        pGid->syncState = TAOS_SYNC_STATE_OFFLINE;
        pGid->syncRestore = 0;
        pGid->syncCanRead = 0;
        roleChanged = true;
      }
      break;  // only the first replica on this dnode is considered
    }

    if (roleChanged) {
      SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (pDb != NULL && pDb->stateTs != curMs) {
        mInfo("db:%s, stateTs changed by offline check, oldTs:%" PRId64 " newTs:%" PRId64, pDb->name, pDb->stateTs,
              curMs);
        pDb->stateTs = curMs;
      }
      // Called even when pDb is NULL; presumably mndReleaseDb() tolerates NULL — confirm.
      mndReleaseDb(pMnode, pDb);
    }

    sdbRelease(pSdb, pVgroup);
  }
}

/*
 * Walk all dnodes. For each dnode that mndIsDnodeOnline() reports as not
 * online at the current time, log it and mark its vnode replicas offline via
 * mndSetVgroupOffline().
 */
static void mndCheckDnodeOffline(SMnode *pMnode) {
  SSdb   *pSdb = pMnode->pSdb;
  int64_t nowMs = taosGetTimestampMs();

  SDnodeObj *pDnode = NULL;
  for (void *pIter = sdbFetch(pSdb, SDB_DNODE, NULL, (void **)&pDnode); pIter != NULL;
       pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode)) {
    if (!mndIsDnodeOnline(pDnode, nowMs)) {
      mInfo("dnode:%d, in offline state", pDnode->id);
      mndSetVgroupOffline(pMnode, pDnode->id, nowMs);
    }
    sdbRelease(pSdb, pDnode);
    pDnode = NULL;
  }
}

S
Shengliang Guan 已提交
235
static void *mndThreadFp(void *param) {
S
Shengliang Guan 已提交
236
  SMnode *pMnode = param;
S
Shengliang Guan 已提交
237 238 239 240 241 242
  int64_t lastTime = 0;
  setThreadName("mnode-timer");

  while (1) {
    lastTime++;
    taosMsleep(100);
S
Shengliang Guan 已提交
243
    if (mndGetStop(pMnode)) break;
244
    if (lastTime % 10 != 0) continue;
S
Shengliang Guan 已提交
245

246 247
    int64_t sec = lastTime / 10;
    if (sec % tsTtlPushInterval == 0) {
248
      mndPullupTtl(pMnode);
wmmhello's avatar
wmmhello 已提交
249 250
    }

251
    if (sec % tsTransPullupInterval == 0) {
S
Shengliang Guan 已提交
252 253 254
      mndPullupTrans(pMnode);
    }

255
    if (sec % tsMqRebalanceInterval == 0) {
S
Shengliang Guan 已提交
256 257 258
      mndCalMqRebalance(pMnode);
    }

259 260 261 262 263 264
#if 0
    if (sec % tsStreamCheckpointTickInterval == 0) {
      mndStreamCheckpointTick(pMnode, sec);
    }
#endif

S
Shengliang Guan 已提交
265
    if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) {
S
Shengliang Guan 已提交
266 267
      mndPullupTelem(pMnode);
    }
C
Cary Xu 已提交
268

269
    if (sec % tsGrantHBInterval == 0) {
270
      mndPullupGrant(pMnode);
C
Cary Xu 已提交
271
    }
272

273
    if (sec % tsUptimeInterval == 0) {
274 275
      mndIncreaseUpTime(pMnode);
    }
276 277 278 279

    if (sec % (tsStatusInterval * 5) == 0) {
      mndCheckDnodeOffline(pMnode);
    }
S
Shengliang Guan 已提交
280 281
  }

S
Shengliang Guan 已提交
282
  return NULL;
L
Liu Jicong 已提交
283 284
}

285
/*
 * Start the joinable "mnode-timer" thread running mndThreadFp().
 *
 * Returns 0 on success and reports "mnode-timer" as initialized. Returns -1 if
 * the thread cannot be created. The thread attribute object is destroyed on
 * both paths; previously the failure path leaked it.
 */
static int32_t mndInitTimer(SMnode *pMnode) {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode) != 0) {
    // NOTE(review): pthread_create() reports failure via its return value, not errno — confirm
    // what taosThreadCreate() does before relying on this message.
    mError("failed to create timer thread since %s", strerror(errno));
    taosThreadAttrDestroy(&thAttr);
    return -1;
  }

  taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("mnode-timer", "initialized");
  return 0;
}

299
/*
 * Join the timer thread if a valid thread handle exists, then reset the
 * handle with taosThreadClear(). Does nothing when no thread was started.
 */
static void mndCleanupTimer(SMnode *pMnode) {
  if (!taosCheckPthreadValid(pMnode->thread)) return;

  taosThreadJoin(pMnode->thread, NULL);
  taosThreadClear(&pMnode->thread);
}

S
Shengliang Guan 已提交
306
/*
 * Store a heap copy of `path` in pMnode->path and make sure the directory
 * exists.
 *
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if the
 * copy fails, or terrno = TAOS_SYSTEM_ERROR(errno) if taosMkDir() fails. On
 * mkdir failure pMnode->path stays allocated; mndClose() frees it.
 */
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  char *pDup = strdup(path);
  pMnode->path = pDup;
  if (pDup == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosMkDir(pMnode->path) == 0) return 0;

  terrno = TAOS_SYSTEM_ERROR(errno);
  return -1;
}
S
Shengliang Guan 已提交
320

321 322 323 324 325 326 327 328 329 330 331 332 333 334 335
/*
 * Open the mnode WAL in "<pMnode->path><TD_DIRSEP>wal" with vgId 1 and
 * TAOS_WAL_FSYNC level. fsyncPeriod is 0 and the roll/segment/retention
 * settings are all -1 (presumably "unlimited/disabled" — confirm in the WAL
 * module).
 *
 * Returns 0 on success, -1 (with an error logged) if walOpen() fails.
 */
static int32_t mndInitWal(SMnode *pMnode) {
  char walPath[PATH_MAX + 20] = {0};
  snprintf(walPath, sizeof(walPath), "%s%swal", pMnode->path, TD_DIRSEP);

  SWalCfg walCfg = {0};
  walCfg.vgId = 1;
  walCfg.fsyncPeriod = 0;
  walCfg.rollPeriod = -1;
  walCfg.segSize = -1;
  walCfg.retentionPeriod = -1;
  walCfg.retentionSize = -1;
  walCfg.level = TAOS_WAL_FSYNC;

  pMnode->pWal = walOpen(walPath, &walCfg);
  if (pMnode->pWal != NULL) return 0;

  mError("failed to open wal since %s. wal:%s", terrstr(), walPath);
  return -1;
}

/* Close the WAL handle if one is open and reset it to NULL, so repeated calls are harmless. */
static void mndCloseWal(SMnode *pMnode) {
  if (pMnode->pWal == NULL) return;

  walClose(pMnode->pWal);
  pMnode->pWal = NULL;
}

350 351 352
/*
 * Create the sdb instance from the mnode's path, WAL handle and sync handle.
 * Returns 0 on success, -1 if sdbInit() returns NULL.
 */
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt opt = {
      .path = pMnode->path,
      .pMnode = pMnode,
      .pWal = pMnode->pWal,
      .sync = pMnode->syncMgmt.sync,
  };

  pMnode->pSdb = sdbInit(&opt);
  return (pMnode->pSdb != NULL) ? 0 : -1;
}

365 366 367 368 369 370 371
/*
 * Load the sdb from disk unless this mnode is being deployed fresh, in which
 * case there is nothing to read and 0 is returned.
 */
static int32_t mndOpenSdb(SMnode *pMnode) { return pMnode->deploy ? 0 : sdbReadFile(pMnode->pSdb); }
372 373 374

/* Release the sdb instance, if any, and reset the handle to NULL. */
static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb == NULL) return;

  sdbCleanup(pMnode->pSdb);
  pMnode->pSdb = NULL;
}

380 381 382 383 384
/*
 * Append an init/cleanup step named `name` to pMnode->pSteps. The name pointer
 * is stored as-is, not copied.
 * Returns 0 on success, -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if the push fails.
 */
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep step = {.name = name, .initFp = initFp, .cleanupFp = cleanupFp};

  if (taosArrayPush(pMnode->pSteps, &step) != NULL) return 0;

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

393
/*
 * Register every mnode subsystem step, in execution order, on pMnode->pSteps.
 * mndExecSteps() runs the init callbacks front to back and mndCleanupSteps()
 * runs the cleanup callbacks back to front.
 *
 * Returns 0 on success, -1 (terrno set by mndAllocStep()) on the first
 * registration failure.
 */
static int32_t mndInitSteps(SMnode *pMnode) {
  static const struct {
    char        *name;
    MndInitFp    initFp;
    MndCleanupFp cleanupFp;
  } kSteps[] = {
      {"mnode-wal", mndInitWal, mndCloseWal},
      {"mnode-sdb", mndInitSdb, mndCleanupSdb},
      {"mnode-trans", mndInitTrans, mndCleanupTrans},
      {"mnode-cluster", mndInitCluster, mndCleanupCluster},
      {"mnode-mnode", mndInitMnode, mndCleanupMnode},
      {"mnode-qnode", mndInitQnode, mndCleanupQnode},
      {"mnode-snode", mndInitSnode, mndCleanupSnode},
      {"mnode-dnode", mndInitDnode, mndCleanupDnode},
      {"mnode-user", mndInitUser, mndCleanupUser},
      {"mnode-grant", mndInitGrant, mndCleanupGrant},
      {"mnode-privilege", mndInitPrivilege, mndCleanupPrivilege},
      {"mnode-acct", mndInitAcct, mndCleanupAcct},
      {"mnode-stream", mndInitStream, mndCleanupStream},
      {"mnode-topic", mndInitTopic, mndCleanupTopic},
      {"mnode-consumer", mndInitConsumer, mndCleanupConsumer},
      {"mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe},
      {"mnode-vgroup", mndInitVgroup, mndCleanupVgroup},
      {"mnode-stb", mndInitStb, mndCleanupStb},
      {"mnode-sma", mndInitSma, mndCleanupSma},
      {"mnode-infos", mndInitInfos, mndCleanupInfos},
      {"mnode-perfs", mndInitPerfs, mndCleanupPerfs},
      {"mnode-db", mndInitDb, mndCleanupDb},
      {"mnode-func", mndInitFunc, mndCleanupFunc},
      // Second "mnode-sdb" step: reads the sdb file once all tables are registered; no cleanup of its own.
      {"mnode-sdb", mndOpenSdb, NULL},
      {"mnode-profile", mndInitProfile, mndCleanupProfile},
      {"mnode-show", mndInitShow, mndCleanupShow},
      {"mnode-query", mndInitQuery, mndCleanupQuery},
      {"mnode-sync", mndInitSync, mndCleanupSync},
      {"mnode-telem", mndInitTelem, mndCleanupTelem},
  };

  for (size_t i = 0; i < sizeof(kSteps) / sizeof(kSteps[0]); ++i) {
    if (mndAllocStep(pMnode, kSteps[i].name, kSteps[i].initFp, kSteps[i].cleanupFp) != 0) return -1;
  }

  return 0;
}

427
/*
 * Run the cleanup callbacks of steps pos, pos-1, ..., 0 (pos == -1 means
 * "the last registered step"), then destroy the step array and set
 * pMnode->pSteps to NULL. Returns immediately if the array is already gone.
 */
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) return;

  int32_t cur = (pos == -1) ? (int32_t)taosArrayGetSize(pMnode->pSteps) - 1 : pos;
  while (cur >= 0) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, cur);
    mInfo("%s will cleanup", pStep->name);
    if (pStep->cleanupFp != NULL) {
      pStep->cleanupFp(pMnode);
    }
    --cur;
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
S
Shengliang Guan 已提交
446

447
/*
 * Run each registered step's init callback in order (steps without one are
 * skipped), reporting each success via tmsgReportStartup().
 *
 * On the first failure, cleans up steps [0, failing index] via
 * mndCleanupSteps(), restores the failing step's terrno and returns -1. On
 * success, caches the cluster id in pMnode->clusterId and returns 0.
 */
static int32_t mndExecSteps(SMnode *pMnode) {
  int32_t numOfSteps = taosArrayGetSize(pMnode->pSteps);

  for (int32_t i = 0; i < numOfSteps; ++i) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, i);
    if (pStep->initFp == NULL) continue;

    if ((*pStep->initFp)(pMnode) == 0) {
      mInfo("%s is initialized", pStep->name);
      tmsgReportStartup(pStep->name, "initialized");
      continue;
    }

    // Preserve the step's error code across the cleanup, which may overwrite terrno.
    int32_t savedCode = terrno;
    mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
    mndCleanupSteps(pMnode, i);
    terrno = savedCode;
    return -1;
  }

  pMnode->clusterId = mndGetClusterId(pMnode);
  return 0;
}
S
Shengliang Guan 已提交
468

S
shm  
Shengliang Guan 已提交
469
/*
 * Copy the caller-supplied options into the mnode: message callbacks, own
 * dnode id, and the sync replica layout. The replicas array is copied with
 * sizeof(pOption->replicas).
 */
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  pMnode->msgCb = pOption->msgCb;
  pMnode->selfDnodeId = pOption->dnodeId;

  pMgmt->selfIndex = pOption->selfIndex;
  pMgmt->numOfReplicas = pOption->numOfReplicas;
  memcpy(pMgmt->replicas, pOption->replicas, sizeof(pOption->replicas));
}
476

477
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
478
  mInfo("start to open mnode in %s", path);
S
Shengliang Guan 已提交
479

wafwerar's avatar
wafwerar 已提交
480
  SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
481 482 483 484 485
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
486
  memset(pMnode, 0, sizeof(SMnode));
S
Shengliang Guan 已提交
487

S
Shengliang Guan 已提交
488 489
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
S
shm  
Shengliang Guan 已提交
490
  mndSetOptions(pMnode, pOption);
S
Shengliang Guan 已提交
491

492
  pMnode->deploy = pOption->deploy;
S
Shengliang Guan 已提交
493 494
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
wafwerar's avatar
wafwerar 已提交
495
    taosMemoryFree(pMnode);
S
Shengliang Guan 已提交
496 497 498 499
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
500

S
Shengliang Guan 已提交
501
  int32_t code = mndCreateDir(pMnode, path);
502
  if (code != 0) {
S
Shengliang Guan 已提交
503 504
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
505 506 507 508 509
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

510
  code = mndInitSteps(pMnode);
511
  if (code != 0) {
S
Shengliang Guan 已提交
512 513
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
514 515 516 517 518 519 520
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
521 522
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
523 524 525 526
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
527

S
Shengliang Guan 已提交
528
  mInfo("mnode open successfully");
S
Shengliang Guan 已提交
529 530
  return pMnode;
}
S
Shengliang Guan 已提交
531

532 533 534
/*
 * First phase of shutdown: call syncLeaderTransfer() and then syncPreStop() on
 * the mnode's sync handle. Does nothing for a NULL mnode.
 */
void mndPreClose(SMnode *pMnode) {
  if (pMnode == NULL) return;

  syncLeaderTransfer(pMnode->syncMgmt.sync);
  syncPreStop(pMnode->syncMgmt.sync);

#if 0
  // Disabled: wait for in-progress snapshot transfers before continuing.
  while (syncSnapshotRecving(pMnode->syncMgmt.sync)) {
    mInfo("vgId:1, snapshot is recving");
    taosMsleep(300);
  }
  while (syncSnapshotSending(pMnode->syncMgmt.sync)) {
    mInfo("vgId:1, snapshot is sending");
    taosMsleep(300);
  }
#endif
}

549
/*
 * Tear down an mnode: run every registered cleanup step, then free the path
 * string and the mnode itself. Accepts NULL.
 */
void mndClose(SMnode *pMnode) {
  if (pMnode == NULL) return;

  mInfo("start to close mnode");
  mndCleanupSteps(pMnode, -1);
  taosMemoryFreeClear(pMnode->path);
  taosMemoryFreeClear(pMnode);
  mInfo("mnode is closed");
}
S
Shengliang Guan 已提交
558

559
/*
 * Start a previously opened mnode.
 *
 * Starts sync. On a fresh deployment, deploys the sdb and marks the mnode
 * restored. Then resets grants and launches the timer thread. Returns -1 if
 * sdb deployment fails; otherwise returns mndInitTimer()'s result.
 */
int32_t mndStart(SMnode *pMnode) {
  mndSyncStart(pMnode);

  if (pMnode->deploy) {
    int32_t deployCode = sdbDeploy(pMnode->pSdb);
    if (deployCode != 0) {
      mError("failed to deploy sdb while start mnode");
      return -1;
    }
    mndSetRestored(pMnode, true);
  }

  grantReset(pMnode, TSDB_GRANT_ALL, 0);

  return mndInitTimer(pMnode);
}

574
/*
 * Stop the mnode. The steps run in this order:
 * 1. Set the stopped flag, so mndAcquireRpc() rejects requests and the timer loop exits.
 * 2. Stop sync.
 * 3. Join the timer thread.
 */
void mndStop(SMnode *pMnode) {
  mndSetStop(pMnode);
  mndSyncStop(pMnode);
  mndCleanupTimer(pMnode);
}

M
Minghao Li 已提交
580
/*
 * Hand a sync-layer message to syncProcessMsg() on this mnode's sync handle.
 * Returns syncProcessMsg()'s code and logs an error when it is non-zero.
 */
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
  const STraceId *trace = &pMsg->info.traceId;  // consumed by the mG* log macros

  mGTrace("vgId:1, sync msg:%p will be processed, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  int32_t code = syncProcessMsg(pMnode->syncMgmt.sync, pMsg);
  if (code == 0) return 0;

  mGError("vgId:1, failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
  return code;
}

S
Shengliang Guan 已提交
595
/*
 * True for messages that mndCheckMnodeState() admits without taking an rpc
 * reference: anything that is not a request, plus the scheduler
 * query/fetch/heartbeat/drop-task request types.
 */
static bool mndMsgBypassesRpcRef(SRpcMsg *pMsg) {
  if (!IsReq(pMsg)) return true;

  return pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY ||
         pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT ||
         pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH ||
         pMsg->msgType == TDMT_SCH_DROP_TASK;
}

/*
 * Decide whether this mnode may process `pMsg` now.
 *
 * Returns 0 in two cases. Either the message bypasses the rpc reference (see
 * mndMsgBypassesRpcRef()), or mndAcquireRpc() succeeded — then the caller
 * holds a reference and must call mndReleaseRpc().
 *
 * Otherwise returns -1:
 * - Internal timer messages are dropped with a trace log only.
 * - For other requests, if mnode endpoints are known, the epset is serialized
 *   into pMsg->info.rsp and terrno = TSDB_CODE_RPC_REDIRECT
 *   (TSDB_CODE_OUT_OF_MEMORY if the buffer cannot be allocated).
 * - With no endpoints, terrno = TSDB_CODE_APP_NOT_READY.
 */
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
  if (mndMsgBypassesRpcRef(pMsg)) return 0;
  if (mndAcquireRpc(pMsg->info.node) == 0) return 0;

  SMnode    *pMnode = pMsg->info.node;
  SSyncState state = syncGetState(pMnode->syncMgmt.sync);

  if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
      pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
      pMsg->msgType == TDMT_MND_UPTIME_TIMER) {
    // Role must come from the sync state, not from the restored flag.
    mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
           pMnode->stopped, state.restored, syncStr(state.state));
    return -1;
  }

  SEpSet epSet = {0};
  mndGetMnodeEpSet(pMnode, &epSet);

  mDebug(
      "msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d "
      "role:%s, redirect numOfEps:%d inUse:%d",
      pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, state.restored,
      syncStr(state.state), epSet.numOfEps, epSet.inUse);

  if (epSet.numOfEps > 0) {
    for (int32_t i = 0; i < epSet.numOfEps; ++i) {
      mDebug("mnode index:%d, ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
    }

    int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
    pMsg->info.rsp = rpcMallocCont(contLen);
    if (pMsg->info.rsp != NULL) {
      tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
      pMsg->info.hasEpSet = 1;
      pMsg->info.rspLen = contLen;
      terrno = TSDB_CODE_RPC_REDIRECT;
    } else {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
    }
  } else {
    terrno = TSDB_CODE_APP_NOT_READY;
  }

  return -1;
}

/*
 * Dispatch an rpc message to the handler registered with mndSetMsgHandle().
 *
 * Returns -1 with terrno = TSDB_CODE_MSG_NOT_PROCESSED if no handler exists,
 * -1 if mndCheckMnodeState() rejects the message, otherwise the handler's
 * return code (TSDB_CODE_ACTION_IN_PROGRESS means the response is deferred).
 */
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
  SMnode         *pMnode = pMsg->info.node;
  const STraceId *trace = &pMsg->info.traceId;  // consumed by the mG* log macros

  // Bound the table lookup exactly like mndSetMsgHandle() bounds the store.
  tmsg_t   idx = TMSG_INDEX(pMsg->msgType);
  MndMsgFp fp = (idx < TDMT_MAX) ? pMnode->msgFp[idx] : NULL;
  if (fp == NULL) {
    mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    return -1;
  }

  if (mndCheckMnodeState(pMsg) != 0) return -1;

  // Only messages admitted through mndAcquireRpc() hold a reference. Releasing
  // for the others would drive rpcRef below zero and break the drain wait in
  // mndSetRestored().
  bool holdsRpcRef = !mndMsgBypassesRpcRef(pMsg);

  mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  int32_t code = (*fp)(pMsg);
  if (holdsRpcRef) {
    mndReleaseRpc(pMnode);
  }

  if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
    mGTrace("msg:%p, won't response immediately since in progress", pMsg);
  } else if (code == 0) {
    mGTrace("msg:%p, successfully processed", pMsg);
  } else {
    mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
            TMSG_INFO(pMsg->msgType));
  }

  return code;
}

676 677
/*
 * Register `fp` as the handler for `msgType`. Message types whose
 * TMSG_INDEX() is not below TDMT_MAX are silently ignored.
 */
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  tmsg_t idx = TMSG_INDEX(msgType);
  if (idx >= TDMT_MAX) return;

  pMnode->msgFp[idx] = fp;
}

D
dapan1121 已提交
683
// Note: uid 0 is reserved
/*
 * Generate a positive, non-zero 64-bit uid for `name`.
 *
 * Raw layout before taking the absolute value:
 *   bits 24..63  low 40 bits of the current microsecond timestamp
 *   bits  8..23  low 16 bits of MurmurHash3_32(name, len)
 *   bits  0..7   a random byte
 * The fields do not overlap, so OR-ing them equals the former sum. The value
 * is built in uint64_t, because shifting a signed timestamp into bit 63 is
 * undefined behaviour. 0 and INT64_MIN (whose llabs() is undefined) are
 * rejected and regenerated.
 */
int64_t mndGenerateUid(const char *name, int32_t len) {
  uint32_t hashval = (uint32_t)MurmurHash3_32(name, len);

  while (true) {
    uint64_t us = (uint64_t)taosGetTimestampUs();
    uint64_t raw = ((us & 0x000000FFFFFFFFFFULL) << 24) | ((uint64_t)(hashval & 0xFFFFu) << 8) |
                   ((uint64_t)taosRand() & 0xFFu);
    int64_t uuid = (int64_t)raw;
    if (uuid != 0 && uuid != INT64_MIN) {
      return llabs(uuid);
    }
  }
}
S
Shengliang Guan 已提交
695 696

int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
S
Shengliang Guan 已提交
697
                          SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
698
  if (mndAcquireRpc(pMnode) != 0) return -1;
S
Shengliang Guan 已提交
699

M
Minghao Li 已提交
700
  SSdb   *pSdb = pMnode->pSdb;
S
Shengliang Guan 已提交
701 702 703 704 705
  int64_t ms = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
S
Shengliang Guan 已提交
706 707 708
  pStbInfo->stbs = taosArrayInit(sdbGetSize(pSdb, SDB_STB), sizeof(SMonStbDesc));
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL ||
      pStbInfo->stbs == NULL) {
709
    mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
710 711 712 713
    return -1;
  }

  // cluster info
wmmhello's avatar
wmmhello 已提交
714
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
S
Shengliang Guan 已提交
715 716
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);
717 718
  pClusterInfo->dbs_total = sdbGetSize(pSdb, SDB_DB);
  pClusterInfo->stbs_total = sdbGetSize(pSdb, SDB_STB);
S
Shengliang Guan 已提交
719 720 721 722 723 724 725 726 727 728

  void *pIter = NULL;
  while (1) {
    SDnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonDnodeDesc desc = {0};
    desc.dnode_id = pObj->id;
    tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
S
Shengliang Guan 已提交
729
    if (mndIsDnodeOnline(pObj, ms)) {
S
Shengliang Guan 已提交
730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747
      tstrncpy(desc.status, "ready", sizeof(desc.status));
    } else {
      tstrncpy(desc.status, "offline", sizeof(desc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &desc);
    sdbRelease(pSdb, pObj);
  }

  pIter = NULL;
  while (1) {
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonMnodeDesc desc = {0};
    desc.mnode_id = pObj->id;
    tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));

748
    if (pObj->id == pMnode->selfDnodeId) {
S
Shengliang Guan 已提交
749 750
      pClusterInfo->first_ep_dnode_id = pObj->id;
      tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
751 752
      pClusterInfo->master_uptime = mndGetClusterUpTime(pMnode);
      // pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
753 754
      tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
    } else {
755
      tstrncpy(desc.role, syncStr(pObj->syncState), sizeof(desc.role));
S
Shengliang Guan 已提交
756
    }
757 758
    taosArrayPush(pClusterInfo->mnodes, &desc);
    sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
759 760 761 762 763 764 765 766 767 768
  }

  // vgroup info
  pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pClusterInfo->vgroups_total++;
769
    pClusterInfo->tbs_total += pVgroup->numOfTables;
S
Shengliang Guan 已提交
770 771 772

    SMonVgroupDesc desc = {0};
    desc.vgroup_id = pVgroup->vgId;
773 774 775 776 777

    SName name = {0};
    tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name, desc.database_name);

S
Shengliang Guan 已提交
778 779 780 781
    desc.tables_num = pVgroup->numOfTables;
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
    tstrncpy(desc.status, "unsynced", sizeof(desc.status));
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
M
Minghao Li 已提交
782
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[i];
S
Shengliang Guan 已提交
783 784
      SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
      pVnDesc->dnode_id = pVgid->dnodeId;
785 786
      tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->syncState), sizeof(pVnDesc->vnode_role));
      if (pVgid->syncState == TAOS_SYNC_STATE_LEADER) {
S
Shengliang Guan 已提交
787 788 789
        tstrncpy(desc.status, "ready", sizeof(desc.status));
        pClusterInfo->vgroups_alive++;
      }
S
Shengliang Guan 已提交
790
      if (pVgid->syncState != TAOS_SYNC_STATE_ERROR && pVgid->syncState != TAOS_SYNC_STATE_OFFLINE) {
S
Shengliang Guan 已提交
791 792 793 794 795 796 797 798 799
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &desc);
    sdbRelease(pSdb, pVgroup);
  }

S
Shengliang Guan 已提交
800 801 802 803 804 805 806
  // stb info
  pIter = NULL;
  while (1) {
    SStbObj *pStb = NULL;
    pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
    if (pIter == NULL) break;

S
Shengliang Guan 已提交
807
    SMonStbDesc desc = {0};
S
Shengliang Guan 已提交
808 809 810 811 812 813 814 815 816 817 818 819 820

    SName name1 = {0};
    tNameFromString(&name1, pStb->db, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name1, desc.database_name);

    SName name2 = {0};
    tNameFromString(&name2, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tstrncpy(desc.stb_name, tNameGetTableName(&name2), TSDB_TABLE_NAME_LEN);

    taosArrayPush(pStbInfo->stbs, &desc);
    sdbRelease(pSdb, pStb);
  }

S
Shengliang Guan 已提交
821 822 823 824 825 826 827 828
  // grant info
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  }

829
  mndReleaseRpc(pMnode);
S
Shengliang Guan 已提交
830
  return 0;
L
Liu Jicong 已提交
831
}
S
Shengliang Guan 已提交
832 833

/* Copy the current sync role and restore flag of this mnode into *pLoad. Always returns 0. */
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  SSyncState syncState = syncGetState(pMnode->syncMgmt.sync);

  pLoad->syncState = syncState.state;
  pLoad->syncRestore = syncState.restored;
  mTrace("mnode current syncState is %s, syncRestore:%d", syncStr(pLoad->syncState), pLoad->syncRestore);

  return 0;
}
S
Shengliang Guan 已提交
840

841
/*
 * Set pMnode->restored under the write lock and log the new value.
 *
 * When clearing the flag, the call additionally blocks until pMnode->rpcRef
 * drops to zero or below, polling every 3 ms. rpcRef counts references taken
 * by mndAcquireRpc().
 */
void mndSetRestored(SMnode *pMnode, bool restored) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->restored = restored;
  taosThreadRwlockUnlock(&pMnode->lock);
  mInfo("mnode set restored:%d", restored);

  if (restored) return;

  // rpcRef is changed with atomic ops under only a read lock, so read it
  // atomically too rather than with a plain (racy) load.
  while (atomic_load_32(&pMnode->rpcRef) > 0) {
    taosMsleep(3);
  }
}

/* Return the mnode's restored flag (read without taking pMnode->lock). */
bool mndGetRestored(SMnode *pMnode) {
  bool isRestored = pMnode->restored;
  return isRestored;
}

/*
 * Set the stopped flag under the write lock. After this, mndAcquireRpc()
 * fails with TSDB_CODE_APP_NOT_READY and the timer loop exits.
 */
void mndSetStop(SMnode *pMnode) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->stopped = true;
  taosThreadRwlockUnlock(&pMnode->lock);

  mInfo("mnode set stopped");
}

/* Return the mnode's stopped flag (read without taking pMnode->lock). */
bool mndGetStop(SMnode *pMnode) {
  bool isStopped = pMnode->stopped;
  return isStopped;
}