dndExec.c 8.4 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "dndInt.h"
S
Shengliang Guan 已提交
18

S
shm  
Shengliang Guan 已提交
19
static bool dndRequireNode(SMgmtWrapper *pWrapper) {
S
shm  
Shengliang Guan 已提交
20 21
  bool required = false;
  int32_t code =(*pWrapper->fp.requiredFp)(pWrapper, &required);
S
Shengliang Guan 已提交
22
  if (!required) {
S
shm  
Shengliang Guan 已提交
23
    dDebug("node:%s, no need to start", pWrapper->name);
S
Shengliang Guan 已提交
24
  } else {
S
shm  
Shengliang Guan 已提交
25
    dDebug("node:%s, need to start", pWrapper->name);
S
Shengliang Guan 已提交
26
  }
S
Shengliang Guan 已提交
27 28
  return required;
}
S
Shengliang Guan 已提交
29

S
shm  
Shengliang Guan 已提交
30
int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
S
shm  
Shengliang Guan 已提交
31 32 33 34 35 36 37
  if (taosMkDir(pWrapper->path) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr());
    return -1;
  }

  if ((*pWrapper->fp.openFp)(pWrapper) != 0) {
S
shm  
Shengliang Guan 已提交
38
    dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
S
shm  
Shengliang Guan 已提交
39
    return -1;
S
shm  
Shengliang Guan 已提交
40 41
  }

S
shm  
Shengliang Guan 已提交
42
  dDebug("node:%s, has been opened", pWrapper->name);
S
shm  
Shengliang Guan 已提交
43 44
  pWrapper->deployed = true;
  return 0;
S
shm  
Shengliang Guan 已提交
45
}
S
shm  
Shengliang Guan 已提交
46

S
shm  
Shengliang Guan 已提交
47
void dndCloseNode(SMgmtWrapper *pWrapper) {
S
Shengliang Guan 已提交
48
  dDebug("node:%s, start to close", pWrapper->name);
S
shm  
Shengliang Guan 已提交
49
  pWrapper->required = false;
S
shm  
Shengliang Guan 已提交
50
  taosWLockLatch(&pWrapper->latch);
S
shm  
Shengliang Guan 已提交
51 52 53 54
  if (pWrapper->deployed) {
    (*pWrapper->fp.closeFp)(pWrapper);
    pWrapper->deployed = false;
  }
S
Shengliang Guan 已提交
55 56 57 58 59 60
  taosWUnLockLatch(&pWrapper->latch);

  while (pWrapper->refCount > 0) {
    taosMsleep(10);
  }

S
shm  
Shengliang Guan 已提交
61 62 63 64
  if (pWrapper->pProc) {
    taosProcCleanup(pWrapper->pProc);
    pWrapper->pProc = NULL;
  }
S
Shengliang Guan 已提交
65
  dDebug("node:%s, has been closed", pWrapper->name);
S
shm  
Shengliang Guan 已提交
66
}
S
shm  
Shengliang Guan 已提交
67

S
shm  
Shengliang Guan 已提交
68
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
S
shm  
Shengliang Guan 已提交
69
  dInfo("dnode start to run in single process");
S
shm  
Shengliang Guan 已提交
70

S
shm  
Shengliang Guan 已提交
71
  for (ENodeType n = DNODE; n < NODE_MAX; ++n) {
S
shm  
Shengliang Guan 已提交
72
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
S
shm  
Shengliang Guan 已提交
73
    pWrapper->required = dndRequireNode(pWrapper);
S
shm  
Shengliang Guan 已提交
74 75
    if (!pWrapper->required) continue;

S
shm  
Shengliang Guan 已提交
76
    if (dndOpenNode(pWrapper) != 0) {
S
shm  
Shengliang Guan 已提交
77 78 79 80
      dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
      return -1;
    }
  }
S
shm  
Shengliang Guan 已提交
81

S
shm  
Shengliang Guan 已提交
82 83 84 85 86 87 88 89 90 91
  dndSetStatus(pDnode, DND_STAT_RUNNING);

  for (ENodeType n = 0; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    if (!pWrapper->required) continue;
    if (pWrapper->fp.startFp == NULL) continue;
    if ((*pWrapper->fp.startFp)(pWrapper) != 0) {
      dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
      return -1;
    }
S
shm  
Shengliang Guan 已提交
92 93
  }

S
shm  
Shengliang Guan 已提交
94
  dInfo("dnode running in single process");
S
shm  
Shengliang Guan 已提交
95
  return 0;
S
shm  
Shengliang Guan 已提交
96 97
}

S
Shengliang Guan 已提交
98 99
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                 ProcFuncType ftype) {
S
shm  
Shengliang Guan 已提交
100 101
  SRpcMsg *pRpc = &pMsg->rpcMsg;
  pRpc->pCont = pCont;
S
shm  
Shengliang Guan 已提交
102
  dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pRpc->handle, pRpc->ahandle);
S
shm  
Shengliang Guan 已提交
103 104

  NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
S
Shengliang Guan 已提交
105
  int32_t   code = (*msgFp)(pWrapper, pMsg);
S
shm  
Shengliang Guan 已提交
106 107

  if (code != 0) {
S
Shengliang Guan 已提交
108
    dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
S
shm  
Shengliang Guan 已提交
109
    if (pRpc->msgType & 1U) {
S
shm  
Shengliang Guan 已提交
110 111 112 113 114 115 116 117 118 119
      SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
      dndSendRsp(pWrapper, &rsp);
    }

    dTrace("msg:%p, is freed", pMsg);
    taosFreeQitem(pMsg);
    rpcFreeCont(pCont);
  }
}

S
Shengliang Guan 已提交
120 121 122
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                  ProcFuncType ftype) {
  pMsg->pCont = pCont;
S
shm  
Shengliang Guan 已提交
123
  dTrace("msg:%p, get from parent queue, handle:%p app:%p", pMsg, pMsg->handle, pMsg->ahandle);
S
Shengliang Guan 已提交
124 125

  switch (ftype) {
S
shm  
Shengliang Guan 已提交
126
    case PROC_REG:
S
Shengliang Guan 已提交
127 128
      rpcRegisterBrokenLinkArg(pMsg);
      break;
S
shm  
Shengliang Guan 已提交
129 130 131 132
    case PROC_RELEASE:
      rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
      rpcFreeCont(pCont);
      break;
S
shm  
Shengliang Guan 已提交
133 134 135
    case PROC_REQ:
      // todo send to dnode
      dndSendReqToMnode(pWrapper, pMsg);
S
Shengliang Guan 已提交
136 137 138 139 140
    default:
      dndSendRpcRsp(pWrapper, pMsg);
      break;
  }
  taosMemoryFree(pMsg);
S
shm  
Shengliang Guan 已提交
141 142
}

S
shm  
Shengliang Guan 已提交
143 144 145 146 147 148 149
static int32_t dndRunInParentProcess(SDnode *pDnode) {
  dInfo("dnode start to run in parent process");
  SMgmtWrapper *pDWrapper = &pDnode->wrappers[DNODE];
  if (dndOpenNode(pDWrapper) != 0) {
    dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
    return -1;
  }
S
shm  
Shengliang Guan 已提交
150

S
shm  
Shengliang Guan 已提交
151
  for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
S
shm  
Shengliang Guan 已提交
152
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
S
shm  
Shengliang Guan 已提交
153
    pWrapper->required = dndRequireNode(pWrapper);
S
shm  
Shengliang Guan 已提交
154 155
    if (!pWrapper->required) continue;

S
shm  
Shengliang Guan 已提交
156 157 158 159
    int64_t shmsize = 1024 * 1024 * 2;  // size will be a configuration item
    if (taosCreateShm(&pWrapper->shm, shmsize) != 0) {
      terrno = TAOS_SYSTEM_ERROR(terrno);
      dError("node:%s, failed to create shm size:%" PRId64 " since %s", pWrapper->name, shmsize, terrstr());
S
shm  
Shengliang Guan 已提交
160 161 162
      return -1;
    }

S
shm  
Shengliang Guan 已提交
163 164 165 166 167 168 169 170 171 172 173 174 175
    SProcCfg cfg = {.parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue,
                    .parentdMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
                    .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
                    .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
                    .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
                    .shm = pWrapper->shm,
                    .pParent = pWrapper,
                    .name = pWrapper->name};

    pWrapper->pProc = taosProcInit(&cfg);
    if (pWrapper->pProc == NULL) {
      dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
      return -1;
S
shm  
Shengliang Guan 已提交
176
    }
S
shm  
Shengliang Guan 已提交
177 178 179 180 181 182 183 184 185 186
  }

  if (dndWriteRuntimeFile(pDnode) != 0) {
    dError("failed to write runtime file since %s", terrstr());
    return -1;
  }

  for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    if (!pWrapper->required) continue;
S
shm  
Shengliang Guan 已提交
187

S
shm  
Shengliang Guan 已提交
188 189 190 191 192 193
    dInfo("node:%s, will not start in parent process", pWrapper->name);
    // exec new node

    pWrapper->procType = PROC_PARENT;
    if (taosProcRun(pWrapper->pProc) != 0) {
      dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
S
shm  
Shengliang Guan 已提交
194 195
      return -1;
    }
S
shm  
Shengliang Guan 已提交
196
  }
S
shm  
Shengliang Guan 已提交
197

S
shm  
Shengliang Guan 已提交
198
  dndSetStatus(pDnode, DND_STAT_RUNNING);
S
shm  
Shengliang Guan 已提交
199

S
shm  
Shengliang Guan 已提交
200 201 202 203
  if ((*pDWrapper->fp.startFp)(pDWrapper) != 0) {
    dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
    return -1;
  }
S
shm  
Shengliang Guan 已提交
204

S
shm  
Shengliang Guan 已提交
205 206 207
  dInfo("dnode running in parent process");
  return 0;
}
S
shm  
Shengliang Guan 已提交
208

S
shm  
Shengliang Guan 已提交
209 210
static int32_t dndRunInChildProcess(SDnode *pDnode) {
  dInfo("dnode start to run in child process");
S
shm  
Shengliang Guan 已提交
211

S
shm  
Shengliang Guan 已提交
212 213 214 215
  SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
  if (dndOpenNode(pWrapper) != 0) {
    dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
    return -1;
S
shm  
Shengliang Guan 已提交
216 217
  }

S
shm  
Shengliang Guan 已提交
218 219 220 221 222 223 224 225 226 227 228 229 230 231
  SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue,
                  .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
                  .childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
                  .childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
                  .childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
                  .shm = pWrapper->shm,
                  .pParent = pWrapper,
                  .name = pWrapper->name};

  pWrapper->pProc = taosProcInit(&cfg);
  if (pWrapper->pProc == NULL) {
    dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
    return -1;
  }
S
shm  
Shengliang Guan 已提交
232

S
shm  
Shengliang Guan 已提交
233 234 235 236
  pWrapper->procType = PROC_CHILD;
  if (taosProcRun(pWrapper->pProc) != 0) {
    dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
    return -1;
S
shm  
Shengliang Guan 已提交
237 238
  }

S
shm  
Shengliang Guan 已提交
239
  dInfo("dnode running in child process");
S
shm  
Shengliang Guan 已提交
240 241 242
  return 0;
}

S
shm  
Shengliang Guan 已提交
243
int32_t dndRun(SDnode * pDnode) {
S
Shengliang Guan 已提交
244
  if (!tsMultiProcess) {
S
shm  
Shengliang Guan 已提交
245
    if (dndRunInSingleProcess(pDnode) != 0) {
S
shm  
Shengliang Guan 已提交
246 247 248 249 250 251
      dError("failed to run dnode since %s", terrstr());
      return -1;
    }
  } else if (pDnode->ntype == DNODE) {
    if (dndRunInParentProcess(pDnode) != 0) {
      dError("failed to run dnode in parent process since %s", terrstr());
S
shm  
Shengliang Guan 已提交
252 253 254
      return -1;
    }
  } else {
S
shm  
Shengliang Guan 已提交
255 256
    if (dndRunInChildProcess(pDnode) != 0) {
      dError("failed to run dnode in child process since %s", terrstr());
S
shm  
Shengliang Guan 已提交
257 258 259 260
      return -1;
    }
  }

S
shm  
Shengliang Guan 已提交
261
  dndReportStartup(pDnode, "TDengine", "initialized successfully");
S
shm  
Shengliang Guan 已提交
262
  dInfo("TDengine initialized successfully");
S
shm  
Shengliang Guan 已提交
263

S
shm  
Shengliang Guan 已提交
264
  while (1) {
S
shm  
Shengliang Guan 已提交
265
    if (pDnode->event == DND_EVENT_STOP) {
S
shm  
Shengliang Guan 已提交
266
      dInfo("dnode is about to stop");
S
shm  
Shengliang Guan 已提交
267 268
      break;
    }
S
shm  
Shengliang Guan 已提交
269 270
    taosMsleep(100);
  }
S
shm  
Shengliang Guan 已提交
271 272

  return 0;
S
shm  
Shengliang Guan 已提交
273
}