dndExec.c 9.9 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "dndInt.h"
S
Shengliang Guan 已提交
18

S
shm  
Shengliang Guan 已提交
19
/*
 * Ask a node whether it needs to be started.
 *
 * Invokes the wrapper's `requiredFp` callback, which reports its answer through
 * the `required` out-parameter, then logs the decision.
 *
 * Returns the value left in `required` by the callback (false if the callback
 * never wrote to it).
 */
static bool dndRequireNode(SMgmtWrapper *pWrapper) {
  bool    required = false;
  int32_t code = (*pWrapper->fp.requiredFp)(pWrapper, &required);
  if (code != 0) {
    // The callback's status used to be silently discarded. Report it so that a
    // failed check can be told apart from a node that simply is not needed.
    dError("node:%s, failed to check whether it is required, code:%d", pWrapper->name, code);
  }

  if (!required) {
    dDebug("node:%s, no need to start", pWrapper->name);
  } else {
    dDebug("node:%s, need to start", pWrapper->name);
  }
  return required;
}
S
Shengliang Guan 已提交
29

S
shm  
Shengliang Guan 已提交
30
/*
 * Create the node's directory and open the node through its `openFp` callback.
 *
 * Returns 0 after flagging the wrapper as deployed. Returns -1 when the
 * directory cannot be created (terrno is then derived from errno) or when the
 * open callback reports a non-zero status.
 */
int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
  int32_t mkdirCode = taosMkDir(pWrapper->path);
  if (mkdirCode != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr());
    return -1;
  }

  int32_t openCode = (*pWrapper->fp.openFp)(pWrapper);
  if (openCode != 0) {
    dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
    return -1;
  }

  dDebug("node:%s, has been opened", pWrapper->name);
  pWrapper->deployed = true;
  return 0;
}
S
shm  
Shengliang Guan 已提交
46

S
shm  
Shengliang Guan 已提交
47
/*
 * Shut a node down.
 *
 * Steps, in order:
 *   1. clear the `required` flag;
 *   2. under the write latch, invoke `closeFp` if the node is deployed and
 *      clear `deployed`;
 *   3. poll every 10 ms until `refCount` is no longer positive;
 *   4. clean up the process object, if one is attached.
 */
void dndCloseNode(SMgmtWrapper *pWrapper) {
  dDebug("node:%s, start to close", pWrapper->name);
  pWrapper->required = false;

  taosWLockLatch(&pWrapper->latch);
  if (pWrapper->deployed) {
    pWrapper->fp.closeFp(pWrapper);
    pWrapper->deployed = false;
  }
  taosWUnLockLatch(&pWrapper->latch);

  // Wait for the outstanding references to be dropped before tearing down.
  for (;;) {
    if (pWrapper->refCount <= 0) break;
    taosMsleep(10);
  }

  if (pWrapper->pProc != NULL) {
    taosProcCleanup(pWrapper->pProc);
    pWrapper->pProc = NULL;
  }

  dDebug("node:%s, has been closed", pWrapper->name);
}
S
shm  
Shengliang Guan 已提交
67

S
Shengliang Guan 已提交
68 69
/*
 * Consume one message from the child-side queue.
 *
 * Attaches `pCont` as the body of the RPC message embedded in `pMsg`, looks up
 * the handler registered for the message type and invokes it.
 *
 * When the handler returns 0 nothing else is done here (the message and body
 * are not freed by this function in that case). When it returns non-zero the
 * failure is logged, a response carrying `terrno` is sent if bit 0 of the
 * message type is set, and both the queue item and the body are freed.
 *
 * `msgLen`, `contLen` and `ftype` are not used by this function.
 *
 * NOTE(review): the response carries `terrno`, not the handler's return value;
 * confirm that handlers always set `terrno` when they fail.
 */
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                 ProcFuncType ftype) {
  SRpcMsg *pReq = &pMsg->rpcMsg;
  pReq->pCont = pCont;
  dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pReq->handle, pReq->ahandle);

  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pReq->msgType)];
  int32_t   ret = (*handler)(pWrapper, pMsg);
  if (ret == 0) return;

  dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, ret & 0XFFFF, tstrerror(ret));
  if (pReq->msgType & 1U) {
    SRpcMsg rsp = {.handle = pReq->handle, .ahandle = pReq->ahandle, .code = terrno};
    dndSendRsp(pWrapper, &rsp);
  }

  dTrace("msg:%p, is freed", pMsg);
  taosFreeQitem(pMsg);
  rpcFreeCont(pCont);
}

S
Shengliang Guan 已提交
90 91 92
/*
 * Consume one message from the parent-side queue.
 *
 * Attaches `pCont` as the body of `pMsg` and dispatches on `ftype`:
 *   - PROC_REG:     register the message as a broken-link argument;
 *   - PROC_RELEASE: release the RPC handle (passing the message code narrowed
 *                   to int8_t) and free the body;
 *   - PROC_REQ:     forward the request via dndSendReqToMnode;
 *   - anything else: send the message as an RPC response.
 *
 * The message head `pMsg` is always freed before returning.
 * `msgLen` and `contLen` are not used by this function.
 */
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                  ProcFuncType ftype) {
  pMsg->pCont = pCont;
  dTrace("msg:%p, get from parent queue, handle:%p app:%p", pMsg, pMsg->handle, pMsg->ahandle);

  switch (ftype) {
    case PROC_REG:
      rpcRegisterBrokenLinkArg(pMsg);
      break;
    case PROC_RELEASE:
      rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
      rpcFreeCont(pCont);
      break;
    case PROC_REQ:
      // todo send to dnode
      dndSendReqToMnode(pWrapper, pMsg);
      // Without this break the request fell through to the default branch and
      // was additionally sent out as an RPC response.
      break;
    default:
      dndSendRpcRsp(pWrapper, pMsg);
      break;
  }
  taosMemoryFree(pMsg);
}

S
shm  
Shengliang Guan 已提交
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
/*
 * Launch a node in a new process.
 *
 * Builds the argument vector `-c <configDir> -n <n>` (with `n` printed as a
 * decimal number) and hands it to taosNewProc. Slot 0 of the vector is left
 * NULL here; presumably taosNewProc fills in the program path -- confirm.
 *
 * Returns 0 and records the pid in `pWrapper->procId` on success; returns -1
 * with terrno derived from errno when taosNewProc yields a value <= 0.
 */
static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) {
  char nodeType[8] = {0};
  snprintf(nodeType, sizeof(nodeType), "%d", n);

  char *args[6] = {NULL, "-c", configDir, "-n", nodeType, NULL};

  int32_t childPid = taosNewProc(args);
  if (childPid <= 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to exec in new process since %s", pWrapper->name, terrstr());
    return -1;
  }

  pWrapper->procId = childPid;
  dInfo("node:%s, run in new process, pid:%d", pWrapper->name, childPid);
  return 0;
}

/*
 * Build the process-channel configuration for a node.
 *
 * Child-side heads are allocated/freed as queue items and consumed by
 * dndConsumeChildQueue; parent-side heads use taosMemoryMalloc/taosMemoryFree
 * and are consumed by dndConsumeParentQueue. Bodies on both sides use
 * rpcMallocCont/rpcFreeCont. The wrapper supplies the shared memory
 * descriptor, the parent pointer and the name.
 *
 * Every field not assigned here is zero, as with the designated initializer
 * this replaces.
 */
static SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) {
  SProcCfg cfg = {0};

  // child side
  cfg.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue;
  cfg.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem;
  cfg.childFreeHeadFp = (ProcFreeFp)taosFreeQitem;
  cfg.childMallocBodyFp = (ProcMallocFp)rpcMallocCont;
  cfg.childFreeBodyFp = (ProcFreeFp)rpcFreeCont;

  // parent side
  cfg.parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue;
  cfg.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc;
  cfg.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree;
  cfg.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont;
  cfg.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont;

  // taken from the wrapper
  cfg.shm = pWrapper->shm;
  cfg.pParent = pWrapper;
  cfg.name = pWrapper->name;

  return cfg;
}

/*
 * Run every required node inside the current process.
 *
 * Opens each node that reports itself as required, switches the dnode to
 * DND_STAT_RUNNING, calls `startFp` on each required node that provides one,
 * reports startup, then polls every 100 ms until the dnode event becomes
 * DND_EVENT_STOP.
 *
 * Returns 0 after a stop event, or -1 as soon as a node fails to open or start.
 */
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
  dInfo("dnode start to run in single process");

  for (ENodeType n = DNODE; n < NODE_MAX; ++n) {
    SMgmtWrapper *pNode = &pDnode->wrappers[n];
    pNode->required = dndRequireNode(pNode);
    if (pNode->required && dndOpenNode(pNode) != 0) {
      dError("node:%s, failed to start since %s", pNode->name, terrstr());
      return -1;
    }
  }

  dndSetStatus(pDnode, DND_STAT_RUNNING);

  // NOTE(review): this loop starts at literal 0 while the one above starts at
  // DNODE; presumably DNODE == 0 -- confirm against the ENodeType definition.
  for (ENodeType n = 0; n < NODE_MAX; ++n) {
    SMgmtWrapper *pNode = &pDnode->wrappers[n];
    if (!pNode->required || pNode->fp.startFp == NULL) continue;

    int32_t startCode = (*pNode->fp.startFp)(pNode);
    if (startCode != 0) {
      dError("node:%s, failed to start since %s", pNode->name, terrstr());
      return -1;
    }
  }

  dInfo("TDengine initialized successfully");
  dndReportStartup(pDnode, "TDengine", "initialized successfully");

  // Idle until a stop is requested.
  while (pDnode->event != DND_EVENT_STOP) {
    taosMsleep(100);
  }
  dInfo("dnode is about to stop");

  return 0;
}

S
shm  
Shengliang Guan 已提交
191 192 193 194 195 196 197
/*
 * Run the dnode as the parent in multi-process mode.
 *
 * Sequence:
 *   1. open the DNODE wrapper in this process;
 *   2. for every other required node, create a shared-memory segment and a
 *      parent-side process object;
 *   3. write the shm runtime file;
 *   4. for every required node, spawn its process (unless `ntype` is NODE_MAX,
 *      in which case the node is only logged as needing a manual start) and run
 *      the parent-side process object;
 *   5. mark the dnode running and start the DNODE wrapper;
 *   6. supervise: until DND_EVENT_STOP, re-spawn any child whose recorded pid
 *      no longer exists.
 *
 * Returns 0 after a stop event, -1 on the first failure in steps 1-5.
 */
static int32_t dndRunInParentProcess(SDnode *pDnode) {
  dInfo("dnode start to run in parent process");
  SMgmtWrapper *pDWrapper = &pDnode->wrappers[DNODE];
  if (dndOpenNode(pDWrapper) != 0) {
    dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
    return -1;
  }

  for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    pWrapper->required = dndRequireNode(pWrapper);
    if (!pWrapper->required) continue;

    int64_t shmsize = 1024 * 1024 * 2;  // size will be a configuration item
    if (taosCreateShm(&pWrapper->shm, shmsize) != 0) {
      // NOTE(review): other error paths in this file derive terrno from errno;
      // confirm that taosCreateShm really reports its failure through terrno.
      terrno = TAOS_SYSTEM_ERROR(terrno);
      dError("node:%s, failed to create shm size:%" PRId64 " since %s", pWrapper->name, shmsize, terrstr());
      return -1;
    }

    SProcCfg cfg = dndGenProcCfg(pWrapper);
    cfg.isChild = false;
    pWrapper->procType = PROC_PARENT;
    pWrapper->pProc = taosProcInit(&cfg);
    if (pWrapper->pProc == NULL) {
      dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
      return -1;
    }
  }

  if (dndWriteShmFile(pDnode) != 0) {
    dError("failed to write runtime file since %s", terrstr());
    return -1;
  }

  for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    if (!pWrapper->required) continue;

    if (pDnode->ntype == NODE_MAX) {
      dInfo("node:%s, should be started manually", pWrapper->name);
    } else {
      if (dndNewProc(pWrapper, n) != 0) {
        return -1;
      }
    }

    if (taosProcRun(pWrapper->pProc) != 0) {
      dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
      return -1;
    }
  }

  dndSetStatus(pDnode, DND_STAT_RUNNING);

  // Guard against a missing start callback, as the other run modes do.
  if (pDWrapper->fp.startFp != NULL && (*pDWrapper->fp.startFp)(pDWrapper) != 0) {
    dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
    return -1;
  }

  dInfo("TDengine initialized successfully");
  dndReportStartup(pDnode, "TDengine", "initialized successfully");

  while (1) {
    if (pDnode->event == DND_EVENT_STOP) {
      dInfo("dnode is about to stop");
      break;
    }

    for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
      SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
      if (!pWrapper->required) continue;
      if (pDnode->ntype == NODE_MAX) continue;

      if (pWrapper->procId != 0 && !taosProcExists(pWrapper->procId)) {
        dInfo("node:%s, process not exist, pid:%d", pWrapper->name, pWrapper->procId);
        dndNewProc(pWrapper, n);
      }
    }

    // Sleep once per supervision pass. The sleep used to sit inside the loop
    // above, after the `continue` statements, so it was skipped entirely when
    // no child was required or when ntype was NODE_MAX, leaving this outer
    // loop spinning at full CPU.
    taosMsleep(100);
  }

  return 0;
}
S
shm  
Shengliang Guan 已提交
276

S
shm  
Shengliang Guan 已提交
277 278 279
/*
 * Run a single node as a child process in multi-process mode.
 *
 * Selects the wrapper indexed by `pDnode->ntype`, installs its message
 * callbacks as the default, opens the node, creates the child-side process
 * object, calls `startFp` when one is provided, runs the process object,
 * reports startup, then polls every 100 ms until the dnode event becomes
 * DND_EVENT_STOP.
 *
 * Returns 0 after a stop event, -1 on the first failure.
 */
static int32_t dndRunInChildProcess(SDnode *pDnode) {
  dInfo("dnode start to run in child process");
  SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];

  SMsgCb msgCb = dndCreateMsgcb(pWrapper);
  tmsgSetDefaultMsgCb(&msgCb);
  pWrapper->procType = PROC_CHILD;

  if (dndOpenNode(pWrapper) != 0) {
    dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
    return -1;
  }

  SProcCfg cfg = dndGenProcCfg(pWrapper);
  cfg.isChild = true;
  pWrapper->pProc = taosProcInit(&cfg);
  if (pWrapper->pProc == NULL) {
    dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
    return -1;
  }

  if (pWrapper->fp.startFp != NULL) {
    if ((*pWrapper->fp.startFp)(pWrapper) != 0) {
      dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
      return -1;
    }
  }

  if (taosProcRun(pWrapper->pProc) != 0) {
    dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
    return -1;
  }

  dInfo("TDengine initialized successfully");
  dndReportStartup(pDnode, "TDengine", "initialized successfully");
  while (1) {
    if (pDnode->event == DND_EVENT_STOP) {
      dInfo("dnode is about to stop");
      break;
    }
    taosMsleep(100);
  }

  // The function previously ran off its end without returning a value, which
  // is undefined behavior because the caller uses the result.
  return 0;
}

/*
 * Entry point: pick the run mode and execute it.
 *
 *   - multi-process disabled           -> single-process mode
 *   - node type is DNODE or NODE_MAX   -> parent-process mode
 *   - any other node type              -> child-process mode
 *
 * Returns whatever the selected run function returns.
 */
int32_t dndRun(SDnode *pDnode) {
  if (!tsMultiProcess) return dndRunInSingleProcess(pDnode);

  bool runAsParent = (pDnode->ntype == DNODE) || (pDnode->ntype == NODE_MAX);
  return runAsParent ? dndRunInParentProcess(pDnode) : dndRunInChildProcess(pDnode);
}