dndExec.c 10.6 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "dndInt.h"
S
Shengliang Guan 已提交
18

S
shm  
Shengliang Guan 已提交
19
/*
 * Ask the node's `requiredFp` callback whether the node has to be started,
 * and log the answer.
 *
 * Returns the `required` flag as filled in by the callback. The flag starts
 * out false, so it stays false if the callback never writes it.
 */
static bool dndRequireNode(SMgmtWrapper *pWrapper) {
  bool    required = false;
  int32_t code = (*pWrapper->fp.requiredFp)(pWrapper, &required);
  if (code != 0) {
    // The return code used to be stored and then ignored. Surface it so a
    // failed check is distinguishable from "not required" in the logs; the
    // returned value is deliberately left untouched.
    // NOTE(review): assumes requiredFp follows the 0-on-success convention
    // used by the other callbacks in this file - confirm.
    dError("node:%s, failed to check whether startup is required, code:%d", pWrapper->name, code);
  }

  if (!required) {
    dDebug("node:%s, does not require startup", pWrapper->name);
  } else {
    dDebug("node:%s, needs to be started", pWrapper->name);
  }
  return required;
}
S
Shengliang Guan 已提交
29

30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
/*
 * Prepare the parent-side resources of a node that runs in its own process:
 * create the shared memory segment sized for the node type, mark the wrapper
 * as PROC_PARENT and initialise its proc object with `isChild = false`.
 *
 * Returns 0 on success. Returns -1 when the node type has no shm size
 * configured here, when taosCreateShm() fails (terrno is then derived from
 * errno), or when taosProcInit() returns NULL.
 */
static int32_t dndInitNodeProc(SMgmtWrapper *pWrapper) {
  int32_t shmsize = 0;
  switch (pWrapper->ntype) {
    case VNODES:
      shmsize = tsVnodeShmSize;
      break;
    case QNODE:
      shmsize = tsQnodeShmSize;
      break;
    case SNODE:
      shmsize = tsSnodeShmSize;
      break;
    case MNODE:
      shmsize = tsMnodeShmSize;
      break;
    case BNODE:
      shmsize = tsBnodeShmSize;
      break;
    default:
      // Any other node type has no shm size to use.
      return -1;
  }

  if (taosCreateShm(&pWrapper->shm, pWrapper->ntype, shmsize) != 0) {
    // Build the error from errno, like the other system-call failure paths in
    // this file; the previous code wrapped the stale terrno value instead.
    // NOTE(review): assumes taosCreateShm() fails through an OS call that sets
    // errno and does not set terrno itself - confirm.
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr());
    return -1;
  }
  dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->shm.id, shmsize);

  SProcCfg cfg = dndGenProcCfg(pWrapper);
  cfg.isChild = false;
  pWrapper->procType = PROC_PARENT;
  pWrapper->pProc = taosProcInit(&cfg);
  if (pWrapper->pProc == NULL) {
    dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
    return -1;
  }

  return 0;
}

/*
 * Launch a new process for node type `n` and remember its pid in the wrapper.
 *
 * The argument vector handed to taosNewProc() is
 *   { NULL, "-c", configDir, "-n", "<n as decimal>", NULL }
 * where slot 0 is left NULL by this function.
 *
 * Returns 0 on success. When taosNewProc() returns a value <= 0, terrno is
 * set from errno, the failure is logged, and -1 is returned; `procId` is not
 * modified in that case.
 */
static int32_t dndNewNodeProc(SMgmtWrapper *pWrapper, EDndType n) {
  char typeStr[8] = {0};
  snprintf(typeStr, sizeof(typeStr), "%d", n);

  char *argv[6] = {NULL, "-c", configDir, "-n", typeStr, NULL};

  int32_t childPid = taosNewProc(argv);
  if (childPid <= 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to exec in new process since %s", pWrapper->name, terrstr());
    return -1;
  }

  dInfo("node:%s, continue running in new process:%d", pWrapper->name, childPid);
  pWrapper->procId = childPid;
  return 0;
}

/*
 * Start the child process for a node (unless the dnode's ntype is NODE_MAX,
 * in which case only a "start manually" notice is logged) and then run the
 * wrapper's proc object.
 *
 * Returns 0 on success, -1 if spawning the process or taosProcRun() fails.
 */
static int32_t dndRunNodeProc(SMgmtWrapper *pWrapper) {
  const bool startManually = (pWrapper->pDnode->ntype == NODE_MAX);

  if (startManually) {
    dInfo("node:%s, should be started manually", pWrapper->name);
  } else if (dndNewNodeProc(pWrapper, pWrapper->ntype) != 0) {
    return -1;
  }

  if (taosProcRun(pWrapper->pProc) == 0) {
    return 0;
  }

  dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
  return -1;
}

/*
 * Open a node in the current process: create its directory, invoke the
 * node's `openFp` callback and, on success, flag the wrapper as deployed.
 *
 * Returns 0 on success. Returns -1 if the directory cannot be created
 * (terrno is set from errno) or if `openFp` reports a failure.
 */
static int32_t dndOpenNodeImp(SMgmtWrapper *pWrapper) {
  // Step 1: create the node directory.
  if (taosMkDir(pWrapper->path) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr());
    return -1;
  }

  // Step 2: let the node open itself.
  if (pWrapper->fp.openFp(pWrapper) != 0) {
    dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
    return -1;
  }

  // Step 3: record the successful open.
  dDebug("node:%s, has been opened", pWrapper->name);
  pWrapper->deployed = true;
  return 0;
}
S
shm  
Shengliang Guan 已提交
120

121 122 123 124 125 126 127 128 129 130 131 132 133
/*
 * Open a node according to the dnode's process mode.
 *
 * - PROC_SINGLE: open the node directly in this process.
 * - PROC_PARENT: initialise the node's proc resources, write the shm file,
 *   then run the node's process.
 * - any other mode: nothing is done.
 *
 * Returns 0 on success (or when nothing is done), -1 as soon as a step fails.
 */
int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
  SDnode *pDnode = pWrapper->pDnode;

  switch (pDnode->procType) {
    case PROC_SINGLE:
      return dndOpenNodeImp(pWrapper);

    case PROC_PARENT:
      if (dndInitNodeProc(pWrapper) != 0) {
        return -1;
      }
      if (dndWriteShmFile(pDnode) != 0) {
        return -1;
      }
      if (dndRunNodeProc(pWrapper) != 0) {
        return -1;
      }
      return 0;

    default:
      return 0;
  }
}

/*
 * Close a node in the current process.
 *
 * Sequence: clear `required`, close the node through `closeFp` under the
 * wrapper's write latch if it is deployed, poll until `refCount` drops to
 * zero, and finally clean up the proc object if one exists.
 */
static void dndCloseNodeImp(SMgmtWrapper *pWrapper) {
  dDebug("node:%s, mgmt start to close", pWrapper->name);
  pWrapper->required = false;

  // Close the node while holding the write latch.
  taosWLockLatch(&pWrapper->latch);
  if (pWrapper->deployed) {
    pWrapper->fp.closeFp(pWrapper);
    pWrapper->deployed = false;
  }
  taosWUnLockLatch(&pWrapper->latch);

  // Poll in 10 ms steps until no references to the wrapper remain.
  while (pWrapper->refCount > 0) {
    taosMsleep(10);
  }

  if (pWrapper->pProc != NULL) {
    taosProcCleanup(pWrapper->pProc);
    pWrapper->pProc = NULL;
  }

  dDebug("node:%s, mgmt has been closed", pWrapper->name);
}
S
shm  
Shengliang Guan 已提交
153

154 155 156 157 158 159 160 161 162
/*
 * Close a node. When the dnode runs as PROC_PARENT and the node's child
 * process still exists, that process is killed and waited for first; the
 * in-process close is then performed in every mode.
 */
void dndCloseNode(SMgmtWrapper *pWrapper) {
  const bool isParent = (pWrapper->pDnode->procType == PROC_PARENT);

  if (isParent && pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) {
    dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId);
    taosKillProc(pWrapper->procId);

    dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, pWrapper->procId);
    taosWaitProc(pWrapper->procId);

    dInfo("node:%s, child process:%d is stopped", pWrapper->name, pWrapper->procId);
  }

  dndCloseNodeImp(pWrapper);
}

167
static void dndProcessProcHandle(void *handle) {
S
Shengliang Guan 已提交
168
  dWarn("handle:%p, the child process dies and send an offline rsp", handle);
169
  SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_NODE_OFFLINE};
170 171
  rpcSendResponse(&rpcMsg);
}
S
shm  
Shengliang Guan 已提交
172 173

/*
 * Run all required nodes inside the current process.
 *
 * Opens every node whose `requiredFp` says it is needed, switches the dnode
 * to DND_STAT_RUNNING, starts each required node that has a `startFp`, and
 * then polls every 100 ms until the dnode event becomes DND_EVENT_STOP.
 *
 * Returns 0 after a stop event, -1 if any node fails to open or start.
 */
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
  dInfo("dnode run in single process");
  pDnode->procType = PROC_SINGLE;

  // Pass 1: decide which nodes are needed and open them.
  for (EDndType ntype = DNODE; ntype < NODE_MAX; ++ntype) {
    SMgmtWrapper *pNode = &pDnode->wrappers[ntype];
    pNode->required = dndRequireNode(pNode);
    if (pNode->required && dndOpenNodeImp(pNode) != 0) {
      dError("node:%s, failed to start since %s", pNode->name, terrstr());
      return -1;
    }
  }

  dndSetStatus(pDnode, DND_STAT_RUNNING);

  // Pass 2: start the opened nodes that provide a start callback.
  for (EDndType ntype = 0; ntype < NODE_MAX; ++ntype) {
    SMgmtWrapper *pNode = &pDnode->wrappers[ntype];
    if (!pNode->required || pNode->fp.startFp == NULL) {
      continue;
    }
    if (pNode->fp.startFp(pNode) != 0) {
      dError("node:%s, failed to start since %s", pNode->name, terrstr());
      return -1;
    }
  }

  dInfo("TDengine initialized successfully");
  dndReportStartup(pDnode, "TDengine", "initialized successfully");

  // Idle until a stop is requested.
  while (pDnode->event != DND_EVENT_STOP) {
    taosMsleep(100);
  }

  dInfo("dnode is about to stop");
  dndSetStatus(pDnode, DND_STAT_STOPPED);
  return 0;
}

S
shm  
Shengliang Guan 已提交
214
/*
 * Run the dnode as the parent of per-node child processes.
 *
 * Steps:
 *   1. open the DNODE wrapper in this process;
 *   2. for every other required node, create its shm and parent-side proc;
 *   3. write the shm file;
 *   4. spawn/run the process of every required node;
 *   5. set DND_STAT_RUNNING and start the DNODE wrapper (if it has a startFp);
 *   6. poll every 100 ms: restart child processes that no longer exist, and
 *      on DND_EVENT_STOP kill and wait for all of them before returning.
 *
 * Child supervision (kill on stop, restart when missing) is skipped when the
 * dnode's ntype is NODE_MAX.
 *
 * Returns 0 after a stop event, -1 if any startup step fails.
 */
static int32_t dndRunInParentProcess(SDnode *pDnode) {
  dInfo("dnode run in parent process");
  pDnode->procType = PROC_PARENT;

  SMgmtWrapper *pDWrapper = &pDnode->wrappers[DNODE];
  if (dndOpenNodeImp(pDWrapper) != 0) {
    dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
    return -1;
  }

  for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    pWrapper->required = dndRequireNode(pWrapper);
    if (!pWrapper->required) continue;
    if (dndInitNodeProc(pWrapper) != 0) return -1;
  }

  if (dndWriteShmFile(pDnode) != 0) {
    dError("failed to write runtime file since %s", terrstr());
    return -1;
  }

  for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    if (!pWrapper->required) continue;
    if (dndRunNodeProc(pWrapper) != 0) return -1;
  }

  dndSetStatus(pDnode, DND_STAT_RUNNING);

  // Guard the optional start callback the same way the single-process and
  // child-process paths do; the previous code called it unconditionally.
  if (pDWrapper->fp.startFp != NULL) {
    if ((*pDWrapper->fp.startFp)(pDWrapper) != 0) {
      dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
      return -1;
    }
  }

  dInfo("TDengine initialized successfully");
  dndReportStartup(pDnode, "TDengine", "initialized successfully");

  while (1) {
    if (pDnode->event == DND_EVENT_STOP) {
      dInfo("dnode is about to stop");
      dndSetStatus(pDnode, DND_STAT_STOPPED);

      // Stop every child process that is still present.
      for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
        SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
        if (!pWrapper->required) continue;
        if (pDnode->ntype == NODE_MAX) continue;

        if (pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) {
          dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId);
          taosKillProc(pWrapper->procId);
          dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, pWrapper->procId);
          taosWaitProc(pWrapper->procId);
          dInfo("node:%s, child process:%d is stopped", pWrapper->name, pWrapper->procId);
        }
      }
      break;
    } else {
      // Restart any required child whose process is missing.
      for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
        SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
        if (!pWrapper->required) continue;
        if (pDnode->ntype == NODE_MAX) continue;

        if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) {
          dWarn("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId);
          taosProcCloseHandles(pWrapper->pProc, dndProcessProcHandle);
          // Best effort: dndNewNodeProc logs its own failure and leaves
          // procId unchanged, so the next poll attempts the restart again.
          (void)dndNewNodeProc(pWrapper, n);
        }
      }
    }

    taosMsleep(100);
  }

  return 0;
}
S
shm  
Shengliang Guan 已提交
290

S
shm  
Shengliang Guan 已提交
291 292
/*
 * Run a single node (selected by the dnode's ntype) as a child process.
 *
 * Verifies the node is required, installs the default message callback,
 * opens the node, creates the child side of the proc object, starts the node
 * if it has a `startFp`, runs the proc, and then polls every 100 ms until
 * the dnode event becomes DND_EVENT_STOP.
 *
 * Returns 0 after a stop event, -1 if the node is not required or any
 * startup step fails.
 */
static int32_t dndRunInChildProcess(SDnode *pDnode) {
  SMgmtWrapper *pNode = &pDnode->wrappers[pDnode->ntype];
  dInfo("%s run in child process", pNode->name);
  pDnode->procType = PROC_CHILD;

  pNode->required = dndRequireNode(pNode);
  if (!pNode->required) {
    dError("%s does not require startup", pNode->name);
    return -1;
  }

  SMsgCb defaultCb = dndCreateMsgcb(pNode);
  tmsgSetDefaultMsgCb(&defaultCb);
  pNode->procType = PROC_CHILD;

  if (dndOpenNodeImp(pNode) != 0) {
    dError("node:%s, failed to start since %s", pNode->name, terrstr());
    return -1;
  }

  SProcCfg procCfg = dndGenProcCfg(pNode);
  procCfg.isChild = true;
  pNode->pProc = taosProcInit(&procCfg);
  if (pNode->pProc == NULL) {
    dError("node:%s, failed to create proc since %s", pNode->name, terrstr());
    return -1;
  }

  // The start callback is optional.
  if (pNode->fp.startFp != NULL && pNode->fp.startFp(pNode) != 0) {
    dError("node:%s, failed to start since %s", pNode->name, terrstr());
    return -1;
  }

  dndSetStatus(pDnode, DND_STAT_RUNNING);

  if (taosProcRun(pNode->pProc) != 0) {
    dError("node:%s, failed to run proc since %s", pNode->name, terrstr());
    return -1;
  }

  dInfo("TDengine initialized successfully");
  dndReportStartup(pDnode, "TDengine", "initialized successfully");

  // Idle until a stop is requested.
  while (pDnode->event != DND_EVENT_STOP) {
    taosMsleep(100);
  }

  dInfo("%s is about to stop", pNode->name);
  dndSetStatus(pDnode, DND_STAT_STOPPED);
  return 0;
}

/*
 * Entry point: pick the run mode for this dnode.
 *
 * - tsMultiProcess disabled: everything runs in a single process.
 * - ntype is DNODE or NODE_MAX: run as the parent process.
 * - any other ntype: run as the child process for that node.
 *
 * Returns whatever the selected run function returns.
 */
int32_t dndRun(SDnode *pDnode) {
  if (!tsMultiProcess) {
    return dndRunInSingleProcess(pDnode);
  }

  const bool runAsParent = (pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX);
  return runAsParent ? dndRunInParentProcess(pDnode) : dndRunInChildProcess(pDnode);
}