dmEnv.c 5.3 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
S
shm  
Shengliang Guan 已提交
18

19 20 21
/* The single dnode object of this process; every member starts out zeroed. */
static SDnode global = {0};

/* Returns a pointer to the file-local dnode object above. */
SDnode *dmInstance() {
  return &global;
}
22 23 24

/*
 * Guards against initializing the environment twice.
 *
 * Performs a compare-exchange on pDnode->once from DND_ENV_INIT to DND_ENV_READY.
 * Returns 0 when the exchange reports DND_ENV_INIT; otherwise logs an error,
 * sets terrno to TSDB_CODE_REPEAT_INIT and returns -1.
 */
static int32_t dmCheckRepeatInit(SDnode *pDnode) {
  if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_INIT, DND_ENV_READY) == DND_ENV_INIT) {
    return 0;
  }

  dError("env is already initialized");
  terrno = TSDB_CODE_REPEAT_INIT;
  return -1;
}
S
shm  
Shengliang Guan 已提交
31

32
/*
 * Runs the process-level setup calls, in this order:
 * taosIgnSIGPIPE(), taosBlockSIGPIPE(), taosResolveCRC().
 *
 * None of the calls is checked for failure here, so this always returns 0.
 */
static int32_t dmInitSystem() {
  // SIGPIPE handling first, then CRC setup.
  taosIgnSIGPIPE();
  taosBlockSIGPIPE();

  taosResolveCRC();

  return 0;
}
S
shm  
Shengliang Guan 已提交
38

39
static int32_t dmInitMonitor() {
S
Shengliang 已提交
40 41 42 43 44 45 46 47 48
  SMonCfg monCfg = {0};
  monCfg.maxLogs = tsMonitorMaxLogs;
  monCfg.port = tsMonitorPort;
  monCfg.server = tsMonitorFqdn;
  monCfg.comp = tsMonitorComp;
  if (monInit(&monCfg) != 0) {
    dError("failed to init monitor since %s", terrstr());
    return -1;
  }
49 50 51
  return 0;
}

52
int32_t dmInit(int8_t rtype) {
S
Shengliang Guan 已提交
53
  dInfo("start to init dnode env");
54
  if (dmCheckRepeatInit(dmInstance()) != 0) return -1;
55 56
  if (dmInitSystem() != 0) return -1;
  if (dmInitMonitor() != 0) return -1;
57
  if (dmInitDnode(dmInstance(), rtype) != 0) return -1;
S
Shengliang 已提交
58

S
Shengliang Guan 已提交
59
  dInfo("dnode env is initialized");
S
shm  
Shengliang Guan 已提交
60 61 62
  return 0;
}

63 64
/*
 * Guards against cleaning up the environment twice.
 *
 * Performs a compare-exchange on pDnode->once from DND_ENV_READY to DND_ENV_CLEANUP.
 * Returns 0 when the exchange reports DND_ENV_READY; otherwise logs an error
 * and returns -1.
 */
static int32_t dmCheckRepeatCleanup(SDnode *pDnode) {
  if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_READY, DND_ENV_CLEANUP) == DND_ENV_READY) {
    return 0;
  }

  dError("dnode env is already cleaned up");
  return -1;
}

void dmCleanup() {
S
Shengliang Guan 已提交
72
  dDebug("start to cleanup dnode env");
73 74 75
  SDnode *pDnode = dmInstance();
  if (dmCheckRepeatCleanup(pDnode) != 0) return;
  dmCleanupDnode(pDnode);
S
shm  
Shengliang Guan 已提交
76
  monCleanup();
S
Shengliang Guan 已提交
77
  syncCleanUp();
S
shm  
Shengliang Guan 已提交
78
  walCleanUp();
S
Shengliang Guan 已提交
79
  udfcClose();
S
Shengliang 已提交
80
  udfStopUdfd();
S
shm  
Shengliang Guan 已提交
81
  taosStopCacheRefreshWorker();
S
Shengliang Guan 已提交
82
  dInfo("dnode env is cleaned up");
83 84 85 86 87 88

  taosCloseLog();
  taosCleanupCfg();
}

void dmStop() {
89
  SDnode *pDnode = dmInstance();
90 91 92 93
  pDnode->stop = true;
}

/* Runs the process-wide dnode instance and returns dmRunDnode()'s result. */
int32_t dmRun() {
  return dmRunDnode(dmInstance());
}

/*
 * Handles a request to create the node of type `ntype`.
 *
 * Fails with TSDB_CODE_NODE_ALREADY_DEPLOYED when dmAcquireWrapper() already
 * yields a wrapper for this type, and with the translated errno when the
 * node directory cannot be created. Otherwise the wrapper's createFp is
 * invoked with `pMsg` while pDnode->mutex is held; on success the node is
 * opened, started and flagged as required/deployed.
 *
 * Returns 0 on success, -1 for the two early failures above, or the non-zero
 * code returned by createFp.
 */
static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
  if (pWrapper != NULL) {
    // A wrapper could be acquired, so the node already exists; drop the reference just taken.
    dmReleaseWrapper(pWrapper);
    terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
    dError("failed to create node since %s", terrstr());
    return -1;
  }

  // From here on no reference is held: dmAcquireWrapper() returned NULL and the
  // wrapper is addressed directly through the array.
  pWrapper = &pDnode->wrappers[ntype];
  if (taosMkDir(pWrapper->path) != 0) {
    // Do not call dmReleaseWrapper() here: nothing was acquired on this path, so a
    // release would be unbalanced. Reading errno right away also keeps it from
    // being overwritten by an intervening call.
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
    return -1;
  }

  taosThreadMutexLock(&pDnode->mutex);
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);

  dInfo("node:%s, start to create", pWrapper->name);
  int32_t code = (*pWrapper->func.createFp)(&input, pMsg);
  if (code != 0) {
    dError("node:%s, failed to create since %s", pWrapper->name, terrstr());
  } else {
    dInfo("node:%s, has been created", pWrapper->name);
    // Open/start results are intentionally ignored, as in the original code.
    (void)dmOpenNode(pWrapper);
    (void)dmStartNode(pWrapper);
    pWrapper->required = true;
    pWrapper->deployed = true;
    pWrapper->proc.ptype = pDnode->ptype;
  }

  taosThreadMutexUnlock(&pDnode->mutex);
  return code;
}

/*
 * Handles a request to drop the node of type `ntype`.
 *
 * Fails with TSDB_CODE_NODE_NOT_DEPLOYED when no wrapper can be acquired.
 * Otherwise the wrapper's dropFp is invoked with `pMsg` while pDnode->mutex
 * is held. On success the node is flagged as no longer required/deployed,
 * then stopped and closed, and its directory is removed.
 *
 * Returns 0 on success, -1 when the node is not deployed, or the non-zero
 * code returned by dropFp.
 */
static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
  SDnode       *pDnode = dmInstance();
  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
  if (pWrapper == NULL) {
    terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
    dError("failed to drop node since %s", terrstr());
    return -1;
  }

  taosThreadMutexLock(&pDnode->mutex);
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);

  dInfo("node:%s, start to drop", pWrapper->name);
  int32_t code = pWrapper->func.dropFp(&input, pMsg);
  if (code == 0) {
    dInfo("node:%s, has been dropped", pWrapper->name);
    pWrapper->required = false;
    pWrapper->deployed = false;
  } else {
    dError("node:%s, failed to drop since %s", pWrapper->name, terrstr());
  }

  // The reference taken above is given back before the node is stopped and closed.
  // NOTE(review): this ordering is kept exactly as found; presumably the stop path
  // expects no outstanding reference from this function — confirm before reordering.
  dmReleaseWrapper(pWrapper);

  if (code == 0) {
    dmStopNode(pWrapper);
    dmCloseNode(pWrapper);
    taosRemoveDir(pWrapper->path);
  }

  taosThreadMutexUnlock(&pDnode->mutex);
  return code;
}

/*
 * Builds the option bundle handed to a management node's callbacks.
 *
 * path/name come from the wrapper, pData points at the owning dnode's data,
 * the function pointers are the dnode-level handlers, and msgCb is obtained
 * from dmGetMsgcb() for the owning dnode. The struct is returned by value.
 */
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
  SDnode *pDnode = pWrapper->pDnode;

  SMgmtInputOpt opt = {
      // identity of the node being managed
      .path = pWrapper->path,
      .name = pWrapper->name,
      .pData = &pDnode->data,
      // node lifecycle handlers
      .processCreateNodeFp = dmProcessCreateNodeReq,
      .processDropNodeFp = dmProcessDropNodeReq,
      // monitoring and load reporting
      .sendMonitorReportFp = dmSendMonitorReport,
      .getVnodeLoadsFp = dmGetVnodeLoads,
      .getMnodeLoadsFp = dmGetMnodeLoads,
      .getQnodeLoadsFp = dmGetQnodeLoads,
  };

  opt.msgCb = dmGetMsgcb(pDnode);
  return opt;
}

void dmReportStartup(const char *pName, const char *pDesc) {
189
  SStartupInfo *pStartup = &(dmInstance()->startup);
190 191 192
  tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
  tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
  dDebug("step:%s, %s", pStartup->name, pStartup->desc);
S
shm  
Shengliang Guan 已提交
193
}