dmMsg.c 5.1 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "dmInt.h"
S
shm  
Shengliang Guan 已提交
18
#include "vm.h"
S
shm  
Shengliang Guan 已提交
19 20 21 22 23 24 25 26

void dmSendStatusReq(SDnodeMgmt *pMgmt) {
  SDnode    *pDnode = pMgmt->pDnode;
  SStatusReq req = {0};

  taosRLockLatch(&pMgmt->latch);
  req.sver = tsVersion;
  req.dver = pMgmt->dver;
S
shm  
Shengliang Guan 已提交
27 28
  req.dnodeId = pDnode->dnodeId;
  req.clusterId = pDnode->clusterId;
S
shm  
Shengliang Guan 已提交
29 30 31
  req.rebootTime = pDnode->rebootTime;
  req.updateTime = pMgmt->updateTime;
  req.numOfCores = tsNumOfCores;
S
shm  
Shengliang Guan 已提交
32
  req.numOfSupportVnodes = pDnode->numOfSupportVnodes;
S
shm  
Shengliang Guan 已提交
33
  tstrncpy(req.dnodeEp, pDnode->localEp, TSDB_EP_LEN);
S
shm  
Shengliang Guan 已提交
34 35 36 37 38

  req.clusterCfg.statusInterval = tsStatusInterval;
  req.clusterCfg.checkTime = 0;
  char timestr[32] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
wafwerar's avatar
wafwerar 已提交
39
  memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
S
shm  
Shengliang Guan 已提交
40 41 42 43
  memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
  memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
  taosRUnLockLatch(&pMgmt->latch);

S
Shengliang Guan 已提交
44 45 46 47 48 49
  SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, VNODES);
  if (pWrapper != NULL) {
    req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad));
    vmMonitorVnodeLoads(pWrapper, req.pVloads);
    dndReleaseWrapper(pWrapper);
  }
S
shm  
Shengliang Guan 已提交
50 51 52 53 54 55 56 57 58

  int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
  void   *pHead = rpcMallocCont(contLen);
  tSerializeSStatusReq(pHead, contLen, &req);
  taosArrayDestroy(req.pVloads);

  SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527};
  pMgmt->statusSent = 1;

S
shm  
Shengliang Guan 已提交
59
  dTrace("send req:%s to mnode, app:%p", TMSG_INFO(rpcMsg.msgType), rpcMsg.ahandle);
S
shm  
Shengliang Guan 已提交
60
  dndSendReqToMnode(pMgmt->pWrapper, &rpcMsg);
S
shm  
Shengliang Guan 已提交
61 62
}

S
shm  
Shengliang Guan 已提交
63
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
S
shm  
Shengliang Guan 已提交
64 65 66
  SDnode *pDnode = pMgmt->pDnode;

  if (pDnode->dnodeId == 0) {
S
shm  
Shengliang Guan 已提交
67 68
    dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
    taosWLockLatch(&pMgmt->latch);
S
shm  
Shengliang Guan 已提交
69 70
    pDnode->dnodeId = pCfg->dnodeId;
    pDnode->clusterId = pCfg->clusterId;
S
shm  
Shengliang Guan 已提交
71 72 73 74 75
    dmWriteFile(pMgmt);
    taosWUnLockLatch(&pMgmt->latch);
  }
}

S
shm  
Shengliang Guan 已提交
76 77 78
int32_t dmProcessStatusRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
  SDnode  *pDnode = pMgmt->pDnode;
  SRpcMsg *pRsp = &pMsg->rpcMsg;
S
shm  
Shengliang Guan 已提交
79 80

  if (pRsp->code != TSDB_CODE_SUCCESS) {
S
shm  
Shengliang Guan 已提交
81 82 83
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pDnode->dropped && pDnode->dnodeId > 0) {
      dInfo("dnode:%d, set to dropped since not exist in mnode", pDnode->dnodeId);
      pDnode->dropped = 1;
S
shm  
Shengliang Guan 已提交
84 85 86 87 88 89 90
      dmWriteFile(pMgmt);
    }
  } else {
    SStatusRsp statusRsp = {0};
    if (pRsp->pCont != NULL && pRsp->contLen != 0 &&
        tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
      pMgmt->dver = statusRsp.dver;
S
shm  
Shengliang Guan 已提交
91
      dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
S
shm  
Shengliang Guan 已提交
92 93
      dmUpdateDnodeEps(pMgmt, statusRsp.pDnodeEps);
    }
S
shm  
Shengliang Guan 已提交
94
    tFreeSStatusRsp(&statusRsp);
S
shm  
Shengliang Guan 已提交
95 96 97 98 99
  }

  pMgmt->statusSent = 0;
}

S
shm  
Shengliang Guan 已提交
100 101 102 103 104
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
  SRpcMsg *pRsp = &pMsg->rpcMsg;
  dError("auth rsp is received, but not supported yet");
  return 0;
}
S
shm  
Shengliang Guan 已提交
105

S
shm  
Shengliang Guan 已提交
106 107 108 109 110
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
  SRpcMsg *pRsp = &pMsg->rpcMsg;
  dError("grant rsp is received, but not supported yet");
  return 0;
}
S
shm  
Shengliang Guan 已提交
111

S
shm  
Shengliang Guan 已提交
112 113
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
  SRpcMsg       *pReq = &pMsg->rpcMsg;
S
shm  
Shengliang Guan 已提交
114
  SDCfgDnodeReq *pCfg = pReq->pCont;
S
shm  
Shengliang Guan 已提交
115
  dError("config req is received, but not supported yet");
S
shm  
Shengliang Guan 已提交
116 117 118
  return TSDB_CODE_OPS_NOT_SUPPORT;
}

S
shm  
Shengliang Guan 已提交
119
void dmInitMsgHandles(SMgmtWrapper *pWrapper) {
S
shm  
Shengliang Guan 已提交
120
  // Requests handled by DNODE
S
Shengliang Guan 已提交
121 122 123 124 125 126 127 128 129 130
  dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dmProcessMgmtMsg, VND_VGID);
  dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, dmProcessMgmtMsg, VND_VGID);
  dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, dmProcessMgmtMsg, VND_VGID);
  dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, dmProcessMgmtMsg, VND_VGID);
  dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, dmProcessMgmtMsg, VND_VGID);
  dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, dmProcessMgmtMsg, VND_VGID);
  dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, dmProcessMgmtMsg, VND_VGID);
  dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, dmProcessMgmtMsg, VND_VGID);
  dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, dmProcessMgmtMsg, VND_VGID);
  dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, dmProcessMgmtMsg, VND_VGID);
S
shm  
Shengliang Guan 已提交
131 132

  // Requests handled by MNODE
S
Shengliang Guan 已提交
133 134 135
  dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, dmProcessMgmtMsg, VND_VGID);
  dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmProcessMgmtMsg, VND_VGID);
  dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmProcessMgmtMsg, VND_VGID);
S
shm  
Shengliang Guan 已提交
136
}