vnodeMain.c 16.8 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
17
#include "thash.h"
18 19
#include "tthread.h"
#include "vnodeFile.h"
20 21 22 23 24
#include "vnodeMain.h"
#include "vnodeMgmt.h"
#include "vnodeRead.h"
#include "vnodeWrite.h"

25 26 27 28 29 30 31 32 33
/* Lifecycle states a vnode can be in; values run consecutively from 0. */
typedef enum _VN_STATUS {
  TAOS_VN_STATUS_INIT = 0,
  TAOS_VN_STATUS_READY,
  TAOS_VN_STATUS_CLOSING,
  TAOS_VN_STATUS_UPDATING
} EVnodeStatus;

/* Printable name of each EVnodeStatus value, indexed by the enum value. */
char *vnodeStatus[] = {
  [TAOS_VN_STATUS_INIT] = "init",
  [TAOS_VN_STATUS_READY] = "ready",
  [TAOS_VN_STATUS_CLOSING] = "closing",
  [TAOS_VN_STATUS_UPDATING] = "updating",
};

34
/* Work item handed to one vnode-opening thread at startup. */
typedef struct {
  pthread_t *threadId;     // handle returned by taosCreateThread, NULL if never started
  int32_t    threadIndex;  // position of this item in the thread array
  int32_t    failed;       // vnodes from vnodeList that could not be opened
  int32_t    opened;       // vnodes from vnodeList that were opened
  int32_t    vnodeNum;     // number of valid entries in vnodeList
  int32_t   *vnodeList;    // vgIds assigned to this thread
} SOpenVnodeThread;

/* File-private state shared by all vnode management functions in this file. */
static struct {
  SHashObj *hash;         // vgId -> SVnode* table of the vnodes currently open
  int32_t   openVnodes;   // vnodes processed so far by the startup open threads
  int32_t   totalVnodes;  // vnodes discovered on disk at startup
} tsVnode;
48

49 50 51 52 53
/*
 * Unconditionally moves the vnode to TAOS_VN_STATUS_INIT.
 * The write is done while holding statusMutex. Always returns true.
 */
static bool vnodeSetInitStatus(SVnode *pVnode) {
  pthread_mutex_t *pLock = &pVnode->statusMutex;

  pthread_mutex_lock(pLock);
  pVnode->status = TAOS_VN_STATUS_INIT;
  pthread_mutex_unlock(pLock);

  return true;
}

56 57 58
/*
 * Moves the vnode to TAOS_VN_STATUS_READY if it is currently in the INIT or
 * UPDATING state; any other state is left untouched.
 *
 * Returns true when the transition happened, false otherwise.
 */
static bool vnodeSetReadyStatus(SVnode *pVnode) {
  bool changed = false;

  pthread_mutex_lock(&pVnode->statusMutex);
  switch (pVnode->status) {
    case TAOS_VN_STATUS_INIT:
    case TAOS_VN_STATUS_UPDATING:
      pVnode->status = TAOS_VN_STATUS_READY;
      changed = true;
      break;
    default:
      break;
  }
  pthread_mutex_unlock(&pVnode->statusMutex);

  return changed;
}

69 70 71
/*
 * Moves the vnode to TAOS_VN_STATUS_UPDATING, which is only allowed from the
 * READY state.
 *
 * Returns true when the transition happened, false otherwise.
 */
static bool vnodeSetUpdatingStatus(SVnode *pVnode) {
  pthread_mutex_lock(&pVnode->statusMutex);

  bool changed = (pVnode->status == TAOS_VN_STATUS_READY);
  if (changed) {
    pVnode->status = TAOS_VN_STATUS_UPDATING;
  }

  pthread_mutex_unlock(&pVnode->statusMutex);
  return changed;
}
81

82 83 84
/*
 * Moves the vnode to TAOS_VN_STATUS_CLOSING if it is currently in the INIT or
 * READY state; a vnode that is UPDATING or already CLOSING is left untouched.
 *
 * Returns true when the transition happened, false otherwise.
 */
static bool vnodeSetClosingStatus(SVnode *pVnode) {
  pthread_mutex_lock(&pVnode->statusMutex);

  bool changed = (pVnode->status == TAOS_VN_STATUS_INIT) || (pVnode->status == TAOS_VN_STATUS_READY);
  if (changed) {
    pVnode->status = TAOS_VN_STATUS_CLOSING;
  }

  pthread_mutex_unlock(&pVnode->statusMutex);
  return changed;
}
94

95 96 97
/*
 * Reports whether the vnode is currently in the given state.
 * The state is sampled while holding statusMutex, so the answer may already be
 * stale by the time the caller looks at it.
 */
static bool vnodeInStatus(SVnode *pVnode, EVnodeStatus status) {
  pthread_mutex_lock(&pVnode->statusMutex);
  bool matches = (pVnode->status == status);
  pthread_mutex_unlock(&pVnode->statusMutex);

  return matches;
}

107 108 109
/*
 * Tears down a vnode object and frees it. pVnode must not be used afterwards.
 *
 * Only the write/query/fetch queues and the status mutex are actually released
 * here. The other members are checked but their cleanup is still a placeholder.
 *
 * NOTE(review): the WAL handle opened in vnodeOpenVnode is not closed here yet.
 */
static void vnodeDestroyVnode(SVnode *pVnode) {
  if (pVnode->pQuery) {
    // todo
  }

  if (pVnode->pMeta) {
    // todo
  }

  if (pVnode->pTsdb) {
    // todo
  }

  if (pVnode->pTQ) {
    // todo
  }

  if (pVnode->pWal) {
    // todo
  }

  if (pVnode->allocator) {
    // todo
  }

  if (pVnode->pWriteQ) {
    vnodeFreeWriteQueue(pVnode->pWriteQ);
    pVnode->pWriteQ = NULL;
  }

  if (pVnode->pQueryQ) {
    vnodeFreeQueryQueue(pVnode->pQueryQ);
    pVnode->pQueryQ = NULL;
  }

  if (pVnode->pFetchQ) {
    vnodeFreeFetchQueue(pVnode->pFetchQ);
    pVnode->pFetchQ = NULL;
  }

  // NOTE(review): other functions in this file test pVnode->cfg.dropped;
  // confirm that pVnode->dropped is the intended field here.
  if (pVnode->dropped) {
    // todo
  }

  pthread_mutex_destroy(&pVnode->statusMutex);
  free(pVnode);
}
157

158 159 160 161
/*
 * Retires a vnode: tries to mark it CLOSING, removes it from the global hash,
 * and drops one reference. The object is destroyed by vnodeRelease once its
 * reference count reaches zero.
 */
static void vnodeCleanupVnode(SVnode *pVnode) {
  // The transition result is deliberately ignored; cleanup proceeds either way.
  (void)vnodeSetClosingStatus(pVnode);

  taosHashRemove(tsVnode.hash, &pVnode->vgId, sizeof(int32_t));

  vnodeRelease(pVnode);
}

164 165
static int32_t vnodeOpenVnode(int32_t vgId) {
  int32_t code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
166

167
  SVnode *pVnode = calloc(sizeof(SVnode), 1);
168
  if (pVnode == NULL) {
S
TD-2324  
Shengliang Guan 已提交
169
    vError("vgId:%d, failed to open vnode since no enough memory", vgId);
170 171 172
    return TAOS_SYSTEM_ERROR(errno);
  }

173
  pVnode->vgId = vgId;
174 175 176 177
  pVnode->accessState = TAOS_VN_STATUS_INIT;
  pVnode->status = TSDB_VN_ALL_ACCCESS;
  pVnode->refCount = 1;
  pVnode->role = TAOS_SYNC_ROLE_CANDIDATE;
S
TD-2289  
Shengliang Guan 已提交
178
  pthread_mutex_init(&pVnode->statusMutex, NULL);
S
Shengliang Guan 已提交
179

180
  code = vnodeReadCfg(vgId, &pVnode->cfg);
181
  if (code != TSDB_CODE_SUCCESS) {
S
Shengliang Guan 已提交
182
    vError("vgId:%d, failed to read config file, set cfgVersion to 0", pVnode->vgId);
183 184
    pVnode->cfg.dropped = 1;
    vnodeCleanupVnode(pVnode);
S
Shengliang Guan 已提交
185
    return 0;
186
  }
187

188
  code = vnodeReadTerm(vgId, &pVnode->term);
189
  if (code != TSDB_CODE_SUCCESS) {
190 191 192 193
    vError("vgId:%d, failed to read term file since %s", pVnode->vgId, tstrerror(code));
    pVnode->cfg.dropped = 1;
    vnodeCleanupVnode(pVnode);
    return code;
194 195
  }

196 197 198 199 200
  pVnode->pWriteQ = vnodeAllocWriteQueue(pVnode);
  pVnode->pQueryQ = vnodeAllocQueryQueue(pVnode);
  pVnode->pFetchQ = vnodeAllocFetchQueue(pVnode);
  if (pVnode->pWriteQ == NULL || pVnode->pQueryQ == NULL || pVnode->pFetchQ == NULL) {
    vnodeCleanupVnode(pVnode);
201 202
    return terrno;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
203

204 205 206 207 208
  char path[PATH_MAX + 20];
  snprintf(path, sizeof(path), "%s/vnode%d/wal", tsVnodeDir, vgId);
  pVnode->pWal = walOpen(path, &pVnode->cfg.wal);
  if (pVnode->pWal == NULL) {
    vnodeCleanupVnode(pVnode);
J
jtao1735 已提交
209 210 211
    return terrno;
  }

212 213
  vDebug("vgId:%d, vnode is opened", pVnode->vgId);
  taosHashPut(tsVnode.hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnode *));
214

215 216 217
  vnodeSetReadyStatus(pVnode);
  return TSDB_CODE_SUCCESS;
}
218

219 220 221
/*
 * Creates the on-disk layout for a new vnode, persists its configuration and
 * then opens it.
 *
 * vgId: id of the vnode group; used to build "<tsVnodeDir>/vnode<vgId>".
 * pCfg: configuration written through vnodeWriteCfg.
 *
 * Returns the result of vnodeOpenVnode on success, or the error code of the
 * first directory creation or config write that failed.
 */
int32_t vnodeCreateVnode(int32_t vgId, SVnodeCfg *pCfg) {
  // Created in this order; the empty suffix is the vnode root directory.
  static const char *const subDirs[] = {"", "/cfg", "/wal", "/tq", "/tsdb", "/meta"};
  const int32_t            numOfDirs = (int32_t)(sizeof(subDirs) / sizeof(subDirs[0]));

  char path[PATH_MAX + 20] = {0};

  for (int32_t d = 0; d < numOfDirs; ++d) {
    snprintf(path, sizeof(path), "%s/vnode%d%s", tsVnodeDir, vgId, subDirs[d]);
    if (taosMkDir(path) < 0) {
      int32_t mkdirCode = TAOS_SYSTEM_ERROR(errno);
      vError("vgId:%d, failed to create since %s", vgId, tstrerror(mkdirCode));
      return mkdirCode;
    }
  }

  int32_t cfgCode = vnodeWriteCfg(vgId, pCfg);
  if (cfgCode != 0) {
    vError("vgId:%d, failed to save vnode cfg since %s", vgId, tstrerror(cfgCode));
    return cfgCode;
  }

  return vnodeOpenVnode(vgId);
}

274
/*
 * Applies a new configuration to an open vnode.
 *
 * The new config is compared section by section with the current one. If
 * nothing differs the call is a no-op. Otherwise the config is written to
 * disk, copied into the vnode, and the WAL is altered when its section
 * changed. Handling of tsdb/meta/sync changes is not implemented yet.
 *
 * The function drops one reference on pVnode on every path.
 *
 * Returns 0 on success (including "nothing changed"), otherwise the error code
 * from vnodeWriteCfg or walAlter.
 */
int32_t vnodeAlterVnode(SVnode * pVnode, SVnodeCfg *pCfg) {
  int32_t code = 0;
  int32_t vgId = pVnode->vgId;

  // NOTE(review): memcmp also compares padding bytes; this assumes both
  // configs were fully zero-initialised before being filled in.
  bool walChanged = (memcmp(&pCfg->wal, &pVnode->cfg.wal, sizeof(SWalCfg)) != 0);
  bool tsdbChanged = (memcmp(&pCfg->tsdb, &pVnode->cfg.tsdb, sizeof(STsdbCfg)) != 0);
  bool metaChanged = (memcmp(&pCfg->meta, &pVnode->cfg.meta, sizeof(SMetaCfg)) != 0);
  bool syncChanged = (memcmp(&pCfg->sync, &pVnode->cfg.sync, sizeof(SSyncCluster)) != 0);

  if (!walChanged && !tsdbChanged && !metaChanged && !syncChanged) {
    vDebug("vgId:%d, nothing changed", vgId);
    vnodeRelease(pVnode);
    return code;
  }

  code = vnodeWriteCfg(pVnode->vgId, pCfg);
  if (code != 0) {
    vError("vgId:%d, failed to write alter msg to file since %s", vgId, tstrerror(code));
    vnodeRelease(pVnode);
    return code;
  }

  // From here on the in-memory config matches what was persisted.
  pVnode->cfg = *pCfg;

  if (walChanged) {
    code = walAlter(pVnode->pWal, &pVnode->cfg.wal);
    if (code != 0) {
      // A failed alter is an error, logged like the other failure paths.
      vError("vgId:%d, failed to alter wal since %s", vgId, tstrerror(code));
      vnodeRelease(pVnode);
      return code;
    }
  }

  if (tsdbChanged) {
    // todo
  }

  if (metaChanged) {
    // todo
  }

  if (syncChanged) {
    // todo
  }

  vnodeRelease(pVnode);
  return code;
}
H
hzcheng 已提交
322

323 324 325 326 327
/*
 * Marks a vnode as dropped and retires it.
 *
 * The drop flag is persisted through vnodeWriteCfg first. Only when that
 * succeeds is the vnode removed via vnodeCleanupVnode; on failure the flag is
 * rolled back in memory. A vnode that already carries the flag is left alone.
 *
 * The function drops one reference on pVnode on every path.
 *
 * Returns TSDB_CODE_SUCCESS if already dropped, otherwise the vnodeWriteCfg result.
 */
int32_t vnodeDropVnode(SVnode *pVnode) {
  if (pVnode->cfg.dropped) {
    vInfo("vgId:%d, already set drop flag, ref:%d", pVnode->vgId, pVnode->refCount);
    vnodeRelease(pVnode);
    return TSDB_CODE_SUCCESS;
  }

  pVnode->cfg.dropped = 1;
  int32_t writeCode = vnodeWriteCfg(pVnode->vgId, &pVnode->cfg);

  if (writeCode != 0) {
    vError("vgId:%d, failed to set drop flag since %s", pVnode->vgId, tstrerror(writeCode));
    pVnode->cfg.dropped = 0;
  } else {
    vInfo("vgId:%d, set drop flag, ref:%d", pVnode->vgId, pVnode->refCount);
    vnodeCleanupVnode(pVnode);
  }

  vnodeRelease(pVnode);
  return writeCode;
}
343

344 345 346
/* Placeholder: does nothing with pVnode and always reports TSDB_CODE_SUCCESS. */
int32_t vnodeSyncVnode(SVnode *pVnode) {
  (void)pVnode;
  return TSDB_CODE_SUCCESS;
}
347

348 349
/* Placeholder: does nothing with pVnode and always reports TSDB_CODE_SUCCESS. */
int32_t vnodeCompactVnode(SVnode *pVnode) {
  (void)pVnode;
  return TSDB_CODE_SUCCESS;
}
351

352
static void *vnodeOpenVnodeFunc(void *param) {
353 354 355 356 357 358 359 360 361 362
  SOpenVnodeThread *pThread = param;

  vDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("vnodeOpenVnode");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
    int32_t vgId = pThread->vnodeList[v];

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", vgId,
363
             tsVnode.openVnodes, tsVnode.totalVnodes);
364 365
    // (*vnodeInst()->fp.ReportStartup)("open-vnodes", stepDesc);

366
    if (vnodeOpenVnode(vgId) < 0) {
367 368 369 370 371 372 373
      vError("vgId:%d, failed to open vnode by thread:%d", vgId, pThread->threadIndex);
      pThread->failed++;
    } else {
      vDebug("vgId:%d, is opened by thread:%d", vgId, pThread->threadIndex);
      pThread->opened++;
    }

374
    atomic_add_fetch_32(&tsVnode.openVnodes, 1);
375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421
  }

  vDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
         pThread->failed);
  return NULL;
}

/*
 * Intended to collect the ids of all "vnode<N>" directories under tsVnodeDir.
 *
 * The directory scan is currently compiled out (#if 0), so as built this
 * function leaves vnodeList and *numOfVnodes untouched and always returns
 * TSDB_CODE_SUCCESS.
 */
static int32_t vnodeGetVnodeListFromDisk(int32_t vnodeList[], int32_t *numOfVnodes) {
#if 0
  DIR *dir = opendir(tsVnodeDir);
  if (dir == NULL) return TSDB_CODE_DND_NO_WRITE_ACCESS;

  *numOfVnodes = 0;
  struct dirent *de = NULL;
  while ((de = readdir(dir)) != NULL) {
    if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) continue;
    if (de->d_type & DT_DIR) {
      // Only directories named "vnode<id>" with a non-zero id are considered.
      if (strncmp("vnode", de->d_name, 5) != 0) continue;
      int32_t vnode = atoi(de->d_name + 5);
      if (vnode == 0) continue;

      (*numOfVnodes)++;

      // NOTE(review): with ">=" the scan fails once the count reaches
      // TSDB_MAX_VNODES, so the last array slot is never used - confirm
      // before re-enabling this code.
      if (*numOfVnodes >= TSDB_MAX_VNODES) {
        vError("vgId:%d, too many vnode directory in disk, exist:%d max:%d", vnode, *numOfVnodes, TSDB_MAX_VNODES);
        closedir(dir);
        return TSDB_CODE_DND_TOO_MANY_VNODES;
      } else {
        vnodeList[*numOfVnodes - 1] = vnode;
      }
    }
  }
  closedir(dir);
#endif
  return TSDB_CODE_SUCCESS;
}

static int32_t vnodeOpenVnodes() {
  int32_t vnodeList[TSDB_MAX_VNODES] = {0};
  int32_t numOfVnodes = 0;
  int32_t status = vnodeGetVnodeListFromDisk(vnodeList, &numOfVnodes);

  if (status != TSDB_CODE_SUCCESS) {
    vInfo("failed to get vnode list from disk since code:%d", status);
    return status;
  }

422
  tsVnode.totalVnodes = numOfVnodes;
423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444

  int32_t threadNum = tsNumOfCores;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SOpenVnodeThread *threads = calloc(threadNum, sizeof(SOpenVnodeThread));
  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].vnodeList = calloc(vnodesPerThread, sizeof(int32_t));
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t           t = v % threadNum;
    SOpenVnodeThread *pThread = &threads[t];
    pThread->vnodeList[pThread->vnodeNum++] = vnodeList[v];
  }

  vInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes);

  for (int32_t t = 0; t < threadNum; ++t) {
    SOpenVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

445 446
    pThread->threadId = taosCreateThread(vnodeOpenVnodeFunc, pThread);
    if (pThread->threadId == NULL) {
447 448 449 450 451 452 453 454
      vError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
    }
  }

  int32_t openVnodes = 0;
  int32_t failedVnodes = 0;
  for (int32_t t = 0; t < threadNum; ++t) {
    SOpenVnodeThread *pThread = &threads[t];
455 456 457
    taosDestoryThread(pThread->threadId);
    pThread->threadId = NULL;

458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473
    openVnodes += pThread->opened;
    failedVnodes += pThread->failed;
    free(pThread->vnodeList);
  }

  free(threads);
  vInfo("there are total vnodes:%d, opened:%d", numOfVnodes, openVnodes);

  if (failedVnodes != 0) {
    vError("there are total vnodes:%d, failed:%d", numOfVnodes, failedVnodes);
    return -1;
  }

  return TSDB_CODE_SUCCESS;
}

474 475
/*
 * Collects pointers to all vnodes currently registered in tsVnode.hash.
 *
 * vnodeList:   output array with room for TSDB_MAX_VNODES entries.
 * numOfVnodes: in/out count of entries in vnodeList. New entries are appended
 *              starting at the caller-supplied value (callers pass 0), and the
 *              count never exceeds TSDB_MAX_VNODES.
 *
 * Vnodes that do not fit are logged and skipped. The iterator is advanced on
 * every pass so the walk always terminates, and the count only grows when an
 * entry was really stored.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t vnodeGetVnodeList(SVnode *vnodeList[], int32_t *numOfVnodes) {
  void *pIter = taosHashIterate(tsVnode.hash, NULL);
  while (pIter) {
    SVnode **pVnode = pIter;
    if (*pVnode) {
      if (*numOfVnodes >= TSDB_MAX_VNODES) {
        vError("vgId:%d, too many open vnodes, exist:%d max:%d", (*pVnode)->vgId, *numOfVnodes, TSDB_MAX_VNODES);
      } else {
        vnodeList[*numOfVnodes] = (*pVnode);
        (*numOfVnodes)++;
      }
    }

    pIter = taosHashIterate(tsVnode.hash, pIter);
  }

  return TSDB_CODE_SUCCESS;
}

static void vnodeCleanupVnodes() {
495
  SVnode* vnodeList[TSDB_MAX_VNODES] = {0};
496 497 498 499 500 501 502 503 504 505
  int32_t numOfVnodes = 0;

  int32_t code = vnodeGetVnodeList(vnodeList, &numOfVnodes);

  if (code != TSDB_CODE_SUCCESS) {
    vInfo("failed to get dnode list since code %d", code);
    return;
  }

  for (int32_t i = 0; i < numOfVnodes; ++i) {
506
    vnodeCleanupVnode(vnodeList[i]);
507 508 509 510 511
  }

  vInfo("total vnodes:%d are all closed", numOfVnodes);
}

512 513 514 515 516 517 518 519 520 521
static void vnodeIncRef(void *ptNode) {
  assert(ptNode != NULL);

  SVnode **ppVnode = (SVnode **)ptNode;
  assert(ppVnode);
  assert(*ppVnode);

  SVnode *pVnode = *ppVnode;
  atomic_add_fetch_32(&pVnode->refCount, 1);
  vTrace("vgId:%d, get vnode, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
522 523
}

524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542
/*
 * Looks up a vnode by vgId regardless of its status and returns it with an
 * extra reference, or NULL with terrno set to TSDB_CODE_VND_INVALID_VGROUP_ID
 * when it does not exist.
 *
 * NOTE(review): the hash lookup below is commented out, so pVnode is never
 * assigned and this function currently always takes the "not exist" path.
 */
SVnode *vnodeAcquireInAllState(int32_t vgId) {
  SVnode *pVnode = NULL;

  // Disabled lookup; vnodeIncRef was meant to bump the reference count here.
  // taosHashGetClone(tsVnode.hash, &vgId, sizeof(int32_t), vnodeIncRef, (void*)&pVnode);
  if (pVnode == NULL) {
    vDebug("vgId:%d, can't accquire since not exist", vgId);
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
    return NULL;
  }

  return pVnode;
}

/*
 * Acquires a vnode by vgId, but only if it is in the READY state.
 *
 * Returns the vnode on success; the caller must hand it back through
 * vnodeRelease. Returns NULL when the vnode does not exist (terrno is set by
 * vnodeAcquireInAllState) or is not READY (terrno is set to
 * TSDB_CODE_VND_INVALID_TSDB_STATE).
 */
SVnode *vnodeAcquire(int32_t vgId) {
  SVnode *pFound = vnodeAcquireInAllState(vgId);
  if (pFound == NULL) {
    return NULL;
  }

  if (!vnodeInStatus(pFound, TAOS_VN_STATUS_READY)) {
    vDebug("vgId:%d, can't accquire since not in ready status", vgId);
    // Give back the reference taken by the lookup before reporting failure.
    vnodeRelease(pFound);
    terrno = TSDB_CODE_VND_INVALID_TSDB_STATE;
    return NULL;
  }

  return pFound;
}

551 552
/*
 * Drops one reference on a vnode and destroys it when the count reaches zero.
 * A NULL pVnode is ignored.
 */
void vnodeRelease(SVnode *pVnode) {
  if (pVnode == NULL) return;

  int32_t remaining = atomic_sub_fetch_32(&pVnode->refCount, 1);
  // Remember the id now: the object may be freed below.
  int32_t vgId = pVnode->vgId;

  vTrace("vgId:%d, release vnode, refCount:%d pVnode:%p", vgId, remaining, pVnode);
  assert(remaining >= 0);

  if (remaining > 0) {
    return;
  }

  vDebug("vgId:%d, vnode will be destroyed, refCount:%d pVnode:%p", vgId, remaining, pVnode);
  vnodeDestroyVnode(pVnode);

  int32_t vnodesLeft = taosHashGetSize(tsVnode.hash);
  vDebug("vgId:%d, vnode is destroyed, vnodes:%d", vgId, vnodesLeft);
}

int32_t vnodeInitMain() {
  tsVnode.hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (tsVnode.hash == NULL) {
571 572 573 574 575 576 577 578 579 580
    vError("failed to init vnode mgmt");
    return -1;
  }

  vInfo("vnode main is initialized");
  return vnodeOpenVnodes();
}

void vnodeCleanupMain() {
  vnodeCleanupVnodes();
581 582
  taosHashCleanup(tsVnode.hash);
  tsVnode.hash = NULL;
583 584 585 586 587 588 589 590 591
}

/*
 * Appends the load record of one vnode to a status message.
 *
 * Nothing is written once pStatus already holds TSDB_MAX_VNODES records.
 * Multi-byte fields are stored big-endian via htonl/htobe64. The storage and
 * points-written figures are reported as zero because the tsdb statistics
 * call is currently disabled.
 */
static void vnodeBuildVloadMsg(SVnode *pVnode, SStatusMsg *pStatus) {
  if (pStatus->openVnodes >= TSDB_MAX_VNODES) {
    return;
  }

  int64_t pointsWritten = 0;
  int64_t totalStorage = 0;
  int64_t compStorage = 0;

  // if (pVnode->tsdb) {
  //   tsdbReportStat(pVnode->tsdb, &pointsWritten, &totalStorage, &compStorage);
  // }

  // Claim the next free slot in the message.
  SVnodeLoad *pSlot = &pStatus->load[pStatus->openVnodes];
  pStatus->openVnodes++;

  pSlot->vgId = htonl(pVnode->vgId);
  pSlot->totalStorage = htobe64(totalStorage);
  pSlot->compStorage = htobe64(compStorage);
  pSlot->pointsWritten = htobe64(pointsWritten);
  pSlot->status = pVnode->status;
  pSlot->role = pVnode->role;
}

605 606
/*
 * Fills pStatus with one load record per vnode registered in the hash.
 * Empty hash slots are skipped.
 */
void vnodeGetStatus(SStatusMsg *pStatus) {
  for (void *pIter = taosHashIterate(tsVnode.hash, NULL); pIter != NULL;
       pIter = taosHashIterate(tsVnode.hash, pIter)) {
    SVnode *pEntry = *(SVnode **)pIter;
    if (pEntry == NULL) continue;

    vnodeBuildVloadMsg(pEntry, pStatus);
  }
}

616
/*
 * Applies a batch of access-state settings to the matching vnodes.
 *
 * pAccess:     array of numOfVnodes entries. Each vgId is byte-swapped in
 *              place with htonl before the lookup (presumably it arrives in
 *              network byte order - confirm against the sender).
 * numOfVnodes: number of entries in pAccess.
 *
 * Entries whose vnode cannot be acquired are skipped silently.
 */
void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) {
  for (int32_t idx = 0; idx < numOfVnodes; ++idx) {
    SVgroupAccess *pEntry = &pAccess[idx];
    pEntry->vgId = htonl(pEntry->vgId);

    SVnode *pVnode = vnodeAcquire(pEntry->vgId);
    if (pVnode == NULL) continue;

    pVnode->accessState = pEntry->accessState;
    if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) {
      vDebug("vgId:%d, access state is set to %d", pEntry->vgId, pVnode->accessState);
    }

    vnodeRelease(pVnode);
  }
}