mndMain.c 20.1 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndAcct.h"
#include "mndAuth.h"
S
Shengliang Guan 已提交
19
#include "mndBnode.h"
S
Shengliang Guan 已提交
20
#include "mndCluster.h"
L
Liu Jicong 已提交
21
#include "mndConsumer.h"
S
Shengliang Guan 已提交
22 23 24
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
L
Liu Jicong 已提交
25
#include "mndGrant.h"
S
monitor  
Shengliang Guan 已提交
26
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
27
#include "mndMnode.h"
L
Liu Jicong 已提交
28
#include "mndOffset.h"
L
Liu Jicong 已提交
29
#include "mndPerfSchema.h"
S
Shengliang Guan 已提交
30
#include "mndProfile.h"
S
Shengliang Guan 已提交
31
#include "mndQnode.h"
L
Liu Jicong 已提交
32
#include "mndQuery.h"
S
Shengliang Guan 已提交
33
#include "mndShow.h"
S
sma  
Shengliang Guan 已提交
34
#include "mndSma.h"
S
Shengliang Guan 已提交
35
#include "mndSnode.h"
S
Shengliang Guan 已提交
36
#include "mndStb.h"
L
Liu Jicong 已提交
37
#include "mndStream.h"
L
Liu Jicong 已提交
38
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
39 40
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
41
#include "mndTopic.h"
S
Shengliang Guan 已提交
42 43 44
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
45

S
Shengliang Guan 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58
/*
 * Serialize an all-zero SMTimerReq into a buffer obtained from rpcMallocCont().
 *
 * pContLen: receives the payload length; written only on success.
 * Returns the buffer, or NULL when the size query yields <= 0 or the
 * allocation fails.
 */
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq req = {0};

  // The first call passes no buffer; its result is used as the payload size.
  int32_t len = tSerializeSMTimerMsg(NULL, 0, &req);
  if (len <= 0) {
    return NULL;
  }

  void *pBuf = rpcMallocCont(len);
  if (pBuf != NULL) {
    tSerializeSMTimerMsg(pBuf, len, &req);
    *pContLen = len;
  }

  return pBuf;
}

S
Shengliang Guan 已提交
59 60 61 62 63
/*
 * Post a TDMT_MND_TRANS_TIMER message to the mnode write queue.
 *
 * If the timer payload cannot be built (mndBuildTimerMsg returns NULL) the
 * tick is skipped: a request with a NULL/zero-length body would only be
 * rejected later by the request validity check in this file.
 */
static void mndPullupTrans(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}

S
Shengliang Guan 已提交
66 67 68
/*
 * Post a TDMT_MND_MQ_TIMER message to the mnode read queue.
 *
 * If the timer payload cannot be built (mndBuildTimerMsg returns NULL) the
 * tick is skipped: a request with a NULL/zero-length body would only be
 * rejected later by the request validity check in this file.
 */
static void mndCalMqRebalance(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
L
Liu Jicong 已提交
72

S
Shengliang Guan 已提交
73 74 75 76
/*
 * Post a TDMT_MND_TELEM_TIMER message to the mnode read queue.
 *
 * If the timer payload cannot be built (mndBuildTimerMsg returns NULL) the
 * tick is skipped: a request with a NULL/zero-length body would only be
 * rejected later by the request validity check in this file.
 */
static void mndPullupTelem(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}

S
Shengliang Guan 已提交
80
static void *mndThreadFp(void *param) {
S
Shengliang Guan 已提交
81
  SMnode *pMnode = param;
S
Shengliang Guan 已提交
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
  int64_t lastTime = 0;
  setThreadName("mnode-timer");

  while (1) {
    lastTime++;
    taosMsleep(100);
    if (pMnode->stopped) break;

    if (lastTime % (tsTransPullupInterval * 10) == 0) {
      mndPullupTrans(pMnode);
    }

    if (lastTime % (tsMqRebalanceInterval * 10) == 0) {
      mndCalMqRebalance(pMnode);
    }

    if (lastTime % (tsTelemInterval * 10) == 0) {
      mndPullupTelem(pMnode);
    }
S
Shengliang Guan 已提交
101 102
  }

S
Shengliang Guan 已提交
103
  return NULL;
L
Liu Jicong 已提交
104 105
}

106
/*
 * Start the joinable "mnode-timer" thread running mndThreadFp.
 *
 * The thread attribute object is destroyed on both the success and the
 * failure path; it is only needed while the thread is being created.
 *
 * Returns 0 on success, -1 if the thread could not be created.
 */
static int32_t mndInitTimer(SMnode *pMnode) {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode) != 0) {
    // Log before destroying the attribute so errno is reported untouched.
    mError("failed to create timer thread since %s", strerror(errno));
    taosThreadAttrDestroy(&thAttr);
    return -1;
  }

  taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("mnode-timer", "initialized");
  return 0;
}

120
/*
 * Ask the timer thread to stop and wait for it.
 *
 * Sets pMnode->stopped, then joins and clears the thread handle if
 * taosCheckPthreadValid() reports it as valid.
 */
static void mndCleanupTimer(SMnode *pMnode) {
  pMnode->stopped = true;

  if (!taosCheckPthreadValid(pMnode->thread)) {
    return;
  }

  taosThreadJoin(pMnode->thread, NULL);
  taosThreadClear(&pMnode->thread);
}

S
Shengliang Guan 已提交
128
/*
 * Store a private copy of `path` in pMnode->path and create that directory.
 *
 * Returns 0 on success. On failure returns -1 with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY (copy failed) or to the system error derived from
 * errno (taosMkDir failed). The copied path stays in pMnode->path in the
 * latter case.
 */
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  pMnode->path = strdup(path);
  if (pMnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosMkDir(pMnode->path) == 0) {
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(errno);
  return -1;
}
S
Shengliang Guan 已提交
142

143 144 145
/*
 * Create the sdb instance for this mnode, using pMnode->path as its path.
 *
 * Returns 0 on success, -1 if sdbInit() returned NULL.
 */
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt opt = {.path = pMnode->path, .pMnode = pMnode};

  pMnode->pSdb = sdbInit(&opt);
  return (pMnode->pSdb == NULL) ? -1 : 0;
}

156 157 158 159 160 161 162 163
/*
 * Load the sdb content from file unless this mnode is being deployed.
 *
 * In deploy mode nothing is read here and 0 is returned; sdbDeploy() is
 * invoked from mndStart() instead.
 */
static int32_t mndOpenSdb(SMnode *pMnode) {
  if (pMnode->deploy) {
    return 0;
  }

  return sdbReadFile(pMnode->pSdb);
}
164 165 166

/*
 * Release the sdb instance, if any, and reset pMnode->pSdb to NULL.
 */
static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb == NULL) {
    return;
  }

  sdbCleanup(pMnode->pSdb);
  pMnode->pSdb = NULL;
}

172 173 174 175 176
/*
 * Append one init/cleanup step to pMnode->pSteps.
 *
 * name:      label used when the step is logged or reported.
 * initFp:    function run by mndExecSteps (may be NULL).
 * cleanupFp: function run by mndCleanupSteps (may be NULL).
 *
 * Returns 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * array push fails.
 */
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep step = {.name = name, .initFp = initFp, .cleanupFp = cleanupFp};

  if (taosArrayPush(pMnode->pSteps, &step) != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

185
/*
 * Register every mnode sub-module as an init/cleanup step, in the order in
 * which mndExecSteps() will initialize them (cleanup runs in reverse).
 *
 * Each step carries its own label so that startup reports and log lines
 * identify the module that was initialized or failed. The second
 * "mnode-sdb" entry is the step that opens (reads) the sdb after the
 * modules before it have been initialized; it has no cleanup function.
 *
 * Returns 0 on success, -1 as soon as one registration fails.
 */
static int32_t mndInitSteps(SMnode *pMnode) {
  if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-qnode", mndInitQnode, mndCleanupQnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-snode", mndInitSnode, mndCleanupSnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-bnode", mndInitBnode, mndCleanupBnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-grant", mndInitGrant, mndCleanupGrant) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-stream", mndInitStream, mndCleanupStream) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-offset", mndInitOffset, mndCleanupOffset) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-sma", mndInitSma, mndCleanupSma) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-perfs", mndInitPerfs, mndCleanupPerfs) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-sdb", mndOpenSdb, NULL) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1;

  return 0;
}

220
/*
 * Run the cleanup functions of steps [pos .. 0] in descending order, then
 * destroy the step array and set pMnode->pSteps to NULL.
 *
 * pos: index of the first step to clean up; -1 means "start from the last
 *      registered step".
 * Does nothing if the step array has already been released.
 */
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) {
    return;
  }

  int32_t idx = pos;
  if (idx == -1) {
    idx = taosArrayGetSize(pMnode->pSteps) - 1;
  }

  while (idx >= 0) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    mDebug("%s will cleanup", pStep->name);
    if (pStep->cleanupFp != NULL) {
      (*pStep->cleanupFp)(pMnode);
    }
    idx--;
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
S
Shengliang Guan 已提交
239

240
/*
 * Run the init function of every registered step in registration order.
 *
 * On the first failure the steps from the failing index down to 0 are cleaned
 * up (which also releases the step array), terrno is restored to the value it
 * had right after the failure, and -1 is returned. On success the cluster id
 * is cached in pMnode->clusterId and 0 is returned.
 */
static int32_t mndExecSteps(SMnode *pMnode) {
  int32_t numOfSteps = taosArrayGetSize(pMnode->pSteps);

  for (int32_t i = 0; i < numOfSteps; ++i) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, i);
    if (pStep->initFp == NULL) {
      continue;
    }

    if ((*pStep->initFp)(pMnode) == 0) {
      mDebug("%s is initialized", pStep->name);
      tmsgReportStartup(pStep->name, "initialized");
      continue;
    }

    // Keep the original error: the cleanup below may overwrite terrno.
    int32_t code = terrno;
    mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
    mndCleanupSteps(pMnode, i);
    terrno = code;
    return -1;
  }

  pMnode->clusterId = mndGetClusterId(pMnode);
  return 0;
}
S
Shengliang Guan 已提交
261

S
shm  
Shengliang Guan 已提交
262
/*
 * Copy the replica layout, message callbacks, dnode id and standby flag from
 * pOption into pMnode.
 */
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  pMnode->msgCb = pOption->msgCb;
  pMnode->selfDnodeId = pOption->dnodeId;
  pMnode->syncMgmt.standby = pOption->standby;

  // Replica description: count, our own slot, and the full replica table.
  pMnode->replica = pOption->replica;
  pMnode->selfIndex = pOption->selfIndex;
  memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
}
270

271
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
S
Shengliang Guan 已提交
272 273
  mDebug("start to open mnode in %s", path);

wafwerar's avatar
wafwerar 已提交
274
  SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
275 276 277 278 279 280
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
281 282
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
S
shm  
Shengliang Guan 已提交
283
  mndSetOptions(pMnode, pOption);
S
Shengliang Guan 已提交
284

285
  pMnode->deploy = pOption->deploy;
S
Shengliang Guan 已提交
286 287
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
wafwerar's avatar
wafwerar 已提交
288
    taosMemoryFree(pMnode);
S
Shengliang Guan 已提交
289 290 291 292
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
293

S
Shengliang Guan 已提交
294
  int32_t code = mndCreateDir(pMnode, path);
295
  if (code != 0) {
S
Shengliang Guan 已提交
296 297
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
298 299 300 301 302
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

303
  code = mndInitSteps(pMnode);
304
  if (code != 0) {
S
Shengliang Guan 已提交
305 306
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
307 308 309 310 311 312 313
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
314 315
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
316 317 318 319
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
320

S
Shengliang Guan 已提交
321
  mDebug("mnode open successfully ");
S
Shengliang Guan 已提交
322 323
  return pMnode;
}
S
Shengliang Guan 已提交
324

325
/*
 * Clean up every registered step, then free the stored path and the mnode
 * object itself. A NULL pMnode is ignored.
 */
void mndClose(SMnode *pMnode) {
  if (pMnode == NULL) {
    return;
  }

  mDebug("start to close mnode");
  mndCleanupSteps(pMnode, -1);
  taosMemoryFreeClear(pMnode->path);
  taosMemoryFreeClear(pMnode);
  mDebug("mnode is closed");
}
S
Shengliang Guan 已提交
334

335
/*
 * Start sync, deploy the sdb when the mnode was opened in deploy mode, and
 * launch the timer thread.
 *
 * Returns -1 if sdbDeploy() fails; otherwise the result of mndInitTimer().
 */
int32_t mndStart(SMnode *pMnode) {
  mndSyncStart(pMnode);

  if (!pMnode->deploy) {
    return mndInitTimer(pMnode);
  }

  if (sdbDeploy(pMnode->pSdb) != 0) {
    return -1;
  }
  pMnode->restored = true;

  return mndInitTimer(pMnode);
}

344 345
/*
 * Stop sync first, then stop and join the timer thread.
 */
void mndStop(SMnode *pMnode) {
  mndSyncStop(pMnode);
  // A void function must not `return` an expression (C11 6.8.6.4), so the
  // helper is called as a plain statement.
  mndCleanupTimer(pMnode);
}

M
Minghao Li 已提交
349
/*
 * Decode one sync RPC message and hand it to the matching syncNodeOn*Cb
 * handler of this mnode's sync node.
 *
 * Returns the handler's result, or TAOS_SYNC_PROPOSE_OTHER_ERROR when the sync
 * environment is not started, the sync node cannot be acquired, or the
 * message type is not one of the handled sync types.
 *
 * NOTE(review): the node obtained from syncNodeAcquire() is not released
 * anywhere in this function - confirm whether a matching release is required.
 */
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
  SMnode    *pMnode = pMsg->info.node;
  SSyncMgmt *pSyncMgmt = &pMnode->syncMgmt;

  if (!syncEnvIsStart()) {
    mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
    return TAOS_SYNC_PROPOSE_OTHER_ERROR;
  }

  SSyncNode *pNode = syncNodeAcquire(pSyncMgmt->sync);
  if (pNode == NULL) {
    mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
    return TAOS_SYNC_PROPOSE_OTHER_ERROR;
  }

  // Log the incoming message together with a short description of the sync node.
  char  header[512];
  char *nodeDesc = sync2SimpleStr(pSyncMgmt->sync);
  snprintf(header, sizeof(header), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, nodeDesc);
  syncRpcMsgLog2(header, pMsg);
  taosMemoryFree(nodeDesc);

  int32_t code = TAOS_SYNC_PROPOSE_OTHER_ERROR;

  // Every branch follows the same shape: decode, invoke the callback, destroy.
  if (pMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
    SyncTimeout *pTimeout = syncTimeoutFromRpcMsg2(pMsg);
    code = syncNodeOnTimeoutCb(pNode, pTimeout);
    syncTimeoutDestroy(pTimeout);
  } else if (pMsg->msgType == TDMT_VND_SYNC_PING) {
    SyncPing *pPing = syncPingFromRpcMsg2(pMsg);
    code = syncNodeOnPingCb(pNode, pPing);
    syncPingDestroy(pPing);
  } else if (pMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
    SyncPingReply *pPingReply = syncPingReplyFromRpcMsg2(pMsg);
    code = syncNodeOnPingReplyCb(pNode, pPingReply);
    syncPingReplyDestroy(pPingReply);
  } else if (pMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
    SyncClientRequest *pClientReq = syncClientRequestFromRpcMsg2(pMsg);
    code = syncNodeOnClientRequestCb(pNode, pClientReq);
    syncClientRequestDestroy(pClientReq);
  } else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
    SyncRequestVote *pVote = syncRequestVoteFromRpcMsg2(pMsg);
    code = syncNodeOnRequestVoteCb(pNode, pVote);
    syncRequestVoteDestroy(pVote);
  } else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
    SyncRequestVoteReply *pVoteReply = syncRequestVoteReplyFromRpcMsg2(pMsg);
    code = syncNodeOnRequestVoteReplyCb(pNode, pVoteReply);
    syncRequestVoteReplyDestroy(pVoteReply);
  } else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
    SyncAppendEntries *pAppend = syncAppendEntriesFromRpcMsg2(pMsg);
    code = syncNodeOnAppendEntriesCb(pNode, pAppend);
    syncAppendEntriesDestroy(pAppend);
  } else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
    SyncAppendEntriesReply *pAppendReply = syncAppendEntriesReplyFromRpcMsg2(pMsg);
    code = syncNodeOnAppendEntriesReplyCb(pNode, pAppendReply);
    syncAppendEntriesReplyDestroy(pAppendReply);
  } else {
    mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
    code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
  }

  return code;
}

411 412 413
/*
 * Verify that a request may be processed by this mnode.
 *
 * Returns 0 for non-request messages and when this mnode is the master.
 * Otherwise returns -1:
 *   - internal timer messages are dropped without logging or a response;
 *   - for any other request the mnode endpoint set is serialized into
 *     pMsg->info.rsp and terrno is set to TSDB_CODE_RPC_REDIRECT, or to
 *     TSDB_CODE_OUT_OF_MEMORY if the response buffer cannot be allocated.
 */
static int32_t mndCheckMnodeMaster(SRpcMsg *pMsg) {
  if (!IsReq(pMsg) || mndIsMaster(pMsg->info.node)) {
    return 0;
  }

  bool isTimerMsg = pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
                    pMsg->msgType == TDMT_MND_TRANS_TIMER;
  if (isTimerMsg) {
    return -1;
  }

  mError("msg:%p, failed to check master since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
         TMSG_INFO(pMsg->msgType));

  SEpSet epSet = {0};
  mndGetMnodeEpSet(pMsg->info.node, &epSet);

#if 0
  mTrace("msg:%p, is redirected, num:%d use:%d", pMsg, epSet.numOfEps, epSet.inUse);
  for (int32_t i = 0; i < epSet.numOfEps; ++i) {
    mTrace("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
    if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) {
      epSet.inUse = (i + 1) % epSet.numOfEps;
    }
  }
#endif

  int32_t rspLen = tSerializeSEpSet(NULL, 0, &epSet);
  pMsg->info.rsp = rpcMallocCont(rspLen);
  if (pMsg->info.rsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  tSerializeSEpSet(pMsg->info.rsp, rspLen, &epSet);
  pMsg->info.rspLen = rspLen;
  terrno = TSDB_CODE_RPC_REDIRECT;
  return -1;
}

/*
 * Reject requests that carry no payload.
 *
 * Returns 0 for non-request messages and for requests with a non-NULL,
 * non-empty body; otherwise logs, sets terrno = TSDB_CODE_INVALID_MSG_LEN and
 * returns -1.
 */
static int32_t mndCheckRequestValid(SRpcMsg *pMsg) {
  if (IsReq(pMsg) && (pMsg->contLen == 0 || pMsg->pCont == NULL)) {
    mError("msg:%p, failed to valid request, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_INVALID_MSG_LEN;
    return -1;
  }

  return 0;
}

/*
 * Entry point for messages addressed to the mnode.
 *
 * The message is first checked against the master and payload rules, then
 * dispatched to the handler registered for its type.
 *
 * Returns -1 when a check fails or no handler is registered (terrno =
 * TSDB_CODE_MSG_NOT_PROCESSED in the latter case); otherwise the handler's
 * return code, which may be TSDB_CODE_ACTION_IN_PROGRESS.
 */
int32_t mndProcessMsg(SRpcMsg *pMsg) {
  if (mndCheckMnodeMaster(pMsg) != 0) {
    return -1;
  }
  if (mndCheckRequestValid(pMsg) != 0) {
    return -1;
  }

  SMnode  *pMnode = pMsg->info.node;
  MndMsgFp handler = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
  if (handler == NULL) {
    mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    return -1;
  }

  mTrace("msg:%p, will be processed in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));

  int32_t ret = (*handler)(pMsg);
  if (ret == TSDB_CODE_ACTION_IN_PROGRESS) {
    mTrace("msg:%p, won't response immediately since in progress", pMsg);
  } else if (ret == 0) {
    mTrace("msg:%p, successfully processed and response", pMsg);
  } else {
    mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
           TMSG_INFO(pMsg->msgType));
  }

  return ret;
}

482 483 484 485
/*
 * Register `fp` as the handler for `msgType`.
 *
 * The handler table is indexed by TMSG_INDEX(msgType); indexes outside
 * [0, TDMT_MAX) are silently ignored.
 */
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  tmsg_t slot = TMSG_INDEX(msgType);
  if (slot < 0 || slot >= TDMT_MAX) {
    return;
  }

  pMnode->msgFp[slot] = fp;
}

D
dapan1121 已提交
489
// Note: uid 0 is reserved
S
sma  
Shengliang Guan 已提交
490
/*
 * Generate a positive, non-zero 64-bit id.
 *
 * Bit layout before the sign is folded away:
 *   [63..24] low 40 bits of the current timestamp in microseconds
 *   [23.. 8] low 16 bits of the MurmurHash3 of `name`
 *   [ 7.. 0] low 8 bits of a random number
 *
 * The value is composed in unsigned arithmetic because shifting a 40-bit
 * quantity left by 24 can reach bit 63, which overflows a signed int64_t.
 * Two bit patterns are rejected and regenerated: 0 (reserved) and
 * 0x8000000000000000, whose signed value has no representable absolute value.
 *
 * name/len: bytes that are hashed into the id.
 * Returns the absolute value of the composed id, always > 0.
 */
int64_t mndGenerateUid(char *name, int32_t len) {
  int32_t hashval = MurmurHash3_32(name, len);

  do {
    int64_t  us = taosGetTimestampUs();
    uint64_t bits = ((uint64_t)us & 0x000000FFFFFFFFFFull) << 24;
    bits += ((uint64_t)hashval & 0xFFFFull) << 8;
    bits += (uint64_t)taosRand() & 0xFFull;

    if (bits != 0 && bits != ((uint64_t)1 << 63)) {
      int64_t uuid = (int64_t)bits;
      return llabs(uuid);
    }
  } while (true);
}
S
Shengliang Guan 已提交
502 503 504

/*
 * Collect monitoring data about the cluster from the sdb.
 *
 * Fills pClusterInfo (version, dnode and mnode lists, connection and
 * vgroup/vnode counters), pVgroupInfo (one descriptor per vgroup) and
 * pGrantInfo (expiry and time-series limits). The three arrays are allocated
 * here and left in the output structures.
 *
 * Returns 0 on success; -1 if this mnode is not the master or one of the
 * arrays cannot be allocated (arrays that were allocated stay assigned).
 */
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
                          SMonGrantInfo *pGrantInfo) {
  if (!mndIsMaster(pMnode)) {
    return -1;
  }

  SSdb   *pSdb = pMnode->pSdb;
  int64_t nowMs = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL) {
    return -1;
  }

  // cluster info
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);

  // dnode list: id, endpoint and online status
  void *pDnodeIter = NULL;
  for (;;) {
    SDnodeObj *pDnode = NULL;
    pDnodeIter = sdbFetch(pSdb, SDB_DNODE, pDnodeIter, (void **)&pDnode);
    if (pDnodeIter == NULL) {
      break;
    }

    SMonDnodeDesc dnodeDesc = {0};
    dnodeDesc.dnode_id = pDnode->id;
    tstrncpy(dnodeDesc.dnode_ep, pDnode->ep, sizeof(dnodeDesc.dnode_ep));
    if (!mndIsDnodeOnline(pMnode, pDnode, nowMs)) {
      tstrncpy(dnodeDesc.status, "offline", sizeof(dnodeDesc.status));
    } else {
      tstrncpy(dnodeDesc.status, "ready", sizeof(dnodeDesc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &dnodeDesc);
    sdbRelease(pSdb, pDnode);
  }

  // mnode list: the entry whose id equals our own dnode id is reported as leader
  void *pMnodeIter = NULL;
  for (;;) {
    SMnodeObj *pMnodeObj = NULL;
    pMnodeIter = sdbFetch(pSdb, SDB_MNODE, pMnodeIter, (void **)&pMnodeObj);
    if (pMnodeIter == NULL) {
      break;
    }

    SMonMnodeDesc mnodeDesc = {0};
    mnodeDesc.mnode_id = pMnodeObj->id;
    tstrncpy(mnodeDesc.mnode_ep, pMnodeObj->pDnode->ep, sizeof(mnodeDesc.mnode_ep));

    if (pMnodeObj->id != pMnode->selfDnodeId) {
      tstrncpy(mnodeDesc.role, syncStr(pMnodeObj->state), sizeof(mnodeDesc.role));
    } else {
      pClusterInfo->first_ep_dnode_id = pMnodeObj->id;
      tstrncpy(pClusterInfo->first_ep, pMnodeObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
      // 86400000 ms per day: uptime is expressed in days.
      pClusterInfo->master_uptime = (nowMs - pMnodeObj->stateStartTime) / (86400000.0f);
      tstrncpy(mnodeDesc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(mnodeDesc.role));
    }
    taosArrayPush(pClusterInfo->mnodes, &mnodeDesc);
    sdbRelease(pSdb, pMnodeObj);
  }

  // vgroup info
  void *pVgroupIter = NULL;
  for (;;) {
    SVgObj *pVgroup = NULL;
    pVgroupIter = sdbFetch(pSdb, SDB_VGROUP, pVgroupIter, (void **)&pVgroup);
    if (pVgroupIter == NULL) {
      break;
    }

    pClusterInfo->vgroups_total++;

    SMonVgroupDesc vgDesc = {0};
    vgDesc.vgroup_id = pVgroup->vgId;

    SName dbName = {0};
    tNameFromString(&dbName, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&dbName, vgDesc.database_name);

    vgDesc.tables_num = pVgroup->numOfTables;
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;

    // A vgroup counts as "ready" once any replica reports the leader role.
    tstrncpy(vgDesc.status, "unsynced", sizeof(vgDesc.status));
    for (int32_t r = 0; r < pVgroup->replica; ++r) {
      SVnodeGid     *pGid = &pVgroup->vnodeGid[r];
      SMonVnodeDesc *pVnode = &vgDesc.vnodes[r];

      pVnode->dnode_id = pGid->dnodeId;
      tstrncpy(pVnode->vnode_role, syncStr(pGid->role), sizeof(pVnode->vnode_role));
      if (pGid->role == TAOS_SYNC_STATE_LEADER) {
        tstrncpy(vgDesc.status, "ready", sizeof(vgDesc.status));
        pClusterInfo->vgroups_alive++;
      }
      if (pGid->role != TAOS_SYNC_STATE_ERROR) {
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &vgDesc);
    sdbRelease(pSdb, pVgroup);
  }

  // grant info: an expiry timestamp of 0 is reported as INT32_MAX for both fields
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  } else {
    pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - nowMs) / 86400000.0f;
    pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  }

  return 0;
}
S
Shengliang Guan 已提交
610 611

/*
 * Report the load of this mnode.
 *
 * Only the sync role, as returned by syncGetMyRole(), is filled in.
 * Always returns 0.
 */
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  SSyncMgmt *pSyncMgmt = &pMnode->syncMgmt;

  pLoad->syncState = syncGetMyRole(pSyncMgmt->sync);
  return 0;
}