dmMgmt.c 7.9 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
shm  
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmMgmt.h"
S
Shengliang Guan 已提交
18
#include "dmNodes.h"
dengyihao's avatar
dengyihao 已提交
19
#include "index.h"
S
Shengliang Guan 已提交
20
#include "qworker.h"
S
Shengliang Guan 已提交
21

22
/*
 * Ask a node's management module whether the node has to be started.
 *
 * pDnode   - owning dnode; not read by this function.
 * pWrapper - wrapper whose func.requiredFp callback is invoked and whose
 *            name is used in the log lines.
 *
 * Returns the `required` flag as filled in by the callback. The flag starts
 * out false, so it stays false if the callback does not write it.
 */
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
  SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);

  bool    required = false;
  int32_t code = (*pWrapper->func.requiredFp)(&input, &required);
  if (code != 0) {
    // The callback status used to be dropped silently. Surface it in the log,
    // but keep returning whatever the callback stored in `required`.
    // NOTE(review): assumes a non-zero status means failure, as for the other
    // int32_t-returning calls in this file - confirm against requiredFp.
    dError("node:%s, failed to check whether startup is required, code:%d", pWrapper->name, code);
  }

  if (!required) {
    dDebug("node:%s, does not require startup", pWrapper->name);
  } else {
    dDebug("node:%s, required to startup", pWrapper->name);
  }

  return required;
}
S
Shengliang Guan 已提交
35

36
/*
 * Reset the runtime data of a dnode and load its endpoint information.
 *
 * Initialises pDnode->data.lock and pDnode->mutex, zeroes the identity
 * fields, records the reboot time, creates the dnode hash and calls
 * dmReadEps().
 *
 * Returns 0 on success. Returns -1 when the hash cannot be created (terrno is
 * set to TSDB_CODE_OUT_OF_MEMORY), when dmReadEps() fails, or when the loaded
 * data marks the dnode as dropped.
 */
static int32_t dmInitVars(SDnode *pDnode) {
  SDnodeData *pData = &pDnode->data;

  // Initialise the lock and the mutex before anything that can fail:
  // dmInitDnode() runs dmClearVars() when this function returns an error, and
  // dmClearVars() write-locks pData->lock and destroys both objects.
  taosThreadRwlockInit(&pData->lock, NULL);
  taosThreadMutexInit(&pDnode->mutex, NULL);

  pData->dnodeId = 0;
  pData->clusterId = 0;
  pData->dnodeVer = 0;
  pData->updateTime = 0;
  pData->rebootTime = taosGetTimestampMs();
  pData->dropped = 0;
  pData->stopped = 0;

  pData->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  if (pData->dnodeHash == NULL) {
    dError("failed to init dnode hash");
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (dmReadEps(pData) != 0) {
    dError("failed to read file since %s", terrstr());
    return -1;
  }

  // `dropped` was cleared above, so it can only be set here if dmReadEps()
  // set it.
  if (pData->dropped) {
    dError("dnode will not start since its already dropped");
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
68
/*
 * Release what dmInitDnode()/dmInitVars() set up on a dnode: the per-node
 * paths and rwlocks, the lock file, the endpoint array and hash, and finally
 * the data rwlock and the dnode mutex.
 */
static void dmClearVars(SDnode *pDnode) {
  SDnodeData *pData = &pDnode->data;

  // Per-node wrappers: free the path string and destroy the wrapper lock.
  for (EDndNodeType type = DNODE; type < NODE_END; ++type) {
    SMgmtWrapper *pNode = &pDnode->wrappers[type];
    taosMemoryFreeClear(pNode->path);
    taosThreadRwlockDestroy(&pNode->lock);
  }

  // Unlock and close the lock file if one is held.
  if (pDnode->lockfile != NULL) {
    taosUnLockFile(pDnode->lockfile);
    taosCloseFile(&pDnode->lockfile);
    pDnode->lockfile = NULL;
  }

  // Endpoint containers are torn down while holding the write lock.
  taosThreadRwlockWrlock(&pData->lock);
  if (pData->dnodeEps != NULL) {
    taosArrayDestroy(pData->dnodeEps);
    pData->dnodeEps = NULL;
  }
  if (pData->dnodeHash != NULL) {
    taosHashCleanup(pData->dnodeHash);
    pData->dnodeHash = NULL;
  }
  taosThreadRwlockUnlock(&pData->lock);

  // Destroy the synchronisation objects last and zero the mutex storage.
  taosThreadRwlockDestroy(&pData->lock);
  taosThreadMutexDestroy(&pDnode->mutex);
  memset(&pDnode->mutex, 0, sizeof(pDnode->mutex));
}

97
/*
 * Create a dnode.
 *
 * Steps, in order: initialise the wrapper locks, initialise the dnode data
 * (dmInitVars), bind the management function tables, fill in each wrapper
 * (owner, name, type, data path, required flag), register the message
 * handles, take the data-directory lock file, start the server and client
 * transports, and initialise the index module.
 *
 * Returns 0 on success. On any failure dmClearVars() is run on pDnode, the
 * reason is logged and -1 is returned.
 */
int32_t dmInitDnode(SDnode *pDnode) {
  dDebug("start to create dnode");
  int32_t code = -1;
  char    path[PATH_MAX + 100] = {0};

  // dmClearVars() destroys every wrapper lock, and it runs on every failure
  // path below - including a failure of dmInitVars() or of strdup() partway
  // through the wrapper loop. Initialise all locks first so that cleanup never
  // destroys a lock that was not initialised.
  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    taosThreadRwlockInit(&pDnode->wrappers[ntype].lock, NULL);
  }

  if (dmInitVars(pDnode) != 0) {
    goto _OVER;
  }

  pDnode->wrappers[DNODE].func = dmGetMgmtFunc();
  pDnode->wrappers[MNODE].func = mmGetMgmtFunc();
  pDnode->wrappers[VNODE].func = vmGetMgmtFunc();
  pDnode->wrappers[QNODE].func = qmGetMgmtFunc();
  pDnode->wrappers[SNODE].func = smGetMgmtFunc();

  for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
    SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
    pWrapper->pDnode = pDnode;
    pWrapper->name = dmNodeName(ntype);
    pWrapper->ntype = ntype;

    // Each node gets its own directory: <tsDataDir><TD_DIRSEP><node name>.
    snprintf(path, sizeof(path), "%s%s%s", tsDataDir, TD_DIRSEP, pWrapper->name);
    pWrapper->path = strdup(path);
    if (pWrapper->path == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    pWrapper->required = dmRequireNode(pDnode, pWrapper);
  }

  if (dmInitMsgHandle(pDnode) != 0) {
    dError("failed to init msg handles since %s", terrstr());
    goto _OVER;
  }

  pDnode->lockfile = dmCheckRunning(tsDataDir);
  if (pDnode->lockfile == NULL) {
    goto _OVER;
  }

  if (dmInitServer(pDnode) != 0) {
    dError("failed to init transport since %s", terrstr());
    goto _OVER;
  }

  if (dmInitClient(pDnode) != 0) {
    goto _OVER;
  }

  indexInit(tsNumOfCommitThreads);

  dmReportStartup("dnode-transport", "initialized");
  dDebug("dnode is created, ptr:%p", pDnode);
  code = 0;

_OVER:
  // pDnode has been dereferenced unconditionally above, so no NULL test is
  // needed here, and resetting the by-value parameter would have no effect.
  if (code != 0) {
    dmClearVars(pDnode);
    dError("failed to create dnode since %s", terrstr());
  }

  return code;
}

164
/*
 * Shut a dnode down: client and server transports first, then the dnode
 * variables, then the rpc, index and conversion modules. A NULL pDnode is
 * ignored.
 */
void dmCleanupDnode(SDnode *pDnode) {
  if (pDnode != NULL) {
    dmCleanupClient(pDnode);
    dmCleanupServer(pDnode);
    dmClearVars(pDnode);
    rpcCleanup();
    indexCleanup();
    taosConvDestroy();
    dDebug("dnode is closed, ptr:%p", pDnode);
  }
}
S
Shengliang Guan 已提交
175 176 177 178 179 180 181 182 183 184 185 186

/*
 * Switch the dnode to a new run status. Nothing is logged or written when the
 * status is already the requested one.
 */
void dmSetStatus(SDnode *pDnode, EDndRunStatus status) {
  if (pDnode->status == status) {
    return;
  }

  dDebug("dnode status set from %s to %s", dmStatStr(pDnode->status), dmStatStr(status));
  pDnode->status = status;
}

/*
 * Take a reference on the wrapper of the given node type.
 *
 * Returns the wrapper with its refCount incremented when the node is
 * deployed, or NULL (refCount untouched) when it is not. The check and the
 * increment are done under the wrapper's read lock.
 */
SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
  SMgmtWrapper *pNode = &pDnode->wrappers[ntype];
  SMgmtWrapper *pAcquired = NULL;

  taosThreadRwlockRdlock(&pNode->lock);
  if (pNode->deployed) {
    atomic_add_fetch_32(&pNode->refCount, 1);
    pAcquired = pNode;
  }
  taosThreadRwlockUnlock(&pNode->lock);

  return pAcquired;
}

/*
 * Take a reference on an already-known wrapper.
 *
 * Returns 0 and increments refCount when the node is deployed. Otherwise
 * returns -1 and sets terrno to a code chosen by node type (mnode, qnode and
 * snode: "not found"; vnode: stopped; anything else: app is stopping).
 * The check and the increment are done under the wrapper's read lock.
 */
int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
  int32_t result = -1;

  taosThreadRwlockRdlock(&pWrapper->lock);
  if (pWrapper->deployed) {
    atomic_add_fetch_32(&pWrapper->refCount, 1);
    result = 0;
  } else if (pWrapper->ntype == MNODE) {
    terrno = TSDB_CODE_MNODE_NOT_FOUND;
  } else if (pWrapper->ntype == QNODE) {
    terrno = TSDB_CODE_QNODE_NOT_FOUND;
  } else if (pWrapper->ntype == SNODE) {
    terrno = TSDB_CODE_SNODE_NOT_FOUND;
  } else if (pWrapper->ntype == VNODE) {
    terrno = TSDB_CODE_VND_STOPPED;
  } else {
    terrno = TSDB_CODE_APP_IS_STOPPING;
  }
  taosThreadRwlockUnlock(&pWrapper->lock);

  return result;
}

/*
 * Drop one reference from a wrapper. The decrement is done under the
 * wrapper's read lock. A NULL wrapper is ignored.
 */
void dmReleaseWrapper(SMgmtWrapper *pWrapper) {
  if (pWrapper != NULL) {
    taosThreadRwlockRdlock(&pWrapper->lock);
    atomic_sub_fetch_32(&pWrapper->refCount, 1);
    taosThreadRwlockUnlock(&pWrapper->lock);
  }
}

240
/*
 * Translate the dnode run status into a server status response.
 *
 * DND_STAT_INIT    -> NETWORK_OK, details filled with "<startup name>: <startup desc>"
 * DND_STAT_STOPPED -> EXTING, details left empty
 * anything else    -> SERVICE_OK, details left empty
 */
static void dmGetServerStartupStatus(SDnode *pDnode, SServerStatusRsp *pStatus) {
  pStatus->details[0] = 0;

  switch (pDnode->status) {
    case DND_STAT_INIT:
      pStatus->statusCode = TSDB_SRV_STATUS_NETWORK_OK;
      snprintf(pStatus->details, sizeof(pStatus->details), "%s: %s", pDnode->startup.name, pDnode->startup.desc);
      break;
    case DND_STAT_STOPPED:
      pStatus->statusCode = TSDB_SRV_STATUS_EXTING;
      break;
    default:
      pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
      break;
  }
}

S
Shengliang Guan 已提交
254
/*
 * Answer a network test request with a response buffer of the same length as
 * the request. If the buffer cannot be allocated, the response carries
 * TSDB_CODE_OUT_OF_MEMORY instead. The request content is freed after the
 * response has been sent.
 */
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg) {
  dDebug("msg:%p, net test req will be processed", pMsg);

  SRpcMsg reply = {.info = pMsg->info};

  reply.pCont = rpcMallocCont(pMsg->contLen);
  if (reply.pCont != NULL) {
    reply.contLen = pMsg->contLen;
  } else {
    reply.code = TSDB_CODE_OUT_OF_MEMORY;
  }

  rpcSendResponse(&reply);
  rpcFreeCont(pMsg->pCont);
}

S
Shengliang Guan 已提交
269
/*
 * Answer a server startup status request.
 *
 * The current status is collected with dmGetServerStartupStatus(), serialised
 * into a freshly allocated response buffer and sent back. If the size probe
 * or the buffer allocation fails, the response carries
 * TSDB_CODE_OUT_OF_MEMORY and no content. The request content is freed after
 * the response has been sent.
 */
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg) {
  dDebug("msg:%p, server startup status req will be processed", pMsg);

  SServerStatusRsp statusRsp = {0};
  dmGetServerStartupStatus(pDnode, &statusRsp);

  SRpcMsg rsp = {.info = pMsg->info};

  // First call with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
  if (contLen < 0) {
    rsp.code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    rsp.pCont = rpcMallocCont(contLen);
    if (rsp.pCont == NULL) {
      // Previously an allocation failure left rsp.code at 0, so the client
      // received an empty "success" reply. Report the error instead, as
      // dmProcessNetTestReq() does.
      rsp.code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      tSerializeSServerStatusRsp(rsp.pCont, contLen, &statusRsp);
      rsp.contLen = contLen;
    }
  }

  rpcSendResponse(&rsp);
  rpcFreeCont(pMsg->pCont);
}