dmEnv.c 9.5 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19
static SDnode globalDnode = {0};

/*
 * Accessor for the file-local dnode singleton. Every dm* entry point in this
 * file operates on the object returned here.
 */
SDnode *dmInstance() {
  SDnode *pSingleton = &globalDnode;
  return pSingleton;
}
22 23 24

/*
 * Guards against double initialization.
 *
 * Atomically compare-exchanges pDnode->once from DND_ENV_INIT to
 * DND_ENV_READY. Returns 0 when this call saw DND_ENV_INIT. Otherwise it logs,
 * sets terrno to TSDB_CODE_REPEAT_INIT and returns -1.
 */
static int32_t dmCheckRepeatInit(SDnode *pDnode) {
  bool firstInit =
      (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_INIT, DND_ENV_READY) == DND_ENV_INIT);
  if (firstInit) {
    return 0;
  }

  dError("env is already initialized");
  terrno = TSDB_CODE_REPEAT_INIT;
  return -1;
}
S
shm  
Shengliang Guan 已提交
31

32
/*
 * Process-level setup performed once during dmInit().
 *
 * Calls the SIGPIPE helpers (ignore, then block, going by their names) and
 * resolves the CRC routine. None of these calls is checked, so this always
 * returns 0.
 */
static int32_t dmInitSystem() {
  int32_t code = 0;

  taosIgnSIGPIPE();
  taosBlockSIGPIPE();
  taosResolveCRC();

  return code;
}
S
shm  
Shengliang Guan 已提交
38

39
static int32_t dmInitMonitor() {
S
Shengliang 已提交
40 41 42 43 44 45 46 47 48
  SMonCfg monCfg = {0};
  monCfg.maxLogs = tsMonitorMaxLogs;
  monCfg.port = tsMonitorPort;
  monCfg.server = tsMonitorFqdn;
  monCfg.comp = tsMonitorComp;
  if (monInit(&monCfg) != 0) {
    dError("failed to init monitor since %s", terrstr());
    return -1;
  }
49 50 51
  return 0;
}

wafwerar's avatar
wafwerar 已提交
52 53
static bool dmCheckDiskSpace() {
  osUpdate();
54
  // sufficiency
55
  if (!osDataSpaceSufficient()) {
56 57 58
    dWarn("free data disk size: %f GB, not sufficient, expected %f GB at least",
          (double)tsDataSpace.size.avail / 1024.0 / 1024.0 / 1024.0,
          (double)tsDataSpace.reserved / 1024.0 / 1024.0 / 1024.0);
wafwerar's avatar
wafwerar 已提交
59
  }
60
  if (!osLogSpaceSufficient()) {
61 62 63
    dWarn("free log disk size: %f GB, not sufficient, expected %f GB at least",
          (double)tsLogSpace.size.avail / 1024.0 / 1024.0 / 1024.0,
          (double)tsLogSpace.reserved / 1024.0 / 1024.0 / 1024.0);
wafwerar's avatar
wafwerar 已提交
64
  }
65
  if (!osTempSpaceSufficient()) {
66 67 68
    dWarn("free temp disk size: %f GB, not sufficient, expected %f GB at least",
          (double)tsTempSpace.size.avail / 1024.0 / 1024.0 / 1024.0,
          (double)tsTempSpace.reserved / 1024.0 / 1024.0 / 1024.0);
wafwerar's avatar
wafwerar 已提交
69
  }
70 71 72 73
  // availability
  bool ret = true;
  if (!osDataSpaceAvailable()) {
    dError("data disk space unavailable, i.e. %s", tsDataDir);
74
    terrno = TSDB_CODE_NO_DISKSPACE;
75 76 77 78
    ret = false;
  }
  if (!osLogSpaceAvailable()) {
    dError("log disk space unavailable, i.e. %s", tsLogDir);
79
    terrno = TSDB_CODE_NO_DISKSPACE;
80 81 82 83
    ret = false;
  }
  if (!osTempSpaceAvailable()) {
    dError("temp disk space unavailable, i.e. %s", tsTempDir);
84
    terrno = TSDB_CODE_NO_DISKSPACE;
85 86 87
    ret = false;
  }
  return ret;
wafwerar's avatar
wafwerar 已提交
88 89 90
}

static bool dmCheckDataDirVersion() {
91
  char checkDataDirJsonFileName[PATH_MAX] = {0};
wafwerar's avatar
wafwerar 已提交
92 93
  snprintf(checkDataDirJsonFileName, PATH_MAX, "%s/dnode/dnodeCfg.json", tsDataDir);
  if (taosCheckExistFile(checkDataDirJsonFileName)) {
H
Hongze Cheng 已提交
94 95
    dError("The default data directory %s contains old data of tdengine 2.x, please clear it before running!",
           tsDataDir);
wafwerar's avatar
wafwerar 已提交
96 97 98 99 100
    return false;
  }
  return true;
}

101
int32_t dmInit() {
S
Shengliang Guan 已提交
102
  dInfo("start to init dnode env");
wafwerar's avatar
wafwerar 已提交
103
  if (!dmCheckDataDirVersion()) return -1;
wafwerar's avatar
wafwerar 已提交
104
  if (!dmCheckDiskSpace()) return -1;
105
  if (dmCheckRepeatInit(dmInstance()) != 0) return -1;
106 107
  if (dmInitSystem() != 0) return -1;
  if (dmInitMonitor() != 0) return -1;
108
  if (dmInitDnode(dmInstance()) != 0) return -1;
S
Shengliang 已提交
109

S
Shengliang Guan 已提交
110
  dInfo("dnode env is initialized");
S
shm  
Shengliang Guan 已提交
111 112 113
  return 0;
}

114 115
/*
 * Guards against double cleanup.
 *
 * Atomically compare-exchanges pDnode->once from DND_ENV_READY to
 * DND_ENV_CLEANUP. Returns 0 when this call saw DND_ENV_READY. Otherwise it
 * logs and returns -1 without touching terrno.
 */
static int32_t dmCheckRepeatCleanup(SDnode *pDnode) {
  bool firstCleanup =
      (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_READY, DND_ENV_CLEANUP) == DND_ENV_READY);
  if (!firstCleanup) {
    dError("dnode env is already cleaned up");
    return -1;
  }
  return 0;
}

void dmCleanup() {
S
Shengliang Guan 已提交
123
  dDebug("start to cleanup dnode env");
124 125 126
  SDnode *pDnode = dmInstance();
  if (dmCheckRepeatCleanup(pDnode) != 0) return;
  dmCleanupDnode(pDnode);
S
shm  
Shengliang Guan 已提交
127
  monCleanup();
S
Shengliang Guan 已提交
128
  syncCleanUp();
S
shm  
Shengliang Guan 已提交
129
  walCleanUp();
S
Shengliang Guan 已提交
130
  udfcClose();
S
Shengliang 已提交
131
  udfStopUdfd();
S
shm  
Shengliang Guan 已提交
132
  taosStopCacheRefreshWorker();
S
Shengliang Guan 已提交
133
  dInfo("dnode env is cleaned up");
134 135

  taosCleanupCfg();
136
  taosCloseLog();
137 138 139
}

void dmStop() {
140
  SDnode *pDnode = dmInstance();
141 142 143 144
  pDnode->stop = true;
}

/* Runs the dnode singleton and returns whatever dmRunDnode() returns. */
int32_t dmRun() { return dmRunDnode(dmInstance()); }

/*
 * Handles a create-node request for node type ntype.
 *
 * Fails with terrno = TSDB_CODE_{MNODE,QNODE,SNODE}_ALREADY_DEPLOYED
 * (TSDB_CODE_APP_ERROR for other types) when a wrapper for ntype can already be
 * acquired. Fails with a system error when the node directory cannot be
 * created.
 *
 * Otherwise, under pDnode->mutex, it runs createFp and then dmOpenNode /
 * dmStartNode, and returns the first non-zero code. Once createFp succeeds,
 * deployed/required are set even if open/start fail afterwards.
 */
static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
  if (pWrapper != NULL) {
    dmReleaseWrapper(pWrapper);
    switch (ntype) {
      case MNODE:
        terrno = TSDB_CODE_MNODE_ALREADY_DEPLOYED;
        break;
      case QNODE:
        terrno = TSDB_CODE_QNODE_ALREADY_DEPLOYED;
        break;
      case SNODE:
        terrno = TSDB_CODE_SNODE_ALREADY_DEPLOYED;
        break;
      default:
        terrno = TSDB_CODE_APP_ERROR;
        break;
    }
    dError("failed to create node since %s", terrstr());
    return -1;
  }

  dInfo("start to process create-node-request");

  // The acquire above returned NULL, so no reference is held on this wrapper.
  // It must NOT be released on the error path below: that would unbalance the
  // acquire/release pairing, and the extra call could also clobber errno
  // before it is captured.
  pWrapper = &pDnode->wrappers[ntype];
  if (taosMkDir(pWrapper->path) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
    return -1;
  }

  taosThreadMutexLock(&pDnode->mutex);
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);

  dInfo("node:%s, start to create", pWrapper->name);
  int32_t code = (*pWrapper->func.createFp)(&input, pMsg);
  if (code != 0) {
    dError("node:%s, failed to create since %s", pWrapper->name, terrstr());
  } else {
    dInfo("node:%s, has been created", pWrapper->name);
    code = dmOpenNode(pWrapper);
    if (code == 0) {
      code = dmStartNode(pWrapper);
    }
    // createFp succeeded, so the node is recorded as deployed regardless of
    // the open/start outcome (unchanged from the original behavior).
    pWrapper->deployed = true;
    pWrapper->required = true;
  }

  taosThreadMutexUnlock(&pDnode->mutex);
  return code;
}

C
cadem 已提交
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260
/*
 * Handles an alter-node-type request: recreates the node of type ntype from
 * pMsg.
 *
 * Preconditions:
 *   - a wrapper for ntype must be acquirable. Otherwise terrno is set to the
 *     matching *_NOT_DEPLOYED code (TSDB_CODE_APP_ERROR for other types) and
 *     -1 is returned;
 *   - if the wrapper provides isCatchUpFp, it must report a non-zero value
 *     (caught up). Otherwise terrno = TSDB_CODE_APP_ERROR and -1 is returned.
 *
 * Under pDnode->mutex, the node is stopped and closed, its directory is
 * ensured, and it is then created, opened and started again, exactly as in
 * dmProcessCreateNodeReq. Every exit path after the lock releases the mutex.
 */
static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
  if (pWrapper == NULL) {
    switch (ntype) {
      case MNODE:
        terrno = TSDB_CODE_MNODE_NOT_DEPLOYED;
        break;
      case QNODE:
        terrno = TSDB_CODE_QNODE_NOT_DEPLOYED;
        break;
      case SNODE:
        terrno = TSDB_CODE_SNODE_NOT_DEPLOYED;
        break;
      default:
        terrno = TSDB_CODE_APP_ERROR;
        break;
    }
    dError("fail to process alter node type since node not exist");
    return -1;
  }
  // Only an existence check: the reference is dropped immediately, and the
  // wrapper itself lives in pDnode->wrappers.
  dmReleaseWrapper(pWrapper);

  dInfo("node:%s, start to process alter-node-type-request", pWrapper->name);

  pWrapper = &pDnode->wrappers[ntype];

  if (pWrapper->func.isCatchUpFp != NULL) {
    dInfo("node:%s, checking node catch up", pWrapper->name);
    // The original `!f(...) == 0` parsed as `(!f(...)) == 0`, i.e. "reject when
    // caught up". That is inverted: refuse only when the node is NOT caught up.
    if (!(*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt)) {
      terrno = TSDB_CODE_APP_ERROR;  // no dedicated "not caught up" code is visible here
      dError("node:%s, not catched up leader, can not process alter-node-type-request", pWrapper->name);
      return -1;
    }
  }

  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pWrapper->name);

  taosThreadMutexLock(&pDnode->mutex);

  dInfo("node:%s, stopping node", pWrapper->name);
  dmStopNode(pWrapper);
  dInfo("node:%s, closing node", pWrapper->name);
  dmCloseNode(pWrapper);

  if (taosMkDir(pWrapper->path) != 0) {
    // Capture errno first. No wrapper reference is held at this point, so
    // there is nothing to release, but the mutex must be unlocked before
    // returning.
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
    taosThreadMutexUnlock(&pDnode->mutex);
    return -1;
  }

  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);

  dInfo("node:%s, start to create", pWrapper->name);
  int32_t code = (*pWrapper->func.createFp)(&input, pMsg);
  if (code != 0) {
    dError("node:%s, failed to create since %s", pWrapper->name, terrstr());
  } else {
    dInfo("node:%s, has been created", pWrapper->name);
    code = dmOpenNode(pWrapper);
    if (code == 0) {
      code = dmStartNode(pWrapper);
    }
    pWrapper->deployed = true;
    pWrapper->required = true;
  }

  taosThreadMutexUnlock(&pDnode->mutex);
  return code;
}

261
/* Maps a node type to the terrno reported when that node is not deployed. */
static int32_t dmNodeNotDeployedCode(EDndNodeType ntype) {
  switch (ntype) {
    case MNODE:
      return TSDB_CODE_MNODE_NOT_DEPLOYED;
    case QNODE:
      return TSDB_CODE_QNODE_NOT_DEPLOYED;
    case SNODE:
      return TSDB_CODE_SNODE_NOT_DEPLOYED;
    default:
      return TSDB_CODE_APP_ERROR;
  }
}

/*
 * Handles a drop-node request for node type ntype.
 *
 * Fails with the type's *_NOT_DEPLOYED terrno when no wrapper can be acquired.
 * Otherwise, under pDnode->mutex, it calls dropFp. On success it clears
 * required/deployed, and after releasing the wrapper reference it stops and
 * closes the node and removes its directory. Returns dropFp's result.
 */
static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
  if (pWrapper == NULL) {
    terrno = dmNodeNotDeployedCode(ntype);
    dError("failed to drop node since %s", terrstr());
    return -1;
  }

  taosThreadMutexLock(&pDnode->mutex);

  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
  dInfo("node:%s, start to drop", pWrapper->name);
  int32_t code = (*pWrapper->func.dropFp)(&input, pMsg);

  bool dropped = (code == 0);
  if (dropped) {
    dInfo("node:%s, has been dropped", pWrapper->name);
    pWrapper->required = false;
    pWrapper->deployed = false;
  } else {
    dError("node:%s, failed to drop since %s", pWrapper->name, terrstr());
  }

  // The reference is released BEFORE stop/close. NOTE(review): presumably so
  // that dmCloseNode() is not left waiting on this caller's reference; verify.
  dmReleaseWrapper(pWrapper);

  if (dropped) {
    dmStopNode(pWrapper);
    dmCloseNode(pWrapper);
    taosRemoveDir(pWrapper->path);
  }

  taosThreadMutexUnlock(&pDnode->mutex);
  return code;
}

/*
 * Builds the input options handed to a node's management callbacks.
 *
 * The options carry the wrapper's path and name, a pointer to the owning
 * dnode's data, the dnode's message callbacks, and the dm* request/report
 * callbacks. Fields not listed are zero-initialized.
 */
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
  return (SMgmtInputOpt){
      .path = pWrapper->path,
      .name = pWrapper->name,
      .pData = &pWrapper->pDnode->data,
      .msgCb = dmGetMsgcb(pWrapper->pDnode),
      .processCreateNodeFp = dmProcessCreateNodeReq,
      .processAlterNodeTypeFp = dmProcessAlterNodeTypeReq,
      .processDropNodeFp = dmProcessDropNodeReq,
      .sendMonitorReportFp = dmSendMonitorReport,
      .getVnodeLoadsFp = dmGetVnodeLoads,
      .getMnodeLoadsFp = dmGetMnodeLoads,
      .getQnodeLoadsFp = dmGetQnodeLoads,
  };
}

void dmReportStartup(const char *pName, const char *pDesc) {
327
  SStartupInfo *pStartup = &(dmInstance()->startup);
328 329 330
  tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
  tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
  dDebug("step:%s, %s", pStartup->name, pStartup->desc);
S
shm  
Shengliang Guan 已提交
331
}
D
dapan1121 已提交
332 333

int64_t dmGetClusterId() {
S
Shengliang Guan 已提交
334
  return globalDnode.data.clusterId;
D
dapan1121 已提交
335 336
}