dndExec.c 10.8 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "dndInt.h"
S
Shengliang Guan 已提交
18

S
shm  
Shengliang Guan 已提交
19
/*
 * Ask the node's requiredFp callback whether this node has to be started.
 *
 * Returns the flag filled in by the callback; it stays false when the callback
 * does not set it. A non-zero return code from the callback is logged and does
 * not alter the returned flag.
 */
static bool dndRequireNode(SMgmtWrapper *pWrapper) {
  bool    required = false;
  int32_t code = (*pWrapper->fp.requiredFp)(pWrapper, &required);
  if (code != 0) {
    // The callback's return code used to be stored and never looked at.
    dError("node:%s, failed to check whether startup is required, code:%d", pWrapper->name, code);
  }

  if (!required) {
    dDebug("node:%s, does not require startup", pWrapper->name);
  } else {
    dDebug("node:%s, needs to be started", pWrapper->name);
  }

  return required;
}
S
Shengliang Guan 已提交
29

S
shm  
Shengliang Guan 已提交
30
/*
 * Create the node's directory and open the node through its openFp callback.
 *
 * Returns 0 on success and marks the wrapper as deployed.
 * Returns -1 if the directory cannot be created (terrno is set from errno)
 * or if openFp reports a failure.
 */
int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
  int32_t mkdirRet = taosMkDir(pWrapper->path);
  if (mkdirRet != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr());
    return -1;
  }

  int32_t openRet = (*pWrapper->fp.openFp)(pWrapper);
  if (openRet != 0) {
    dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
    return -1;
  }

  dDebug("node:%s, has been opened", pWrapper->name);
  pWrapper->deployed = true;
  return 0;
}
S
shm  
Shengliang Guan 已提交
46

S
shm  
Shengliang Guan 已提交
47
/*
 * Close a node and release its proc object.
 *
 * Steps, in order:
 *   1. clear the required flag;
 *   2. under the wrapper's write latch, call closeFp if the node is deployed
 *      and clear the deployed flag;
 *   3. poll until refCount is no longer positive;
 *   4. clean up pProc if one exists.
 */
void dndCloseNode(SMgmtWrapper *pWrapper) {
  dDebug("node:%s, start to close", pWrapper->name);
  pWrapper->required = false;

  taosWLockLatch(&pWrapper->latch);
  if (pWrapper->deployed) {
    (*pWrapper->fp.closeFp)(pWrapper);
    pWrapper->deployed = false;
  }
  taosWUnLockLatch(&pWrapper->latch);

  // Poll with taosMsleep(10) between checks until refCount is no longer positive.
  while (pWrapper->refCount > 0) {
    taosMsleep(10);
  }

  if (pWrapper->pProc != NULL) {
    taosProcCleanup(pWrapper->pProc);
    pWrapper->pProc = NULL;
  }

  dDebug("node:%s, has been closed", pWrapper->name);
}
S
shm  
Shengliang Guan 已提交
67

S
Shengliang Guan 已提交
68 69
/*
 * Consumer callback for the child queue (installed as childConsumeFp).
 *
 * Attaches pCont to the message's rpcMsg, looks up the handler for the message
 * type in pWrapper->msgFps and invokes it. When the handler returns 0 nothing
 * else is done here. When it fails, the error is logged, a response carrying
 * terrno is sent if the low bit of msgType is set, and both the queue item and
 * the content buffer are freed.
 *
 * msgLen, contLen and ftype are not used by this function.
 */
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                 ProcFuncType ftype) {
  SRpcMsg *pReq = &pMsg->rpcMsg;
  pReq->pCont = pCont;
  dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pReq->handle, pReq->ahandle);

  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pReq->msgType)];
  int32_t   ret = (*handler)(pWrapper, pMsg);
  if (ret == 0) {
    return;
  }

  dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, ret & 0XFFFF, tstrerror(ret));

  if (pReq->msgType & 1U) {
    // The response code is taken from terrno, not from the handler's return value.
    SRpcMsg rsp = {.handle = pReq->handle, .ahandle = pReq->ahandle, .code = terrno};
    dndSendRsp(pWrapper, &rsp);
  }

  dTrace("msg:%p, is freed", pMsg);
  taosFreeQitem(pMsg);
  rpcFreeCont(pCont);
}

S
Shengliang Guan 已提交
90 91 92
/*
 * Consumer callback for the parent queue (installed as parentConsumeFp).
 *
 * Attaches pCont to pMsg and dispatches on ftype:
 *   PROC_REG      - register the broken-link argument with rpc;
 *   PROC_RELEASE  - release the rpc handle and free the content buffer;
 *   PROC_REQ      - forward the message as a request via dndSendReqToMnode;
 *   anything else - send the message as an rpc response.
 * The message head is always freed with taosMemoryFree before returning.
 *
 * msgLen and contLen are not used by this function.
 */
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                  ProcFuncType ftype) {
  pMsg->pCont = pCont;
  dTrace("msg:%p, get from parent queue, handle:%p app:%p", pMsg, pMsg->handle, pMsg->ahandle);

  switch (ftype) {
    case PROC_REG:
      rpcRegisterBrokenLinkArg(pMsg);
      break;
    case PROC_RELEASE:
      rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
      rpcFreeCont(pCont);
      break;
    case PROC_REQ:
      // todo send to dnode
      dndSendReqToMnode(pWrapper, pMsg);
      // Stop here: without this break the request fell through to the default
      // branch and was additionally sent out as an rpc response.
      break;
    default:
      dndSendRpcRsp(pWrapper, pMsg);
      break;
  }

  taosMemoryFree(pMsg);
}

S
shm  
Shengliang Guan 已提交
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
/*
 * Launch a new process for node type n through taosNewProc.
 *
 * The argument vector passed on is {NULL, "-c", configDir, "-n", "<n>", NULL};
 * slot 0 is left NULL by this function.
 *
 * Returns 0 and records the pid in pWrapper->procId on success.
 * Returns -1 with terrno set from errno when taosNewProc yields a pid <= 0.
 */
static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) {
  char nodeTypeStr[8] = {0};
  snprintf(nodeTypeStr, sizeof(nodeTypeStr), "%d", n);

  char *argv[6] = {NULL, "-c", configDir, "-n", nodeTypeStr, NULL};

  int32_t childPid = taosNewProc(argv);
  if (childPid <= 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to exec in new process since %s", pWrapper->name, terrstr());
    return -1;
  }

  pWrapper->procId = childPid;
  dInfo("node:%s, continue running in new process:%d", pWrapper->name, childPid);
  return 0;
}

/*
 * Build the SProcCfg used to initialise a proc object for the given wrapper.
 *
 * Child-side heads are queue items and parent-side heads are plain heap
 * blocks; bodies on both sides are rpc content buffers. Fields not listed here
 * (for example isChild, which callers set afterwards) are zero.
 */
static SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) {
  SProcCfg procCfg = {0};

  // Child side.
  procCfg.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue;
  procCfg.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem;
  procCfg.childFreeHeadFp = (ProcFreeFp)taosFreeQitem;
  procCfg.childMallocBodyFp = (ProcMallocFp)rpcMallocCont;
  procCfg.childFreeBodyFp = (ProcFreeFp)rpcFreeCont;

  // Parent side.
  procCfg.parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue;
  procCfg.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc;
  procCfg.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree;
  procCfg.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont;
  procCfg.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont;

  // Identity taken from the wrapper.
  procCfg.shm = pWrapper->shm;
  procCfg.pParent = pWrapper;
  procCfg.name = pWrapper->name;

  return procCfg;
}

/*
 * Run every required node inside the current process.
 *
 * Opens each node whose requiredFp says it is needed, switches the dnode to
 * DND_STAT_RUNNING, calls the optional startFp of each required node, and then
 * polls pDnode->event until it becomes DND_EVENT_STOP.
 *
 * Returns 0 after a stop event, -1 if any open or start step fails.
 */
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
  dInfo("dnode run in single process");

  // Open pass: decide which nodes are required and open them.
  for (ENodeType ntype = DNODE; ntype < NODE_MAX; ++ntype) {
    SMgmtWrapper *pNode = &pDnode->wrappers[ntype];
    pNode->required = dndRequireNode(pNode);
    if (pNode->required && dndOpenNode(pNode) != 0) {
      dError("node:%s, failed to start since %s", pNode->name, terrstr());
      return -1;
    }
  }

  dndSetStatus(pDnode, DND_STAT_RUNNING);

  // Start pass: startFp is optional and skipped when it is NULL.
  for (ENodeType ntype = 0; ntype < NODE_MAX; ++ntype) {
    SMgmtWrapper *pNode = &pDnode->wrappers[ntype];
    if (!pNode->required || pNode->fp.startFp == NULL) {
      continue;
    }

    if ((*pNode->fp.startFp)(pNode) != 0) {
      dError("node:%s, failed to start since %s", pNode->name, terrstr());
      return -1;
    }
  }

  dInfo("TDengine initialized successfully");
  dndReportStartup(pDnode, "TDengine", "initialized successfully");

  // Idle until a stop event is observed.
  while (pDnode->event != DND_EVENT_STOP) {
    taosMsleep(100);
  }
  dInfo("dnode is about to stop");

  return 0;
}

S
shm  
Shengliang Guan 已提交
191
/*
 * Run as the parent process in multi-process mode.
 *
 * Sequence:
 *   1. open the DNODE wrapper in this process;
 *   2. for every other required node, create its shared memory and a
 *      parent-side proc object;
 *   3. write the shm file;
 *   4. for every other required node, spawn a child process (unless
 *      pDnode->ntype is NODE_MAX, in which case it is only logged that the
 *      node should be started manually) and run its proc object;
 *   5. set DND_STAT_RUNNING and call the DNODE wrapper's startFp;
 *   6. until a stop event arrives, respawn any child whose process no longer
 *      exists; on stop, wait for the remaining children.
 *
 * Returns 0 after a stop event, -1 if any setup step fails.
 */
static int32_t dndRunInParentProcess(SDnode *pDnode) {
  dInfo("dnode run in parent process");

  SMgmtWrapper *pDWrapper = &pDnode->wrappers[DNODE];
  if (dndOpenNode(pDWrapper) != 0) {
    dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
    return -1;
  }

  // Shared memory and parent-side proc object for each required node.
  for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) {
    SMgmtWrapper *pNode = &pDnode->wrappers[ntype];
    pNode->required = dndRequireNode(pNode);
    if (!pNode->required) {
      continue;
    }

    int32_t shmSize = 2 * 1024 * 1024;  // size will be a configuration item
    if (taosCreateShm(&pNode->shm, ntype, shmSize) != 0) {
      terrno = TAOS_SYSTEM_ERROR(terrno);
      dError("node:%s, failed to create shm size:%d since %s", pNode->name, shmSize, terrstr());
      return -1;
    }
    dInfo("node:%s, shm:%d is created, size:%d", pNode->name, pNode->shm.id, shmSize);

    SProcCfg procCfg = dndGenProcCfg(pNode);
    procCfg.isChild = false;
    pNode->procType = PROC_PARENT;
    pNode->pProc = taosProcInit(&procCfg);
    if (pNode->pProc == NULL) {
      dError("node:%s, failed to create proc since %s", pNode->name, terrstr());
      return -1;
    }
  }

  if (dndWriteShmFile(pDnode) != 0) {
    dError("failed to write runtime file since %s", terrstr());
    return -1;
  }

  // Spawn the children (or leave them to be started manually) and run the procs.
  for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) {
    SMgmtWrapper *pNode = &pDnode->wrappers[ntype];
    if (!pNode->required) {
      continue;
    }

    if (pDnode->ntype == NODE_MAX) {
      dInfo("node:%s, should be started manually", pNode->name);
    } else if (dndNewProc(pNode, ntype) != 0) {
      return -1;
    }

    if (taosProcRun(pNode->pProc) != 0) {
      dError("node:%s, failed to run proc since %s", pNode->name, terrstr());
      return -1;
    }
  }

  dndSetStatus(pDnode, DND_STAT_RUNNING);

  if ((*pDWrapper->fp.startFp)(pDWrapper) != 0) {
    dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
    return -1;
  }

  dInfo("TDengine initialized successfully");
  dndReportStartup(pDnode, "TDengine", "initialized successfully");

  // Monitor loop: respawn children whose process is gone, until a stop event.
  while (pDnode->event != DND_EVENT_STOP) {
    for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) {
      SMgmtWrapper *pNode = &pDnode->wrappers[ntype];
      if (!pNode->required || pDnode->ntype == NODE_MAX) {
        continue;
      }
      if (pNode->procId > 0 && taosProcExist(pNode->procId)) {
        continue;
      }

      dInfo("node:%s, process:%d is killed and needs to be restarted", pNode->name, pNode->procId);
      dndNewProc(pNode, ntype);
    }

    taosMsleep(100);
  }

  dInfo("dnode is about to stop");

  // Shutdown: wait for each live child. No kill signal is sent from here;
  // a taosKillProc call at this point was previously commented out.
  for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) {
    SMgmtWrapper *pNode = &pDnode->wrappers[ntype];
    if (!pNode->required || pDnode->ntype == NODE_MAX) {
      continue;
    }
    if (pNode->procId <= 0 || !taosProcExist(pNode->procId)) {
      continue;
    }

    dInfo("node:%s, wait for child process:%d to stop", pNode->name, pNode->procId);
    taosWaitProc(pNode->procId);
    dInfo("node:%s, child process:%d is stopped", pNode->name, pNode->procId);
  }

  return 0;
}
S
shm  
Shengliang Guan 已提交
290

S
shm  
Shengliang Guan 已提交
291 292
/*
 * Run as a child process that hosts exactly one node (selected by
 * pDnode->ntype) in multi-process mode.
 *
 * Verifies the node is required, installs the default message callbacks,
 * opens the node, creates and runs a child-side proc object, calls the
 * optional startFp, and then polls pDnode->event until it becomes
 * DND_EVENT_STOP.
 *
 * Returns 0 after a stop event, -1 if the node is not required or any setup
 * step fails.
 */
static int32_t dndRunInChildProcess(SDnode *pDnode) {
  SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
  dInfo("%s run in child process", pWrapper->name);

  pWrapper->required = dndRequireNode(pWrapper);
  if (!pWrapper->required) {
    dError("%s does not require startup", pWrapper->name);
    return -1;
  }

  SMsgCb msgCb = dndCreateMsgcb(pWrapper);
  tmsgSetDefaultMsgCb(&msgCb);
  pWrapper->procType = PROC_CHILD;

  if (dndOpenNode(pWrapper) != 0) {
    dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
    return -1;
  }

  SProcCfg cfg = dndGenProcCfg(pWrapper);
  cfg.isChild = true;
  pWrapper->pProc = taosProcInit(&cfg);
  if (pWrapper->pProc == NULL) {
    dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
    return -1;
  }

  // startFp is optional for a node.
  if (pWrapper->fp.startFp != NULL) {
    if ((*pWrapper->fp.startFp)(pWrapper) != 0) {
      dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
      return -1;
    }
  }

  if (taosProcRun(pWrapper->pProc) != 0) {
    dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
    return -1;
  }

  dInfo("TDengine initialized successfully");
  dndReportStartup(pDnode, "TDengine", "initialized successfully");

  while (1) {
    if (pDnode->event == DND_EVENT_STOP) {
      dInfo("%s is about to stop", pWrapper->name);
      break;
    }
    taosMsleep(100);
  }

  // The function used to fall off the end here; dndRun returns this value,
  // so a defined result is required.
  return 0;
}

/*
 * Entry point that selects the run mode for the dnode.
 *
 *   - tsMultiProcess disabled             -> single-process mode;
 *   - ntype is DNODE or NODE_MAX          -> parent-process mode;
 *   - any other ntype                     -> child-process mode.
 *
 * Returns whatever the selected mode returns.
 */
int32_t dndRun(SDnode *pDnode) {
  if (!tsMultiProcess) {
    return dndRunInSingleProcess(pDnode);
  }

  bool runAsParent = (pDnode->ntype == DNODE) || (pDnode->ntype == NODE_MAX);
  if (runAsParent) {
    return dndRunInParentProcess(pDnode);
  }

  return dndRunInChildProcess(pDnode);
}