mnode.c 11.5 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
#include "tglobal.h"
#include "tqueue.h"
#include "mnodeAcct.h"
#include "mnodeAuth.h"
#include "mnodeBalance.h"
#include "mnodeCluster.h"
#include "mnodeDb.h"
#include "mnodeDnode.h"
#include "mnodeFunc.h"
#include "mnodeMnode.h"
#include "mnodeOper.h"
#include "mnodeProfile.h"
#include "mnodeShow.h"
#include "mnodeStable.h"
#include "mnodeSync.h"
#include "mnodeTelem.h"
#include "mnodeUser.h"
#include "mnodeVgroup.h"
S
Shengliang Guan 已提交
36
#include "mnodeTrans.h"
S
Shengliang Guan 已提交
37

S
Shengliang Guan 已提交
38
/*
 * Module-global mnode state, zero-initialized. In this file it holds the
 * shared timer (timer), the init/start step lists (pInitSteps, pStartSteps),
 * the per-message-type handler table (msgFp, filled by mnodeSetMsgFp) and the
 * ids returned by mnodeGetDnodeId()/mnodeGetClusterId() (para.dnodeId,
 * para.clusterId).
 */
SMnodeBak tsMint = {0};
S
Shengliang Guan 已提交
39 40 41 42 43

/*
 * Dnode id currently recorded in tsMint.para.
 *
 * NOTE(review): mnodeSetOptions() copies the configured ids into the SMnode
 * handle (pMnode->dnodeId / pMnode->clusterId), not into tsMint.para; confirm
 * where tsMint.para is populated.
 */
int32_t mnodeGetDnodeId() {
  const int32_t dnodeId = tsMint.para.dnodeId;
  return dnodeId;
}

/* Cluster id currently recorded in tsMint.para (see the note above mnodeGetDnodeId). */
int64_t mnodeGetClusterId() {
  const int64_t clusterId = tsMint.para.clusterId;
  return clusterId;
}

S
Shengliang Guan 已提交
44 45 46 47
/*
 * Forward rpcMsg to the dnode(s) described by epSet.
 *
 * Delegates to the sendMsgToDnodeFp callback copied from SMnodeOptions,
 * passing the owning dnode handle stored in pMnode->pServer.
 * pMnode must not be NULL (asserted).
 */
void mnodeSendMsgToDnode(SMnode *pMnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg) {
  assert(pMnode != NULL);
  pMnode->sendMsgToDnodeFp(pMnode->pServer,
                           epSet,
                           rpcMsg);
}
S
Shengliang Guan 已提交
48

S
Shengliang Guan 已提交
49 50 51 52
/*
 * Forward rpcMsg to an mnode peer.
 *
 * Delegates to the sendMsgToMnodeFp callback copied from SMnodeOptions,
 * passing the owning dnode handle stored in pMnode->pServer.
 * pMnode must not be NULL (asserted).
 */
void mnodeSendMsgToMnode(SMnode *pMnode, struct SRpcMsg *rpcMsg) {
  assert(pMnode != NULL);
  pMnode->sendMsgToMnodeFp(pMnode->pServer,
                           rpcMsg);
}
S
Shengliang Guan 已提交
53

S
Shengliang Guan 已提交
54 55 56 57
/*
 * Ask the transport layer to answer rpcMsg with a redirect.
 *
 * Delegates to the sendRedirectMsgFp callback copied from SMnodeOptions;
 * forShell is passed through unchanged. pMnode must not be NULL (asserted).
 */
void mnodeSendRedirectMsg(SMnode *pMnode, struct SRpcMsg *rpcMsg, bool forShell) {
  assert(pMnode != NULL);
  pMnode->sendRedirectMsgFp(pMnode->pServer,
                            rpcMsg,
                            forShell);
}
S
Shengliang Guan 已提交
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79

/*
 * Create the module-wide timer on first use.
 *
 * Returns 0 when tsMint.timer is available (already created or created now),
 * -1 when taosTmrInit() returns NULL.
 */
static int32_t mnodeInitTimer() {
  if (tsMint.timer != NULL) {
    return 0;
  }

  tsMint.timer = taosTmrInit(tsMaxShellConns, 200, 3600000, "MND");
  return (tsMint.timer == NULL) ? -1 : 0;
}

/*
 * Release the module-wide timer, if one exists, and clear tsMint.timer so a
 * repeated call is a no-op.
 */
static void mnodeCleanupTimer() {
  if (tsMint.timer == NULL) {
    return;
  }

  taosTmrCleanUp(tsMint.timer);
  tsMint.timer = NULL;
}

/* Module-wide timer handle; NULL until mnodeInitTimer() has succeeded. */
tmr_h mnodeGetTimer() {
  tmr_h timer = tsMint.timer;
  return timer;
}

S
Shengliang Guan 已提交
80 81 82 83 84 85
/*
 * Copy caller-supplied options into pMnode and validate them.
 *
 * Copies the ids, the replica layout (all TSDB_MAX_REPLICA entries), the owning
 * dnode handle (pOptions->pDnode is stored as pMnode->pServer) and the four
 * callbacks.
 *
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_MND_APP_ERROR when
 * pOptions is NULL, any callback is NULL, or dnodeId/clusterId is negative.
 *
 * NOTE(review): replica and selfIndex are copied without range checks; confirm
 * whether they should be validated against TSDB_MAX_REPLICA here.
 */
static int32_t mnodeSetOptions(SMnode *pMnode, const SMnodeOptions *pOptions) {
  if (pOptions == NULL) {
    terrno = TSDB_CODE_MND_APP_ERROR;
    return -1;
  }

  pMnode->dnodeId = pOptions->dnodeId;
  pMnode->clusterId = pOptions->clusterId;
  pMnode->replica = pOptions->replica;
  pMnode->selfIndex = pOptions->selfIndex;
  memcpy(&pMnode->replicas, pOptions->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);

  pMnode->pServer = pOptions->pDnode;
  pMnode->putMsgToApplyMsgFp = pOptions->putMsgToApplyMsgFp;
  pMnode->sendMsgToDnodeFp = pOptions->sendMsgToDnodeFp;
  pMnode->sendMsgToMnodeFp = pOptions->sendMsgToMnodeFp;
  pMnode->sendRedirectMsgFp = pOptions->sendRedirectMsgFp;

  if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL ||
      pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) {
    terrno = TSDB_CODE_MND_APP_ERROR;
    return -1;
  }

  return 0;
}

static int32_t mnodeAllocInitSteps() {
  struct SSteps *steps = taosStepInit(16, NULL);
  if (steps == NULL) return -1;

S
Shengliang Guan 已提交
105
  if (taosStepAdd(steps, "mnode-trans", mnodeInitTrans, mnodeCleanupTrans) != 0) return -1;
S
Shengliang Guan 已提交
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
  if (taosStepAdd(steps, "mnode-cluster", mnodeInitCluster, mnodeCleanupCluster) != 0) return -1;
  if (taosStepAdd(steps, "mnode-dnode", mnodeInitDnode, mnodeCleanupDnode) != 0) return -1;
  if (taosStepAdd(steps, "mnode-mnode", mnodeInitMnode, mnodeCleanupMnode) != 0) return -1;
  if (taosStepAdd(steps, "mnode-acct", mnodeInitAcct, mnodeCleanupAcct) != 0) return -1;
  if (taosStepAdd(steps, "mnode-auth", mnodeInitAuth, mnodeCleanupAuth) != 0) return -1;
  if (taosStepAdd(steps, "mnode-user", mnodeInitUser, mnodeCleanupUser) != 0) return -1;
  if (taosStepAdd(steps, "mnode-db", mnodeInitDb, mnodeCleanupDb) != 0) return -1;
  if (taosStepAdd(steps, "mnode-vgroup", mnodeInitVgroup, mnodeCleanupVgroup) != 0) return -1;
  if (taosStepAdd(steps, "mnode-stable", mnodeInitStable, mnodeCleanupStable) != 0) return -1;
  if (taosStepAdd(steps, "mnode-func", mnodeInitFunc, mnodeCleanupFunc) != 0) return -1;
  if (taosStepAdd(steps, "mnode-sdb", sdbInit, sdbCleanup) != 0) return -1;

  tsMint.pInitSteps = steps;
  return 0;
}

static int32_t mnodeAllocStartSteps() {
S
Shengliang Guan 已提交
123
  struct SSteps *steps = taosStepInit(8, NULL);
S
Shengliang Guan 已提交
124 125 126
  if (steps == NULL) return -1;

  taosStepAdd(steps, "mnode-timer", mnodeInitTimer, NULL);
127
  taosStepAdd(steps, "mnode-sdb-file", sdbOpen, sdbClose);
S
Shengliang Guan 已提交
128 129 130 131 132 133 134 135 136 137 138
  taosStepAdd(steps, "mnode-balance", mnodeInitBalance, mnodeCleanupBalance);
  taosStepAdd(steps, "mnode-profile", mnodeInitProfile, mnodeCleanupProfile);
  taosStepAdd(steps, "mnode-show", mnodeInitShow, mnodeCleanUpShow);
  taosStepAdd(steps, "mnode-sync", mnodeInitSync, mnodeCleanUpSync);
  taosStepAdd(steps, "mnode-telem", mnodeInitTelem, mnodeCleanupTelem);
  taosStepAdd(steps, "mnode-timer", NULL, mnodeCleanupTimer);

  tsMint.pStartSteps = steps;
  return 0;
}

S
Shengliang Guan 已提交
139
SMnode *mnodeOpen(const char *path, const SMnodeOptions *pOptions) {
S
Shengliang Guan 已提交
140 141
  SMnode *pMnode = calloc(1, sizeof(SMnode));

S
Shengliang Guan 已提交
142
  if (mnodeSetOptions(pMnode, pOptions) != 0) {
S
Shengliang Guan 已提交
143
    free(pMnode);
S
Shengliang Guan 已提交
144
    mError("failed to init mnode options since %s", terrstr());
S
Shengliang Guan 已提交
145
    return NULL;
S
Shengliang Guan 已提交
146 147 148 149
  }

  if (mnodeAllocInitSteps() != 0) {
    mError("failed to alloc init steps since %s", terrstr());
S
Shengliang Guan 已提交
150
    return NULL;
S
Shengliang Guan 已提交
151 152 153 154
  }

  if (mnodeAllocStartSteps() != 0) {
    mError("failed to alloc start steps since %s", terrstr());
S
Shengliang Guan 已提交
155
    return NULL;
S
Shengliang Guan 已提交
156 157
  }

S
Shengliang Guan 已提交
158
  taosStepExec(tsMint.pInitSteps);
S
Shengliang Guan 已提交
159 160 161 162

  if (tsMint.para.dnodeId <= 0 && tsMint.para.clusterId <= 0) {
    if (sdbDeploy() != 0) {
      mError("failed to deploy sdb since %s", terrstr());
S
Shengliang Guan 已提交
163 164 165
      return NULL;
    } else {
      mInfo("mnode is deployed");
S
Shengliang Guan 已提交
166 167 168
    }
  }

S
Shengliang Guan 已提交
169
  taosStepExec(tsMint.pStartSteps);
S
Shengliang Guan 已提交
170

S
Shengliang Guan 已提交
171 172
  return pMnode;
}
S
Shengliang Guan 已提交
173

S
Shengliang Guan 已提交
174
/*
 * Release an mnode handle obtained from mnodeOpen().
 *
 * NOTE(review): only the handle itself is freed; the init/start step lists
 * kept in tsMint are not cleaned up here.
 */
void mnodeClose(SMnode *pMnode) {
  free(pMnode);
}
S
Shengliang Guan 已提交
175

S
Shengliang Guan 已提交
176
/*
 * Apply new options to a running mnode.
 * Not implemented yet: both arguments are ignored and 0 (success) is returned.
 */
int32_t mnodeAlter(SMnode *pMnode, const SMnodeOptions *pOptions) {
  (void)pMnode;
  (void)pOptions;
  return 0;
}
S
Shengliang Guan 已提交
177

S
Shengliang Guan 已提交
178
/*
 * Remove the mnode's persisted data by calling sdbUnDeploy().
 * `path` is currently unused.
 */
void mnodeDestroy(const char *path) {
  (void)path;
  sdbUnDeploy();
}
S
Shengliang Guan 已提交
179

S
Shengliang Guan 已提交
180
/*
 * Report mnode load statistics.
 * Not implemented yet: pLoad is left untouched and 0 (success) is returned.
 */
int32_t mnodeGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  (void)pMnode;
  (void)pLoad;
  return 0;
}
S
Shengliang Guan 已提交
181

S
Shengliang Guan 已提交
182
/*
 * Wrap an incoming RPC message in a queue item (SMnodeMsg).
 *
 * Allocates the item with taosAllocateQitem() and resolves the connection
 * info for pRpcMsg->handle into pMsg->conn. It then copies *pRpcMsg into the
 * item and stamps createdTime (taosGetTimestampSec()).
 *
 * Returns the new item, or NULL with terrno set:
 *   TSDB_CODE_OUT_OF_MEMORY         - the queue item could not be allocated;
 *   TSDB_CODE_MND_NO_USER_FROM_CONN - rpcGetConnInfo() failed (item released).
 */
SMnodeMsg *mnodeInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
  SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg));
  if (pMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  if (rpcGetConnInfo(pRpcMsg->handle, &pMsg->conn) != 0) {
    // Log from the caller's message: pMsg->rpcMsg has not been filled in yet,
    // and pMsg must not be touched after mnodeCleanupMsg() releases it.
    mError("can not get user from conn:%p", pRpcMsg->handle);
    mnodeCleanupMsg(pMsg);
    terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
    return NULL;
  }

  pMsg->rpcMsg = *pRpcMsg;
  pMsg->createdTime = taosGetTimestampSec();

  return pMsg;
}

S
Shengliang Guan 已提交
202
/*
 * Release a queue item created by mnodeInitMsg().
 *
 * Drops the sdb reference held in pMsg->pUser (if any) via sdbRelease(), then
 * frees the item with taosFreeQitem(). A NULL pMsg is accepted and ignored,
 * mirroring free(NULL).
 */
void mnodeCleanupMsg(SMnodeMsg *pMsg) {
  if (pMsg == NULL) {
    return;
  }

  if (pMsg->pUser != NULL) {
    sdbRelease(pMsg->pUser);
  }

  taosFreeQitem(pMsg);
}

/*
 * Common dispatch path for read, write and sync messages.
 *
 * If this node is not the master, the request is redirected with
 * mnodeSendRedirectMsg() and the item is released. pMnode is passed through
 * because that function asserts on a NULL handle and needs pMnode's redirect
 * callback; the previous code passed NULL.
 *
 * Otherwise the handler registered for pMsg->rpcMsg.msgType via
 * mnodeSetMsgFp() is invoked. An out-of-range type or an unregistered handler
 * is logged, sets terrno to TSDB_CODE_MND_MSG_NOT_PROCESSED and releases the
 * item. Previously the type was used unchecked as an index and a NULL handler
 * was called.
 *
 * NOTE(review): no error response is sent to the client on the
 * "not processed" path; confirm whether one is required.
 */
static void mnodeProcessRpcMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
  if (!mnodeIsMaster()) {
    mnodeSendRedirectMsg(pMnode, &pMsg->rpcMsg, true);
    mnodeCleanupMsg(pMsg);
    return;
  }

  int32_t msgType = pMsg->rpcMsg.msgType;
  void   *ahandle = pMsg->rpcMsg.ahandle;

  // Same valid range as mnodeSetMsgFp(); never index msgFp out of bounds.
  MnodeRpcFp fp = NULL;
  if (msgType > 0 && msgType < TSDB_MSG_TYPE_MAX) {
    fp = tsMint.msgFp[msgType];
  }

  if (fp == NULL) {
    mError("msg:%p, app:%p type:%d not processed since no handler registered", pMsg, ahandle, msgType);
    mnodeCleanupMsg(pMsg);
    terrno = TSDB_CODE_MND_MSG_NOT_PROCESSED;
    return;
  }

  // NOTE(review): who owns pMsg after the handler returns is not visible here,
  // so pMsg is not dereferenced past this call.
  int32_t code = (*fp)(pMsg);
  if (code != 0) {
    mError("app:%p type:%d failed to process, code:%d", ahandle, msgType, code);
  }
}

/*
 * Register fp as the handler for msgType.
 * Types outside (0, TSDB_MSG_TYPE_MAX) are rejected and logged. The old check
 * used `||`, which is always true and allowed out-of-bounds writes.
 */
void mnodeSetMsgFp(int32_t msgType, MnodeRpcFp fp) {
  if (msgType <= 0 || msgType >= TSDB_MSG_TYPE_MAX) {
    mError("failed to set msg fp since invalid msg type:%d", msgType);
    return;
  }

  tsMint.msgFp[msgType] = fp;
}

/* Read-queue entry point; dispatched through mnodeProcessRpcMsg(). */
void mnodeProcessReadMsg(SMnode *pMnode, SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMnode, pMsg); }

/* Write-queue entry point; dispatched through mnodeProcessRpcMsg(). */
void mnodeProcessWriteMsg(SMnode *pMnode, SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMnode, pMsg); }

/* Sync-queue entry point; dispatched through mnodeProcessRpcMsg(). */
void mnodeProcessSyncMsg(SMnode *pMnode, SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMnode, pMsg); }

/*
 * Apply-queue entry point. Not implemented yet: pMsg is neither processed nor
 * released here (NOTE(review): confirm ownership once implemented).
 */
void mnodeProcessApplyMsg(SMnode *pMnode, SMnodeMsg *pMsg) {}
S
Shengliang Guan 已提交
242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377

/*
 * Legacy per-queue request handlers from an earlier mnode worker design,
 * compiled out with `#if 0`. They reference tsMworker, SMnRsp, mnodeSendRsp()
 * and mnodeCleanupMsg2(), none of which are defined in this file; the live
 * dispatch path is mnodeProcessRpcMsg(). Kept for reference only.
 */
#if 0

/*
 * Write-queue handler.
 *
 * Rejects an empty payload and answers with a redirect epset when this node
 * is not the master. Otherwise it calls tsMworker.writeMsgFp[msgType].
 * Every path ends in mnodeSendRsp(pMsg, code).
 *
 * NOTE(review): msgType is used as an index without a bounds check.
 */
static void mnodeProcessWriteReq(SMnodeMsg *pMsg, void *unused) {
  int32_t msgType = pMsg->rpcMsg.msgType;
  void   *ahandle = pMsg->rpcMsg.ahandle;
  int32_t code = 0;

  if (pMsg->rpcMsg.pCont == NULL) {
    mError("msg:%p, app:%p type:%s content is null", pMsg, ahandle, taosMsg[msgType]);
    code = TSDB_CODE_MND_INVALID_MSG_LEN;
    goto PROCESS_WRITE_REQ_END;
  }

  if (!mnodeIsMaster()) {
    SMnRsp *rpcRsp = &pMsg->rpcRsp;
    // NOTE(review): unlike mnodeProcessReadReq, the rpcMallocCont() result is
    // not NULL-checked before being written through.
    SEpSet *epSet = rpcMallocCont(sizeof(SEpSet));
    mnodeGetMnodeEpSetForShell(epSet, true);
    rpcRsp->rsp = epSet;
    rpcRsp->len = sizeof(SEpSet);

    mDebug("msg:%p, app:%p type:%s in write queue, is redirected, numOfEps:%d inUse:%d", pMsg, ahandle,
           taosMsg[msgType], epSet->numOfEps, epSet->inUse);

    code = TSDB_CODE_RPC_REDIRECT;
    goto PROCESS_WRITE_REQ_END;
  }

  if (tsMworker.writeMsgFp[msgType] == NULL) {
    mError("msg:%p, app:%p type:%s not processed", pMsg, ahandle, taosMsg[msgType]);
    code = TSDB_CODE_MND_MSG_NOT_PROCESSED;
    goto PROCESS_WRITE_REQ_END;
  }

  code = (*tsMworker.writeMsgFp[msgType])(pMsg);

PROCESS_WRITE_REQ_END:
  mnodeSendRsp(pMsg, code);
}

/*
 * Read-queue handler. Same shape as mnodeProcessWriteReq, but it dispatches
 * to tsMworker.readMsgFp[msgType] and NULL-checks the redirect epset
 * allocation.
 */
static void mnodeProcessReadReq(SMnodeMsg *pMsg, void *unused) {
  int32_t msgType = pMsg->rpcMsg.msgType;
  void   *ahandle = pMsg->rpcMsg.ahandle;
  int32_t code = 0;

  if (pMsg->rpcMsg.pCont == NULL) {
    mError("msg:%p, app:%p type:%s in mread queue, content is null", pMsg, ahandle, taosMsg[msgType]);
    code = TSDB_CODE_MND_INVALID_MSG_LEN;
    goto PROCESS_READ_REQ_END;
  }

  if (!mnodeIsMaster()) {
    SMnRsp *rpcRsp = &pMsg->rpcRsp;
    SEpSet *epSet = rpcMallocCont(sizeof(SEpSet));
    if (!epSet) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto PROCESS_READ_REQ_END;
    }
    mnodeGetMnodeEpSetForShell(epSet, true);
    rpcRsp->rsp = epSet;
    rpcRsp->len = sizeof(SEpSet);

    mDebug("msg:%p, app:%p type:%s in mread queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, taosMsg[msgType],
           epSet->numOfEps, epSet->inUse);
    code = TSDB_CODE_RPC_REDIRECT;
    goto PROCESS_READ_REQ_END;
  }

  if (tsMworker.readMsgFp[msgType] == NULL) {
    mError("msg:%p, app:%p type:%s in mread queue, not processed", pMsg, ahandle, taosMsg[msgType]);
    code = TSDB_CODE_MND_MSG_NOT_PROCESSED;
    goto PROCESS_READ_REQ_END;
  }

  mTrace("msg:%p, app:%p type:%s will be processed in mread queue", pMsg, ahandle, taosMsg[msgType]);
  code = (*tsMworker.readMsgFp[msgType])(pMsg);

PROCESS_READ_REQ_END:
  mnodeSendRsp(pMsg, code);
}

/*
 * Peer-request handler.
 *
 * Same shape as mnodeProcessWriteReq, but the redirect uses the peer epset
 * (mnodeGetMnodeEpSetForPeer) and dispatch goes to
 * tsMworker.peerReqFp[msgType].
 *
 * NOTE(review): the rpcMallocCont() result is not NULL-checked.
 */
static void mnodeProcessPeerReq(SMnodeMsg *pMsg, void *unused) {
  int32_t msgType = pMsg->rpcMsg.msgType;
  void   *ahandle = pMsg->rpcMsg.ahandle;
  int32_t code = 0;

  if (pMsg->rpcMsg.pCont == NULL) {
    mError("msg:%p, ahandle:%p type:%s in mpeer queue, content is null", pMsg, ahandle, taosMsg[msgType]);
    code = TSDB_CODE_MND_INVALID_MSG_LEN;
    goto PROCESS_PEER_REQ_END;
  }

  if (!mnodeIsMaster()) {
    SMnRsp *rpcRsp = &pMsg->rpcRsp;
    SEpSet *epSet = rpcMallocCont(sizeof(SEpSet));
    mnodeGetMnodeEpSetForPeer(epSet, true);
    rpcRsp->rsp = epSet;
    rpcRsp->len = sizeof(SEpSet);

    mDebug("msg:%p, ahandle:%p type:%s in mpeer queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle,
           taosMsg[msgType], epSet->numOfEps, epSet->inUse);

    code = TSDB_CODE_RPC_REDIRECT;
    goto PROCESS_PEER_REQ_END;
  }

  if (tsMworker.peerReqFp[msgType] == NULL) {
    mError("msg:%p, ahandle:%p type:%s in mpeer queue, not processed", pMsg, ahandle, taosMsg[msgType]);
    code = TSDB_CODE_MND_MSG_NOT_PROCESSED;
    goto PROCESS_PEER_REQ_END;
  }

  code = (*tsMworker.peerReqFp[msgType])(pMsg);

PROCESS_PEER_REQ_END:
  mnodeSendRsp(pMsg, code);
}

/*
 * Peer-response handler: calls tsMworker.peerRspFp[msgType] if registered,
 * then releases the item with mnodeCleanupMsg2().
 */
static void mnodeProcessPeerRsp(SMnodeMsg *pMsg, void *unused) {
  int32_t  msgType = pMsg->rpcMsg.msgType;
  SRpcMsg *pRpcMsg = &pMsg->rpcMsg;

  if (!mnodeIsMaster()) {
    mError("msg:%p, ahandle:%p type:%s not processed for not master", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]);
    // NOTE(review): there is no `return` after this cleanup, so execution
    // falls through: pMsg is used after release and mnodeCleanupMsg2() runs
    // a second time at the end of the function.
    mnodeCleanupMsg2(pMsg);
  }

  if (tsMworker.peerRspFp[msgType]) {
    (*tsMworker.peerRspFp[msgType])(pRpcMsg);
  } else {
    mError("msg:%p, ahandle:%p type:%s is not processed", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]);
  }

  mnodeCleanupMsg2(pMsg);
}
#endif