dndNode.c 8.6 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "dndNode.h"
S
Shengliang Guan 已提交
18
#include "dndTransport.h"
S
Shengliang Guan 已提交
19

S
shm  
Shengliang Guan 已提交
20
#include "bmInt.h"
S
shm  
Shengliang Guan 已提交
21
#include "dmInt.h"
S
shm  
Shengliang Guan 已提交
22 23 24 25
#include "mmInt.h"
#include "qmInt.h"
#include "smInt.h"
#include "vmInt.h"
S
Shengliang Guan 已提交
26 27 28 29 30 31 32 33 34 35

/*
 * Redirect logging to a per-node log file.
 *
 * The file name is the wrapper's name followed by "log" (snprintf truncates
 * it to fit the 24-byte buffer). The current log is closed before the new
 * one is initialized.
 */
static void dndResetLog(SMgmtWrapper *pMgmt) {
  char logFile[24] = {0};

  snprintf(logFile, sizeof(logFile), "%slog", pMgmt->name);
  dInfo("node:%s, reset log to %s", pMgmt->name, logFile);

  taosCloseLog();
  taosInitLog(logFile, 1);
}

S
shm  
Shengliang Guan 已提交
36
/*
 * Ask a node, through its requiredFp callback, whether it has to be started,
 * and log the answer at debug level.
 *
 * Returns the callback's result.
 */
static bool dndRequireNode(SMgmtWrapper *pMgmt) {
  const bool need = (*pMgmt->fp.requiredFp)(pMgmt);

  if (need) {
    dDebug("node:%s, need to start", pMgmt->name);
  } else {
    dDebug("node:%s, no need to start", pMgmt->name);
  }

  return need;
}
S
Shengliang Guan 已提交
45

S
shm  
Shengliang Guan 已提交
46 47
/* Open a node by invoking its openFp callback; returns the callback's result. */
static int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
  return pWrapper->fp.openFp(pWrapper);
}

S
shm  
Shengliang Guan 已提交
48 49 50 51 52
/* Close a node through its closeFp callback; nodes not marked as required are skipped. */
static void dndCloseNode(SMgmtWrapper *pWrapper) {
  if (!pWrapper->required) return;

  pWrapper->fp.closeFp(pWrapper);
}
S
shm  
Shengliang Guan 已提交
53

S
shm  
Shengliang Guan 已提交
54
/*
 * Release everything owned by a dnode object, then the object itself:
 *   - the path string of every management wrapper,
 *   - the lock file (unlocked, then closed), if one is held,
 *   - the SDnode allocation.
 *
 * pDnode must not be used by the caller after this returns.
 */
static void dndClearMemory(SDnode *pDnode) {
  for (ENodeType n = 0; n < NODE_MAX; ++n) {
    SMgmtWrapper *pMgmt = &pDnode->wrappers[n];
    tfree(pMgmt->path);
  }

  if (pDnode->pLockFile != NULL) {
    taosUnLockFile(pDnode->pLockFile);
    taosCloseFile(&pDnode->pLockFile);
    pDnode->pLockFile = NULL;
  }

  // Log the address before releasing it: once tfree() has run, the pointer
  // value must not be read any more (and tfree may reset it, which would make
  // the log print a null address).
  dDebug("dnode object memory is cleared, data:%p", pDnode);
  tfree(pDnode);
}
S
Shengliang Guan 已提交
67

S
Shengliang Guan 已提交
68
/*
 * Allocate and initialize a dnode object from the given configuration.
 *
 * Steps, in order:
 *   1. allocate a zeroed SDnode and copy *pCfg into it;
 *   2. set the status to DND_STAT_INIT and record the reboot time;
 *   3. obtain the lock file via dndCheckRunning() on the data directory;
 *   4. initialize the transport server and client;
 *   5. fill in the function table of every node wrapper and register the
 *      message handlers;
 *   6. for every wrapper: build its path "<dataDir><TD_DIRSEP><name>", link it
 *      back to the dnode, mark it PROC_SINGLE, ask whether the node is
 *      required and, if so, create its directory.
 *
 * Returns the new object on success, or NULL on failure. On failure the
 * partially built object is released with dndClearMemory().
 */
SDnode *dndCreate(SDndCfg *pCfg) {
  dInfo("start to create dnode object");
  int32_t code = -1;
  char    path[PATH_MAX + 100];
  SDnode *pDnode = NULL;

  pDnode = calloc(1, sizeof(SDnode));
  if (pDnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  memcpy(&pDnode->cfg, pCfg, sizeof(SDndCfg));
  dndSetStatus(pDnode, DND_STAT_INIT);
  pDnode->rebootTime = taosGetTimestampMs();

  // A NULL lock file is treated as a fatal condition.
  // NOTE(review): presumably this means another instance already owns the
  // data directory - confirm against dndCheckRunning().
  pDnode->pLockFile = dndCheckRunning(pCfg->dataDir);
  if (pDnode->pLockFile == NULL) {
    goto _OVER;
  }

  if (dndInitServer(pDnode) != 0) {
    dError("failed to init trans server since %s", terrstr());
    goto _OVER;
  }

  if (dndInitClient(pDnode) != 0) {
    dError("failed to init trans client since %s", terrstr());
    goto _OVER;
  }

  dmGetMgmtFp(&pDnode->wrappers[DNODE]);
  mmGetMgmtFp(&pDnode->wrappers[MNODE]);
  vmGetMgmtFp(&pDnode->wrappers[VNODES]);
  qmGetMgmtFp(&pDnode->wrappers[QNODE]);
  smGetMgmtFp(&pDnode->wrappers[SNODE]);
  bmGetMgmtFp(&pDnode->wrappers[BNODE]);

  if (dndInitMsgHandle(pDnode) != 0) {
    goto _OVER;
  }

  for (ENodeType n = 0; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    snprintf(path, sizeof(path), "%s%s%s", pCfg->dataDir, TD_DIRSEP, pDnode->wrappers[n].name);
    pWrapper->path = strdup(path);
    pWrapper->pDnode = pDnode;
    if (pDnode->wrappers[n].path == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    pWrapper->procType = PROC_SINGLE;
    pWrapper->required = dndRequireNode(pWrapper);
    if (pWrapper->required) {
      if (taosMkDir(pWrapper->path) != 0) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
        goto _OVER;
      }
    }
  }

  code = 0;

_OVER:
  if (code != 0) {
    // NOTE(review): the transport server/client initialized above are not
    // torn down on this path - confirm whether dndCleanupServer/Client should
    // be called here when their init succeeded.
    if (pDnode != NULL) {
      dndClearMemory(pDnode);
      // dndClearMemory() freed the object; never hand the stale address back
      // to the caller.
      pDnode = NULL;
    }
    dError("failed to create dnode object since %s", terrstr());
  } else {
    dInfo("dnode object is created, data:%p", pDnode);
  }

  return pDnode;
}

S
Shengliang Guan 已提交
143
/*
 * Shut a dnode object down and release it.
 *
 * Does nothing for a NULL object. If the status is already DND_STAT_STOPPED
 * the call only logs an error and returns. Otherwise the status is set to
 * DND_STAT_STOPPED, the transport server and client are cleaned up, every
 * node wrapper is closed, and the object's memory is released.
 */
void dndClose(SDnode *pDnode) {
  if (pDnode == NULL) {
    return;
  }

  if (dndGetStatus(pDnode) == DND_STAT_STOPPED) {
    dError("dnode is shutting down, data:%p", pDnode);
    return;
  }

  dInfo("start to close dnode, data:%p", pDnode);
  dndSetStatus(pDnode, DND_STAT_STOPPED);

  // Transport goes down first, then the individual nodes.
  dndCleanupServer(pDnode);
  dndCleanupClient(pDnode);

  for (ENodeType n = 0; n < NODE_MAX; ++n) {
    dndCloseNode(&pDnode->wrappers[n]);
  }

  dndClearMemory(pDnode);
  dInfo("dnode object is closed, data:%p", pDnode);
}
S
Shengliang Guan 已提交
165

S
shm  
Shengliang Guan 已提交
166 167
/*
 * Open every required node inside the current process.
 *
 * Returns 0 when all required nodes opened, or -1 as soon as one of them
 * fails to open (nodes after the failing one are not attempted).
 */
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
  dInfo("dnode run in single process mode");

  for (ENodeType n = 0; n < NODE_MAX; ++n) {
    SMgmtWrapper *pNode = &pDnode->wrappers[n];

    if (pNode->required) {
      dInfo("node:%s, will start in single process", pNode->name);
      if (dndOpenNode(pNode) != 0) {
        dError("node:%s, failed to start since %s", pNode->name, terrstr());
        return -1;
      }
    }
  }

  return 0;
}

S
shm  
Shengliang Guan 已提交
183 184 185 186 187 188 189 190 191
/*
 * Clean up the transport server and close every node wrapper other than
 * `except`.
 */
static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) {
  dndCleanupServer(pDnode);

  for (ENodeType n = 0; n < NODE_MAX; ++n) {
    if (n != except) {
      dndCloseNode(&pDnode->wrappers[n]);
    }
  }
}

S
shm  
Shengliang Guan 已提交
192 193 194
/*
 * Start the required nodes in multi-process mode.
 *
 * The DNODE wrapper is opened in the calling (parent) process. For every
 * other required node a process object is created with taosProcInit():
 *   - in the child, the log is redirected to the node's own file, the
 *     resources inherited from the parent are released (transport server and
 *     all other nodes) and the node is opened; the child then returns at
 *     once with the result of that open;
 *   - in the parent, the wrapper is only marked PROC_PARENT and the loop
 *     moves on to the next node.
 *
 * Returns 0 on success, -1 if a node fails to open or a process cannot be
 * created.
 */
static int32_t dndRunInMultiProcess(SDnode *pDnode) {
  for (ENodeType n = 0; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    if (!pWrapper->required) continue;

    if (n == DNODE) {
      dInfo("node:%s, will start in parent process", pWrapper->name);
      pWrapper->procType = PROC_PARENT;
      if (dndOpenNode(pWrapper) != 0) {
        dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
        return -1;
      }
      continue;
    }

    SProcCfg  cfg = {0};
    SProcObj *pProc = taosProcInit(&cfg);
    if (pProc == NULL) {
      dError("node:%s, failed to fork since %s", pWrapper->name, terrstr());
      return -1;
    }

    pWrapper->pProc = pProc;

    if (taosProcIsChild(pProc)) {
      dInfo("node:%s, will start in child process", pWrapper->name);
      pWrapper->procType = PROC_CHILD;
      dndResetLog(pWrapper);

      dInfo("node:%s, clean up resources inherited from parent", pWrapper->name);
      dndClearNodesExecpt(pDnode, n);

      dInfo("node:%s, will be initialized in child process", pWrapper->name);
      if (dndOpenNode(pWrapper) != 0) {
        dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
        return -1;
      }

      // The child serves exactly this node. It has just closed every other
      // wrapper, so it must not keep iterating: doing so would create further
      // processes for the remaining required nodes from inside the child.
      return 0;
    }

    dInfo("node:%s, will not start in parent process", pWrapper->name);
    pWrapper->procType = PROC_PARENT;
  }

  return 0;
}

/*
 * Start the dnode and block until it is asked to stop.
 *
 * The nodes are started in single-process mode when tsMultiProcess is 0 and
 * in multi-process mode otherwise. After a successful start the function
 * polls pDnode->event every 100 ms until it equals DND_EVENT_STOP.
 *
 * Returns 0 after a stop event, -1 if the nodes could not be started.
 */
int32_t dndRun(SDnode *pDnode) {
  if (tsMultiProcess != 0) {
    if (dndRunInMultiProcess(pDnode) != 0) {
      dError("failed to run dnode in multi process mode since %s", terrstr());
      return -1;
    }
  } else {
    if (dndRunInSingleProcess(pDnode) != 0) {
      dError("failed to run dnode in single process mode since %s", terrstr());
      return -1;
    }
  }

  // Idle until the stop event is posted.
  while (pDnode->event != DND_EVENT_STOP) {
    taosMsleep(100);
  }
  dInfo("dnode is about to stop");

  return 0;
}
S
shm  
Shengliang Guan 已提交
258

S
shm  
Shengliang Guan 已提交
259 260 261 262 263
/*
 * Record an event on the dnode object.
 *
 * The event is logged and stored in pDnode->event, the field dndRun() polls
 * to detect DND_EVENT_STOP.
 */
void dndeHandleEvent(SDnode *pDnode, EDndEvent event) {
  dInfo("dnode object receive event %d, data:%p", event, pDnode);

  pDnode->event = event;
}

S
shm  
Shengliang Guan 已提交
264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287
/*
 * Fill a node message from an incoming RPC message.
 *
 * For messages whose type has the low bit set, the connection info is looked
 * up from the RPC handle; if that lookup fails, terrno is set to
 * TSDB_CODE_MND_NO_USER_FROM_CONN and -1 is returned. Otherwise the user name
 * (left zeroed when no lookup was done) and a copy of the RPC message are
 * stored in pMsg and 0 is returned.
 *
 * pEpSet is not used here.
 */
static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) {
  SRpcConnInfo info = {0};
  const bool   isReq = (pRpc->msgType & 1U) != 0;

  if (isReq) {
    if (rpcGetConnInfo(pRpc->handle, &info) != 0) {
      terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
      dError("failed to build msg since %s, app:%p RPC:%p", terrstr(), pRpc->ahandle, pRpc->handle);
      return -1;
    }
  }

  memcpy(pMsg->user, info.user, TSDB_USER_LEN);
  pMsg->rpcMsg = *pRpc;

  return 0;
}

/*
 * Send an RPC response. A response whose code is TSDB_CODE_APP_NOT_READY is
 * handed to dmSendRedirectRsp(); every other response is sent as is.
 */
static void dndSendRpcRsp(SDnode *pDnode, SRpcMsg *pRpc) {
  if (pRpc->code != TSDB_CODE_APP_NOT_READY) {
    rpcSendResponse(pRpc);
    return;
  }

  dmSendRedirectRsp(pDnode, pRpc);
}

/*
 * Entry point for an RPC message addressed to a node wrapper.
 *
 * 1. If the message is a TDMT_MND_STATUS_RSP carrying a non-empty endpoint
 *    set, the mnode endpoint set is updated first.
 * 2. The handler registered for the message type is looked up; a missing
 *    handler sets terrno to TSDB_CODE_MSG_NOT_PROCESSED.
 * 3. A queue item is allocated, filled by dndBuildMsg() and passed to the
 *    handler.
 *
 * When any step fails (or the handler returns non-zero), an error response
 * carrying terrno is sent for messages whose type has the low bit set, and
 * both the queue item and the RPC content are freed. On success nothing is
 * freed here.
 */
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
  const bool hasEps = (pEpSet != NULL) && (pEpSet->numOfEps > 0);
  if (hasEps && pRpc->msgType == TDMT_MND_STATUS_RSP) {
    dmUpdateMnodeEpSet(pWrapper->pDnode, pEpSet);
  }

  int32_t   ret = -1;
  SNodeMsg *pNodeMsg = NULL;

  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
  if (handler == NULL) {
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
  } else {
    pNodeMsg = taosAllocateQitem(sizeof(SNodeMsg));
    if (pNodeMsg != NULL && dndBuildMsg(pNodeMsg, pRpc, pEpSet) == 0) {
      dTrace("msg:%p, is created, app:%p user:%s", pNodeMsg, pRpc->ahandle, pNodeMsg->user);
      ret = (*handler)(pWrapper, pNodeMsg);
    }
  }

  if (ret == 0) {
    return;
  }

  dError("msg:%p, failed to process since %s", pNodeMsg, terrstr());
  if (pRpc->msgType & 1U) {
    SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
    dndSendRpcRsp(pWrapper->pDnode, &rsp);
  }

  dTrace("msg:%p, is freed", pNodeMsg);
  taosFreeQitem(pNodeMsg);
  rpcFreeCont(pRpc->pCont);
}