mnode.c 17.3 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndAcct.h"
#include "mndAuth.h"
S
Shengliang Guan 已提交
19
#include "mndBnode.h"
S
Shengliang Guan 已提交
20
#include "mndCluster.h"
L
Liu Jicong 已提交
21
#include "mndConsumer.h"
S
Shengliang Guan 已提交
22 23 24
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
S
monitor  
Shengliang Guan 已提交
25
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
26
#include "mndMnode.h"
L
Liu Jicong 已提交
27
#include "mndOffset.h"
S
Shengliang Guan 已提交
28
#include "mndProfile.h"
S
Shengliang Guan 已提交
29
#include "mndQnode.h"
S
Shengliang Guan 已提交
30
#include "mndShow.h"
S
sma  
Shengliang Guan 已提交
31
#include "mndSma.h"
S
Shengliang Guan 已提交
32
#include "mndSnode.h"
S
Shengliang Guan 已提交
33
#include "mndStb.h"
L
Liu Jicong 已提交
34
#include "mndStream.h"
L
Liu Jicong 已提交
35
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
36 37
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
38
#include "mndTopic.h"
S
Shengliang Guan 已提交
39 40 41
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
42

S
Shengliang Guan 已提交
43 44 45 46
#define MQ_TIMER_MS    3000
#define TRNAS_TIMER_MS 6000
#define TELEM_TIMER_MS 86400000

S
Shengliang Guan 已提交
47 48 49 50 51 52 53 54 55 56 57 58 59
// Serialize an empty SMTimerReq into a buffer obtained from rpcMallocCont().
// On success the buffer is returned and its length is stored in *pContLen.
// Returns NULL (leaving *pContLen untouched) when the reported length is not
// positive or the allocation fails.
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq req = {0};

  // Called with a NULL buffer first; the return value is used as the buffer size.
  int32_t len = tSerializeSMTimerMsg(NULL, 0, &req);
  if (len <= 0) {
    return NULL;
  }

  void *pBuf = rpcMallocCont(len);
  if (pBuf != NULL) {
    tSerializeSMTimerMsg(pBuf, len, &req);
    *pContLen = len;
  }

  return pBuf;
}

S
Shengliang Guan 已提交
60
static void mndPullupTrans(void *param, void *tmrId) {
S
Shengliang Guan 已提交
61 62
  SMnode *pMnode = param;
  if (mndIsMaster(pMnode)) {
S
Shengliang Guan 已提交
63 64
    int32_t contLen = 0;
    void   *pReq = mndBuildTimerMsg(&contLen);
S
Shengliang Guan 已提交
65
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
S
Shengliang Guan 已提交
66
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
S
Shengliang Guan 已提交
67 68
  }

S
Shengliang Guan 已提交
69
  taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer);
S
Shengliang Guan 已提交
70 71
}

L
Liu Jicong 已提交
72 73
static void mndCalMqRebalance(void *param, void *tmrId) {
  SMnode *pMnode = param;
L
Liu Jicong 已提交
74
  if (mndIsMaster(pMnode)) {
S
Shengliang Guan 已提交
75 76
    int32_t contLen = 0;
    void   *pReq = mndBuildTimerMsg(&contLen);
L
Liu Jicong 已提交
77 78 79 80 81
    SRpcMsg rpcMsg = {
        .msgType = TDMT_MND_MQ_TIMER,
        .pCont = pReq,
        .contLen = contLen,
    };
S
Shengliang Guan 已提交
82
    tmsgPutToQueue(&pMnode->msgCb, QUERY_QUEUE, &rpcMsg);
L
Liu Jicong 已提交
83 84
  }

S
Shengliang Guan 已提交
85 86 87
  taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer);
}

S
Shengliang Guan 已提交
88
static void mndPullupTelem(void *param, void *tmrId) {
S
Shengliang Guan 已提交
89 90 91 92 93
  SMnode *pMnode = param;
  if (mndIsMaster(pMnode)) {
    int32_t contLen = 0;
    void   *pReq = mndBuildTimerMsg(&contLen);
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
S
Shengliang Guan 已提交
94
    tmsgPutToQueue(&pMnode->msgCb, QUERY_QUEUE, &rpcMsg);
S
Shengliang Guan 已提交
95 96
  }

S
Shengliang Guan 已提交
97
  taosTmrReset(mndPullupTelem, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer);
L
Liu Jicong 已提交
98 99
}

100
// Create the mnode timer controller and arm the trans, MQ and telemetry timers.
// The telemetry timer is first armed with 60000 ms here; mndPullupTelem()
// re-arms it with TELEM_TIMER_MS afterwards.
// Returns 0 on success; on any failure sets terrno to TSDB_CODE_OUT_OF_MEMORY
// and returns -1.
static int32_t mndInitTimer(SMnode *pMnode) {
  pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND");

  // Short-circuit evaluation stops at the first failing step, in order.
  if (pMnode->timer == NULL ||
      taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer) ||
      taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer) ||
      taosTmrReset(mndPullupTelem, 60000, pMnode, pMnode->timer, &pMnode->telemTimer)) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

125 126
// Stop the trans, MQ and telemetry timers, then release the timer controller.
// Does nothing when the controller was never created.
static void mndCleanupTimer(SMnode *pMnode) {
  if (pMnode->timer == NULL) {
    return;
  }

  taosTmrStop(pMnode->transTimer);
  pMnode->transTimer = NULL;

  taosTmrStop(pMnode->mqTimer);
  pMnode->mqTimer = NULL;

  taosTmrStop(pMnode->telemTimer);
  pMnode->telemTimer = NULL;

  taosTmrCleanUp(pMnode->timer);
  pMnode->timer = NULL;
}

S
Shengliang Guan 已提交
138
// Store a private copy of path in pMnode->path and create that directory.
// Returns 0 on success. On failure returns -1 with terrno set to
// TSDB_CODE_OUT_OF_MEMORY (copy failed) or to the system error derived from
// errno (directory creation failed); the copied path stays in pMnode->path.
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  pMnode->path = strdup(path);

  if (pMnode->path != NULL) {
    if (taosMkDir(pMnode->path) == 0) {
      return 0;
    }
    terrno = TAOS_SYSTEM_ERROR(errno);
  } else {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  }

  return -1;
}
S
Shengliang Guan 已提交
152

153 154 155
// Open the sdb for this mnode using pMnode->path as its location.
// Returns 0 on success, -1 when sdbInit() returns NULL.
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt opt = {.path = pMnode->path, .pMnode = pMnode};

  pMnode->pSdb = sdbInit(&opt);
  return (pMnode->pSdb != NULL) ? 0 : -1;
}

166
// Init step: forwards to sdbDeploy() on the mnode's sdb and returns its result.
static int32_t mndDeploySdb(SMnode *pMnode) {
  return sdbDeploy(pMnode->pSdb);
}
S
Shengliang Guan 已提交
167
// Init step: forwards to sdbReadFile() on the mnode's sdb and returns its result.
static int32_t mndReadSdb(SMnode *pMnode) {
  return sdbReadFile(pMnode->pSdb);
}
168 169 170

// Release the sdb, if one was opened, and clear the pointer.
static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb == NULL) {
    return;
  }

  sdbCleanup(pMnode->pSdb);
  pMnode->pSdb = NULL;
}

176 177 178 179 180
// Append one init/cleanup step to pMnode->pSteps.
//   name      - label used in step log messages
//   initFp    - run by mndExecSteps(); may be NULL
//   cleanupFp - run by mndCleanupSteps(); may be NULL
// Returns 0 on success; on push failure sets terrno to
// TSDB_CODE_OUT_OF_MEMORY and returns -1.
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep step = {.name = name, .initFp = initFp, .cleanupFp = cleanupFp};

  if (taosArrayPush(pMnode->pSteps, &step) != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

189
// Register every mnode init/cleanup step in pMnode->pSteps.
// mndExecSteps() runs the init functions in this order and mndCleanupSteps()
// runs the cleanup functions in reverse order.
// Returns 0 on success, -1 as soon as one registration fails (terrno is set
// by mndAllocStep()).
static int32_t mndInitSteps(SMnode *pMnode) {
  if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-qnode", mndInitQnode, mndCleanupQnode) != 0) return -1;
  // Each step carries its own name so step log lines identify the right module.
  if (mndAllocStep(pMnode, "mnode-snode", mndInitSnode, mndCleanupSnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-bnode", mndInitBnode, mndCleanupBnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-stream", mndInitStream, mndCleanupStream) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-offset", mndInitOffset, mndCleanupOffset) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-sma", mndInitSma, mndCleanupSma) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;

  // A non-positive clusterId selects the deploy step; otherwise the sdb file is read.
  if (pMnode->clusterId <= 0) {
    if (mndAllocStep(pMnode, "mnode-sdb-deploy", mndDeploySdb, NULL) != 0) return -1;
  } else {
    if (mndAllocStep(pMnode, "mnode-sdb-read", mndReadSdb, NULL) != 0) return -1;
  }

  if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1;
  // The timer has no init function here because mndStart() calls mndInitTimer();
  // only its cleanup is registered as a step.
  if (mndAllocStep(pMnode, "mnode-timer", NULL, mndCleanupTimer) != 0) return -1;

  return 0;
}

227
// Run cleanup functions from step index pos down to 0, then destroy the step
// array. Passing pos == -1 starts from the last registered step.
// Does nothing when the step array has already been destroyed.
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) {
    return;
  }

  int32_t idx = pos;
  if (idx == -1) {
    idx = taosArrayGetSize(pMnode->pSteps) - 1;
  }

  while (idx >= 0) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    mDebug("%s will cleanup", pStep->name);
    if (pStep->cleanupFp != NULL) {
      (*pStep->cleanupFp)(pMnode);
    }
    idx--;
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
S
Shengliang Guan 已提交
246

247
// Run the init function of every registered step in order.
// Steps without an init function are skipped. On the first failure the steps
// from the failing one down to the first are cleaned up, terrno is restored
// to the failing step's error and -1 is returned. Returns 0 when all succeed.
static int32_t mndExecSteps(SMnode *pMnode) {
  int32_t numOfSteps = taosArrayGetSize(pMnode->pSteps);

  for (int32_t i = 0; i < numOfSteps; ++i) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, i);
    if (pStep->initFp != NULL) {
      if ((*pStep->initFp)(pMnode) == 0) {
        mDebug("%s is initialized", pStep->name);
        continue;
      }

      // Save terrno: the cleanup calls below may overwrite it.
      int32_t code = terrno;
      mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
      mndCleanupSteps(pMnode, i);
      terrno = code;
      return -1;
    }
  }

  return 0;
}
S
Shengliang Guan 已提交
266

267 268 269 270 271 272
// Copy the open options into the mnode, then validate the copied ids.
// Returns 0 on success; when dnodeId or clusterId is negative sets terrno to
// TSDB_CODE_MND_INVALID_OPTIONS and returns -1 (the fields stay copied).
static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  pMnode->msgCb = pOption->msgCb;
  pMnode->dnodeId = pOption->dnodeId;
  pMnode->clusterId = pOption->clusterId;
  pMnode->replica = pOption->replica;
  pMnode->selfIndex = pOption->selfIndex;
  memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);

  if (pMnode->dnodeId >= 0 && pMnode->clusterId >= 0) {
    return 0;
  }

  terrno = TSDB_CODE_MND_INVALID_OPTIONS;
  return -1;
}
282

283
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
S
Shengliang Guan 已提交
284 285
  mDebug("start to open mnode in %s", path);

286
  SMnode *pMnode = calloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
287 288 289 290 291 292
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
293 294 295
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);

S
Shengliang Guan 已提交
296 297 298 299 300 301 302
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
    free(pMnode);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
303

S
Shengliang Guan 已提交
304
  int32_t code = mndCreateDir(pMnode, path);
305
  if (code != 0) {
S
Shengliang Guan 已提交
306 307
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
308 309 310 311 312 313
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndSetOptions(pMnode, pOption);
314
  if (code != 0) {
S
Shengliang Guan 已提交
315 316
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
317 318 319 320 321 322 323
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndInitSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
324 325
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
326 327 328 329 330 331 332
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
333 334
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
335 336 337 338
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
339

S
Shengliang Guan 已提交
340
  mndUpdateMnodeRole(pMnode);
S
Shengliang Guan 已提交
341
  mDebug("mnode open successfully ");
S
Shengliang Guan 已提交
342 343
  return pMnode;
}
S
Shengliang Guan 已提交
344

345
// Run all remaining cleanup steps and free the mnode together with its path.
// A NULL pMnode is ignored.
void mndClose(SMnode *pMnode) {
  if (pMnode == NULL) {
    return;
  }

  mDebug("start to close mnode");
  mndCleanupSteps(pMnode, -1);
  tfree(pMnode->path);
  tfree(pMnode);
  mDebug("mnode is closed");
}
S
Shengliang Guan 已提交
354

355
// Currently only logs the start and end of the alter; neither argument is
// read and nothing is changed. Always returns 0.
int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
  int32_t code = 0;

  mDebug("start to alter mnode");
  mDebug("mnode is altered");

  return code;
}
S
Shengliang Guan 已提交
360

S
shm  
Shengliang Guan 已提交
361 362
// Start the mnode by creating and arming its timers.
// Returns the result of mndInitTimer().
int32_t mndStart(SMnode *pMnode) {
  int32_t code = mndInitTimer(pMnode);
  return code;
}
S
Shengliang Guan 已提交
364

S
Shengliang Guan 已提交
365 366
// Dispatch one message to the handler registered for its type.
// Message types with the low bit set are treated as requests. A request is
// rejected when this mnode is not the master (TSDB_CODE_APP_NOT_READY) or
// when it carries no content (TSDB_CODE_MND_INVALID_MSG_LEN). A message with
// no registered handler, or whose type index lies outside the handler table,
// fails with TSDB_CODE_MSG_NOT_PROCESSED.
// Returns -1 with terrno set for those rejections; otherwise returns the
// handler's own return code.
int32_t mndProcessMsg(SNodeMsg *pMsg) {
  SMnode  *pMnode = pMsg->pNode;
  SRpcMsg *pRpc = &pMsg->rpcMsg;
  tmsg_t   msgType = pMsg->rpcMsg.msgType;
  void    *ahandle = pMsg->rpcMsg.ahandle;
  bool     isReq = (pRpc->msgType & 1U);

  mTrace("msg:%p, will be processed, type:%s app:%p", pMsg, TMSG_INFO(msgType), ahandle);

  if (isReq && !mndIsMaster(pMnode)) {
    terrno = TSDB_CODE_APP_NOT_READY;
    mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
    return -1;
  }

  if (isReq && (pRpc->contLen == 0 || pRpc->pCont == NULL)) {
    terrno = TSDB_CODE_MND_INVALID_MSG_LEN;
    mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
    return -1;
  }

  // Apply the same index bound mndSetMsgHandle() uses when filling the table,
  // so an unexpected message type cannot read past the end of msgFp[].
  MndMsgFp fp = NULL;
  tmsg_t   type = TMSG_INDEX(msgType);
  if (type >= 0 && type < TDMT_MAX) {
    fp = pMnode->msgFp[type];
  }

  if (fp == NULL) {
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    mError("msg:%p, failed to process since no msg handle, app:%p", pMsg, ahandle);
    return -1;
  }

  int32_t code = (*fp)(pMsg);
  if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
    terrno = code;
    mTrace("msg:%p, in progress, app:%p", pMsg, ahandle);
  } else if (code != 0) {
    mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
  } else {
    mTrace("msg:%p, is processed, app:%p", pMsg, ahandle);
  }

  return code;
}

406 407 408 409
// Register fp as the handler for msgType.
// The request is silently ignored when the type's index falls outside
// [0, TDMT_MAX).
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  tmsg_t slot = TMSG_INDEX(msgType);

  if (slot < 0 || slot >= TDMT_MAX) {
    return;
  }

  pMnode->msgFp[slot] = fp;
}

D
dapan1121 已提交
413
// Note: uid 0 is reserved
//
// Build a positive, non-zero uid from three sources:
//   bits 24..63 - low 40 bits of taosGetTimestampUs()
//   bits  8..23 - low 16 bits of MurmurHash3_32(name, len)
//   bits  0..7  - low 8 bits of taosRand()
// When the top bit is set the value is replaced by its magnitude, so the
// result always fits in a positive int64_t. The loop retries until the value
// is usable.
int64_t mndGenerateUid(char *name, int32_t len) {
  int32_t hashval = MurmurHash3_32(name, len);

  do {
    int64_t us = taosGetTimestampUs();

    // Assemble in unsigned arithmetic: shifting a signed value into the sign
    // bit is undefined behaviour.
    uint64_t bits = (((uint64_t)us & 0x000000FFFFFFFFFFULL) << 24) + (((uint64_t)hashval & 0xFFFFULL) << 8) +
                    ((uint64_t)taosRand() & 0xFFULL);

    // 64-bit absolute value. abs() takes an int and would truncate the uid to
    // 32 bits.
    uint64_t magnitude = (bits >> 63) ? ((uint64_t)0 - bits) : bits;

    // Retry on 0 (reserved) and on the single value whose magnitude does not
    // fit in int64_t.
    if (magnitude != 0 && magnitude <= (uint64_t)INT64_MAX) {
      return (int64_t)magnitude;
    }
  } while (true);
}
S
Shengliang Guan 已提交
426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454

// Fill pLoad with object counts read from the sdb and with per-vgroup totals
// (tables, time series, points written, storage) summed over all vgroups.
// pLoad is zeroed first.
void mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  SSdb *pSdb = pMnode->pSdb;

  memset(pLoad, 0, sizeof(SMnodeLoad));
  pLoad->numOfDnode = sdbGetSize(pSdb, SDB_DNODE);
  pLoad->numOfMnode = sdbGetSize(pSdb, SDB_MNODE);
  pLoad->numOfVgroup = sdbGetSize(pSdb, SDB_VGROUP);
  pLoad->numOfDatabase = sdbGetSize(pSdb, SDB_DB);
  pLoad->numOfSuperTable = sdbGetSize(pSdb, SDB_STB);

  // sdbFetch() returns a NULL iterator once every vgroup has been visited.
  void *pIter = NULL;
  for (;;) {
    SVgObj *pVg = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVg);
    if (pIter == NULL) {
      break;
    }

    pLoad->numOfChildTable += pVg->numOfTables;
    pLoad->numOfColumn += pVg->numOfTimeSeries;
    pLoad->totalPoints += pVg->pointsWritten;
    pLoad->totalStorage += pVg->totalStorage;
    pLoad->compStorage += pVg->compStorage;

    sdbRelease(pSdb, pVg);
  }
}

// Collect monitoring data for the cluster. Only the master mnode reports.
//   pClusterInfo - receives version, interval, connection count, the dnode and
//                  mnode descriptor arrays, and vgroup/vnode counters
//   pVgroupInfo  - receives one descriptor per vgroup
//   pGrantInfo   - receives grant expiry, allowed and used time series
// Returns 0 on success, -1 when this mnode is not the master or when one of
// the result arrays cannot be allocated. Arrays already allocated at that
// point are left in the output structs.
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
                          SMonGrantInfo *pGrantInfo) {
  if (!mndIsMaster(pMnode)) return -1;

  SSdb   *pSdb = pMnode->pSdb;
  int64_t ms = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL) {
    return -1;
  }

  // cluster info
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);

  // dnode info
  void *pIter = NULL;
  while (1) {
    SDnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonDnodeDesc desc = {0};
    desc.dnode_id = pObj->id;
    tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
    if (mndIsDnodeOnline(pMnode, pObj, ms)) {
      tstrncpy(desc.status, "ready", sizeof(desc.status));
    } else {
      tstrncpy(desc.status, "offline", sizeof(desc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &desc);
    sdbRelease(pSdb, pObj);
  }

  // mnode info
  pIter = NULL;
  while (1) {
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonMnodeDesc desc = {0};
    desc.mnode_id = pObj->id;
    tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));
    tstrncpy(desc.role, mndGetRoleStr(pObj->role), sizeof(desc.role));
    taosArrayPush(pClusterInfo->mnodes, &desc);

    // The leader mnode supplies the first endpoint and the master uptime in days.
    if (pObj->role == TAOS_SYNC_STATE_LEADER) {
      pClusterInfo->first_ep_dnode_id = pObj->id;
      tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
      pClusterInfo->master_uptime = (ms - pObj->roleTime) / (86400000.0f);
    }

    // Release only after the last read of pObj.
    sdbRelease(pSdb, pObj);
  }

  // vgroup info
  pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pClusterInfo->vgroups_total++;

    SMonVgroupDesc desc = {0};
    desc.vgroup_id = pVgroup->vgId;
    // tstrncpy, as used for every other string field in this function.
    tstrncpy(desc.database_name, pVgroup->dbName, sizeof(desc.database_name));
    desc.tables_num = pVgroup->numOfTables;
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
    // A vgroup is "unsynced" until one of its replicas is found to be leader.
    tstrncpy(desc.status, "unsynced", sizeof(desc.status));
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[i];
      SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
      pVnDesc->dnode_id = pVgid->dnodeId;
      tstrncpy(pVnDesc->vnode_role, mndGetRoleStr(pVgid->role), sizeof(pVnDesc->vnode_role));
      if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
        tstrncpy(desc.status, "ready", sizeof(desc.status));
        pClusterInfo->vgroups_alive++;
      }
      if (pVgid->role == TAOS_SYNC_STATE_LEADER || pVgid->role == TAOS_SYNC_STATE_CANDIDATE) {
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &desc);
    sdbRelease(pSdb, pVgroup);
  }

  // grant info
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  // An expiry time of 0 is reported as INT32_MAX for both fields.
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  }

  return 0;
}