dmExec.c 10.4 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmImp.h"
S
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19
// Asks the node's management module whether the node has to be started.
//
// The answer is delivered by the wrapper's requiredFp callback through an
// out-parameter. A non-zero return code from the callback is logged so that a
// failed check can be told apart from a plain "not required" answer; it does
// not alter the flag that is returned.
//
// pWrapper: node wrapper whose fp.requiredFp callback is invoked.
// Returns:  the flag written by the callback (false if it wrote nothing).
static bool dmRequireNode(SMgmtWrapper *pWrapper) {
  bool    required = false;
  int32_t code = (*pWrapper->fp.requiredFp)(pWrapper, &required);
  if (code != 0) {
    dDebug("node:%s, required check returned code:%d", pWrapper->name, code);
  }

  if (!required) {
    dDebug("node:%s, does not require startup", pWrapper->name);
  }

  return required;
}
S
Shengliang Guan 已提交
27

S
Shengliang Guan 已提交
28
// Prepares the parent-process side of a node: creates the node's shared
// memory segment and then the proc object built from dmGenProcCfg().
//
// The shared memory size is picked from the per-node-type ts*ShmSize setting.
// Node types other than VNODE/QNODE/SNODE/MNODE/BNODE are rejected.
//
// pWrapper: node wrapper; procShm, procType and procObj are written.
// Returns:  0 on success, -1 on an unsupported node type or any failure.
static int32_t dmInitParentProc(SMgmtWrapper *pWrapper) {
  int32_t size = 0;

  switch (pWrapper->nodeType) {
    case VNODE:
      size = tsVnodeShmSize;
      break;
    case QNODE:
      size = tsQnodeShmSize;
      break;
    case SNODE:
      size = tsSnodeShmSize;
      break;
    case MNODE:
      size = tsMnodeShmSize;
      break;
    case BNODE:
      size = tsBnodeShmSize;
      break;
    default:
      return -1;
  }

  if (taosCreateShm(&pWrapper->procShm, pWrapper->nodeType, size) != 0) {
    terrno = TAOS_SYSTEM_ERROR(terrno);
    dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, size, terrstr());
    return -1;
  }
  dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->procShm.id, size);

  // The config is generated before procType is switched to PARENT.
  SProcCfg procCfg = dmGenProcCfg(pWrapper);
  procCfg.isChild = false;

  pWrapper->procType = DND_PROC_PARENT;
  pWrapper->procObj = taosProcInit(&procCfg);
  if (pWrapper->procObj != NULL) {
    return 0;
  }

  dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
  return -1;
}

S
Shengliang Guan 已提交
63
// Launches a new process for node type `n` through taosNewProc() and records
// its pid in the wrapper.
//
// The argument vector passed on is: <slot 0>, "-c", configDir, "-n", <n as
// decimal>, NULL.
//
// pWrapper: node wrapper; procId is written on success.
// n:        node type, formatted as the value of the "-n" argument.
// Returns:  0 on success, -1 if taosNewProc() returned a pid <= 0.
static int32_t dmNewNodeProc(SMgmtWrapper *pWrapper, EDndNodeType n) {
  char nodeTypeArg[8] = {0};
  snprintf(nodeTypeArg, sizeof(nodeTypeArg), "%d", n);

  // Slot 0 is left NULL here (presumably filled in by taosNewProc - confirm);
  // the trailing NULL terminates the vector.
  char *argv[6] = {NULL, "-c", configDir, "-n", nodeTypeArg, NULL};

  int32_t childPid = taosNewProc(argv);
  if (childPid <= 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to exec in new process since %s", pWrapper->name, terrstr());
    return -1;
  }

  pWrapper->procId = childPid;
  dInfo("node:%s, continue running in new process:%d", pWrapper->name, childPid);
  return 0;
}

S
Shengliang Guan 已提交
85
// Runs the parent-process side of a node.
//
// Unless the dnode's ntype is NODE_END (in which case only a hint is logged
// that the child has to be started by hand), a new process is launched for
// the node first. Afterwards the wrapper's proc object is run.
//
// pWrapper: node wrapper set up by dmInitParentProc().
// Returns:  0 on success, -1 if launching the process or taosProcRun() failed.
static int32_t dmRunParentProc(SMgmtWrapper *pWrapper) {
  bool manualChild = (pWrapper->pDnode->ntype == NODE_END);

  if (manualChild) {
    dInfo("node:%s, should be started manually in child process", pWrapper->name);
  } else if (dmNewNodeProc(pWrapper, pWrapper->nodeType) != 0) {
    return -1;
  }

  if (taosProcRun(pWrapper->procObj) == 0) {
    return 0;
  }

  dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
  return -1;
}

// Creates the proc object for a node running in a child process, using the
// configuration from dmGenProcCfg() with isChild set.
//
// pWrapper: node wrapper; procObj is written.
// Returns:  0 on success, -1 if taosProcInit() returned NULL.
static int32_t dmInitChildProc(SMgmtWrapper *pWrapper) {
  SProcCfg procCfg = dmGenProcCfg(pWrapper);
  procCfg.isChild = true;

  pWrapper->procObj = taosProcInit(&procCfg);
  if (pWrapper->procObj != NULL) {
    return 0;
  }

  dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
  return -1;
}
110

S
Shengliang Guan 已提交
111 112 113 114 115
// Runs the proc object of a node that lives in a child process.
//
// pWrapper: node wrapper whose procObj was created by dmInitChildProc().
// Returns:  0 on success, -1 if taosProcRun() reported a failure.
static int32_t dmRunChildProc(SMgmtWrapper *pWrapper) {
  if (taosProcRun(pWrapper->procObj) == 0) {
    return 0;
  }

  dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
  return -1;
}

S
Shengliang Guan 已提交
119
// Opens one node according to the process type stored in its wrapper.
//
// The node directory is created first. Then:
//  - SINGLE / CHILD: the node's openFp callback is invoked; in CHILD mode the
//    child proc object is additionally created and run. The wrapper is marked
//    as deployed.
//  - any other type: the parent proc is initialised, the shm file is written
//    and the parent proc is run.
// Finally the "openned" startup step is reported.
//
// pWrapper: node wrapper to open.
// Returns:  0 on success, -1 on the first step that fails.
int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
  if (taosMkDir(pWrapper->path) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr());
    return -1;
  }

  if (pWrapper->procType != DND_PROC_SINGLE && pWrapper->procType != DND_PROC_CHILD) {
    // Parent side: each step runs only if the previous one succeeded.
    if (dmInitParentProc(pWrapper) != 0 || dmWriteShmFile(pWrapper) != 0 || dmRunParentProc(pWrapper) != 0) {
      return -1;
    }
  } else {
    if ((*pWrapper->fp.openFp)(pWrapper) != 0) {
      dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
      return -1;
    }

    if (pWrapper->procType == DND_PROC_CHILD) {
      if (dmInitChildProc(pWrapper) != 0 || dmRunChildProc(pWrapper) != 0) {
        return -1;
      }
    }

    dDebug("node:%s, has been opened", pWrapper->name);
    pWrapper->deployed = true;
  }

  dmReportStartup(pWrapper->pDnode, pWrapper->name, "openned");
  return 0;
}
S
shm  
Shengliang Guan 已提交
146

S
Shengliang Guan 已提交
147 148 149 150 151
// Starts one node by calling its optional startFp callback.
//
// The callback is skipped when the wrapper is of PARENT process type, and
// when it is of CHILD process type with node type DNODE. In every other case
// it is called if it is set. The "started" startup step is reported on
// success.
//
// pWrapper: node wrapper to start.
// Returns:  0 on success, -1 if startFp returned non-zero.
int32_t dmStartNode(SMgmtWrapper *pWrapper) {
  bool callStart = true;

  if (pWrapper->procType == DND_PROC_PARENT) {
    dInfo("node:%s, not start in parent process", pWrapper->name);
    callStart = false;
  } else if (pWrapper->procType == DND_PROC_CHILD) {
    dInfo("node:%s, start in child process", pWrapper->name);
    callStart = (pWrapper->nodeType != DNODE);
  }

  if (callStart && pWrapper->fp.startFp != NULL) {
    if ((*pWrapper->fp.startFp)(pWrapper) != 0) {
      dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
      return -1;
    }
  }

  dmReportStartup(pWrapper->pDnode, pWrapper->name, "started");
  return 0;
}

S
Shengliang Guan 已提交
169
// Invokes the node's stopFp callback; does nothing when no callback is set.
//
// pWrapper: node wrapper to stop.
void dmStopNode(SMgmtWrapper *pWrapper) {
  if (pWrapper->fp.stopFp == NULL) {
    return;
  }

  (*pWrapper->fp.stopFp)(pWrapper);
}

// Shuts one node down and releases its proc object.
//
// Order of operations:
//  1. clear the deployed flag and poll (10 ms sleeps) until refCount is 0;
//  2. for a PARENT-type wrapper with a live child process, kill that process
//     and wait for it;
//  3. call dmStopNode();
//  4. call the node's closeFp callback while holding the wrapper's write
//     latch;
//  5. clean up the proc object, if any.
//
// pWrapper: node wrapper to close.
void dmCloseNode(SMgmtWrapper *pWrapper) {
  dInfo("node:%s, start to close", pWrapper->name);
  pWrapper->deployed = false;

  for (;;) {
    if (pWrapper->refCount <= 0) break;
    taosMsleep(10);
  }

  if (pWrapper->procType == DND_PROC_PARENT && pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) {
    dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId);
    taosKillProc(pWrapper->procId);
    dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, pWrapper->procId);
    taosWaitProc(pWrapper->procId);
    dInfo("node:%s, child process:%d is stopped", pWrapper->name, pWrapper->procId);
  }

  dmStopNode(pWrapper);

  taosWLockLatch(&pWrapper->latch);
  (*pWrapper->fp.closeFp)(pWrapper);
  taosWUnLockLatch(&pWrapper->latch);

  if (pWrapper->procObj != NULL) {
    taosProcCleanup(pWrapper->procObj);
    pWrapper->procObj = NULL;
  }

  dInfo("node:%s, has been closed", pWrapper->name);
}

S
Shengliang Guan 已提交
207 208 209 210 211 212 213
// Opens the node(s) this process is responsible for and then switches the
// dnode status to DND_STAT_RUNNING.
//
//  - CHILD process: only the wrapper selected by pDnode->ntype is opened, with
//    CHILD process type.
//  - otherwise: every required wrapper from DNODE up to NODE_END is opened.
//    Wrappers other than DNODE get PARENT process type when the dnode itself
//    is a PARENT process; everything else gets SINGLE.
//
// The client is initialised and the default message callback installed right
// before the first node (the child's node, or DNODE) is opened.
//
// pDnode:  dnode whose wrappers are opened.
// Returns: 0 on success, -1 on the first failure.
static int32_t dmOpenNodes(SDnode *pDnode) {
  if (pDnode->ptype == DND_PROC_CHILD) {
    SMgmtWrapper *pChild = &pDnode->wrappers[pDnode->ntype];

    pChild->required = dmRequireNode(pChild);
    if (!pChild->required) {
      // NOTE(review): this is only logged; the node is still opened below.
      // Confirm whether an early return was intended here.
      dError("dnode:%s, failed to open since not required", pChild->name);
    }

    pChild->procType = DND_PROC_CHILD;
    if (dmInitClient(pDnode) != 0) {
      return -1;
    }

    pDnode->data.msgCb = dmGetMsgcb(pChild);
    tmsgSetDefaultMsgCb(&pDnode->data.msgCb);

    if (dmOpenNode(pChild) != 0) {
      dError("node:%s, failed to open since %s", pChild->name, terrstr());
      return -1;
    }
  } else {
    for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
      SMgmtWrapper *pNode = &pDnode->wrappers[n];

      pNode->required = dmRequireNode(pNode);
      if (!pNode->required) {
        continue;
      }

      pNode->procType = (pDnode->ptype == DND_PROC_PARENT && n != DNODE) ? DND_PROC_PARENT : DND_PROC_SINGLE;

      if (n == DNODE) {
        if (dmInitClient(pDnode) != 0) {
          return -1;
        }

        pDnode->data.msgCb = dmGetMsgcb(pNode);
        tmsgSetDefaultMsgCb(&pDnode->data.msgCb);
      }

      if (dmOpenNode(pNode) != 0) {
        dError("node:%s, failed to open since %s", pNode->name, terrstr());
        return -1;
      }
    }
  }

  dmSetStatus(pDnode, DND_STAT_RUNNING);
  return 0;
}
S
shm  
Shengliang Guan 已提交
258

S
Shengliang Guan 已提交
259
// Starts every wrapper that is flagged as required, in node-type order, and
// reports the final "initialized successfully" startup step.
//
// pDnode:  dnode whose wrappers are started.
// Returns: 0 on success, -1 as soon as one dmStartNode() call fails.
static int32_t dmStartNodes(SDnode *pDnode) {
  for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
    SMgmtWrapper *pNode = &pDnode->wrappers[n];

    if (pNode->required && dmStartNode(pNode) != 0) {
      dError("node:%s, failed to start since %s", pNode->name, terrstr());
      return -1;
    }
  }

  dInfo("TDengine initialized successfully");
  dmReportStartup(pDnode, "TDengine", "initialized successfully");
  return 0;
}

S
Shengliang Guan 已提交
274 275
// Calls dmStopNode() on every wrapper from DNODE up to NODE_END, regardless
// of its required flag.
//
// pDnode: dnode whose wrappers are stopped.
static void dmStopNodes(SDnode *pDnode) {
  for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
    dmStopNode(&pDnode->wrappers[n]);
  }
}
S
shm  
Shengliang Guan 已提交
280

S
Shengliang Guan 已提交
281 282
// Calls dmCloseNode() on every wrapper from DNODE up to NODE_END, regardless
// of its required flag.
//
// pDnode: dnode whose wrappers are closed.
static void dmCloseNodes(SDnode *pDnode) {
  for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
    dmCloseNode(&pDnode->wrappers[n]);
  }
}
S
shm  
Shengliang Guan 已提交
287

S
Shengliang Guan 已提交
288 289 290 291 292
static void dmProcessProcHandle(void *handle) {
  dWarn("handle:%p, the child process dies and send an offline rsp", handle);
  SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_NODE_OFFLINE};
  rpcSendResponse(&rpcMsg);
}
S
shm  
Shengliang Guan 已提交
293

S
Shengliang Guan 已提交
294
// Restarts child processes that are no longer running.
//
// Only acts when the dnode is a PARENT process and its ntype is not NODE_END.
// While holding pDnode->mutex, every required PARENT-type wrapper after DNODE
// is checked; if its procId is not positive or the process no longer exists,
// its handles are closed (each one via dmProcessProcHandle) and a new process
// is launched. The result of the relaunch is not checked here.
//
// pDnode: dnode whose child processes are supervised.
static void dmWatchNodes(SDnode *pDnode) {
  if (pDnode->ptype != DND_PROC_PARENT || pDnode->ntype == NODE_END) {
    return;
  }

  taosThreadMutexLock(&pDnode->mutex);

  for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) {
    SMgmtWrapper *pNode = &pDnode->wrappers[n];
    if (!pNode->required || pNode->procType != DND_PROC_PARENT) {
      continue;
    }

    bool alive = (pNode->procId > 0) && taosProcExist(pNode->procId);
    if (alive) {
      continue;
    }

    dWarn("node:%s, process:%d is killed and needs to be restarted", pNode->name, pNode->procId);
    if (pNode->procObj) {
      taosProcCloseHandles(pNode->procObj, dmProcessProcHandle);
    }
    dmNewNodeProc(pNode, n);
  }

  taosThreadMutexUnlock(&pDnode->mutex);
}
S
shm  
Shengliang Guan 已提交
314

S
Shengliang Guan 已提交
315 316 317 318 319 320 321 322 323 324 325
// Main loop of the dnode.
//
// Chooses the process type (SINGLE when tsMultiProcess is off; PARENT when
// ntype is DNODE or NODE_END; CHILD otherwise), opens and starts the nodes,
// then wakes up every 100 ms: while no stop event is pending the child
// processes are supervised through dmWatchNodes(); once DND_EVENT_STOP is set
// the status becomes DND_STAT_STOPPED and all nodes are stopped and closed.
//
// pDnode:  dnode to run.
// Returns: 0 after a stop event was handled, -1 if opening or starting the
//          nodes failed.
int32_t dmRun(SDnode *pDnode) {
  if (!tsMultiProcess) {
    pDnode->ptype = DND_PROC_SINGLE;
    dInfo("dnode run in single process");
  } else if (pDnode->ntype == DNODE || pDnode->ntype == NODE_END) {
    pDnode->ptype = DND_PROC_PARENT;
    dInfo("dnode run in parent process");
  } else {
    pDnode->ptype = DND_PROC_CHILD;
    dInfo("%s run in child process", pDnode->wrappers[pDnode->ntype].name);
  }

  if (dmOpenNodes(pDnode) != 0) {
    dError("failed to open nodes since %s", terrstr());
    return -1;
  }

  if (dmStartNodes(pDnode) != 0) {
    dError("failed to start nodes since %s", terrstr());
    return -1;
  }

  for (;;) {
    taosMsleep(100);

    if (!(pDnode->event & DND_EVENT_STOP)) {
      dmWatchNodes(pDnode);
      continue;
    }

    dInfo("dnode is about to stop");
    dmSetStatus(pDnode, DND_STAT_STOPPED);
    dmStopNodes(pDnode);
    dmCloseNodes(pDnode);
    return 0;
  }
}