mmHandle.c 11.0 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "mmInt.h"
S
shm  
Shengliang Guan 已提交
18

19 20 21 22 23
void mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonMmInfo *mmInfo) {
  SMnodeMgmt *pMgmt = pWrapper->pMgmt;
  mndGetMonitorInfo(pMgmt->pMnode, &mmInfo->cluster, &mmInfo->vgroup, &mmInfo->grant);
}

24 25 26
int32_t mmProcessGetMonMmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
  SMonMmInfo mmInfo = {0};
  mmGetMonitorInfo(pWrapper, &mmInfo);
S
Shengliang Guan 已提交
27
  dmGetMonitorSysInfo(&mmInfo.sys);
28
  monGetLogs(&mmInfo.log);
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48

  int32_t rspLen = tSerializeSMonMmInfo(NULL, 0, &mmInfo);
  if (rspLen < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  tSerializeSMonMmInfo(pRsp, rspLen, &mmInfo);
  pReq->pRsp = pRsp;
  pReq->rspLen = rspLen;
  tFreeSMonMmInfo(&mmInfo);
  return 0;
}

S
Shengliang Guan 已提交
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
void mmGetMnodeLoads(SMgmtWrapper *pWrapper, SMonMloadInfo *pInfo) {
  SMnodeMgmt *pMgmt = pWrapper->pMgmt;
  pInfo->isMnode = 1;
  mndGetLoad(pMgmt->pMnode, &pInfo->load);
}

int32_t mmProcessGetMnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
  SMonMloadInfo mloads = {0};
  mmGetMnodeLoads(pWrapper, &mloads);

  int32_t rspLen = tSerializeSMonMloadInfo(NULL, 0, &mloads);
  if (rspLen < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  tSerializeSMonMloadInfo(pRsp, rspLen, &mloads);
  pReq->pRsp = pRsp;
  pReq->rspLen = rspLen;
  return 0;
}

S
shm  
Shengliang Guan 已提交
77 78
int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  SDnode  *pDnode = pWrapper->pDnode;
S
shm  
Shengliang Guan 已提交
79
  SRpcMsg *pReq = &pMsg->rpcMsg;
S
shm  
Shengliang Guan 已提交
80

S
shm  
Shengliang Guan 已提交
81 82 83 84 85 86
  SDCreateMnodeReq createReq = {0};
  if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

S
Shengliang Guan 已提交
87
  if (createReq.replica <= 1 || createReq.dnodeId != pDnode->data.dnodeId) {
88
    terrno = TSDB_CODE_INVALID_OPTION;
S
shm  
Shengliang Guan 已提交
89 90 91
    dError("failed to create mnode since %s", terrstr());
    return -1;
  }
S
Shengliang Guan 已提交
92 93 94 95 96 97 98 99

  bool deployed = true;
  if (mmWriteFile(pWrapper, &createReq, deployed) != 0) {
    dError("failed to write mnode file since %s", terrstr());
    return -1;
  }

  return 0;
S
shm  
Shengliang Guan 已提交
100
}
S
shm  
Shengliang Guan 已提交
101

S
shm  
Shengliang Guan 已提交
102 103 104 105 106
int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  SDnode  *pDnode = pWrapper->pDnode;
  SRpcMsg *pReq = &pMsg->rpcMsg;

  SDDropMnodeReq dropReq = {0};
107
  if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
S
shm  
Shengliang Guan 已提交
108
    terrno = TSDB_CODE_INVALID_MSG;
S
shm  
Shengliang Guan 已提交
109 110 111
    return -1;
  }

S
Shengliang Guan 已提交
112
  if (dropReq.dnodeId != pDnode->data.dnodeId) {
113
    terrno = TSDB_CODE_INVALID_OPTION;
S
shm  
Shengliang Guan 已提交
114
    dError("failed to drop mnode since %s", terrstr());
S
shm  
Shengliang Guan 已提交
115 116
    return -1;
  }
S
Shengliang Guan 已提交
117 118 119 120 121 122 123 124

  bool deployed = false;
  if (mmWriteFile(pWrapper, NULL, deployed) != 0) {
    dError("failed to write mnode file since %s", terrstr());
    return -1;
  }

  return 0;
S
shm  
Shengliang Guan 已提交
125 126
}

S
shm  
Shengliang Guan 已提交
127
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
S
shm  
Shengliang Guan 已提交
128
  SDnode  *pDnode = pMgmt->pDnode;
S
shm  
Shengliang Guan 已提交
129
  SRpcMsg *pReq = &pMsg->rpcMsg;
S
shm  
Shengliang Guan 已提交
130

S
shm  
Shengliang Guan 已提交
131 132 133 134 135 136
  SDAlterMnodeReq alterReq = {0};
  if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

S
Shengliang Guan 已提交
137
  if (alterReq.dnodeId != pDnode->data.dnodeId) {
138
    terrno = TSDB_CODE_INVALID_OPTION;
S
Shengliang Guan 已提交
139
    dError("failed to alter mnode since %s, dnodeId:%d input:%d", terrstr(), pDnode->data.dnodeId, alterReq.dnodeId);
S
shm  
Shengliang Guan 已提交
140
    return -1;
S
shm  
Shengliang Guan 已提交
141 142
  } else {
    return mmAlter(pMgmt, &alterReq);
S
shm  
Shengliang Guan 已提交
143 144 145
  }
}

S
Shengliang Guan 已提交
146
void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
S
Shengliang Guan 已提交
147
  dmSetMsgHandle(pWrapper, TDMT_MON_MM_INFO, mmProcessMonitorMsg, DEFAULT_HANDLE);
S
Shengliang Guan 已提交
148
  dmSetMsgHandle(pWrapper, TDMT_MON_MM_LOAD, mmProcessMonitorMsg, DEFAULT_HANDLE);
149

S
shm  
Shengliang Guan 已提交
150
  // Requests handled by DNODE
S
Shengliang Guan 已提交
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
  dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
S
shm  
Shengliang Guan 已提交
166 167

  // Requests handled by MNODE
S
Shengliang Guan 已提交
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
  dmSetMsgHandle(pWrapper, TDMT_MND_CONNECT, mmProcessReadMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, mmProcessReadMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_USE_DB, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_SMA, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_DROP_SMA, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, mmProcessReadMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, mmProcessReadMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, mmProcessReadMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_AUTH, mmProcessReadMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_GET_DB_CFG, mmProcessReadMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_MND_GET_INDEX, mmProcessReadMsg, DEFAULT_HANDLE);
S
shm  
Shengliang Guan 已提交
221 222

  // Requests handled by VNODE
S
Shengliang Guan 已提交
223 224 225 226 227 228 229 230 231 232 233 234 235 236
  dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CANCEL_CONN_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);

  dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, mmProcessQueryMsg, MNODE_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, mmProcessQueryMsg, MNODE_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, mmProcessQueryMsg, MNODE_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, mmProcessQueryMsg, MNODE_HANDLE);
  dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, mmProcessQueryMsg, MNODE_HANDLE);
S
shm  
Shengliang Guan 已提交
237
}