mndMain.c 22.1 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndAcct.h"
#include "mndAuth.h"
S
Shengliang Guan 已提交
19
#include "mndBnode.h"
S
Shengliang Guan 已提交
20
#include "mndCluster.h"
L
Liu Jicong 已提交
21
#include "mndConsumer.h"
S
Shengliang Guan 已提交
22 23 24
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
L
Liu Jicong 已提交
25
#include "mndGrant.h"
S
monitor  
Shengliang Guan 已提交
26
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
27
#include "mndMnode.h"
L
Liu Jicong 已提交
28
#include "mndOffset.h"
L
Liu Jicong 已提交
29
#include "mndPerfSchema.h"
S
Shengliang Guan 已提交
30
#include "mndProfile.h"
S
Shengliang Guan 已提交
31
#include "mndQnode.h"
L
Liu Jicong 已提交
32
#include "mndQuery.h"
S
Shengliang Guan 已提交
33
#include "mndShow.h"
S
sma  
Shengliang Guan 已提交
34
#include "mndSma.h"
S
Shengliang Guan 已提交
35
#include "mndSnode.h"
S
Shengliang Guan 已提交
36
#include "mndStb.h"
L
Liu Jicong 已提交
37
#include "mndStream.h"
L
Liu Jicong 已提交
38
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
39 40
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
41
#include "mndTopic.h"
S
Shengliang Guan 已提交
42 43 44
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
45

S
Shengliang Guan 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq timerReq = {0};

  int32_t contLen = tSerializeSMTimerMsg(NULL, 0, &timerReq);
  if (contLen <= 0) return NULL;
  void *pReq = rpcMallocCont(contLen);
  if (pReq == NULL) return NULL;

  tSerializeSMTimerMsg(pReq, contLen, &timerReq);
  *pContLen = contLen;
  return pReq;
}

S
Shengliang Guan 已提交
59 60 61 62 63
static void mndPullupTrans(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
S
Shengliang Guan 已提交
64 65
}

S
Shengliang Guan 已提交
66 67 68
static void mndCalMqRebalance(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
69
  SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
S
Shengliang Guan 已提交
70 71
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
L
Liu Jicong 已提交
72

S
Shengliang Guan 已提交
73 74 75 76
static void mndPullupTelem(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
77
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
S
Shengliang Guan 已提交
78 79
}

S
Shengliang Guan 已提交
80
static void *mndThreadFp(void *param) {
S
Shengliang Guan 已提交
81
  SMnode *pMnode = param;
S
Shengliang Guan 已提交
82 83 84 85 86 87
  int64_t lastTime = 0;
  setThreadName("mnode-timer");

  while (1) {
    lastTime++;
    taosMsleep(100);
S
Shengliang Guan 已提交
88
    if (mndGetStop(pMnode)) break;
S
Shengliang Guan 已提交
89 90 91 92 93 94 95 96 97 98 99 100

    if (lastTime % (tsTransPullupInterval * 10) == 0) {
      mndPullupTrans(pMnode);
    }

    if (lastTime % (tsMqRebalanceInterval * 10) == 0) {
      mndCalMqRebalance(pMnode);
    }

    if (lastTime % (tsTelemInterval * 10) == 0) {
      mndPullupTelem(pMnode);
    }
S
Shengliang Guan 已提交
101 102
  }

S
Shengliang Guan 已提交
103
  return NULL;
L
Liu Jicong 已提交
104 105
}

106
static int32_t mndInitTimer(SMnode *pMnode) {
S
Shengliang Guan 已提交
107 108 109 110 111
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode) != 0) {
    mError("failed to create timer thread since %s", strerror(errno));
S
shm  
Shengliang Guan 已提交
112 113
    return -1;
  }
L
Liu Jicong 已提交
114

S
Shengliang Guan 已提交
115 116
  taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("mnode-timer", "initialized");
S
Shengliang Guan 已提交
117 118 119
  return 0;
}

120
static void mndCleanupTimer(SMnode *pMnode) {
S
Shengliang Guan 已提交
121 122
  if (taosCheckPthreadValid(pMnode->thread)) {
    taosThreadJoin(pMnode->thread, NULL);
123
    taosThreadClear(&pMnode->thread);
S
Shengliang Guan 已提交
124 125 126
  }
}

S
Shengliang Guan 已提交
127
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
128 129 130
  pMnode->path = strdup(path);
  if (pMnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
131
    return -1;
132 133 134 135
  }

  if (taosMkDir(pMnode->path) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
136
    return -1;
137
  }
138 139

  return 0;
140
}
S
Shengliang Guan 已提交
141

142 143 144
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt opt = {0};
  opt.path = pMnode->path;
S
Shengliang Guan 已提交
145
  opt.pMnode = pMnode;
S
Shengliang Guan 已提交
146

S
Shengliang Guan 已提交
147
  pMnode->pSdb = sdbInit(&opt);
148
  if (pMnode->pSdb == NULL) {
S
Shengliang Guan 已提交
149 150 151 152 153 154
    return -1;
  }

  return 0;
}

155 156 157 158 159 160 161 162
static int32_t mndOpenSdb(SMnode *pMnode) {
  if (!pMnode->deploy) {
    return sdbReadFile(pMnode->pSdb);
  } else {
    // return sdbDeploy(pMnode->pSdb);;
    return 0;
  }
}
163 164 165

static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb) {
S
Shengliang Guan 已提交
166
    sdbCleanup(pMnode->pSdb);
167 168 169 170
    pMnode->pSdb = NULL;
  }
}

171 172 173 174 175
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep step = {0};
  step.name = name;
  step.initFp = initFp;
  step.cleanupFp = cleanupFp;
S
Shengliang Guan 已提交
176
  if (taosArrayPush(pMnode->pSteps, &step) == NULL) {
177 178 179 180
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
181 182 183
  return 0;
}

184
static int32_t mndInitSteps(SMnode *pMnode) {
S
Shengliang Guan 已提交
185 186 187 188
  if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1;
S
Shengliang Guan 已提交
189 190 191
  if (mndAllocStep(pMnode, "mnode-qnode", mndInitQnode, mndCleanupQnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-qnode", mndInitSnode, mndCleanupSnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-qnode", mndInitBnode, mndCleanupBnode) != 0) return -1;
S
Shengliang Guan 已提交
192
  if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1;
S
Shengliang Guan 已提交
193
  if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
wafwerar's avatar
wafwerar 已提交
194
  if (mndAllocStep(pMnode, "mnode-grant", mndInitGrant, mndCleanupGrant) != 0) return -1;
S
Shengliang Guan 已提交
195 196
  if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
L
Liu Jicong 已提交
197
  if (mndAllocStep(pMnode, "mnode-stream", mndInitStream, mndCleanupStream) != 0) return -1;
S
Shengliang Guan 已提交
198
  if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
L
Liu Jicong 已提交
199 200
  if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
L
Liu Jicong 已提交
201
  if (mndAllocStep(pMnode, "mnode-offset", mndInitOffset, mndCleanupOffset) != 0) return -1;
S
Shengliang Guan 已提交
202
  if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
S
Shengliang Guan 已提交
203
  if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
S
sma  
Shengliang Guan 已提交
204
  if (mndAllocStep(pMnode, "mnode-stb", mndInitSma, mndCleanupSma) != 0) return -1;
D
dapan1121 已提交
205
  if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1;
D
dapan1121 已提交
206
  if (mndAllocStep(pMnode, "mnode-perfs", mndInitPerfs, mndCleanupPerfs) != 0) return -1;
S
Shengliang Guan 已提交
207
  if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
S
Shengliang Guan 已提交
208
  if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
209
  if (mndAllocStep(pMnode, "mnode-sdb", mndOpenSdb, NULL) != 0) return -1;
S
Shengliang Guan 已提交
210 211
  if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
D
dapan1121 已提交
212
  if (mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery) != 0) return -1;
S
Shengliang Guan 已提交
213 214
  if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1;
S
Shengliang Guan 已提交
215 216 217 218

  return 0;
}

219
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
S
Shengliang Guan 已提交
220 221
  if (pMnode->pSteps == NULL) return;

222
  if (pos == -1) {
223
    pos = taosArrayGetSize(pMnode->pSteps) - 1;
S
Shengliang Guan 已提交
224 225
  }

226
  for (int32_t s = pos; s >= 0; s--) {
227
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, s);
S
Shengliang Guan 已提交
228
    mDebug("%s will cleanup", pStep->name);
229 230 231
    if (pStep->cleanupFp != NULL) {
      (*pStep->cleanupFp)(pMnode);
    }
S
Shengliang Guan 已提交
232 233
  }

S
Shengliang Guan 已提交
234
  taosArrayClear(pMnode->pSteps);
S
Shengliang Guan 已提交
235
  taosArrayDestroy(pMnode->pSteps);
S
Shengliang Guan 已提交
236
  pMnode->pSteps = NULL;
237
}
S
Shengliang Guan 已提交
238

239
static int32_t mndExecSteps(SMnode *pMnode) {
S
Shengliang Guan 已提交
240
  int32_t size = taosArrayGetSize(pMnode->pSteps);
241
  for (int32_t pos = 0; pos < size; pos++) {
S
Shengliang Guan 已提交
242
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, pos);
243
    if (pStep->initFp == NULL) continue;
S
Shengliang Guan 已提交
244

S
Shengliang Guan 已提交
245
    if ((*pStep->initFp)(pMnode) != 0) {
S
Shengliang Guan 已提交
246
      int32_t code = terrno;
S
Shengliang Guan 已提交
247
      mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
248
      mndCleanupSteps(pMnode, pos);
S
Shengliang Guan 已提交
249
      terrno = code;
S
Shengliang Guan 已提交
250
      return -1;
S
Shengliang Guan 已提交
251
    } else {
S
Shengliang Guan 已提交
252
      mDebug("%s is initialized", pStep->name);
S
Shengliang Guan 已提交
253
      tmsgReportStartup(pStep->name, "initialized");
S
Shengliang Guan 已提交
254 255
    }
  }
S
Shengliang Guan 已提交
256

S
shm  
Shengliang Guan 已提交
257
  pMnode->clusterId = mndGetClusterId(pMnode);
S
Shengliang Guan 已提交
258
  return 0;
259
}
S
Shengliang Guan 已提交
260

S
shm  
Shengliang Guan 已提交
261
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
262 263 264
  pMnode->replica = pOption->replica;
  pMnode->selfIndex = pOption->selfIndex;
  memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
S
Shengliang Guan 已提交
265
  pMnode->msgCb = pOption->msgCb;
266
  pMnode->selfDnodeId = pOption->dnodeId;
S
Shengliang Guan 已提交
267
  pMnode->syncMgmt.standby = pOption->standby;
L
Liu Jicong 已提交
268
}
269

270
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
S
Shengliang Guan 已提交
271 272
  mDebug("start to open mnode in %s", path);

wafwerar's avatar
wafwerar 已提交
273
  SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
274 275 276 277 278 279
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
280 281
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
S
shm  
Shengliang Guan 已提交
282
  mndSetOptions(pMnode, pOption);
S
Shengliang Guan 已提交
283

284
  pMnode->deploy = pOption->deploy;
S
Shengliang Guan 已提交
285 286
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
wafwerar's avatar
wafwerar 已提交
287
    taosMemoryFree(pMnode);
S
Shengliang Guan 已提交
288 289 290 291
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
292

S
Shengliang Guan 已提交
293
  int32_t code = mndCreateDir(pMnode, path);
294
  if (code != 0) {
S
Shengliang Guan 已提交
295 296
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
297 298 299 300 301
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

302
  code = mndInitSteps(pMnode);
303
  if (code != 0) {
S
Shengliang Guan 已提交
304 305
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
306 307 308 309 310 311 312
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
313 314
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
315 316 317 318
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
319

S
Shengliang Guan 已提交
320
  mDebug("mnode open successfully ");
S
Shengliang Guan 已提交
321 322
  return pMnode;
}
S
Shengliang Guan 已提交
323

324
void mndClose(SMnode *pMnode) {
S
Shengliang Guan 已提交
325 326 327
  if (pMnode != NULL) {
    mDebug("start to close mnode");
    mndCleanupSteps(pMnode, -1);
wafwerar's avatar
wafwerar 已提交
328 329
    taosMemoryFreeClear(pMnode->path);
    taosMemoryFreeClear(pMnode);
S
Shengliang Guan 已提交
330 331
    mDebug("mnode is closed");
  }
332
}
S
Shengliang Guan 已提交
333

334
int32_t mndStart(SMnode *pMnode) {
335
  mndSyncStart(pMnode);
S
Shengliang Guan 已提交
336
  if (pMnode->deploy) {
S
Shengliang Guan 已提交
337 338 339 340 341
    if (sdbDeploy(pMnode->pSdb) != 0) {
      mError("failed to deploy sdb while start mnode");
      return -1;
    }
    mndSetRestore(pMnode, true);
342
  }
M
Minghao Li 已提交
343 344 345
  return mndInitTimer(pMnode);
}

346
void mndStop(SMnode *pMnode) {
S
Shengliang Guan 已提交
347
  mndSetStop(pMnode);
348
  mndSyncStop(pMnode);
S
Shengliang Guan 已提交
349
  mndCleanupTimer(pMnode);
M
Minghao Li 已提交
350 351
}

M
Minghao Li 已提交
352
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
353 354 355
  SMnode    *pMnode = pMsg->info.node;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  int32_t    code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
M
Minghao Li 已提交
356

357 358 359 360
  if (!syncEnvIsStart()) {
    mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
    return TAOS_SYNC_PROPOSE_OTHER_ERROR;
  }
M
Minghao Li 已提交
361

362 363 364 365 366
  SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
  if (pSyncNode == NULL) {
    mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
    return TAOS_SYNC_PROPOSE_OTHER_ERROR;
  }
M
Minghao Li 已提交
367

S
Shengliang Guan 已提交
368 369 370 371 372
  if (mndAcquireSyncRef(pMnode) != 0) {
    mError("failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
    return TAOS_SYNC_PROPOSE_OTHER_ERROR;
  }

373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410
  char  logBuf[512];
  char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
  snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
  syncRpcMsgLog2(logBuf, pMsg);
  taosMemoryFree(syncNodeStr);

  if (pMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
    SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
    code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
    syncTimeoutDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_VND_SYNC_PING) {
    SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
    code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
    syncPingDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
    SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
    code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
    syncPingReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
    SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
    code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
    syncClientRequestDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
    SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
    code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
    syncRequestVoteDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
    SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
    code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
    syncRequestVoteReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
    SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
    code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
    syncAppendEntriesDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
    SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
    code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
    syncAppendEntriesReplyDestroy(pSyncMsg);
M
Minghao Li 已提交
411
  } else {
412 413
    mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
    code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
M
Minghao Li 已提交
414 415
  }

S
Shengliang Guan 已提交
416
  mndReleaseSyncRef(pMnode);
417
  return code;
M
Minghao Li 已提交
418 419
}

S
Shengliang Guan 已提交
420 421
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
  if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
S
Shengliang Guan 已提交
422

S
Shengliang Guan 已提交
423 424 425 426
  if (IsReq(pMsg) && pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER &&
      pMsg->msgType != TDMT_MND_TRANS_TIMER) {
    mError("msg:%p, failed to check mnode state since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
           TMSG_INFO(pMsg->msgType));
427

S
Shengliang Guan 已提交
428 429
    SEpSet epSet = {0};
    mndGetMnodeEpSet(pMsg->info.node, &epSet);
430

S
Shengliang Guan 已提交
431 432 433 434 435 436 437 438
    int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
    pMsg->info.rsp = rpcMallocCont(contLen);
    if (pMsg->info.rsp != NULL) {
      tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
      pMsg->info.rspLen = contLen;
      terrno = TSDB_CODE_RPC_REDIRECT;
    } else {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
439
    }
S
Shengliang Guan 已提交
440
  }
441 442 443 444

  return -1;
}

S
Shengliang Guan 已提交
445
static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
446 447 448
  if (!IsReq(pMsg)) return 0;
  if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0;

S
Shengliang Guan 已提交
449
  mError("msg:%p, failed to check msg content, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
450 451 452 453
  terrno = TSDB_CODE_INVALID_MSG_LEN;
  return -1;
}

S
Shengliang Guan 已提交
454
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
455
  SMnode  *pMnode = pMsg->info.node;
S
Shengliang Guan 已提交
456
  MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
S
Shengliang Guan 已提交
457
  if (fp == NULL) {
458
    mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
S
shm  
Shengliang Guan 已提交
459 460
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    return -1;
S
Shengliang Guan 已提交
461 462
  }

S
Shengliang Guan 已提交
463 464 465 466
  if (mndCheckMsgContent(pMsg) != 0) return -1;
  if (mndCheckMnodeState(pMsg) != 0) return -1;

  mTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
S
shm  
Shengliang Guan 已提交
467
  int32_t code = (*fp)(pMsg);
S
Shengliang Guan 已提交
468 469
  mndReleaseRpcRef(pMnode);

S
Shengliang Guan 已提交
470
  if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
471 472 473
    mTrace("msg:%p, won't response immediately since in progress", pMsg);
  } else if (code == 0) {
    mTrace("msg:%p, successfully processed and response", pMsg);
474
  } else {
475 476
    mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
           TMSG_INFO(pMsg->msgType));
S
Shengliang Guan 已提交
477
  }
S
Shengliang Guan 已提交
478

S
shm  
Shengliang Guan 已提交
479
  return code;
S
Shengliang Guan 已提交
480 481
}

482 483 484 485
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  tmsg_t type = TMSG_INDEX(msgType);
  if (type >= 0 && type < TDMT_MAX) {
    pMnode->msgFp[type] = fp;
S
Shengliang Guan 已提交
486 487 488
  }
}

D
dapan1121 已提交
489
// Note: uid 0 is reserved
S
sma  
Shengliang Guan 已提交
490
int64_t mndGenerateUid(char *name, int32_t len) {
S
monitor  
Shengliang Guan 已提交
491
  int32_t hashval = MurmurHash3_32(name, len);
D
dapan1121 已提交
492 493

  do {
L
Liu Jicong 已提交
494
    int64_t us = taosGetTimestampUs();
S
sma  
Shengliang Guan 已提交
495 496
    int64_t x = (us & 0x000000FFFFFFFFFF) << 24;
    int64_t uuid = x + ((hashval & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
D
dapan1121 已提交
497
    if (uuid) {
L
Liu Jicong 已提交
498
      return llabs(uuid);
D
dapan1121 已提交
499 500
    }
  } while (true);
L
Liu Jicong 已提交
501
}
S
Shengliang Guan 已提交
502 503 504

int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
                          SMonGrantInfo *pGrantInfo) {
S
Shengliang Guan 已提交
505
  if (mndAcquireRpcRef(pMnode) != 0) return -1;
S
Shengliang Guan 已提交
506

S
Shengliang Guan 已提交
507 508 509 510 511 512 513
  SSdb   *pSdb = pMnode->pSdb;
  int64_t ms = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL) {
S
Shengliang Guan 已提交
514
    mndReleaseRpcRef(pMnode);
S
Shengliang Guan 已提交
515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550
    return -1;
  }

  // cluster info
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);

  void *pIter = NULL;
  while (1) {
    SDnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonDnodeDesc desc = {0};
    desc.dnode_id = pObj->id;
    tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
    if (mndIsDnodeOnline(pMnode, pObj, ms)) {
      tstrncpy(desc.status, "ready", sizeof(desc.status));
    } else {
      tstrncpy(desc.status, "offline", sizeof(desc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &desc);
    sdbRelease(pSdb, pObj);
  }

  pIter = NULL;
  while (1) {
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonMnodeDesc desc = {0};
    desc.mnode_id = pObj->id;
    tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));

551
    if (pObj->id == pMnode->selfDnodeId) {
S
Shengliang Guan 已提交
552 553
      pClusterInfo->first_ep_dnode_id = pObj->id;
      tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
554 555 556 557
      pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
      tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
    } else {
      tstrncpy(desc.role, syncStr(pObj->state), sizeof(desc.role));
S
Shengliang Guan 已提交
558
    }
559 560
    taosArrayPush(pClusterInfo->mnodes, &desc);
    sdbRelease(pSdb, pObj);
S
Shengliang Guan 已提交
561 562 563 564 565 566 567 568 569 570 571 572 573
  }

  // vgroup info
  pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pClusterInfo->vgroups_total++;

    SMonVgroupDesc desc = {0};
    desc.vgroup_id = pVgroup->vgId;
574 575 576 577 578

    SName name = {0};
    tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name, desc.database_name);

S
Shengliang Guan 已提交
579 580 581 582 583 584 585
    desc.tables_num = pVgroup->numOfTables;
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
    tstrncpy(desc.status, "unsynced", sizeof(desc.status));
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[i];
      SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
      pVnDesc->dnode_id = pVgid->dnodeId;
S
Shengliang Guan 已提交
586
      tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->role), sizeof(pVnDesc->vnode_role));
S
Shengliang Guan 已提交
587 588 589 590
      if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
        tstrncpy(desc.status, "ready", sizeof(desc.status));
        pClusterInfo->vgroups_alive++;
      }
S
Shengliang Guan 已提交
591
      if (pVgid->role != TAOS_SYNC_STATE_ERROR) {
S
Shengliang Guan 已提交
592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &desc);
    sdbRelease(pSdb, pVgroup);
  }

  // grant info
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  }

S
Shengliang Guan 已提交
609
  mndReleaseRpcRef(pMnode);
S
Shengliang Guan 已提交
610
  return 0;
L
Liu Jicong 已提交
611
}
S
Shengliang Guan 已提交
612 613

int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
614
  pLoad->syncState = syncGetMyRole(pMnode->syncMgmt.sync);
S
Shengliang Guan 已提交
615
  return 0;
L
fix  
Liu Jicong 已提交
616
}
S
Shengliang Guan 已提交
617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689

int32_t mndAcquireRpcRef(SMnode *pMnode) {
  int32_t code = 0;
  taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    terrno = TSDB_CODE_APP_NOT_READY;
    code = -1;
  } else if (!mndIsMaster(pMnode)) {
    code = -1;
  } else {
    int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1);
    mTrace("mnode rpc is acquired, ref:%d", ref);
  }
  taosThreadRwlockUnlock(&pMnode->lock);
  return code;
}

void mndReleaseRpcRef(SMnode *pMnode) {
  taosThreadRwlockRdlock(&pMnode->lock);
  int32_t ref = atomic_sub_fetch_32(&pMnode->rpcRef, 1);
  mTrace("mnode rpc is released, ref:%d", ref);
  taosThreadRwlockUnlock(&pMnode->lock);
}

void mndSetRestore(SMnode *pMnode, bool restored) {
  if (restored) {
    taosThreadRwlockWrlock(&pMnode->lock);
    pMnode->restored = true;
    taosThreadRwlockUnlock(&pMnode->lock);
    mTrace("mnode set restored:%d", restored);
  } else {
    taosThreadRwlockWrlock(&pMnode->lock);
    pMnode->restored = false;
    taosThreadRwlockUnlock(&pMnode->lock);
    mTrace("mnode set restored:%d", restored);
    while (1) {
      if (pMnode->rpcRef <= 0) break;
      taosMsleep(3);
    }
  }
}

bool mndGetRestored(SMnode *pMnode) { return pMnode->restored; }

void mndSetStop(SMnode *pMnode) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->stopped = true;
  taosThreadRwlockUnlock(&pMnode->lock);
  mTrace("mnode set stopped");
}

bool mndGetStop(SMnode *pMnode) { return pMnode->stopped; }

int32_t mndAcquireSyncRef(SMnode *pMnode) {
  int32_t code = 0;
  taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    terrno = TSDB_CODE_APP_NOT_READY;
    code = -1;
  } else {
    int32_t ref = atomic_add_fetch_32(&pMnode->syncRef, 1);
    mTrace("mnode sync is acquired, ref:%d", ref);
  }
  taosThreadRwlockUnlock(&pMnode->lock);
  return code;
}

void mndReleaseSyncRef(SMnode *pMnode) {
  taosThreadRwlockRdlock(&pMnode->lock);
  int32_t ref = atomic_sub_fetch_32(&pMnode->syncRef, 1);
  mTrace("mnode sync is released, ref:%d", ref);
  taosThreadRwlockUnlock(&pMnode->lock);
}