dndObj.c 6.8 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
shm  
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dndNode.h"
S
shm  
Shengliang Guan 已提交
18

S
shm  
Shengliang Guan 已提交
19
/*
 * Copy the startup options into the dnode object and, when required, obtain
 * the lock file for the data directory.
 *
 * The string options (dataDir, localEp, localFqdn, firstEp, secondEp) are
 * duplicated with strdup(); the disks pointer is stored as-is, not copied.
 *
 * Returns 0 on success. Returns -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY
 * when any duplication fails, and -1 when dndCheckRunning() yields NULL.
 * Nothing is released here on failure: fields that were already assigned stay
 * in pDnode.
 */
static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
  // Plain value fields first.
  pDnode->ntype = pOption->ntype;
  pDnode->data.rebootTime = taosGetTimestampMs();
  pDnode->data.serverPort = pOption->serverPort;
  pDnode->data.supportVnodes = pOption->numOfSupportVnodes;
  pDnode->data.numOfDisks = pOption->numOfDisks;
  pDnode->data.disks = pOption->disks;

  // Private copies of every string option.
  pDnode->data.dataDir = strdup(pOption->dataDir);
  pDnode->data.localEp = strdup(pOption->localEp);
  pDnode->data.localFqdn = strdup(pOption->localFqdn);
  pDnode->data.firstEp = strdup(pOption->firstEp);
  pDnode->data.secondEp = strdup(pOption->secondEp);

  const char *copies[] = {pDnode->data.dataDir, pDnode->data.localEp, pDnode->data.localFqdn, pDnode->data.firstEp,
                          pDnode->data.secondEp};
  for (size_t i = 0; i < sizeof(copies) / sizeof(copies[0]); ++i) {
    if (copies[i] == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  // The lock file is skipped only in multi-process mode for node types other
  // than NODE_BEGIN and NODE_END.
  if (tsMultiProcess && pDnode->ntype != NODE_BEGIN && pDnode->ntype != NODE_END) {
    return 0;
  }

  pDnode->data.lockfile = dndCheckRunning(pDnode->data.dataDir);
  return (pDnode->data.lockfile != NULL) ? 0 : -1;
}

S
shm  
Shengliang Guan 已提交
48
/*
 * Release everything held by the dnode object and then the object itself.
 *
 * Frees the path of every wrapper slot, unlocks and closes the lock file when
 * one is held, frees the duplicated endpoint/fqdn/dataDir strings, and finally
 * passes pDnode to taosMemoryFree(). pDnode must not be used after this call.
 */
static void dndClearVars(SDnode *pDnode) {
  for (EDndNodeType n = 0; n < NODE_END; ++n) {
    SMgmtWrapper *pMgmt = &pDnode->wrappers[n];
    taosMemoryFreeClear(pMgmt->path);
  }

  if (pDnode->data.lockfile != NULL) {
    taosUnLockFile(pDnode->data.lockfile);
    taosCloseFile(&pDnode->data.lockfile);
    pDnode->data.lockfile = NULL;
  }

  taosMemoryFreeClear(pDnode->data.localEp);
  taosMemoryFreeClear(pDnode->data.localFqdn);
  taosMemoryFreeClear(pDnode->data.firstEp);
  taosMemoryFreeClear(pDnode->data.secondEp);
  taosMemoryFreeClear(pDnode->data.dataDir);

  // Log while pDnode is still a valid pointer: once the object is freed its
  // pointer value is indeterminate and must not be read, not even for "%p".
  dDebug("dnode memory is cleared, data:%p", pDnode);
  taosMemoryFree(pDnode);
}

SDnode *dndCreate(const SDnodeOpt *pOption) {
S
shm  
Shengliang Guan 已提交
68
  dDebug("start to create dnode object");
S
shm  
Shengliang Guan 已提交
69
  int32_t code = -1;
S
xshm  
Shengliang Guan 已提交
70
  char    path[PATH_MAX] = {0};
S
shm  
Shengliang Guan 已提交
71 72
  SDnode *pDnode = NULL;

wafwerar's avatar
wafwerar 已提交
73
  pDnode = taosMemoryCalloc(1, sizeof(SDnode));
S
shm  
Shengliang Guan 已提交
74 75 76 77 78
  if (pDnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

S
shm  
Shengliang Guan 已提交
79 80
  if (dndInitVars(pDnode, pOption) != 0) {
    dError("failed to init variables since %s", terrstr());
S
shm  
Shengliang Guan 已提交
81 82 83 84
    goto _OVER;
  }

  dndSetStatus(pDnode, DND_STAT_INIT);
S
Shengliang Guan 已提交
85
  dmSetMgmtFp(&pDnode->wrappers[NODE_BEGIN]);
S
Shengliang Guan 已提交
86
  mmSetMgmtFp(&pDnode->wrappers[MNODE]);
S
Shengliang Guan 已提交
87
  vmSetMgmtFp(&pDnode->wrappers[VNODE]);
S
Shengliang Guan 已提交
88 89 90
  qmSetMgmtFp(&pDnode->wrappers[QNODE]);
  smSetMgmtFp(&pDnode->wrappers[SNODE]);
  bmSetMgmtFp(&pDnode->wrappers[BNODE]);
S
shm  
Shengliang Guan 已提交
91

S
Shengliang Guan 已提交
92
  for (EDndNodeType n = 0; n < NODE_END; ++n) {
S
shm  
Shengliang Guan 已提交
93
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
S
Shengliang Guan 已提交
94
    snprintf(path, sizeof(path), "%s%s%s", pDnode->data.dataDir, TD_DIRSEP, pWrapper->name);
S
shm  
Shengliang Guan 已提交
95
    pWrapper->path = strdup(path);
S
Shengliang Guan 已提交
96
    pWrapper->procShm.id = -1;
S
shm  
Shengliang Guan 已提交
97
    pWrapper->pDnode = pDnode;
S
Shengliang Guan 已提交
98
    pWrapper->ntype = n;
S
shm  
Shengliang Guan 已提交
99 100 101 102 103
    if (pWrapper->path == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

S
Shengliang Guan 已提交
104
    pWrapper->procType = DND_PROC_SINGLE;
S
shm  
Shengliang Guan 已提交
105
    taosInitRWLatch(&pWrapper->latch);
S
shm  
Shengliang Guan 已提交
106 107
  }

S
shm  
Shengliang Guan 已提交
108
  if (dndInitMsgHandle(pDnode) != 0) {
S
Shengliang Guan 已提交
109
    dError("failed to init msg handles since %s", terrstr());
S
shm  
Shengliang Guan 已提交
110 111 112
    goto _OVER;
  }

S
shm  
Shengliang Guan 已提交
113 114
  if (dndReadShmFile(pDnode) != 0) {
    dError("failed to read shm file since %s", terrstr());
S
shm  
Shengliang Guan 已提交
115 116 117
    goto _OVER;
  }

S
shm  
Shengliang Guan 已提交
118 119 120
  SMsgCb msgCb = dndCreateMsgcb(&pDnode->wrappers[0]);
  tmsgSetDefaultMsgCb(&msgCb);

S
shm  
Shengliang Guan 已提交
121
  dInfo("dnode is created, data:%p", pDnode);
S
shm  
Shengliang Guan 已提交
122 123 124 125
  code = 0;

_OVER:
  if (code != 0 && pDnode) {
S
shm  
Shengliang Guan 已提交
126
    dndClearVars(pDnode);
127
    pDnode = NULL;
S
shm  
Shengliang Guan 已提交
128
    dError("failed to create dnode since %s", terrstr());
S
shm  
Shengliang Guan 已提交
129 130 131 132 133 134 135 136
  }

  return pDnode;
}

/*
 * Close every node wrapper of the dnode and release the dnode object.
 *
 * A NULL pDnode is ignored. pDnode must not be used after this call, because
 * dndClearVars() frees the object itself.
 */
void dndClose(SDnode *pDnode) {
  if (pDnode == NULL) return;

  for (EDndNodeType n = 0; n < NODE_END; ++n) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
    dndCloseNode(pWrapper);
  }

  // Log before the teardown: dndClearVars() frees pDnode, and reading a freed
  // pointer's value (even only to print it with "%p") is not valid.
  dInfo("dnode is closed, data:%p", pDnode);
  dndClearVars(pDnode);
}

/*
 * Record an event on the dnode. Only DND_EVENT_STOP is stored; every other
 * event value is ignored.
 */
void dndHandleEvent(SDnode *pDnode, EDndEvent event) {
  if (event != DND_EVENT_STOP) {
    return;
  }

  pDnode->event = event;
}
S
shm  
Shengliang Guan 已提交
151

S
Shengliang Guan 已提交
152
/*
 * Take a reference on the wrapper of the given node type.
 *
 * Under the wrapper's read latch: when the wrapper is deployed, its refCount
 * is atomically incremented and the wrapper is returned; otherwise terrno is
 * set to TSDB_CODE_NODE_NOT_DEPLOYED and NULL is returned.
 */
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
  SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
  SMgmtWrapper *pAcquired = NULL;

  taosRLockLatch(&pWrapper->latch);
  if (!pWrapper->deployed) {
    terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
  } else {
    int32_t refs = atomic_add_fetch_32(&pWrapper->refCount, 1);
    dTrace("node:%s, is acquired, refCount:%d", pWrapper->name, refs);
    pAcquired = pWrapper;
  }
  taosRUnLockLatch(&pWrapper->latch);

  return pAcquired;
}

S
shm  
Shengliang Guan 已提交
169 170 171 172
/*
 * Take a reference on an already-known wrapper.
 *
 * Under the wrapper's read latch, the reference is granted when the wrapper is
 * deployed, or when its process type is DND_PROC_PARENT and it is marked as
 * required. On success refCount is atomically incremented and 0 is returned;
 * otherwise terrno is set to TSDB_CODE_NODE_NOT_DEPLOYED and -1 is returned.
 */
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper) {
  int32_t code = -1;

  taosRLockLatch(&pWrapper->latch);
  if (!pWrapper->deployed && !(pWrapper->procType == DND_PROC_PARENT && pWrapper->required)) {
    terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
  } else {
    int32_t refs = atomic_add_fetch_32(&pWrapper->refCount, 1);
    dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refs);
    code = 0;
  }
  taosRUnLockLatch(&pWrapper->latch);

  return code;
}

S
shm  
Shengliang Guan 已提交
185 186 187 188 189 190 191
/*
 * Drop one reference from a wrapper. A NULL wrapper is ignored.
 *
 * refCount is atomically decremented while the read latch is held; the new
 * count is traced after the latch has been released.
 */
void dndReleaseWrapper(SMgmtWrapper *pWrapper) {
  if (pWrapper != NULL) {
    taosRLockLatch(&pWrapper->latch);
    int32_t remaining = atomic_sub_fetch_32(&pWrapper->refCount, 1);
    taosRUnLockLatch(&pWrapper->latch);

    dTrace("node:%s, is released, refCount:%d", pWrapper->name, remaining);
  }
}

/*
 * Register the handler and the vgId value for one message type on a wrapper.
 * Both per-wrapper tables are indexed by TMSG_INDEX(msgType).
 */
void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId) {
  const int32_t slot = TMSG_INDEX(msgType);

  pWrapper->msgFps[slot] = nodeMsgFp;
  pWrapper->msgVgIds[slot] = vgId;
}

S
Shengliang Guan 已提交
199 200 201 202 203 204
/*
 * Record the current startup step on the dnode.
 *
 * pName and pDesc are copied into the dnode's startup record with tstrncpy(),
 * bounded by TSDB_STEP_NAME_LEN and TSDB_STEP_DESC_LEN, and the record is
 * marked as not finished.
 */
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) {
  tstrncpy(pDnode->startup.name, pName, TSDB_STEP_NAME_LEN);
  tstrncpy(pDnode->startup.desc, pDesc, TSDB_STEP_DESC_LEN);
  pDnode->startup.finished = 0;
}
S
Shengliang Guan 已提交
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220

/*
 * Copy the dnode's startup record into *pStartup and overwrite its finished
 * flag: 1 when the dnode status is DND_STAT_RUNNING, 0 otherwise.
 */
static void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) {
  // Byte-wise copy of the whole record; the finished flag is recomputed below.
  memcpy(pStartup, &pDnode->startup, sizeof(*pStartup));
  pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING) ? 1 : 0;
}

/*
 * Answer a startup request with a snapshot of the dnode's startup record.
 *
 * The response body is allocated with rpcMallocCont(), filled in by
 * dndGetStartup() and sent with rpcSendResponse() using the handle and ahandle
 * of the request. When the allocation fails, terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and a response without a body carrying that code is
 * sent instead, so the allocation failure is never dereferenced.
 */
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
  dDebug("startup req is received");

  SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
  if (pStartup == NULL) {
    // dndGetStartup() memcpy()s into the buffer, so it must not receive NULL.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to process startup req since %s", terrstr());
    SRpcMsg rpcErr = {.handle = pReq->handle, .ahandle = pReq->ahandle, .code = TSDB_CODE_OUT_OF_MEMORY};
    rpcSendResponse(&rpcErr);
    return;
  }

  dndGetStartup(pDnode, pStartup);

  dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
  SRpcMsg rpcRsp = {
      .handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = pReq->ahandle};
  rpcSendResponse(&rpcRsp);
}