mndMain.c 25.7 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18
#include "mndAcct.h"
#include "mndAuth.h"
S
Shengliang Guan 已提交
19
#include "mndBnode.h"
S
Shengliang Guan 已提交
20
#include "mndCluster.h"
L
Liu Jicong 已提交
21
#include "mndConsumer.h"
S
Shengliang Guan 已提交
22 23 24
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
L
Liu Jicong 已提交
25
#include "mndGrant.h"
S
monitor  
Shengliang Guan 已提交
26
#include "mndInfoSchema.h"
S
Shengliang Guan 已提交
27
#include "mndMnode.h"
L
Liu Jicong 已提交
28
#include "mndOffset.h"
L
Liu Jicong 已提交
29
#include "mndPerfSchema.h"
S
Shengliang Guan 已提交
30
#include "mndProfile.h"
S
Shengliang Guan 已提交
31
#include "mndQnode.h"
L
Liu Jicong 已提交
32
#include "mndQuery.h"
S
Shengliang Guan 已提交
33
#include "mndShow.h"
S
sma  
Shengliang Guan 已提交
34
#include "mndSma.h"
S
Shengliang Guan 已提交
35
#include "mndSnode.h"
S
Shengliang Guan 已提交
36
#include "mndStb.h"
L
Liu Jicong 已提交
37
#include "mndStream.h"
L
Liu Jicong 已提交
38
#include "mndSubscribe.h"
S
Shengliang Guan 已提交
39 40
#include "mndSync.h"
#include "mndTelem.h"
L
Liu Jicong 已提交
41
#include "mndTopic.h"
S
Shengliang Guan 已提交
42 43 44
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
45

S
Shengliang Guan 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58
// Build the body shared by the periodic timer messages: a serialized, zero-filled
// SMTimerReq placed in a buffer obtained from rpcMallocCont().
// On success returns the buffer and stores its length in *pContLen.
// Returns NULL (leaving *pContLen untouched) if the size is not positive or the
// allocation fails.
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq req = {0};

  // The result of this NULL-buffer call is used as the allocation size below.
  int32_t len = tSerializeSMTimerMsg(NULL, 0, &req);
  if (len <= 0) {
    return NULL;
  }

  void *pBuf = rpcMallocCont(len);
  if (pBuf != NULL) {
    tSerializeSMTimerMsg(pBuf, len, &req);
    *pContLen = len;
  }

  return pBuf;
}

S
Shengliang Guan 已提交
59 60
// Post a TDMT_MND_TRANS_TIMER message to the mnode write queue.
// Nothing is posted when the timer body cannot be built: a message with a NULL
// body and zero length would only be rejected later by mndCheckMsgContent().
static void mndPullupTrans(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}

wmmhello's avatar
wmmhello 已提交
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
// Post a TDMT_MND_TTL_TIMER message to the mnode write queue.
// The body is an SMsgHead followed by one int32_t holding the current time in
// seconds, converted with htonl().
static void mndTtlTimer(SMnode *pMnode) {
  int32_t   msgLen = sizeof(SMsgHead) + sizeof(int32_t);
  SMsgHead *pHead = rpcMallocCont(msgLen);
  if (pHead == NULL) {
    mError("ttl time malloc err. contLen:%d", msgLen);
    return;
  }

  // The timestamp lives directly behind the message head.
  int32_t  nowSec = taosGetTimestampSec();
  int32_t *pStamp = (int32_t *)(POINTER_SHIFT(pHead, sizeof(SMsgHead)));
  *pStamp = htonl(nowSec);

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pHead, .contLen = msgLen};
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}

S
Shengliang Guan 已提交
81 82
// Post a TDMT_MND_MQ_TIMER message to the mnode read queue.
// Nothing is posted when the timer body cannot be built: a message with a NULL
// body and zero length would only be rejected later by mndCheckMsgContent().
static void mndCalMqRebalance(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
L
Liu Jicong 已提交
87

S
Shengliang Guan 已提交
88 89
// Post a TDMT_MND_TELEM_TIMER message to the mnode read queue.
// Nothing is posted when the timer body cannot be built: a message with a NULL
// body and zero length would only be rejected later by mndCheckMsgContent().
static void mndPullupTelem(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq == NULL) {
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}

S
Shengliang Guan 已提交
95
static void *mndThreadFp(void *param) {
S
Shengliang Guan 已提交
96
  SMnode *pMnode = param;
S
Shengliang Guan 已提交
97 98 99 100 101 102
  int64_t lastTime = 0;
  setThreadName("mnode-timer");

  while (1) {
    lastTime++;
    taosMsleep(100);
S
Shengliang Guan 已提交
103
    if (mndGetStop(pMnode)) break;
S
Shengliang Guan 已提交
104

wmmhello's avatar
wmmhello 已提交
105 106 107 108
    if (lastTime % (864000) == 1) {   // sleep 1 day for ttl
      mndTtlTimer(pMnode);
    }

S
Shengliang Guan 已提交
109 110 111 112 113 114 115 116 117 118 119
    if (lastTime % (tsTransPullupInterval * 10) == 0) {
      mndPullupTrans(pMnode);
    }

    if (lastTime % (tsMqRebalanceInterval * 10) == 0) {
      mndCalMqRebalance(pMnode);
    }

    if (lastTime % (tsTelemInterval * 10) == 0) {
      mndPullupTelem(pMnode);
    }
S
Shengliang Guan 已提交
120 121
  }

S
Shengliang Guan 已提交
122
  return NULL;
L
Liu Jicong 已提交
123 124
}

125
// Start the joinable "mnode-timer" thread running mndThreadFp().
// Returns 0 on success and -1 if the thread cannot be created.
static int32_t mndInitTimer(SMnode *pMnode) {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  int32_t ret = taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode);
  if (ret != 0) {
    // Log before touching the attribute object so errno is still the creation error.
    mError("failed to create timer thread since %s", strerror(errno));
  }

  // The attribute object is released on both the success and the failure path.
  taosThreadAttrDestroy(&thAttr);
  if (ret != 0) {
    return -1;
  }

  tmsgReportStartup("mnode-timer", "initialized");
  return 0;
}

139
// Join the timer thread and clear its handle, if the handle is valid.
static void mndCleanupTimer(SMnode *pMnode) {
  if (!taosCheckPthreadValid(pMnode->thread)) {
    return;
  }

  taosThreadJoin(pMnode->thread, NULL);
  taosThreadClear(&pMnode->thread);
}

S
Shengliang Guan 已提交
146
// Store a copy of path in pMnode->path and create that directory.
// Returns 0 on success. On failure returns -1 with terrno set to
// TSDB_CODE_OUT_OF_MEMORY (copy failed) or the system error (mkdir failed).
// The copied path stays in pMnode->path when mkdir fails.
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  char *pathCopy = strdup(path);
  pMnode->path = pathCopy;
  if (pathCopy == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosMkDir(pMnode->path) == 0) {
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(errno);
  return -1;
}
S
Shengliang Guan 已提交
160

161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
// Open the WAL located in "<pMnode->path><TD_DIRSEP>wal" and store it in pMnode->pWal.
// Returns 0 on success, -1 if walOpen() fails.
static int32_t mndInitWal(SMnode *pMnode) {
  char walPath[PATH_MAX + 20] = {0};
  snprintf(walPath, sizeof(walPath), "%s%swal", pMnode->path, TD_DIRSEP);

  SWalCfg walCfg = {0};
  walCfg.vgId = 1;
  walCfg.fsyncPeriod = 0;
  walCfg.rollPeriod = -1;
  walCfg.segSize = -1;
  walCfg.retentionPeriod = -1;
  walCfg.retentionSize = -1;
  walCfg.level = TAOS_WAL_FSYNC;

  pMnode->pWal = walOpen(walPath, &walCfg);
  if (pMnode->pWal != NULL) {
    return 0;
  }

  mError("failed to open wal since %s", terrstr());
  return -1;
}

// Close the WAL if one is open and reset pMnode->pWal to NULL.
static void mndCloseWal(SMnode *pMnode) {
  if (pMnode->pWal == NULL) {
    return;
  }

  walClose(pMnode->pWal);
  pMnode->pWal = NULL;
}

190 191 192
// Create the sdb instance for this mnode from its path, WAL and back pointer.
// Returns 0 on success, -1 if sdbInit() returns NULL.
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt sdbOpt = {0};
  sdbOpt.path = pMnode->path;
  sdbOpt.pMnode = pMnode;
  sdbOpt.pWal = pMnode->pWal;

  pMnode->pSdb = sdbInit(&sdbOpt);
  return (pMnode->pSdb == NULL) ? -1 : 0;
}

204 205 206 207 208 209 210
// Load the sdb from file unless this mnode is being deployed, in which case
// nothing is read and 0 is returned.
static int32_t mndOpenSdb(SMnode *pMnode) {
  if (pMnode->deploy) {
    return 0;
  }

  return sdbReadFile(pMnode->pSdb);
}
211 212 213

// Release the sdb instance if present and reset pMnode->pSdb to NULL.
static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb == NULL) {
    return;
  }

  sdbCleanup(pMnode->pSdb);
  pMnode->pSdb = NULL;
}

219 220 221 222 223
// Append one startup step (name, init callback, cleanup callback) to pMnode->pSteps.
// Returns 0 on success; on push failure sets terrno to TSDB_CODE_OUT_OF_MEMORY
// and returns -1.
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep newStep = {0};
  newStep.name = name;
  newStep.initFp = initFp;
  newStep.cleanupFp = cleanupFp;

  if (taosArrayPush(pMnode->pSteps, &newStep) != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

232
// Register every mnode startup step, in the order listed, via mndAllocStep().
// mndExecSteps() runs the init callbacks in this order and mndCleanupSteps()
// runs the cleanup callbacks in reverse.
// Returns 0 when all steps were registered, -1 as soon as one registration fails.
static int32_t mndInitSteps(SMnode *pMnode) {
  static const struct {
    char        *name;
    MndInitFp    initFp;
    MndCleanupFp cleanupFp;
  } stepTable[] = {
      {"mnode-wal", mndInitWal, mndCloseWal},
      {"mnode-sdb", mndInitSdb, mndCleanupSdb},
      {"mnode-trans", mndInitTrans, mndCleanupTrans},
      {"mnode-cluster", mndInitCluster, mndCleanupCluster},
      {"mnode-mnode", mndInitMnode, mndCleanupMnode},
      {"mnode-qnode", mndInitQnode, mndCleanupQnode},
      {"mnode-snode", mndInitSnode, mndCleanupSnode},
      {"mnode-bnode", mndInitBnode, mndCleanupBnode},
      {"mnode-dnode", mndInitDnode, mndCleanupDnode},
      {"mnode-user", mndInitUser, mndCleanupUser},
      {"mnode-grant", mndInitGrant, mndCleanupGrant},
      {"mnode-auth", mndInitAuth, mndCleanupAuth},
      {"mnode-acct", mndInitAcct, mndCleanupAcct},
      {"mnode-stream", mndInitStream, mndCleanupStream},
      {"mnode-topic", mndInitTopic, mndCleanupTopic},
      {"mnode-consumer", mndInitConsumer, mndCleanupConsumer},
      {"mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe},
      {"mnode-offset", mndInitOffset, mndCleanupOffset},
      {"mnode-vgroup", mndInitVgroup, mndCleanupVgroup},
      {"mnode-stb", mndInitStb, mndCleanupStb},
      {"mnode-sma", mndInitSma, mndCleanupSma},
      {"mnode-infos", mndInitInfos, mndCleanupInfos},
      {"mnode-perfs", mndInitPerfs, mndCleanupPerfs},
      {"mnode-db", mndInitDb, mndCleanupDb},
      {"mnode-func", mndInitFunc, mndCleanupFunc},
      // Second "mnode-sdb" entry: reads the sdb file; it has no cleanup callback.
      {"mnode-sdb", mndOpenSdb, NULL},
      {"mnode-profile", mndInitProfile, mndCleanupProfile},
      {"mnode-show", mndInitShow, mndCleanupShow},
      {"mnode-query", mndInitQuery, mndCleanupQuery},
      {"mnode-sync", mndInitSync, mndCleanupSync},
      {"mnode-telem", mndInitTelem, mndCleanupTelem},
  };

  const size_t numSteps = sizeof(stepTable) / sizeof(stepTable[0]);
  for (size_t i = 0; i < numSteps; i++) {
    if (mndAllocStep(pMnode, stepTable[i].name, stepTable[i].initFp, stepTable[i].cleanupFp) != 0) {
      return -1;
    }
  }

  return 0;
}

268
// Run the cleanup callbacks of steps pos..0 in descending order, then destroy
// the step array and set pMnode->pSteps to NULL.
// pos == -1 means "start from the last registered step".
// Does nothing when the step array is already NULL.
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) return;

  int32_t idx = pos;
  if (idx == -1) {
    idx = taosArrayGetSize(pMnode->pSteps) - 1;
  }

  while (idx >= 0) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    mDebug("%s will cleanup", pStep->name);
    if (pStep->cleanupFp != NULL) {
      (*pStep->cleanupFp)(pMnode);
    }
    idx--;
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
S
Shengliang Guan 已提交
287

288
// Run the init callback of every registered step in registration order.
// On the first failure, the steps from the failing one back to the first are
// cleaned up, terrno is restored to the failure code, and -1 is returned.
// On success the cluster id is cached in pMnode->clusterId and 0 is returned.
static int32_t mndExecSteps(SMnode *pMnode) {
  int32_t numSteps = taosArrayGetSize(pMnode->pSteps);

  for (int32_t idx = 0; idx < numSteps; idx++) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    if (pStep->initFp == NULL) continue;

    if ((*pStep->initFp)(pMnode) == 0) {
      mDebug("%s is initialized", pStep->name);
      tmsgReportStartup(pStep->name, "initialized");
      continue;
    }

    // Keep the failure code: the cleanup below may overwrite terrno.
    int32_t code = terrno;
    mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
    mndCleanupSteps(pMnode, idx);
    terrno = code;
    return -1;
  }

  pMnode->clusterId = mndGetClusterId(pMnode);
  return 0;
}
S
Shengliang Guan 已提交
309

S
shm  
Shengliang Guan 已提交
310
// Copy the message callbacks, own dnode id and sync settings (replica, standby)
// from the open options into the mnode.
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  pMnode->msgCb = pOption->msgCb;
  pMnode->selfDnodeId = pOption->dnodeId;
  pSync->replica = pOption->replica;
  pSync->standby = pOption->standby;
}
316

317
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
S
Shengliang Guan 已提交
318 319
  mDebug("start to open mnode in %s", path);

wafwerar's avatar
wafwerar 已提交
320
  SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
321 322 323 324 325 326
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
327 328
  char timestr[24] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
S
shm  
Shengliang Guan 已提交
329
  mndSetOptions(pMnode, pOption);
S
Shengliang Guan 已提交
330

331
  pMnode->deploy = pOption->deploy;
S
Shengliang Guan 已提交
332 333
  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
wafwerar's avatar
wafwerar 已提交
334
    taosMemoryFree(pMnode);
S
Shengliang Guan 已提交
335 336 337 338
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
339

S
Shengliang Guan 已提交
340
  int32_t code = mndCreateDir(pMnode, path);
341
  if (code != 0) {
S
Shengliang Guan 已提交
342 343
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
344 345 346 347 348
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

349
  code = mndInitSteps(pMnode);
350
  if (code != 0) {
S
Shengliang Guan 已提交
351 352
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
353 354 355 356 357 358 359
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
360 361
    code = terrno;
    mError("failed to open mnode since %s", terrstr());
362 363 364 365
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
366

S
Shengliang Guan 已提交
367
  mDebug("mnode open successfully ");
S
Shengliang Guan 已提交
368 369
  return pMnode;
}
S
Shengliang Guan 已提交
370

371
// Clean up all startup steps in reverse order and free the mnode together with
// its path string. A NULL pMnode is ignored.
void mndClose(SMnode *pMnode) {
  if (pMnode == NULL) {
    return;
  }

  mDebug("start to close mnode");
  mndCleanupSteps(pMnode, -1);
  taosMemoryFreeClear(pMnode->path);
  taosMemoryFreeClear(pMnode);
  mDebug("mnode is closed");
}
S
Shengliang Guan 已提交
380

381
// Start sync, deploy the sdb when this mnode was opened in deploy mode (marking
// it restored afterwards), and finally start the timer thread.
// Returns -1 if the sdb deploy fails, otherwise the result of mndInitTimer().
int32_t mndStart(SMnode *pMnode) {
  mndSyncStart(pMnode);

  const bool deploying = pMnode->deploy;
  if (deploying && sdbDeploy(pMnode->pSdb) != 0) {
    mError("failed to deploy sdb while start mnode");
    return -1;
  }
  if (deploying) {
    mndSetRestore(pMnode, true);
  }

  return mndInitTimer(pMnode);
}

393
// Stop the mnode: raise the stopped flag, stop sync, then join the timer thread.
// The flag is set first; mndThreadFp() polls it to leave its loop.
void mndStop(SMnode *pMnode) {
  mndSetStop(pMnode);
  mndSyncStop(pMnode);
  mndCleanupTimer(pMnode);
}

M
Minghao Li 已提交
399
// Dispatch one sync rpc message to the sync node owned by this mnode.
//
// The message is decoded according to pMsg->msgType, handed to the matching
// syncNodeOn*Cb() handler and the decoded copy is destroyed afterwards. When
// syncNodeSnapshotEnable() is true the *Snapshot* variants of the vote and
// append-entries handlers are used and the snapshot send/rsp messages are
// accepted; otherwise those two message types are rejected as invalid.
//
// Returns the handler result. Any non-zero result also sets terrno to
// TSDB_CODE_SYN_INTERNAL_ERROR. Returns -1 when the sync env is not started or
// the sync node cannot be acquired.
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
  SMnode    *pMnode = pMsg->info.node;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  int32_t    code = 0;

  if (!syncEnvIsStart()) {
    mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
  if (pSyncNode == NULL) {
    mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  char  logBuf[512] = {0};
  char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
  snprintf(logBuf, sizeof(logBuf), "==mndProcessSyncMsg== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
  // Only every 10th message is traced to keep the log volume down.
  static int64_t mndTick = 0;
  if (++mndTick % 10 == 1) {
    mTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
  }
  syncRpcMsgLog2(logBuf, pMsg);
  taosMemoryFree(syncNodeStr);

  // ToDo: ugly! use function pointer
  const bool snapshot = syncNodeSnapshotEnable(pSyncNode);

  if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
    SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
    code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
    syncTimeoutDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_PING) {
    SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
    code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
    syncPingDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
    SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
    code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
    syncPingReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
    code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
    syncClientRequestDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
    SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
    if (snapshot) {
      code = syncNodeOnRequestVoteSnapshotCb(pSyncNode, pSyncMsg);
    } else {
      code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
    }
    syncRequestVoteDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
    SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
    if (snapshot) {
      code = syncNodeOnRequestVoteReplySnapshotCb(pSyncNode, pSyncMsg);
    } else {
      code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
    }
    syncRequestVoteReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
    SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
    if (snapshot) {
      code = syncNodeOnAppendEntriesSnapshotCb(pSyncNode, pSyncMsg);
    } else {
      code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
    }
    syncAppendEntriesDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
    SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
    if (snapshot) {
      code = syncNodeOnAppendEntriesReplySnapshotCb(pSyncNode, pSyncMsg);
    } else {
      code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
    }
    syncAppendEntriesReplyDestroy(pSyncMsg);
  } else if (snapshot && pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
    SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
    code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg);
    syncSnapshotSendDestroy(pSyncMsg);
  } else if (snapshot && pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
    SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
    code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg);
    syncSnapshotRspDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) {
    // The standby request is answered right here with the result code.
    code = syncSetStandby(pMgmt->sync);
    SRpcMsg rsp = {.code = code, .info = pMsg->info};
    tmsgSendRsp(&rsp);
  } else {
    mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
    code = -1;
  }

  // Give back the reference taken by syncNodeAcquire() above.
  syncNodeRelease(pSyncNode);

  if (code != 0) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  return code;
}

S
Shengliang Guan 已提交
526
// Decide whether this mnode may process pMsg.
//
// Non-request messages always pass. A request passes only if mndAcquireRpcRef()
// succeeds; in that case the caller owns one rpc reference and must release it.
// Otherwise -1 is returned. For a rejected request that is not an internal timer
// message, the preprocess state is aborted and the current mnode ep set is
// attached as the response with terrno = TSDB_CODE_RPC_REDIRECT (or
// TSDB_CODE_OUT_OF_MEMORY if the response buffer cannot be allocated).
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
  if (!IsReq(pMsg)) return 0;
  if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;

  // Timer messages are produced by this mnode's own timer thread, so no redirect
  // response is built for them. The ttl timer is treated like the other three.
  if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
      pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER) {
    return -1;
  }

  mError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType));

  mndAbortPreprocessMsg(pMsg);

  SEpSet epSet = {0};
  mndGetMnodeEpSet(pMsg->info.node, &epSet);

  int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
  pMsg->info.rsp = rpcMallocCont(contLen);
  if (pMsg->info.rsp != NULL) {
    tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
    pMsg->info.rspLen = contLen;
    terrno = TSDB_CODE_RPC_REDIRECT;
  } else {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  }

  return -1;
}

S
Shengliang Guan 已提交
553
// Reject requests that carry no body.
// Returns 0 for non-request messages and for requests with a non-NULL body of
// non-zero length; otherwise sets terrno to TSDB_CODE_INVALID_MSG_LEN and returns -1.
static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
  if (!IsReq(pMsg)) {
    return 0;
  }

  const bool hasBody = (pMsg->contLen != 0 && pMsg->pCont != NULL);
  if (hasBody) {
    return 0;
  }

  mError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen,
         pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  terrno = TSDB_CODE_INVALID_MSG_LEN;
  return -1;
}

S
Shengliang Guan 已提交
563
// Entry point for rpc messages addressed to the mnode.
//
// Looks up the handler registered for the message type, validates the message
// body and the mnode state, runs the handler and logs the outcome.
// Returns the handler's result, or -1 with terrno set when no handler is
// registered or one of the checks fails.
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
  SMnode  *pMnode = pMsg->info.node;
  MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
  if (fp == NULL) {
    mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    return -1;
  }

  if (mndCheckMsgContent(pMsg) != 0) return -1;
  if (mndCheckMnodeState(pMsg) != 0) return -1;

  // mndCheckMnodeState() takes an rpc reference for requests only, so remember
  // the message kind now and release only in that case. Releasing for
  // non-request messages would leave rpcRef unbalanced.
  const bool isReq = IsReq(pMsg);

  // NOTE(review): 'trace' appears to be picked up by name inside mGTrace — do not rename.
  STraceId *trace = &pMsg->info.traceId;
  mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  int32_t code = (*fp)(pMsg);
  if (isReq) {
    mndReleaseRpcRef(pMnode);
  }

  if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
    mTrace("msg:%p, won't response immediately since in progress", pMsg);
  } else if (code == 0) {
    mTrace("msg:%p, successfully processed", pMsg);
  } else {
    mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
           TMSG_INFO(pMsg->msgType));
  }

  return code;
}

592 593 594 595
// Register fp as the handler for msgType. An index outside [0, TDMT_MAX) is
// silently ignored.
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  tmsg_t slot = TMSG_INDEX(msgType);
  if (slot < 0 || slot >= TDMT_MAX) {
    return;
  }

  pMnode->msgFp[slot] = fp;
}

D
dapan1121 已提交
599
// Generate a positive, non-zero 64-bit uid for the object called name (len bytes).
// Note: uid 0 is reserved
//
// Bit layout before the absolute value is taken:
//   bits 24..63  low 40 bits of the current timestamp in microseconds
//   bits  8..23  low 16 bits of MurmurHash3_32(name, len)
//   bits  0..7   low 8 bits of taosRand()
// The value is assembled in unsigned arithmetic because shifting a 40-bit value
// left by 24 can reach the sign bit, which is undefined for signed operands.
// The result is the absolute value of that pattern read as a two's-complement
// int64_t. Patterns whose absolute value is 0 or not representable (INT64_MIN)
// are discarded and the generation is retried.
int64_t mndGenerateUid(char *name, int32_t len) {
  int32_t hashval = MurmurHash3_32(name, len);

  do {
    int64_t  us = taosGetTimestampUs();
    uint64_t timeBits = ((uint64_t)us & 0x000000FFFFFFFFFFULL) << 24;
    uint64_t hashBits = ((uint64_t)(uint32_t)hashval & 0xFFFFULL) << 8;
    uint64_t randBits = (uint64_t)taosRand() & 0xFFULL;
    uint64_t bits = timeBits + hashBits + randBits;

    // Two's-complement magnitude: negate when the sign bit is set.
    uint64_t magnitude = (bits >> 63) ? ((uint64_t)0 - bits) : bits;
    if (magnitude != 0 && magnitude <= (uint64_t)INT64_MAX) {
      return (int64_t)magnitude;
    }
  } while (true);
}
S
Shengliang Guan 已提交
612 613 614

// Fill the monitoring structures with a snapshot of the cluster state held in
// the sdb: one entry per dnode, mnode and vgroup, plus grant information.
//
// Returns 0 on success. Returns -1 if the rpc reference cannot be acquired or if
// any of the three result arrays cannot be allocated; arrays that were already
// allocated stay in the output structures in that case.
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
                          SMonGrantInfo *pGrantInfo) {
  if (mndAcquireRpcRef(pMnode) != 0) return -1;

  SSdb   *pSdb = pMnode->pSdb;
  int64_t ms = taosGetTimestampMs();

  pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
  pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
  pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
  if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL) {
    mndReleaseRpcRef(pMnode);
    return -1;
  }

  // cluster info
  tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
  pClusterInfo->monitor_interval = tsMonitorInterval;
  pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);

  // dnodes: id, end point and online status
  void *pIter = NULL;
  for (;;) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;

    SMonDnodeDesc dnodeDesc = {0};
    dnodeDesc.dnode_id = pDnode->id;
    tstrncpy(dnodeDesc.dnode_ep, pDnode->ep, sizeof(dnodeDesc.dnode_ep));
    if (mndIsDnodeOnline(pDnode, ms)) {
      tstrncpy(dnodeDesc.status, "ready", sizeof(dnodeDesc.status));
    } else {
      tstrncpy(dnodeDesc.status, "offline", sizeof(dnodeDesc.status));
    }
    taosArrayPush(pClusterInfo->dnodes, &dnodeDesc);
    sdbRelease(pSdb, pDnode);
  }

  // mnodes: id, end point and role; the local mnode is reported as leader and
  // also provides first_ep and master_uptime (in days)
  pIter = NULL;
  for (;;) {
    SMnodeObj *pMnodeObj = NULL;
    pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMnodeObj);
    if (pIter == NULL) break;

    SMonMnodeDesc mnodeDesc = {0};
    mnodeDesc.mnode_id = pMnodeObj->id;
    tstrncpy(mnodeDesc.mnode_ep, pMnodeObj->pDnode->ep, sizeof(mnodeDesc.mnode_ep));

    if (pMnodeObj->id == pMnode->selfDnodeId) {
      pClusterInfo->first_ep_dnode_id = pMnodeObj->id;
      tstrncpy(pClusterInfo->first_ep, pMnodeObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
      pClusterInfo->master_uptime = (ms - pMnodeObj->stateStartTime) / (86400000.0f);
      tstrncpy(mnodeDesc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(mnodeDesc.role));
    } else {
      tstrncpy(mnodeDesc.role, syncStr(pMnodeObj->state), sizeof(mnodeDesc.role));
    }
    taosArrayPush(pClusterInfo->mnodes, &mnodeDesc);
    sdbRelease(pSdb, pMnodeObj);
  }

  // vgroup info
  pIter = NULL;
  for (;;) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    pClusterInfo->vgroups_total++;

    SMonVgroupDesc vgDesc = {0};
    vgDesc.vgroup_id = pVgroup->vgId;

    SName dbName = {0};
    tNameFromString(&dbName, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    tNameGetDbName(&dbName, vgDesc.database_name);

    vgDesc.tables_num = pVgroup->numOfTables;
    pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;

    // A vgroup is "unsynced" until one of its replicas is found to be leader.
    tstrncpy(vgDesc.status, "unsynced", sizeof(vgDesc.status));
    for (int32_t r = 0; r < pVgroup->replica; ++r) {
      SVnodeGid     *pVgid = &pVgroup->vnodeGid[r];
      SMonVnodeDesc *pVnDesc = &vgDesc.vnodes[r];
      pVnDesc->dnode_id = pVgid->dnodeId;
      tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->role), sizeof(pVnDesc->vnode_role));
      if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
        tstrncpy(vgDesc.status, "ready", sizeof(vgDesc.status));
        pClusterInfo->vgroups_alive++;
      }
      if (pVgid->role != TAOS_SYNC_STATE_ERROR) {
        pClusterInfo->vnodes_alive++;
      }
      pClusterInfo->vnodes_total++;
    }

    taosArrayPush(pVgroupInfo->vgroups, &vgDesc);
    sdbRelease(pSdb, pVgroup);
  }

  // grant info: remaining time in days; an expire time of 0 is reported as INT32_MAX
  pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
  pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
  if (pMnode->grant.expireTimeMS == 0) {
    pGrantInfo->expire_time = INT32_MAX;
    pGrantInfo->timeseries_total = INT32_MAX;
  }

  mndReleaseRpcRef(pMnode);
  return 0;
}
S
Shengliang Guan 已提交
722 723

// Report the current sync role of this mnode in pLoad->syncState. Always returns 0.
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  pLoad->syncState = syncGetMyRole(pSync->sync);
  mTrace("mnode current syncstate is %s", syncStr(pLoad->syncState));

  return 0;
}
S
Shengliang Guan 已提交
728 729 730 731 732 733 734 735 736 737 738

// Try to take one rpc reference on the mnode, under the read lock.
// Returns 0 and increments rpcRef when the mnode is not stopped and is master.
// Returns -1 otherwise; terrno is set to TSDB_CODE_APP_NOT_READY here only for
// the stopped case.
int32_t mndAcquireRpcRef(SMnode *pMnode) {
  int32_t code = -1;

  taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    terrno = TSDB_CODE_APP_NOT_READY;
  } else if (mndIsMaster(pMnode)) {
    int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1);
    // mTrace("mnode rpc is acquired, ref:%d", ref);
    code = 0;
  }
  taosThreadRwlockUnlock(&pMnode->lock);

  return code;
}

// Drop one rpc reference on the mnode; the decrement happens under the read lock.
void mndReleaseRpcRef(SMnode *pMnode) {
  taosThreadRwlockRdlock(&pMnode->lock);

  int32_t ref = atomic_sub_fetch_32(&pMnode->rpcRef, 1);
  // mTrace("mnode rpc is released, ref:%d", ref);

  taosThreadRwlockUnlock(&pMnode->lock);
}

// Set the restored flag under the write lock.
// When the flag is being cleared, this call additionally blocks, polling every
// 3 ms, until rpcRef is no longer positive.
void mndSetRestore(SMnode *pMnode, bool restored) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->restored = restored;
  taosThreadRwlockUnlock(&pMnode->lock);
  mTrace("mnode set restored:%d", restored);

  if (restored) {
    return;
  }

  while (pMnode->rpcRef > 0) {
    taosMsleep(3);
  }
}

// Return the restored flag. The read is done without taking pMnode->lock.
bool mndGetRestored(SMnode *pMnode) {
  const bool isRestored = pMnode->restored;
  return isRestored;
}

// Mark the mnode as stopped; the flag is written under the write lock.
void mndSetStop(SMnode *pMnode) {
  taosThreadRwlockWrlock(&pMnode->lock);
  {
    pMnode->stopped = true;
  }
  taosThreadRwlockUnlock(&pMnode->lock);

  mTrace("mnode set stopped");
}

// Return the stopped flag. The read is done without taking pMnode->lock.
bool mndGetStop(SMnode *pMnode) {
  const bool isStopped = pMnode->stopped;
  return isStopped;
}