dndExec.c 9.3 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "dndInt.h"
S
Shengliang Guan 已提交
18

S
shm  
Shengliang Guan 已提交
19
/*
 * Ask the node's requiredFp callback whether this node has to be started.
 *
 * Returns the `required` flag as filled in by the callback (it stays false
 * if the callback does not touch it). A non-zero return code from the
 * callback is not fatal here, but it is reported instead of being silently
 * dropped; the flag is still returned unchanged.
 */
static bool dndRequireNode(SMgmtWrapper *pWrapper) {
  bool    required = false;
  int32_t code = (*pWrapper->fp.requiredFp)(pWrapper, &required);
  if (code != 0) {
    dWarn("node:%s, failed to check whether startup is required, code:%d", pWrapper->name, code);
  }

  if (!required) {
    dDebug("node:%s, does not require startup", pWrapper->name);
  } else {
    dDebug("node:%s, needs to be started", pWrapper->name);
  }
  return required;
}
S
Shengliang Guan 已提交
29

S
shm  
Shengliang Guan 已提交
30
/*
 * Create the node's directory and run its openFp callback.
 *
 * Returns 0 and marks the wrapper as deployed when both steps succeed.
 * Returns -1 after logging when the directory cannot be created (terrno is
 * set from errno in that case) or when openFp reports a failure.
 */
int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
  int32_t result = -1;

  if (taosMkDir(pWrapper->path) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr());
  } else if ((*pWrapper->fp.openFp)(pWrapper) != 0) {
    dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
  } else {
    dDebug("node:%s, has been opened", pWrapper->name);
    pWrapper->deployed = true;
    result = 0;
  }

  return result;
}
S
shm  
Shengliang Guan 已提交
46

S
shm  
Shengliang Guan 已提交
47
/*
 * Shut a node down.
 *
 * Steps, in order:
 *   1. clear the `required` flag;
 *   2. under the write latch, call closeFp if the node is deployed and
 *      clear the `deployed` flag;
 *   3. poll every 10 ms until refCount is no longer positive;
 *   4. clean up the proc object, if one exists.
 */
void dndCloseNode(SMgmtWrapper *pWrapper) {
  dDebug("node:%s, mgmt start to close", pWrapper->name);
  pWrapper->required = false;

  taosWLockLatch(&pWrapper->latch);
  if (pWrapper->deployed) {
    (*pWrapper->fp.closeFp)(pWrapper);
    pWrapper->deployed = false;
  }
  taosWUnLockLatch(&pWrapper->latch);

  /* wait for outstanding references to be released */
  for (; pWrapper->refCount > 0;) {
    taosMsleep(10);
  }

  if (pWrapper->pProc != NULL) {
    taosProcCleanup(pWrapper->pProc);
    pWrapper->pProc = NULL;
  }

  dDebug("node:%s, mgmt has been closed", pWrapper->name);
}
S
shm  
Shengliang Guan 已提交
67

S
shm  
Shengliang Guan 已提交
68

S
Shengliang Guan 已提交
69
/*
 * Launch a new process for node type `n` through taosNewProc, passing
 * "-c <configDir> -n <n>" as arguments.
 *
 * On success the new pid is stored in pWrapper->procId and 0 is returned.
 * If taosNewProc returns a non-positive value, terrno is set from errno,
 * the error is logged, procId is left untouched and -1 is returned.
 */
static int32_t dndNewProc(SMgmtWrapper *pWrapper, EDndType n) {
  char typeStr[8] = {0};
  snprintf(typeStr, sizeof(typeStr), "%d", n);

  /* slot 0 is intentionally left NULL here; the last slot terminates the list */
  char *argv[6] = {NULL, "-c", configDir, "-n", typeStr, NULL};

  int32_t childPid = taosNewProc(argv);
  if (childPid <= 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to exec in new process since %s", pWrapper->name, terrstr());
    return -1;
  }

  pWrapper->procId = childPid;
  dInfo("node:%s, continue running in new process:%d", pWrapper->name, childPid);
  return 0;
}

91
static void dndProcessProcHandle(void *handle) {
S
Shengliang Guan 已提交
92
  dWarn("handle:%p, the child process dies and send an offline rsp", handle);
93 94 95
  SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_DND_OFFLINE};
  rpcSendResponse(&rpcMsg);
}
S
shm  
Shengliang Guan 已提交
96 97

/*
 * Run every required node inside the current process.
 *
 * 1. For each node type, ask whether it is required and open it if so.
 * 2. Mark the dnode as running.
 * 3. Call startFp on every required node that provides one.
 * 4. Poll pDnode->event every 100 ms until DND_EVENT_STOP is seen, then
 *    mark the dnode as stopped.
 *
 * Returns 0 after a stop event, or -1 as soon as a node fails to open or
 * start.
 */
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
  dInfo("dnode run in single process");

  for (EDndType n = DNODE; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    pWrapper->required = dndRequireNode(pWrapper);
    if (!pWrapper->required) continue;

    if (dndOpenNode(pWrapper) != 0) {
      dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
      return -1;
    }
  }

  dndSetStatus(pDnode, DND_STAT_RUNNING);

  /* iterate over the same range as the open loop above */
  for (EDndType n = DNODE; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    if (!pWrapper->required) continue;
    if (pWrapper->fp.startFp == NULL) continue;

    if ((*pWrapper->fp.startFp)(pWrapper) != 0) {
      dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
      return -1;
    }
  }

  dInfo("TDengine initialized successfully");
  dndReportStartup(pDnode, "TDengine", "initialized successfully");

  /* block here until a stop is requested */
  while (pDnode->event != DND_EVENT_STOP) {
    taosMsleep(100);
  }

  dInfo("dnode is about to stop");
  dndSetStatus(pDnode, DND_STAT_STOPPED);
  return 0;
}

S
shm  
Shengliang Guan 已提交
137
static int32_t dndRunInParentProcess(SDnode *pDnode) {
S
shm  
Shengliang Guan 已提交
138
  dInfo("dnode run in parent process");
S
shm  
Shengliang Guan 已提交
139 140 141 142 143
  SMgmtWrapper *pDWrapper = &pDnode->wrappers[DNODE];
  if (dndOpenNode(pDWrapper) != 0) {
    dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
    return -1;
  }
S
shm  
Shengliang Guan 已提交
144

S
Shengliang Guan 已提交
145
  for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
S
shm  
Shengliang Guan 已提交
146
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
S
shm  
Shengliang Guan 已提交
147
    pWrapper->required = dndRequireNode(pWrapper);
S
shm  
Shengliang Guan 已提交
148 149
    if (!pWrapper->required) continue;

S
Shengliang Guan 已提交
150 151 152 153 154 155 156 157 158 159 160 161 162 163
    int32_t shmsize = tsMnodeShmSize;
    if (n == VNODES) {
      shmsize = tsVnodeShmSize;
    } else if (n == QNODE) {
      shmsize = tsQnodeShmSize;
    } else if (n == SNODE) {
      shmsize = tsSnodeShmSize;
    } else if (n == MNODE) {
      shmsize = tsMnodeShmSize;
    } else if (n == BNODE) {
      shmsize = tsBnodeShmSize;
    } else {
    }

S
shm  
Shengliang Guan 已提交
164
    if (taosCreateShm(&pWrapper->shm, n, shmsize) != 0) {
S
shm  
Shengliang Guan 已提交
165
      terrno = TAOS_SYSTEM_ERROR(terrno);
S
Shengliang Guan 已提交
166
      dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr());
S
shm  
Shengliang Guan 已提交
167 168
      return -1;
    }
S
Shengliang Guan 已提交
169
    dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->shm.id, shmsize);
S
shm  
Shengliang Guan 已提交
170

S
shm  
Shengliang Guan 已提交
171 172
    SProcCfg cfg = dndGenProcCfg(pWrapper);
    cfg.isChild = false;
S
shm  
Shengliang Guan 已提交
173
    pWrapper->procType = PROC_PARENT;
S
shm  
Shengliang Guan 已提交
174 175 176 177
    pWrapper->pProc = taosProcInit(&cfg);
    if (pWrapper->pProc == NULL) {
      dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
      return -1;
S
shm  
Shengliang Guan 已提交
178
    }
S
shm  
Shengliang Guan 已提交
179 180
  }

S
shm  
Shengliang Guan 已提交
181
  if (dndWriteShmFile(pDnode) != 0) {
S
shm  
Shengliang Guan 已提交
182 183 184 185
    dError("failed to write runtime file since %s", terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
186
  for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
S
shm  
Shengliang Guan 已提交
187 188
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    if (!pWrapper->required) continue;
S
shm  
Shengliang Guan 已提交
189

S
shm  
Shengliang Guan 已提交
190
    if (pDnode->ntype == NODE_MAX) {
S
shm  
Shengliang Guan 已提交
191
      dInfo("node:%s, should be started manually", pWrapper->name);
S
shm  
Shengliang Guan 已提交
192
    } else {
S
shm  
Shengliang Guan 已提交
193
      if (dndNewProc(pWrapper, n) != 0) {
S
shm  
Shengliang Guan 已提交
194 195
        return -1;
      }
S
shm  
Shengliang Guan 已提交
196
    }
S
shm  
Shengliang Guan 已提交
197 198 199

    if (taosProcRun(pWrapper->pProc) != 0) {
      dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
S
shm  
Shengliang Guan 已提交
200 201
      return -1;
    }
S
shm  
Shengliang Guan 已提交
202
  }
S
shm  
Shengliang Guan 已提交
203

S
shm  
Shengliang Guan 已提交
204
  dndSetStatus(pDnode, DND_STAT_RUNNING);
S
shm  
Shengliang Guan 已提交
205

S
shm  
Shengliang Guan 已提交
206 207 208 209
  if ((*pDWrapper->fp.startFp)(pDWrapper) != 0) {
    dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
    return -1;
  }
S
shm  
Shengliang Guan 已提交
210

S
shm  
Shengliang Guan 已提交
211 212 213 214 215 216
  dInfo("TDengine initialized successfully");
  dndReportStartup(pDnode, "TDengine", "initialized successfully");

  while (1) {
    if (pDnode->event == DND_EVENT_STOP) {
      dInfo("dnode is about to stop");
S
shm  
Shengliang Guan 已提交
217 218
      dndSetStatus(pDnode, DND_STAT_STOPPED);

S
Shengliang Guan 已提交
219
      for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
S
shm  
Shengliang Guan 已提交
220 221 222 223 224
        SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
        if (!pWrapper->required) continue;
        if (pDnode->ntype == NODE_MAX) continue;

        if (pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) {
S
shm  
Shengliang Guan 已提交
225 226
          dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId);
          taosKillProc(pWrapper->procId);
S
shm  
Shengliang Guan 已提交
227 228 229 230 231 232 233
          dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, pWrapper->procId);
          taosWaitProc(pWrapper->procId);
          dInfo("node:%s, child process:%d is stopped", pWrapper->name, pWrapper->procId);
        }
      }
      break;
    } else {
S
Shengliang Guan 已提交
234
      for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
S
shm  
Shengliang Guan 已提交
235 236 237 238 239
        SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
        if (!pWrapper->required) continue;
        if (pDnode->ntype == NODE_MAX) continue;

        if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) {
S
Shengliang Guan 已提交
240
          dWarn("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId);
241
          taosProcCloseHandles(pWrapper->pProc, dndProcessProcHandle);
S
shm  
Shengliang Guan 已提交
242 243
          dndNewProc(pWrapper, n);
        }
S
shm  
Shengliang Guan 已提交
244 245
      }
    }
S
Shengliang Guan 已提交
246 247

    taosMsleep(100);
S
shm  
Shengliang Guan 已提交
248 249
  }

S
shm  
Shengliang Guan 已提交
250 251
  return 0;
}
S
shm  
Shengliang Guan 已提交
252

S
shm  
Shengliang Guan 已提交
253 254
/*
 * Run as a child process hosting the single node selected by pDnode->ntype.
 *
 * The node must report itself as required; otherwise -1 is returned. The
 * function then installs the default message callback, opens the node,
 * initialises a child-side proc object, calls startFp if the node has one,
 * marks the dnode as running and runs the proc object. Afterwards it polls
 * pDnode->event every 100 ms until DND_EVENT_STOP is seen and marks the
 * dnode as stopped.
 *
 * Returns 0 after a stop event, or -1 as soon as any step fails.
 */
static int32_t dndRunInChildProcess(SDnode *pDnode) {
  SMgmtWrapper *pNode = &pDnode->wrappers[pDnode->ntype];
  dInfo("%s run in child process", pNode->name);

  pNode->required = dndRequireNode(pNode);
  if (!pNode->required) {
    dError("%s does not require startup", pNode->name);
    return -1;
  }

  SMsgCb defaultCb = dndCreateMsgcb(pNode);
  tmsgSetDefaultMsgCb(&defaultCb);
  pNode->procType = PROC_CHILD;

  if (dndOpenNode(pNode) != 0) {
    dError("node:%s, failed to start since %s", pNode->name, terrstr());
    return -1;
  }

  SProcCfg procCfg = dndGenProcCfg(pNode);
  procCfg.isChild = true;
  pNode->pProc = taosProcInit(&procCfg);
  if (pNode->pProc == NULL) {
    dError("node:%s, failed to create proc since %s", pNode->name, terrstr());
    return -1;
  }

  /* startFp is optional */
  if (pNode->fp.startFp != NULL && (*pNode->fp.startFp)(pNode) != 0) {
    dError("node:%s, failed to start since %s", pNode->name, terrstr());
    return -1;
  }

  dndSetStatus(pDnode, DND_STAT_RUNNING);

  if (taosProcRun(pNode->pProc) != 0) {
    dError("node:%s, failed to run proc since %s", pNode->name, terrstr());
    return -1;
  }

  dInfo("TDengine initialized successfully");
  dndReportStartup(pDnode, "TDengine", "initialized successfully");

  /* block here until a stop is requested */
  while (pDnode->event != DND_EVENT_STOP) {
    taosMsleep(100);
  }

  dInfo("%s is about to stop", pNode->name);
  dndSetStatus(pDnode, DND_STAT_STOPPED);
  return 0;
}

/*
 * Entry point: choose the run mode and block in it until it returns.
 *
 *   - multi-process disabled              -> single-process mode
 *   - ntype is DNODE or NODE_MAX          -> parent-process mode
 *   - any other ntype                     -> child-process mode
 *
 * Returns whatever the selected mode returns.
 */
int32_t dndRun(SDnode *pDnode) {
  if (!tsMultiProcess) {
    return dndRunInSingleProcess(pDnode);
  }

  if (pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) {
    return dndRunInParentProcess(pDnode);
  }

  return dndRunInChildProcess(pDnode);
}