dmExec.c 10.1 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmImp.h"
S
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19
/*
 * Ask the node's requiredFp callback whether this node has to be started.
 *
 * Returns the flag the callback stored in `required` (false when the callback
 * left it untouched). A non-zero callback return code is logged instead of
 * being silently dropped; it does not alter the returned flag.
 */
static bool dmRequireNode(SMgmtWrapper *pWrapper) {
  bool    required = false;
  int32_t code = (*pWrapper->fp.requiredFp)(pWrapper, &required);
  if (code != 0) {
    dError("node:%s, failed to check whether startup is required, code:%d", pWrapper->name, code);
  }

  if (!required) {
    dDebug("node:%s, does not require startup", pWrapper->name);
  }

  return required;
}
S
Shengliang Guan 已提交
27

S
Shengliang Guan 已提交
28
/*
 * Prepare the parent-process side of a node: create the shared memory segment
 * sized for the node type, mark the wrapper as DND_PROC_PARENT and create its
 * proc object with cfg.isChild set to false.
 *
 * Returns 0 on success, -1 when the node type has no shm size, when the shm
 * cannot be created, or when taosProcInit returns NULL.
 */
static int32_t dmInitParentProc(SMgmtWrapper *pWrapper) {
  int32_t shmsize = 0;
  switch (pWrapper->ntype) {
    case VNODE:
      shmsize = tsVnodeShmSize;
      break;
    case QNODE:
      shmsize = tsQnodeShmSize;
      break;
    case SNODE:
      shmsize = tsSnodeShmSize;
      break;
    case MNODE:
      shmsize = tsMnodeShmSize;
      break;
    case BNODE:
      shmsize = tsBnodeShmSize;
      break;
    default:
      /* Any other node type has no shm size configured here. */
      dError("node:%s, failed to init parent proc since node type:%d has no shm size", pWrapper->name,
             pWrapper->ntype);
      return -1;
  }

  if (taosCreateShm(&pWrapper->procShm, pWrapper->ntype, shmsize) != 0) {
    /* Wrap errno, as the other system-call failure paths in this file do. */
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr());
    return -1;
  }
  dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->procShm.id, shmsize);

  SProcCfg cfg = dmGenProcCfg(pWrapper);
  cfg.isChild = false;
  pWrapper->procType = DND_PROC_PARENT;
  pWrapper->procObj = taosProcInit(&cfg);
  if (pWrapper->procObj == NULL) {
    dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
63
/*
 * Launch a new process for node type `n` through taosNewProc, passing
 * "-c <configDir> -n <n>" as arguments, and record its pid in the wrapper.
 *
 * Returns 0 on success; on failure sets terrno from errno and returns -1.
 */
static int32_t dmNewNodeProc(SMgmtWrapper *pWrapper, EDndNodeType n) {
  char nodeTypeStr[8] = {0};
  snprintf(nodeTypeStr, sizeof(nodeTypeStr), "%d", n);

  /* Slot 0 is intentionally left NULL, exactly as before; slot 5 terminates the list. */
  char *argv[6] = {NULL, "-c", configDir, "-n", nodeTypeStr, NULL};

  int32_t childPid = taosNewProc(argv);
  if (childPid > 0) {
    pWrapper->procId = childPid;
    dInfo("node:%s, continue running in new process:%d", pWrapper->name, childPid);
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(errno);
  dError("node:%s, failed to exec in new process since %s", pWrapper->name, terrstr());
  return -1;
}

S
Shengliang Guan 已提交
85
/*
 * Parent-process side of running a node: spawn the child process (unless the
 * dnode's ntype is NODE_END, in which case only a notice is logged) and then
 * run the wrapper's proc object.
 *
 * Returns 0 on success, -1 if spawning the child or taosProcRun fails.
 */
static int32_t dmRunParentProc(SMgmtWrapper *pWrapper) {
  bool manualChild = (pWrapper->pDnode->ntype == NODE_END);

  if (manualChild) {
    dInfo("node:%s, should be started manually in child process", pWrapper->name);
  } else if (dmNewNodeProc(pWrapper, pWrapper->ntype) != 0) {
    return -1;
  }

  if (taosProcRun(pWrapper->procObj) == 0) {
    return 0;
  }

  dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
  return -1;
}

/*
 * Child-process side of proc setup: build the proc config for this wrapper,
 * flag it as a child and create the proc object.
 *
 * Returns 0 on success, -1 when taosProcInit returns NULL.
 */
static int32_t dmInitChildProc(SMgmtWrapper *pWrapper) {
  SProcCfg childCfg = dmGenProcCfg(pWrapper);
  childCfg.isChild = true;

  pWrapper->procObj = taosProcInit(&childCfg);
  if (pWrapper->procObj != NULL) {
    return 0;
  }

  dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
  return -1;
}
110

S
Shengliang Guan 已提交
111 112 113 114 115
/*
 * Run the wrapper's proc object in the child process.
 *
 * Returns 0 on success, -1 (after logging) when taosProcRun fails.
 */
static int32_t dmRunChildProc(SMgmtWrapper *pWrapper) {
  if (taosProcRun(pWrapper->procObj) == 0) {
    return 0;
  }

  dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
  return -1;
}

S
Shengliang Guan 已提交
119
/*
 * Open one node according to the wrapper's procType.
 *
 * The node directory is created first. Then:
 *  - DND_PROC_SINGLE / DND_PROC_CHILD: the node's openFp is called; a child
 *    additionally initializes and runs its proc object. The wrapper is marked
 *    deployed only after all of these steps succeed.
 *  - any other procType: the parent proc is initialized, the shm file is
 *    written and the parent proc is run. `deployed` is not touched here.
 *
 * Returns 0 on success, -1 on the first failing step.
 */
int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
  if (taosMkDir(pWrapper->path) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr());
    return -1;
  }

  bool opensLocally = (pWrapper->procType == DND_PROC_SINGLE) || (pWrapper->procType == DND_PROC_CHILD);
  if (!opensLocally) {
    if (dmInitParentProc(pWrapper) != 0) return -1;
    if (dmWriteShmFile(pWrapper) != 0) return -1;
    if (dmRunParentProc(pWrapper) != 0) return -1;
    return 0;
  }

  if ((*pWrapper->fp.openFp)(pWrapper) != 0) {
    dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
    return -1;
  }

  if (pWrapper->procType == DND_PROC_CHILD) {
    if (dmInitChildProc(pWrapper) != 0) return -1;
    if (dmRunChildProc(pWrapper) != 0) return -1;
  }

  dDebug("node:%s, has been opened", pWrapper->name);
  pWrapper->deployed = true;
  return 0;
}
S
shm  
Shengliang Guan 已提交
145

S
Shengliang Guan 已提交
146 147 148 149 150
/*
 * Invoke the node's optional startFp callback, depending on procType:
 *  - DND_PROC_PARENT: nothing is started, only a notice is logged;
 *  - DND_PROC_CHILD:  started unless the wrapper's ntype is DNODE;
 *  - otherwise:       always started.
 *
 * Returns 0 on success (or when there is nothing to start), -1 when startFp
 * reports a failure.
 */
int32_t dmStartNode(SMgmtWrapper *pWrapper) {
  if (pWrapper->procType == DND_PROC_PARENT) {
    dInfo("node:%s, not start in parent process", pWrapper->name);
    return 0;
  }

  bool shouldStart = true;
  if (pWrapper->procType == DND_PROC_CHILD) {
    dInfo("node:%s, start in child process", pWrapper->name);
    shouldStart = (pWrapper->ntype != DNODE);
  }

  if (shouldStart && pWrapper->fp.startFp != NULL && (*pWrapper->fp.startFp)(pWrapper) != 0) {
    dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
167
/* Call the node's stopFp callback if one is registered; otherwise do nothing. */
void dmStopNode(SMgmtWrapper *pWrapper) {
  if (pWrapper->fp.stopFp == NULL) {
    return;
  }

  (*pWrapper->fp.stopFp)(pWrapper);
}

/*
 * Shut one node down. Steps, in order:
 *  1. in a parent wrapper, kill the still-existing child process and wait for it;
 *  2. call the node's stop callback via dmStopNode;
 *  3. clear `required` and, under the write latch, call closeFp if deployed;
 *  4. poll every 10 ms until refCount drops to zero;
 *  5. clean up the proc object, if any.
 */
void dmCloseNode(SMgmtWrapper *pWrapper) {
  dInfo("node:%s, start to close", pWrapper->name);

  if (pWrapper->procType == DND_PROC_PARENT && pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) {
    dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId);
    taosKillProc(pWrapper->procId);
    dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, pWrapper->procId);
    taosWaitProc(pWrapper->procId);
    dInfo("node:%s, child process:%d is stopped", pWrapper->name, pWrapper->procId);
  }

  dmStopNode(pWrapper);

  pWrapper->required = false;

  /* closeFp runs while the write latch is held. */
  taosWLockLatch(&pWrapper->latch);
  if (pWrapper->deployed) {
    (*pWrapper->fp.closeFp)(pWrapper);
    pWrapper->deployed = false;
  }
  taosWUnLockLatch(&pWrapper->latch);

  /* Wait for outstanding references to be released. */
  while (pWrapper->refCount > 0) {
    taosMsleep(10);
  }

  if (pWrapper->procObj != NULL) {
    taosProcCleanup(pWrapper->procObj);
    pWrapper->procObj = NULL;
  }

  dInfo("node:%s, has been closed", pWrapper->name);
}

S
Shengliang Guan 已提交
207 208 209 210 211 212 213
/*
 * Open the nodes this process is responsible for, then set the dnode status
 * to DND_STAT_RUNNING.
 *
 *  - Child process: only the wrapper selected by pDnode->ntype is opened, as
 *    DND_PROC_CHILD, after installing the default message callbacks for it.
 *  - Otherwise: every wrapper whose requiredFp reports "required" is opened;
 *    in a parent process all nodes except DNODE are opened as DND_PROC_PARENT,
 *    everything else as DND_PROC_SINGLE.
 *
 * Returns 0 on success, -1 as soon as one node fails to open.
 */
static int32_t dmOpenNodes(SDnode *pDnode) {
  if (pDnode->ptype != DND_PROC_CHILD) {
    for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
      SMgmtWrapper *pNode = &pDnode->wrappers[n];

      pNode->required = dmRequireNode(pNode);
      if (!pNode->required) continue;

      bool runsInChild = (pDnode->ptype == DND_PROC_PARENT) && (n != DNODE);
      pNode->procType = runsInChild ? DND_PROC_PARENT : DND_PROC_SINGLE;

      if (dmOpenNode(pNode) != 0) {
        dError("node:%s, failed to open since %s", pNode->name, terrstr());
        return -1;
      }
    }
  } else {
    SMgmtWrapper *pNode = &pDnode->wrappers[pDnode->ntype];

    pNode->required = dmRequireNode(pNode);
    if (!pNode->required) {
      /* NOTE(review): only logged; the node is still opened below - confirm this is intended. */
      dError("dnode:%s, failed to open since not required", pNode->name);
    }

    pNode->procType = DND_PROC_CHILD;

    /* Copy of the dnode's message callbacks, bound to this wrapper. */
    SMsgCb childMsgCb = pDnode->data.msgCb;
    childMsgCb.pWrapper = pNode;
    tmsgSetDefaultMsgCb(&childMsgCb);

    if (dmOpenNode(pNode) != 0) {
      dError("node:%s, failed to open since %s", pNode->name, terrstr());
      return -1;
    }
  }

  dmSetStatus(pDnode, DND_STAT_RUNNING);
  return 0;
}
S
shm  
Shengliang Guan 已提交
247

S
Shengliang Guan 已提交
248
/*
 * Start every wrapper flagged as required, in node-type order, then log and
 * report that initialization succeeded.
 *
 * Returns 0 on success, -1 as soon as one node fails to start.
 */
static int32_t dmStartNodes(SDnode *pDnode) {
  for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
    SMgmtWrapper *pNode = &pDnode->wrappers[n];

    if (pNode->required && dmStartNode(pNode) != 0) {
      dError("node:%s, failed to start since %s", pNode->name, terrstr());
      return -1;
    }
  }

  dInfo("TDengine initialized successfully");
  dmReportStartup(pDnode, "TDengine", "initialized successfully");
  return 0;
}

S
Shengliang Guan 已提交
263 264
/* Call dmStopNode on every wrapper of the dnode, whether or not it is required. */
static void dmStopNodes(SDnode *pDnode) {
  for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
    dmStopNode(&pDnode->wrappers[n]);
  }
}
S
shm  
Shengliang Guan 已提交
269

S
Shengliang Guan 已提交
270 271
/* Call dmCloseNode on every wrapper of the dnode, in node-type order. */
static void dmCloseNodes(SDnode *pDnode) {
  for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
    dmCloseNode(&pDnode->wrappers[n]);
  }
}
S
shm  
Shengliang Guan 已提交
276

S
Shengliang Guan 已提交
277 278 279 280 281
static void dmProcessProcHandle(void *handle) {
  dWarn("handle:%p, the child process dies and send an offline rsp", handle);
  SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_NODE_OFFLINE};
  rpcSendResponse(&rpcMsg);
}
S
shm  
Shengliang Guan 已提交
282

S
Shengliang Guan 已提交
283
/*
 * Under the dnode mutex, check the child process of every required node that
 * runs as DND_PROC_PARENT. A missing child has its proc handles closed (each
 * one answered through dmProcessProcHandle) and is spawned again.
 *
 * Nothing is checked unless this dnode is a parent process, and nothing is
 * respawned when the dnode's ntype is NODE_END.
 */
static void dmWatchNodes(SDnode *pDnode) {
  taosThreadMutexLock(&pDnode->mutex);

  if (pDnode->ptype == DND_PROC_PARENT && pDnode->ntype != NODE_END) {
    for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) {
      SMgmtWrapper *pNode = &pDnode->wrappers[n];
      if (!pNode->required || pNode->procType != DND_PROC_PARENT) continue;

      bool childAlive = (pNode->procId > 0) && taosProcExist(pNode->procId);
      if (childAlive) continue;

      dWarn("node:%s, process:%d is killed and needs to be restarted", pNode->name, pNode->procId);
      if (pNode->procObj) {
        taosProcCloseHandles(pNode->procObj, dmProcessProcHandle);
      }
      dmNewNodeProc(pNode, n);
    }
  }

  taosThreadMutexUnlock(&pDnode->mutex);
}
S
shm  
Shengliang Guan 已提交
303

S
Shengliang Guan 已提交
304 305 306 307 308 309 310 311 312 313 314
/*
 * Main loop of the dnode.
 *
 * Chooses the process type (single when tsMultiProcess is off; parent when
 * ntype is DNODE or NODE_END; child otherwise), opens and starts the nodes,
 * then wakes up every 100 ms. Each wake-up either watches the child processes
 * or, once DND_EVENT_STOP is set, stops and closes all nodes.
 *
 * Returns 0 after a requested stop, -1 if opening or starting the nodes fails.
 */
int32_t dmRun(SDnode *pDnode) {
  if (tsMultiProcess) {
    if (pDnode->ntype == DNODE || pDnode->ntype == NODE_END) {
      pDnode->ptype = DND_PROC_PARENT;
      dInfo("dnode run in parent process");
    } else {
      pDnode->ptype = DND_PROC_CHILD;
      SMgmtWrapper *pChild = &pDnode->wrappers[pDnode->ntype];
      dInfo("%s run in child process", pChild->name);
    }
  } else {
    pDnode->ptype = DND_PROC_SINGLE;
    dInfo("dnode run in single process");
  }

  if (dmOpenNodes(pDnode) != 0) {
    dError("failed to open nodes since %s", terrstr());
    return -1;
  }

  if (dmStartNodes(pDnode) != 0) {
    dError("failed to start nodes since %s", terrstr());
    return -1;
  }

  for (;;) {
    taosMsleep(100);

    if ((pDnode->event & DND_EVENT_STOP) == 0) {
      dmWatchNodes(pDnode);
      continue;
    }

    dInfo("dnode is about to stop");
    dmSetStatus(pDnode, DND_STAT_STOPPED);
    dmStopNodes(pDnode);
    dmCloseNodes(pDnode);
    return 0;
  }
}