dnodeMgmt.c 9.7 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#define _DEFAULT_SOURCE
17
#include "os.h"
H
hzcheng 已提交
18
#include "taosmsg.h"
S
slguan 已提交
19
#include "tlog.h"
H
hzcheng 已提交
20 21 22
#include "trpc.h"
#include "tsched.h"
#include "tsystem.h"
S
slguan 已提交
23 24 25 26 27 28
#include "mnode.h"
#include "dnode.h"
#include "dnodeSystem.h"
#include "dnodeMgmt.h"
#include "dnodeWrite.h"
#include "dnodeVnodeMgmt.h"
S
slguan 已提交
29

S
slguan 已提交
30 31 32 33
/*
 * Optional hooks. Each one is NULL by default, and every call site in this
 * file tests for NULL before invoking it.
 */
// Invoked by dnodeInitMgmtIp() when set.
void    (*dnodeInitMgmtIpFp)() = NULL;
// Invoked by dnodeInitMgmt() when set; the value it returns is not examined there.
int32_t (*dnodeInitMgmtFp)() = NULL;
// Invoked by dnodeProcessMsgFromMgmt() for TSDB_MSG_TYPE_STATUS_RSP messages when set.
void    (*dnodeProcessStatusRspFp)(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) = NULL;
// When set, dnodeSendMsgToMnode() forwards the message to this hook instead of
// scheduling it on tsDnodeMgmtQhandle.
void    (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL;
S
slguan 已提交
34
// Optional hook, NULL by default. When set, dnodeSendRspToMnode() forwards the
// response to it instead of scheduling it on tsDnodeMgmtQhandle.
void    (*dnodeSendRspToMnodeFp)(void *handle, int32_t code, void *pCont, int contLen) = NULL;
S
slguan 已提交
35

S
slguan 已提交
36
// Dispatch table indexed by message type. Entries are filled in by
// dnodeInitProcessShellMsg() and looked up by dnodeProcessMsgFromMgmt();
// a NULL entry means no handler is registered for that type.
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, int8_t msgType, void *pConn);
S
slguan 已提交
37 38
// Forward declaration; the definition later in this file registers the
// handlers in dnodeProcessMgmtMsgFp.
static void dnodeInitProcessShellMsg();

S
slguan 已提交
39
/*
 * Scheduler callback used by dnodeSendMsgToMnode() and dnodeSendRspToMnode().
 *
 * Those functions store three header fields in the bytes immediately before
 * the message content:
 *   msg - 9 : int8_t  message type
 *   msg - 8 : int32_t result code
 *   msg - 4 : int32_t content length
 * This callback reads them back and passes the message, together with the
 * connection handle carried in sched->ahandle, to mgmtProcessMsgFromDnode().
 */
static void dnodeSendMsgToMnodeQueueFp(SSchedMsg *sched) {
  int8_t *content    = sched->msg;
  void   *connHandle = sched->ahandle;

  // Unpack the header that sits in front of the content.
  int8_t  type   = *(int8_t  *) (sched->msg - 9);
  int32_t result = *(int32_t *) (sched->msg - 8);
  int32_t length = *(int32_t *) (sched->msg - 4);

  mgmtProcessMsgFromDnode(type, content, length, connHandle, result);
}

S
slguan 已提交
49
void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen) {
S
slguan 已提交
50
  dTrace("msg:%d:%s is sent to mnode", msgType, taosMsg[msgType]);
S
slguan 已提交
51 52 53
  if (dnodeSendMsgToMnodeFp) {
    dnodeSendMsgToMnodeFp(msgType, pCont, contLen);
  } else {
S
slguan 已提交
54 55 56 57
    if (pCont == NULL) {
      pCont = rpcMallocCont(1);
      contLen = 0;
    }
S
slguan 已提交
58
    SSchedMsg schedMsg = {0};
S
slguan 已提交
59 60 61 62 63
    schedMsg.fp      = dnodeSendMsgToMnodeQueueFp;
    schedMsg.msg     = pCont;
    *(int32_t *) (pCont - 4) = contLen;
    *(int32_t *) (pCont - 8) = TSDB_CODE_SUCCESS;
    *(int8_t *)  (pCont - 9) = msgType;
S
slguan 已提交
64 65
    taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg);
  }
S
#1177  
slguan 已提交
66 67
}

S
slguan 已提交
68
void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen) {
S
slguan 已提交
69
  dTrace("rsp:%d:%s is sent to mnode, pConn:%p", msgType, taosMsg[msgType], pConn);
S
slguan 已提交
70 71
  if (dnodeSendRspToMnodeFp) {
    dnodeSendRspToMnodeFp(pConn, code, pCont, contLen);
S
slguan 已提交
72
  } else {
S
slguan 已提交
73 74 75 76 77
    //hack way
    if (pCont == NULL) {
      pCont = rpcMallocCont(1);
      contLen = 0;
    }
S
slguan 已提交
78
    SSchedMsg schedMsg = {0};
S
slguan 已提交
79 80 81
    schedMsg.fp      = dnodeSendMsgToMnodeQueueFp;
    schedMsg.msg     = pCont;
    schedMsg.ahandle = pConn;
S
slguan 已提交
82 83 84
    *(int32_t *) (pCont - 4) = contLen;
    *(int32_t *) (pCont - 8) = code;
    *(int8_t *)  (pCont - 9) = msgType;
S
slguan 已提交
85 86
    taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg);
  }
S
#1177  
slguan 已提交
87
}
S
slguan 已提交
88

S
slguan 已提交
89 90 91 92
/*
 * Initializes the management side of the dnode: runs the optional
 * dnodeInitMgmtFp hook, then registers the message handlers.
 *
 * Always returns 0.
 * NOTE(review): the hook returns an int32_t that is discarded here; confirm
 * whether a non-zero value is meant to signal an initialization failure.
 */
int32_t dnodeInitMgmt() {
  if (dnodeInitMgmtFp != NULL) {
    (*dnodeInitMgmtFp)();
  }

  dnodeInitProcessShellMsg();

  return 0;
}

S
slguan 已提交
98 99 100 101 102
/*
 * Runs the optional dnodeInitMgmtIpFp hook; does nothing when no hook is
 * installed.
 */
void dnodeInitMgmtIp() {
  if (dnodeInitMgmtIpFp == NULL) {
    return;
  }

  (*dnodeInitMgmtIpFp)();
}
S
#1177  
slguan 已提交
103

S
slguan 已提交
104
void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) {
S
slguan 已提交
105 106
  if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
    dError("invalid msg type:%d", msgType);
S
slguan 已提交
107 108 109 110 111 112 113 114 115 116
    return;
  }

  dTrace("msg:%d:%s is received from mgmt, pConn:%p", msgType, taosMsg[msgType], pConn);

  if (msgType == TSDB_MSG_TYPE_STATUS_RSP && dnodeProcessStatusRspFp != NULL) {
    dnodeProcessStatusRspFp(pCont, contLen, msgType, pConn);
  }
  if (dnodeProcessMgmtMsgFp[msgType]) {
    (*dnodeProcessMgmtMsgFp[msgType])(pCont, contLen, msgType, pConn);
H
hzcheng 已提交
117
  } else {
S
slguan 已提交
118
    dError("%s is not processed", taosMsg[msgType]);
H
hzcheng 已提交
119
  }
S
slguan 已提交
120 121

  //rpcFreeCont(pCont);
H
hzcheng 已提交
122 123
}

S
slguan 已提交
124
static void dnodeProcessCreateTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
S
slguan 已提交
125
  SDCreateTableMsg *pTable = pCont;
S
slguan 已提交
126 127
  pTable->numOfColumns  = htons(pTable->numOfColumns);
  pTable->numOfTags     = htons(pTable->numOfTags);
S
slguan 已提交
128 129
  pTable->sid           = htonl(pTable->sid);
  pTable->sversion      = htonl(pTable->sversion);
S
slguan 已提交
130 131
  pTable->tagDataLen    = htonl(pTable->tagDataLen);
  pTable->sqlDataLen    = htonl(pTable->sqlDataLen);
S
slguan 已提交
132 133 134 135
  pTable->contLen       = htonl(pTable->contLen);
  pTable->numOfVPeers   = htonl(pTable->numOfVPeers);
  pTable->uid           = htobe64(pTable->uid);
  pTable->superTableUid = htobe64(pTable->superTableUid);
S
slguan 已提交
136 137
  pTable->createdTime   = htobe64(pTable->createdTime);

S
slguan 已提交
138 139 140 141 142 143 144 145 146 147 148 149 150
  for (int i = 0; i < pTable->numOfVPeers; ++i) {
    pTable->vpeerDesc[i].ip    = htonl(pTable->vpeerDesc[i].ip);
    pTable->vpeerDesc[i].vnode = htonl(pTable->vpeerDesc[i].vnode);
  }

  int32_t totalCols = pTable->numOfColumns + pTable->numOfTags;
  SSchema *pSchema = (SSchema *) pTable->data;
  for (int32_t col = 0; col < totalCols; ++col) {
    pSchema->bytes = htons(pSchema->bytes);
    pSchema->colId = htons(pSchema->colId);
    pSchema++;
  }

S
slguan 已提交
151 152
  int32_t code = dnodeCreateTable(pTable);
  dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
H
hzcheng 已提交
153 154
}

S
slguan 已提交
155
static void dnodeProcessAlterStreamRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
S
slguan 已提交
156 157 158 159 160 161 162 163
  SDAlterStreamMsg *pStream = pCont;
  pStream->uid    = htobe64(pStream->uid);
  pStream->stime  = htobe64(pStream->stime);
  pStream->vnode  = htonl(pStream->vnode);
  pStream->sid    = htonl(pStream->sid);
  pStream->status = htonl(pStream->status);

  int32_t code = dnodeCreateStream(pStream);
S
slguan 已提交
164
  dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
H
hzcheng 已提交
165 166
}

S
slguan 已提交
167
static void dnodeProcessRemoveTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
S
slguan 已提交
168
  SDRemoveTableMsg *pTable = pCont;
S
slguan 已提交
169
  pTable->sid         = htonl(pTable->sid);
S
slguan 已提交
170
  pTable->numOfVPeers = htonl(pTable->numOfVPeers);
S
slguan 已提交
171
  pTable->uid         = htobe64(pTable->uid);
H
hzcheng 已提交
172

S
slguan 已提交
173 174 175 176 177 178
  for (int i = 0; i < pTable->numOfVPeers; ++i) {
    pTable->vpeerDesc[i].ip    = htonl(pTable->vpeerDesc[i].ip);
    pTable->vpeerDesc[i].vnode = htonl(pTable->vpeerDesc[i].vnode);
  }

  int32_t code = dnodeDropTable(pTable);
S
slguan 已提交
179
  dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
H
hzcheng 已提交
180 181
}

S
slguan 已提交
182
static void dnodeProcessVPeerCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
S
slguan 已提交
183
  int32_t code = htonl(*((int32_t *) pCont));
H
hzcheng 已提交
184

S
slguan 已提交
185
  if (code == TSDB_CODE_SUCCESS) {
S
slguan 已提交
186 187
    SCreateVnodeMsg *pVnode = (SCreateVnodeMsg *) (pCont + sizeof(int32_t));
    dnodeCreateVnode(pVnode);
S
slguan 已提交
188 189 190 191
  } else if (code == TSDB_CODE_INVALID_VNODE_ID) {
    SFreeVnodeMsg *vpeer = (SFreeVnodeMsg *) (pCont + sizeof(int32_t));
    int32_t vnode = htonl(vpeer->vnode);
    dError("vnode:%d, not exist, remove it", vnode);
S
slguan 已提交
192
    dnodeDropVnode(vnode);
H
hzcheng 已提交
193
  } else {
S
slguan 已提交
194
    dError("code:%d invalid message", code);
H
hzcheng 已提交
195 196 197
  }
}

S
slguan 已提交
198
static void dnodeProcessTableCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
S
slguan 已提交
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
  int32_t code = htonl(*((int32_t *) pCont));

  if (code == TSDB_CODE_SUCCESS) {
    SDCreateTableMsg *table = (SDCreateTableMsg *) (pCont + sizeof(int32_t));
    dnodeCreateTable(table);
  } else if (code == TSDB_CODE_INVALID_TABLE_ID) {
    SDRemoveTableMsg *pTable = (SDRemoveTableMsg *) (pCont + sizeof(int32_t));
    pTable->sid   = htonl(pTable->sid);
    pTable->uid   = htobe64(pTable->uid);
    dError("table:%s, sid:%d table is not configured, remove it", pTable->tableId, pTable->sid);
    dnodeDropTable(pTable);
  } else {
    dError("code:%d invalid message", code);
  }
}

S
slguan 已提交
215
static void dnodeProcessCreateVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
S
slguan 已提交
216
  SCreateVnodeMsg *pVnode = (SCreateVnodeMsg *) pCont;
S
slguan 已提交
217

S
slguan 已提交
218
  int32_t code = dnodeCreateVnode(pVnode);
S
slguan 已提交
219
  dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
H
hzcheng 已提交
220 221
}

S
slguan 已提交
222
static void dnodeProcessFreeVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
S
slguan 已提交
223 224
  SFreeVnodeMsg *pVnode = (SFreeVnodeMsg *) pCont;
  int32_t vnode = htonl(pVnode->vnode);
S
slguan 已提交
225

S
slguan 已提交
226
  int32_t code = dnodeDropVnode(vnode);
S
slguan 已提交
227
  dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
S
slguan 已提交
228
}
H
hzcheng 已提交
229

S
slguan 已提交
230
static void dnodeProcessDnodeCfgRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
S
slguan 已提交
231
  SCfgDnodeMsg *pCfg = (SCfgDnodeMsg *)pCont;
S
slguan 已提交
232

S
slguan 已提交
233
  int32_t code = tsCfgDynamicOptions(pCfg->config);
S
slguan 已提交
234
  dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
H
hzcheng 已提交
235 236
}

S
slguan 已提交
237 238 239 240
/*
 * Handles a drop-super-table request from the mnode. No local work is done;
 * the request is acknowledged with message type msgType + 1 and
 * TSDB_CODE_SUCCESS. pCont and contLen are not used.
 */
static void dnodeProcessDropStableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
  int32_t result = TSDB_CODE_SUCCESS;

  dnodeSendRspToMnode(pConn, msgType + 1, result, NULL, 0);
}

S
slguan 已提交
241
void dnodeSendVnodeCfgMsg(int32_t vnode) {
S
slguan 已提交
242 243 244 245
  SVpeerCfgMsg *cfg = (SVpeerCfgMsg *) rpcMallocCont(sizeof(SVpeerCfgMsg));
  if (cfg == NULL) {
    return;
  }
H
hzcheng 已提交
246

S
slguan 已提交
247
  cfg->vnode = htonl(vnode);
S
slguan 已提交
248
  dnodeSendMsgToMnode(TSDB_MSG_TYPE_VNODE_CFG, cfg, sizeof(SVpeerCfgMsg));
H
hzcheng 已提交
249 250
}

S
slguan 已提交
251
void dnodeSendTableCfgMsg(int32_t vnode, int32_t sid) {
S
slguan 已提交
252
  STableCfgMsg *cfg = (STableCfgMsg *) rpcMallocCont(sizeof(STableCfgMsg));
S
slguan 已提交
253 254 255
  if (cfg == NULL) {
    return;
  }
S
#1177  
slguan 已提交
256

S
slguan 已提交
257
  cfg->vnode = htonl(vnode);
S
slguan 已提交
258
  dnodeSendMsgToMnode(TSDB_MSG_TYPE_TABLE_CFG, cfg, sizeof(STableCfgMsg));
S
slguan 已提交
259
}
S
slguan 已提交
260

S
slguan 已提交
261
static void dnodeInitProcessShellMsg() {
S
slguan 已提交
262 263
  dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableRequest;
  dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessRemoveTableRequest;
S
slguan 已提交
264
  dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_CREATE_VNODE]       = dnodeProcessCreateVnodeRequest;
S
slguan 已提交
265
  dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_FREE_VNODE]         = dnodeProcessFreeVnodeRequest;
S
slguan 已提交
266 267
  dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CFG]          = dnodeProcessDnodeCfgRequest;
  dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_ALTER_STREAM]       = dnodeProcessAlterStreamRequest;
S
slguan 已提交
268
  dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DROP_STABLE]        = dnodeProcessDropStableRequest;
S
slguan 已提交
269 270
  dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_VNODE_CFG_RSP]      = dnodeProcessVPeerCfgRsp;
  dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_TABLE_CFG_RSP]      = dnodeProcessTableCfgRsp;
S
slguan 已提交
271
}