mnode.c 20.1 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndAcct.h"
#include "mndAuth.h"
S
Shengliang Guan 已提交
19
#include "mndBnode.h"
S
Shengliang Guan 已提交
20
#include "mndCluster.h"
L
Liu Jicong 已提交
21
#include "mndConsumer.h"
S
Shengliang Guan 已提交
22 23 24
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
S
monitor  
Shengliang Guan 已提交
25
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
26
#include "mndMnode.h"
L
Liu Jicong 已提交
27
#include "mndOffset.h"
S
Shengliang Guan 已提交
28
#include "mndProfile.h"
S
Shengliang Guan 已提交
29
#include "mndQnode.h"
S
Shengliang Guan 已提交
30
#include "mndShow.h"
S
Shengliang Guan 已提交
31
#include "mndSnode.h"
S
Shengliang Guan 已提交
32
#include "mndStb.h"
L
Liu Jicong 已提交
33
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
34 35
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
36
#include "mndTopic.h"
S
Shengliang Guan 已提交
37 38 39
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
40

S
Shengliang Guan 已提交
41 42 43 44
#define MQ_TIMER_MS    3000
#define TRNAS_TIMER_MS 6000
#define TELEM_TIMER_MS 86400000

S
Shengliang Guan 已提交
45 46 47 48
int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
  if (pMnode == NULL || pMnode->sendReqToDnodeFp == NULL) {
    terrno = TSDB_CODE_MND_NOT_READY;
    return -1;
49
  }
S
Shengliang Guan 已提交
50 51

  return (*pMnode->sendReqToDnodeFp)(pMnode->pDnode, pEpSet, pMsg);
S
Shengliang Guan 已提交
52
}
S
Shengliang Guan 已提交
53

S
Shengliang Guan 已提交
54 55 56 57
int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg) {
  if (pMnode == NULL || pMnode->sendReqToDnodeFp == NULL) {
    terrno = TSDB_CODE_MND_NOT_READY;
    return -1;
58
  }
S
Shengliang Guan 已提交
59 60

  return (*pMnode->sendReqToMnodeFp)(pMnode->pDnode, pMsg);
S
Shengliang Guan 已提交
61
}
S
Shengliang Guan 已提交
62

S
Shengliang Guan 已提交
63 64 65
void mndSendRedirectRsp(SMnode *pMnode, SRpcMsg *pMsg) {
  if (pMnode != NULL && pMnode->sendRedirectRspFp != NULL) {
    (*pMnode->sendRedirectRspFp)(pMnode->pDnode, pMsg);
66
  }
S
Shengliang Guan 已提交
67
}
S
Shengliang Guan 已提交
68

S
Shengliang Guan 已提交
69 70 71 72 73 74 75 76 77 78 79 80 81
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq timerReq = {0};

  int32_t contLen = tSerializeSMTimerMsg(NULL, 0, &timerReq);
  if (contLen <= 0) return NULL;
  void *pReq = rpcMallocCont(contLen);
  if (pReq == NULL) return NULL;

  tSerializeSMTimerMsg(pReq, contLen, &timerReq);
  *pContLen = contLen;
  return pReq;
}

S
Shengliang Guan 已提交
82
static void mndPullupTrans(void *param, void *tmrId) {
S
Shengliang Guan 已提交
83 84
  SMnode *pMnode = param;
  if (mndIsMaster(pMnode)) {
S
Shengliang Guan 已提交
85 86
    int32_t contLen = 0;
    void   *pReq = mndBuildTimerMsg(&contLen);
S
Shengliang Guan 已提交
87
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
S
Shengliang Guan 已提交
88
    pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
S
Shengliang Guan 已提交
89 90
  }

S
Shengliang Guan 已提交
91
  taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer);
S
Shengliang Guan 已提交
92 93
}

L
Liu Jicong 已提交
94 95
static void mndCalMqRebalance(void *param, void *tmrId) {
  SMnode *pMnode = param;
L
Liu Jicong 已提交
96
  if (mndIsMaster(pMnode)) {
S
Shengliang Guan 已提交
97 98 99
    int32_t contLen = 0;
    void   *pReq = mndBuildTimerMsg(&contLen);
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
L
Liu Jicong 已提交
100
    pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg);
L
Liu Jicong 已提交
101 102
  }

S
Shengliang Guan 已提交
103 104 105
  taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer);
}

S
Shengliang Guan 已提交
106
static void mndPullupTelem(void *param, void *tmrId) {
S
Shengliang Guan 已提交
107 108 109 110 111 112 113 114
  SMnode *pMnode = param;
  if (mndIsMaster(pMnode)) {
    int32_t contLen = 0;
    void   *pReq = mndBuildTimerMsg(&contLen);
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
    pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg);
  }

S
Shengliang Guan 已提交
115
  taosTmrReset(mndPullupTelem, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer);
L
Liu Jicong 已提交
116 117
}

118
static int32_t mndInitTimer(SMnode *pMnode) {
S
Shengliang Guan 已提交
119
  pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND");
120
  if (pMnode->timer == NULL) {
S
Shengliang Guan 已提交
121 122
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
S
Shengliang Guan 已提交
123 124
  }

S
Shengliang Guan 已提交
125
  if (taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer)) {
S
Shengliang Guan 已提交
126
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
127 128 129
    return -1;
  }

S
Shengliang Guan 已提交
130
  if (taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer)) {
S
Shengliang Guan 已提交
131 132 133 134
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
135
  if (taosTmrReset(mndPullupTelem, 60000, pMnode, pMnode->timer, &pMnode->telemTimer)) {
L
Liu Jicong 已提交
136 137 138 139
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
140 141 142
  return 0;
}

143 144
static void mndCleanupTimer(SMnode *pMnode) {
  if (pMnode->timer != NULL) {
S
Shengliang Guan 已提交
145 146
    taosTmrStop(pMnode->transTimer);
    pMnode->transTimer = NULL;
L
Liu Jicong 已提交
147 148
    taosTmrStop(pMnode->mqTimer);
    pMnode->mqTimer = NULL;
S
Shengliang Guan 已提交
149 150
    taosTmrStop(pMnode->telemTimer);
    pMnode->telemTimer = NULL;
151 152
    taosTmrCleanUp(pMnode->timer);
    pMnode->timer = NULL;
S
Shengliang Guan 已提交
153 154 155
  }
}

S
Shengliang Guan 已提交
156
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
157 158 159
  pMnode->path = strdup(path);
  if (pMnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
160
    return -1;
161 162 163 164
  }

  if (taosMkDir(pMnode->path) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
165
    return -1;
166
  }
167 168

  return 0;
169
}
S
Shengliang Guan 已提交
170

171 172 173
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt opt = {0};
  opt.path = pMnode->path;
S
Shengliang Guan 已提交
174
  opt.pMnode = pMnode;
S
Shengliang Guan 已提交
175

S
Shengliang Guan 已提交
176
  pMnode->pSdb = sdbInit(&opt);
177
  if (pMnode->pSdb == NULL) {
S
Shengliang Guan 已提交
178 179 180 181 182 183
    return -1;
  }

  return 0;
}

184
static int32_t mndDeploySdb(SMnode *pMnode) { return sdbDeploy(pMnode->pSdb); }
S
Shengliang Guan 已提交
185
static int32_t mndReadSdb(SMnode *pMnode) { return sdbReadFile(pMnode->pSdb); }
186 187 188

static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb) {
S
Shengliang Guan 已提交
189
    sdbCleanup(pMnode->pSdb);
190 191 192 193
    pMnode->pSdb = NULL;
  }
}

194 195 196 197 198
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep step = {0};
  step.name = name;
  step.initFp = initFp;
  step.cleanupFp = cleanupFp;
S
Shengliang Guan 已提交
199
  if (taosArrayPush(pMnode->pSteps, &step) == NULL) {
200 201 202 203
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
204 205 206
  return 0;
}

207
static int32_t mndInitSteps(SMnode *pMnode) {
S
Shengliang Guan 已提交
208 209 210 211
  if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1;
S
Shengliang Guan 已提交
212 213 214
  if (mndAllocStep(pMnode, "mnode-qnode", mndInitQnode, mndCleanupQnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-qnode", mndInitSnode, mndCleanupSnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-qnode", mndInitBnode, mndCleanupBnode) != 0) return -1;
S
Shengliang Guan 已提交
215
  if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1;
S
Shengliang Guan 已提交
216
  if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
S
Shengliang Guan 已提交
217 218 219
  if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
L
Liu Jicong 已提交
220 221
  if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
L
Liu Jicong 已提交
222
  if (mndAllocStep(pMnode, "mnode-offset", mndInitOffset, mndCleanupOffset) != 0) return -1;
S
Shengliang Guan 已提交
223
  if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
S
Shengliang Guan 已提交
224
  if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
D
dapan1121 已提交
225
  if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1;
S
Shengliang Guan 已提交
226
  if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
S
Shengliang Guan 已提交
227
  if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
228
  if (pMnode->clusterId <= 0) {
S
Shengliang Guan 已提交
229 230 231 232 233 234 235 236 237 238
    if (mndAllocStep(pMnode, "mnode-sdb-deploy", mndDeploySdb, NULL) != 0) return -1;
  } else {
    if (mndAllocStep(pMnode, "mnode-sdb-read", mndReadSdb, NULL) != 0) return -1;
  }
  if (mndAllocStep(pMnode, "mnode-timer", mndInitTimer, NULL) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-timer", NULL, mndCleanupTimer) != 0) return -1;
S
Shengliang Guan 已提交
239 240 241 242

  return 0;
}

243
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
S
Shengliang Guan 已提交
244 245
  if (pMnode->pSteps == NULL) return;

246
  if (pos == -1) {
247
    pos = taosArrayGetSize(pMnode->pSteps) - 1;
S
Shengliang Guan 已提交
248 249
  }

250
  for (int32_t s = pos; s >= 0; s--) {
251
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, s);
S
Shengliang Guan 已提交
252
    mDebug("%s will cleanup", pStep->name);
253 254 255
    if (pStep->cleanupFp != NULL) {
      (*pStep->cleanupFp)(pMnode);
    }
S
Shengliang Guan 已提交
256 257
  }

S
Shengliang Guan 已提交
258
  taosArrayClear(pMnode->pSteps);
S
Shengliang Guan 已提交
259
  taosArrayDestroy(pMnode->pSteps);
S
Shengliang Guan 已提交
260
  pMnode->pSteps = NULL;
261
}
S
Shengliang Guan 已提交
262

263
static int32_t mndExecSteps(SMnode *pMnode) {
S
Shengliang Guan 已提交
264
  int32_t size = taosArrayGetSize(pMnode->pSteps);
265
  for (int32_t pos = 0; pos < size; pos++) {
S
Shengliang Guan 已提交
266
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, pos);
267
    if (pStep->initFp == NULL) continue;
S
Shengliang Guan 已提交
268

S
Shengliang Guan 已提交
269
    if ((*pStep->initFp)(pMnode) != 0) {
S
Shengliang Guan 已提交
270
      int32_t code = terrno;
S
Shengliang Guan 已提交
271
      mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
272
      mndCleanupSteps(pMnode, pos);
S
Shengliang Guan 已提交
273
      terrno = code;
S
Shengliang Guan 已提交
274
      return -1;
S
Shengliang Guan 已提交
275
    } else {
S
Shengliang Guan 已提交
276
      mDebug("%s is initialized", pStep->name);
S
Shengliang Guan 已提交
277 278
    }
  }
S
Shengliang Guan 已提交
279 280

  return 0;
281
}
S
Shengliang Guan 已提交
282

283 284 285 286 287 288 289
static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  pMnode->dnodeId = pOption->dnodeId;
  pMnode->clusterId = pOption->clusterId;
  pMnode->replica = pOption->replica;
  pMnode->selfIndex = pOption->selfIndex;
  memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
  pMnode->pDnode = pOption->pDnode;
S
Shengliang Guan 已提交
290
  pMnode->putReqToMWriteQFp = pOption->putReqToMWriteQFp;
L
Liu Jicong 已提交
291
  pMnode->putReqToMReadQFp = pOption->putReqToMReadQFp;
S
Shengliang Guan 已提交
292 293 294
  pMnode->sendReqToDnodeFp = pOption->sendReqToDnodeFp;
  pMnode->sendReqToMnodeFp = pOption->sendReqToMnodeFp;
  pMnode->sendRedirectRspFp = pOption->sendRedirectRspFp;
295

S
Shengliang Guan 已提交
296
  if (pMnode->sendReqToDnodeFp == NULL || pMnode->sendReqToMnodeFp == NULL || pMnode->sendRedirectRspFp == NULL ||
S
Shengliang Guan 已提交
297
      pMnode->putReqToMWriteQFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) {
S
Shengliang Guan 已提交
298
    terrno = TSDB_CODE_MND_INVALID_OPTIONS;
S
Shengliang Guan 已提交
299 300 301
    return -1;
  }

302
  return 0;
L
Liu Jicong 已提交
303
}
304

305
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
S
Shengliang Guan 已提交
306 307
  mDebug("start to open mnode in %s", path);

308
  SMnode *pMnode = calloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
309 310 311 312 313 314
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
315 316 317
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);

S
Shengliang Guan 已提交
318 319 320 321 322 323 324
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
    free(pMnode);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
325

S
Shengliang Guan 已提交
326
  int32_t code = mndCreateDir(pMnode, path);
327
  if (code != 0) {
S
Shengliang Guan 已提交
328 329
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
330 331 332 333 334 335
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndSetOptions(pMnode, pOption);
336
  if (code != 0) {
S
Shengliang Guan 已提交
337 338
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
339 340 341 342 343 344 345
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndInitSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
346 347
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
348 349 350 351 352 353 354
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
355 356
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
357 358 359 360
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
361

S
Shengliang Guan 已提交
362
  mndUpdateMnodeRole(pMnode);
S
Shengliang Guan 已提交
363
  mDebug("mnode open successfully ");
S
Shengliang Guan 已提交
364 365
  return pMnode;
}
S
Shengliang Guan 已提交
366

367
void mndClose(SMnode *pMnode) {
S
Shengliang Guan 已提交
368 369 370 371 372 373 374
  if (pMnode != NULL) {
    mDebug("start to close mnode");
    mndCleanupSteps(pMnode, -1);
    tfree(pMnode->path);
    tfree(pMnode);
    mDebug("mnode is closed");
  }
375
}
S
Shengliang Guan 已提交
376

377
int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
S
Shengliang Guan 已提交
378 379
  mDebug("start to alter mnode");
  mDebug("mnode is altered");
380 381
  return 0;
}
S
Shengliang Guan 已提交
382

383
void mndDestroy(const char *path) {
S
Shengliang Guan 已提交
384
  mDebug("start to destroy mnode at %s", path);
S
Shengliang Guan 已提交
385
  taosRemoveDir(path);
S
Shengliang Guan 已提交
386
  mDebug("mnode is destroyed");
387
}
S
Shengliang Guan 已提交
388

389
SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
S
Shengliang Guan 已提交
390 391 392
  SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg));
  if (pMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
393
    mError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpcMsg->ahandle, pRpcMsg->handle);
S
Shengliang Guan 已提交
394 395 396
    return NULL;
  }

S
Shengliang Guan 已提交
397 398
  if (pRpcMsg->msgType != TDMT_MND_TRANS_TIMER && pRpcMsg->msgType != TDMT_MND_MQ_TIMER &&
      pRpcMsg->msgType != TDMT_MND_MQ_DO_REBALANCE && pRpcMsg->msgType != TDMT_MND_TELEM_TIMER) {
399 400 401 402 403 404 405 406
    SRpcConnInfo connInfo = {0};
    if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) {
      taosFreeQitem(pMsg);
      terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
      mError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpcMsg->ahandle, pRpcMsg->handle);
      return NULL;
    }
    memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
S
Shengliang Guan 已提交
407 408
  }

S
Shengliang Guan 已提交
409
  pMsg->pMnode = pMnode;
S
Shengliang Guan 已提交
410 411 412
  pMsg->rpcMsg = *pRpcMsg;
  pMsg->createdTime = taosGetTimestampSec();

S
Shengliang Guan 已提交
413 414 415
  if (pRpcMsg != NULL) {
    mTrace("msg:%p, is created, app:%p RPC:%p user:%s", pMsg, pRpcMsg->ahandle, pRpcMsg->handle, pMsg->user);
  }
S
Shengliang Guan 已提交
416 417 418
  return pMsg;
}

419
void mndCleanupMsg(SMnodeMsg *pMsg) {
S
Shengliang Guan 已提交
420
  mTrace("msg:%p, is destroyed", pMsg);
S
Shengliang Guan 已提交
421 422
  rpcFreeCont(pMsg->rpcMsg.pCont);
  pMsg->rpcMsg.pCont = NULL;
S
Shengliang Guan 已提交
423 424 425
  taosFreeQitem(pMsg);
}

426 427 428 429
void mndSendRsp(SMnodeMsg *pMsg, int32_t code) {
  SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code};
  rpcSendResponse(&rpcRsp);
}
S
Shengliang Guan 已提交
430

S
Shengliang Guan 已提交
431
void mndProcessMsg(SMnodeMsg *pMsg) {
432
  SMnode *pMnode = pMsg->pMnode;
S
Shengliang Guan 已提交
433
  int32_t code = 0;
434
  tmsg_t  msgType = pMsg->rpcMsg.msgType;
S
Shengliang Guan 已提交
435
  void   *ahandle = pMsg->rpcMsg.ahandle;
S
Shengliang Guan 已提交
436
  bool    isReq = (msgType & 1U);
437

S
Shengliang Guan 已提交
438
  mTrace("msg:%p, type:%s will be processed, app:%p", pMsg, TMSG_INFO(msgType), ahandle);
S
Shengliang Guan 已提交
439

S
Shengliang Guan 已提交
440 441
  if (isReq && !mndIsMaster(pMnode)) {
    code = TSDB_CODE_APP_NOT_READY;
S
Shengliang Guan 已提交
442
    mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
S
Shengliang Guan 已提交
443 444 445 446 447
    goto PROCESS_RPC_END;
  }

  if (isReq && pMsg->rpcMsg.pCont == NULL) {
    code = TSDB_CODE_MND_INVALID_MSG_LEN;
S
Shengliang Guan 已提交
448
    mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
S
Shengliang Guan 已提交
449
    goto PROCESS_RPC_END;
S
Shengliang Guan 已提交
450 451
  }

452
  MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(msgType)];
S
Shengliang Guan 已提交
453
  if (fp == NULL) {
S
Shengliang Guan 已提交
454
    code = TSDB_CODE_MSG_NOT_PROCESSED;
S
Shengliang Guan 已提交
455
    mError("msg:%p, failed to process since no msg handle, app:%p", pMsg, ahandle);
S
Shengliang Guan 已提交
456
    goto PROCESS_RPC_END;
S
Shengliang Guan 已提交
457 458
  }

459
  code = (*fp)(pMsg);
S
Shengliang Guan 已提交
460
  if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
S
Shengliang Guan 已提交
461
    mTrace("msg:%p, in progress, app:%p", pMsg, ahandle);
S
Shengliang Guan 已提交
462 463
    return;
  } else if (code != 0) {
S
Shengliang Guan 已提交
464
    code = terrno;
S
Shengliang Guan 已提交
465
    mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
S
Shengliang Guan 已提交
466
    goto PROCESS_RPC_END;
467
  } else {
S
Shengliang Guan 已提交
468
    mTrace("msg:%p, is processed, app:%p", pMsg, ahandle);
S
Shengliang Guan 已提交
469 470 471 472
  }

PROCESS_RPC_END:
  if (isReq) {
S
Shengliang Guan 已提交
473 474
    if (pMsg->rpcMsg.handle == NULL) return;

S
Shengliang Guan 已提交
475
    if (code == TSDB_CODE_APP_NOT_READY) {
S
Shengliang Guan 已提交
476
      mndSendRedirectRsp(pMnode, &pMsg->rpcMsg);
S
Shengliang Guan 已提交
477
    } else if (code != 0) {
D
dapan1121 已提交
478
      SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont, .code = code};
S
Shengliang Guan 已提交
479
      rpcSendResponse(&rpcRsp);
S
Shengliang Guan 已提交
480
    } else {
S
Shengliang Guan 已提交
481 482
      SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont};
      rpcSendResponse(&rpcRsp);
S
Shengliang Guan 已提交
483
    }
S
Shengliang Guan 已提交
484
  }
S
Shengliang Guan 已提交
485 486
}

487 488 489 490
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  tmsg_t type = TMSG_INDEX(msgType);
  if (type >= 0 && type < TDMT_MAX) {
    pMnode->msgFp[type] = fp;
S
Shengliang Guan 已提交
491 492 493
  }
}

D
dapan1121 已提交
494
// Note: uid 0 is reserved
S
Shengliang Guan 已提交
495
uint64_t mndGenerateUid(char *name, int32_t len) {
S
monitor  
Shengliang Guan 已提交
496
  int32_t hashval = MurmurHash3_32(name, len);
D
dapan1121 已提交
497 498 499 500 501 502 503 504 505

  do {
    int64_t  us = taosGetTimestampUs();
    uint64_t x = (us & 0x000000FFFFFFFFFF) << 24;
    uint64_t uuid = x + ((hashval & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
    if (uuid) {
      return uuid;
    }
  } while (true);
L
Liu Jicong 已提交
506
}
S
Shengliang Guan 已提交
507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535

void mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  memset(pLoad, 0, sizeof(SMnodeLoad));

  SSdb *pSdb = pMnode->pSdb;
  pLoad->numOfDnode = sdbGetSize(pSdb, SDB_DNODE);
  pLoad->numOfMnode = sdbGetSize(pSdb, SDB_MNODE);
  pLoad->numOfVgroup = sdbGetSize(pSdb, SDB_VGROUP);
  pLoad->numOfDatabase = sdbGetSize(pSdb, SDB_DB);
  pLoad->numOfSuperTable = sdbGetSize(pSdb, SDB_STB);

  void *pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pLoad->numOfChildTable += pVgroup->numOfTables;
    pLoad->numOfColumn += pVgroup->numOfTimeSeries;
    pLoad->totalPoints += pVgroup->pointsWritten;
    pLoad->totalStorage += pVgroup->totalStorage;
    pLoad->compStorage += pVgroup->compStorage;

    sdbRelease(pSdb, pVgroup);
  }
}

int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
                          SMonGrantInfo *pGrantInfo) {
S
Shengliang Guan 已提交
536 537
  if (!mndIsMaster(pMnode)) return;

S
Shengliang Guan 已提交
538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634
  SSdb   *pSdb = pMnode->pSdb;
  int64_t ms = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL) {
    return -1;
  }

  // cluster info
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);

  void *pIter = NULL;
  while (1) {
    SDnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonDnodeDesc desc = {0};
    desc.dnode_id = pObj->id;
    tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
    if (mndIsDnodeOnline(pMnode, pObj, ms)) {
      tstrncpy(desc.status, "ready", sizeof(desc.status));
    } else {
      tstrncpy(desc.status, "offline", sizeof(desc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &desc);
    sdbRelease(pSdb, pObj);
  }

  pIter = NULL;
  while (1) {
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonMnodeDesc desc = {0};
    desc.mnode_id = pObj->id;
    tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));
    tstrncpy(desc.role, mndGetRoleStr(pObj->role), sizeof(desc.role));
    taosArrayPush(pClusterInfo->mnodes, &desc);
    sdbRelease(pSdb, pObj);

    if (pObj->role == TAOS_SYNC_STATE_LEADER) {
      pClusterInfo->first_ep_dnode_id = pObj->id;
      tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
      pClusterInfo->master_uptime = (ms - pObj->roleTime) / (86400000.0f);
    }
  }

  // vgroup info
  pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pClusterInfo->vgroups_total++;

    SMonVgroupDesc desc = {0};
    desc.vgroup_id = pVgroup->vgId;
    strncpy(desc.database_name, pVgroup->dbName, sizeof(desc.database_name));
    desc.tables_num = pVgroup->numOfTables;
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
    tstrncpy(desc.status, "unsynced", sizeof(desc.status));
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[i];
      SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
      pVnDesc->dnode_id = pVgid->dnodeId;
      tstrncpy(pVnDesc->vnode_role, mndGetRoleStr(pVgid->role), sizeof(pVnDesc->vnode_role));
      if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
        tstrncpy(desc.status, "ready", sizeof(desc.status));
        pClusterInfo->vgroups_alive++;
      }
      if (pVgid->role == TAOS_SYNC_STATE_LEADER || pVgid->role == TAOS_SYNC_STATE_CANDIDATE) {
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &desc);
    sdbRelease(pSdb, pVgroup);
  }

  // grant info
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  }

  return 0;
}