mnode.c 19.1 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndAcct.h"
#include "mndAuth.h"
S
Shengliang Guan 已提交
19
#include "mndBnode.h"
S
Shengliang Guan 已提交
20
#include "mndCluster.h"
L
Liu Jicong 已提交
21
#include "mndConsumer.h"
S
Shengliang Guan 已提交
22 23 24
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
L
Liu Jicong 已提交
25
#include "mndGrant.h"
S
monitor  
Shengliang Guan 已提交
26
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
27
#include "mndMnode.h"
L
Liu Jicong 已提交
28
#include "mndOffset.h"
L
Liu Jicong 已提交
29
#include "mndPerfSchema.h"
S
Shengliang Guan 已提交
30
#include "mndProfile.h"
S
Shengliang Guan 已提交
31
#include "mndQnode.h"
L
Liu Jicong 已提交
32
#include "mndQuery.h"
S
Shengliang Guan 已提交
33
#include "mndShow.h"
S
sma  
Shengliang Guan 已提交
34
#include "mndSma.h"
S
Shengliang Guan 已提交
35
#include "mndSnode.h"
S
Shengliang Guan 已提交
36
#include "mndStb.h"
L
Liu Jicong 已提交
37
#include "mndStream.h"
L
Liu Jicong 已提交
38
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
39 40
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
41
#include "mndTopic.h"
S
Shengliang Guan 已提交
42 43 44
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
45

S
Shengliang Guan 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58
/*
 * Serialize an empty SMTimerReq into a buffer obtained from rpcMallocCont().
 *
 * pContLen: receives the serialized length; written only on success.
 * Returns the buffer, or NULL when the computed length is not positive or
 * the allocation fails.
 */
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq req = {0};

  // The first call passes no buffer; its return value is used as the length.
  const int32_t len = tSerializeSMTimerMsg(NULL, 0, &req);
  if (len <= 0) {
    return NULL;
  }

  void *pBuf = rpcMallocCont(len);
  if (pBuf != NULL) {
    tSerializeSMTimerMsg(pBuf, len, &req);
    *pContLen = len;
  }

  return pBuf;
}

S
Shengliang Guan 已提交
59 60 61 62 63
/*
 * Post a TDMT_MND_TRANS_TIMER message to the mnode write queue.
 *
 * Nothing is queued when the timer payload cannot be built:
 * mndBuildTimerMsg() returns NULL on a sizing or allocation failure, and a
 * message without a body would only be enqueued to carry a NULL payload.
 */
static void mndPullupTrans(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}

S
Shengliang Guan 已提交
66 67 68
/*
 * Post a TDMT_MND_MQ_TIMER message to the mnode read queue.
 *
 * Nothing is queued when the timer payload cannot be built:
 * mndBuildTimerMsg() returns NULL on a sizing or allocation failure, and a
 * message without a body would only be enqueued to carry a NULL payload.
 */
static void mndCalMqRebalance(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
L
Liu Jicong 已提交
72

S
Shengliang Guan 已提交
73 74 75 76
/*
 * Post a TDMT_MND_TELEM_TIMER message to the mnode read queue.
 *
 * Nothing is queued when the timer payload cannot be built:
 * mndBuildTimerMsg() returns NULL on a sizing or allocation failure, and a
 * message without a body would only be enqueued to carry a NULL payload.
 */
static void mndPullupTelem(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}

S
Shengliang Guan 已提交
80
static void *mndThreadFp(void *param) {
S
Shengliang Guan 已提交
81
  SMnode *pMnode = param;
S
Shengliang Guan 已提交
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
  int64_t lastTime = 0;
  setThreadName("mnode-timer");

  while (1) {
    lastTime++;
    taosMsleep(100);
    if (pMnode->stopped) break;

    if (lastTime % (tsTransPullupInterval * 10) == 0) {
      mndPullupTrans(pMnode);
    }

    if (lastTime % (tsMqRebalanceInterval * 10) == 0) {
      mndCalMqRebalance(pMnode);
    }

    if (lastTime % (tsTelemInterval * 10) == 0) {
      mndPullupTelem(pMnode);
    }
S
Shengliang Guan 已提交
101 102
  }

S
Shengliang Guan 已提交
103
  return NULL;
L
Liu Jicong 已提交
104 105
}

106
/*
 * Start the joinable timer thread (mndThreadFp) and store its handle in
 * pMnode->thread.
 *
 * The thread attribute object is destroyed on every path; previously it was
 * left initialized when thread creation failed.
 *
 * Returns 0 on success, -1 if the thread could not be created.
 */
static int32_t mndInitTimer(SMnode *pMnode) {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  int32_t ret = taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode);
  if (ret != 0) {
    // Log before destroying the attribute so errno is read first.
    mError("failed to create timer thread since %s", strerror(errno));
  }

  taosThreadAttrDestroy(&thAttr);
  if (ret != 0) {
    return -1;
  }

  tmsgReportStartup("mnode-timer", "initialized");
  return 0;
}

120
/*
 * Ask the timer thread to stop and wait for it to exit.
 *
 * Sets pMnode->stopped, which mndThreadFp() checks after each sleep, then
 * joins and clears the thread handle if it is valid.
 */
static void mndCleanupTimer(SMnode *pMnode) {
  pMnode->stopped = true;

  if (!taosCheckPthreadValid(pMnode->thread)) {
    return;
  }

  taosThreadJoin(pMnode->thread, NULL);
  taosThreadClear(&pMnode->thread);
}

S
Shengliang Guan 已提交
128
/*
 * Store a private copy of `path` in pMnode->path and create that directory.
 *
 * Returns 0 on success. On failure returns -1 with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY (copy failed) or to the system error derived from
 * errno (directory creation failed). The copied path is left in
 * pMnode->path; mndClose() frees it.
 */
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  char *dirPath = strdup(path);
  pMnode->path = dirPath;

  if (dirPath == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosMkDir(dirPath) == 0) {
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(errno);
  return -1;
}
S
Shengliang Guan 已提交
142

143 144 145
/*
 * Create the sdb instance for this mnode, rooted at pMnode->path.
 *
 * Returns 0 when sdbInit() yields a handle, -1 otherwise.
 */
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt sdbOpt = {.path = pMnode->path, .pMnode = pMnode};

  pMnode->pSdb = sdbInit(&sdbOpt);
  return (pMnode->pSdb != NULL) ? 0 : -1;
}

156 157 158 159 160 161 162 163
/*
 * Load the sdb from its file unless this mnode is being deployed.
 *
 * In deploy mode nothing is read here and 0 is returned; mndStart() calls
 * sdbDeploy() for that case. Otherwise the result of sdbReadFile() is
 * returned.
 */
static int32_t mndOpenSdb(SMnode *pMnode) {
  if (pMnode->deploy) {
    return 0;
  }

  return sdbReadFile(pMnode->pSdb);
}
164 165 166

/*
 * Release the sdb instance, if any, and reset the handle so a second call
 * does nothing.
 */
static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb == NULL) {
    return;
  }

  sdbCleanup(pMnode->pSdb);
  pMnode->pSdb = NULL;
}

172 173 174 175 176
/*
 * Append one init/cleanup step to pMnode->pSteps.
 *
 * name:      label used in log and startup messages for the step.
 * initFp:    called by mndExecSteps(); may be NULL.
 * cleanupFp: called by mndCleanupSteps(); may be NULL.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * array push fails.
 */
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep entry = {.name = name, .initFp = initFp, .cleanupFp = cleanupFp};

  if (taosArrayPush(pMnode->pSteps, &entry) != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

185
/*
 * Register every mnode sub-module as an init/cleanup step.
 *
 * Order matters: mndExecSteps() runs the init functions in the order they
 * are registered here, and mndCleanupSteps() runs the cleanup functions in
 * reverse.
 *
 * Each step carries its own module name so that log lines and startup
 * reports identify the step that ran or failed (snode, bnode and sma were
 * previously registered under the qnode/stb labels).
 *
 * Returns 0 on success, -1 as soon as one registration fails.
 */
static int32_t mndInitSteps(SMnode *pMnode) {
  if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-qnode", mndInitQnode, mndCleanupQnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-snode", mndInitSnode, mndCleanupSnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-bnode", mndInitBnode, mndCleanupBnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-grant", mndInitGrant, mndCleanupGrant) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-stream", mndInitStream, mndCleanupStream) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-offset", mndInitOffset, mndCleanupOffset) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-sma", mndInitSma, mndCleanupSma) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-perfs", mndInitPerfs, mndCleanupPerfs) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
  // Second sdb step: runs mndOpenSdb after the steps above; it has no cleanup hook.
  if (mndAllocStep(pMnode, "mnode-sdb", mndOpenSdb, NULL) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1;

  return 0;
}

220
/*
 * Run the cleanup functions of the registered steps in reverse order and
 * destroy the step array.
 *
 * pos: index of the last step to clean up (steps pos..0 are visited);
 *      -1 means "start from the last registered step".
 *
 * Does nothing when the step array has already been destroyed. On return
 * pMnode->pSteps is NULL.
 */
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) {
    return;
  }

  int32_t idx = pos;
  if (idx == -1) {
    idx = taosArrayGetSize(pMnode->pSteps) - 1;
  }

  while (idx >= 0) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    mDebug("%s will cleanup", pStep->name);
    if (pStep->cleanupFp != NULL) {
      (*pStep->cleanupFp)(pMnode);
    }
    idx--;
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
S
Shengliang Guan 已提交
239

240
/*
 * Run the init function of every registered step, in registration order.
 *
 * Steps without an init function are skipped. When a step fails, the steps
 * up to and including the failing one are cleaned up via mndCleanupSteps(),
 * the failing step's terrno is restored, and -1 is returned. After all steps
 * succeed the cluster id is cached in pMnode->clusterId and 0 is returned.
 */
static int32_t mndExecSteps(SMnode *pMnode) {
  const int32_t numSteps = taosArrayGetSize(pMnode->pSteps);

  for (int32_t idx = 0; idx < numSteps; ++idx) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    if (pStep->initFp == NULL) {
      continue;
    }

    if ((*pStep->initFp)(pMnode) == 0) {
      mDebug("%s is initialized", pStep->name);
      tmsgReportStartup(pStep->name, "initialized");
      continue;
    }

    // Cleanup may overwrite terrno, so keep the original error for the caller.
    int32_t savedErr = terrno;
    mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
    mndCleanupSteps(pMnode, idx);
    terrno = savedErr;
    return -1;
  }

  pMnode->clusterId = mndGetClusterId(pMnode);
  return 0;
}
S
Shengliang Guan 已提交
261

S
shm  
Shengliang Guan 已提交
262
/*
 * Copy the replica layout, message callbacks and standby flag from the open
 * options into the mnode object.
 *
 * selfId is taken from the replica entry at pOption->selfIndex; the index is
 * used as given.
 */
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  const SReplica *pSelf = &pOption->replicas[pOption->selfIndex];

  pMnode->msgCb = pOption->msgCb;
  pMnode->replica = pOption->replica;
  pMnode->selfIndex = pOption->selfIndex;
  pMnode->selfId = pSelf->id;
  memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
  pMnode->syncMgmt.standby = pOption->standby;
}
270

271
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
S
Shengliang Guan 已提交
272 273
  mDebug("start to open mnode in %s", path);

wafwerar's avatar
wafwerar 已提交
274
  SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
275 276 277 278 279 280
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
281 282
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
S
shm  
Shengliang Guan 已提交
283
  mndSetOptions(pMnode, pOption);
S
Shengliang Guan 已提交
284

285
  pMnode->deploy = pOption->deploy;
S
Shengliang Guan 已提交
286 287
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
wafwerar's avatar
wafwerar 已提交
288
    taosMemoryFree(pMnode);
S
Shengliang Guan 已提交
289 290 291 292
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
293

S
Shengliang Guan 已提交
294
  int32_t code = mndCreateDir(pMnode, path);
295
  if (code != 0) {
S
Shengliang Guan 已提交
296 297
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
298 299 300 301 302
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

303
  code = mndInitSteps(pMnode);
304
  if (code != 0) {
S
Shengliang Guan 已提交
305 306
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
307 308 309 310 311 312 313
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
314 315
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
316 317 318 319
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
320

S
Shengliang Guan 已提交
321
  mndUpdateMnodeRole(pMnode);
S
Shengliang Guan 已提交
322
  mDebug("mnode open successfully ");
S
Shengliang Guan 已提交
323 324
  return pMnode;
}
S
Shengliang Guan 已提交
325

326
/*
 * Tear down an mnode: run all step cleanups, then free the stored path and
 * the object itself. A NULL pMnode is ignored.
 */
void mndClose(SMnode *pMnode) {
  if (pMnode == NULL) {
    return;
  }

  mDebug("start to close mnode");
  mndCleanupSteps(pMnode, -1);
  taosMemoryFreeClear(pMnode->path);
  taosMemoryFreeClear(pMnode);
  mDebug("mnode is closed");
}
S
Shengliang Guan 已提交
335

336
/*
 * Start the sync module, deploy the sdb when the mnode was opened in deploy
 * mode, and start the timer thread.
 *
 * Returns -1 if sdbDeploy() fails, otherwise the result of mndInitTimer().
 */
int32_t mndStart(SMnode *pMnode) {
  mndSyncStart(pMnode);

  if (pMnode->deploy) {
    if (sdbDeploy(pMnode->pSdb) != 0) {
      return -1;
    }
  }

  return mndInitTimer(pMnode);
}

344 345
/*
 * Stop the sync module and then stop and join the timer thread.
 */
void mndStop(SMnode *pMnode) {
  mndSyncStop(pMnode);
  // A void function must not return an expression (ISO C 6.8.6.4), so call it plainly.
  mndCleanupTimer(pMnode);
}

M
Minghao Li 已提交
349
/*
 * Dispatch a sync RPC message to the matching syncNodeOn*Cb handler.
 *
 * The message is decoded into its typed form, passed to the handler, and the
 * decoded copy is destroyed. The sync node obtained with syncNodeAcquire()
 * is released before returning; previously it was never released.
 *
 * Returns the handler's result, or TAOS_SYNC_PROPOSE_OTHER_ERROR when the
 * sync environment is not started, the sync node cannot be acquired, or the
 * message type is not a known sync message.
 */
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
  SMnode    *pMnode = pMsg->info.node;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  int32_t    code = TAOS_SYNC_PROPOSE_OTHER_ERROR;

  if (!syncEnvIsStart()) {
    mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
    return TAOS_SYNC_PROPOSE_OTHER_ERROR;
  }

  SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
  if (pSyncNode == NULL) {
    mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
    return TAOS_SYNC_PROPOSE_OTHER_ERROR;
  }

  // NOTE(review): the log tag below says "vnodeProcessSyncReq" although this is the mnode path - confirm.
  char  logBuf[512];
  char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
  snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
  syncRpcMsgLog2(logBuf, pMsg);
  taosMemoryFree(syncNodeStr);

  if (pMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
    SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
    code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
    syncTimeoutDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_VND_SYNC_PING) {
    SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
    code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
    syncPingDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
    SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
    code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
    syncPingReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
    SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
    code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
    syncClientRequestDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
    SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
    code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
    syncRequestVoteDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
    SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
    code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
    syncRequestVoteReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
    SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
    code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
    syncAppendEntriesDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
    SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
    code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
    syncAppendEntriesReplyDestroy(pSyncMsg);
  } else {
    mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
    code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
  }

  // Balance the syncNodeAcquire() above on every path that acquired the node.
  syncNodeRelease(pSyncNode);
  return code;
}

S
Shengliang Guan 已提交
411 412 413 414
/*
 * Validate an incoming message and hand it to the handler registered for
 * its type with mndSetMsgHandle().
 *
 * For requests: when this mnode is not master, only the three internal
 * timer messages are accepted, and an empty body is rejected.
 *
 * Returns the handler's result. Returns -1 with terrno set to
 * TSDB_CODE_APP_NOT_READY, TSDB_CODE_INVALID_MSG_LEN or
 * TSDB_CODE_MSG_NOT_PROCESSED when the message is rejected before dispatch.
 */
int32_t mndProcessMsg(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  void   *ahandle = pMsg->info.ahandle;
  mTrace("msg:%p, will be processed, type:%s app:%p", pMsg, TMSG_INFO(pMsg->msgType), ahandle);

  if (IsReq(pMsg)) {
    const bool isTimerMsg = (pMsg->msgType == TDMT_MND_TRANS_TIMER) || (pMsg->msgType == TDMT_MND_MQ_TIMER) ||
                            (pMsg->msgType == TDMT_MND_TELEM_TIMER);

    // The master check is evaluated first, exactly as before.
    if (!mndIsMaster(pMnode) && !isTimerMsg) {
      terrno = TSDB_CODE_APP_NOT_READY;
      mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
      return -1;
    }

    if (pMsg->contLen == 0 || pMsg->pCont == NULL) {
      terrno = TSDB_CODE_INVALID_MSG_LEN;
      mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
      return -1;
    }
  }

  MndMsgFp handler = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
  if (handler == NULL) {
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    mError("msg:%p, failed to process since no msg handle, app:%p", pMsg, ahandle);
    return -1;
  }

  int32_t code = (*handler)(pMsg);

  if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
    terrno = code;
    mTrace("msg:%p, in progress, app:%p", pMsg, ahandle);
    return code;
  }

  if (code == 0) {
    mTrace("msg:%p, is processed, app:%p", pMsg, ahandle);
    return code;
  }

  // Unsupported operations are traced only; every other failure is an error.
  if (terrno == TSDB_CODE_OPS_NOT_SUPPORT) {
    mTrace("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
  } else {
    mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
  }

  return code;
}

455 456 457 458
/*
 * Register `fp` as the handler for `msgType`.
 *
 * The handler is stored at TMSG_INDEX(msgType); an index outside
 * [0, TDMT_MAX) is ignored.
 */
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  const tmsg_t slot = TMSG_INDEX(msgType);

  if (slot < 0 || slot >= TDMT_MAX) {
    return;
  }

  pMnode->msgFp[slot] = fp;
}

D
dapan1121 已提交
462
// Note: uid 0 is reserved
S
sma  
Shengliang Guan 已提交
463
/*
 * Generate a positive, non-zero 64-bit uid.
 *
 * Layout before the sign is folded away:
 *   bits 24..63  low 40 bits of the current microsecond timestamp
 *   bits  8..23  low 16 bits of MurmurHash3_32(name, len)
 *   bits  0..7   low 8 bits of taosRand()
 *
 * The fields are assembled in unsigned arithmetic because shifting the
 * 40-bit time field left by 24 reaches the sign bit, which is undefined
 * behavior on a signed type. A negative result is negated as before. The
 * values 0 and INT64_MIN are skipped and a new candidate is drawn, because
 * INT64_MIN has no positive counterpart.
 *
 * name/len: bytes hashed into the uid.
 */
int64_t mndGenerateUid(char *name, int32_t len) {
  const uint64_t hashBits = ((uint64_t)(uint32_t)MurmurHash3_32(name, len) & 0xFFFFu) << 8;

  for (;;) {
    const uint64_t timeBits = ((uint64_t)taosGetTimestampUs() & 0x000000FFFFFFFFFFull) << 24;
    const uint64_t randBits = (uint64_t)taosRand() & 0xFFu;
    const int64_t  uuid = (int64_t)(timeBits + hashBits + randBits);

    if (uuid == 0 || uuid == INT64_MIN) {
      continue;
    }

    return (uuid < 0) ? -uuid : uuid;
  }
}
S
Shengliang Guan 已提交
475 476 477

/*
 * Fill the monitoring structures with a snapshot of the cluster as stored
 * in the sdb: dnodes, mnodes, vgroups and grant information.
 *
 * pClusterInfo: receives version, connection count, the dnode and mnode
 *               lists, and the vgroup/vnode counters.
 * pVgroupInfo:  receives one descriptor per vgroup.
 * pGrantInfo:   receives expiry and timeseries figures; timeseries_used is
 *               accumulated onto its incoming value.
 *
 * Returns 0 on success, -1 when this mnode is not master or one of the
 * result arrays cannot be allocated. The arrays are allocated here and left
 * in the output structures.
 */
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
                          SMonGrantInfo *pGrantInfo) {
  if (!mndIsMaster(pMnode)) return -1;

  SSdb   *pSdb = pMnode->pSdb;
  int64_t ms = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL) {
    return -1;
  }

  // cluster info
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);

  // dnode list
  void *pIter = NULL;
  while (1) {
    SDnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonDnodeDesc desc = {0};
    desc.dnode_id = pObj->id;
    tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
    if (mndIsDnodeOnline(pMnode, pObj, ms)) {
      tstrncpy(desc.status, "ready", sizeof(desc.status));
    } else {
      tstrncpy(desc.status, "offline", sizeof(desc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &desc);
    sdbRelease(pSdb, pObj);
  }

  // mnode list; the leader also provides first_ep and master_uptime
  pIter = NULL;
  while (1) {
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonMnodeDesc desc = {0};
    desc.mnode_id = pObj->id;
    tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));
    tstrncpy(desc.role, syncStr(pObj->role), sizeof(desc.role));
    taosArrayPush(pClusterInfo->mnodes, &desc);

    if (pObj->role == TAOS_SYNC_STATE_LEADER) {
      pClusterInfo->first_ep_dnode_id = pObj->id;
      tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
      // Uptime is expressed in days (86400000 ms per day).
      pClusterInfo->master_uptime = (ms - pObj->roleTime) / (86400000.0f);
    }

    // Release only after the last read of pObj; it was previously released before the leader check.
    sdbRelease(pSdb, pObj);
  }

  // vgroup info
  pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pClusterInfo->vgroups_total++;

    SMonVgroupDesc desc = {0};
    desc.vgroup_id = pVgroup->vgId;

    SName name = {0};
    tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name, desc.database_name);

    desc.tables_num = pVgroup->numOfTables;
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
    // A vgroup is reported "unsynced" unless one of its vnodes is the leader.
    tstrncpy(desc.status, "unsynced", sizeof(desc.status));
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[i];
      SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
      pVnDesc->dnode_id = pVgid->dnodeId;
      tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->role), sizeof(pVnDesc->vnode_role));
      if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
        tstrncpy(desc.status, "ready", sizeof(desc.status));
        pClusterInfo->vgroups_alive++;
      }
      if (pVgid->role != TAOS_SYNC_STATE_ERROR) {
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &desc);
    sdbRelease(pSdb, pVgroup);
  }

  // grant info; an expireTimeMS of 0 is reported as INT32_MAX for both fields
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  }

  return 0;
}
S
Shengliang Guan 已提交
581 582 583 584

/*
 * Report the mnode load: copies the current sync state into pLoad.
 * Always returns 0.
 */
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  const SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  pLoad->syncState = pMgmt->state;
  return 0;
}