dmEnv.c 9.9 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
17
#include "dmMgmt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19
// The single file-scope dnode object, zero-initialized at program start.
static SDnode globalDnode = {0};

// Returns the address of the file-scope dnode object; the result is never NULL.
SDnode *dmInstance() {
  return &globalDnode;
}
22 23 24

// Atomically moves pDnode->once from DND_ENV_INIT to DND_ENV_READY.
//
// Returns 0 when this call performed the transition. If the field held any
// other value, logs an error, sets terrno to TSDB_CODE_REPEAT_INIT and
// returns -1.
static int32_t dmCheckRepeatInit(SDnode *pDnode) {
  if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_INIT, DND_ENV_READY) == DND_ENV_INIT) {
    return 0;
  }

  dError("env is already initialized");
  terrno = TSDB_CODE_REPEAT_INIT;
  return -1;
}
S
shm  
Shengliang Guan 已提交
31

32
// Runs the process-level setup helpers in a fixed order and always returns 0.
// NOTE(review): judging by their names, the first two calls configure SIGPIPE
// handling and the third selects a CRC implementation -- confirm in the OS layer.
static int32_t dmInitSystem() {
  taosIgnSIGPIPE();
  taosBlockSIGPIPE();

  taosResolveCRC();

  return 0;
}
S
shm  
Shengliang Guan 已提交
38

39
static int32_t dmInitMonitor() {
S
Shengliang 已提交
40 41 42 43 44 45 46 47 48
  SMonCfg monCfg = {0};
  monCfg.maxLogs = tsMonitorMaxLogs;
  monCfg.port = tsMonitorPort;
  monCfg.server = tsMonitorFqdn;
  monCfg.comp = tsMonitorComp;
  if (monInit(&monCfg) != 0) {
    dError("failed to init monitor since %s", terrstr());
    return -1;
  }
49 50 51
  return 0;
}

wafwerar's avatar
wafwerar 已提交
52 53
static bool dmCheckDiskSpace() {
  osUpdate();
54
  // sufficiency
55
  if (!osDataSpaceSufficient()) {
56 57 58
    dWarn("free data disk size: %f GB, not sufficient, expected %f GB at least",
          (double)tsDataSpace.size.avail / 1024.0 / 1024.0 / 1024.0,
          (double)tsDataSpace.reserved / 1024.0 / 1024.0 / 1024.0);
wafwerar's avatar
wafwerar 已提交
59
  }
60
  if (!osLogSpaceSufficient()) {
61 62 63
    dWarn("free log disk size: %f GB, not sufficient, expected %f GB at least",
          (double)tsLogSpace.size.avail / 1024.0 / 1024.0 / 1024.0,
          (double)tsLogSpace.reserved / 1024.0 / 1024.0 / 1024.0);
wafwerar's avatar
wafwerar 已提交
64
  }
65
  if (!osTempSpaceSufficient()) {
66 67 68
    dWarn("free temp disk size: %f GB, not sufficient, expected %f GB at least",
          (double)tsTempSpace.size.avail / 1024.0 / 1024.0 / 1024.0,
          (double)tsTempSpace.reserved / 1024.0 / 1024.0 / 1024.0);
wafwerar's avatar
wafwerar 已提交
69
  }
70 71 72 73
  // availability
  bool ret = true;
  if (!osDataSpaceAvailable()) {
    dError("data disk space unavailable, i.e. %s", tsDataDir);
74
    terrno = TSDB_CODE_NO_DISKSPACE;
75 76 77 78
    ret = false;
  }
  if (!osLogSpaceAvailable()) {
    dError("log disk space unavailable, i.e. %s", tsLogDir);
79
    terrno = TSDB_CODE_NO_DISKSPACE;
80 81 82 83
    ret = false;
  }
  if (!osTempSpaceAvailable()) {
    dError("temp disk space unavailable, i.e. %s", tsTempDir);
84
    terrno = TSDB_CODE_NO_DISKSPACE;
85 86 87
    ret = false;
  }
  return ret;
wafwerar's avatar
wafwerar 已提交
88 89 90
}

static bool dmCheckDataDirVersion() {
91
  char checkDataDirJsonFileName[PATH_MAX] = {0};
wafwerar's avatar
wafwerar 已提交
92 93
  snprintf(checkDataDirJsonFileName, PATH_MAX, "%s/dnode/dnodeCfg.json", tsDataDir);
  if (taosCheckExistFile(checkDataDirJsonFileName)) {
H
Hongze Cheng 已提交
94 95
    dError("The default data directory %s contains old data of tdengine 2.x, please clear it before running!",
           tsDataDir);
wafwerar's avatar
wafwerar 已提交
96 97 98 99 100
    return false;
  }
  return true;
}

101
int32_t dmInit() {
S
Shengliang Guan 已提交
102
  dInfo("start to init dnode env");
wafwerar's avatar
wafwerar 已提交
103
  if (!dmCheckDataDirVersion()) return -1;
wafwerar's avatar
wafwerar 已提交
104
  if (!dmCheckDiskSpace()) return -1;
105
  if (dmCheckRepeatInit(dmInstance()) != 0) return -1;
106 107
  if (dmInitSystem() != 0) return -1;
  if (dmInitMonitor() != 0) return -1;
108
  if (dmInitDnode(dmInstance()) != 0) return -1;
S
Shengliang 已提交
109

S
Shengliang Guan 已提交
110
  dInfo("dnode env is initialized");
S
shm  
Shengliang Guan 已提交
111 112 113
  return 0;
}

114 115
// Atomically moves pDnode->once from DND_ENV_READY to DND_ENV_CLEANUP.
//
// Returns 0 when this call performed the transition; otherwise logs an error
// and returns -1 (terrno is left untouched).
static int32_t dmCheckRepeatCleanup(SDnode *pDnode) {
  if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_READY, DND_ENV_CLEANUP) == DND_ENV_READY) {
    return 0;
  }

  dError("dnode env is already cleaned up");
  return -1;
}

void dmCleanup() {
S
Shengliang Guan 已提交
123
  dDebug("start to cleanup dnode env");
124 125 126
  SDnode *pDnode = dmInstance();
  if (dmCheckRepeatCleanup(pDnode) != 0) return;
  dmCleanupDnode(pDnode);
S
shm  
Shengliang Guan 已提交
127
  monCleanup();
S
Shengliang Guan 已提交
128
  syncCleanUp();
S
shm  
Shengliang Guan 已提交
129
  walCleanUp();
S
Shengliang Guan 已提交
130
  udfcClose();
S
Shengliang 已提交
131
  udfStopUdfd();
S
shm  
Shengliang Guan 已提交
132
  taosStopCacheRefreshWorker();
S
Shengliang Guan 已提交
133
  dInfo("dnode env is cleaned up");
134 135

  taosCleanupCfg();
136
  taosCloseLog();
137 138 139
}

void dmStop() {
140
  SDnode *pDnode = dmInstance();
141 142 143 144
  pDnode->stop = true;
}

// Runs the global dnode via dmRunDnode() and returns its result unchanged.
int32_t dmRun() {
  return dmRunDnode(dmInstance());
}

// Handles a create-node request for node type `ntype`.
//
// Fails with -1 (terrno set to the matching *_ALREADY_DEPLOYED code) when a
// wrapper of that type can already be acquired, or when the node directory
// cannot be created. Otherwise, under pDnode->mutex, it calls the wrapper's
// createFp and, on success, opens and starts the node.
//
// Returns the result of createFp / dmOpenNode / dmStartNode (0 on success).
static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
  if (pWrapper != NULL) {
    // The wrapper could be acquired, so the node already exists: give the
    // reference back and reject the request.
    dmReleaseWrapper(pWrapper);
    switch (ntype) {
      case MNODE:
        terrno = TSDB_CODE_MNODE_ALREADY_DEPLOYED;
        break;
      case QNODE:
        terrno = TSDB_CODE_QNODE_ALREADY_DEPLOYED;
        break;
      case SNODE:
        terrno = TSDB_CODE_SNODE_ALREADY_DEPLOYED;
        break;
      default:
        terrno = TSDB_CODE_APP_ERROR;
        break;
    }
    dError("failed to create node since %s", terrstr());
    return -1;
  }

  dInfo("start to process create-node-request");

  // Taken directly from the array, NOT through dmAcquireWrapper(): no
  // reference is held from here on, so none must be released.
  pWrapper = &pDnode->wrappers[ntype];
  if (taosMkDir(pWrapper->path) != 0) {
    // Read errno immediately, before any other call can overwrite it. The
    // previous code called dmReleaseWrapper() here, which both dropped a
    // reference that was never taken and ran before errno was captured.
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
    return -1;
  }

  taosThreadMutexLock(&pDnode->mutex);
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);

  dInfo("node:%s, start to create", pWrapper->name);
  int32_t code = (*pWrapper->func.createFp)(&input, pMsg);
  if (code != 0) {
    dError("node:%s, failed to create since %s", pWrapper->name, terrstr());
  } else {
    dInfo("node:%s, has been created", pWrapper->name);
    code = dmOpenNode(pWrapper);
    if (code == 0) {
      code = dmStartNode(pWrapper);
    }
    // Flags are set once createFp has succeeded, regardless of the open/start
    // outcome (unchanged from the previous behavior).
    pWrapper->deployed = true;
    pWrapper->required = true;
  }

  taosThreadMutexUnlock(&pDnode->mutex);
  return code;
}

C
cadem 已提交
203 204 205 206 207 208 209 210 211 212 213 214 215 216
// Handles an alter-node-type request for node type `ntype`.
//
// Preconditions checked before any state is changed (each failure returns -1):
//   - a wrapper of this type can be acquired (the node exists);
//   - if nodeRoleFp is provided, the node is not already TAOS_SYNC_ROLE_VOTER;
//   - if isCatchUpFp is provided, it reports 1 (caught up).
//
// Then, under pDnode->mutex, the node is stopped and closed, its directory is
// (re)created, createFp is invoked and, on success, the node is opened and
// started again.
//
// Returns the result of createFp / dmOpenNode / dmStartNode (0 on success).
static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
  SDnode *pDnode = dmInstance();

  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
  if (pWrapper == NULL) {
    dError("fail to process alter node type since node not exist");
    return -1;
  }
  // The acquire was only an existence probe; the reference is returned right
  // away and must not be released a second time further down.
  dmReleaseWrapper(pWrapper);

  dInfo("node:%s, start to process alter-node-type-request", pWrapper->name);

  pWrapper = &pDnode->wrappers[ntype];

  if (pWrapper->func.nodeRoleFp != NULL) {
    ESyncRole role = (*pWrapper->func.nodeRoleFp)(pWrapper->pMgmt);
    dInfo("node:%s, checking node role:%d", pWrapper->name, role);
    if (role == TAOS_SYNC_ROLE_VOTER) {
      dError("node:%s, failed to alter node type since node already is role:%d", pWrapper->name, role);
      terrno = TSDB_CODE_MNODE_ALREADY_IS_VOTER;
      return -1;
    }
  }

  if (pWrapper->func.isCatchUpFp != NULL) {
    dInfo("node:%s, checking node catch up", pWrapper->name);
    if ((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1) {
      terrno = TSDB_CODE_MNODE_NOT_CATCH_UP;
      return -1;
    }
  }

  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pWrapper->name);

  taosThreadMutexLock(&pDnode->mutex);

  dInfo("node:%s, stopping node", pWrapper->name);
  dmStopNode(pWrapper);
  dInfo("node:%s, closing node", pWrapper->name);
  dmCloseNode(pWrapper);

  pWrapper = &pDnode->wrappers[ntype];
  if (taosMkDir(pWrapper->path) != 0) {
    // Read errno first, before any other call can overwrite it.
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
    // The mutex is held at this point: leaving without unlocking would block
    // every later create/alter/drop request. No dmReleaseWrapper() here
    // either -- the only reference taken was already released above.
    taosThreadMutexUnlock(&pDnode->mutex);
    return -1;
  }

  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);

  dInfo("node:%s, start to create", pWrapper->name);
  int32_t code = (*pWrapper->func.createFp)(&input, pMsg);
  if (code != 0) {
    dError("node:%s, failed to create since %s", pWrapper->name, terrstr());
  } else {
    dInfo("node:%s, has been created", pWrapper->name);
    code = dmOpenNode(pWrapper);
    if (code == 0) {
      code = dmStartNode(pWrapper);
    }
    // Flags are set once createFp has succeeded, regardless of the open/start
    // outcome (unchanged from the previous behavior).
    pWrapper->deployed = true;
    pWrapper->required = true;
  }

  taosThreadMutexUnlock(&pDnode->mutex);
  return code;
}

272
// Handles a drop-node request for node type `ntype`.
//
// Returns -1 (terrno set to the matching *_NOT_DEPLOYED code) when no wrapper
// of that type can be acquired. Otherwise, under pDnode->mutex, calls the
// wrapper's dropFp; on success the wrapper is marked not required / not
// deployed, the node is stopped and closed, and its directory is removed.
//
// Returns the result of dropFp.
static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
  SDnode       *pDnode = dmInstance();
  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);

  if (pWrapper == NULL) {
    switch (ntype) {
      case MNODE:
        terrno = TSDB_CODE_MNODE_NOT_DEPLOYED;
        break;
      case QNODE:
        terrno = TSDB_CODE_QNODE_NOT_DEPLOYED;
        break;
      case SNODE:
        terrno = TSDB_CODE_SNODE_NOT_DEPLOYED;
        break;
      default:
        terrno = TSDB_CODE_APP_ERROR;
        break;
    }

    dError("failed to drop node since %s", terrstr());
    return -1;
  }

  taosThreadMutexLock(&pDnode->mutex);

  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);

  dInfo("node:%s, start to drop", pWrapper->name);
  int32_t code = (*pWrapper->func.dropFp)(&input, pMsg);
  if (code == 0) {
    dInfo("node:%s, has been dropped", pWrapper->name);
    pWrapper->required = false;
    pWrapper->deployed = false;
  } else {
    dError("node:%s, failed to drop since %s", pWrapper->name, terrstr());
  }

  // Return the reference taken above before the node is stopped and closed.
  dmReleaseWrapper(pWrapper);

  if (code == 0) {
    dmStopNode(pWrapper);
    dmCloseNode(pWrapper);
    taosRemoveDir(pWrapper->path);
  }

  taosThreadMutexUnlock(&pDnode->mutex);
  return code;
}

// Builds the option bundle handed to a management node's callbacks.
//
// The result borrows `path`, `name` and the dnode data pointer from `pWrapper`
// (nothing is copied), wires in this file's create/alter/drop handlers plus
// the monitor and load callbacks, and takes the message callback from
// dmGetMsgcb(). Fields not assigned below stay zero.
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
  SMgmtInputOpt opt = {0};

  // Identity and shared data of the node.
  opt.path = pWrapper->path;
  opt.name = pWrapper->name;
  opt.pData = &pWrapper->pDnode->data;

  // Node life-cycle request handlers.
  opt.processCreateNodeFp = dmProcessCreateNodeReq;
  opt.processAlterNodeTypeFp = dmProcessAlterNodeTypeReq;
  opt.processDropNodeFp = dmProcessDropNodeReq;

  // Monitoring and load reporting.
  opt.sendMonitorReportFp = dmSendMonitorReport;
  opt.getVnodeLoadsFp = dmGetVnodeLoads;
  opt.getMnodeLoadsFp = dmGetMnodeLoads;
  opt.getQnodeLoadsFp = dmGetQnodeLoads;

  opt.msgCb = dmGetMsgcb(pWrapper->pDnode);
  return opt;
}

void dmReportStartup(const char *pName, const char *pDesc) {
338
  SStartupInfo *pStartup = &(dmInstance()->startup);
339 340 341
  tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
  tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
  dDebug("step:%s, %s", pStartup->name, pStartup->desc);
S
shm  
Shengliang Guan 已提交
342
}
D
dapan1121 已提交
343 344

int64_t dmGetClusterId() {
S
Shengliang Guan 已提交
345
  return globalDnode.data.clusterId;
D
dapan1121 已提交
346 347
}