mnode.c 18.2 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndAcct.h"
#include "mndAuth.h"
S
Shengliang Guan 已提交
19
#include "mndBnode.h"
S
Shengliang Guan 已提交
20
#include "mndCluster.h"
L
Liu Jicong 已提交
21
#include "mndConsumer.h"
S
Shengliang Guan 已提交
22 23 24
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
S
monitor  
Shengliang Guan 已提交
25
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
26
#include "mndMnode.h"
L
Liu Jicong 已提交
27
#include "mndOffset.h"
S
Shengliang Guan 已提交
28
#include "mndProfile.h"
S
Shengliang Guan 已提交
29
#include "mndQnode.h"
S
Shengliang Guan 已提交
30
#include "mndShow.h"
S
Shengliang Guan 已提交
31
#include "mndSnode.h"
S
Shengliang Guan 已提交
32
#include "mndStb.h"
L
Liu Jicong 已提交
33
#include "mndStream.h"
L
Liu Jicong 已提交
34
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
35 36
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
37
#include "mndTopic.h"
S
Shengliang Guan 已提交
38 39 40
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
41

S
Shengliang Guan 已提交
42 43 44 45
#define MQ_TIMER_MS    3000
#define TRNAS_TIMER_MS 6000
#define TELEM_TIMER_MS 86400000

S
Shengliang Guan 已提交
46 47 48 49
/*
 * Forward a request to a dnode through the callback stored in the mnode.
 *
 * Returns -1 with terrno set to TSDB_CODE_MND_NOT_READY when pMnode is NULL or
 * no sendReqToDnodeFp callback is registered; otherwise returns whatever the
 * callback returns.
 */
int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
  if (pMnode != NULL && pMnode->sendReqToDnodeFp != NULL) {
    return (*pMnode->sendReqToDnodeFp)(pMnode->pDnode, pEpSet, pMsg);
  }

  terrno = TSDB_CODE_MND_NOT_READY;
  return -1;
}
S
Shengliang Guan 已提交
54

S
Shengliang Guan 已提交
55 56 57 58
/*
 * Forward a request to an mnode through the callback stored in the mnode.
 *
 * Returns -1 with terrno set to TSDB_CODE_MND_NOT_READY when pMnode is NULL or
 * no sendReqToMnodeFp callback is registered; otherwise returns whatever the
 * callback returns.
 */
int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg) {
  // Guard the pointer that is actually invoked below (sendReqToMnodeFp),
  // not the dnode variant.
  if (pMnode == NULL || pMnode->sendReqToMnodeFp == NULL) {
    terrno = TSDB_CODE_MND_NOT_READY;
    return -1;
  }

  return (*pMnode->sendReqToMnodeFp)(pMnode->pDnode, pMsg);
}
S
Shengliang Guan 已提交
63

S
Shengliang Guan 已提交
64 65 66 67 68 69 70 71 72 73 74 75 76
/*
 * Serialize a zero-initialized SMTimerReq into a buffer obtained from
 * rpcMallocCont().
 *
 * pContLen - receives the serialized length on success; left untouched on
 *            failure.
 *
 * Returns the buffer, or NULL when the reported length is not positive or the
 * allocation fails.
 */
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq req = {0};

  // The first call passes no buffer; its return value is used as the length
  // to allocate.
  int32_t len = tSerializeSMTimerMsg(NULL, 0, &req);
  if (len <= 0) {
    return NULL;
  }

  void *pCont = rpcMallocCont(len);
  if (pCont != NULL) {
    tSerializeSMTimerMsg(pCont, len, &req);
    *pContLen = len;
  }

  return pCont;
}

S
Shengliang Guan 已提交
77
static void mndPullupTrans(void *param, void *tmrId) {
S
Shengliang Guan 已提交
78 79
  SMnode *pMnode = param;
  if (mndIsMaster(pMnode)) {
S
Shengliang Guan 已提交
80 81
    int32_t contLen = 0;
    void   *pReq = mndBuildTimerMsg(&contLen);
S
Shengliang Guan 已提交
82
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
S
Shengliang Guan 已提交
83
    pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
S
Shengliang Guan 已提交
84 85
  }

S
Shengliang Guan 已提交
86
  taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer);
S
Shengliang Guan 已提交
87 88
}

L
Liu Jicong 已提交
89 90
static void mndCalMqRebalance(void *param, void *tmrId) {
  SMnode *pMnode = param;
L
Liu Jicong 已提交
91
  if (mndIsMaster(pMnode)) {
S
Shengliang Guan 已提交
92 93
    int32_t contLen = 0;
    void   *pReq = mndBuildTimerMsg(&contLen);
L
Liu Jicong 已提交
94 95 96 97 98
    SRpcMsg rpcMsg = {
        .msgType = TDMT_MND_MQ_TIMER,
        .pCont = pReq,
        .contLen = contLen,
    };
L
Liu Jicong 已提交
99
    pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg);
L
Liu Jicong 已提交
100 101
  }

S
Shengliang Guan 已提交
102 103 104
  taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer);
}

S
Shengliang Guan 已提交
105
static void mndPullupTelem(void *param, void *tmrId) {
S
Shengliang Guan 已提交
106 107 108 109 110 111 112 113
  SMnode *pMnode = param;
  if (mndIsMaster(pMnode)) {
    int32_t contLen = 0;
    void   *pReq = mndBuildTimerMsg(&contLen);
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
    pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg);
  }

S
Shengliang Guan 已提交
114
  taosTmrReset(mndPullupTelem, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer);
L
Liu Jicong 已提交
115 116
}

117
/*
 * Create the mnode timer controller and store it in pMnode->timer.
 *
 * Returns 0 on success, or -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY when
 * taosTmrInit() returns NULL.
 */
static int32_t mndInitTimer(SMnode *pMnode) {
  pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND");
  if (pMnode->timer != NULL) {
    // The periodic callbacks are not started here; the code that would start
    // them is kept below, disabled:
    //
    // if (taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer)) {
    //   terrno = TSDB_CODE_OUT_OF_MEMORY;
    //   return -1;
    // }
    //
    // if (taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer)) {
    //   terrno = TSDB_CODE_OUT_OF_MEMORY;
    //   return -1;
    // }
    //
    // if (taosTmrReset(mndPullupTelem, 60000, pMnode, pMnode->timer, &pMnode->telemTimer)) {
    //   terrno = TSDB_CODE_OUT_OF_MEMORY;
    //   return -1;
    // }
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

142 143
/*
 * Stop the three mnode timers (trans, mq, telem), clear their handles, then
 * clean up the timer controller. Does nothing when pMnode->timer is NULL.
 */
static void mndCleanupTimer(SMnode *pMnode) {
  if (pMnode->timer == NULL) {
    return;
  }

  taosTmrStop(pMnode->transTimer);
  pMnode->transTimer = NULL;

  taosTmrStop(pMnode->mqTimer);
  pMnode->mqTimer = NULL;

  taosTmrStop(pMnode->telemTimer);
  pMnode->telemTimer = NULL;

  taosTmrCleanUp(pMnode->timer);
  pMnode->timer = NULL;
}

S
Shengliang Guan 已提交
155
/*
 * Store a private copy of `path` in pMnode->path and create that directory.
 *
 * Returns 0 on success. Returns -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY
 * when the copy cannot be allocated, or to TAOS_SYSTEM_ERROR(errno) when
 * taosMkDir() fails. The copied path is not freed here on failure.
 */
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  pMnode->path = strdup(path);
  if (pMnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosMkDir(pMnode->path) == 0) {
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(errno);
  return -1;
}
S
Shengliang Guan 已提交
169

170 171 172
/*
 * Open the sdb for this mnode using pMnode->path and store the handle in
 * pMnode->pSdb.
 *
 * Returns 0 on success, -1 when sdbInit() returns NULL.
 */
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt sdbOpt = {0};
  sdbOpt.pMnode = pMnode;
  sdbOpt.path = pMnode->path;

  pMnode->pSdb = sdbInit(&sdbOpt);
  return (pMnode->pSdb == NULL) ? -1 : 0;
}

183
/* Init step: run sdbDeploy() on the mnode's sdb and return its result. */
static int32_t mndDeploySdb(SMnode *pMnode) {
  SSdb *pSdb = pMnode->pSdb;
  return sdbDeploy(pSdb);
}
S
Shengliang Guan 已提交
184
/* Init step: run sdbReadFile() on the mnode's sdb and return its result. */
static int32_t mndReadSdb(SMnode *pMnode) {
  SSdb *pSdb = pMnode->pSdb;
  return sdbReadFile(pSdb);
}
185 186 187

/* Clean up the sdb, if one is open, and clear pMnode->pSdb. */
static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb == NULL) {
    return;
  }

  sdbCleanup(pMnode->pSdb);
  pMnode->pSdb = NULL;
}

193 194 195 196 197
/*
 * Append one step to pMnode->pSteps.
 *
 * name      - label used in log messages for this step.
 * initFp    - init callback, may be NULL.
 * cleanupFp - cleanup callback, may be NULL.
 *
 * Returns 0 on success, or -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY when
 * the push fails.
 */
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep newStep = {0};
  newStep.cleanupFp = cleanupFp;
  newStep.initFp = initFp;
  newStep.name = name;

  if (taosArrayPush(pMnode->pSteps, &newStep) != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

206
/*
 * Register every mnode init/cleanup step, in order, in pMnode->pSteps.
 *
 * mndExecSteps() runs the init callbacks front to back and mndCleanupSteps()
 * runs the cleanup callbacks back to front, so the order here matters. The
 * sdb step is chosen by clusterId: deploy when clusterId <= 0, otherwise read
 * the existing file. "mnode-timer" is registered twice on purpose: once with
 * only an init callback and once, as the last step, with only a cleanup
 * callback, which makes the timer cleanup the first one to run.
 *
 * Returns 0 on success, -1 as soon as one registration fails (terrno is set by
 * mndAllocStep()).
 */
static int32_t mndInitSteps(SMnode *pMnode) {
  if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-qnode", mndInitQnode, mndCleanupQnode) != 0) return -1;
  // These two were previously also labelled "mnode-qnode", which made the
  // step log lines ambiguous.
  if (mndAllocStep(pMnode, "mnode-snode", mndInitSnode, mndCleanupSnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-bnode", mndInitBnode, mndCleanupBnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-stream", mndInitStream, mndCleanupStream) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-offset", mndInitOffset, mndCleanupOffset) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
  if (pMnode->clusterId <= 0) {
    if (mndAllocStep(pMnode, "mnode-sdb-deploy", mndDeploySdb, NULL) != 0) return -1;
  } else {
    if (mndAllocStep(pMnode, "mnode-sdb-read", mndReadSdb, NULL) != 0) return -1;
  }
  if (mndAllocStep(pMnode, "mnode-timer", mndInitTimer, NULL) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-timer", NULL, mndCleanupTimer) != 0) return -1;

  return 0;
}

243
/*
 * Run the cleanup callbacks of the registered steps in reverse order, starting
 * at index `pos` and going down to index 0, then destroy the step array and
 * set pMnode->pSteps to NULL.
 *
 * pos - index of the first step to clean up; -1 means "start from the last
 *       registered step".
 *
 * Does nothing when pMnode->pSteps is already NULL.
 */
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) return;

  if (pos == -1) {
    pos = taosArrayGetSize(pMnode->pSteps) - 1;
  }

  int32_t idx = pos;
  while (idx >= 0) {
    SMnodeStep *pCur = taosArrayGet(pMnode->pSteps, idx);
    mDebug("%s will cleanup", pCur->name);
    // Steps registered with a NULL cleanup callback are only logged.
    if (pCur->cleanupFp != NULL) {
      (*pCur->cleanupFp)(pMnode);
    }
    idx--;
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
S
Shengliang Guan 已提交
262

263
/*
 * Run the init callback of every registered step, in registration order.
 * Steps without an init callback are skipped.
 *
 * On the first failure, the steps from the failing one back to the first are
 * cleaned up via mndCleanupSteps(), terrno is restored to the value the
 * failing step left, and -1 is returned. Returns 0 when every step succeeds.
 */
static int32_t mndExecSteps(SMnode *pMnode) {
  int32_t numOfSteps = taosArrayGetSize(pMnode->pSteps);

  for (int32_t i = 0; i < numOfSteps; i++) {
    SMnodeStep *pCur = taosArrayGet(pMnode->pSteps, i);
    if (pCur->initFp == NULL) continue;

    if ((*pCur->initFp)(pMnode) == 0) {
      mDebug("%s is initialized", pCur->name);
      continue;
    }

    // Save terrno before cleanup runs so it can be restored afterwards.
    int32_t code = terrno;
    mError("%s exec failed since %s, start to cleanup", pCur->name, terrstr());
    mndCleanupSteps(pMnode, i);
    terrno = code;
    return -1;
  }

  return 0;
}
S
Shengliang Guan 已提交
282

283 284 285 286 287 288 289
/*
 * Copy the caller-supplied options (ids, replica set, owning dnode and the
 * callback pointers) into the mnode, then validate them.
 *
 * Returns 0 when the options are usable. Returns -1 with terrno set to
 * TSDB_CODE_MND_INVALID_OPTIONS when sendReqToDnodeFp, sendReqToMnodeFp,
 * sendRedirectRspFp or putReqToMWriteQFp is NULL, or when dnodeId or
 * clusterId is negative. The fields have already been copied at that point.
 *
 * NOTE(review): putReqToMReadQFp is copied but not part of the validation.
 */
static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  // identity and replica layout
  pMnode->dnodeId = pOption->dnodeId;
  pMnode->clusterId = pOption->clusterId;
  pMnode->replica = pOption->replica;
  pMnode->selfIndex = pOption->selfIndex;
  memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);

  // owning dnode and its callbacks
  pMnode->pDnode = pOption->pDnode;
  pMnode->putReqToMWriteQFp = pOption->putReqToMWriteQFp;
  pMnode->putReqToMReadQFp = pOption->putReqToMReadQFp;
  pMnode->sendReqToDnodeFp = pOption->sendReqToDnodeFp;
  pMnode->sendReqToMnodeFp = pOption->sendReqToMnodeFp;
  pMnode->sendRedirectRspFp = pOption->sendRedirectRspFp;

  bool hasCallbacks = pMnode->sendReqToDnodeFp != NULL && pMnode->sendReqToMnodeFp != NULL &&
                      pMnode->sendRedirectRspFp != NULL && pMnode->putReqToMWriteQFp != NULL;
  bool hasValidIds = pMnode->dnodeId >= 0 && pMnode->clusterId >= 0;

  if (hasCallbacks && hasValidIds) {
    return 0;
  }

  terrno = TSDB_CODE_MND_INVALID_OPTIONS;
  return -1;
}
304

305
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
S
Shengliang Guan 已提交
306 307
  mDebug("start to open mnode in %s", path);

308
  SMnode *pMnode = calloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
309 310 311 312 313 314
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
315 316 317
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);

S
Shengliang Guan 已提交
318 319 320 321 322 323 324
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
    free(pMnode);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
325

S
Shengliang Guan 已提交
326
  int32_t code = mndCreateDir(pMnode, path);
327
  if (code != 0) {
S
Shengliang Guan 已提交
328 329
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
330 331 332 333 334 335
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndSetOptions(pMnode, pOption);
336
  if (code != 0) {
S
Shengliang Guan 已提交
337 338
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
339 340 341 342 343 344 345
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndInitSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
346 347
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
348 349 350 351 352 353 354
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
355 356
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
357 358 359 360
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
361

S
Shengliang Guan 已提交
362
  mndUpdateMnodeRole(pMnode);
S
Shengliang Guan 已提交
363
  mDebug("mnode open successfully ");
S
Shengliang Guan 已提交
364 365
  return pMnode;
}
S
Shengliang Guan 已提交
366

367
/*
 * Close an mnode: run the cleanup callbacks of all registered steps, then free
 * the stored path and the SMnode itself. A NULL pMnode is ignored.
 */
void mndClose(SMnode *pMnode) {
  if (pMnode == NULL) {
    return;
  }

  mDebug("start to close mnode");
  mndCleanupSteps(pMnode, -1);
  tfree(pMnode->path);
  tfree(pMnode);
  mDebug("mnode is closed");
}
S
Shengliang Guan 已提交
376

377
/*
 * Alter an mnode. The body currently only emits two debug log lines; neither
 * argument is read. Always returns 0.
 */
int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
  (void)pMnode;
  (void)pOption;

  mDebug("start to alter mnode");
  mDebug("mnode is altered");

  return 0;
}
S
Shengliang Guan 已提交
382

383
/* Remove the mnode directory at `path`, logging before and after. */
void mndDestroy(const char *path) {
  mDebug("start to destroy mnode at %s", path);

  taosRemoveDir(path);

  mDebug("mnode is destroyed");
}
S
Shengliang Guan 已提交
388

S
shm  
Shengliang Guan 已提交
389 390 391 392 393 394
/*
 * Dispatch one message to the handler registered for its type.
 *
 * A message whose type has the lowest bit set is treated as a request.
 * Requests are rejected with -1 when this mnode is not master
 * (terrno = TSDB_CODE_APP_NOT_READY) or when they carry no content
 * (terrno = TSDB_CODE_MND_INVALID_MSG_LEN). Any message without a registered
 * handler is rejected with -1 (terrno = TSDB_CODE_MSG_NOT_PROCESSED).
 *
 * Otherwise returns the handler's return code. When that code is
 * TSDB_CODE_MND_ACTION_IN_PROGRESS, terrno is set to it as well.
 */
int32_t mndProcessMsg(SMndMsg *pMsg) {
  SRpcMsg *pRpc = &pMsg->rpcMsg;
  SMnode  *pMnode = pMsg->pMnode;
  void    *ahandle = pRpc->ahandle;
  tmsg_t   msgType = pRpc->msgType;
  bool     isReq = (pRpc->msgType & 1U) != 0;

  mTrace("msg:%p, type:%s will be processed, app:%p", pMsg, TMSG_INFO(msgType), ahandle);

  if (isReq) {
    // Only the master serves requests.
    if (!mndIsMaster(pMnode)) {
      terrno = TSDB_CODE_APP_NOT_READY;
      mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
      return -1;
    }

    // A request must carry a body.
    if (pRpc->contLen == 0 || pRpc->pCont == NULL) {
      terrno = TSDB_CODE_MND_INVALID_MSG_LEN;
      mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
      return -1;
    }
  }

  MndMsgFp handler = pMnode->msgFp[TMSG_INDEX(msgType)];
  if (handler == NULL) {
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    mError("msg:%p, failed to process since no msg handle, app:%p", pMsg, ahandle);
    return -1;
  }

  int32_t code = (*handler)(pMsg);
  if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
    terrno = code;
    mTrace("msg:%p, in progress, app:%p", pMsg, ahandle);
    return code;
  }

  if (code != 0) {
    mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
    return code;
  }

  mTrace("msg:%p, is processed, app:%p", pMsg, ahandle);
  return code;
}

430 431 432 433
/*
 * Register `fp` as the handler for `msgType`. The slot is TMSG_INDEX(msgType);
 * the call is silently ignored when that index is outside [0, TDMT_MAX).
 */
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  tmsg_t slot = TMSG_INDEX(msgType);
  if (slot < 0 || slot >= TDMT_MAX) {
    return;
  }

  pMnode->msgFp[slot] = fp;
}

D
dapan1121 已提交
437
/*
 * Generate a non-zero 64-bit uid from `name`.
 *
 * Bit layout of the result:
 *   bits 24..63 : low 40 bits of the current microsecond timestamp
 *   bits  8..23 : low 16 bits of MurmurHash3_32(name, len)
 *   bits  0..7  : low 8 bits of taosRand()
 *
 * Note: uid 0 is reserved, so the value is regenerated until it is non-zero.
 */
uint64_t mndGenerateUid(char *name, int32_t len) {
  int32_t hashval = MurmurHash3_32(name, len);

  do {
    int64_t us = taosGetTimestampUs();
    // Convert to unsigned before shifting: left-shifting the signed 40-bit
    // value by 24 can overflow int64_t, which is undefined behaviour.
    uint64_t x = ((uint64_t)us & 0x000000FFFFFFFFFFULL) << 24;
    uint64_t uuid = x + (((uint64_t)hashval & 0xFFFFULL) << 8) + ((uint64_t)taosRand() & 0xFFULL);
    if (uuid) {
      return uuid;
    }
  } while (true);
}
S
Shengliang Guan 已提交
450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478

/*
 * Fill `pLoad` with the current object counts from the sdb and with totals
 * summed over every vgroup. `pLoad` is zeroed first.
 */
void mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  SSdb *pSdb = pMnode->pSdb;

  memset(pLoad, 0, sizeof(SMnodeLoad));

  // Object counts taken directly from the sdb tables.
  pLoad->numOfDnode = sdbGetSize(pSdb, SDB_DNODE);
  pLoad->numOfMnode = sdbGetSize(pSdb, SDB_MNODE);
  pLoad->numOfVgroup = sdbGetSize(pSdb, SDB_VGROUP);
  pLoad->numOfDatabase = sdbGetSize(pSdb, SDB_DB);
  pLoad->numOfSuperTable = sdbGetSize(pSdb, SDB_STB);

  // Per-vgroup statistics are accumulated by walking the vgroup table.
  void *pCursor = NULL;
  for (;;) {
    SVgObj *pVg = NULL;
    pCursor = sdbFetch(pSdb, SDB_VGROUP, pCursor, (void **)&pVg);
    if (pCursor == NULL) {
      break;
    }

    pLoad->numOfChildTable += pVg->numOfTables;
    pLoad->numOfColumn += pVg->numOfTimeSeries;
    pLoad->totalPoints += pVg->pointsWritten;
    pLoad->totalStorage += pVg->totalStorage;
    pLoad->compStorage += pVg->compStorage;

    sdbRelease(pSdb, pVg);
  }
}

/*
 * Collect monitoring data about the cluster, its vgroups and the grant.
 *
 * pClusterInfo - receives version, monitor interval, connection count, the
 *                dnode and mnode descriptor arrays, leader information and
 *                the vgroup/vnode counters.
 * pVgroupInfo  - receives one descriptor per vgroup.
 * pGrantInfo   - receives the expire time, the timeseries limit and the
 *                timeseries-used total (added to, not reset, here).
 *
 * The three arrays are allocated here with taosArrayInit().
 *
 * Returns 0 on success. Returns -1 when this mnode is not master or when one
 * of the arrays cannot be allocated; arrays that were already allocated are
 * left in the output structures in that case.
 */
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
                          SMonGrantInfo *pGrantInfo) {
  if (!mndIsMaster(pMnode)) return -1;

  SSdb   *pSdb = pMnode->pSdb;
  int64_t ms = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL) {
    return -1;
  }

  // cluster info
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);

  // dnode descriptors
  void *pIter = NULL;
  while (1) {
    SDnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonDnodeDesc desc = {0};
    desc.dnode_id = pObj->id;
    tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
    if (mndIsDnodeOnline(pMnode, pObj, ms)) {
      tstrncpy(desc.status, "ready", sizeof(desc.status));
    } else {
      tstrncpy(desc.status, "offline", sizeof(desc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &desc);
    sdbRelease(pSdb, pObj);
  }

  // mnode descriptors; the leader also provides first_ep and master_uptime
  pIter = NULL;
  while (1) {
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonMnodeDesc desc = {0};
    desc.mnode_id = pObj->id;
    tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));
    tstrncpy(desc.role, mndGetRoleStr(pObj->role), sizeof(desc.role));
    taosArrayPush(pClusterInfo->mnodes, &desc);

    if (pObj->role == TAOS_SYNC_STATE_LEADER) {
      pClusterInfo->first_ep_dnode_id = pObj->id;
      tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
      // uptime expressed in days (86400000 ms per day)
      pClusterInfo->master_uptime = (ms - pObj->roleTime) / (86400000.0f);
    }

    // Release only after the last read of pObj; it was previously released
    // before the leader check, which read the object after giving it back.
    sdbRelease(pSdb, pObj);
  }

  // vgroup info
  pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pClusterInfo->vgroups_total++;

    SMonVgroupDesc desc = {0};
    desc.vgroup_id = pVgroup->vgId;
    // tstrncpy, as used for every other string field in this function;
    // plain strncpy does not terminate the destination when the source fills it.
    tstrncpy(desc.database_name, pVgroup->dbName, sizeof(desc.database_name));
    desc.tables_num = pVgroup->numOfTables;
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
    // A vgroup is reported as "unsynced" unless one of its vnodes is leader.
    tstrncpy(desc.status, "unsynced", sizeof(desc.status));
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[i];
      SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
      pVnDesc->dnode_id = pVgid->dnodeId;
      tstrncpy(pVnDesc->vnode_role, mndGetRoleStr(pVgid->role), sizeof(pVnDesc->vnode_role));
      if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
        tstrncpy(desc.status, "ready", sizeof(desc.status));
        pClusterInfo->vgroups_alive++;
      }
      if (pVgid->role == TAOS_SYNC_STATE_LEADER || pVgid->role == TAOS_SYNC_STATE_CANDIDATE) {
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &desc);
    sdbRelease(pSdb, pVgroup);
  }

  // grant info; expire time is expressed in days
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  // expireTimeMS == 0: both values are reported as INT32_MAX instead
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  }

  return 0;
}