dnodeMain.c 7.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
#include "tcache.h"
#include "tconfig.h"
20
#if 0
21
#include "tfs.h"
22
#endif
23
#include "tscompression.h"
24
#include "tnote.h"
25 26 27 28 29 30
#include "ttimer.h"
#include "dnodeCfg.h"
#include "dnodeMain.h"
#include "mnode.h"

static void dnodeCheckDataDirOpenned(char *dir) {
31
#if 0
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
  char filepath[256] = {0};
  snprintf(filepath, sizeof(filepath), "%s/.running", dir);

  int32_t fd = open(filepath, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
  if (fd < 0) {
    dError("failed to open lock file:%s, reason: %s, quit", filepath, strerror(errno));
    exit(0);
  }

  int32_t ret = flock(fd, LOCK_EX | LOCK_NB);
  if (ret != 0) {
    dError("failed to lock file:%s ret:%d since %s, database may be running, quit", filepath, ret, strerror(errno));
    close(fd);
    exit(0);
  }
47
#endif
48 49
}

S
Shengliang Guan 已提交
50 51 52 53 54 55 56 57
void dnodePrintDiskInfo() {
  dInfo("==================================");
  dInfo(" os totalDisk:           %f(GB)", tsTotalDataDirGB);
  dInfo(" os usedDisk:            %f(GB)", tsUsedDataDirGB);
  dInfo(" os availDisk:           %f(GB)", tsAvailDataDirGB);
  dInfo("==================================");
}

58
int32_t dnodeInitMain(DnMain **out) {
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
  DnMain* main = calloc(1, sizeof(DnMain));
  if (main == NULL) return -1;

  main->runStatus = TD_RUN_STAT_STOPPED;
  main->dnodeTimer = taosTmrInit(100, 200, 60000, "DND-TMR");
  if (main->dnodeTimer == NULL) {
    dError("failed to init dnode timer");
    return -1;
  }

  *out = main;

  tscEmbedded = 1;
  taosIgnSIGPIPE();
  taosBlockSIGPIPE();
  taosResolveCRC();
  taosInitGlobalCfg();
  taosReadGlobalLogCfg();
S
Shengliang Guan 已提交
77
  taosSetCoreDump(tsEnableCoreFile);
78

79

80
  if (!taosMkDir(tsLogDir)) {
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
   printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno));
   return -1;
  }

  char temp[TSDB_FILENAME_LEN];
  sprintf(temp, "%s/taosdlog", tsLogDir);
  if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) {
    printf("failed to init log file\n");
  }

  if (!taosReadGlobalCfg()) {
    taosPrintGlobalCfg();
    dError("TDengine read global config failed");
    return -1;
  }

  dInfo("start to initialize TDengine");

  taosInitNotes();

  return taosCheckGlobalCfg();
}

void dnodeCleanupMain(DnMain **out) {
  DnMain *main = *out;
  *out = NULL;

  if (main->dnodeTimer != NULL) {
    taosTmrCleanUp(main->dnodeTimer);
    main->dnodeTimer = NULL;
  }

S
Shengliang Guan 已提交
113
#if 0
114
  taos_cleanup();
S
Shengliang Guan 已提交
115
#endif
116 117 118 119 120 121
  taosCloseLog();
  taosStopCacheRefreshWorker();

  free(main);
}

122
int32_t dnodeInitStorage() {
123 124 125 126 127 128
#ifdef TD_TSZ
  // compress module init
  tsCompressInit();
#endif

  // storage module init
129
  if (tsDiskCfgNum == 1 && !taosMkDir(tsDataDir)) {
130 131 132 133
    dError("failed to create dir:%s since %s", tsDataDir, strerror(errno));
    return -1;
  }

134
#if 0
135 136 137 138
  if (tfsInit(tsDiskCfg, tsDiskCfgNum) < 0) {
    dError("failed to init TFS since %s", tstrerror(terrno));
    return -1;
  }
139

140
  strncpy(tsDataDir, TFS_PRIMARY_PATH(), TSDB_FILENAME_LEN);
141
#endif    
142 143 144 145
  sprintf(tsMnodeDir, "%s/mnode", tsDataDir);
  sprintf(tsVnodeDir, "%s/vnode", tsDataDir);
  sprintf(tsDnodeDir, "%s/dnode", tsDataDir);

146
  if (!taosMkDir(tsMnodeDir)) {
147 148 149 150
    dError("failed to create dir:%s since %s", tsMnodeDir, strerror(errno));
    return -1;
  }

151
  if (!taosMkDir(tsDnodeDir)) {
152 153 154 155
    dError("failed to create dir:%s since %s", tsDnodeDir, strerror(errno));
    return -1;
  }

156
#if 0
157 158 159 160 161 162 163 164 165 166
  if (tfsMkdir("vnode") < 0) {
    dError("failed to create vnode dir since %s", tstrerror(terrno));
    return -1;
  }

  if (tfsMkdir("vnode_bak") < 0) {
    dError("failed to create vnode_bak dir since %s", tstrerror(terrno));
    return -1;
  }

167

168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
  TDIR *tdir = tfsOpendir("vnode_bak/.staging");
  bool  stagingNotEmpty = tfsReaddir(tdir) != NULL;
  tfsClosedir(tdir);

  if (stagingNotEmpty) {
    dError("vnode_bak/.staging dir not empty, fix it first.");
    return -1;
  }

  if (tfsMkdir("vnode_bak/.staging") < 0) {
    dError("failed to create vnode_bak/.staging dir since %s", tstrerror(terrno));
    return -1;
  }

  dnodeCheckDataDirOpenned(tsDnodeDir);

  taosGetDisk();
S
Shengliang Guan 已提交
185
  dnodePrintDiskInfo();
186
#endif  
187 188 189 190 191

  dInfo("dnode storage is initialized at %s", tsDnodeDir);
  return 0;
}

192
void dnodeCleanupStorage() {
193
#if 0  
194 195 196 197 198 199 200
  // storage destroy
  tfsDestroy();

 #ifdef TD_TSZ
  // compress destroy
  tsCompressExit();
 #endif
201
#endif
202 203
}

204 205
void dnodeReportStartup(char *name, char *desc) {
  Dnode *dnode = dnodeInst();
S
Shengliang Guan 已提交
206 207 208 209 210 211
  if (dnode->main != NULL) {
    SStartupStep *startup = &dnode->main->startup;
    tstrncpy(startup->name, name, strlen(startup->name));
    tstrncpy(startup->desc, desc, strlen(startup->desc));
    startup->finished = 0;
  }
212 213
}

214 215
void dnodeReportStartupFinished(char *name, char *desc) {
  Dnode *dnode = dnodeInst();
216 217 218 219 220 221
  SStartupStep *startup = &dnode->main->startup;
  tstrncpy(startup->name, name, strlen(startup->name));
  tstrncpy(startup->desc, desc, strlen(startup->desc));
  startup->finished = 1;
}

222
void dnodeProcessStartupReq(SRpcMsg *pMsg) {
223 224
  dInfo("startup msg is received, cont:%s", (char *)pMsg->pCont);

225
  Dnode *dnode = dnodeInst();
226 227 228 229 230 231 232 233 234 235
  SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep));
  memcpy(pStep, &dnode->main->startup, sizeof(SStartupStep));

  dDebug("startup msg is sent, step:%s desc:%s finished:%d", pStep->name, pStep->desc, pStep->finished);

  SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStep, .contLen = sizeof(SStartupStep)};
  rpcSendResponse(&rpcRsp);
  rpcFreeCont(pMsg->pCont);
}

236 237
static int32_t dnodeStartMnode(SRpcMsg *pMsg) {
  Dnode *dnode = dnodeInst();
238 239
  SCreateMnodeMsg *pCfg = pMsg->pCont;
  pCfg->dnodeId = htonl(pCfg->dnodeId);
240
  if (pCfg->dnodeId != dnode->cfg->dnodeId) {
241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258
    dDebug("dnode:%d, in create meps msg is not equal with saved dnodeId:%d", pCfg->dnodeId,
           dnodeGetDnodeId(dnode->cfg));
    return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED;
  }

  if (strcmp(pCfg->dnodeEp, tsLocalEp) != 0) {
    dDebug("dnodeEp:%s, in create meps msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp);
    return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED;
  }

  dDebug("dnode:%d, create meps msg is received from mnodes, numOfMnodes:%d", pCfg->dnodeId, pCfg->mnodes.mnodeNum);
  for (int32_t i = 0; i < pCfg->mnodes.mnodeNum; ++i) {
    pCfg->mnodes.mnodeInfos[i].mnodeId = htonl(pCfg->mnodes.mnodeInfos[i].mnodeId);
    dDebug("meps index:%d, meps:%d:%s", i, pCfg->mnodes.mnodeInfos[i].mnodeId, pCfg->mnodes.mnodeInfos[i].mnodeEp);
  }

  if (mnodeIsServing(dnode->mnode)) return 0;

259
  return mnodeDeploy(&pCfg->mnodes);
260 261
}

262 263
void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) {
  int32_t code = dnodeStartMnode(pMsg);
264 265 266 267 268 269 270

  SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code};

  rpcSendResponse(&rspMsg);
  rpcFreeCont(pMsg->pCont);
}

271
void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg) {
272 273 274 275 276 277 278 279 280
  SCfgDnodeMsg *pCfg = pMsg->pCont;

  int32_t code = taosCfgDynamicOptions(pCfg->config);

  SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code};

  rpcSendResponse(&rspMsg);
  rpcFreeCont(pMsg->pCont);
}