dmMgmt.c 12.8 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
shm  
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmMgmt.h"
S
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19 20 21
/* Returns the `required` flag recorded in the wrapper slot of the given node type. */
static bool dmIsNodeRequired(SDnode *pDnode, EDndNodeType ntype) {
  const SMgmtWrapper *pSlot = &pDnode->wrappers[ntype];
  return pSlot->required;
}

/*
 * Asks the node's own `requiredFp` callback whether the node must be started.
 *
 * The answer is forced to false for the DNODE wrapper when the running type
 * (pDnode->rtype) is neither DNODE nor NODE_END, i.e. in a child process.
 *
 * Returns the final decision. A non-zero return code from the callback is
 * logged; the `required` value it left behind is still used.
 */
static bool dmRequireNode(SMgmtWrapper *pWrapper) {
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);

  bool    required = false;
  int32_t code = (*pWrapper->func.requiredFp)(&input, &required);
  if (code != 0) {
    // Previously the return code was silently dropped; make the failure visible.
    dError("node:%s, failed to check whether startup is required, code:%d", pWrapper->name, code);
  }

  if (!required) {
    dDebug("node:%s, does not require startup", pWrapper->name);
  }

  if (pWrapper->ntype == DNODE && pWrapper->pDnode->rtype != DNODE && pWrapper->pDnode->rtype != NODE_END) {
    required = false;
    dDebug("node:%s, does not require startup in child process", pWrapper->name);
  }

  if (required) {
    dDebug("node:%s, required to startup", pWrapper->name);
  }

  return required;
}
S
Shengliang Guan 已提交
41

S
Shengliang Guan 已提交
42
/*
 * Fills in the run-time variables of a freshly allocated dnode from pOption:
 * the process mode (pDnode->ptype), the SDnodeData fields, the dnode hash and
 * the endpoint list read by dmReadEps().
 *
 * Returns 0 on success and -1 on failure. On the out-of-memory paths terrno is
 * set to TSDB_CODE_OUT_OF_MEMORY. Nothing is released here on failure; the
 * caller (dmCreate) runs dmClearVars() to free whatever was allocated.
 */
static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
  SDnodeData *pData = &pDnode->data;

  // Set up the latch and the mutex before anything can fail: dmCreate() calls
  // dmClearVars() after a failure here, and dmClearVars() locks the latch and
  // destroys the mutex unconditionally.
  taosInitRWLatch(&pData->latch);
  taosThreadMutexInit(&pDnode->mutex, NULL);

  pDnode->rtype = pOption->ntype;

  if (tsMultiProcess == 0) {
    pDnode->ptype = DND_PROC_SINGLE;
    dInfo("dnode will run in single-process mode");
  } else if (tsMultiProcess > 1) {
    pDnode->ptype = DND_PROC_TEST;
    dInfo("dnode will run in multi-process test mode");
  } else if (pDnode->rtype == DNODE || pDnode->rtype == NODE_END) {
    pDnode->ptype = DND_PROC_PARENT;
    dInfo("dnode will run in parent-process mode");
  } else {
    pDnode->ptype = DND_PROC_CHILD;
    dInfo("dnode will run in child-process mode, node:%s", dmNodeName(pDnode->rtype));
  }

  pData->dnodeId = 0;
  pData->clusterId = 0;
  pData->dnodeVer = 0;
  pData->updateTime = 0;
  pData->rebootTime = taosGetTimestampMs();
  pData->dropped = 0;
  pData->stopped = 0;
  pData->localEp = strdup(pOption->localEp);
  pData->localFqdn = strdup(pOption->localFqdn);
  pData->firstEp = strdup(pOption->firstEp);
  pData->secondEp = strdup(pOption->secondEp);
  pData->supportVnodes = pOption->numOfSupportVnodes;
  pData->serverPort = pOption->serverPort;
  pData->numOfDisks = pOption->numOfDisks;
  pData->disks = pOption->disks;
  pData->dataDir = strdup(pOption->dataDir);

  // All string copies are checked together; any failed strdup aborts the init.
  if (pData->dataDir == NULL || pData->localEp == NULL || pData->localFqdn == NULL || pData->firstEp == NULL ||
      pData->secondEp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pData->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  if (pData->dnodeHash == NULL) {
    dError("failed to init dnode hash");
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (dmReadEps(pData) != 0) {
    dError("failed to read file since %s", terrstr());
    return -1;
  }

  if (pData->dropped) {
    dError("dnode will not start since its already dropped");
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
106
/*
 * Releases everything owned by the dnode object and finally frees the object
 * itself: wrapper paths, the lock file, the endpoint array and hash (under the
 * data write latch), the duplicated option strings and the mutex.
 * pDnode must not be used after this call.
 */
static void dmClearVars(SDnode *pDnode) {
  // Per-node path strings.
  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pNode = &pDnode->wrappers[ntype];
    taosMemoryFreeClear(pNode->path);
  }

  // Running-instance lock file, if one is held.
  if (pDnode->lockfile != NULL) {
    taosUnLockFile(pDnode->lockfile);
    taosCloseFile(&pDnode->lockfile);
    pDnode->lockfile = NULL;
  }

  SDnodeData *pVars = &pDnode->data;

  // Endpoint containers are torn down while holding the write latch.
  taosWLockLatch(&pVars->latch);
  if (pVars->dnodeEps != NULL) {
    taosArrayDestroy(pVars->dnodeEps);
    pVars->dnodeEps = NULL;
  }
  if (pVars->dnodeHash != NULL) {
    taosHashCleanup(pVars->dnodeHash);
    pVars->dnodeHash = NULL;
  }
  taosWUnLockLatch(&pVars->latch);

  // Strings duplicated from the startup options.
  taosMemoryFreeClear(pVars->localEp);
  taosMemoryFreeClear(pVars->localFqdn);
  taosMemoryFreeClear(pVars->firstEp);
  taosMemoryFreeClear(pVars->secondEp);
  taosMemoryFreeClear(pVars->dataDir);

  taosThreadMutexDestroy(&pDnode->mutex);
  memset(&pDnode->mutex, 0, sizeof(pDnode->mutex));

  taosMemoryFree(pDnode);
}

S
Shengliang Guan 已提交
140
SDnode *dmCreate(const SDnodeOpt *pOption) {
S
Shengliang Guan 已提交
141
  dInfo("start to create dnode");
S
shm  
Shengliang Guan 已提交
142
  int32_t code = -1;
S
Shengliang Guan 已提交
143
  char    path[PATH_MAX + 100] = {0};
S
shm  
Shengliang Guan 已提交
144 145
  SDnode *pDnode = NULL;

wafwerar's avatar
wafwerar 已提交
146
  pDnode = taosMemoryCalloc(1, sizeof(SDnode));
S
shm  
Shengliang Guan 已提交
147 148 149 150 151
  if (pDnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

S
Shengliang Guan 已提交
152 153 154 155
  if (dmInitVars(pDnode, pOption) != 0) {
    goto _OVER;
  }

S
Shengliang Guan 已提交
156 157 158 159 160 161
  pDnode->wrappers[DNODE].func = dmGetMgmtFunc();
  pDnode->wrappers[MNODE].func = mmGetMgmtFunc();
  pDnode->wrappers[VNODE].func = vmGetMgmtFunc();
  pDnode->wrappers[QNODE].func = qmGetMgmtFunc();
  pDnode->wrappers[SNODE].func = smGetMgmtFunc();
  pDnode->wrappers[BNODE].func = bmGetMgmtFunc();
S
shm  
Shengliang Guan 已提交
162

S
Shengliang 已提交
163 164
  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
S
shm  
Shengliang Guan 已提交
165
    pWrapper->pDnode = pDnode;
S
Shengliang Guan 已提交
166
    pWrapper->name = dmNodeName(ntype);
S
Shengliang Guan 已提交
167 168 169 170 171 172 173 174
    pWrapper->ntype = ntype;
    pWrapper->proc.wrapper = pWrapper;
    pWrapper->proc.shm.id = -1;
    pWrapper->proc.pid = -1;
    pWrapper->proc.ptype = pDnode->ptype;
    if (ntype == DNODE) {
      pWrapper->proc.ptype = DND_PROC_SINGLE;
    }
S
Shengliang Guan 已提交
175 176
    taosInitRWLatch(&pWrapper->latch);

S
Shengliang Guan 已提交
177 178
    snprintf(path, sizeof(path), "%s%s%s", pOption->dataDir, TD_DIRSEP, pWrapper->name);
    pWrapper->path = strdup(path);
S
shm  
Shengliang Guan 已提交
179 180 181 182 183
    if (pWrapper->path == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

S
Shengliang Guan 已提交
184 185
    pWrapper->required = dmRequireNode(pWrapper);

S
Shengliang Guan 已提交
186
    if (ntype != DNODE && dmReadShmFile(pWrapper->path, pWrapper->name, pDnode->rtype, &pWrapper->proc.shm) != 0) {
S
Shengliang Guan 已提交
187 188 189
      dError("node:%s, failed to read shm file since %s", pWrapper->name, terrstr());
      goto _OVER;
    }
S
shm  
Shengliang Guan 已提交
190 191
  }

S
Shengliang Guan 已提交
192
  if (dmInitMsgHandle(pDnode) != 0) {
S
Shengliang Guan 已提交
193
    dError("failed to init msg handles since %s", terrstr());
S
shm  
Shengliang Guan 已提交
194 195 196
    goto _OVER;
  }

S
Shengliang Guan 已提交
197
  if (OnlyInSingleProc(pDnode->ptype) || InParentProc(pDnode->ptype)) {
S
Shengliang Guan 已提交
198 199 200 201 202 203 204 205 206 207 208
    pDnode->lockfile = dmCheckRunning(pOption->dataDir);
    if (pDnode->lockfile == NULL) {
      goto _OVER;
    }

    if (dmInitServer(pDnode) != 0) {
      dError("failed to init transport since %s", terrstr());
      goto _OVER;
    }
  }

S
Shengliang Guan 已提交
209 210 211 212
  if (dmInitClient(pDnode) != 0) {
    goto _OVER;
  }

S
Shengliang Guan 已提交
213
  dmReportStartup(pDnode, "dnode-transport", "initialized");
S
Shengliang Guan 已提交
214
  dInfo("dnode is created, ptr:%p", pDnode);
S
shm  
Shengliang Guan 已提交
215 216 217
  code = 0;

_OVER:
S
Shengliang Guan 已提交
218
  if (code != 0 && pDnode != NULL) {
S
Shengliang Guan 已提交
219
    dmClearVars(pDnode);
220
    pDnode = NULL;
S
shm  
Shengliang Guan 已提交
221
    dError("failed to create dnode since %s", terrstr());
S
shm  
Shengliang Guan 已提交
222 223 224 225 226
  }

  return pDnode;
}

S
Shengliang Guan 已提交
227
/*
 * Shuts down a dnode created by dmCreate(): stops the client and server
 * transports and releases the object. A NULL pDnode is ignored.
 * pDnode is freed by dmClearVars() and must not be used afterwards.
 */
void dmClose(SDnode *pDnode) {
  if (pDnode == NULL) return;

  dmCleanupClient(pDnode);
  dmCleanupServer(pDnode);

  // Log before dmClearVars(): that call frees pDnode, and the pointer value
  // must not be read (even just to print it) once the object is gone.
  dInfo("dnode is closed, ptr:%p", pDnode);
  dmClearVars(pDnode);
}
S
Shengliang Guan 已提交
235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255

/* Updates the run status of the dnode; a debug line is written only when the status actually changes. */
void dmSetStatus(SDnode *pDnode, EDndRunStatus status) {
  if (pDnode->status == status) {
    return;
  }

  dDebug("dnode status set from %s to %s", dmStatStr(pDnode->status), dmStatStr(status));
  pDnode->status = status;
}

/* Records an event on the dnode. Only DND_EVENT_STOP is stored; any other event is ignored. */
void dmSetEvent(SDnode *pDnode, EDndEvent event) {
  if (event != DND_EVENT_STOP) {
    return;
  }

  pDnode->event = event;
}

/*
 * Takes a reference on the wrapper of the given node type.
 *
 * Under the wrapper's read latch: if the node is deployed, its refCount is
 * incremented and the wrapper is returned; otherwise terrno is set to
 * TSDB_CODE_NODE_NOT_DEPLOYED and NULL is returned.
 */
SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
  SMgmtWrapper *pTarget = &pDnode->wrappers[ntype];
  SMgmtWrapper *pResult = NULL;

  taosRLockLatch(&pTarget->latch);
  if (!pTarget->deployed) {
    terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
  } else {
    int32_t refCount = atomic_add_fetch_32(&pTarget->refCount, 1);
    dTrace("node:%s, is acquired, ref:%d", pTarget->name, refCount);
    pResult = pTarget;
  }
  taosRUnLockLatch(&pTarget->latch);

  return pResult;
}

/*
 * Takes a reference on an already known wrapper.
 *
 * Under the wrapper's read latch the refCount is incremented when the node is
 * deployed, or when the wrapper is in parent-process mode and the node is
 * required. Returns 0 in that case; otherwise sets terrno to
 * TSDB_CODE_NODE_NOT_DEPLOYED and returns -1.
 */
int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
  int32_t result = -1;

  taosRLockLatch(&pWrapper->latch);
  if (pWrapper->deployed || (InParentProc(pWrapper->proc.ptype) && pWrapper->required)) {
    int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
    dTrace("node:%s, is marked, ref:%d", pWrapper->name, refCount);
    result = 0;
  } else {
    terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
  }
  taosRUnLockLatch(&pWrapper->latch);

  return result;
}

/*
 * Drops one reference from the wrapper. A NULL wrapper is ignored.
 * The decrement is done under the wrapper's read latch; the trace line is
 * written after the latch has been released.
 */
void dmReleaseWrapper(SMgmtWrapper *pWrapper) {
  if (pWrapper == NULL) {
    return;
  }

  int32_t remaining;
  taosRLockLatch(&pWrapper->latch);
  remaining = atomic_sub_fetch_32(&pWrapper->refCount, 1);
  taosRUnLockLatch(&pWrapper->latch);

  dTrace("node:%s, is released, ref:%d", pWrapper->name, remaining);
}

/*
 * Stores the name and description of the current startup step in
 * pDnode->startup (bounded by TSDB_STEP_NAME_LEN / TSDB_STEP_DESC_LEN)
 * and writes them to the debug log.
 */
void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) {
  SStartupInfo *pStep = &pDnode->startup;

  tstrncpy(pStep->name, pName, TSDB_STEP_NAME_LEN);
  tstrncpy(pStep->desc, pDesc, TSDB_STEP_DESC_LEN);

  dDebug("step:%s, %s", pStep->name, pStep->desc);
}

/* Same as dmReportStartup(), addressed through a node wrapper instead of the dnode itself. */
void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const char *pDesc) {
  SDnode *pOwner = pWrapper->pDnode;
  dmReportStartup(pOwner, pName, pDesc);
}

302
/*
 * Translates the dnode run status into a server status response.
 *
 *  - DND_STAT_INIT:    TSDB_SRV_STATUS_NETWORK_OK, with details set to
 *                      "<startup step name>: <startup step desc>".
 *  - DND_STAT_STOPPED: TSDB_SRV_STATUS_EXTING, empty details.
 *  - anything else:    TSDB_SRV_STATUS_SERVICE_OK, empty details.
 */
static void dmGetServerStartupStatus(SDnode *pDnode, SServerStatusRsp *pStatus) {
  pStatus->details[0] = 0;

  switch (pDnode->status) {
    case DND_STAT_INIT:
      pStatus->statusCode = TSDB_SRV_STATUS_NETWORK_OK;
      snprintf(pStatus->details, sizeof(pStatus->details), "%s: %s", pDnode->startup.name, pDnode->startup.desc);
      break;
    case DND_STAT_STOPPED:
      pStatus->statusCode = TSDB_SRV_STATUS_EXTING;
      break;
    default:
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
      break;
  }
}

/*
 * Answers a network test request.
 *
 * The response reuses the request's handle, refId and ahandle and carries a
 * content buffer of the same length as the request content. If that buffer
 * cannot be allocated, the response code is TSDB_CODE_OUT_OF_MEMORY and the
 * content length stays zero. The request content is freed after the response
 * has been sent.
 */
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pReq) {
  dDebug("net test req is received");

  SRpcMsg reply = {0};
  reply.handle = pReq->handle;
  reply.refId = pReq->refId;
  reply.ahandle = pReq->ahandle;
  reply.code = 0;

  reply.pCont = rpcMallocCont(pReq->contLen);
  if (reply.pCont != NULL) {
    reply.contLen = pReq->contLen;
  } else {
    reply.code = TSDB_CODE_OUT_OF_MEMORY;
  }

  rpcSendResponse(&reply);
  rpcFreeCont(pReq->pCont);
}

329 330
/*
 * Answers a server startup status request.
 *
 * The current status is obtained from dmGetServerStartupStatus() and
 * serialised into a freshly allocated response buffer. If the size query
 * returns a negative length or the buffer cannot be allocated, the response
 * is sent without content and with code TSDB_CODE_OUT_OF_MEMORY.
 * The request content is freed after the response has been sent.
 */
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pReq) {
  dDebug("server startup status req is received");

  SServerStatusRsp status = {0};
  dmGetServerStartupStatus(pDnode, &status);

  SRpcMsg reply = {.handle = pReq->handle, .ahandle = pReq->ahandle, .refId = pReq->refId};

  // First call only computes the encoded size.
  int32_t encodedLen = tSerializeSServerStatusRsp(NULL, 0, &status);
  void   *pBuf = NULL;
  if (encodedLen >= 0) {
    pBuf = rpcMallocCont(encodedLen);
  }

  if (pBuf == NULL) {
    reply.code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    tSerializeSServerStatusRsp(pBuf, encodedLen, &status);
    reply.pCont = pBuf;
    reply.contLen = encodedLen;
  }

  rpcSendResponse(&reply);
  rpcFreeCont(pReq->pCont);
}

S
Shengliang Guan 已提交
357
/*
 * Handles a request to create (deploy) a node of the given type on this dnode.
 *
 * Fails with TSDB_CODE_NODE_ALREADY_DEPLOYED if the node can already be
 * acquired. Otherwise, under pDnode->mutex, it creates the node directory,
 * calls the node's createFp and, on success, opens the node and marks the
 * wrapper as required and deployed.
 *
 * Returns 0 on success; -1 or the non-zero code of createFp on failure.
 * The mutex is released on every path.
 */
static int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg) {
  SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
  if (pWrapper != NULL) {
    dmReleaseWrapper(pWrapper);
    terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
    dError("failed to create node since %s", terrstr());
    return -1;
  }

  int32_t code = -1;

  taosThreadMutexLock(&pDnode->mutex);
  pWrapper = &pDnode->wrappers[ntype];

  if (taosMkDir(pWrapper->path) != 0) {
    // Fall through to the common unlock below; returning here would leave
    // pDnode->mutex locked and block every later create/drop request.
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
  } else {
    SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);

    code = (*pWrapper->func.createFp)(&input, pMsg);
    if (code != 0) {
      dError("node:%s, failed to create since %s", pWrapper->name, terrstr());
    } else {
      dDebug("node:%s, has been created", pWrapper->name);
      // NOTE(review): the result of dmOpenNode() is deliberately kept ignored
      // to preserve existing behaviour — confirm whether a failed open should
      // prevent marking the node as deployed.
      (void)dmOpenNode(pWrapper);
      pWrapper->required = true;
      pWrapper->deployed = true;
      pWrapper->proc.ptype = pDnode->ptype;
    }
  }

  taosThreadMutexUnlock(&pDnode->mutex);
  return code;
}

S
Shengliang Guan 已提交
392
/*
 * Handles a request to drop a node of the given type from this dnode.
 *
 * Fails with TSDB_CODE_NODE_NOT_DEPLOYED if the node cannot be acquired.
 * Otherwise, under pDnode->mutex, it calls the node's dropFp; on success the
 * wrapper is marked as neither required nor deployed, the reference taken
 * here is released, the node is closed and its directory is removed.
 *
 * Returns 0 on success; -1 or the non-zero code of dropFp on failure.
 */
static int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg) {
  SMgmtWrapper *pNode = dmAcquireWrapper(pDnode, ntype);
  if (pNode == NULL) {
    terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
    dError("failed to drop node since %s", terrstr());
    return -1;
  }

  taosThreadMutexLock(&pDnode->mutex);

  int32_t    result = (*pNode->func.dropFp)(pNode->pMgmt, pMsg);
  const bool dropped = (result == 0);

  if (dropped) {
    dDebug("node:%s, has been dropped", pNode->name);
    pNode->required = false;
    pNode->deployed = false;
  } else {
    dError("node:%s, failed to drop since %s", pNode->name, terrstr());
  }

  // Give back the reference taken by dmAcquireWrapper() before closing the node.
  dmReleaseWrapper(pNode);

  if (dropped) {
    dmCloseNode(pNode);
    taosRemoveDir(pNode->path);
  }

  taosThreadMutexUnlock(&pDnode->mutex);
  return result;
}

/*
 * Builds the input option block handed to a node's management callbacks:
 * the owning dnode and its data, the create/drop/is-required callbacks of
 * this file, the wrapper's name and path, and the message callbacks returned
 * by dmGetMsgcb(). All other fields are zero.
 */
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
  SDnode *pOwner = pWrapper->pDnode;

  SMgmtInputOpt input = {0};
  input.pDnode = pOwner;
  input.pData = &pOwner->data;
  input.processCreateNodeFp = dmProcessCreateNodeReq;
  input.processDropNodeFp = dmProcessDropNodeReq;
  input.isNodeRequiredFp = dmIsNodeRequired;
  input.name = pWrapper->name;
  input.path = pWrapper->path;
  input.msgCb = dmGetMsgcb(pWrapper);

  return input;
}