mnode.c 13.7 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18 19 20 21 22 23 24 25 26
#include "mndAcct.h"
#include "mndAuth.h"
#include "mndBalance.h"
#include "mndCluster.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
#include "mndMnode.h"
#include "mndProfile.h"
#include "mndShow.h"
S
Shengliang Guan 已提交
27
#include "mndStb.h"
S
Shengliang Guan 已提交
28 29
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
30
#include "mndTopic.h"
S
Shengliang Guan 已提交
31 32 33
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
34

35 36 37 38
// Hands pMsg and pEpSet to the callback stored in pMnode->sendMsgToDnodeFp,
// together with pMnode->pDnode. Does nothing when pMnode or the callback is NULL.
void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
  if (pMnode == NULL) return;
  if (pMnode->sendMsgToDnodeFp == NULL) return;

  pMnode->sendMsgToDnodeFp(pMnode->pDnode, pEpSet, pMsg);
}
S
Shengliang Guan 已提交
40

41 42 43 44
// Hands pMsg to the callback stored in pMnode->sendMsgToMnodeFp, together with
// pMnode->pDnode. Does nothing when pMnode or the callback is NULL.
void mndSendMsgToMnode(SMnode *pMnode, SRpcMsg *pMsg) {
  if (pMnode == NULL) return;
  if (pMnode->sendMsgToMnodeFp == NULL) return;

  pMnode->sendMsgToMnodeFp(pMnode->pDnode, pMsg);
}
S
Shengliang Guan 已提交
46

47 48 49 50
// Hands pMsg to the callback stored in pMnode->sendRedirectMsgFp, together with
// pMnode->pDnode. Does nothing when pMnode or the callback is NULL.
void mndSendRedirectMsg(SMnode *pMnode, SRpcMsg *pMsg) {
  if (pMnode == NULL) return;
  if (pMnode->sendRedirectMsgFp == NULL) return;

  pMnode->sendRedirectMsgFp(pMnode->pDnode, pMsg);
}
S
Shengliang Guan 已提交
52

53 54 55
// Init step: creates pMnode->timer via taosTmrInit() unless one already exists.
// Returns 0 on success; on failure sets terrno to TSDB_CODE_OUT_OF_MEMORY and
// returns -1.
static int32_t mndInitTimer(SMnode *pMnode) {
  // An existing timer is reused as-is.
  if (pMnode->timer != NULL) return 0;

  pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND");
  if (pMnode->timer != NULL) return 0;

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

66 67 68 69
// Cleanup step: releases pMnode->timer with taosTmrCleanUp() and resets the
// field to NULL. No-op when no timer is set.
static void mndCleanupTimer(SMnode *pMnode) {
  if (pMnode->timer == NULL) return;

  taosTmrCleanUp(pMnode->timer);
  pMnode->timer = NULL;
}

S
Shengliang Guan 已提交
73
// Stores a heap copy of `path` in pMnode->path and creates that directory with
// taosMkDir().
// Returns 0 on success. Returns -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY
// when the copy fails, or to TAOS_SYSTEM_ERROR(errno) when the directory cannot
// be created. The copy stays in pMnode->path in the latter case.
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  char *dirPath = strdup(path);
  pMnode->path = dirPath;
  if (dirPath == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosMkDir(dirPath) == 0) return 0;

  terrno = TAOS_SYSTEM_ERROR(errno);
  return -1;
}
S
Shengliang Guan 已提交
87

88 89 90
// Init step: creates the sdb instance for this mnode from pMnode->path and
// stores it in pMnode->pSdb.
// Returns 0 on success, -1 when sdbInit() returns NULL (terrno is not set here).
static int32_t mndInitSdb(SMnode *pMnode) {
  // Fields not named here are zero-initialized.
  SSdbOpt opt = {.path = pMnode->path, .pMnode = pMnode};

  pMnode->pSdb = sdbInit(&opt);
  return (pMnode->pSdb != NULL) ? 0 : -1;
}

101
// Init step: runs sdbDeploy() on pMnode->pSdb and returns its result unchanged.
static int32_t mndDeploySdb(SMnode *pMnode) {
  return sdbDeploy(pMnode->pSdb);
}
S
Shengliang Guan 已提交
102
// Init step: runs sdbReadFile() on pMnode->pSdb and returns its result unchanged.
static int32_t mndReadSdb(SMnode *pMnode) {
  return sdbReadFile(pMnode->pSdb);
}
103 104 105

// Cleanup step: releases pMnode->pSdb with sdbCleanup() and resets the field
// to NULL. No-op when no sdb instance is set.
static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb == NULL) return;

  sdbCleanup(pMnode->pSdb);
  pMnode->pSdb = NULL;
}

111 112 113 114 115
// Appends one step (name + optional init/cleanup callbacks) to pMnode->pSteps.
// Returns 0 on success; when the push fails, sets terrno to
// TSDB_CODE_OUT_OF_MEMORY and returns -1.
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  // Fields not named here are zero-initialized.
  SMnodeStep step = {.name = name, .initFp = initFp, .cleanupFp = cleanupFp};

  if (taosArrayPush(pMnode->pSteps, &step) != NULL) return 0;

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

124
// Registers every mnode init/cleanup step in pMnode->pSteps, in execution order.
// Returns 0 on success, or -1 as soon as one registration fails (terrno is set
// by mndAllocStep).
static int32_t mndInitSteps(SMnode *pMnode) {
  typedef struct {
    char        *name;
    MndInitFp    initFp;
    MndCleanupFp cleanupFp;
  } SStepDef;

  // Steps registered before the sdb deploy/read step.
  static const SStepDef beforeLoad[] = {
      {"mnode-sdb", mndInitSdb, mndCleanupSdb},
      {"mnode-trans", mndInitTrans, mndCleanupTrans},
      {"mnode-cluster", mndInitCluster, mndCleanupCluster},
      {"mnode-dnode", mndInitDnode, mndCleanupDnode},
      {"mnode-mnode", mndInitMnode, mndCleanupMnode},
      {"mnode-acct", mndInitAcct, mndCleanupAcct},
      {"mnode-auth", mndInitAuth, mndCleanupAuth},
      {"mnode-user", mndInitUser, mndCleanupUser},
      {"mnode-db", mndInitDb, mndCleanupDb},
      {"mnode-vgroup", mndInitVgroup, mndCleanupVgroup},
      {"mnode-stb", mndInitStb, mndCleanupStb},
      {"mnode-topic", mndInitTopic, mndCleanupTopic},
      {"mnode-func", mndInitFunc, mndCleanupFunc},
  };

  // Steps registered after the sdb deploy/read step.
  // "mnode-timer" appears twice on purpose: the first entry only initializes the
  // timer; the last entry only cleans it up. Because mndCleanupSteps walks the
  // array from the end, the timer is the first thing torn down.
  static const SStepDef afterLoad[] = {
      {"mnode-timer", mndInitTimer, NULL},
      {"mnode-balance", mndInitBalance, mndCleanupBalance},
      {"mnode-profile", mndInitProfile, mndCleanupProfile},
      {"mnode-show", mndInitShow, mndCleanupShow},
      {"mnode-sync", mndInitSync, mndCleanupSync},
      {"mnode-telem", mndInitTelem, mndCleanupTelem},
      {"mnode-timer", NULL, mndCleanupTimer},
  };

  const int32_t numBefore = (int32_t)(sizeof(beforeLoad) / sizeof(beforeLoad[0]));
  const int32_t numAfter = (int32_t)(sizeof(afterLoad) / sizeof(afterLoad[0]));

  for (int32_t i = 0; i < numBefore; i++) {
    const SStepDef *pDef = &beforeLoad[i];
    if (mndAllocStep(pMnode, pDef->name, pDef->initFp, pDef->cleanupFp) != 0) return -1;
  }

  // A non-positive clusterId selects the deploy step; otherwise the read step.
  if (pMnode->clusterId <= 0) {
    if (mndAllocStep(pMnode, "mnode-sdb-deploy", mndDeploySdb, NULL) != 0) return -1;
  } else {
    if (mndAllocStep(pMnode, "mnode-sdb-read", mndReadSdb, NULL) != 0) return -1;
  }

  for (int32_t i = 0; i < numAfter; i++) {
    const SStepDef *pDef = &afterLoad[i];
    if (mndAllocStep(pMnode, pDef->name, pDef->initFp, pDef->cleanupFp) != 0) return -1;
  }

  return 0;
}

154
// Runs the cleanup callbacks of steps [0..pos] in reverse order, then destroys
// pMnode->pSteps and sets it to NULL.
// pos == -1 means "start from the last registered step".
// No-op when pMnode->pSteps is already NULL.
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) return;

  if (pos == -1) {
    pos = taosArrayGetSize(pMnode->pSteps) - 1;
  }

  int32_t idx = pos;
  while (idx >= 0) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    mDebug("step:%s will cleanup", pStep->name);
    // Steps registered with a NULL cleanup callback are skipped.
    if (pStep->cleanupFp != NULL) {
      pStep->cleanupFp(pMnode);
    }
    idx--;
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
S
Shengliang Guan 已提交
173

174
// Runs the init callback of every registered step, in registration order.
// Steps without an init callback are skipped.
// On the first failure, calls mndCleanupSteps() from the failing step backwards
// (which also destroys pMnode->pSteps), restores the failing step's terrno and
// returns -1. Returns 0 when every step succeeds.
static int32_t mndExecSteps(SMnode *pMnode) {
  int32_t numOfSteps = taosArrayGetSize(pMnode->pSteps);

  for (int32_t idx = 0; idx < numOfSteps; idx++) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    if (pStep->initFp == NULL) continue;

    if (pStep->initFp(pMnode) == 0) {
      mDebug("step:%s is initialized", pStep->name);
      continue;
    }

    // Save terrno before cleanup so the caller sees the error of the failing step.
    int32_t code = terrno;
    mError("step:%s exec failed since %s, start to cleanup", pStep->name, terrstr());
    mndCleanupSteps(pMnode, idx);
    terrno = code;
    return -1;
  }

  return 0;
}
S
Shengliang Guan 已提交
197

198 199 200 201 202 203 204 205 206 207 208
// Copies the caller-supplied options into pMnode and validates them.
//
// Scalar options, the replica table and the callback pointers are copied by
// value. The five cfg strings (timezone, locale, charset, gitinfo, buildinfo)
// are duplicated with strdup(); mndClose() frees them.
//
// Returns 0 on success. Returns -1 with terrno set to:
//   - TSDB_CODE_MND_INVALID_OPTIONS when a callback is NULL, dnodeId or
//     clusterId is negative, or cfg.statusInterval is below 1;
//   - TSDB_CODE_OUT_OF_MEMORY when any of the string copies failed.
// On failure the strings already copied remain owned by pMnode.
static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  pMnode->dnodeId = pOption->dnodeId;
  pMnode->clusterId = pOption->clusterId;
  pMnode->replica = pOption->replica;
  pMnode->selfIndex = pOption->selfIndex;
  memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
  pMnode->pDnode = pOption->pDnode;
  pMnode->putMsgToApplyMsgFp = pOption->putMsgToApplyMsgFp;
  pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp;
  pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp;
  pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp;
  pMnode->cfg.sver = pOption->cfg.sver;
  pMnode->cfg.enableTelem = pOption->cfg.enableTelem;
  pMnode->cfg.statusInterval = pOption->cfg.statusInterval;
  pMnode->cfg.shellActivityTimer = pOption->cfg.shellActivityTimer;
  pMnode->cfg.timezone = strdup(pOption->cfg.timezone);
  pMnode->cfg.locale = strdup(pOption->cfg.locale);
  pMnode->cfg.charset = strdup(pOption->cfg.charset);
  pMnode->cfg.gitinfo = strdup(pOption->cfg.gitinfo);
  pMnode->cfg.buildinfo = strdup(pOption->cfg.buildinfo);

  if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL ||
      pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0 ||
      pMnode->cfg.statusInterval < 1) {
    terrno = TSDB_CODE_MND_INVALID_OPTIONS;
    return -1;
  }

  // Every strdup() result is checked, including gitinfo and buildinfo, so a
  // failed copy is reported instead of leaving a NULL config string behind.
  if (pMnode->cfg.timezone == NULL || pMnode->cfg.locale == NULL || pMnode->cfg.charset == NULL ||
      pMnode->cfg.gitinfo == NULL || pMnode->cfg.buildinfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}
233

234
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
S
Shengliang Guan 已提交
235 236
  mDebug("start to open mnode in %s", path);

237
  SMnode *pMnode = calloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
238 239 240 241 242 243
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
244 245 246
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);

S
Shengliang Guan 已提交
247 248 249 250 251 252 253
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
    free(pMnode);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
254

S
Shengliang Guan 已提交
255
  int32_t code = mndCreateDir(pMnode, path);
256
  if (code != 0) {
S
Shengliang Guan 已提交
257 258
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
259 260 261 262 263 264
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndSetOptions(pMnode, pOption);
265
  if (code != 0) {
S
Shengliang Guan 已提交
266 267
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
268 269 270 271 272 273 274
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndInitSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
275 276
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
277 278 279 280 281 282 283
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
284 285
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
286 287 288 289
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
290

S
Shengliang Guan 已提交
291
  mDebug("mnode open successfully ");
S
Shengliang Guan 已提交
292 293
  return pMnode;
}
S
Shengliang Guan 已提交
294

295
// Tears down an mnode object: runs all remaining cleanup steps, frees the
// path and cfg strings, and finally frees the object itself.
// Passing NULL is a no-op.
void mndClose(SMnode *pMnode) {
  if (pMnode == NULL) return;

  mDebug("start to close mnode");

  // -1 asks mndCleanupSteps to start from the last registered step.
  mndCleanupSteps(pMnode, -1);

  tfree(pMnode->path);
  tfree(pMnode->cfg.charset);
  tfree(pMnode->cfg.locale);
  tfree(pMnode->cfg.timezone);
  tfree(pMnode->cfg.gitinfo);
  tfree(pMnode->cfg.buildinfo);
  tfree(pMnode);

  mDebug("mnode is closed");
}
S
Shengliang Guan 已提交
309

310
// Placeholder for altering a running mnode: it only emits two debug log lines
// and reports success. Neither pMnode nor pOption is read.
// Always returns 0.
int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
  (void)pMnode;
  (void)pOption;

  mDebug("start to alter mnode");
  mDebug("mnode is altered");

  return 0;
}
S
Shengliang Guan 已提交
315

316
// Removes the mnode directory at `path` with taosRemoveDir(), logging before
// and after. Nothing is returned or checked by this function.
void mndDestroy(const char *path) {
  mDebug("start to destroy mnode at %s", path);

  taosRemoveDir(path);

  mDebug("mnode is destroyed");
}
S
Shengliang Guan 已提交
321

322
// Fills *pLoad with the mnode's load counters. Every counter set here is
// currently written as 0; pMnode is not consulted.
// Always returns 0.
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  (void)pMnode;

  // Object counts.
  pLoad->numOfDnode = 0;
  pLoad->numOfMnode = 0;
  pLoad->numOfVgroup = 0;
  pLoad->numOfDatabase = 0;
  pLoad->numOfSuperTable = 0;
  pLoad->numOfChildTable = 0;
  pLoad->numOfColumn = 0;

  // Data volume.
  pLoad->totalPoints = 0;
  pLoad->totalStorage = 0;
  pLoad->compStorage = 0;

  return 0;
}
S
Shengliang Guan 已提交
336

337
// Wraps an incoming RPC message in a newly allocated queue item.
// For messages with an odd msgType, the connection info is looked up and its
// user name copied into the item; for other types the user field is filled
// from a zeroed SRpcConnInfo.
// Returns the new item, or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY
// (allocation failed) or TSDB_CODE_MND_NO_USER_FROM_CONN (lookup failed).
SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
  SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg));
  if (pMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("RPC:%p, app:%p failed to create msg since %s", pRpcMsg->handle, pRpcMsg->ahandle, terrstr());
    return NULL;
  }

  SRpcConnInfo connInfo = {0};
  bool         needConnInfo = (pRpcMsg->msgType & 1U) != 0;
  if (needConnInfo) {
    if (rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) {
      taosFreeQitem(pMsg);
      terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
      mError("RPC:%p, app:%p failed to create msg since %s", pRpcMsg->handle, pRpcMsg->ahandle, terrstr());
      return NULL;
    }
  }

  memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
  pMsg->pMnode = pMnode;
  pMsg->rpcMsg = *pRpcMsg;
  pMsg->createdTime = taosGetTimestampSec();

  mTrace("msg:%p, app:%p is created, RPC:%p", pMsg, pRpcMsg->ahandle, pRpcMsg->handle);
  return pMsg;
}

362
// Releases a message created by mndInitMsg(): frees the RPC content buffer
// with rpcFreeCont(), then returns the queue item itself with taosFreeQitem().
void mndCleanupMsg(SMnodeMsg *pMsg) {
  SRpcMsg *pRpc = &pMsg->rpcMsg;

  mTrace("msg:%p, app:%p is destroyed, RPC:%p", pMsg, pRpc->ahandle, pRpc->handle);

  rpcFreeCont(pRpc->pCont);
  pRpc->pCont = NULL;

  taosFreeQitem(pMsg);
}

369 370 371 372
// Sends a response carrying only `code` on the RPC handle of pMsg; all other
// response fields are left zeroed.
void mndSendRsp(SMnodeMsg *pMsg, int32_t code) {
  SRpcMsg rsp = {0};
  rsp.handle = pMsg->rpcMsg.handle;
  rsp.code = code;

  rpcSendResponse(&rsp);
}
S
Shengliang Guan 已提交
373

374 375
// Dispatches one queued message to the handler registered for its type and,
// for requests (odd msgType), sends the response.
//
// Outcomes for requests:
//   - this mnode is not master        -> redirect via mndSendRedirectMsg()
//   - handler fails / message invalid -> response carrying the error code
//   - handler returns TSDB_CODE_MND_ACTION_IN_PROGRESS -> no response here
//   - handler succeeds                -> response carrying pMsg->pCont/contLen
// Non-request messages never get a response from this function.
static void mndProcessRpcMsg(SMnodeMsg *pMsg) {
  SMnode *pMnode = pMsg->pMnode;
  int32_t code = 0;
  int32_t msgType = pMsg->rpcMsg.msgType;
  void   *ahandle = pMsg->rpcMsg.ahandle;
  bool    isReq = (msgType & 1U);

  // msgType indexes both taosMsg[] and pMnode->msgFp[] below. Reject values
  // outside the range mndSetMsgHandle() accepts before touching either table.
  if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
    code = TSDB_CODE_MSG_NOT_PROCESSED;
    mError("msg:%p, app:%p failed to process since invalid msg type:%d", pMsg, ahandle, msgType);
    goto PROCESS_RPC_END;
  }

  mTrace("msg:%p, app:%p type:%s will be processed", pMsg, ahandle, taosMsg[msgType]);

  if (isReq && !mndIsMaster(pMnode)) {
    code = TSDB_CODE_APP_NOT_READY;
    // terrstr() reports terrno, so set it first; otherwise the log shows a stale error.
    terrno = code;
    mDebug("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr());
    goto PROCESS_RPC_END;
  }

  if (isReq && pMsg->rpcMsg.pCont == NULL) {
    code = TSDB_CODE_MND_INVALID_MSG_LEN;
    terrno = code;
    mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr());
    goto PROCESS_RPC_END;
  }

  MndMsgFp fp = pMnode->msgFp[msgType];
  if (fp == NULL) {
    code = TSDB_CODE_MSG_NOT_PROCESSED;
    mError("msg:%p, app:%p failed to process since not handle", pMsg, ahandle);
    goto PROCESS_RPC_END;
  }

  code = (*fp)(pMsg);
  if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
    // No response is sent here for an in-progress action.
    mTrace("msg:%p, app:%p in progressing", pMsg, ahandle);
    return;
  } else if (code != 0) {
    // Handlers report the detailed error through terrno.
    code = terrno;
    mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr());
    goto PROCESS_RPC_END;
  } else {
    mTrace("msg:%p, app:%p is processed", pMsg, ahandle);
  }

PROCESS_RPC_END:
  if (isReq) {
    if (code == TSDB_CODE_APP_NOT_READY) {
      mndSendRedirectMsg(pMnode, &pMsg->rpcMsg);
    } else if (code != 0) {
      SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code};
      rpcSendResponse(&rpcRsp);
    } else {
      SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont};
      rpcSendResponse(&rpcRsp);
    }
  }
}

428
// Registers `fp` as the handler for `msgType` in pMnode->msgFp.
// Types outside [0, TSDB_MSG_TYPE_MAX) are silently ignored.
void mndSetMsgHandle(SMnode *pMnode, int32_t msgType, MndMsgFp fp) {
  if (msgType < 0) return;
  if (msgType >= TSDB_MSG_TYPE_MAX) return;

  pMnode->msgFp[msgType] = fp;
}

434
// Entry point for read messages; delegates to mndProcessRpcMsg().
void mndProcessReadMsg(SMnodeMsg *pMsg) {
  mndProcessRpcMsg(pMsg);
}
S
Shengliang Guan 已提交
435

436
// Entry point for write messages; delegates to mndProcessRpcMsg().
void mndProcessWriteMsg(SMnodeMsg *pMsg) {
  mndProcessRpcMsg(pMsg);
}
S
Shengliang Guan 已提交
437

438
// Entry point for sync messages; delegates to mndProcessRpcMsg().
void mndProcessSyncMsg(SMnodeMsg *pMsg) {
  mndProcessRpcMsg(pMsg);
}
S
Shengliang Guan 已提交
439

440
// Entry point for apply messages. Currently does nothing with the message.
void mndProcessApplyMsg(SMnodeMsg *pMsg) {
  (void)pMsg;
}
S
Shengliang Guan 已提交
441 442 443 444 445 446

// Builds a 64-bit id from three sources:
//   bits 63..24  low 40 bits of the current microsecond timestamp
//   bits 23..8   low 16 bits of MurmurHash3_32(name, len)
//   bits  7..0   low 8 bits of taosRand()
// All shifting is done on unsigned 64-bit values: left-shifting the signed
// timestamp by 24 is undefined behaviour whenever bit 39 is set.
uint64_t mndGenerateUid(char *name, int32_t len) {
  uint64_t us = (uint64_t)taosGetTimestampUs();
  uint32_t hashval = (uint32_t)MurmurHash3_32(name, len);

  uint64_t tsBits = (us & 0x000000FFFFFFFFFFULL) << 24;
  uint64_t hashBits = ((uint64_t)hashval & 0xFFFFULL) << 8;
  uint64_t randBits = (uint64_t)taosRand() & 0xFFULL;

  return tsBits + hashBits + randBits;
}