dndExec.c 9.9 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "dndInt.h"
S
Shengliang Guan 已提交
18

S
shm  
Shengliang Guan 已提交
19
static bool dndRequireNode(SMgmtWrapper *pWrapper) {
S
shm  
Shengliang Guan 已提交
20 21
  bool    required = false;
  int32_t code = (*pWrapper->fp.requiredFp)(pWrapper, &required);
S
Shengliang Guan 已提交
22
  if (!required) {
S
shm  
Shengliang Guan 已提交
23
    dDebug("node:%s, no need to start", pWrapper->name);
S
Shengliang Guan 已提交
24
  } else {
S
shm  
Shengliang Guan 已提交
25
    dDebug("node:%s, need to start", pWrapper->name);
S
Shengliang Guan 已提交
26
  }
S
Shengliang Guan 已提交
27 28
  return required;
}
S
Shengliang Guan 已提交
29

S
shm  
Shengliang Guan 已提交
30
int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
S
shm  
Shengliang Guan 已提交
31 32 33 34 35 36 37
  if (taosMkDir(pWrapper->path) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr());
    return -1;
  }

  if ((*pWrapper->fp.openFp)(pWrapper) != 0) {
S
shm  
Shengliang Guan 已提交
38
    dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
S
shm  
Shengliang Guan 已提交
39
    return -1;
S
shm  
Shengliang Guan 已提交
40 41
  }

S
shm  
Shengliang Guan 已提交
42
  dDebug("node:%s, has been opened", pWrapper->name);
S
shm  
Shengliang Guan 已提交
43 44
  pWrapper->deployed = true;
  return 0;
S
shm  
Shengliang Guan 已提交
45
}
S
shm  
Shengliang Guan 已提交
46

S
shm  
Shengliang Guan 已提交
47
void dndCloseNode(SMgmtWrapper *pWrapper) {
S
Shengliang Guan 已提交
48
  dDebug("node:%s, start to close", pWrapper->name);
S
shm  
Shengliang Guan 已提交
49
  pWrapper->required = false;
S
shm  
Shengliang Guan 已提交
50
  taosWLockLatch(&pWrapper->latch);
S
shm  
Shengliang Guan 已提交
51 52 53 54
  if (pWrapper->deployed) {
    (*pWrapper->fp.closeFp)(pWrapper);
    pWrapper->deployed = false;
  }
S
Shengliang Guan 已提交
55 56 57 58 59 60
  taosWUnLockLatch(&pWrapper->latch);

  while (pWrapper->refCount > 0) {
    taosMsleep(10);
  }

S
shm  
Shengliang Guan 已提交
61 62 63 64
  if (pWrapper->pProc) {
    taosProcCleanup(pWrapper->pProc);
    pWrapper->pProc = NULL;
  }
S
Shengliang Guan 已提交
65
  dDebug("node:%s, has been closed", pWrapper->name);
S
shm  
Shengliang Guan 已提交
66
}
S
shm  
Shengliang Guan 已提交
67

S
Shengliang Guan 已提交
68 69
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                 ProcFuncType ftype) {
S
shm  
Shengliang Guan 已提交
70 71
  SRpcMsg *pRpc = &pMsg->rpcMsg;
  pRpc->pCont = pCont;
S
shm  
Shengliang Guan 已提交
72
  dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pRpc->handle, pRpc->ahandle);
S
shm  
Shengliang Guan 已提交
73 74

  NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
S
Shengliang Guan 已提交
75
  int32_t   code = (*msgFp)(pWrapper, pMsg);
S
shm  
Shengliang Guan 已提交
76 77

  if (code != 0) {
S
Shengliang Guan 已提交
78
    dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
S
shm  
Shengliang Guan 已提交
79
    if (pRpc->msgType & 1U) {
S
shm  
Shengliang Guan 已提交
80 81 82 83 84 85 86 87 88 89
      SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
      dndSendRsp(pWrapper, &rsp);
    }

    dTrace("msg:%p, is freed", pMsg);
    taosFreeQitem(pMsg);
    rpcFreeCont(pCont);
  }
}

S
Shengliang Guan 已提交
90 91 92
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                  ProcFuncType ftype) {
  pMsg->pCont = pCont;
S
shm  
Shengliang Guan 已提交
93
  dTrace("msg:%p, get from parent queue, handle:%p app:%p", pMsg, pMsg->handle, pMsg->ahandle);
S
Shengliang Guan 已提交
94 95

  switch (ftype) {
S
shm  
Shengliang Guan 已提交
96
    case PROC_REG:
S
Shengliang Guan 已提交
97 98
      rpcRegisterBrokenLinkArg(pMsg);
      break;
S
shm  
Shengliang Guan 已提交
99 100 101 102
    case PROC_RELEASE:
      rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
      rpcFreeCont(pCont);
      break;
S
shm  
Shengliang Guan 已提交
103 104 105
    case PROC_REQ:
      // todo send to dnode
      dndSendReqToMnode(pWrapper, pMsg);
S
Shengliang Guan 已提交
106 107 108 109 110
    default:
      dndSendRpcRsp(pWrapper, pMsg);
      break;
  }
  taosMemoryFree(pMsg);
S
shm  
Shengliang Guan 已提交
111 112
}

S
shm  
Shengliang Guan 已提交
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) {
  char  tstr[8] = {0};
  char *args[6] = {0};
  snprintf(tstr, sizeof(tstr), "%d", n);
  args[1] = "-c";
  args[2] = configDir;
  args[3] = "-n";
  args[4] = tstr;
  args[5] = NULL;

  int32_t pid = taosNewProc(args);
  if (pid <= 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to exec in new process since %s", pWrapper->name, terrstr());
    return -1;
  }

  pWrapper->procId = pid;
  dInfo("node:%s, run in new process, pid:%d", pWrapper->name, pid);
  return 0;
}

static SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) {
  SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue,
                  .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
                  .childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
                  .childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
                  .childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
                  .parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue,
                  .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
                  .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
                  .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
                  .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
                  .shm = pWrapper->shm,
                  .pParent = pWrapper,
                  .name = pWrapper->name};
  return cfg;
}

static int32_t dndRunInSingleProcess(SDnode *pDnode) {
S
shm  
Shengliang Guan 已提交
153
  dInfo("dnode run in single process");
S
shm  
Shengliang Guan 已提交
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190

  for (ENodeType n = DNODE; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    pWrapper->required = dndRequireNode(pWrapper);
    if (!pWrapper->required) continue;

    if (dndOpenNode(pWrapper) != 0) {
      dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
      return -1;
    }
  }

  dndSetStatus(pDnode, DND_STAT_RUNNING);

  for (ENodeType n = 0; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    if (!pWrapper->required) continue;
    if (pWrapper->fp.startFp == NULL) continue;
    if ((*pWrapper->fp.startFp)(pWrapper) != 0) {
      dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
      return -1;
    }
  }

  dInfo("TDengine initialized successfully");
  dndReportStartup(pDnode, "TDengine", "initialized successfully");
  while (1) {
    if (pDnode->event == DND_EVENT_STOP) {
      dInfo("dnode is about to stop");
      break;
    }
    taosMsleep(100);
  }

  return 0;
}

S
shm  
Shengliang Guan 已提交
191
static int32_t dndRunInParentProcess(SDnode *pDnode) {
S
shm  
Shengliang Guan 已提交
192
  dInfo("dnode run in parent process");
S
shm  
Shengliang Guan 已提交
193 194 195 196 197
  SMgmtWrapper *pDWrapper = &pDnode->wrappers[DNODE];
  if (dndOpenNode(pDWrapper) != 0) {
    dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
    return -1;
  }
S
shm  
Shengliang Guan 已提交
198

S
shm  
Shengliang Guan 已提交
199
  for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
S
shm  
Shengliang Guan 已提交
200
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
S
shm  
Shengliang Guan 已提交
201
    pWrapper->required = dndRequireNode(pWrapper);
S
shm  
Shengliang Guan 已提交
202 203
    if (!pWrapper->required) continue;

S
Shengliang Guan 已提交
204
    int32_t shmsize = 1024 * 1024 * 2;  // size will be a configuration item
S
shm  
Shengliang Guan 已提交
205 206
    if (taosCreateShm(&pWrapper->shm, shmsize) != 0) {
      terrno = TAOS_SYSTEM_ERROR(terrno);
S
Shengliang Guan 已提交
207
      dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr());
S
shm  
Shengliang Guan 已提交
208 209
      return -1;
    }
S
Shengliang Guan 已提交
210
    dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->shm.id, shmsize);
S
shm  
Shengliang Guan 已提交
211

S
shm  
Shengliang Guan 已提交
212 213
    SProcCfg cfg = dndGenProcCfg(pWrapper);
    cfg.isChild = false;
S
shm  
Shengliang Guan 已提交
214
    pWrapper->procType = PROC_PARENT;
S
shm  
Shengliang Guan 已提交
215 216 217 218
    pWrapper->pProc = taosProcInit(&cfg);
    if (pWrapper->pProc == NULL) {
      dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
      return -1;
S
shm  
Shengliang Guan 已提交
219
    }
S
shm  
Shengliang Guan 已提交
220 221
  }

S
shm  
Shengliang Guan 已提交
222
  if (dndWriteShmFile(pDnode) != 0) {
S
shm  
Shengliang Guan 已提交
223 224 225 226 227 228 229
    dError("failed to write runtime file since %s", terrstr());
    return -1;
  }

  for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    if (!pWrapper->required) continue;
S
shm  
Shengliang Guan 已提交
230

S
shm  
Shengliang Guan 已提交
231
    if (pDnode->ntype == NODE_MAX) {
S
shm  
Shengliang Guan 已提交
232
      dInfo("node:%s, should be started manually", pWrapper->name);
S
shm  
Shengliang Guan 已提交
233
    } else {
S
shm  
Shengliang Guan 已提交
234
      if (dndNewProc(pWrapper, n) != 0) {
S
shm  
Shengliang Guan 已提交
235 236
        return -1;
      }
S
shm  
Shengliang Guan 已提交
237
    }
S
shm  
Shengliang Guan 已提交
238 239 240

    if (taosProcRun(pWrapper->pProc) != 0) {
      dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
S
shm  
Shengliang Guan 已提交
241 242
      return -1;
    }
S
shm  
Shengliang Guan 已提交
243
  }
S
shm  
Shengliang Guan 已提交
244

S
shm  
Shengliang Guan 已提交
245
  dndSetStatus(pDnode, DND_STAT_RUNNING);
S
shm  
Shengliang Guan 已提交
246

S
shm  
Shengliang Guan 已提交
247 248 249 250
  if ((*pDWrapper->fp.startFp)(pDWrapper) != 0) {
    dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
    return -1;
  }
S
shm  
Shengliang Guan 已提交
251

S
shm  
Shengliang Guan 已提交
252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
  dInfo("TDengine initialized successfully");
  dndReportStartup(pDnode, "TDengine", "initialized successfully");

  while (1) {
    if (pDnode->event == DND_EVENT_STOP) {
      dInfo("dnode is about to stop");
      break;
    }

    for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
      SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
      if (!pWrapper->required) continue;
      if (pDnode->ntype == NODE_MAX) continue;

      if (pWrapper->procId != 0 && !taosProcExists(pWrapper->procId)) {
        dInfo("node:%s, process not exist, pid:%d", pWrapper->name, pWrapper->procId);
        dndNewProc(pWrapper, n);
      }

      taosMsleep(100);
    }
  }

S
shm  
Shengliang Guan 已提交
275 276
  return 0;
}
S
shm  
Shengliang Guan 已提交
277

S
shm  
Shengliang Guan 已提交
278 279
static int32_t dndRunInChildProcess(SDnode *pDnode) {
  SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
S
Shengliang Guan 已提交
280
  dInfo("%s run in child process", pWrapper->name);
S
shm  
Shengliang Guan 已提交
281 282 283 284 285

  SMsgCb msgCb = dndCreateMsgcb(pWrapper);
  tmsgSetDefaultMsgCb(&msgCb);
  pWrapper->procType = PROC_CHILD;

S
shm  
Shengliang Guan 已提交
286 287 288
  if (dndOpenNode(pWrapper) != 0) {
    dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
    return -1;
S
shm  
Shengliang Guan 已提交
289 290
  }

S
shm  
Shengliang Guan 已提交
291 292
  SProcCfg cfg = dndGenProcCfg(pWrapper);
  cfg.isChild = true;
S
shm  
Shengliang Guan 已提交
293 294 295 296 297
  pWrapper->pProc = taosProcInit(&cfg);
  if (pWrapper->pProc == NULL) {
    dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
    return -1;
  }
S
shm  
Shengliang Guan 已提交
298

S
shm  
Shengliang Guan 已提交
299 300 301 302 303 304 305
  if (pWrapper->fp.startFp != NULL) {
    if ((*pWrapper->fp.startFp)(pWrapper) != 0) {
      dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
      return -1;
    }
  }

S
shm  
Shengliang Guan 已提交
306 307 308
  if (taosProcRun(pWrapper->pProc) != 0) {
    dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
    return -1;
S
shm  
Shengliang Guan 已提交
309 310
  }

S
shm  
Shengliang Guan 已提交
311
  dInfo("TDengine initialized successfully");
S
shm  
Shengliang Guan 已提交
312
  dndReportStartup(pDnode, "TDengine", "initialized successfully");
S
shm  
Shengliang Guan 已提交
313
  while (1) {
S
shm  
Shengliang Guan 已提交
314
    if (pDnode->event == DND_EVENT_STOP) {
S
shm  
Shengliang Guan 已提交
315
      dInfo("dnode is about to stop");
S
shm  
Shengliang Guan 已提交
316 317
      break;
    }
S
shm  
Shengliang Guan 已提交
318 319
    taosMsleep(100);
  }
S
shm  
Shengliang Guan 已提交
320 321 322 323 324 325 326 327 328 329
}

int32_t dndRun(SDnode *pDnode) {
  if (!tsMultiProcess) {
    return dndRunInSingleProcess(pDnode);
  } else if (pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) {
    return dndRunInParentProcess(pDnode);
  } else {
    return dndRunInChildProcess(pDnode);
  }
S
shm  
Shengliang Guan 已提交
330 331

  return 0;
S
shm  
Shengliang Guan 已提交
332
}