mndMain.c 25.0 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndAcct.h"
#include "mndAuth.h"
S
Shengliang Guan 已提交
19
#include "mndBnode.h"
S
Shengliang Guan 已提交
20
#include "mndCluster.h"
L
Liu Jicong 已提交
21
#include "mndConsumer.h"
S
Shengliang Guan 已提交
22 23 24
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
L
Liu Jicong 已提交
25
#include "mndGrant.h"
S
monitor  
Shengliang Guan 已提交
26
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
27
#include "mndMnode.h"
L
Liu Jicong 已提交
28
#include "mndOffset.h"
L
Liu Jicong 已提交
29
#include "mndPerfSchema.h"
S
Shengliang Guan 已提交
30
#include "mndProfile.h"
S
Shengliang Guan 已提交
31
#include "mndQnode.h"
L
Liu Jicong 已提交
32
#include "mndQuery.h"
S
Shengliang Guan 已提交
33
#include "mndShow.h"
S
sma  
Shengliang Guan 已提交
34
#include "mndSma.h"
S
Shengliang Guan 已提交
35
#include "mndSnode.h"
S
Shengliang Guan 已提交
36
#include "mndStb.h"
L
Liu Jicong 已提交
37
#include "mndStream.h"
L
Liu Jicong 已提交
38
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
39 40
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
41
#include "mndTopic.h"
S
Shengliang Guan 已提交
42 43 44
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
45

S
Shengliang Guan 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58
// Serializes a zero-initialized SMTimerReq into a buffer obtained from
// rpcMallocCont(). On success the encoded length is stored in *pContLen and the
// buffer is returned. Returns NULL, leaving *pContLen untouched, when the
// length query is not positive or the allocation fails.
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq req = {0};

  // Called with a NULL buffer first; the result is used as the allocation size.
  const int32_t len = tSerializeSMTimerMsg(NULL, 0, &req);
  if (len <= 0) {
    return NULL;
  }

  void *pBuf = rpcMallocCont(len);
  if (pBuf != NULL) {
    tSerializeSMTimerMsg(pBuf, len, &req);
    *pContLen = len;
  }
  return pBuf;
}

S
Shengliang Guan 已提交
59 60 61 62 63
// Builds a timer request and posts it to the mnode WRITE_QUEUE with message
// type TDMT_MND_TRANS_TIMER. Nothing is queued when the request cannot be built.
static void mndPullupTrans(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    // mndBuildTimerMsg() failed; do not enqueue a message with a NULL body.
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}

S
Shengliang Guan 已提交
66 67 68
// Builds a timer request and posts it to the mnode READ_QUEUE with message
// type TDMT_MND_MQ_TIMER. Nothing is queued when the request cannot be built.
static void mndCalMqRebalance(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    // mndBuildTimerMsg() failed; do not enqueue a message with a NULL body.
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
L
Liu Jicong 已提交
72

S
Shengliang Guan 已提交
73 74 75 76
// Builds a timer request and posts it to the mnode READ_QUEUE with message
// type TDMT_MND_TELEM_TIMER. Nothing is queued when the request cannot be built.
static void mndPullupTelem(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    // mndBuildTimerMsg() failed; do not enqueue a message with a NULL body.
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}

S
Shengliang Guan 已提交
80
static void *mndThreadFp(void *param) {
S
Shengliang Guan 已提交
81
  SMnode *pMnode = param;
S
Shengliang Guan 已提交
82 83 84 85 86 87
  int64_t lastTime = 0;
  setThreadName("mnode-timer");

  while (1) {
    lastTime++;
    taosMsleep(100);
S
Shengliang Guan 已提交
88
    if (mndGetStop(pMnode)) break;
S
Shengliang Guan 已提交
89 90 91 92 93 94 95 96 97 98 99 100

    if (lastTime % (tsTransPullupInterval * 10) == 0) {
      mndPullupTrans(pMnode);
    }

    if (lastTime % (tsMqRebalanceInterval * 10) == 0) {
      mndCalMqRebalance(pMnode);
    }

    if (lastTime % (tsTelemInterval * 10) == 0) {
      mndPullupTelem(pMnode);
    }
S
Shengliang Guan 已提交
101 102
  }

S
Shengliang Guan 已提交
103
  return NULL;
L
Liu Jicong 已提交
104 105
}

106
// Starts the joinable timer thread running mndThreadFp() and stores its handle
// in pMnode->thread. Returns 0 on success, -1 if the thread cannot be created.
static int32_t mndInitTimer(SMnode *pMnode) {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  const int32_t ret = taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode);
  if (ret != 0) {
    // Log before any further call so errno still describes the failure.
    mError("failed to create timer thread since %s", strerror(errno));
  }

  // The attribute object is released on both paths; previously it was skipped
  // when thread creation failed.
  taosThreadAttrDestroy(&thAttr);
  if (ret != 0) {
    return -1;
  }

  tmsgReportStartup("mnode-timer", "initialized");
  return 0;
}

120
// Joins the timer thread and clears its handle, if the handle is valid.
static void mndCleanupTimer(SMnode *pMnode) {
  if (!taosCheckPthreadValid(pMnode->thread)) {
    return;
  }

  taosThreadJoin(pMnode->thread, NULL);
  taosThreadClear(&pMnode->thread);
}

S
Shengliang Guan 已提交
127
// Stores a heap copy of `path` in pMnode->path and creates that directory.
// Returns 0 on success. Returns -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY
// when the copy fails, or to TAOS_SYSTEM_ERROR(errno) when taosMkDir() fails.
// On a taosMkDir() failure the copied path stays attached to pMnode.
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  char *pathCopy = strdup(path);
  pMnode->path = pathCopy;

  if (pathCopy == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosMkDir(pathCopy) == 0) {
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(errno);
  return -1;
}
S
Shengliang Guan 已提交
141

142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
// Opens the WAL under "<pMnode->path><TD_DIRSEP>wal" and stores the handle in
// pMnode->pWal. Returns 0 on success, -1 if walOpen() returns NULL.
static int32_t mndInitWal(SMnode *pMnode) {
  SWalCfg walCfg = {
      .vgId = 1,
      .fsyncPeriod = 0,
      .rollPeriod = -1,
      .segSize = -1,
      .retentionPeriod = -1,
      .retentionSize = -1,
      .level = TAOS_WAL_FSYNC,
  };

  char walDir[PATH_MAX + 20] = {0};
  snprintf(walDir, sizeof(walDir), "%s%swal", pMnode->path, TD_DIRSEP);

  pMnode->pWal = walOpen(walDir, &walCfg);
  if (pMnode->pWal != NULL) {
    return 0;
  }

  mError("failed to open wal since %s", terrstr());
  return -1;
}

// Closes the WAL handle, if any, and resets pMnode->pWal to NULL.
static void mndCloseWal(SMnode *pMnode) {
  if (pMnode->pWal == NULL) {
    return;
  }

  walClose(pMnode->pWal);
  pMnode->pWal = NULL;
}

171 172 173
// Creates the sdb instance from the mnode's path and WAL handle and stores it
// in pMnode->pSdb. Returns 0 on success, -1 if sdbInit() returns NULL.
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt sdbOpt = {
      .path = pMnode->path,
      .pMnode = pMnode,
      .pWal = pMnode->pWal,
  };

  pMnode->pSdb = sdbInit(&sdbOpt);
  return (pMnode->pSdb == NULL) ? -1 : 0;
}

185 186 187 188 189 190 191
// Loads the sdb file unless the mnode is in deploy mode, in which case there is
// nothing to read and 0 is returned. Otherwise returns sdbReadFile()'s result.
static int32_t mndOpenSdb(SMnode *pMnode) {
  if (pMnode->deploy) {
    return 0;
  }
  return sdbReadFile(pMnode->pSdb);
}
192 193 194

// Releases the sdb instance, if any, and resets pMnode->pSdb to NULL.
static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb == NULL) {
    return;
  }

  sdbCleanup(pMnode->pSdb);
  pMnode->pSdb = NULL;
}

200 201 202 203 204
// Appends one step (name, init callback, cleanup callback) to pMnode->pSteps.
// Returns 0 on success. Returns -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY
// when the array push fails.
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep newStep = {.name = name, .initFp = initFp, .cleanupFp = cleanupFp};

  if (taosArrayPush(pMnode->pSteps, &newStep) != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

213
// Registers every mnode step, in order, through mndAllocStep().
// Returns 0 when all are registered, or -1 as soon as one registration fails.
// Note that "mnode-sdb" appears twice: once for mndInitSdb/mndCleanupSdb and
// once, later, for mndOpenSdb, which has no cleanup callback.
static int32_t mndInitSteps(SMnode *pMnode) {
  static const struct {
    char        *name;
    MndInitFp    initFp;
    MndCleanupFp cleanupFp;
  } stepTable[] = {
      {"mnode-wal", mndInitWal, mndCloseWal},
      {"mnode-sdb", mndInitSdb, mndCleanupSdb},
      {"mnode-trans", mndInitTrans, mndCleanupTrans},
      {"mnode-cluster", mndInitCluster, mndCleanupCluster},
      {"mnode-mnode", mndInitMnode, mndCleanupMnode},
      {"mnode-qnode", mndInitQnode, mndCleanupQnode},
      {"mnode-snode", mndInitSnode, mndCleanupSnode},
      {"mnode-bnode", mndInitBnode, mndCleanupBnode},
      {"mnode-dnode", mndInitDnode, mndCleanupDnode},
      {"mnode-user", mndInitUser, mndCleanupUser},
      {"mnode-grant", mndInitGrant, mndCleanupGrant},
      {"mnode-auth", mndInitAuth, mndCleanupAuth},
      {"mnode-acct", mndInitAcct, mndCleanupAcct},
      {"mnode-stream", mndInitStream, mndCleanupStream},
      {"mnode-topic", mndInitTopic, mndCleanupTopic},
      {"mnode-consumer", mndInitConsumer, mndCleanupConsumer},
      {"mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe},
      {"mnode-offset", mndInitOffset, mndCleanupOffset},
      {"mnode-vgroup", mndInitVgroup, mndCleanupVgroup},
      {"mnode-stb", mndInitStb, mndCleanupStb},
      {"mnode-sma", mndInitSma, mndCleanupSma},
      {"mnode-infos", mndInitInfos, mndCleanupInfos},
      {"mnode-perfs", mndInitPerfs, mndCleanupPerfs},
      {"mnode-db", mndInitDb, mndCleanupDb},
      {"mnode-func", mndInitFunc, mndCleanupFunc},
      {"mnode-sdb", mndOpenSdb, NULL},
      {"mnode-profile", mndInitProfile, mndCleanupProfile},
      {"mnode-show", mndInitShow, mndCleanupShow},
      {"mnode-query", mndInitQuery, mndCleanupQuery},
      {"mnode-sync", mndInitSync, mndCleanupSync},
      {"mnode-telem", mndInitTelem, mndCleanupTelem},
  };

  const size_t numSteps = sizeof(stepTable) / sizeof(stepTable[0]);
  for (size_t i = 0; i < numSteps; ++i) {
    if (mndAllocStep(pMnode, stepTable[i].name, stepTable[i].initFp, stepTable[i].cleanupFp) != 0) {
      return -1;
    }
  }

  return 0;
}

249
// Runs the cleanup callbacks of steps pos..0 in reverse order, then destroys
// the step array and sets pMnode->pSteps to NULL. A `pos` of -1 means "start
// from the last registered step". Does nothing when pSteps is already NULL.
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) {
    return;
  }

  int32_t idx = pos;
  if (idx == -1) {
    idx = taosArrayGetSize(pMnode->pSteps) - 1;
  }

  while (idx >= 0) {
    SMnodeStep *pCur = taosArrayGet(pMnode->pSteps, idx);
    mDebug("%s will cleanup", pCur->name);
    if (pCur->cleanupFp != NULL) {
      (*pCur->cleanupFp)(pMnode);
    }
    idx--;
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
S
Shengliang Guan 已提交
268

269
// Runs the init callback of every registered step in order. If one fails, the
// steps up to and including it are cleaned up via mndCleanupSteps(), terrno is
// restored to the failure code, and -1 is returned. On success the cluster id
// is cached in pMnode->clusterId and 0 is returned.
static int32_t mndExecSteps(SMnode *pMnode) {
  const int32_t numSteps = taosArrayGetSize(pMnode->pSteps);

  for (int32_t i = 0; i < numSteps; i++) {
    SMnodeStep *pCur = taosArrayGet(pMnode->pSteps, i);
    if (pCur->initFp == NULL) {
      continue;
    }

    if ((*pCur->initFp)(pMnode) == 0) {
      mDebug("%s is initialized", pCur->name);
      tmsgReportStartup(pCur->name, "initialized");
      continue;
    }

    // Save terrno first: the cleanup below may overwrite it.
    int32_t savedErr = terrno;
    mError("%s exec failed since %s, start to cleanup", pCur->name, terrstr());
    mndCleanupSteps(pMnode, i);
    terrno = savedErr;
    return -1;
  }

  pMnode->clusterId = mndGetClusterId(pMnode);
  return 0;
}
S
Shengliang Guan 已提交
290

S
shm  
Shengliang Guan 已提交
291
// Copies the message callbacks, dnode id and sync settings (replica, standby)
// from the open options into the mnode object.
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  // sync management settings
  pMnode->syncMgmt.standby = pOption->standby;
  pMnode->syncMgmt.replica = pOption->replica;

  // identity and messaging
  pMnode->selfDnodeId = pOption->dnodeId;
  pMnode->msgCb = pOption->msgCb;
}
297

298
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
S
Shengliang Guan 已提交
299 300
  mDebug("start to open mnode in %s", path);

wafwerar's avatar
wafwerar 已提交
301
  SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
302 303 304 305 306 307
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
308 309
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
S
shm  
Shengliang Guan 已提交
310
  mndSetOptions(pMnode, pOption);
S
Shengliang Guan 已提交
311

312
  pMnode->deploy = pOption->deploy;
S
Shengliang Guan 已提交
313 314
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
wafwerar's avatar
wafwerar 已提交
315
    taosMemoryFree(pMnode);
S
Shengliang Guan 已提交
316 317 318 319
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
320

S
Shengliang Guan 已提交
321
  int32_t code = mndCreateDir(pMnode, path);
322
  if (code != 0) {
S
Shengliang Guan 已提交
323 324
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
325 326 327 328 329
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

330
  code = mndInitSteps(pMnode);
331
  if (code != 0) {
S
Shengliang Guan 已提交
332 333
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
334 335 336 337 338 339 340
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
341 342
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
343 344 345 346
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
347

S
Shengliang Guan 已提交
348
  mDebug("mnode open successfully ");
S
Shengliang Guan 已提交
349 350
  return pMnode;
}
S
Shengliang Guan 已提交
351

352
// Cleans up all steps, then frees the stored path and the mnode object itself.
// Passing NULL is a no-op.
void mndClose(SMnode *pMnode) {
  if (pMnode == NULL) {
    return;
  }

  mDebug("start to close mnode");
  mndCleanupSteps(pMnode, -1);
  taosMemoryFreeClear(pMnode->path);
  taosMemoryFreeClear(pMnode);
  mDebug("mnode is closed");
}
S
Shengliang Guan 已提交
361

362
// Starts sync, deploys the sdb and marks the mnode restored when in deploy
// mode, then launches the timer thread. Returns -1 if sdbDeploy() fails;
// otherwise returns mndInitTimer()'s result.
int32_t mndStart(SMnode *pMnode) {
  mndSyncStart(pMnode);

  if (!pMnode->deploy) {
    return mndInitTimer(pMnode);
  }

  if (sdbDeploy(pMnode->pSdb) != 0) {
    mError("failed to deploy sdb while start mnode");
    return -1;
  }
  mndSetRestore(pMnode, true);

  return mndInitTimer(pMnode);
}

374
// Stops the mnode. The stop flag is raised first, because the timer thread
// polls it via mndGetStop(); then sync is stopped and the timer thread joined.
void mndStop(SMnode *pMnode) {
  mndSetStop(pMnode);       // raise the stop flag
  mndSyncStop(pMnode);      // stop sync
  mndCleanupTimer(pMnode);  // join the timer thread
}

M
Minghao Li 已提交
380
// Decodes a sync RPC message and hands it to the matching sync-node callback.
// When syncNodeSnapshotEnable() is true, the vote and append-entries messages
// use the *Snapshot* callbacks, and the snapshot send/rsp messages are also
// accepted; otherwise those two types are rejected as invalid.
// Returns the callback's result. Returns -1 with terrno set to
// TSDB_CODE_SYN_INTERNAL_ERROR when the sync env is stopped or the sync node
// cannot be acquired. Any non-zero result also sets terrno to that code.
// NOTE(review): the reference obtained from syncNodeAcquire() is not released
// anywhere in this function -- confirm whether a matching release is required.
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
  SMnode    *pMnode = pMsg->info.node;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  if (!syncEnvIsStart()) {
    mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
  if (pSyncNode == NULL) {
    mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  // Log the incoming message; the trace line is emitted once every 10 calls.
  char  logBuf[512] = {0};
  char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
  snprintf(logBuf, sizeof(logBuf), "==mndProcessSyncMsg== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
  static int64_t mndTick = 0;
  if (++mndTick % 10 == 1) {
    mTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
  }
  syncRpcMsgLog2(logBuf, pMsg);
  taosMemoryFree(syncNodeStr);

  const bool useSnapshot = syncNodeSnapshotEnable(pSyncNode);
  bool       knownType = true;
  int32_t    code = 0;

  switch (pMsg->msgType) {
    case TDMT_SYNC_TIMEOUT: {
      SyncTimeout *pTimeout = syncTimeoutFromRpcMsg2(pMsg);
      code = syncNodeOnTimeoutCb(pSyncNode, pTimeout);
      syncTimeoutDestroy(pTimeout);
      break;
    }
    case TDMT_SYNC_PING: {
      SyncPing *pPing = syncPingFromRpcMsg2(pMsg);
      code = syncNodeOnPingCb(pSyncNode, pPing);
      syncPingDestroy(pPing);
      break;
    }
    case TDMT_SYNC_PING_REPLY: {
      SyncPingReply *pPingReply = syncPingReplyFromRpcMsg2(pMsg);
      code = syncNodeOnPingReplyCb(pSyncNode, pPingReply);
      syncPingReplyDestroy(pPingReply);
      break;
    }
    case TDMT_SYNC_CLIENT_REQUEST: {
      SyncClientRequest *pClientReq = syncClientRequestFromRpcMsg2(pMsg);
      code = syncNodeOnClientRequestCb(pSyncNode, pClientReq);
      syncClientRequestDestroy(pClientReq);
      break;
    }
    case TDMT_SYNC_REQUEST_VOTE: {
      SyncRequestVote *pVote = syncRequestVoteFromRpcMsg2(pMsg);
      if (useSnapshot) {
        code = syncNodeOnRequestVoteSnapshotCb(pSyncNode, pVote);
      } else {
        code = syncNodeOnRequestVoteCb(pSyncNode, pVote);
      }
      syncRequestVoteDestroy(pVote);
      break;
    }
    case TDMT_SYNC_REQUEST_VOTE_REPLY: {
      SyncRequestVoteReply *pVoteReply = syncRequestVoteReplyFromRpcMsg2(pMsg);
      if (useSnapshot) {
        code = syncNodeOnRequestVoteReplySnapshotCb(pSyncNode, pVoteReply);
      } else {
        code = syncNodeOnRequestVoteReplyCb(pSyncNode, pVoteReply);
      }
      syncRequestVoteReplyDestroy(pVoteReply);
      break;
    }
    case TDMT_SYNC_APPEND_ENTRIES: {
      SyncAppendEntries *pAppend = syncAppendEntriesFromRpcMsg2(pMsg);
      if (useSnapshot) {
        code = syncNodeOnAppendEntriesSnapshotCb(pSyncNode, pAppend);
      } else {
        code = syncNodeOnAppendEntriesCb(pSyncNode, pAppend);
      }
      syncAppendEntriesDestroy(pAppend);
      break;
    }
    case TDMT_SYNC_APPEND_ENTRIES_REPLY: {
      SyncAppendEntriesReply *pAppendReply = syncAppendEntriesReplyFromRpcMsg2(pMsg);
      if (useSnapshot) {
        code = syncNodeOnAppendEntriesReplySnapshotCb(pSyncNode, pAppendReply);
      } else {
        code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pAppendReply);
      }
      syncAppendEntriesReplyDestroy(pAppendReply);
      break;
    }
    case TDMT_SYNC_SNAPSHOT_SEND: {
      // Only accepted when snapshots are enabled.
      if (!useSnapshot) {
        knownType = false;
        break;
      }
      SyncSnapshotSend *pSnapSend = syncSnapshotSendFromRpcMsg2(pMsg);
      code = syncNodeOnSnapshotSendCb(pSyncNode, pSnapSend);
      syncSnapshotSendDestroy(pSnapSend);
      break;
    }
    case TDMT_SYNC_SNAPSHOT_RSP: {
      // Only accepted when snapshots are enabled.
      if (!useSnapshot) {
        knownType = false;
        break;
      }
      SyncSnapshotRsp *pSnapRsp = syncSnapshotRspFromRpcMsg2(pMsg);
      code = syncNodeOnSnapshotRspCb(pSyncNode, pSnapRsp);
      syncSnapshotRspDestroy(pSnapRsp);
      break;
    }
    case TDMT_SYNC_SET_MNODE_STANDBY: {
      code = syncSetStandby(pMgmt->sync);
      SRpcMsg rsp = {.code = code, .info = pMsg->info};
      tmsgSendRsp(&rsp);
      break;
    }
    default:
      knownType = false;
      break;
  }

  if (!knownType) {
    mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
    code = -1;
  }

  if (code != 0) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  return code;
}

S
Shengliang Guan 已提交
507 508
// Tries to take an rpc reference on the mnode; returns 0 when that succeeds.
// Otherwise returns -1. For request messages other than the three internal
// timer types it also logs the failure and attaches the serialized mnode
// endpoint set to pMsg->info.rsp, setting terrno to TSDB_CODE_RPC_REDIRECT
// (or TSDB_CODE_OUT_OF_MEMORY if the response buffer cannot be allocated).
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
  if (mndAcquireRpcRef(pMsg->info.node) == 0) {
    return 0;
  }

  const bool isTimerMsg = (pMsg->msgType == TDMT_MND_MQ_TIMER) || (pMsg->msgType == TDMT_MND_TELEM_TIMER) ||
                          (pMsg->msgType == TDMT_MND_TRANS_TIMER);
  if (!IsReq(pMsg) || isTimerMsg) {
    return -1;
  }

  mError("msg:%p, failed to check mnode state since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
         TMSG_INFO(pMsg->msgType));

  SEpSet mnodeEps = {0};
  mndGetMnodeEpSet(pMsg->info.node, &mnodeEps);

  const int32_t rspLen = tSerializeSEpSet(NULL, 0, &mnodeEps);
  pMsg->info.rsp = rpcMallocCont(rspLen);
  if (pMsg->info.rsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  tSerializeSEpSet(pMsg->info.rsp, rspLen, &mnodeEps);
  pMsg->info.rspLen = rspLen;
  terrno = TSDB_CODE_RPC_REDIRECT;
  return -1;
}

S
Shengliang Guan 已提交
532
// Validates that a request message has a body. Non-request messages always
// pass. Returns 0 when acceptable; otherwise logs the failure, sets terrno to
// TSDB_CODE_INVALID_MSG_LEN and returns -1.
static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
  const bool hasBody = (pMsg->contLen != 0) && (pMsg->pCont != NULL);
  if (!IsReq(pMsg) || hasBody) {
    return 0;
  }

  mError("msg:%p, failed to check msg content, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  terrno = TSDB_CODE_INVALID_MSG_LEN;
  return -1;
}

S
Shengliang Guan 已提交
541
// Entry point for rpc messages addressed to the mnode. Looks up the handler
// registered for the message type, validates the message content and the mnode
// state, runs the handler while holding an rpc reference, and logs the outcome.
// Returns the handler's result. Returns -1 with terrno set to
// TSDB_CODE_MSG_NOT_PROCESSED when no handler is registered or the type index
// is out of range, and -1 when either validation fails.
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;

  // Bound the table index the same way mndSetMsgHandle() does when registering
  // handlers, so an unexpected message type cannot read outside msgFp[].
  const int32_t index = TMSG_INDEX(pMsg->msgType);
  MndMsgFp      fp = NULL;
  if (index >= 0 && index < TDMT_MAX) {
    fp = pMnode->msgFp[index];
  }

  if (fp == NULL) {
    mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    return -1;
  }

  if (mndCheckMsgContent(pMsg) != 0) return -1;
  // On success this leaves an rpc reference held; it is released after the handler.
  if (mndCheckMnodeState(pMsg) != 0) return -1;

  mTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  int32_t code = (*fp)(pMsg);
  mndReleaseRpcRef(pMnode);

  if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
    mTrace("msg:%p, won't response immediately since in progress", pMsg);
  } else if (code == 0) {
    mTrace("msg:%p, successfully processed and response", pMsg);
  } else {
    mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
           TMSG_INFO(pMsg->msgType));
  }

  return code;
}

569 570 571 572
// Registers `fp` as the handler for `msgType`. Types whose table index falls
// outside [0, TDMT_MAX) are silently ignored.
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  tmsg_t slot = TMSG_INDEX(msgType);
  if (!(slot >= 0 && slot < TDMT_MAX)) {
    return;
  }
  pMnode->msgFp[slot] = fp;
}

D
dapan1121 已提交
576
// Note: uid 0 is reserved
S
sma  
Shengliang Guan 已提交
577
// Generates a non-zero, non-negative 64-bit id from three bit fields:
//   bits 24..63 : low 40 bits of the current microsecond timestamp
//   bits  8..23 : low 16 bits of MurmurHash3_32(name, len)
//   bits  0..7  : low 8 bits of taosRand()
// If the combined value has its top bit set, its magnitude is returned.
// The fields are combined in unsigned arithmetic: the timestamp field can reach
// bit 63, and shifting that far in a signed int64_t is undefined behaviour.
// Candidates equal to 0 (reserved) or INT64_MIN (no positive magnitude) are
// discarded and a new one is drawn.
int64_t mndGenerateUid(char *name, int32_t len) {
  int32_t        hashval = MurmurHash3_32(name, len);
  const uint64_t hashBits = (uint64_t)((uint32_t)hashval & 0xFFFFu) << 8;

  for (;;) {
    const uint64_t us = (uint64_t)taosGetTimestampUs();
    const uint64_t timeBits = (us & 0x000000FFFFFFFFFFULL) << 24;
    const uint64_t randBits = (uint64_t)taosRand() & 0xFFu;
    const uint64_t raw = timeBits + hashBits + randBits;

    if (raw == 0 || raw == (uint64_t)INT64_MIN) {
      continue;
    }

    // Conversion of values above INT64_MAX is implementation-defined (it wraps
    // on the supported compilers) but, unlike the signed shift, not undefined.
    const int64_t uuid = (int64_t)raw;
    return (uuid < 0) ? -uuid : uuid;
  }
}
S
Shengliang Guan 已提交
589 590 591

// Fills the monitor structures with a snapshot of cluster state read from the
// sdb: one descriptor per dnode, mnode and vgroup, plus grant information.
// Holds an rpc reference for the duration. Returns 0 on success, or -1 if the
// reference cannot be acquired or any of the three result arrays cannot be
// allocated. In the second case, arrays already allocated stay in the output
// structures.
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
                          SMonGrantInfo *pGrantInfo) {
  if (mndAcquireRpcRef(pMnode) != 0) {
    return -1;
  }

  SSdb         *pSdb = pMnode->pSdb;
  const int64_t nowMs = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
  const bool allocFailed =
      (pClusterInfo->dnodes == NULL) || (pClusterInfo->mnodes == NULL) || (pVgroupInfo->vgroups == NULL);
  if (allocFailed) {
    mndReleaseRpcRef(pMnode);
    return -1;
  }

  // cluster info
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);

  // dnode descriptors
  void *pIter = NULL;
  for (;;) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) {
      break;
    }

    SMonDnodeDesc dnodeDesc = {0};
    dnodeDesc.dnode_id = pDnode->id;
    tstrncpy(dnodeDesc.dnode_ep, pDnode->ep, sizeof(dnodeDesc.dnode_ep));
    if (mndIsDnodeOnline(pDnode, nowMs)) {
      tstrncpy(dnodeDesc.status, "ready", sizeof(dnodeDesc.status));
    } else {
      tstrncpy(dnodeDesc.status, "offline", sizeof(dnodeDesc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &dnodeDesc);
    sdbRelease(pSdb, pDnode);
  }

  // mnode descriptors
  pIter = NULL;
  for (;;) {
    SMnodeObj *pMnodeObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMnodeObj);
    if (pIter == NULL) {
      break;
    }

    SMonMnodeDesc mnodeDesc = {0};
    mnodeDesc.mnode_id = pMnodeObj->id;
    tstrncpy(mnodeDesc.mnode_ep, pMnodeObj->pDnode->ep, sizeof(mnodeDesc.mnode_ep));

    if (pMnodeObj->id != pMnode->selfDnodeId) {
      tstrncpy(mnodeDesc.role, syncStr(pMnodeObj->state), sizeof(mnodeDesc.role));
    } else {
      // The entry matching this node is reported as leader and as first_ep.
      pClusterInfo->first_ep_dnode_id = pMnodeObj->id;
      tstrncpy(pClusterInfo->first_ep, pMnodeObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
      pClusterInfo->master_uptime = (nowMs - pMnodeObj->stateStartTime) / (86400000.0f);
      tstrncpy(mnodeDesc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(mnodeDesc.role));
    }
    taosArrayPush(pClusterInfo->mnodes, &mnodeDesc);
    sdbRelease(pSdb, pMnodeObj);
  }

  // vgroup info
  pIter = NULL;
  for (;;) {
    SVgObj *pVg = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVg);
    if (pIter == NULL) {
      break;
    }

    pClusterInfo->vgroups_total++;

    SMonVgroupDesc vgDesc = {0};
    vgDesc.vgroup_id = pVg->vgId;

    SName dbName = {0};
    tNameFromString(&dbName, pVg->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&dbName, vgDesc.database_name);

    vgDesc.tables_num = pVg->numOfTables;
    pGrantInfo->timeseries_used += pVg->numOfTimeSeries;

    // Status stays "unsynced" unless some replica reports the leader role.
    tstrncpy(vgDesc.status, "unsynced", sizeof(vgDesc.status));
    for (int32_t r = 0; r < pVg->replica; ++r) {
      SVnodeGid     *pGid = &pVg->vnodeGid[r];
      SMonVnodeDesc *pVnode = &vgDesc.vnodes[r];
      pVnode->dnode_id = pGid->dnodeId;
      tstrncpy(pVnode->vnode_role, syncStr(pGid->role), sizeof(pVnode->vnode_role));
      if (pGid->role == TAOS_SYNC_STATE_LEADER) {
        tstrncpy(vgDesc.status, "ready", sizeof(vgDesc.status));
        pClusterInfo->vgroups_alive++;
      }
      if (pGid->role != TAOS_SYNC_STATE_ERROR) {
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &vgDesc);
    sdbRelease(pSdb, pVg);
  }

  // grant info: an expireTimeMS of 0 reports INT32_MAX for both fields
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  } else {
    pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - nowMs) / 86400000.0f;
    pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  }

  mndReleaseRpcRef(pMnode);
  return 0;
}
S
Shengliang Guan 已提交
699 700

// Stores this mnode's current sync role in pLoad->syncState and traces it.
// Always returns 0.
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  pLoad->syncState = syncGetMyRole(pMnode->syncMgmt.sync);

  mTrace("mnode current syncstate is %s", syncStr(pLoad->syncState));

  return 0;
}
S
Shengliang Guan 已提交
705 706 707 708 709 710 711 712 713 714 715

// Takes one rpc reference on the mnode under the read lock.
// Returns 0 and increments rpcRef when the mnode is neither stopped nor a
// non-master. Returns -1 otherwise; terrno is set to TSDB_CODE_APP_NOT_READY
// in the stopped case only.
int32_t mndAcquireRpcRef(SMnode *pMnode) {
  int32_t result = -1;

  taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    terrno = TSDB_CODE_APP_NOT_READY;
  } else if (mndIsMaster(pMnode)) {
    (void)atomic_add_fetch_32(&pMnode->rpcRef, 1);
    result = 0;
  }
  taosThreadRwlockUnlock(&pMnode->lock);

  return result;
}

// Drops one rpc reference: decrements rpcRef while holding the read lock.
void mndReleaseRpcRef(SMnode *pMnode) {
  taosThreadRwlockRdlock(&pMnode->lock);
  (void)atomic_sub_fetch_32(&pMnode->rpcRef, 1);
  taosThreadRwlockUnlock(&pMnode->lock);
}

// Updates the `restored` flag under the write lock. When the flag is cleared,
// the call additionally blocks, polling every 3 ms, until rpcRef has dropped
// to zero or below.
void mndSetRestore(SMnode *pMnode, bool restored) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->restored = restored;
  taosThreadRwlockUnlock(&pMnode->lock);
  mTrace("mnode set restored:%d", restored);

  if (restored) {
    return;
  }

  while (pMnode->rpcRef > 0) {
    taosMsleep(3);
  }
}

// Returns the current value of the `restored` flag (read without the lock).
bool mndGetRestored(SMnode *pMnode) {
  const bool isRestored = pMnode->restored;
  return isRestored;
}

// Raises the `stopped` flag under the write lock and traces the change.
void mndSetStop(SMnode *pMnode) {
  taosThreadRwlockWrlock(&pMnode->lock);
  {
    pMnode->stopped = true;
  }
  taosThreadRwlockUnlock(&pMnode->lock);

  mTrace("mnode set stopped");
}

// Returns the current value of the `stopped` flag (read without the lock).
bool mndGetStop(SMnode *pMnode) {
  const bool isStopped = pMnode->stopped;
  return isStopped;
}