dmHandle.c 7.1 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "dmInt.h"

static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
S
Shengliang Guan 已提交
20
  if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) {
S
Shengliang Guan 已提交
21
    dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
S
Shengliang Guan 已提交
22 23 24 25 26
    taosWLockLatch(&pMgmt->pData->latch);
    pMgmt->pData->dnodeId = pCfg->dnodeId;
    pMgmt->pData->clusterId = pCfg->clusterId;
    dmWriteEps(pMgmt->pData);
    taosWUnLockLatch(&pMgmt->pData->latch);
S
Shengliang Guan 已提交
27 28 29
  }
}

S
Shengliang Guan 已提交
30 31
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
  if (pRsp->code != 0) {
S
Shengliang Guan 已提交
32 33 34 35
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
      dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId);
      pMgmt->pData->dropped = 1;
      dmWriteEps(pMgmt->pData);
S
Shengliang Guan 已提交
36 37 38 39 40
    }
  } else {
    SStatusRsp statusRsp = {0};
    if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
        tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
S
Shengliang Guan 已提交
41
      pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
S
Shengliang Guan 已提交
42
      dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
S
Shengliang Guan 已提交
43
      dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
S
Shengliang Guan 已提交
44 45 46 47 48 49 50 51 52
    }
    rpcFreeCont(pRsp->pCont);
    tFreeSStatusRsp(&statusRsp);
  }
}

void dmSendStatusReq(SDnodeMgmt *pMgmt) {
  SStatusReq req = {0};

S
Shengliang Guan 已提交
53
  taosRLockLatch(&pMgmt->pData->latch);
S
Shengliang Guan 已提交
54
  req.sver = tsVersion;
S
Shengliang Guan 已提交
55 56 57
  req.dnodeVer = pMgmt->pData->dnodeVer;
  req.dnodeId = pMgmt->pData->dnodeId;
  req.clusterId = pMgmt->pData->clusterId;
S
Shengliang Guan 已提交
58
  if (req.clusterId == 0) req.dnodeId = 0;
S
Shengliang Guan 已提交
59 60
  req.rebootTime = pMgmt->pData->rebootTime;
  req.updateTime = pMgmt->pData->updateTime;
S
Shengliang Guan 已提交
61
  req.numOfCores = tsNumOfCores;
62 63
  req.numOfSupportVnodes = tsNumOfSupportVnodes;
  tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
S
Shengliang Guan 已提交
64 65 66 67 68 69 70 71

  req.clusterCfg.statusInterval = tsStatusInterval;
  req.clusterCfg.checkTime = 0;
  char timestr[32] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
  memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
  memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
  memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
S
Shengliang Guan 已提交
72
  taosRUnLockLatch(&pMgmt->pData->latch);
S
Shengliang Guan 已提交
73 74 75 76 77 78 79 80 81 82 83 84 85

  SMonVloadInfo vinfo = {0};
  dmGetVnodeLoads(pMgmt, &vinfo);
  req.pVloads = vinfo.pVloads;

  SMonMloadInfo minfo = {0};
  dmGetMnodeLoads(pMgmt, &minfo);

  int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
  void   *pHead = rpcMallocCont(contLen);
  tSerializeSStatusReq(pHead, contLen, &req);
  tFreeSStatusReq(&req);

S
Shengliang Guan 已提交
86
  SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .info.ahandle = (void *)0x9527};
S
Shengliang Guan 已提交
87 88
  SRpcMsg rpcRsp = {0};

S
Shengliang Guan 已提交
89
  dTrace("send status msg to mnode, app:%p", rpcMsg.info.ahandle);
S
Shengliang Guan 已提交
90 91 92 93

  SEpSet epSet = {0};
  dmGetMnodeEpSet(pMgmt->pData, &epSet);
  rpcSendRecv(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp);
S
Shengliang Guan 已提交
94 95 96
  dmProcessStatusRsp(pMgmt, &rpcRsp);
}

S
Shengliang Guan 已提交
97
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
98 99 100 101
  dError("auth rsp is received, but not supported yet");
  return 0;
}

S
Shengliang Guan 已提交
102
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
103 104 105 106
  dError("grant rsp is received, but not supported yet");
  return 0;
}

S
Shengliang Guan 已提交
107
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
108 109 110 111
  dError("config req is received, but not supported yet");
  return TSDB_CODE_OPS_NOT_SUPPORT;
}

112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
  pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
  pStatus->details[0] = 0;

  SServerStatusRsp statusRsp = {0};
  SMonMloadInfo    minfo = {0};
  dmGetMnodeLoads(pMgmt, &minfo);
  if (minfo.isMnode && minfo.load.syncState != TAOS_SYNC_STATE_LEADER &&
      minfo.load.syncState != TAOS_SYNC_STATE_CANDIDATE) {
    pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
    snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState));
    return;
  }

  SMonVloadInfo vinfo = {0};
  dmGetVnodeLoads(pMgmt, &vinfo);
  for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) {
    SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i);
    if (pLoad->syncState != TAOS_SYNC_STATE_LEADER && pLoad->syncState != TAOS_SYNC_STATE_FOLLOWER) {
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
      snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pLoad->vgId,
               syncStr(pLoad->syncState));
      break;
    }
  }

  taosArrayDestroy(vinfo.pVloads);
}

S
Shengliang Guan 已提交
141
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
142 143 144 145
  dDebug("server run status req is received");
  SServerStatusRsp statusRsp = {0};
  dmGetServerRunStatus(pMgmt, &statusRsp);

146
  SRpcMsg rspMsg = {.info = pMsg->info};
147 148 149 150 151 152 153 154 155 156 157 158 159
  int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
  if (rspLen < 0) {
    rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
S
Shengliang Guan 已提交
160 161
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;
162 163 164
  return 0;
}

S
Shengliang Guan 已提交
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
SArray *dmGetMsgHandles() {
  int32_t code = -1;
  SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
  if (pArray == NULL) goto _OVER;

  // Requests handled by DNODE
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
180
  if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
S
Shengliang Guan 已提交
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195

  // Requests handled by MNODE
  if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;

  code = 0;

_OVER:
  if (code != 0) {
    taosArrayDestroy(pArray);
    return NULL;
  } else {
    return pArray;
  }
}