dndNode.c 11.7 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "dndNode.h"
S
Shengliang Guan 已提交
18
#include "dndTransport.h"
S
Shengliang Guan 已提交
19

S
shm  
Shengliang Guan 已提交
20
#include "bmInt.h"
S
shm  
Shengliang Guan 已提交
21
#include "dmInt.h"
S
shm  
Shengliang Guan 已提交
22 23 24 25
#include "mmInt.h"
#include "qmInt.h"
#include "smInt.h"
#include "vmInt.h"
S
Shengliang Guan 已提交
26 27 28 29 30 31 32 33 34 35

// Redirects logging to a file named after the given node ("<node name>log").
// The current log is closed first, then the log is initialized again with the new name.
static void dndResetLog(SMgmtWrapper *pMgmt) {
  char logFile[24] = {0};

  snprintf(logFile, sizeof(logFile), "%slog", pMgmt->name);
  dInfo("node:%s, reset log to %s", pMgmt->name, logFile);

  taosCloseLog();
  taosInitLog(logFile, 1);
}

S
shm  
Shengliang Guan 已提交
36
// Asks the node's `requiredFp` callback whether the node has to be started,
// logs the answer at debug level and returns it unchanged.
static bool dndRequireNode(SMgmtWrapper *pMgmt) {
  const bool needed = (*pMgmt->fp.requiredFp)(pMgmt);

  if (needed) {
    dDebug("node:%s, need to start", pMgmt->name);
  } else {
    dDebug("node:%s, no need to start", pMgmt->name);
  }

  return needed;
}
S
Shengliang Guan 已提交
45

S
shm  
Shengliang Guan 已提交
46 47
// Opens a node through its registered `openFp` callback and returns the callback's result.
static int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
  int32_t code = (*pWrapper->fp.openFp)(pWrapper);
  return code;
}

S
shm  
Shengliang Guan 已提交
48 49 50 51
// Closes a node and releases its process object.
// The `closeFp` callback is only invoked for nodes flagged as required;
// the process object, if any, is cleaned up regardless of that flag.
static void dndCloseNode(SMgmtWrapper *pWrapper) {
  if (pWrapper->required) {
    (*pWrapper->fp.closeFp)(pWrapper);
  }

  if (!pWrapper->pProc) {
    return;
  }

  taosProcCleanup(pWrapper->pProc);
  pWrapper->pProc = NULL;  // avoid a second cleanup on a later call
}
S
shm  
Shengliang Guan 已提交
57

S
shm  
Shengliang Guan 已提交
58
// Releases everything owned directly by the dnode object: the per-node path
// strings, the lock file (unlocked, then closed) and finally the object itself.
// After this call `pDnode` must not be used by the caller any more.
// Passing NULL is a no-op.
static void dndClearMemory(SDnode *pDnode) {
  if (pDnode == NULL) return;

  for (ENodeType n = 0; n < NODE_MAX; ++n) {
    SMgmtWrapper *pMgmt = &pDnode->wrappers[n];
    tfree(pMgmt->path);
  }

  if (pDnode->pLockFile != NULL) {
    taosUnLockFile(pDnode->pLockFile);
    taosCloseFile(&pDnode->pLockFile);
    pDnode->pLockFile = NULL;
  }

  // Log the address while it is still valid: once the object is released the
  // pointer value must not be read (and it may have been reset by tfree).
  dDebug("dnode object memory is cleared, data:%p", pDnode);
  tfree(pDnode);
}
S
Shengliang Guan 已提交
71

S
Shengliang Guan 已提交
72
// Creates a dnode object from the given configuration.
//
// Steps, in order: allocate the zeroed object, copy the configuration, set the
// status to DND_STAT_INIT, record the reboot time, take the lock file on the
// data directory, initialize the transport server and client, register the
// management function tables of all node types, initialize the message
// handles, and finally build "<dataDir><TD_DIRSEP><node name>" as the path of
// every node, creating that directory for nodes that are required.
//
// pCfg    configuration to copy into the new object.
// return  the new object, or NULL on failure. On the failure paths that set it
//         here, `terrno` holds the reason; other paths rely on the failing
//         helper.
SDnode *dndCreate(SDndCfg *pCfg) {
  dInfo("start to create dnode object");
  int32_t code = -1;
  char    path[PATH_MAX + 100];
  SDnode *pDnode = NULL;

  pDnode = calloc(1, sizeof(SDnode));
  if (pDnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  memcpy(&pDnode->cfg, pCfg, sizeof(SDndCfg));
  dndSetStatus(pDnode, DND_STAT_INIT);
  pDnode->rebootTime = taosGetTimestampMs();
  pDnode->pLockFile = dndCheckRunning(pCfg->dataDir);
  if (pDnode->pLockFile == NULL) {
    goto _OVER;
  }

  // NOTE(review): if a later step fails, the transport initialized here is not
  // torn down, because dndClearMemory does not call dndCleanupServer or
  // dndCleanupClient - confirm whether that cleanup is needed on this path.
  if (dndInitServer(pDnode) != 0) {
    dError("failed to init trans server since %s", terrstr());
    goto _OVER;
  }

  if (dndInitClient(pDnode) != 0) {
    dError("failed to init trans client since %s", terrstr());
    goto _OVER;
  }

  dmGetMgmtFp(&pDnode->wrappers[DNODE]);
  mmGetMgmtFp(&pDnode->wrappers[MNODE]);
  vmGetMgmtFp(&pDnode->wrappers[VNODES]);
  qmGetMgmtFp(&pDnode->wrappers[QNODE]);
  smGetMgmtFp(&pDnode->wrappers[SNODE]);
  bmGetMgmtFp(&pDnode->wrappers[BNODE]);

  if (dndInitMsgHandle(pDnode) != 0) {
    goto _OVER;
  }

  for (ENodeType n = 0; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    snprintf(path, sizeof(path), "%s%s%s", pCfg->dataDir, TD_DIRSEP, pDnode->wrappers[n].name);
    pWrapper->path = strdup(path);
    pWrapper->pDnode = pDnode;
    if (pDnode->wrappers[n].path == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    pWrapper->procType = PROC_SINGLE;
    pWrapper->required = dndRequireNode(pWrapper);
    if (pWrapper->required) {
      if (taosMkDir(pWrapper->path) != 0) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
        goto _OVER;
      }
    }
  }

  code = 0;

_OVER:
  if (code != 0) {
    if (pDnode != NULL) {
      dndClearMemory(pDnode);
      // The object has been released; never hand the stale address to the caller.
      pDnode = NULL;
    }
    dError("failed to create dnode object since %s", terrstr());
  } else {
    dInfo("dnode object is created, data:%p", pDnode);
  }

  return pDnode;
}

S
Shengliang Guan 已提交
147
// Shuts a dnode object down and releases it.
// Does nothing for a NULL object, and only logs an error when the status is
// already DND_STAT_STOPPED. Otherwise the status is set to DND_STAT_STOPPED,
// the transport server and client are cleaned up, every node is closed and
// the object memory is released.
void dndClose(SDnode *pDnode) {
  if (pDnode == NULL) {
    return;
  }

  if (dndGetStatus(pDnode) == DND_STAT_STOPPED) {
    dError("dnode is shutting down, data:%p", pDnode);
    return;
  }

  dInfo("start to close dnode, data:%p", pDnode);
  dndSetStatus(pDnode, DND_STAT_STOPPED);

  dndCleanupServer(pDnode);
  dndCleanupClient(pDnode);

  ENodeType n = 0;
  while (n < NODE_MAX) {
    dndCloseNode(&pDnode->wrappers[n]);
    ++n;
  }

  dndClearMemory(pDnode);
  // pDnode is only printed as an address here; it is not dereferenced.
  dInfo("dnode object is closed, data:%p", pDnode);
}
S
Shengliang Guan 已提交
169

S
shm  
Shengliang Guan 已提交
170 171
// Opens every required node inside the current process.
// Returns 0 when all required nodes were opened, -1 as soon as one fails
// (nodes opened before the failure are left open).
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
  dInfo("dnode run in single process mode");

  for (ENodeType n = 0; n < NODE_MAX; ++n) {
    SMgmtWrapper *pNode = &pDnode->wrappers[n];

    if (pNode->required) {
      dInfo("node:%s, will start in single process", pNode->name);
      pNode->procType = PROC_SINGLE;

      if (dndOpenNode(pNode) == 0) {
        continue;
      }

      dError("node:%s, failed to start since %s", pNode->name, terrstr());
      return -1;
    }
  }

  return 0;
}

S
shm  
Shengliang Guan 已提交
188 189 190 191 192 193 194 195 196
// Cleans up the transport server and closes every node except `except`.
static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) {
  dndCleanupServer(pDnode);

  for (ENodeType n = 0; n < NODE_MAX; ++n) {
    if (n != except) {
      dndCloseNode(&pDnode->wrappers[n]);
    }
  }
}

S
shm  
Shengliang Guan 已提交
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247
// Sends a response over RPC. Responses whose code is
// TSDB_CODE_DND_MNODE_NOT_DEPLOYED or TSDB_CODE_APP_NOT_READY are sent as a
// redirect response instead; all others are sent as they are.
static void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
  const bool redirect =
      (pRsp->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED) || (pRsp->code == TSDB_CODE_APP_NOT_READY);

  if (!redirect) {
    rpcSendResponse(pRsp);
    return;
  }

  dmSendRedirectRsp(pWrapper->pDnode, pRsp);
}

// Sends a response on behalf of a node.
// Unless the node's process type is PROC_CHILD the response goes straight out
// over RPC. For PROC_CHILD it is put on the parent queue instead, retrying
// every 10 ms until the put succeeds.
void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
  if (pWrapper->procType != PROC_CHILD) {
    dndSendRpcRsp(pWrapper, pRsp);
    return;
  }

  // There is no retry limit: this loops for as long as the put keeps failing.
  while (taosProcPutToParentQueue(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen) != 0) {
    taosMsleep(10);
  }
}

// Consumer callback for the child queue: dispatches one message to the handler
// registered for its message type.
// When the handler returns 0 nothing else is done here. When it fails, a
// response carrying `terrno` is sent if the message type has its lowest bit
// set (treated as a request), and the message head and content are freed.
// `msgLen` and `contLen` are not used.
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {
  dTrace("msg:%p, get from child queue", pMsg);

  SRpcMsg *pReq = &pMsg->rpcMsg;
  pReq->pCont = pCont;  // attach the content buffer delivered with the head

  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pReq->msgType)];
  if ((*handler)(pWrapper, pMsg) == 0) {
    return;
  }

  if (pReq->msgType & 1U) {
    SRpcMsg rsp = {.handle = pReq->handle, .ahandle = pReq->ahandle, .code = terrno};
    dndSendRsp(pWrapper, &rsp);
  }

  dTrace("msg:%p, is freed", pMsg);
  taosFreeQitem(pMsg);
  rpcFreeCont(pCont);
}

// Consumer callback for the parent queue: forwards one response over RPC.
// The content buffer is attached to the response before sending, and the
// response head is released with free() afterwards.
// `msgLen` and `contLen` are not used.
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRsp, int32_t msgLen, void *pCont, int32_t contLen) {
  dTrace("msg:%p, get from parent queue", pRsp);

  pRsp->pCont = pCont;
  dndSendRpcRsp(pWrapper, pRsp);

  free(pRsp);
}

S
shm  
Shengliang Guan 已提交
248
// Starts the required nodes in multi process mode.
//
// The DNODE node is opened directly in the current process. For every other
// required node a process object is created with taosProcInit; the side for
// which taosProcIsChild() is true switches its log, closes all other nodes
// and opens its own node, while the other side only marks the node as
// PROC_PARENT. Both sides then call taosProcRun.
//
// Returns 0 on success and -1 on the first failure (open, init or run).
static int32_t dndRunInMultiProcess(SDnode *pDnode) {
  dInfo("dnode run in multi process mode");

  for (ENodeType n = 0; n < NODE_MAX; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    if (!pWrapper->required) continue;

    if (n == DNODE) {
      dInfo("node:%s, will start in parent process", pWrapper->name);
      pWrapper->procType = PROC_SINGLE;
      if (dndOpenNode(pWrapper) != 0) {
        dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
        return -1;
      }
      continue;
    }

    SProcCfg  cfg = {.childQueueSize = 1024 * 1024 * 2,  // size will be a configuration item
                     .childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue,
                     .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
                     .childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
                     .childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
                     .childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
                     .parentQueueSize = 1024 * 1024 * 2,  // size will be a configuration item
                     .parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue,
                     .parentdMallocHeadFp = (ProcMallocFp)malloc,
                     .parentFreeHeadFp = (ProcFreeFp)free,
                     .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
                     .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
                     .testFlag = 0,
                     .pParent = pWrapper,
                     .name = pWrapper->name};
    SProcObj *pProc = taosProcInit(&cfg);
    if (pProc == NULL) {
      dError("node:%s, failed to fork since %s", pWrapper->name, terrstr());
      return -1;
    }

    pWrapper->pProc = pProc;

    if (taosProcIsChild(pProc)) {
      dInfo("node:%s, will start in child process", pWrapper->name);
      pWrapper->procType = PROC_CHILD;
      dndResetLog(pWrapper);

      dInfo("node:%s, clean up resources inherited from parent", pWrapper->name);
      dndClearNodesExecpt(pDnode, n);

      dInfo("node:%s, will be initialized in child process", pWrapper->name);
      // A node that failed to open must not go on to run its queue consumer.
      if (dndOpenNode(pWrapper) != 0) {
        dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
        return -1;
      }
    } else {
      dInfo("node:%s, will not start in parent process", pWrapper->name);
      pWrapper->procType = PROC_PARENT;
    }

    if (taosProcRun(pProc) != 0) {
      dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
      return -1;
    }

    // NOTE(review): if taosProcRun returns in the child, the child continues
    // this loop as well. dndCloseNode does not reset `required`, so the child
    // would call taosProcInit for the remaining node types - confirm that this
    // is intended.
  }

  return 0;
}

// Starts the nodes (single or multi process mode, selected by tsMultiProcess)
// and then blocks, polling every 100 ms, until the dnode's event becomes
// DND_EVENT_STOP.
// Returns 0 after a stop event, -1 when the nodes could not be started.
int32_t dndRun(SDnode *pDnode) {
  if (tsMultiProcess != 0) {
    if (dndRunInMultiProcess(pDnode) != 0) {
      dError("failed to run dnode in multi process mode since %s", terrstr());
      return -1;
    }
  } else {
    if (dndRunInSingleProcess(pDnode) != 0) {
      dError("failed to run dnode in single process mode since %s", terrstr());
      return -1;
    }
  }

  while (pDnode->event != DND_EVENT_STOP) {
    taosMsleep(100);
  }

  dInfo("dnode is about to stop");
  return 0;
}
S
shm  
Shengliang Guan 已提交
335

S
shm  
Shengliang Guan 已提交
336 337 338 339 340
// Records an event on the dnode object after logging it.
// The stored value is what the polling loop in dndRun compares against
// DND_EVENT_STOP.
void dndeHandleEvent(SDnode *pDnode, EDndEvent event) {
  dInfo("dnode object receive event %d, data:%p", event, pDnode);

  pDnode->event = event;
}

S
shm  
Shengliang Guan 已提交
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356
// Fills a node message from an incoming RPC message.
// For message types with the lowest bit set, the connection info is fetched
// from the RPC handle; a failure there sets terrno to
// TSDB_CODE_MND_NO_USER_FROM_CONN and returns -1. Otherwise the user name
// (all zero when no connection info was fetched) and a copy of the RPC
// message are stored in `pMsg` and 0 is returned.
// `pEpSet` is not used.
static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) {
  SRpcConnInfo conn = {0};

  const bool isReq = (pRpc->msgType & 1U) != 0;
  if (isReq && rpcGetConnInfo(pRpc->handle, &conn) != 0) {
    terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
    dError("failed to build msg since %s, app:%p RPC:%p", terrstr(), pRpc->ahandle, pRpc->handle);
    return -1;
  }

  pMsg->rpcMsg = *pRpc;
  memcpy(pMsg->user, conn.user, TSDB_USER_LEN);

  return 0;
}

// Entry point for an RPC message addressed to a node.
//
// A TDMT_MND_STATUS_RSP message that carries a non-empty endpoint set first
// updates the mnode endpoint set. The message is then wrapped in a queue item
// and, depending on the node's process type, either handled in place
// (PROC_SINGLE) or put on the child queue (PROC_PARENT); any other process
// type is rejected.
//
// On failure a response carrying `terrno` is sent for message types with the
// lowest bit set, and the queue item and RPC content are freed. On success
// they are freed only for PROC_PARENT; for PROC_SINGLE nothing is released
// here.
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
  if (pRpc->msgType == TDMT_MND_STATUS_RSP && pEpSet && pEpSet->numOfEps > 0) {
    dmUpdateMnodeEpSet(pWrapper->pDnode, pEpSet);
  }

  int32_t   ret = -1;
  SNodeMsg *pMsg = NULL;

  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
  if (handler == NULL) {
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    goto _OVER;
  }

  pMsg = taosAllocateQitem(sizeof(SNodeMsg));
  if (pMsg == NULL || dndBuildMsg(pMsg, pRpc, pEpSet) != 0) {
    goto _OVER;
  }

  dTrace("msg:%p, is created, app:%p user:%s", pMsg, pRpc->ahandle, pMsg->user);

  switch (pWrapper->procType) {
    case PROC_SINGLE:
      ret = (*handler)(pWrapper, pMsg);
      break;
    case PROC_PARENT:
      ret = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen);
      break;
    default:
      terrno = TSDB_CODE_MEMORY_CORRUPTED;
      dError("msg:%p, won't be processed for it is child process", pMsg);
      break;
  }

_OVER:
  if (ret == 0 && pWrapper->procType != PROC_PARENT) {
    // Handled in place: the message is not released here.
    return;
  }

  if (ret != 0) {
    dError("msg:%p, failed to process since %s", pMsg, terrstr());
    if (pRpc->msgType & 1U) {
      SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
      dndSendRsp(pWrapper, &rsp);
    }
  }

  // Reached on every failure, and on success after a put to the child queue.
  dTrace("msg:%p, is freed", pMsg);
  taosFreeQitem(pMsg);
  rpcFreeCont(pRpc->pCont);
}