mnode.c 12.2 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
#include "mndAcct.h"
#include "mndAuth.h"
#include "mndBalance.h"
#include "mndCluster.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
#include "mndMnode.h"
#include "mndOper.h"
#include "mndProfile.h"
#include "mndShow.h"
#include "mndStable.h"
#include "mndSync.h"
#include "mndTelem.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
S
Shengliang Guan 已提交
34

35 36 37 38
/*
 * Return the dnode id stored in the mnode.
 *
 * Returns -1 when pMnode is NULL.
 */
int32_t mndGetDnodeId(SMnode *pMnode) {
  if (pMnode == NULL) {
    return -1;
  }

  return pMnode->dnodeId;
}
S
Shengliang Guan 已提交
42

43 44 45 46
/*
 * Return the cluster id stored in the mnode.
 *
 * Returns -1 when pMnode is NULL.
 */
int64_t mndGetClusterId(SMnode *pMnode) {
  if (pMnode == NULL) {
    return -1;
  }

  return pMnode->clusterId;
}
S
Shengliang Guan 已提交
50

51 52 53 54
/*
 * Return the timer handle stored in the mnode.
 *
 * Returns NULL when pMnode is NULL.
 */
tmr_h mndGetTimer(SMnode *pMnode) {
  if (pMnode == NULL) {
    return NULL;
  }

  return pMnode->timer;
}

59 60 61 62
/*
 * Forward pMsg to the endpoint set pEpSet through the sendMsgToDnodeFp
 * callback stored in the mnode.
 *
 * Does nothing when pMnode is NULL or the callback is not set.
 */
void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
  if (pMnode == NULL || pMnode->sendMsgToDnodeFp == NULL) {
    return;
  }

  (*pMnode->sendMsgToDnodeFp)(pMnode->pDnode, pEpSet, pMsg);
}
S
Shengliang Guan 已提交
64

65 66 67 68
/*
 * Forward pMsg through the sendMsgToMnodeFp callback stored in the mnode.
 *
 * Does nothing when pMnode is NULL or the callback is not set.
 */
void mndSendMsgToMnode(SMnode *pMnode, SRpcMsg *pMsg) {
  if (pMnode == NULL || pMnode->sendMsgToMnodeFp == NULL) {
    return;
  }

  (*pMnode->sendMsgToMnodeFp)(pMnode->pDnode, pMsg);
}
S
Shengliang Guan 已提交
70

71 72 73 74
/*
 * Hand pMsg to the sendRedirectMsgFp callback stored in the mnode.
 *
 * Does nothing when pMnode is NULL or the callback is not set.
 */
void mndSendRedirectMsg(SMnode *pMnode, SRpcMsg *pMsg) {
  if (pMnode == NULL || pMnode->sendRedirectMsgFp == NULL) {
    return;
  }

  (*pMnode->sendRedirectMsgFp)(pMnode->pDnode, pMsg);
}
S
Shengliang Guan 已提交
76

77 78 79
/*
 * Init step: create the mnode timer if it does not exist yet.
 *
 * Returns 0 when a timer is available afterwards; otherwise sets terrno to
 * TSDB_CODE_OUT_OF_MEMORY and returns -1.
 */
static int32_t mndInitTimer(SMnode *pMnode) {
  // An existing timer is kept as-is.
  if (pMnode->timer != NULL) {
    return 0;
  }

  pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND");
  if (pMnode->timer != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

90 91 92 93
/*
 * Cleanup step: release the mnode timer, if any, and reset the handle to NULL.
 */
static void mndCleanupTimer(SMnode *pMnode) {
  if (pMnode->timer == NULL) {
    return;
  }

  taosTmrCleanUp(pMnode->timer);
  pMnode->timer = NULL;
}

97 98 99 100
/*
 * Store a private copy of `path` in pMnode->path and create that directory.
 *
 * Returns 0 on success. On failure returns -1 with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY (copy failed) or to the system error derived from
 * errno (directory creation failed). The copied path is left in
 * pMnode->path in the latter case.
 */
static int32_t mnodeCreateDir(SMnode *pMnode, const char *path) {
  char *pathCopy = strdup(path);
  pMnode->path = pathCopy;

  if (pathCopy == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosMkDir(pMnode->path) == 0) {
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(errno);
  return -1;
}
S
Shengliang Guan 已提交
111

112 113 114
/*
 * Init step: open the sdb using the mnode's path and keep the handle in
 * pMnode->pSdb.
 *
 * Returns 0 on success, -1 when sdbInit() returns NULL.
 */
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt sdbOpt = {0};
  sdbOpt.pMnode = pMnode;
  sdbOpt.path = pMnode->path;

  pMnode->pSdb = sdbInit(&sdbOpt);
  return (pMnode->pSdb == NULL) ? -1 : 0;
}

125
/* Init step: run sdbDeploy() on the mnode's sdb and pass its result through. */
static int32_t mndDeploySdb(SMnode *pMnode) {
  return sdbDeploy(pMnode->pSdb);
}
S
Shengliang Guan 已提交
126
/* Init step: run sdbReadFile() on the mnode's sdb and pass its result through. */
static int32_t mndReadSdb(SMnode *pMnode) {
  return sdbReadFile(pMnode->pSdb);
}
127 128 129

/*
 * Cleanup step: close the sdb, if one is open, and reset pMnode->pSdb to NULL.
 */
static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb == NULL) {
    return;
  }

  sdbCleanup(pMnode->pSdb);
  pMnode->pSdb = NULL;
}

135 136 137 138 139
/*
 * Append one startup step to pMnode->pSteps.
 *
 * name      - label used in log messages for this step
 * initFp    - callback run by the init pass (may be NULL)
 * cleanupFp - callback run by the cleanup pass (may be NULL)
 *
 * Returns 0 on success; sets terrno to TSDB_CODE_OUT_OF_MEMORY and returns -1
 * when the push fails.
 */
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep newStep = {.name = name, .initFp = initFp, .cleanupFp = cleanupFp};

  if (taosArrayPush(pMnode->pSteps, &newStep) != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

148
/*
 * Register every startup step, in execution order, into pMnode->pSteps.
 *
 * Registration stops at the first step that cannot be added and -1 is
 * returned; 0 means all steps were registered.
 *
 * "mnode-timer" is registered twice on purpose: once with only an init
 * callback and, as the very last entry, once with only a cleanup callback.
 */
static int32_t mndInitSteps(SMnode *pMnode) {
  // Steps registered before the sdb is deployed or read.
  if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0 ||
      mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0 ||
      mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0 ||
      mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0 ||
      mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0 ||
      mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0 ||
      mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0 ||
      mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0 ||
      mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0 ||
      mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0 ||
      mndAllocStep(pMnode, "mnode-stable", mndInitStable, mndCleanupStable) != 0 ||
      mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) {
    return -1;
  }

  // A positive cluster id selects the read step; otherwise the sdb is deployed.
  if (pMnode->clusterId > 0) {
    if (mndAllocStep(pMnode, "mnode-sdb-read", mndReadSdb, NULL) != 0) {
      return -1;
    }
  } else {
    if (mndAllocStep(pMnode, "mnode-sdb-deploy", mndDeploySdb, NULL) != 0) {
      return -1;
    }
  }

  // Steps registered after the sdb content is available.
  if (mndAllocStep(pMnode, "mnode-timer", mndInitTimer, NULL) != 0 ||
      mndAllocStep(pMnode, "mnode-balance", mndInitBalance, mndCleanupBalance) != 0 ||
      mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0 ||
      mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0 ||
      mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0 ||
      mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0 ||
      mndAllocStep(pMnode, "mnode-timer", NULL, mndCleanupTimer) != 0) {
    return -1;
  }

  return 0;
}

177
/*
 * Run the cleanup callbacks of the registered steps in reverse order, from
 * index `pos` down to index 0, then empty the step list and drop the mnode's
 * reference to it.
 *
 * pos - index of the last step to clean up; -1 (or any index past the end)
 *       means "start from the last registered step".
 *
 * Does nothing when pMnode->pSteps is NULL, so calling it a second time is
 * harmless.
 */
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) return;

  // Clamp to the last valid index: `size` itself is one past the end.
  int32_t size = (int32_t)taosArrayGetSize(pMnode->pSteps);
  if (pos == -1 || pos >= size) {
    pos = size - 1;
  }

  for (int32_t s = pos; s >= 0; s--) {
    // Index with the loop variable so each step is visited exactly once.
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, s);
    mDebug("step:%s will cleanup", pStep->name);
    if (pStep->cleanupFp != NULL) {
      (*pStep->cleanupFp)(pMnode);
    }
  }

  // NOTE(review): taosArrayClear presumably only empties the array; if so the
  // array object itself is never released here — confirm whether a destroy
  // call is needed before the pointer is dropped.
  taosArrayClear(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
S
Shengliang Guan 已提交
195

196
/*
 * Run the init callback of every registered step in registration order.
 *
 * Steps without an init callback are skipped. When a step fails, the steps
 * from that index downwards are cleaned up via mndCleanupSteps() and -1 is
 * returned. Returns 0 when every init callback succeeded.
 */
static int32_t mndExecSteps(SMnode *pMnode) {
  int32_t numOfSteps = taosArrayGetSize(pMnode->pSteps);

  for (int32_t idx = 0; idx < numOfSteps; ++idx) {
    SMnodeStep *pCur = taosArrayGet(pMnode->pSteps, idx);
    if (pCur->initFp != NULL) {
      if ((*pCur->initFp)(pMnode) == 0) {
        mDebug("step:%s is initialized", pCur->name);
      } else {
        mError("step:%s exec failed since %s, start to cleanup", pCur->name, terrstr());
        mndCleanupSteps(pMnode, idx);
        return -1;
      }
    }
  }

  return 0;
}
S
Shengliang Guan 已提交
217

218 219 220 221 222 223 224 225 226 227 228
/*
 * Copy the caller-supplied options into the mnode and validate them.
 *
 * The timezone, locale and charset strings are duplicated; the copies are
 * stored in the mnode even when validation fails afterwards.
 *
 * Returns 0 on success. Returns -1 with terrno set to
 *   TSDB_CODE_MND_APP_ERROR  - a required callback is NULL or a numeric
 *                              option is out of range
 *   TSDB_CODE_OUT_OF_MEMORY  - one of the string copies could not be made
 */
static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  // Identity and replica layout.
  pMnode->dnodeId = pOption->dnodeId;
  pMnode->clusterId = pOption->clusterId;
  pMnode->replica = pOption->replica;
  pMnode->selfIndex = pOption->selfIndex;
  memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);

  // Owner handle and messaging callbacks.
  pMnode->pDnode = pOption->pDnode;
  pMnode->putMsgToApplyMsgFp = pOption->putMsgToApplyMsgFp;
  pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp;
  pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp;
  pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp;

  // Scalar settings and owned string copies.
  pMnode->sver = pOption->sver;
  pMnode->statusInterval = pOption->statusInterval;
  pMnode->mnodeEqualVnodeNum = pOption->mnodeEqualVnodeNum;
  pMnode->timezone = strdup(pOption->timezone);
  pMnode->locale = strdup(pOption->locale);
  pMnode->charset = strdup(pOption->charset);

  const int callbackMissing = pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL ||
                              pMnode->sendRedirectMsgFp == NULL || pMnode->putMsgToApplyMsgFp == NULL;
  const int valueOutOfRange = pMnode->dnodeId < 0 || pMnode->clusterId < 0 || pMnode->statusInterval < 1 ||
                              pOption->mnodeEqualVnodeNum < 0;
  if (callbackMissing || valueOutOfRange) {
    terrno = TSDB_CODE_MND_APP_ERROR;
    return -1;
  }

  const int copyFailed = pMnode->timezone == NULL || pMnode->locale == NULL || pMnode->charset == NULL;
  if (copyFailed) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

251
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
S
Shengliang Guan 已提交
252 253
  mDebug("start to open mnode in %s", path);

254
  SMnode *pMnode = calloc(1, sizeof(SMnode));
S
Shengliang Guan 已提交
255 256 257 258 259 260 261 262 263 264 265 266 267
  if (pMnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }

  pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
  if (pMnode->pSteps == NULL) {
    free(pMnode);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to open mnode since %s", terrstr());
    return NULL;
  }
268

269
  int32_t code = mnodeCreateDir(pMnode, path);
S
Shengliang Guan 已提交
270 271
  if (mnodeCreateDir(pMnode, path) != 0) {
    mError("failed to open mnode since %s", tstrerror(code));
272 273 274 275 276 277
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndSetOptions(pMnode, pOption);
278
  if (code != 0) {
S
Shengliang Guan 已提交
279
    mError("failed to open mnode since %s", tstrerror(code));
280 281 282 283 284 285 286
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndInitSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
287
    mError("failed to open mnode since %s", tstrerror(code));
288 289 290 291 292 293 294
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }

  code = mndExecSteps(pMnode);
  if (code != 0) {
S
Shengliang Guan 已提交
295
    mError("failed to open mnode since %s", tstrerror(code));
296 297 298 299
    mndClose(pMnode);
    terrno = code;
    return NULL;
  }
S
Shengliang Guan 已提交
300

S
Shengliang Guan 已提交
301
  mDebug("mnode open successfully ");
S
Shengliang Guan 已提交
302 303
  return pMnode;
}
S
Shengliang Guan 已提交
304

305
/*
 * Shut the mnode down: run the cleanup of every registered step, free the
 * strings the mnode owns (path, charset, locale, timezone) and finally the
 * mnode itself.
 *
 * A NULL pMnode is ignored.
 */
void mndClose(SMnode *pMnode) {
  if (pMnode == NULL) {
    return;
  }

  mDebug("start to close mnode");
  mndCleanupSteps(pMnode, -1);

  tfree(pMnode->path);
  tfree(pMnode->charset);
  tfree(pMnode->locale);
  tfree(pMnode->timezone);
  tfree(pMnode);

  mDebug("mnode is closed");
}
S
Shengliang Guan 已提交
317

318
/*
 * Alter an open mnode.
 *
 * Currently only logs the request; neither argument is read. Always
 * returns 0.
 */
int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
  (void)pMnode;
  (void)pOption;

  mDebug("start to alter mnode");
  mDebug("mnode is altered");

  return 0;
}
S
Shengliang Guan 已提交
323

324
/*
 * Remove the mnode directory at `path` via taosRemoveDir(), logging before
 * and after.
 */
void mndDestroy(const char *path) {
  mDebug("start to destroy mnode at %s", path);

  taosRemoveDir(path);

  mDebug("mnode is destroyed");
}
S
Shengliang Guan 已提交
329

330
/*
 * Fill pLoad with the mnode's load figures.
 *
 * Currently every figure is reported as 0 and pMnode is not read. Always
 * returns 0.
 */
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  (void)pMnode;

  // Node and vgroup counts.
  pLoad->numOfDnode = pLoad->numOfMnode = pLoad->numOfVgroup = 0;

  // Schema object counts.
  pLoad->numOfDatabase = pLoad->numOfSuperTable = pLoad->numOfChildTable = pLoad->numOfColumn = 0;

  // Data volume.
  pLoad->totalPoints = pLoad->totalStorage = pLoad->compStorage = 0;

  return 0;
}
S
Shengliang Guan 已提交
344

345
/*
 * Wrap an incoming RPC message into a queue item owned by the mnode.
 *
 * The new message records its owning mnode, the connection info obtained
 * from the RPC handle, a copy of *pRpcMsg and its creation time in seconds.
 *
 * Returns the new message, or NULL with terrno set to
 *   TSDB_CODE_OUT_OF_MEMORY          - the queue item could not be allocated
 *   TSDB_CODE_MND_NO_USER_FROM_CONN  - connection info could not be fetched
 */
SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
  SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg));
  if (pMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // Record the owner first: the message processing and cleanup paths
  // dereference pMsg->pMnode.
  pMsg->pMnode = pMnode;

  if (rpcGetConnInfo(pRpcMsg->handle, &pMsg->conn) != 0) {
    // Log from the caller's message and before the item is released:
    // pMsg->rpcMsg has not been filled in yet and pMsg is gone after cleanup.
    mError("can not get user from conn:%p", pRpcMsg->handle);
    // NOTE(review): mndCleanupMsg reads pMsg->pUser; this assumes
    // taosAllocateQitem returns zeroed memory — confirm.
    mndCleanupMsg(pMsg);
    terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
    return NULL;
  }

  pMsg->rpcMsg = *pRpcMsg;
  pMsg->createdTime = taosGetTimestampSec();

  return pMsg;
}

365
/*
 * Release a message created by mndInitMsg(): give back the user object it
 * holds, if any, to the sdb, then free the queue item itself.
 */
void mndCleanupMsg(SMnodeMsg *pMsg) {
  if (pMsg->pUser) {
    sdbRelease(pMsg->pMnode->pSdb, pMsg->pUser);
  }

  taosFreeQitem(pMsg);
}

373
/* Send a response for pMsg with the given code. Not implemented: does nothing. */
void mndSendRsp(SMnodeMsg *pMsg, int32_t code) {
  (void)pMsg;
  (void)code;
}
S
Shengliang Guan 已提交
374

375 376
/*
 * Dispatch one message to the handler registered for its type, answer the
 * sender on failure, and release the message.
 *
 * Message types with an odd number are treated as requests. For requests:
 *   - if this mnode is not the master, a redirect is sent instead of
 *     running the handler;
 *   - a NULL content pointer is rejected with TSDB_CODE_MND_INVALID_MSG_LEN.
 * When the handler returns non-zero, the error code is taken from terrno.
 * Only requests get an answer from here, and only on failure: a redirect
 * for TSDB_CODE_APP_NOT_READY, an RPC response carrying the code otherwise.
 *
 * NOTE(review): msgType indexes pMnode->msgFp and taosMsg without a range
 * check here — confirm the value is validated before it reaches this point.
 */
static void mndProcessRpcMsg(SMnodeMsg *pMsg) {
  SMnode *pMnode = pMsg->pMnode;
  int32_t msgType = pMsg->rpcMsg.msgType;
  void   *ahandle = pMsg->rpcMsg.ahandle;
  bool    isReq = (msgType % 2 == 1);
  int32_t code = 0;

  if (isReq && !mndIsMaster(pMnode)) {
    code = TSDB_CODE_APP_NOT_READY;
  } else if (isReq && pMsg->rpcMsg.pCont == NULL) {
    mError("msg:%p, app:%p type:%s content is null", pMsg, ahandle, taosMsg[msgType]);
    code = TSDB_CODE_MND_INVALID_MSG_LEN;
  } else {
    MndMsgFp handler = pMnode->msgFp[msgType];
    if (handler == NULL) {
      mError("msg:%p, app:%p type:%s not processed", pMsg, ahandle, taosMsg[msgType]);
      code = TSDB_CODE_MSG_NOT_PROCESSED;
    } else if ((*handler)(pMnode, pMsg) != 0) {
      // The handler's return value only signals failure; the reason is in terrno.
      code = terrno;
      mError("msg:%p, app:%p type:%s failed to process since %s", pMsg, ahandle, taosMsg[msgType], terrstr());
    }
  }

  if (isReq && code != 0) {
    if (code == TSDB_CODE_APP_NOT_READY) {
      mndSendRedirectMsg(pMnode, &pMsg->rpcMsg);
    } else {
      SRpcMsg rspMsg = {.handle = pMsg->rpcMsg.handle, .code = code};
      rpcSendResponse(&rspMsg);
    }
  }

  mndCleanupMsg(pMsg);
}

421
/*
 * Register `fp` as the handler for messages of type `msgType`.
 *
 * Types outside [0, TSDB_MSG_TYPE_MAX) are silently ignored.
 */
void mndSetMsgHandle(SMnode *pMnode, int32_t msgType, MndMsgFp fp) {
  if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
    return;
  }

  pMnode->msgFp[msgType] = fp;
}

427
/* Entry point for read messages; delegates to mndProcessRpcMsg(). */
void mndProcessReadMsg(SMnodeMsg *pMsg) {
  mndProcessRpcMsg(pMsg);
}
S
Shengliang Guan 已提交
428

429
/* Entry point for write messages; delegates to mndProcessRpcMsg(). */
void mndProcessWriteMsg(SMnodeMsg *pMsg) {
  mndProcessRpcMsg(pMsg);
}
S
Shengliang Guan 已提交
430

431
/* Entry point for sync messages; delegates to mndProcessRpcMsg(). */
void mndProcessSyncMsg(SMnodeMsg *pMsg) {
  mndProcessRpcMsg(pMsg);
}
S
Shengliang Guan 已提交
432

433
/* Entry point for apply messages. Not implemented: the message is ignored. */
void mndProcessApplyMsg(SMnodeMsg *pMsg) {
  (void)pMsg;
}