dnodeMgmt.c 5.6 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#define _DEFAULT_SOURCE
17
#include "os.h"
S
slguan 已提交
18
#include "ihash.h"
S
slguan 已提交
19 20
#include "taoserror.h"
#include "taosmsg.h"
H
hzcheng 已提交
21
#include "trpc.h"
S
slguan 已提交
22
#include "tsdb.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23
#include "twal.h"
S
slguan 已提交
24
#include "vnode.h"
S
slguan 已提交
25 26
#include "tglobal.h"
#include "dnodeLog.h"
S
slguan 已提交
27
#include "dnodeMClient.h"
S
slguan 已提交
28
#include "dnodeMgmt.h"
S
slguan 已提交
29 30
#include "dnodeRead.h"
#include "dnodeWrite.h"
S
slguan 已提交
31

S
slguan 已提交
32
static int32_t  dnodeOpenVnodes();
33
static void     dnodeCloseVnodes();
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
34 35 36 37 38
static int32_t  dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
static int32_t  dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static int32_t  dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
static int32_t  dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
S
slguan 已提交
39 40

int32_t dnodeInitMgmt() {
S
slguan 已提交
41 42
  dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg;
  dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE]   = dnodeProcessDropVnodeMsg;
S
slguan 已提交
43 44
  dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg;
  dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg;
S
slguan 已提交
45

S
[TD-17]  
slguan 已提交
46 47 48 49
  int32_t code = dnodeOpenVnodes();
  if (code != TSDB_CODE_SUCCESS) {
    return -1;
  }
S
slguan 已提交
50

S
[TD-17]  
slguan 已提交
51
  return TSDB_CODE_SUCCESS;
S
#1177  
slguan 已提交
52 53
}

S
slguan 已提交
54
/*
 * Tear down the dnode management module by closing the vnodes.
 */
void dnodeCleanupMgmt() {
  dnodeCloseVnodes();
}

S
slguan 已提交
58
/*
 * Dispatch one management message to its registered handler and send the response.
 *
 * pMsg: incoming message. Its msgType selects the handler, its handle is copied
 *       into the response, and its pCont is released with rpcFreeCont() before
 *       returning.
 *
 * The response carries the handler's return code, or TSDB_CODE_MSG_NOT_PROCESSED
 * when the message type is outside the dispatch table or has no handler.
 */
void dnodeMgmt(SRpcMsg *pMsg) {
  // Derive the bound from the table itself so the check cannot drift from its size.
  const size_t numOfHandlers = sizeof(dnodeProcessMgmtMsgFp) / sizeof(dnodeProcessMgmtMsgFp[0]);
  // Casting to size_t also rejects a negative value should msgType be a signed type.
  const size_t msgType = (size_t)pMsg->msgType;

  int32_t code = TSDB_CODE_MSG_NOT_PROCESSED;
  if (msgType < numOfHandlers && dnodeProcessMgmtMsgFp[msgType] != NULL) {
    code = (*dnodeProcessMgmtMsgFp[msgType])(pMsg);
  }

  // Zero-initialise so that no indeterminate field is handed to the RPC layer.
  SRpcMsg rsp = {0};
  rsp.code   = code;
  rsp.handle = pMsg->handle;
  rsp.pCont  = NULL;
  rpcSendResponse(&rsp);

  rpcFreeCont(pMsg->pCont);
}
S
slguan 已提交
73

S
slguan 已提交
74
static int32_t dnodeGetVnodeList(int32_t vnodeList[]) {
S
slguan 已提交
75 76 77 78 79 80 81 82 83 84 85 86 87 88
  DIR *dir = opendir(tsVnodeDir);
  if (dir == NULL) {
    return TSDB_CODE_NO_WRITE_ACCESS;
  }

  int32_t numOfVnodes = 0;
  struct dirent *de = NULL;
  while ((de = readdir(dir)) != NULL) {
    if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) continue;
    if (de->d_type & DT_DIR) {
      if (strncmp("vnode", de->d_name, 5) != 0) continue;
      int32_t vnode = atoi(de->d_name + 5);
      if (vnode == 0) continue;

89 90
      vnodeList[numOfVnodes] = vnode;
      numOfVnodes++;
S
slguan 已提交
91 92 93 94
    }
  }
  closedir(dir);

95 96 97 98
  return numOfVnodes;
}

static int32_t dnodeOpenVnodes() {
S
slguan 已提交
99 100 101
  char vnodeDir[TSDB_FILENAME_LEN * 3];
  int32_t failed = 0;

102
  int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * TSDB_MAX_VNODES);
S
slguan 已提交
103
  int32_t  numOfVnodes = dnodeGetVnodeList(vnodeList);
104

S
slguan 已提交
105 106 107 108
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vnodeList[i]);
    if (vnodeOpen(vnodeList[i], vnodeDir) < 0) failed++;
  }
109

S
slguan 已提交
110
  free(vnodeList);
111

S
slguan 已提交
112 113
  dPrint("there are total vnodes:%d, failed to open:%d", numOfVnodes, failed);
  return TSDB_CODE_SUCCESS;
114 115 116
}

static void dnodeCloseVnodes() {
117
  int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * TSDB_MAX_VNODES);
S
slguan 已提交
118 119 120 121 122 123 124 125
  int32_t  numOfVnodes = dnodeGetVnodeList(vnodeList);

  for (int32_t i = 0; i < numOfVnodes; ++i) {
    vnodeClose(vnodeList[i]);
  }

  free(vnodeList);
  dPrint("total vnodes:%d are all closed", numOfVnodes);
H
hzcheng 已提交
126 127
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
128
/*
 * Handler registered for TSDB_MSG_TYPE_MD_CREATE_VNODE.
 *
 * Byte-swaps the multi-byte fields of the request in place, then alters the
 * vnode when one with the requested vgId can be acquired, or creates it otherwise.
 *
 * Returns the code produced by vnodeAlter() or vnodeCreate().
 *
 * NOTE(review): htonl/htobe64 are applied to an incoming message; presumably the
 * payload is in network byte order and this relies on the host-to-network and
 * network-to-host swaps being the same operation - confirm.
 * NOTE(review): cfg.replications comes from the message and is not checked here
 * against the capacity of nodes[] - verify that it is validated elsewhere.
 */
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
  SMDCreateVnodeMsg *pReq = rpcMsg->pCont;

  pReq->cfg.vgId                = htonl(pReq->cfg.vgId);
  pReq->cfg.maxTables           = htonl(pReq->cfg.maxTables);
  pReq->cfg.maxCacheSize        = htobe64(pReq->cfg.maxCacheSize);
  pReq->cfg.minRowsPerFileBlock = htonl(pReq->cfg.minRowsPerFileBlock);
  pReq->cfg.maxRowsPerFileBlock = htonl(pReq->cfg.maxRowsPerFileBlock);
  pReq->cfg.daysPerFile         = htonl(pReq->cfg.daysPerFile);
  pReq->cfg.daysToKeep1         = htonl(pReq->cfg.daysToKeep1);
  pReq->cfg.daysToKeep2         = htonl(pReq->cfg.daysToKeep2);
  pReq->cfg.daysToKeep          = htonl(pReq->cfg.daysToKeep);
  pReq->cfg.commitTime          = htonl(pReq->cfg.commitTime);
  pReq->cfg.arbitratorIp        = htonl(pReq->cfg.arbitratorIp);

  for (int32_t i = 0; i < pReq->cfg.replications; ++i) {
    pReq->nodes[i].nodeId = htonl(pReq->nodes[i].nodeId);
    pReq->nodes[i].nodeIp = htonl(pReq->nodes[i].nodeIp);
  }

  void *pVnode = vnodeAccquireVnode(pReq->cfg.vgId);
  if (pVnode == NULL) {
    // No vnode could be acquired for this vgId: create it.
    return vnodeCreate(pReq);
  }

  // A vnode was acquired: apply the request as an alteration, then drop the reference.
  int32_t code = vnodeAlter(pVnode, pReq);
  vnodeRelease(pVnode);
  return code;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
157
/*
 * Handler registered for TSDB_MSG_TYPE_MD_DROP_VNODE.
 *
 * Byte-swaps vgId in place and drops that vnode.
 * Returns the code produced by vnodeDrop().
 */
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
  SMDDropVnodeMsg *pReq = rpcMsg->pCont;

  pReq->vgId = htonl(pReq->vgId);
  return vnodeDrop(pReq->vgId);
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
164
/*
 * Handler registered for TSDB_MSG_TYPE_MD_ALTER_STREAM.
 *
 * Currently a stub: the message is ignored and 0 is returned.
 * The disabled implementation byte-swapped the uid, stime, vnode, sid and
 * status fields of an SMDAlterStreamMsg and passed it to dnodeCreateStream().
 */
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) {
  (void)pMsg;  // unused until stream alteration is implemented
  return 0;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
177
/*
 * Handler registered for TSDB_MSG_TYPE_MD_CONFIG_DNODE.
 *
 * Passes the config field of the request to taosCfgDynamicOptions() and
 * returns its result.
 */
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
  return taosCfgDynamicOptions(((SMDCfgDnodeMsg *)pMsg->pCont)->config);
}