mnode.c 18.1 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndAcct.h"
#include "mndAuth.h"
S
Shengliang Guan 已提交
19
#include "mndBnode.h"
S
Shengliang Guan 已提交
20
#include "mndCluster.h"
L
Liu Jicong 已提交
21
#include "mndConsumer.h"
S
Shengliang Guan 已提交
22 23 24
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
S
monitor  
Shengliang Guan 已提交
25
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
26
#include "mndMnode.h"
L
Liu Jicong 已提交
27
#include "mndOffset.h"
S
Shengliang Guan 已提交
28
#include "mndProfile.h"
S
Shengliang Guan 已提交
29
#include "mndQnode.h"
S
Shengliang Guan 已提交
30
#include "mndShow.h"
S
Shengliang Guan 已提交
31
#include "mndSnode.h"
S
Shengliang Guan 已提交
32
#include "mndStb.h"
L
Liu Jicong 已提交
33
#include "mndStream.h"
L
Liu Jicong 已提交
34
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
35 36
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
37
#include "mndTopic.h"
S
Shengliang Guan 已提交
38 39 40
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
41

S
Shengliang Guan 已提交
42 43 44 45
#define MQ_TIMER_MS    3000
#define TRNAS_TIMER_MS 6000
#define TELEM_TIMER_MS 86400000

S
Shengliang Guan 已提交
46
int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
S
shm  
Shengliang Guan 已提交
47
  if (pMnode == NULL || pMnode->sendReqFp == NULL) {
S
Shengliang Guan 已提交
48 49
    terrno = TSDB_CODE_MND_NOT_READY;
    return -1;
50
  }
S
Shengliang Guan 已提交
51

S
shm  
Shengliang Guan 已提交
52
  return (*pMnode->sendReqFp)(pMnode->pWrapper, pEpSet, pMsg);
S
Shengliang Guan 已提交
53
}
S
Shengliang Guan 已提交
54

S
Shengliang Guan 已提交
55
int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg) {
S
shm  
Shengliang Guan 已提交
56
  if (pMnode == NULL || pMnode->sendReqFp == NULL) {
S
Shengliang Guan 已提交
57 58
    terrno = TSDB_CODE_MND_NOT_READY;
    return -1;
59
  }
S
Shengliang Guan 已提交
60

S
shm  
Shengliang Guan 已提交
61
  return (*pMnode->sendMnodeReqFp)(pMnode->pWrapper, pMsg);
S
Shengliang Guan 已提交
62
}
S
Shengliang Guan 已提交
63

S
Shengliang Guan 已提交
64 65 66 67 68 69 70 71 72 73 74 75 76
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq timerReq = {0};

  int32_t contLen = tSerializeSMTimerMsg(NULL, 0, &timerReq);
  if (contLen <= 0) return NULL;
  void *pReq = rpcMallocCont(contLen);
  if (pReq == NULL) return NULL;

  tSerializeSMTimerMsg(pReq, contLen, &timerReq);
  *pContLen = contLen;
  return pReq;
}

S
Shengliang Guan 已提交
77
static void mndPullupTrans(void *param, void *tmrId) {
S
Shengliang Guan 已提交
78 79
  SMnode *pMnode = param;
  if (mndIsMaster(pMnode)) {
S
Shengliang Guan 已提交
80 81
    int32_t contLen = 0;
    void   *pReq = mndBuildTimerMsg(&contLen);
S
Shengliang Guan 已提交
82
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
S
shm  
Shengliang Guan 已提交
83
    pMnode->putToWriteQFp(pMnode->pWrapper, &rpcMsg);
S
Shengliang Guan 已提交
84 85
  }

S
Shengliang Guan 已提交
86
  taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer);
S
Shengliang Guan 已提交
87 88
}

L
Liu Jicong 已提交
89 90
static void mndCalMqRebalance(void *param, void *tmrId) {
  SMnode *pMnode = param;
L
Liu Jicong 已提交
91
  if (mndIsMaster(pMnode)) {
S
Shengliang Guan 已提交
92 93
    int32_t contLen = 0;
    void   *pReq = mndBuildTimerMsg(&contLen);
L
Liu Jicong 已提交
94 95 96 97 98
    SRpcMsg rpcMsg = {
        .msgType = TDMT_MND_MQ_TIMER,
        .pCont = pReq,
        .contLen = contLen,
    };
S
shm  
Shengliang Guan 已提交
99
    pMnode->putToReadQFp(pMnode->pWrapper, &rpcMsg);
L
Liu Jicong 已提交
100 101
  }

S
Shengliang Guan 已提交
102 103 104
  taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer);
}

S
Shengliang Guan 已提交
105
static void mndPullupTelem(void *param, void *tmrId) {
S
Shengliang Guan 已提交
106 107 108 109 110
  SMnode *pMnode = param;
  if (mndIsMaster(pMnode)) {
    int32_t contLen = 0;
    void   *pReq = mndBuildTimerMsg(&contLen);
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
S
shm  
Shengliang Guan 已提交
111
    pMnode->putToReadQFp(pMnode->pWrapper, &rpcMsg);
S
Shengliang Guan 已提交
112 113
  }

S
Shengliang Guan 已提交
114
  taosTmrReset(mndPullupTelem, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer);
L
Liu Jicong 已提交
115 116
}

117
static int32_t mndInitTimer(SMnode *pMnode) {
S
Shengliang Guan 已提交
118
  pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND");
119
  if (pMnode->timer == NULL) {
S
Shengliang Guan 已提交
120 121
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
S
Shengliang Guan 已提交
122 123
  }

S
shm  
Shengliang Guan 已提交
124 125 126 127 128 129 130 131 132 133 134 135 136 137
  // if (taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer)) {
  //   terrno = TSDB_CODE_OUT_OF_MEMORY;
  //   return -1;
  // }

  // if (taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer)) {
  //   terrno = TSDB_CODE_OUT_OF_MEMORY;
  //   return -1;
  // }

  // if (taosTmrReset(mndPullupTelem, 60000, pMnode, pMnode->timer, &pMnode->telemTimer)) {
  //   terrno = TSDB_CODE_OUT_OF_MEMORY;
  //   return -1;
  // }
L
Liu Jicong 已提交
138

S
Shengliang Guan 已提交
139 140 141
  return 0;
}

142 143
static void mndCleanupTimer(SMnode *pMnode) {
  if (pMnode->timer != NULL) {
S
Shengliang Guan 已提交
144 145
    taosTmrStop(pMnode->transTimer);
    pMnode->transTimer = NULL;
L
Liu Jicong 已提交
146 147
    taosTmrStop(pMnode->mqTimer);
    pMnode->mqTimer = NULL;
S
Shengliang Guan 已提交
148 149
    taosTmrStop(pMnode->telemTimer);
    pMnode->telemTimer = NULL;
150 151
    taosTmrCleanUp(pMnode->timer);
    pMnode->timer = NULL;
S
Shengliang Guan 已提交
152 153 154
  }
}

S
Shengliang Guan 已提交
155
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
156 157 158
  pMnode->path = strdup(path);
  if (pMnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
159
    return -1;
160 161 162 163
  }

  if (taosMkDir(pMnode->path) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
164
    return -1;
165
  }
166 167

  return 0;
168
}
S
Shengliang Guan 已提交
169

170 171 172
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt opt = {0};
  opt.path = pMnode->path;
S
Shengliang Guan 已提交
173
  opt.pMnode = pMnode;
S
Shengliang Guan 已提交
174

S
Shengliang Guan 已提交
175
  pMnode->pSdb = sdbInit(&opt);
176
  if (pMnode->pSdb == NULL) {
S
Shengliang Guan 已提交
177 178 179 180 181 182
    return -1;
  }

  return 0;
}

183
static int32_t mndDeploySdb(SMnode *pMnode) { return sdbDeploy(pMnode->pSdb); }
S
Shengliang Guan 已提交
184
static int32_t mndReadSdb(SMnode *pMnode) { return sdbReadFile(pMnode->pSdb); }
185 186 187

static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb) {
S
Shengliang Guan 已提交
188
    sdbCleanup(pMnode->pSdb);
189 190 191 192
    pMnode->pSdb = NULL;
  }
}

193 194 195 196 197
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep step = {0};
  step.name = name;
  step.initFp = initFp;
  step.cleanupFp = cleanupFp;
S
Shengliang Guan 已提交
198
  if (taosArrayPush(pMnode->pSteps, &step) == NULL) {
199 200 201 202
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
203 204 205
  return 0;
}

206
static int32_t mndInitSteps(SMnode *pMnode) {
S
Shengliang Guan 已提交
207 208 209 210
  if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1;
S
Shengliang Guan 已提交
211 212 213
  if (mndAllocStep(pMnode, "mnode-qnode", mndInitQnode, mndCleanupQnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-qnode", mndInitSnode, mndCleanupSnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-qnode", mndInitBnode, mndCleanupBnode) != 0) return -1;
S
Shengliang Guan 已提交
214
  if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1;
S
Shengliang Guan 已提交
215
  if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
S
Shengliang Guan 已提交
216 217
  if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
L
Liu Jicong 已提交
218
  if (mndAllocStep(pMnode, "mnode-stream", mndInitStream, mndCleanupStream) != 0) return -1;
S
Shengliang Guan 已提交
219
  if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
L
Liu Jicong 已提交
220 221
  if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
L
Liu Jicong 已提交
222
  if (mndAllocStep(pMnode, "mnode-offset", mndInitOffset, mndCleanupOffset) != 0) return -1;
S
Shengliang Guan 已提交
223
  if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
S
Shengliang Guan 已提交
224
  if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
D
dapan1121 已提交
225
  if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1;
S
Shengliang Guan 已提交
226
  if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
S
Shengliang Guan 已提交
227
  if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
228
  if (pMnode->clusterId <= 0) {
S
Shengliang Guan 已提交
229 230 231 232 233 234 235 236 237 238
    if (mndAllocStep(pMnode, "mnode-sdb-deploy", mndDeploySdb, NULL) != 0) return -1;
  } else {
    if (mndAllocStep(pMnode, "mnode-sdb-read", mndReadSdb, NULL) != 0) return -1;
  }
  if (mndAllocStep(pMnode, "mnode-timer", mndInitTimer, NULL) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-timer", NULL, mndCleanupTimer) != 0) return -1;
S
Shengliang Guan 已提交
239 240 241 242

  return 0;
}

243
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
S
Shengliang Guan 已提交
244 245
  if (pMnode->pSteps == NULL) return;

246
  if (pos == -1) {
247
    pos = taosArrayGetSize(pMnode->pSteps) - 1;
S
Shengliang Guan 已提交
248 249
  }

250
  for (int32_t s = pos; s >= 0; s--) {
251
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, s);
S
Shengliang Guan 已提交
252
    mDebug("%s will cleanup", pStep->name);
253 254 255
    if (pStep->cleanupFp != NULL) {
      (*pStep->cleanupFp)(pMnode);
    }
S
Shengliang Guan 已提交
256 257
  }

S
Shengliang Guan 已提交
258
  taosArrayClear(pMnode->pSteps);
S
Shengliang Guan 已提交
259
  taosArrayDestroy(pMnode->pSteps);
S
Shengliang Guan 已提交
260
  pMnode->pSteps = NULL;
261
}
S
Shengliang Guan 已提交
262

263
static int32_t mndExecSteps(SMnode *pMnode) {
S
Shengliang Guan 已提交
264
  int32_t size = taosArrayGetSize(pMnode->pSteps);
265
  for (int32_t pos = 0; pos < size; pos++) {
S
Shengliang Guan 已提交
266
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, pos);
267
    if (pStep->initFp == NULL) continue;
S
Shengliang Guan 已提交
268

S
Shengliang Guan 已提交
269
    if ((*pStep->initFp)(pMnode) != 0) {
S
Shengliang Guan 已提交
270
      int32_t code = terrno;
S
Shengliang Guan 已提交
271
      mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
272
      mndCleanupSteps(pMnode, pos);
S
Shengliang Guan 已提交
273
      terrno = code;
S
Shengliang Guan 已提交
274
      return -1;
S
Shengliang Guan 已提交
275
    } else {
S
Shengliang Guan 已提交
276
      mDebug("%s is initialized", pStep->name);
S
Shengliang Guan 已提交
277 278
    }
  }
S
Shengliang Guan 已提交
279 280

  return 0;
281
}
S
Shengliang Guan 已提交
282

283 284 285 286 287 288
static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  pMnode->dnodeId = pOption->dnodeId;
  pMnode->clusterId = pOption->clusterId;
  pMnode->replica = pOption->replica;
  pMnode->selfIndex = pOption->selfIndex;
  memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
S
shm  
Shengliang Guan 已提交
289 290 291
  pMnode->pWrapper = pOption->pWrapper;
  pMnode->putToWriteQFp = pOption->putToWriteQFp;
  pMnode->putToReadQFp = pOption->putToReadQFp;
S
shm  
Shengliang Guan 已提交
292
  pMnode->sendReqFp = pOption->sendReqFp;
S
shm  
Shengliang Guan 已提交
293
  pMnode->sendMnodeReqFp = pOption->sendMnodeReqFp;
294

S
shm  
Shengliang Guan 已提交
295 296
  if (pMnode->sendReqFp == NULL || pMnode->sendMnodeReqFp == NULL  ||
      pMnode->putToWriteQFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) {
S
Shengliang Guan 已提交
297
    terrno = TSDB_CODE_MND_INVALID_OPTIONS;
S
Shengliang Guan 已提交
298 299 300
    return -1;
  }

301
  return 0;
L
Liu Jicong 已提交
302
}
303

304
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
S
Shengliang Guan 已提交
305 306
  mDebug("start to open mnode in %s", path);

307
  SMnode *pMnode = calloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
308 309 310 311 312 313
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
314 315 316
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);

S
Shengliang Guan 已提交
317 318 319 320 321 322 323
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
    free(pMnode);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
324

S
Shengliang Guan 已提交
325
  int32_t code = mndCreateDir(pMnode, path);
326
  if (code != 0) {
S
Shengliang Guan 已提交
327 328
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
329 330 331 332 333 334
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndSetOptions(pMnode, pOption);
335
  if (code != 0) {
S
Shengliang Guan 已提交
336 337
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
338 339 340 341 342 343 344
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndInitSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
345 346
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
347 348 349 350 351 352 353
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
354 355
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
356 357 358 359
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
360

S
Shengliang Guan 已提交
361
  mndUpdateMnodeRole(pMnode);
S
Shengliang Guan 已提交
362
  mDebug("mnode open successfully ");
S
Shengliang Guan 已提交
363 364
  return pMnode;
}
S
Shengliang Guan 已提交
365

366
void mndClose(SMnode *pMnode) {
S
Shengliang Guan 已提交
367 368 369 370 371 372 373
  if (pMnode != NULL) {
    mDebug("start to close mnode");
    mndCleanupSteps(pMnode, -1);
    tfree(pMnode->path);
    tfree(pMnode);
    mDebug("mnode is closed");
  }
374
}
S
Shengliang Guan 已提交
375

376
int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
S
Shengliang Guan 已提交
377 378
  mDebug("start to alter mnode");
  mDebug("mnode is altered");
379 380
  return 0;
}
S
Shengliang Guan 已提交
381

382
void mndDestroy(const char *path) {
S
Shengliang Guan 已提交
383
  mDebug("start to destroy mnode at %s", path);
S
Shengliang Guan 已提交
384
  taosRemoveDir(path);
S
Shengliang Guan 已提交
385
  mDebug("mnode is destroyed");
386
}
S
Shengliang Guan 已提交
387

S
Shengliang Guan 已提交
388 389
int32_t mndProcessMsg(SNodeMsg *pMsg) {
  SMnode  *pMnode = pMsg->pNode;
S
shm  
Shengliang Guan 已提交
390 391 392 393
  SRpcMsg *pRpc = &pMsg->rpcMsg;
  tmsg_t   msgType = pMsg->rpcMsg.msgType;
  void    *ahandle = pMsg->rpcMsg.ahandle;
  bool     isReq = (pRpc->msgType & 1U);
394

S
Shengliang Guan 已提交
395
  mTrace("msg:%p, type:%s will be processed, app:%p", pMsg, TMSG_INFO(msgType), ahandle);
S
Shengliang Guan 已提交
396

S
Shengliang Guan 已提交
397
  if (isReq && !mndIsMaster(pMnode)) {
S
shm  
Shengliang Guan 已提交
398
    terrno = TSDB_CODE_APP_NOT_READY;
S
Shengliang Guan 已提交
399
    mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
S
shm  
Shengliang Guan 已提交
400
    return -1;
S
Shengliang Guan 已提交
401 402
  }

S
shm  
Shengliang Guan 已提交
403 404
  if (isReq && (pRpc->contLen == 0 || pRpc->pCont == NULL)) {
    terrno = TSDB_CODE_MND_INVALID_MSG_LEN;
S
Shengliang Guan 已提交
405
    mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
S
shm  
Shengliang Guan 已提交
406
    return -1;
S
Shengliang Guan 已提交
407 408
  }

409
  MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(msgType)];
S
Shengliang Guan 已提交
410
  if (fp == NULL) {
S
shm  
Shengliang Guan 已提交
411
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
S
Shengliang Guan 已提交
412
    mError("msg:%p, failed to process since no msg handle, app:%p", pMsg, ahandle);
S
shm  
Shengliang Guan 已提交
413
    return -1;
S
Shengliang Guan 已提交
414 415
  }

S
shm  
Shengliang Guan 已提交
416
  int32_t code = (*fp)(pMsg);
S
Shengliang Guan 已提交
417
  if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
S
shm  
Shengliang Guan 已提交
418
    terrno = code;
S
Shengliang Guan 已提交
419
    mTrace("msg:%p, in progress, app:%p", pMsg, ahandle);
S
Shengliang Guan 已提交
420
  } else if (code != 0) {
S
Shengliang Guan 已提交
421
    mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
422
  } else {
S
Shengliang Guan 已提交
423
    mTrace("msg:%p, is processed, app:%p", pMsg, ahandle);
S
Shengliang Guan 已提交
424 425
  }

S
shm  
Shengliang Guan 已提交
426
  return code;
S
Shengliang Guan 已提交
427 428
}

429 430 431 432
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  tmsg_t type = TMSG_INDEX(msgType);
  if (type >= 0 && type < TDMT_MAX) {
    pMnode->msgFp[type] = fp;
S
Shengliang Guan 已提交
433 434 435
  }
}

D
dapan1121 已提交
436
// Note: uid 0 is reserved
S
Shengliang Guan 已提交
437
uint64_t mndGenerateUid(char *name, int32_t len) {
S
monitor  
Shengliang Guan 已提交
438
  int32_t hashval = MurmurHash3_32(name, len);
D
dapan1121 已提交
439 440 441 442 443 444 445 446 447

  do {
    int64_t  us = taosGetTimestampUs();
    uint64_t x = (us & 0x000000FFFFFFFFFF) << 24;
    uint64_t uuid = x + ((hashval & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
    if (uuid) {
      return uuid;
    }
  } while (true);
L
Liu Jicong 已提交
448
}
S
Shengliang Guan 已提交
449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477

void mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  memset(pLoad, 0, sizeof(SMnodeLoad));

  SSdb *pSdb = pMnode->pSdb;
  pLoad->numOfDnode = sdbGetSize(pSdb, SDB_DNODE);
  pLoad->numOfMnode = sdbGetSize(pSdb, SDB_MNODE);
  pLoad->numOfVgroup = sdbGetSize(pSdb, SDB_VGROUP);
  pLoad->numOfDatabase = sdbGetSize(pSdb, SDB_DB);
  pLoad->numOfSuperTable = sdbGetSize(pSdb, SDB_STB);

  void *pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pLoad->numOfChildTable += pVgroup->numOfTables;
    pLoad->numOfColumn += pVgroup->numOfTimeSeries;
    pLoad->totalPoints += pVgroup->pointsWritten;
    pLoad->totalStorage += pVgroup->totalStorage;
    pLoad->compStorage += pVgroup->compStorage;

    sdbRelease(pSdb, pVgroup);
  }
}

int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
                          SMonGrantInfo *pGrantInfo) {
S
Shengliang Guan 已提交
478
  if (!mndIsMaster(pMnode)) return -1;
S
Shengliang Guan 已提交
479

S
Shengliang Guan 已提交
480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575
  SSdb   *pSdb = pMnode->pSdb;
  int64_t ms = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL) {
    return -1;
  }

  // cluster info
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);

  void *pIter = NULL;
  while (1) {
    SDnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonDnodeDesc desc = {0};
    desc.dnode_id = pObj->id;
    tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
    if (mndIsDnodeOnline(pMnode, pObj, ms)) {
      tstrncpy(desc.status, "ready", sizeof(desc.status));
    } else {
      tstrncpy(desc.status, "offline", sizeof(desc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &desc);
    sdbRelease(pSdb, pObj);
  }

  pIter = NULL;
  while (1) {
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonMnodeDesc desc = {0};
    desc.mnode_id = pObj->id;
    tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));
    tstrncpy(desc.role, mndGetRoleStr(pObj->role), sizeof(desc.role));
    taosArrayPush(pClusterInfo->mnodes, &desc);
    sdbRelease(pSdb, pObj);

    if (pObj->role == TAOS_SYNC_STATE_LEADER) {
      pClusterInfo->first_ep_dnode_id = pObj->id;
      tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
      pClusterInfo->master_uptime = (ms - pObj->roleTime) / (86400000.0f);
    }
  }

  // vgroup info
  pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pClusterInfo->vgroups_total++;

    SMonVgroupDesc desc = {0};
    desc.vgroup_id = pVgroup->vgId;
    strncpy(desc.database_name, pVgroup->dbName, sizeof(desc.database_name));
    desc.tables_num = pVgroup->numOfTables;
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
    tstrncpy(desc.status, "unsynced", sizeof(desc.status));
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[i];
      SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
      pVnDesc->dnode_id = pVgid->dnodeId;
      tstrncpy(pVnDesc->vnode_role, mndGetRoleStr(pVgid->role), sizeof(pVnDesc->vnode_role));
      if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
        tstrncpy(desc.status, "ready", sizeof(desc.status));
        pClusterInfo->vgroups_alive++;
      }
      if (pVgid->role == TAOS_SYNC_STATE_LEADER || pVgid->role == TAOS_SYNC_STATE_CANDIDATE) {
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &desc);
    sdbRelease(pSdb, pVgroup);
  }

  // grant info
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  }

  return 0;
L
Liu Jicong 已提交
576
}