mnode.c 16.6 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndAcct.h"
#include "mndAuth.h"
S
Shengliang Guan 已提交
19
#include "mndBnode.h"
S
Shengliang Guan 已提交
20
#include "mndCluster.h"
L
Liu Jicong 已提交
21
#include "mndConsumer.h"
S
Shengliang Guan 已提交
22 23 24 25
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
#include "mndMnode.h"
L
Liu Jicong 已提交
26
#include "mndOffset.h"
S
Shengliang Guan 已提交
27
#include "mndProfile.h"
S
Shengliang Guan 已提交
28
#include "mndQnode.h"
S
Shengliang Guan 已提交
29
#include "mndShow.h"
S
Shengliang Guan 已提交
30
#include "mndSnode.h"
S
Shengliang Guan 已提交
31
#include "mndStb.h"
L
Liu Jicong 已提交
32
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
33 34
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
35
#include "mndTopic.h"
S
Shengliang Guan 已提交
36 37 38
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
39

S
Shengliang Guan 已提交
40 41 42 43
/* Re-arm period, in milliseconds, used by mndCalMqRebalance. */
#define MQ_TIMER_MS    3000
/* Re-arm period, in milliseconds, used by mndPullupTrans.
 * NOTE(review): "TRNAS" looks like a typo for "TRANS"; renaming needs every user updated together. */
#define TRNAS_TIMER_MS 6000
/* Re-arm period, in milliseconds, used by mndPullupTelem (86400000 ms = 24 hours). */
#define TELEM_TIMER_MS 86400000

S
Shengliang Guan 已提交
44 45 46 47
/*
 * Hand a request to the dnode-directed send callback stored in the mnode.
 *
 * Returns whatever the callback returns. When pMnode is NULL or no
 * sendReqToDnodeFp callback is installed, sets terrno to
 * TSDB_CODE_MND_NOT_READY and returns -1.
 */
int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
  if (pMnode != NULL && pMnode->sendReqToDnodeFp != NULL) {
    return (*pMnode->sendReqToDnodeFp)(pMnode->pDnode, pEpSet, pMsg);
  }

  terrno = TSDB_CODE_MND_NOT_READY;
  return -1;
}
S
Shengliang Guan 已提交
52

S
Shengliang Guan 已提交
53 54 55 56
/*
 * Hand a request to the mnode-directed send callback stored in the mnode.
 *
 * Returns whatever the callback returns. When pMnode is NULL or no
 * sendReqToMnodeFp callback is installed, sets terrno to
 * TSDB_CODE_MND_NOT_READY and returns -1.
 */
int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg) {
  /* Guard the callback that is actually invoked below (the previous check
   * tested sendReqToDnodeFp, leaving a NULL sendReqToMnodeFp unguarded). */
  if (pMnode == NULL || pMnode->sendReqToMnodeFp == NULL) {
    terrno = TSDB_CODE_MND_NOT_READY;
    return -1;
  }

  return (*pMnode->sendReqToMnodeFp)(pMnode->pDnode, pMsg);
}
S
Shengliang Guan 已提交
61

S
Shengliang Guan 已提交
62 63 64
/*
 * Invoke the redirect-response callback for pMsg.
 * Does nothing when pMnode is NULL or no sendRedirectRspFp callback is installed.
 */
void mndSendRedirectRsp(SMnode *pMnode, SRpcMsg *pMsg) {
  if (pMnode == NULL) return;
  if (pMnode->sendRedirectRspFp == NULL) return;

  (*pMnode->sendRedirectRspFp)(pMnode->pDnode, pMsg);
}
S
Shengliang Guan 已提交
67

S
Shengliang Guan 已提交
68 69 70 71 72 73 74 75 76 77 78 79 80
/*
 * Serialize a zero-initialized SMTimerReq into a buffer obtained from rpcMallocCont.
 *
 * pContLen receives the serialized length on success and is left untouched on failure.
 * Returns the buffer, or NULL when the computed length is not positive or the
 * allocation fails.
 */
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq req = {0};

  /* The call with a NULL buffer supplies the length used for the allocation. */
  int32_t len = tSerializeSMTimerMsg(NULL, 0, &req);
  if (len <= 0) {
    return NULL;
  }

  void *pBuf = rpcMallocCont(len);
  if (pBuf != NULL) {
    tSerializeSMTimerMsg(pBuf, len, &req);
    *pContLen = len;
  }

  return pBuf;
}

S
Shengliang Guan 已提交
81
static void mndPullupTrans(void *param, void *tmrId) {
S
Shengliang Guan 已提交
82 83
  SMnode *pMnode = param;
  if (mndIsMaster(pMnode)) {
S
Shengliang Guan 已提交
84 85
    int32_t contLen = 0;
    void   *pReq = mndBuildTimerMsg(&contLen);
S
Shengliang Guan 已提交
86
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
S
Shengliang Guan 已提交
87
    pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
S
Shengliang Guan 已提交
88 89
  }

S
Shengliang Guan 已提交
90
  taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer);
S
Shengliang Guan 已提交
91 92
}

L
Liu Jicong 已提交
93 94
static void mndCalMqRebalance(void *param, void *tmrId) {
  SMnode *pMnode = param;
L
Liu Jicong 已提交
95
  if (mndIsMaster(pMnode)) {
S
Shengliang Guan 已提交
96 97 98
    int32_t contLen = 0;
    void   *pReq = mndBuildTimerMsg(&contLen);
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
L
Liu Jicong 已提交
99
    pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg);
L
Liu Jicong 已提交
100 101
  }

S
Shengliang Guan 已提交
102 103 104
  taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer);
}

S
Shengliang Guan 已提交
105
static void mndPullupTelem(void *param, void *tmrId) {
S
Shengliang Guan 已提交
106 107 108 109 110 111 112 113
  SMnode *pMnode = param;
  if (mndIsMaster(pMnode)) {
    int32_t contLen = 0;
    void   *pReq = mndBuildTimerMsg(&contLen);
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
    pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg);
  }

S
Shengliang Guan 已提交
114
  taosTmrReset(mndPullupTelem, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer);
L
Liu Jicong 已提交
115 116
}

117
/*
 * Create the mnode timer controller and schedule the trans, mq and telem callbacks.
 *
 * The telem callback is first scheduled after 60000 ms; the callback itself
 * re-arms with TELEM_TIMER_MS.
 *
 * Returns 0 on success. If taosTmrInit yields NULL or any taosTmrReset call
 * returns non-zero, sets terrno to TSDB_CODE_OUT_OF_MEMORY and returns -1;
 * later timers are not scheduled once one step has failed.
 */
static int32_t mndInitTimer(SMnode *pMnode) {
  pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND");

  /* Short-circuit evaluation keeps the original order and stops at the first failure. */
  if (pMnode->timer != NULL &&
      !taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer) &&
      !taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer) &&
      !taosTmrReset(mndPullupTelem, 60000, pMnode, pMnode->timer, &pMnode->telemTimer)) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

142 143
/*
 * Stop the trans, mq and telem timers, then clean up the timer controller.
 * Every handle is reset to NULL afterwards. No-op when pMnode->timer is NULL.
 */
static void mndCleanupTimer(SMnode *pMnode) {
  if (pMnode->timer == NULL) return;

  taosTmrStop(pMnode->transTimer);
  pMnode->transTimer = NULL;

  taosTmrStop(pMnode->mqTimer);
  pMnode->mqTimer = NULL;

  taosTmrStop(pMnode->telemTimer);
  pMnode->telemTimer = NULL;

  taosTmrCleanUp(pMnode->timer);
  pMnode->timer = NULL;
}

S
Shengliang Guan 已提交
155
/*
 * Store a private copy of path in pMnode->path and create that directory.
 *
 * Returns 0 on success. Returns -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY
 * when the copy fails, or to TAOS_SYSTEM_ERROR(errno) when taosMkDir fails.
 * The copied path stays in pMnode->path even when directory creation fails.
 */
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  pMnode->path = strdup(path);

  if (pMnode->path != NULL) {
    if (taosMkDir(pMnode->path) == 0) {
      return 0;
    }
    terrno = TAOS_SYSTEM_ERROR(errno);
  } else {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  }

  return -1;
}
S
Shengliang Guan 已提交
169

170 171 172
/*
 * Initialize the sdb for this mnode, using pMnode->path as its path.
 * Returns 0 when sdbInit yields a handle, -1 when it yields NULL.
 */
static int32_t mndInitSdb(SMnode *pMnode) {
  /* Fields not named here are zero-initialized. */
  SSdbOpt sdbOpt = {.path = pMnode->path, .pMnode = pMnode};

  pMnode->pSdb = sdbInit(&sdbOpt);
  return (pMnode->pSdb != NULL) ? 0 : -1;
}

183
/* Init step: run sdbDeploy on the mnode's sdb and return its result. */
static int32_t mndDeploySdb(SMnode *pMnode) {
  return sdbDeploy(pMnode->pSdb);
}
S
Shengliang Guan 已提交
184
/* Init step: run sdbReadFile on the mnode's sdb and return its result. */
static int32_t mndReadSdb(SMnode *pMnode) {
  return sdbReadFile(pMnode->pSdb);
}
185 186 187

/* Clean up the sdb handle and reset it to NULL; no-op when it is already NULL. */
static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb == NULL) return;

  sdbCleanup(pMnode->pSdb);
  pMnode->pSdb = NULL;
}

193 194 195 196 197
/*
 * Append one init/cleanup step to pMnode->pSteps.
 *
 * name is stored by pointer (not copied). Either callback may be NULL.
 * Returns 0 on success; returns -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY
 * when taosArrayPush returns NULL.
 */
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep newStep = {.name = name, .initFp = initFp, .cleanupFp = cleanupFp};

  if (taosArrayPush(pMnode->pSteps, &newStep) != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

206
/*
 * Register every mnode init/cleanup step, in initialization order.
 *
 * mndExecSteps runs the init callbacks front to back; mndCleanupSteps runs the
 * cleanup callbacks back to front. The timer is therefore registered twice:
 * its init sits right after the sdb deploy/read step, and its cleanup is the
 * last entry so that it is the first cleanup to run.
 *
 * A clusterId <= 0 selects the sdb deploy step; otherwise the sdb read step is used.
 *
 * Returns 0 on success, -1 as soon as any registration fails (terrno is set by
 * mndAllocStep).
 */
static int32_t mndInitSteps(SMnode *pMnode) {
  if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-qnode", mndInitQnode, mndCleanupQnode) != 0) return -1;
  /* The snode and bnode steps were previously both labelled "mnode-qnode",
   * which made the init/cleanup log lines point at the wrong module. */
  if (mndAllocStep(pMnode, "mnode-snode", mndInitSnode, mndCleanupSnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-bnode", mndInitBnode, mndCleanupBnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-offset", mndInitOffset, mndCleanupOffset) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
  if (pMnode->clusterId <= 0) {
    if (mndAllocStep(pMnode, "mnode-sdb-deploy", mndDeploySdb, NULL) != 0) return -1;
  } else {
    if (mndAllocStep(pMnode, "mnode-sdb-read", mndReadSdb, NULL) != 0) return -1;
  }
  if (mndAllocStep(pMnode, "mnode-timer", mndInitTimer, NULL) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-timer", NULL, mndCleanupTimer) != 0) return -1;

  return 0;
}

241
/*
 * Run cleanup callbacks from index pos down to 0, then destroy the step array.
 *
 * pos == -1 means "start from the last registered step". Steps without a
 * cleanup callback are only logged. pMnode->pSteps is set to NULL afterwards,
 * so a second call is a no-op.
 */
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) return;

  int32_t idx = pos;
  if (idx == -1) {
    idx = taosArrayGetSize(pMnode->pSteps) - 1;
  }

  while (idx >= 0) {
    SMnodeStep *pCur = taosArrayGet(pMnode->pSteps, idx);
    mDebug("%s will cleanup", pCur->name);
    if (pCur->cleanupFp != NULL) {
      (*pCur->cleanupFp)(pMnode);
    }
    idx--;
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
S
Shengliang Guan 已提交
260

261
/*
 * Run the init callback of every registered step, in registration order.
 *
 * Steps without an init callback are skipped. On the first failure the steps
 * from the failing index down to 0 are cleaned up (which also destroys the
 * step array), terrno is restored to the value the failing step left, and -1
 * is returned. Returns 0 when every step succeeds.
 */
static int32_t mndExecSteps(SMnode *pMnode) {
  int32_t total = taosArrayGetSize(pMnode->pSteps);

  for (int32_t idx = 0; idx < total; ++idx) {
    SMnodeStep *pCur = taosArrayGet(pMnode->pSteps, idx);
    if (pCur->initFp == NULL) {
      continue;
    }

    if ((*pCur->initFp)(pMnode) == 0) {
      mDebug("%s is initialized", pCur->name);
      continue;
    }

    /* Save terrno first: the cleanup callbacks may overwrite it. */
    int32_t saved = terrno;
    mError("%s exec failed since %s, start to cleanup", pCur->name, terrstr());
    mndCleanupSteps(pMnode, idx);
    terrno = saved;
    return -1;
  }

  return 0;
}
S
Shengliang Guan 已提交
280

281 282 283 284 285 286 287
/*
 * Copy the caller-supplied options into the mnode and validate them.
 *
 * Scalar fields, the replica table and the callbacks are copied as-is; the
 * cfg strings (timezone, locale, charset, gitinfo, buildinfo) are duplicated
 * with strdup and are released by mndClose.
 *
 * Returns 0 on success. Returns -1 with terrno set to
 * TSDB_CODE_MND_INVALID_OPTIONS when a required callback is missing, dnodeId or
 * clusterId is negative, or statusInterval is below 1; or to
 * TSDB_CODE_OUT_OF_MEMORY when any string duplication failed. Strings already
 * duplicated stay attached to pMnode on failure.
 */
static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  pMnode->dnodeId = pOption->dnodeId;
  pMnode->clusterId = pOption->clusterId;
  pMnode->replica = pOption->replica;
  pMnode->selfIndex = pOption->selfIndex;
  memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
  pMnode->pDnode = pOption->pDnode;
  pMnode->putReqToMWriteQFp = pOption->putReqToMWriteQFp;
  pMnode->putReqToMReadQFp = pOption->putReqToMReadQFp;
  pMnode->sendReqToDnodeFp = pOption->sendReqToDnodeFp;
  pMnode->sendReqToMnodeFp = pOption->sendReqToMnodeFp;
  pMnode->sendRedirectRspFp = pOption->sendRedirectRspFp;
  pMnode->cfg.sver = pOption->cfg.sver;
  pMnode->cfg.enableTelem = pOption->cfg.enableTelem;
  pMnode->cfg.statusInterval = pOption->cfg.statusInterval;
  pMnode->cfg.shellActivityTimer = pOption->cfg.shellActivityTimer;
  pMnode->cfg.timezone = strdup(pOption->cfg.timezone);
  pMnode->cfg.locale = strdup(pOption->cfg.locale);
  pMnode->cfg.charset = strdup(pOption->cfg.charset);
  pMnode->cfg.gitinfo = strdup(pOption->cfg.gitinfo);
  pMnode->cfg.buildinfo = strdup(pOption->cfg.buildinfo);

  /* putReqToMReadQFp is called unconditionally by the mq and telem timer
   * callbacks, so it is required just like the write-queue callback. */
  if (pMnode->sendReqToDnodeFp == NULL || pMnode->sendReqToMnodeFp == NULL || pMnode->sendRedirectRspFp == NULL ||
      pMnode->putReqToMWriteQFp == NULL || pMnode->putReqToMReadQFp == NULL || pMnode->dnodeId < 0 ||
      pMnode->clusterId < 0 || pMnode->cfg.statusInterval < 1) {
    terrno = TSDB_CODE_MND_INVALID_OPTIONS;
    return -1;
  }

  /* All five duplicated strings are checked, including gitinfo and buildinfo. */
  if (pMnode->cfg.timezone == NULL || pMnode->cfg.locale == NULL || pMnode->cfg.charset == NULL ||
      pMnode->cfg.gitinfo == NULL || pMnode->cfg.buildinfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}
317

318
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
S
Shengliang Guan 已提交
319 320
  mDebug("start to open mnode in %s", path);

321
  SMnode *pMnode = calloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
322 323 324 325 326 327
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
328 329 330
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);

S
Shengliang Guan 已提交
331 332 333 334 335 336 337
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
    free(pMnode);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
338

S
Shengliang Guan 已提交
339
  int32_t code = mndCreateDir(pMnode, path);
340
  if (code != 0) {
S
Shengliang Guan 已提交
341 342
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
343 344 345 346 347 348
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndSetOptions(pMnode, pOption);
349
  if (code != 0) {
S
Shengliang Guan 已提交
350 351
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
352 353 354 355 356 357 358
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndInitSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
359 360
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
361 362 363 364 365 366 367
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
368 369
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
370 371 372 373
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
374

S
Shengliang Guan 已提交
375
  mDebug("mnode open successfully ");
S
Shengliang Guan 已提交
376 377
  return pMnode;
}
S
Shengliang Guan 已提交
378

379
/*
 * Tear down an mnode: run all step cleanups, release the stored path and the
 * duplicated cfg strings, then release the mnode itself. NULL is accepted and ignored.
 */
void mndClose(SMnode *pMnode) {
  if (pMnode == NULL) return;

  mDebug("start to close mnode");
  mndCleanupSteps(pMnode, -1);

  tfree(pMnode->path);
  tfree(pMnode->cfg.charset);
  tfree(pMnode->cfg.locale);
  tfree(pMnode->cfg.timezone);
  tfree(pMnode->cfg.gitinfo);
  tfree(pMnode->cfg.buildinfo);
  tfree(pMnode);

  mDebug("mnode is closed");
}
S
Shengliang Guan 已提交
393

394
/*
 * Alter an mnode. The current implementation only emits two debug log lines;
 * neither pMnode nor pOption is read. Always returns 0.
 */
int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
  mDebug("start to alter mnode");

  mDebug("mnode is altered");

  return 0;
}
S
Shengliang Guan 已提交
399

400
/* Remove the mnode directory at path via taosRemoveDir, logging before and after. */
void mndDestroy(const char *path) {
  mDebug("start to destroy mnode at %s", path);

  taosRemoveDir(path);

  mDebug("mnode is destroyed");
}
S
Shengliang Guan 已提交
405

406
/*
 * Fill pLoad with the mnode's load figures. Every counter written here is
 * currently set to zero and pMnode is not read. Always returns 0.
 */
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  /* node and vgroup counts */
  pLoad->numOfDnode = 0;
  pLoad->numOfMnode = 0;
  pLoad->numOfVgroup = 0;

  /* schema object counts */
  pLoad->numOfDatabase = 0;
  pLoad->numOfSuperTable = 0;
  pLoad->numOfChildTable = 0;
  pLoad->numOfColumn = 0;

  /* data volume figures */
  pLoad->totalPoints = 0;
  pLoad->totalStorage = 0;
  pLoad->compStorage = 0;

  return 0;
}
S
Shengliang Guan 已提交
420

421
/*
 * Wrap an incoming RPC message into a newly allocated SMnodeMsg queue item.
 *
 * For every message type except the four internal ones (trans timer, mq timer,
 * mq do-rebalance, telem timer) the user name is copied from the connection
 * info. The connection is only queried when the low bit of msgType is set;
 * otherwise the copied user name is all zero bytes.
 *
 * Returns the new item, or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY
 * (allocation failed) or TSDB_CODE_MND_NO_USER_FROM_CONN (connection lookup failed).
 */
SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
  SMnodeMsg *pNew = taosAllocateQitem(sizeof(SMnodeMsg));
  if (pNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpcMsg->ahandle, pRpcMsg->handle);
    return NULL;
  }

  bool isInternal = (pRpcMsg->msgType == TDMT_MND_TRANS_TIMER || pRpcMsg->msgType == TDMT_MND_MQ_TIMER ||
                     pRpcMsg->msgType == TDMT_MND_MQ_DO_REBALANCE || pRpcMsg->msgType == TDMT_MND_TELEM_TIMER);

  if (!isInternal) {
    SRpcConnInfo conn = {0};
    if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &conn) != 0) {
      taosFreeQitem(pNew);
      terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
      mError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpcMsg->ahandle, pRpcMsg->handle);
      return NULL;
    }
    memcpy(pNew->user, conn.user, TSDB_USER_LEN);
  }

  pNew->pMnode = pMnode;
  pNew->rpcMsg = *pRpcMsg;
  pNew->createdTime = taosGetTimestampSec();

  mTrace("msg:%p, is created, app:%p RPC:%p user:%s", pNew, pRpcMsg->ahandle, pRpcMsg->handle, pNew->user);
  return pNew;
}

449
/*
 * Release an SMnodeMsg: free the RPC payload it carries, clear the pointer,
 * then free the queue item itself. pMsg must not be NULL (it is dereferenced).
 */
void mndCleanupMsg(SMnodeMsg *pMsg) {
  mTrace("msg:%p, is destroyed, app:%p RPC:%p", pMsg, pMsg->rpcMsg.ahandle, pMsg->rpcMsg.handle);

  rpcFreeCont(pMsg->rpcMsg.pCont);
  pMsg->rpcMsg.pCont = NULL;

  taosFreeQitem(pMsg);
}

456 457 458 459
/* Send a payload-less response carrying code on the RPC handle of pMsg. */
void mndSendRsp(SMnodeMsg *pMsg, int32_t code) {
  SRpcMsg rsp = {0};
  rsp.handle = pMsg->rpcMsg.handle;
  rsp.code = code;

  rpcSendResponse(&rsp);
}
S
Shengliang Guan 已提交
460

S
Shengliang Guan 已提交
461
/*
 * Dispatch one message to the handler registered for its type and, for
 * requests, send the reply.
 *
 * A message is treated as a request when the low bit of its type is set.
 * Requests are rejected with TSDB_CODE_APP_NOT_READY when this mnode is not the
 * master (answered with a redirect) and with TSDB_CODE_MND_INVALID_MSG_LEN when
 * they carry no payload. Messages without a registered handler fail with
 * TSDB_CODE_MSG_NOT_PROCESSED.
 *
 * When the handler returns TSDB_CODE_MND_ACTION_IN_PROGRESS no reply is sent
 * here. Any other non-zero handler result is reported using terrno as the
 * error code. No reply is sent for non-requests or when the RPC handle is NULL.
 */
void mndProcessMsg(SMnodeMsg *pMsg) {
  SMnode *pMnode = pMsg->pMnode;
  int32_t code = 0;
  tmsg_t  msgType = pMsg->rpcMsg.msgType;
  void   *ahandle = pMsg->rpcMsg.ahandle;
  bool    isReq = (msgType & 1U);

  mTrace("msg:%p, type:%s will be processed, app:%p", pMsg, TMSG_INFO(msgType), ahandle);

  if (isReq && !mndIsMaster(pMnode)) {
    code = TSDB_CODE_APP_NOT_READY;
    /* terrstr() reports terrno, so publish the code first; otherwise the log
     * shows whatever stale error an earlier call left behind. */
    terrno = code;
    mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
    goto PROCESS_RPC_END;
  }

  if (isReq && pMsg->rpcMsg.pCont == NULL) {
    code = TSDB_CODE_MND_INVALID_MSG_LEN;
    terrno = code;  /* same reason as above: keep the logged text in sync with code */
    mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
    goto PROCESS_RPC_END;
  }

  MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(msgType)];
  if (fp == NULL) {
    code = TSDB_CODE_MSG_NOT_PROCESSED;
    mError("msg:%p, failed to process since no msg handle, app:%p", pMsg, ahandle);
    goto PROCESS_RPC_END;
  }

  code = (*fp)(pMsg);
  if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
    mTrace("msg:%p, in progress, app:%p", pMsg, ahandle);
    return;
  } else if (code != 0) {
    /* Handlers signal failure with a non-zero return and the detail in terrno. */
    code = terrno;
    mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
    goto PROCESS_RPC_END;
  } else {
    mTrace("msg:%p, is processed, app:%p", pMsg, ahandle);
  }

PROCESS_RPC_END:
  if (isReq) {
    if (pMsg->rpcMsg.handle == NULL) return;

    if (code == TSDB_CODE_APP_NOT_READY) {
      mndSendRedirectRsp(pMnode, &pMsg->rpcMsg);
    } else if (code != 0) {
      SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code};
      rpcSendResponse(&rpcRsp);
    } else {
      /* Success: reply with the payload the handler stored in the message. */
      SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont};
      rpcSendResponse(&rpcRsp);
    }
  }
}

517 518 519 520
/*
 * Register fp as the handler for msgType.
 * The request is silently ignored when TMSG_INDEX(msgType) falls outside [0, TDMT_MAX).
 */
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  tmsg_t slot = TMSG_INDEX(msgType);
  if (slot < 0 || slot >= TDMT_MAX) {
    return;
  }

  pMnode->msgFp[slot] = fp;
}

S
Shengliang Guan 已提交
524 525 526 527 528
/*
 * Build a 64-bit id from the current time, a hash of name and a random value.
 *
 * Bit layout of the result, high to low:
 *   40 bits  low 40 bits of the microsecond timestamp
 *   16 bits  low 16 bits of MurmurHash3_32(name, len)
 *    8 bits  low 8 bits of taosRand()
 */
uint64_t mndGenerateUid(char *name, int32_t len) {
  int64_t us = taosGetTimestampUs();
  int32_t hashval = MurmurHash3_32(name, len);

  /* Shift in unsigned arithmetic: left-shifting a signed 40-bit value by 24
   * overflows int64_t whenever bit 39 is set, which is undefined behaviour. */
  uint64_t timeBits = ((uint64_t)us & 0x000000FFFFFFFFFFULL) << 24;
  uint64_t hashBits = ((uint64_t)(uint32_t)hashval & 0xFFFFULL) << 8;
  uint64_t randBits = (uint64_t)taosRand() & 0xFFULL;

  return timeBits + hashBits + randBits;
}