dndExec.c 11.1 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "dndInt.h"
S
Shengliang Guan 已提交
18

S
shm  
Shengliang Guan 已提交
19
/*
 * Ask the node's `requiredFp` callback whether the node has to be started.
 *
 * The answer is taken from the out-flag written by the callback; the status
 * code the callback returns is not inspected. The decision is logged at debug
 * level and returned to the caller.
 */
static bool dndRequireNode(SMgmtWrapper *pWrapper) {
  bool isRequired = false;

  // Only the out-parameter matters here; the returned status is discarded.
  (void)(*pWrapper->fp.requiredFp)(pWrapper, &isRequired);

  if (isRequired) {
    dDebug("node:%s, needs to be started", pWrapper->name);
  } else {
    dDebug("node:%s, does not require startup", pWrapper->name);
  }

  return isRequired;
}
S
Shengliang Guan 已提交
29

S
shm  
Shengliang Guan 已提交
30
/*
 * Create the node's directory and open the node through its `openFp` callback.
 *
 * Returns 0 on success and marks the wrapper as deployed.
 * Returns -1 if the directory cannot be created (terrno is set from errno)
 * or if `openFp` reports a failure.
 */
int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
  const int32_t mkdirRet = taosMkDir(pWrapper->path);
  if (mkdirRet != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr());
    return -1;
  }

  const int32_t openRet = (*pWrapper->fp.openFp)(pWrapper);
  if (openRet != 0) {
    dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
    return -1;
  }

  dDebug("node:%s, has been opened", pWrapper->name);
  pWrapper->deployed = true;
  return 0;
}
S
shm  
Shengliang Guan 已提交
46

S
shm  
Shengliang Guan 已提交
47
/*
 * Shut a node down.
 *
 * Steps, in order:
 *   1. clear the `required` flag;
 *   2. under the wrapper's write latch, call `closeFp` if the node is
 *      deployed and clear `deployed`;
 *   3. poll every 10 ms until `refCount` is no longer positive;
 *   4. clean up the proc object, if one exists.
 */
void dndCloseNode(SMgmtWrapper *pWrapper) {
  dDebug("node:%s, mgmt start to close", pWrapper->name);

  pWrapper->required = false;

  taosWLockLatch(&pWrapper->latch);
  if (pWrapper->deployed) {
    (*pWrapper->fp.closeFp)(pWrapper);
    pWrapper->deployed = false;
  }
  taosWUnLockLatch(&pWrapper->latch);

  // Wait for outstanding references to be released before tearing down the proc.
  for (;;) {
    if (pWrapper->refCount <= 0) break;
    taosMsleep(10);
  }

  if (pWrapper->pProc != NULL) {
    taosProcCleanup(pWrapper->pProc);
    pWrapper->pProc = NULL;
  }

  dDebug("node:%s, mgmt has been closed", pWrapper->name);
}
S
shm  
Shengliang Guan 已提交
67

S
Shengliang Guan 已提交
68 69
/*
 * Consumer callback for the child-side queue.
 *
 * Attaches the body `pCont` to the message, looks up the handler registered
 * for the message type and invokes it. When the handler succeeds nothing is
 * released here. When it fails, the error is logged, a response carrying
 * `terrno` is sent if the lowest bit of the message type is set, and both the
 * queue item and the body are freed.
 *
 * `msgLen`, `contLen` and `ftype` are accepted but not used by this function.
 */
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                 ProcFuncType ftype) {
  SRpcMsg *pRpcMsg = &pMsg->rpcMsg;
  pRpcMsg->pCont = pCont;
  dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pRpcMsg->handle, pRpcMsg->ahandle);

  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pRpcMsg->msgType)];
  int32_t   code = (*handler)(pWrapper, pMsg);
  if (code == 0) {
    return;
  }

  dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));

  if ((pRpcMsg->msgType & 1U) != 0) {
    SRpcMsg rsp = {.handle = pRpcMsg->handle, .ahandle = pRpcMsg->ahandle, .code = terrno};
    tmsgSendRsp(&rsp);
  }

  dTrace("msg:%p, is freed", pMsg);
  taosFreeQitem(pMsg);
  rpcFreeCont(pCont);
}

S
Shengliang Guan 已提交
90 91 92
/*
 * Consumer callback for the parent-side queue.
 *
 * Attaches the body `pCont` to the message and dispatches on `ftype`:
 *   PROC_REGIST  - register the message as a broken-link argument;
 *   PROC_RELEASE - release the rpc handle and free the body;
 *   PROC_REQ     - forward the request to the mnode;
 *   PROC_RSP     - send the rpc response;
 *   anything else is ignored.
 * The message head is always freed before returning.
 *
 * `msgLen` and `contLen` are accepted but not used by this function.
 */
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                  ProcFuncType ftype) {
  pMsg->pCont = pCont;
  dTrace("msg:%p, get from parent queue, ftype:%d handle:%p, app:%p", pMsg, ftype, pMsg->handle, pMsg->ahandle);

  if (ftype == PROC_REGIST) {
    rpcRegisterBrokenLinkArg(pMsg);
  } else if (ftype == PROC_RELEASE) {
    rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
    rpcFreeCont(pCont);
  } else if (ftype == PROC_REQ) {
    dndSendReqToMnode(pWrapper, pMsg);
  } else if (ftype == PROC_RSP) {
    dndSendRpcRsp(pWrapper, pMsg);
  }

  taosMemoryFree(pMsg);
}

S
shm  
Shengliang Guan 已提交
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
/*
 * Spawn a new process for node type `n`.
 *
 * The argument vector passed to taosNewProc() is
 *   { NULL, "-c", configDir, "-n", "<n as decimal>", NULL }.
 * Slot 0 is left NULL here; presumably taosNewProc() fills in the executable
 * path - verify against its implementation.
 *
 * Returns 0 and records the pid in `pWrapper->procId` on success, -1 with
 * terrno set from errno when no positive pid is returned.
 */
static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) {
  char nodeTypeStr[8] = {0};
  snprintf(nodeTypeStr, sizeof(nodeTypeStr), "%d", n);

  char *args[6] = {NULL, "-c", configDir, "-n", nodeTypeStr, NULL};

  int32_t childPid = taosNewProc(args);
  if (childPid <= 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to exec in new process since %s", pWrapper->name, terrstr());
    return -1;
  }

  pWrapper->procId = childPid;
  dInfo("node:%s, continue running in new process:%d", pWrapper->name, childPid);
  return 0;
}

/*
 * Build the proc configuration shared by the parent and child sides.
 *
 * Child-side heads are queue items and parent-side heads are plain heap
 * memory; bodies on both sides are rpc containers. Fields not assigned here
 * (for example `isChild`, which callers set afterwards) stay zero.
 */
static SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) {
  SProcCfg cfg = {0};

  // child side
  cfg.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue;
  cfg.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem;
  cfg.childFreeHeadFp = (ProcFreeFp)taosFreeQitem;
  cfg.childMallocBodyFp = (ProcMallocFp)rpcMallocCont;
  cfg.childFreeBodyFp = (ProcFreeFp)rpcFreeCont;

  // parent side
  cfg.parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue;
  cfg.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc;
  cfg.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree;
  cfg.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont;
  cfg.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont;

  // identity
  cfg.shm = pWrapper->shm;
  cfg.pParent = pWrapper;
  cfg.name = pWrapper->name;

  return cfg;
}

/*
 * Run every required node inside the current process.
 *
 * Phase 1: evaluate `required` for each wrapper and open the required ones.
 * Phase 2: mark the dnode as running and call `startFp` on each required
 *          wrapper that provides one.
 * Phase 3: poll every 100 ms until a stop event is seen, then mark the dnode
 *          as stopped.
 *
 * Returns 0 after a stop event, -1 as soon as any open/start step fails.
 */
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
  dInfo("dnode run in single process");

  for (ENodeType n = DNODE; n < NODE_MAX; ++n) {
    SMgmtWrapper *pNode = &pDnode->wrappers[n];
    pNode->required = dndRequireNode(pNode);
    if (pNode->required && dndOpenNode(pNode) != 0) {
      dError("node:%s, failed to start since %s", pNode->name, terrstr());
      return -1;
    }
  }

  dndSetStatus(pDnode, DND_STAT_RUNNING);

  for (ENodeType n = 0; n < NODE_MAX; ++n) {
    SMgmtWrapper *pNode = &pDnode->wrappers[n];
    if (!pNode->required || pNode->fp.startFp == NULL) {
      continue;
    }

    if ((*pNode->fp.startFp)(pNode) != 0) {
      dError("node:%s, failed to start since %s", pNode->name, terrstr());
      return -1;
    }
  }

  dInfo("TDengine initialized successfully");
  dndReportStartup(pDnode, "TDengine", "initialized successfully");

  // Idle until somebody requests a stop.
  while (pDnode->event != DND_EVENT_STOP) {
    taosMsleep(100);
  }

  dInfo("dnode is about to stop");
  dndSetStatus(pDnode, DND_STAT_STOPPED);
  return 0;
}

S
shm  
Shengliang Guan 已提交
195
/*
 * Kill and wait for every child process that was spawned for a required node.
 * Nothing is done when `ntype == NODE_MAX`, i.e. when child processes are
 * expected to be started manually.
 */
static void dndParentStopChildProcs(SDnode *pDnode) {
  if (pDnode->ntype == NODE_MAX) return;

  for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    if (!pWrapper->required) continue;

    if (pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) {
      dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId);
      taosKillProc(pWrapper->procId);
      dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, pWrapper->procId);
      taosWaitProc(pWrapper->procId);
      dInfo("node:%s, child process:%d is stopped", pWrapper->name, pWrapper->procId);
    }
  }
}

/*
 * Re-spawn the child process of every required node whose process is missing.
 * Nothing is done when `ntype == NODE_MAX` (manual start mode).
 */
static void dndParentRestartChildProcs(SDnode *pDnode) {
  if (pDnode->ntype == NODE_MAX) return;

  for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    if (!pWrapper->required) continue;

    if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) {
      dInfo("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId);
      // A failed spawn is already logged by dndNewProc(); the next poll retries.
      (void)dndNewProc(pWrapper, n);
    }
  }
}

/*
 * Run the dnode as the parent of a multi-process deployment.
 *
 * Sequence:
 *   1. open the dnode wrapper itself;
 *   2. for every other required node, create its shared memory segment and
 *      its parent-side proc object;
 *   3. write the shm runtime file;
 *   4. spawn a child process per required node (unless `ntype == NODE_MAX`,
 *      in which case children are started manually) and run its proc;
 *   5. mark the dnode as running and start the dnode wrapper;
 *   6. poll every 100 ms: restart missing children, and on a stop event mark
 *      the dnode stopped, then kill and reap the children.
 *
 * Returns 0 after a stop event, -1 as soon as any setup step fails.
 */
static int32_t dndRunInParentProcess(SDnode *pDnode) {
  dInfo("dnode run in parent process");

  SMgmtWrapper *pDWrapper = &pDnode->wrappers[DNODE];
  if (dndOpenNode(pDWrapper) != 0) {
    dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
    return -1;
  }

  for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    pWrapper->required = dndRequireNode(pWrapper);
    if (!pWrapper->required) continue;

    int32_t shmsize = 1024 * 1024 * 2;  // size will be a configuration item
    if (taosCreateShm(&pWrapper->shm, n, shmsize) != 0) {
      // Wrap the OS error (errno), as the other system-call failure paths in
      // this file do; wrapping the stale terrno produced a bogus error code.
      // NOTE(review): assumes taosCreateShm() reports failure through errno - confirm.
      terrno = TAOS_SYSTEM_ERROR(errno);
      dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr());
      return -1;
    }
    dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->shm.id, shmsize);

    SProcCfg cfg = dndGenProcCfg(pWrapper);
    cfg.isChild = false;
    pWrapper->procType = PROC_PARENT;
    pWrapper->pProc = taosProcInit(&cfg);
    if (pWrapper->pProc == NULL) {
      dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
      return -1;
    }
  }

  if (dndWriteShmFile(pDnode) != 0) {
    dError("failed to write runtime file since %s", terrstr());
    return -1;
  }

  for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    if (!pWrapper->required) continue;

    if (pDnode->ntype == NODE_MAX) {
      dInfo("node:%s, should be started manually", pWrapper->name);
    } else {
      if (dndNewProc(pWrapper, n) != 0) {
        return -1;
      }
    }

    if (taosProcRun(pWrapper->pProc) != 0) {
      dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
      return -1;
    }
  }

  dndSetStatus(pDnode, DND_STAT_RUNNING);

  // Guard against a missing start callback, matching the single-process and
  // child-process code paths.
  if (pDWrapper->fp.startFp != NULL) {
    if ((*pDWrapper->fp.startFp)(pDWrapper) != 0) {
      dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
      return -1;
    }
  }

  dInfo("TDengine initialized successfully");
  dndReportStartup(pDnode, "TDengine", "initialized successfully");

  while (1) {
    if (pDnode->event == DND_EVENT_STOP) {
      dInfo("dnode is about to stop");
      dndSetStatus(pDnode, DND_STAT_STOPPED);
      dndParentStopChildProcs(pDnode);
      break;
    }

    dndParentRestartChildProcs(pDnode);
    taosMsleep(100);
  }

  return 0;
}
S
shm  
Shengliang Guan 已提交
296

S
shm  
Shengliang Guan 已提交
297 298
/*
 * Run a single node as a child process of a multi-process deployment.
 *
 * The node is selected by `pDnode->ntype`. It must report itself as required;
 * it is then opened, given a child-side proc object, started (when it has a
 * `startFp`) and its proc is run. Afterwards the function polls every 100 ms
 * until a stop event is seen and marks the dnode as stopped.
 *
 * Returns 0 after a stop event, -1 as soon as any setup step fails.
 */
static int32_t dndRunInChildProcess(SDnode *pDnode) {
  SMgmtWrapper *pNode = &pDnode->wrappers[pDnode->ntype];
  dInfo("%s run in child process", pNode->name);

  pNode->required = dndRequireNode(pNode);
  if (!pNode->required) {
    dError("%s does not require startup", pNode->name);
    return -1;
  }

  // Install the message callbacks before the node is opened.
  SMsgCb msgCb = dndCreateMsgcb(pNode);
  tmsgSetDefaultMsgCb(&msgCb);
  pNode->procType = PROC_CHILD;

  if (dndOpenNode(pNode) != 0) {
    dError("node:%s, failed to start since %s", pNode->name, terrstr());
    return -1;
  }

  SProcCfg procCfg = dndGenProcCfg(pNode);
  procCfg.isChild = true;
  pNode->pProc = taosProcInit(&procCfg);
  if (pNode->pProc == NULL) {
    dError("node:%s, failed to create proc since %s", pNode->name, terrstr());
    return -1;
  }

  if (pNode->fp.startFp != NULL && (*pNode->fp.startFp)(pNode) != 0) {
    dError("node:%s, failed to start since %s", pNode->name, terrstr());
    return -1;
  }

  dndSetStatus(pDnode, DND_STAT_RUNNING);

  if (taosProcRun(pNode->pProc) != 0) {
    dError("node:%s, failed to run proc since %s", pNode->name, terrstr());
    return -1;
  }

  dInfo("TDengine initialized successfully");
  dndReportStartup(pDnode, "TDengine", "initialized successfully");

  // Idle until somebody requests a stop.
  while (pDnode->event != DND_EVENT_STOP) {
    taosMsleep(100);
  }

  dInfo("%s is about to stop", pNode->name);
  dndSetStatus(pDnode, DND_STAT_STOPPED);
  return 0;
}

/*
 * Entry point: pick the run mode for this process.
 *
 *   - multi-process disabled            -> single-process mode
 *   - ntype is DNODE or NODE_MAX        -> parent-process mode
 *   - any other ntype                   -> child-process mode
 *
 * Returns whatever the selected mode returns.
 */
int32_t dndRun(SDnode *pDnode) {
  if (!tsMultiProcess) {
    return dndRunInSingleProcess(pDnode);
  }

  const bool isParent = (pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX);
  if (isParent) {
    return dndRunInParentProcess(pDnode);
  }

  return dndRunInChildProcess(pDnode);
}