dndExec.c 9.0 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "dndInt.h"
S
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19 20 21 22
/*
 * Re-initialize logging under a per-node log name of the form "<name>log",
 * where <name> is the wrapper's node name. The emitted message states that
 * this happens in the child process.
 */
static void dndResetLog(SMgmtWrapper *pMgmt) {
  char nodeLogName[24] = {0};

  snprintf(nodeLogName, sizeof(nodeLogName), "%slog", pMgmt->name);
  dInfo("node:%s, reset log to %s in child process", pMgmt->name, nodeLogName);

  // Drop the current log first, then bring logging back up under the new name.
  taosCloseLog();
  taosInitLog(nodeLogName, 1);
}

S
shm  
Shengliang Guan 已提交
28
/*
 * Ask the node's requiredFp callback whether this node has to be started.
 *
 * Returns the flag the callback stored through its out-parameter; the flag
 * starts out false, so an untouched out-parameter means "not required".
 * A non-zero return code from the callback is logged rather than silently
 * discarded. The flag is still returned exactly as the callback left it, so
 * callers observe the same value as before.
 */
static bool dndRequireNode(SMgmtWrapper *pWrapper) {
  bool    required = false;
  int32_t code = (*pWrapper->fp.requiredFp)(pWrapper, &required);
  if (code != 0) {
    dError("node:%s, failed to check whether it is required, code:%d", pWrapper->name, code);
  }

  if (!required) {
    dDebug("node:%s, no need to start", pWrapper->name);
  } else {
    dDebug("node:%s, need to start", pWrapper->name);
  }

  return required;
}
S
Shengliang Guan 已提交
38

S
shm  
Shengliang Guan 已提交
39
/*
 * Open a node through its openFp callback.
 *
 * Returns 0 and marks the wrapper as deployed when the callback succeeds.
 * On a non-zero callback result the error is logged, the deployed flag is
 * left untouched and -1 is returned.
 */
int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
  if ((*pWrapper->fp.openFp)(pWrapper) != 0) {
    dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
    return -1;
  }

  dDebug("node:%s, has been opened", pWrapper->name);
  pWrapper->deployed = true;
  return 0;
}
S
shm  
Shengliang Guan 已提交
51

S
shm  
Shengliang Guan 已提交
52
/*
 * Shut a node down:
 *   1. clear its required flag,
 *   2. under the wrapper's write latch, invoke closeFp if the node is
 *      deployed and clear the deployed flag,
 *   3. poll every 10 ms until refCount is no longer positive,
 *   4. clean up the attached proc object, if there is one.
 */
void dndCloseNode(SMgmtWrapper *pWrapper) {
  dDebug("node:%s, start to close", pWrapper->name);

  pWrapper->required = false;

  taosWLockLatch(&pWrapper->latch);
  if (pWrapper->deployed) {
    (*pWrapper->fp.closeFp)(pWrapper);
    pWrapper->deployed = false;
  }
  taosWUnLockLatch(&pWrapper->latch);

  // Wait for outstanding references to go away before tearing down the proc.
  while (pWrapper->refCount > 0) {
    taosMsleep(10);
  }

  if (pWrapper->pProc != NULL) {
    taosProcCleanup(pWrapper->pProc);
    pWrapper->pProc = NULL;
  }

  dDebug("node:%s, has been closed", pWrapper->name);
}
S
shm  
Shengliang Guan 已提交
72

S
shm  
Shengliang Guan 已提交
73 74
/*
 * Bring up every required node inside the current process.
 *
 * First pass: for each node type, evaluate whether it is required, install
 * its message callbacks as the default, create its directory and open it
 * with procType set to PROC_SINGLE.
 * Then the dnode status is switched to DND_STAT_RUNNING.
 * Second pass: call startFp for each required node that provides one.
 *
 * Returns 0 on success, -1 as soon as any step fails.
 */
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
  dInfo("dnode run in single process mode");

  for (ENodeType ntype = 0; ntype < NODE_MAX; ++ntype) {
    SMgmtWrapper *pNode = &pDnode->wrappers[ntype];

    pNode->required = dndRequireNode(pNode);
    if (!pNode->required) {
      continue;
    }

    SMsgCb cb = dndCreateMsgcb(pNode);
    tmsgSetDefaultMsgCb(&cb);

    if (taosMkDir(pNode->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      dError("failed to create dir:%s since %s", pNode->path, terrstr());
      return -1;
    }

    dInfo("node:%s, will start in single process", pNode->name);
    pNode->procType = PROC_SINGLE;
    if (dndOpenNode(pNode) != 0) {
      dError("node:%s, failed to start since %s", pNode->name, terrstr());
      return -1;
    }
  }

  dndSetStatus(pDnode, DND_STAT_RUNNING);

  for (ENodeType ntype = 0; ntype < NODE_MAX; ++ntype) {
    SMgmtWrapper *pNode = &pDnode->wrappers[ntype];

    // Only required nodes that actually provide a start hook are started.
    if (!pNode->required || pNode->fp.startFp == NULL) {
      continue;
    }

    if ((*pNode->fp.startFp)(pNode) != 0) {
      dError("node:%s, failed to start since %s", pNode->name, terrstr());
      return -1;
    }
  }

  return 0;
}

S
shm  
Shengliang Guan 已提交
112
/*
 * Clear the required flag of every node wrapper except the one whose type
 * is `except`.
 */
static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) {
  // dndCleanupServer(pDnode);
  for (ENodeType ntype = 0; ntype < NODE_MAX; ++ntype) {
    if (ntype != except) {
      pDnode->wrappers[ntype].required = false;
    }
  }
}

S
Shengliang Guan 已提交
121 122
/*
 * Consumer callback for the child-side queue.
 *
 * Attaches the body buffer to the message, looks up the handler registered
 * for the message type and runs it. When the handler returns 0 nothing more
 * is done here. When it fails, the error is logged, a response carrying
 * terrno is sent if bit 0 of the message type is set, and both the message
 * head and the body buffer are freed.
 *
 * msgLen, contLen and ftype are part of the consumer signature but are not
 * used by this function.
 */
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                 ProcFuncType ftype) {
  SRpcMsg *pRpc = &pMsg->rpcMsg;
  pRpc->pCont = pCont;
  dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pRpc->handle, pRpc->ahandle);

  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
  int32_t   code = (*handler)(pWrapper, pMsg);
  if (code == 0) {
    // NOTE(review): on success the message is not released here; presumably
    // the handler has taken it over - confirm against the handlers.
    return;
  }

  dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));

  // NOTE(review): bit 0 of msgType presumably marks a request that expects a reply - confirm.
  if (pRpc->msgType & 1U) {
    SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
    dndSendRsp(pWrapper, &rsp);
  }

  dTrace("msg:%p, is freed", pMsg);
  taosFreeQitem(pMsg);
  rpcFreeCont(pCont);
}

S
Shengliang Guan 已提交
143 144 145
/*
 * Consumer callback for the parent-side queue.
 *
 * Attaches the body buffer to the message and dispatches on ftype:
 *   PROC_REG      - register the message as the broken-link argument
 *   PROC_RELEASE  - release the rpc handle and free the body buffer
 *   PROC_REQ      - forward the request via dndSendReqToMnode
 *   anything else - send the message as an rpc response
 * The message head is freed with taosMemoryFree in every case.
 *
 * msgLen and contLen are part of the consumer signature but are not used
 * by this function.
 */
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
                                  ProcFuncType ftype) {
  pMsg->pCont = pCont;
  dTrace("msg:%p, get from parent queue, handle:%p app:%p", pMsg, pMsg->handle, pMsg->ahandle);

  switch (ftype) {
    case PROC_REG:
      rpcRegisterBrokenLinkArg(pMsg);
      break;
    case PROC_RELEASE:
      rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
      rpcFreeCont(pCont);
      break;
    case PROC_REQ:
      // todo send to dnode
      dndSendReqToMnode(pWrapper, pMsg);
      // A request must not also be pushed out as a response, so do not fall
      // through into the default branch.
      break;
    default:
      dndSendRpcRsp(pWrapper, pMsg);
      break;
  }

  taosMemoryFree(pMsg);
}

S
shm  
Shengliang Guan 已提交
166
/*
 * Bring up the required nodes in multi-process mode.
 *
 * For each required node type the message callbacks are installed as the
 * default and the node directory is created. The DNODE type is opened
 * directly in the current (parent) process with procType PROC_SINGLE.
 * Every other type gets a proc object from taosProcInit:
 *   - in the child, logging is reset, the required flags of all other
 *     nodes are cleared, the node is opened, the proc is run and the loop
 *     ends, so the child handles exactly one node;
 *   - in the parent, the node is marked PROC_PARENT and only the proc is run.
 * Afterwards the dnode status becomes DND_STAT_RUNNING and startFp is called
 * for each required node that belongs to this process.
 *
 * Returns 0 on success, -1 as soon as any step fails.
 */
static int32_t dndRunInMultiProcess(SDnode *pDnode) {
  dInfo("dnode run in multi process mode");

  for (ENodeType n = 0; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    pWrapper->required = dndRequireNode(pWrapper);
    if (!pWrapper->required) continue;

    SMsgCb msgCb = dndCreateMsgcb(pWrapper);
    tmsgSetDefaultMsgCb(&msgCb);

    if (taosMkDir(pWrapper->path) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
      return -1;
    }

    if (n == DNODE) {
      dInfo("node:%s, will start in parent process", pWrapper->name);
      pWrapper->procType = PROC_SINGLE;
      if (dndOpenNode(pWrapper) != 0) {
        dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
        return -1;
      }
      continue;
    }

    SProcCfg  cfg = {.childQueueSize = 1024 * 1024 * 2,  // size will be a configuration item
                     .childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue,
                     .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
                     .childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
                     .childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
                     .childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
                     .parentQueueSize = 1024 * 1024 * 2,  // size will be a configuration item
                     .parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue,
                     .parentdMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
                     .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
                     .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
                     .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
                     .pParent = pWrapper,
                     .name = pWrapper->name};
    SProcObj *pProc = taosProcInit(&cfg);
    if (pProc == NULL) {
      dError("node:%s, failed to fork since %s", pWrapper->name, terrstr());
      return -1;
    }

    pWrapper->pProc = pProc;

    if (taosProcIsChild(pProc)) {
      dInfo("node:%s, will start in child process", pWrapper->name);
      pWrapper->procType = PROC_CHILD;
      dndResetLog(pWrapper);

      dInfo("node:%s, clean up resources inherited from parent", pWrapper->name);
      dndClearNodesExecpt(pDnode, n);

      dInfo("node:%s, will be initialized in child process", pWrapper->name);
      if (dndOpenNode(pWrapper) != 0) {
        // This is a failure path, so it is reported at error level like the others.
        dError("node:%s, failed to init in child process since %s", pWrapper->name, terrstr());
        return -1;
      }

      if (taosProcRun(pProc) != 0) {
        dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
        return -1;
      }
      // The child serves only this node; stop iterating over the other types.
      break;
    } else {
      dInfo("node:%s, will not start in parent process, child pid:%d", pWrapper->name, taosProcChildId(pProc));
      pWrapper->procType = PROC_PARENT;
      if (taosProcRun(pProc) != 0) {
        dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
        return -1;
      }
    }
  }

  dndSetStatus(pDnode, DND_STAT_RUNNING);

  for (ENodeType n = 0; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    if (!pWrapper->required) continue;
    if (pWrapper->fp.startFp == NULL) continue;
    // Nodes marked PROC_PARENT were opened in their child process, not here.
    if (pWrapper->procType == PROC_PARENT && n != DNODE) continue;
    if (pWrapper->procType == PROC_CHILD && n == DNODE) continue;
    if ((*pWrapper->fp.startFp)(pWrapper) != 0) {
      dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
      return -1;
    }
  }

  return 0;
}

/*
 * Entry point that runs the dnode.
 *
 * Starts the nodes in multi-process or single-process mode depending on
 * tsMultiProcess, reports the startup, then polls pDnode->event every
 * 100 ms until it equals DND_EVENT_STOP.
 *
 * Returns 0 after the stop event has been seen, -1 if startup failed.
 */
int32_t dndRun(SDnode *pDnode) {
  if (tsMultiProcess) {
    if (dndRunInMultiProcess(pDnode) != 0) {
      dError("failed to run dnode in multi process mode since %s", terrstr());
      return -1;
    }
  } else {
    if (dndRunInSingleProcess(pDnode) != 0) {
      dError("failed to run dnode in single process mode since %s", terrstr());
      return -1;
    }
  }

  dndReportStartup(pDnode, "TDengine", "initialized successfully");

  // Idle here until a stop is requested.
  while (pDnode->event != DND_EVENT_STOP) {
    taosMsleep(100);
  }
  dInfo("dnode is about to stop");

  return 0;
}