mndMain.c 24.7 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndAcct.h"
#include "mndAuth.h"
S
Shengliang Guan 已提交
19
#include "mndBnode.h"
S
Shengliang Guan 已提交
20
#include "mndCluster.h"
L
Liu Jicong 已提交
21
#include "mndConsumer.h"
S
Shengliang Guan 已提交
22 23 24
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
L
Liu Jicong 已提交
25
#include "mndGrant.h"
S
monitor  
Shengliang Guan 已提交
26
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
27
#include "mndMnode.h"
L
Liu Jicong 已提交
28
#include "mndOffset.h"
L
Liu Jicong 已提交
29
#include "mndPerfSchema.h"
S
Shengliang Guan 已提交
30
#include "mndProfile.h"
S
Shengliang Guan 已提交
31
#include "mndQnode.h"
L
Liu Jicong 已提交
32
#include "mndQuery.h"
S
Shengliang Guan 已提交
33
#include "mndShow.h"
S
sma  
Shengliang Guan 已提交
34
#include "mndSma.h"
S
Shengliang Guan 已提交
35
#include "mndSnode.h"
S
Shengliang Guan 已提交
36
#include "mndStb.h"
L
Liu Jicong 已提交
37
#include "mndStream.h"
L
Liu Jicong 已提交
38
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
39 40
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
41
#include "mndTopic.h"
S
Shengliang Guan 已提交
42 43 44
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
45

S
Shengliang Guan 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58
/*
 * Serialize an empty SMTimerReq into a buffer obtained from rpcMallocCont().
 * On success the buffer is returned and its length is written to *pContLen.
 * Returns NULL (leaving *pContLen untouched) when the serialized size is not
 * positive or the allocation fails.
 */
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq req = {0};

  int32_t len = tSerializeSMTimerMsg(NULL, 0, &req);
  void   *pBuf = (len > 0) ? rpcMallocCont(len) : NULL;
  if (pBuf == NULL) return NULL;

  tSerializeSMTimerMsg(pBuf, len, &req);
  *pContLen = len;
  return pBuf;
}

S
Shengliang Guan 已提交
59 60 61 62 63
/*
 * Put a TDMT_MND_TRANS_TIMER message on the mnode WRITE_QUEUE.
 * If the timer payload cannot be built, nothing is enqueued. Previously a
 * message with a NULL body was queued, which mndCheckMsgContent() then
 * rejected as TSDB_CODE_INVALID_MSG_LEN.
 */
static void mndPullupTrans(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    mError("failed to build trans timer msg");
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}

S
Shengliang Guan 已提交
66 67 68
/*
 * Put a TDMT_MND_MQ_TIMER message on the mnode READ_QUEUE.
 * If the timer payload cannot be built, nothing is enqueued instead of
 * queueing a message with a NULL body.
 */
static void mndCalMqRebalance(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    mError("failed to build mq timer msg");
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
L
Liu Jicong 已提交
72

S
Shengliang Guan 已提交
73 74 75 76
/*
 * Put a TDMT_MND_TELEM_TIMER message on the mnode READ_QUEUE.
 * If the timer payload cannot be built, nothing is enqueued instead of
 * queueing a message with a NULL body.
 */
static void mndPullupTelem(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    mError("failed to build telem timer msg");
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}

S
Shengliang Guan 已提交
80
static void *mndThreadFp(void *param) {
S
Shengliang Guan 已提交
81
  SMnode *pMnode = param;
S
Shengliang Guan 已提交
82 83 84 85 86 87
  int64_t lastTime = 0;
  setThreadName("mnode-timer");

  while (1) {
    lastTime++;
    taosMsleep(100);
S
Shengliang Guan 已提交
88
    if (mndGetStop(pMnode)) break;
S
Shengliang Guan 已提交
89 90 91 92 93 94 95 96 97 98 99 100

    if (lastTime % (tsTransPullupInterval * 10) == 0) {
      mndPullupTrans(pMnode);
    }

    if (lastTime % (tsMqRebalanceInterval * 10) == 0) {
      mndCalMqRebalance(pMnode);
    }

    if (lastTime % (tsTelemInterval * 10) == 0) {
      mndPullupTelem(pMnode);
    }
S
Shengliang Guan 已提交
101 102
  }

S
Shengliang Guan 已提交
103
  return NULL;
L
Liu Jicong 已提交
104 105
}

106
/*
 * Start the joinable "mnode-timer" thread running mndThreadFp().
 *
 * The thread attribute object is destroyed on both the success and the
 * failure path (the original leaked it on failure). On failure terrno is set
 * from the saved errno and -1 is returned.
 */
static int32_t mndInitTimer(SMnode *pMnode) {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  int32_t ret = taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode);
  int32_t savedErrno = errno;  // capture before any further call can clobber it
  taosThreadAttrDestroy(&thAttr);

  if (ret != 0) {
    terrno = TAOS_SYSTEM_ERROR(savedErrno);
    mError("failed to create timer thread since %s", strerror(savedErrno));
    return -1;
  }

  tmsgReportStartup("mnode-timer", "initialized");
  return 0;
}

120
/* Join the timer thread if one was started, then reset its handle. */
static void mndCleanupTimer(SMnode *pMnode) {
  if (!taosCheckPthreadValid(pMnode->thread)) return;

  taosThreadJoin(pMnode->thread, NULL);
  taosThreadClear(&pMnode->thread);
}

S
Shengliang Guan 已提交
127
/*
 * Store a heap copy of `path` in pMnode->path and create that directory.
 * Returns 0 on success, or -1 with terrno set. If the mkdir fails,
 * pMnode->path stays allocated; mndClose() frees it.
 */
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  char *dirPath = strdup(path);
  pMnode->path = dirPath;
  if (dirPath == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosMkDir(dirPath) == 0) return 0;

  terrno = TAOS_SYSTEM_ERROR(errno);
  return -1;
}
S
Shengliang Guan 已提交
141

142 143 144
/* Create the sdb instance rooted at pMnode->path. Returns 0 on success, -1 if sdbInit() fails. */
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt opt = {.path = pMnode->path, .pMnode = pMnode};

  pMnode->pSdb = sdbInit(&opt);
  return (pMnode->pSdb != NULL) ? 0 : -1;
}

155 156 157 158 159 160 161 162
/*
 * Load the persisted sdb from disk. A freshly deployed mnode has nothing to
 * read here; its sdb is deployed later, in mndStart().
 */
static int32_t mndOpenSdb(SMnode *pMnode) {
  if (pMnode->deploy) return 0;
  return sdbReadFile(pMnode->pSdb);
}
163 164 165

/* Release the sdb instance, if any, and clear the pointer so a second call is a no-op. */
static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb == NULL) return;

  sdbCleanup(pMnode->pSdb);
  pMnode->pSdb = NULL;
}

171 172 173 174 175
/*
 * Append an init/cleanup step to pMnode->pSteps.
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if the push fails.
 */
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep step = {.name = name, .initFp = initFp, .cleanupFp = cleanupFp};

  if (taosArrayPush(pMnode->pSteps, &step) != NULL) return 0;

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

184
/*
 * Register, in execution order, every init/cleanup step of the mnode.
 * mndExecSteps() runs them front to back; mndCleanupSteps() unwinds them back to front.
 *
 * Each step name appears in debug logs and startup reports, so it must
 * identify its subsystem. The snode, bnode and sma steps were previously
 * registered under copy-pasted names ("mnode-qnode" / "mnode-stb").
 *
 * Returns 0 on success, -1 (terrno set by mndAllocStep) on failure.
 */
static int32_t mndInitSteps(SMnode *pMnode) {
  if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-qnode", mndInitQnode, mndCleanupQnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-snode", mndInitSnode, mndCleanupSnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-bnode", mndInitBnode, mndCleanupBnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-grant", mndInitGrant, mndCleanupGrant) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-stream", mndInitStream, mndCleanupStream) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-offset", mndInitOffset, mndCleanupOffset) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-sma", mndInitSma, mndCleanupSma) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-perfs", mndInitPerfs, mndCleanupPerfs) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
  // Reads the sdb file only after every table type above has been registered; no cleanup of its own.
  if (mndAllocStep(pMnode, "mnode-sdb", mndOpenSdb, NULL) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1;

  return 0;
}

219
/*
 * Unwind registered steps in reverse order, calling each non-NULL cleanupFp.
 * Unwinding starts at index `pos`, or at the last step when pos == -1.
 * Afterwards the step array is freed and pMnode->pSteps is reset to NULL,
 * so later calls return immediately.
 */
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) return;

  int32_t first = (pos == -1) ? (int32_t)taosArrayGetSize(pMnode->pSteps) - 1 : pos;
  for (int32_t idx = first; idx >= 0; --idx) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    mDebug("%s will cleanup", pStep->name);
    if (pStep->cleanupFp == NULL) continue;
    (*pStep->cleanupFp)(pMnode);
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
S
Shengliang Guan 已提交
238

239
/*
 * Run every registered step's initFp in order, skipping steps without one.
 * If a step fails, the steps up to and including it are unwound via
 * mndCleanupSteps(), terrno is restored to the failure's code, and -1 is
 * returned. On success the cluster id is cached in pMnode->clusterId.
 */
static int32_t mndExecSteps(SMnode *pMnode) {
  int32_t numOfSteps = taosArrayGetSize(pMnode->pSteps);

  for (int32_t i = 0; i < numOfSteps; i++) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, i);
    if (pStep->initFp == NULL) continue;

    if ((*pStep->initFp)(pMnode) == 0) {
      mDebug("%s is initialized", pStep->name);
      tmsgReportStartup(pStep->name, "initialized");
      continue;
    }

    // The cleanup below may overwrite terrno, so keep the init failure's code.
    int32_t failCode = terrno;
    mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
    mndCleanupSteps(pMnode, i);
    terrno = failCode;
    return -1;
  }

  pMnode->clusterId = mndGetClusterId(pMnode);
  return 0;
}
S
Shengliang Guan 已提交
260

S
shm  
Shengliang Guan 已提交
261
/* Copy the caller-supplied open options into the mnode object. */
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  pMnode->msgCb = pOption->msgCb;
  pMnode->selfDnodeId = pOption->dnodeId;
  pMnode->syncMgmt.standby = pOption->standby;

  pMnode->replica = pOption->replica;
  pMnode->selfIndex = pOption->selfIndex;
  memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
}
269

270
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
S
Shengliang Guan 已提交
271 272
  mDebug("start to open mnode in %s", path);

wafwerar's avatar
wafwerar 已提交
273
  SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
274 275 276 277 278 279
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
280 281
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
S
shm  
Shengliang Guan 已提交
282
  mndSetOptions(pMnode, pOption);
S
Shengliang Guan 已提交
283

284
  pMnode->deploy = pOption->deploy;
S
Shengliang Guan 已提交
285 286
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
wafwerar's avatar
wafwerar 已提交
287
    taosMemoryFree(pMnode);
S
Shengliang Guan 已提交
288 289 290 291
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
292

S
Shengliang Guan 已提交
293
  int32_t code = mndCreateDir(pMnode, path);
294
  if (code != 0) {
S
Shengliang Guan 已提交
295 296
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
297 298 299 300 301
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

302
  code = mndInitSteps(pMnode);
303
  if (code != 0) {
S
Shengliang Guan 已提交
304 305
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
306 307 308 309 310 311 312
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
313 314
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
315 316 317 318
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
319

S
Shengliang Guan 已提交
320
  mDebug("mnode open successfully ");
S
Shengliang Guan 已提交
321 322
  return pMnode;
}
S
Shengliang Guan 已提交
323

324
/* Unwind all steps, free the data path and the mnode itself. A NULL mnode is ignored. */
void mndClose(SMnode *pMnode) {
  if (pMnode == NULL) return;

  mDebug("start to close mnode");
  mndCleanupSteps(pMnode, -1);
  taosMemoryFreeClear(pMnode->path);
  taosMemoryFreeClear(pMnode);
  mDebug("mnode is closed");
}
S
Shengliang Guan 已提交
333

334
/*
 * Start sync. For a freshly deployed mnode, also deploy the sdb and mark the
 * mnode restored. Finally start the timer thread.
 * Returns -1 if sdb deployment fails, otherwise mndInitTimer()'s result.
 */
int32_t mndStart(SMnode *pMnode) {
  mndSyncStart(pMnode);

  if (!pMnode->deploy) {
    return mndInitTimer(pMnode);
  }

  if (sdbDeploy(pMnode->pSdb) != 0) {
    mError("failed to deploy sdb while start mnode");
    return -1;
  }
  mndSetRestore(pMnode, true);

  return mndInitTimer(pMnode);
}

346
/*
 * Stop the mnode: raise the stop flag (which also ends the timer loop),
 * stop sync, then join the timer thread.
 */
void mndStop(SMnode *pMnode) {
  mndSetStop(pMnode);     // new rpc/sync refs are refused from here on
  mndSyncStop(pMnode);
  mndCleanupTimer(pMnode);  // timer loop observes mndGetStop() and exits
}

M
Minghao Li 已提交
352
/*
 * Dispatch one sync-layer message to the matching syncNodeOn*Cb handler.
 *
 * The SSyncNode obtained from syncNodeAcquire() is released on every exit
 * path. The original never released it, leaking a reference per message.
 * Request-vote and append-entries messages use the *SnapshotCb handlers when
 * syncNodeSnapshotEnable() is true. Snapshot send/rsp messages are accepted
 * only in that mode; otherwise they fall through to the "invalid type" error,
 * as before.
 *
 * Returns the handler's code, or TAOS_SYNC_PROPOSE_OTHER_ERROR when the sync
 * env/node is unavailable, the mnode is stopping, or the type is unknown.
 */
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
  SMnode    *pMnode = pMsg->info.node;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  int32_t    code = TAOS_SYNC_PROPOSE_OTHER_ERROR;

  if (!syncEnvIsStart()) {
    mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
    return TAOS_SYNC_PROPOSE_OTHER_ERROR;
  }

  SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
  if (pSyncNode == NULL) {
    mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
    return TAOS_SYNC_PROPOSE_OTHER_ERROR;
  }

  if (mndAcquireSyncRef(pMnode) != 0) {
    mError("failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
    syncNodeRelease(pSyncNode);
    return TAOS_SYNC_PROPOSE_OTHER_ERROR;
  }

  char  logBuf[512] = {0};
  char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
  // Passing NULL to %s is undefined behavior, so substitute a placeholder.
  snprintf(logBuf, sizeof(logBuf), "==mndProcessSyncMsg== msgType:%d, syncNode: %s", pMsg->msgType,
           syncNodeStr != NULL ? syncNodeStr : "null");
  syncRpcMsgLog2(logBuf, pMsg);
  taosMemoryFree(syncNodeStr);

  bool snapshotEnabled = syncNodeSnapshotEnable(pSyncNode);

  if (pMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
    SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
    code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
    syncTimeoutDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_VND_SYNC_PING) {
    SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
    code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
    syncPingDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
    SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
    code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
    syncPingReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
    SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
    code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
    syncClientRequestDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
    SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
    code = snapshotEnabled ? syncNodeOnRequestVoteSnapshotCb(pSyncNode, pSyncMsg)
                           : syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
    syncRequestVoteDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
    SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
    code = snapshotEnabled ? syncNodeOnRequestVoteReplySnapshotCb(pSyncNode, pSyncMsg)
                           : syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
    syncRequestVoteReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
    SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
    code = snapshotEnabled ? syncNodeOnAppendEntriesSnapshotCb(pSyncNode, pSyncMsg)
                           : syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
    syncAppendEntriesDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
    SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
    code = snapshotEnabled ? syncNodeOnAppendEntriesReplySnapshotCb(pSyncNode, pSyncMsg)
                           : syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
    syncAppendEntriesReplyDestroy(pSyncMsg);
  } else if (snapshotEnabled && pMsg->msgType == TDMT_VND_SYNC_SNAPSHOT_SEND) {
    SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
    code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg);
    syncSnapshotSendDestroy(pSyncMsg);
  } else if (snapshotEnabled && pMsg->msgType == TDMT_VND_SYNC_SNAPSHOT_RSP) {
    SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
    code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg);
    syncSnapshotRspDestroy(pSyncMsg);
  } else {
    mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
    code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
  }

  mndReleaseSyncRef(pMnode);
  syncNodeRelease(pSyncNode);
  return code;
}

S
Shengliang Guan 已提交
472 473
/*
 * Succeed (and hold an rpc ref) when the mnode can serve rpc.
 *
 * Otherwise return -1. For client requests other than the internal timer
 * messages, this also builds a redirect response carrying the mnode ep set
 * (terrno = TSDB_CODE_RPC_REDIRECT), or sets TSDB_CODE_OUT_OF_MEMORY if the
 * response buffer cannot be allocated.
 */
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
  if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;

  bool isTimerMsg = pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
                    pMsg->msgType == TDMT_MND_TRANS_TIMER;
  if (!IsReq(pMsg) || isTimerMsg) return -1;

  mError("msg:%p, failed to check mnode state since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
         TMSG_INFO(pMsg->msgType));

  SEpSet epSet = {0};
  mndGetMnodeEpSet(pMsg->info.node, &epSet);

  int32_t rspLen = tSerializeSEpSet(NULL, 0, &epSet);
  pMsg->info.rsp = rpcMallocCont(rspLen);
  if (pMsg->info.rsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  tSerializeSEpSet(pMsg->info.rsp, rspLen, &epSet);
  pMsg->info.rspLen = rspLen;
  terrno = TSDB_CODE_RPC_REDIRECT;
  return -1;
}

S
Shengliang Guan 已提交
497
/*
 * Reject requests that arrive without a body.
 * Responses always pass. Returns -1 with terrno = TSDB_CODE_INVALID_MSG_LEN
 * for a request whose contLen is 0 or whose pCont is NULL.
 */
static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
  bool hasBody = pMsg->contLen != 0 && pMsg->pCont != NULL;
  if (!IsReq(pMsg) || hasBody) return 0;

  mError("msg:%p, failed to check msg content, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  terrno = TSDB_CODE_INVALID_MSG_LEN;
  return -1;
}

S
Shengliang Guan 已提交
506
/*
 * Route an rpc message to the handler registered with mndSetMsgHandle().
 *
 * The handler index is bounds-checked with the same range test that
 * mndSetMsgHandle() uses. msgType comes off the wire, and an out-of-range
 * value previously indexed past pMnode->msgFp.
 *
 * Returns -1 with terrno set when there is no handler, the body is missing,
 * or the mnode cannot serve rpc. Otherwise returns the handler's code;
 * TSDB_CODE_ACTION_IN_PROGRESS means the response will be sent later.
 */
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
  SMnode  *pMnode = pMsg->info.node;
  tmsg_t   idx = TMSG_INDEX(pMsg->msgType);
  MndMsgFp fp = (idx >= 0 && idx < TDMT_MAX) ? pMnode->msgFp[idx] : NULL;
  if (fp == NULL) {
    mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    return -1;
  }

  if (mndCheckMsgContent(pMsg) != 0) return -1;
  if (mndCheckMnodeState(pMsg) != 0) return -1;  // on success holds an rpc ref, released below

  mTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  int32_t code = (*fp)(pMsg);
  mndReleaseRpcRef(pMnode);

  if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
    mTrace("msg:%p, won't response immediately since in progress", pMsg);
  } else if (code == 0) {
    mTrace("msg:%p, successfully processed and response", pMsg);
  } else {
    mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
           TMSG_INFO(pMsg->msgType));
  }

  return code;
}

534 535 536 537
/* Register `fp` as the handler for msgType; types whose index falls outside [0, TDMT_MAX) are ignored. */
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  tmsg_t idx = TMSG_INDEX(msgType);
  if (!(idx >= 0 && idx < TDMT_MAX)) return;

  pMnode->msgFp[idx] = fp;
}

D
dapan1121 已提交
541
// Note: uid 0 is reserved
/*
 * Generate a positive, non-zero 64-bit uid for `name`.
 *
 * Raw bit layout, before taking the absolute value:
 *   bits 24..63  low 40 bits of taosGetTimestampUs()
 *   bits  8..23  low 16 bits of MurmurHash3_32(name, len)
 *   bits  0..7   low 8 bits of taosRand()
 *
 * The value is assembled in uint64_t. The original shifted time bits into the
 * sign bit of an int64_t, which is undefined behavior in C. The magnitude is
 * then taken in unsigned arithmetic, which equals llabs() of the signed value.
 * 0 and the one value llabs() cannot represent (INT64_MIN) are retried rather
 * than returned.
 */
int64_t mndGenerateUid(char *name, int32_t len) {
  int32_t hashval = MurmurHash3_32(name, len);

  do {
    uint64_t us = (uint64_t)taosGetTimestampUs();
    uint64_t raw = ((us & 0x000000FFFFFFFFFFull) << 24) + (((uint64_t)hashval & 0xFFFFull) << 8) +
                   ((uint64_t)taosRand() & 0xFFull);

    uint64_t magnitude = (raw >> 63) ? (~raw + 1u) : raw;  // |raw| viewed as a signed 64-bit value
    if (magnitude != 0 && magnitude <= (uint64_t)INT64_MAX) {
      return (int64_t)magnitude;
    }
  } while (true);
}
S
Shengliang Guan 已提交
554 555 556

/*
 * Fill the monitor structures from a snapshot of the sdb.
 * The snapshot covers dnode, mnode and vgroup lists, cluster counters and grant info.
 *
 * Requires an rpc ref. Returns -1 if the ref cannot be acquired, or if any of
 * the three output arrays cannot be created; returns 0 otherwise.
 *
 * On allocation failure, every array that was created is destroyed and all
 * three pointers are left NULL. The original leaked the arrays that did
 * allocate. NOTE(review): on success the arrays are presumably freed by the
 * caller — confirm.
 */
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
                          SMonGrantInfo *pGrantInfo) {
  if (mndAcquireRpcRef(pMnode) != 0) return -1;

  SSdb   *pSdb = pMnode->pSdb;
  int64_t ms = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL) {
    if (pClusterInfo->dnodes != NULL) taosArrayDestroy(pClusterInfo->dnodes);
    if (pClusterInfo->mnodes != NULL) taosArrayDestroy(pClusterInfo->mnodes);
    if (pVgroupInfo->vgroups != NULL) taosArrayDestroy(pVgroupInfo->vgroups);
    // Leave NULLs so a caller that frees these fields afterwards cannot double free.
    pClusterInfo->dnodes = NULL;
    pClusterInfo->mnodes = NULL;
    pVgroupInfo->vgroups = NULL;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mndReleaseRpcRef(pMnode);
    return -1;
  }

  // cluster info
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);

  // dnode info
  void *pIter = NULL;
  while (1) {
    SDnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonDnodeDesc desc = {0};
    desc.dnode_id = pObj->id;
    tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
    if (mndIsDnodeOnline(pMnode, pObj, ms)) {
      tstrncpy(desc.status, "ready", sizeof(desc.status));
    } else {
      tstrncpy(desc.status, "offline", sizeof(desc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &desc);
    sdbRelease(pSdb, pObj);
  }

  // mnode info; the entry whose id equals this node's dnode id is reported as leader
  pIter = NULL;
  while (1) {
    SMnodeObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;

    SMonMnodeDesc desc = {0};
    desc.mnode_id = pObj->id;
    tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));

    if (pObj->id == pMnode->selfDnodeId) {
      pClusterInfo->first_ep_dnode_id = pObj->id;
      tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
      pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);  // ms -> days
      tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
    } else {
      tstrncpy(desc.role, syncStr(pObj->state), sizeof(desc.role));
    }
    taosArrayPush(pClusterInfo->mnodes, &desc);
    sdbRelease(pSdb, pObj);
  }

  // vgroup info
  pIter = NULL;
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pClusterInfo->vgroups_total++;

    SMonVgroupDesc desc = {0};
    desc.vgroup_id = pVgroup->vgId;

    SName name = {0};
    tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&name, desc.database_name);

    desc.tables_num = pVgroup->numOfTables;
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
    tstrncpy(desc.status, "unsynced", sizeof(desc.status));
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[i];
      SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
      pVnDesc->dnode_id = pVgid->dnodeId;
      tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->role), sizeof(pVnDesc->vnode_role));
      if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
        tstrncpy(desc.status, "ready", sizeof(desc.status));
        pClusterInfo->vgroups_alive++;
      }
      if (pVgid->role != TAOS_SYNC_STATE_ERROR) {
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &desc);
    sdbRelease(pSdb, pVgroup);
  }

  // grant info; expireTimeMS == 0 is treated as "no limit"
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  }

  mndReleaseRpcRef(pMnode);
  return 0;
}
S
Shengliang Guan 已提交
664 665

/* Report this mnode's current sync role in pLoad->syncState. Always returns 0. */
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  pLoad->syncState = syncGetMyRole(pMnode->syncMgmt.sync);

  return 0;
}
S
Shengliang Guan 已提交
669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740

/*
 * Take an rpc reference under the read lock.
 * Fails with terrno = TSDB_CODE_APP_NOT_READY once the mnode is stopped.
 * Also fails, without touching terrno here, when mndIsMaster() is false.
 * Returns 0 when the reference was taken.
 */
int32_t mndAcquireRpcRef(SMnode *pMnode) {
  int32_t code = -1;

  taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    terrno = TSDB_CODE_APP_NOT_READY;
  } else if (mndIsMaster(pMnode)) {
    int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1);
    mTrace("mnode rpc is acquired, ref:%d", ref);
    code = 0;
  }
  taosThreadRwlockUnlock(&pMnode->lock);

  return code;
}

/* Drop one rpc reference taken by mndAcquireRpcRef(). */
void mndReleaseRpcRef(SMnode *pMnode) {
  taosThreadRwlockRdlock(&pMnode->lock);
  int32_t remaining = atomic_sub_fetch_32(&pMnode->rpcRef, 1);
  mTrace("mnode rpc is released, ref:%d", remaining);
  taosThreadRwlockUnlock(&pMnode->lock);
}

/*
 * Set pMnode->restored under the write lock.
 *
 * When clearing the flag, block until every outstanding rpc reference has
 * been released, polling every 3 ms. rpcRef is modified concurrently with
 * atomic_add/sub, so it is read with atomic_load_32(). The original plain
 * read was a data race and could in principle be hoisted out of the loop.
 */
void mndSetRestore(SMnode *pMnode, bool restored) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->restored = restored;
  taosThreadRwlockUnlock(&pMnode->lock);
  mTrace("mnode set restored:%d", restored);

  if (restored) return;

  while (atomic_load_32(&pMnode->rpcRef) > 0) {
    taosMsleep(3);
  }
}

/* Return the restored flag; the read is not taken under pMnode->lock. */
bool mndGetRestored(SMnode *pMnode) {
  bool restored = pMnode->restored;
  return restored;
}

/* Mark the mnode stopped under the write lock; later rpc/sync ref acquisitions fail. */
void mndSetStop(SMnode *pMnode) {
  TdThreadRwlock *pLock = &pMnode->lock;

  taosThreadRwlockWrlock(pLock);
  pMnode->stopped = true;
  taosThreadRwlockUnlock(pLock);

  mTrace("mnode set stopped");
}

/* Return the stopped flag; the read is not taken under pMnode->lock. */
bool mndGetStop(SMnode *pMnode) {
  bool stopped = pMnode->stopped;
  return stopped;
}

/*
 * Take a sync reference under the read lock.
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_APP_NOT_READY once the mnode is stopped.
 */
int32_t mndAcquireSyncRef(SMnode *pMnode) {
  int32_t code = -1;

  taosThreadRwlockRdlock(&pMnode->lock);
  if (!pMnode->stopped) {
    int32_t ref = atomic_add_fetch_32(&pMnode->syncRef, 1);
    mTrace("mnode sync is acquired, ref:%d", ref);
    code = 0;
  } else {
    terrno = TSDB_CODE_APP_NOT_READY;
  }
  taosThreadRwlockUnlock(&pMnode->lock);

  return code;
}

/* Drop one sync reference taken by mndAcquireSyncRef(). */
void mndReleaseSyncRef(SMnode *pMnode) {
  taosThreadRwlockRdlock(&pMnode->lock);
  int32_t remaining = atomic_sub_fetch_32(&pMnode->syncRef, 1);
  mTrace("mnode sync is released, ref:%d", remaining);
  taosThreadRwlockUnlock(&pMnode->lock);
}