vnodeMain.c 17.9 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
17
#include "thash.h"
18 19
#include "tthread.h"
#include "vnodeFile.h"
20 21 22 23 24
#include "vnodeMain.h"
#include "vnodeMgmt.h"
#include "vnodeRead.h"
#include "vnodeWrite.h"

25 26 27 28 29 30 31 32 33
/*
 * Lifecycle state of a vnode, stored in SVnode::status. The vnodeSet*Status() helpers change
 * it while holding SVnode::statusMutex.
 *
 * The enum is untagged: the former tag "_VN_STATUS" (underscore + uppercase) is an identifier
 * reserved for the implementation, and only the typedef name is used.
 */
typedef enum {
  TAOS_VN_STATUS_INIT = 0,
  TAOS_VN_STATUS_READY = 1,
  TAOS_VN_STATUS_CLOSING = 2,
  TAOS_VN_STATUS_UPDATING = 3
} EVnodeStatus;

/* Printable names indexed by EVnodeStatus; must stay in the same order as the enum above. */
char *vnodeStatus[] = {"init", "ready", "closing", "updating"};

34
/* Work item for one worker thread of vnodeOpenVnodes(); the array of these is calloc'ed. */
typedef struct {
  pthread_t *threadId;     /* handle from taosCreateThread(); stays NULL if never started */
  int32_t    threadIndex;  /* position of this worker, used only in log messages */
  int32_t    failed;       /* vnodes this worker could not open */
  int32_t    opened;       /* vnodes this worker opened successfully */
  int32_t    vnodeNum;     /* number of filled entries in vnodeList */
  int32_t   *vnodeList;    /* vgIds assigned to this worker */
} SOpenVnodeThread;

/* Module-wide vnode registry and startup progress counters. */
static struct {
  SHashObj *hash;         /* key: vgId (int32_t), value: SVnode* */
  int32_t   openVnodes;   /* vnodes processed so far at startup, whether opened or failed */
  int32_t   totalVnodes;  /* vnodes found on disk at startup */
} tsVnode;
48

49 50 51 52 53
/*
 * Force pVnode back into TAOS_VN_STATUS_INIT regardless of its current state.
 * Always returns true.
 */
static bool vnodeSetInitStatus(SVnode *pVnode) {
  pthread_mutex_t *lock = &pVnode->statusMutex;

  pthread_mutex_lock(lock);
  pVnode->status = TAOS_VN_STATUS_INIT;
  pthread_mutex_unlock(lock);

  return true;
}

56 57 58
/*
 * Transition pVnode to TAOS_VN_STATUS_READY. Permitted only from INIT or UPDATING.
 * Returns true when the transition was applied, false if the current state forbids it.
 */
static bool vnodeSetReadyStatus(SVnode *pVnode) {
  pthread_mutex_lock(&pVnode->statusMutex);

  bool allowed = (pVnode->status == TAOS_VN_STATUS_INIT) || (pVnode->status == TAOS_VN_STATUS_UPDATING);
  if (allowed) {
    pVnode->status = TAOS_VN_STATUS_READY;
  }

  pthread_mutex_unlock(&pVnode->statusMutex);
  return allowed;
}

69 70 71
/*
 * Transition pVnode to TAOS_VN_STATUS_UPDATING. Permitted only from READY.
 * Returns true when the transition was applied, false otherwise.
 */
static bool vnodeSetUpdatingStatus(SVnode *pVnode) {
  pthread_mutex_lock(&pVnode->statusMutex);

  bool allowed = (pVnode->status == TAOS_VN_STATUS_READY);
  if (allowed) {
    pVnode->status = TAOS_VN_STATUS_UPDATING;
  }

  pthread_mutex_unlock(&pVnode->statusMutex);
  return allowed;
}
81

82 83 84
/*
 * Transition pVnode to TAOS_VN_STATUS_CLOSING. Permitted only from INIT or READY
 * (an UPDATING vnode is left unchanged).
 * Returns true when the transition was applied, false otherwise.
 */
static bool vnodeSetClosingStatus(SVnode *pVnode) {
  pthread_mutex_lock(&pVnode->statusMutex);

  bool allowed = (pVnode->status == TAOS_VN_STATUS_INIT) || (pVnode->status == TAOS_VN_STATUS_READY);
  if (allowed) {
    pVnode->status = TAOS_VN_STATUS_CLOSING;
  }

  pthread_mutex_unlock(&pVnode->statusMutex);
  return allowed;
}
94

95 96 97
/* Return true if pVnode's status equals `status`, read under statusMutex. */
static bool vnodeInStatus(SVnode *pVnode, EVnodeStatus status) {
  pthread_mutex_lock(&pVnode->statusMutex);
  bool matches = (pVnode->status == status);
  pthread_mutex_unlock(&pVnode->statusMutex);

  return matches;
}

107 108 109
/*
 * Release the resources owned by pVnode and free it. Called from vnodeRelease() once the
 * reference count reaches zero; pVnode must not be touched afterwards.
 *
 * Currently only the sync node, the three work queues and the status mutex are released; the
 * remaining components are still TODO.
 */
static void vnodeDestroyVnode(SVnode *pVnode) {
  if (pVnode->pSync != NULL) {
    syncStop(pVnode->pSync);
    pVnode->pSync = NULL;
  }

  // The components below are not released yet (todo). NOTE(review): pWal is created by
  // walOpen() in vnodeOpenVnode() and is not closed here, so the handle is presumably leaked.
  if (pVnode->pQuery) {
    // todo
  }

  if (pVnode->pMeta) {
    // todo
  }

  if (pVnode->pTsdb) {
    // todo
  }

  if (pVnode->pTQ) {
    // todo
  }

  if (pVnode->pWal) {
    // todo
  }

  if (pVnode->allocator) {
    // todo
  }

  if (pVnode->pWriteQ) {
    vnodeFreeWriteQueue(pVnode->pWriteQ);
    pVnode->pWriteQ = NULL;
  }

  if (pVnode->pQueryQ) {
    vnodeFreeQueryQueue(pVnode->pQueryQ);
    pVnode->pQueryQ = NULL;
  }

  if (pVnode->pFetchQ) {
    vnodeFreeFetchQueue(pVnode->pFetchQ);
    pVnode->pFetchQ = NULL;
  }

  if (pVnode->dropped) {
    // todo
  }

  pthread_mutex_destroy(&pVnode->statusMutex);
  free(pVnode);
}
162

163 164 165 166
/*
 * Take pVnode out of service: mark it closing, remove it from tsVnode.hash and drop the
 * initial reference set up in vnodeOpenVnode() (refCount = 1). The vnode is destroyed by
 * vnodeRelease() once no other reference remains.
 */
static void vnodeCleanupVnode(SVnode *pVnode) {
  const size_t keyLen = sizeof(int32_t);

  vnodeSetClosingStatus(pVnode);
  taosHashRemove(tsVnode.hash, &pVnode->vgId, keyLen);
  vnodeRelease(pVnode);
}

169 170
/*
 * Open vnode vgId from its on-disk files and register it in tsVnode.hash.
 *
 * The vnode starts in TAOS_VN_STATUS_INIT with refCount 1 and becomes TAOS_VN_STATUS_READY
 * after its config, term, queues, WAL and sync node are set up. Any failure after it has been
 * registered is undone with vnodeCleanupVnode(), which frees pVnode.
 *
 * Returns TSDB_CODE_SUCCESS on success, or an error code. A missing/unreadable config file is
 * treated specially: the vnode is marked dropped, cleaned up, and 0 is returned.
 */
static int32_t vnodeOpenVnode(int32_t vgId) {
  int32_t code = 0;

  SVnode *pVnode = calloc(1, sizeof(SVnode));
  if (pVnode == NULL) {
    vError("vgId:%d, failed to open vnode since no enough memory", vgId);
    return TAOS_SYSTEM_ERROR(errno);
  }

  pVnode->vgId = vgId;
  // status holds the EVnodeStatus lifecycle state; accessState holds the access level that
  // vnodeSetAccess() compares against TSDB_VN_ALL_ACCCESS. These two were previously swapped.
  pVnode->status = TAOS_VN_STATUS_INIT;
  pVnode->accessState = TSDB_VN_ALL_ACCCESS;
  pVnode->refCount = 1;
  pVnode->role = TAOS_SYNC_ROLE_CANDIDATE;
  pthread_mutex_init(&pVnode->statusMutex, NULL);

  taosHashPut(tsVnode.hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnode *));

  code = vnodeReadCfg(vgId, &pVnode->cfg);
  if (code != TSDB_CODE_SUCCESS) {
    vError("vgId:%d, failed to read config file since %s, mark it as dropped", vgId, tstrerror(code));
    pVnode->cfg.dropped = 1;
    vnodeCleanupVnode(pVnode);
    // NOTE(review): reported as success so that startup continues; confirm this is intended.
    return 0;
  }

  code = vnodeReadTerm(vgId, &pVnode->term);
  if (code != TSDB_CODE_SUCCESS) {
    vError("vgId:%d, failed to read term file since %s", vgId, tstrerror(code));
    pVnode->cfg.dropped = 1;
    vnodeCleanupVnode(pVnode);
    return code;
  }

  // On the failure paths below, terrno is captured BEFORE vnodeCleanupVnode() runs, because the
  // teardown it triggers may overwrite terrno.
  pVnode->pWriteQ = vnodeAllocWriteQueue(pVnode);
  pVnode->pQueryQ = vnodeAllocQueryQueue(pVnode);
  pVnode->pFetchQ = vnodeAllocFetchQueue(pVnode);
  if (pVnode->pWriteQ == NULL || pVnode->pQueryQ == NULL || pVnode->pFetchQ == NULL) {
    code = terrno;
    vError("vgId:%d, failed to alloc queues since %s", vgId, tstrerror(code));
    vnodeCleanupVnode(pVnode);
    return code;
  }

  char path[PATH_MAX + 20];
  snprintf(path, sizeof(path), "%s/vnode%d/wal", tsVnodeDir, vgId);
  pVnode->pWal = walOpen(path, &pVnode->cfg.wal);
  if (pVnode->pWal == NULL) {
    code = terrno;
    vError("vgId:%d, failed to open wal since %s", vgId, tstrerror(code));
    vnodeCleanupVnode(pVnode);
    return code;
  }

  // create sync node
  SSyncInfo syncInfo = {0};
  syncInfo.vgId = vgId;
  syncInfo.snapshotIndex = 0;  // todo, from tsdb
  memcpy(&syncInfo.syncCfg, &pVnode->cfg.sync, sizeof(SSyncCluster));
  syncInfo.fsm.pData = pVnode;
  syncInfo.fsm.applyLog = NULL;
  syncInfo.fsm.onClusterChanged = NULL;
  syncInfo.fsm.getSnapshot = NULL;
  syncInfo.fsm.applySnapshot = NULL;
  syncInfo.fsm.onRestoreDone = NULL;
  syncInfo.fsm.onRollback = NULL;
  syncInfo.logStore.pData = pVnode;
  syncInfo.logStore.logWrite = NULL;
  syncInfo.logStore.logCommit = NULL;
  syncInfo.logStore.logPrune = NULL;
  syncInfo.logStore.logRollback = NULL;
  syncInfo.stateManager.pData = pVnode;
  syncInfo.stateManager.saveServerState = NULL;
  syncInfo.stateManager.readServerState = NULL;
  // syncInfo.stateManager.saveCluster = NULL;
  // syncInfo.stateManager.readCluster = NULL;

  pVnode->pSync = syncStart(&syncInfo);
  if (pVnode->pSync == NULL) {
    code = terrno;
    vError("vgId:%d, failed to start sync since %s", vgId, tstrerror(code));
    vnodeCleanupVnode(pVnode);
    return code;
  }

  vnodeSetReadyStatus(pVnode);
  vDebug("vgId:%d, vnode is opened", vgId);
  return TSDB_CODE_SUCCESS;
}
252

253 254 255
/*
 * Create the on-disk layout for vnode vgId under tsVnodeDir, persist pCfg, then open the vnode.
 *
 * Layout: vnode<vgId>/ plus its cfg, wal, tq, tsdb and meta subdirectories.
 * Returns the first error encountered; directories created before a failure are left in place.
 */
int32_t vnodeCreateVnode(int32_t vgId, SVnodeCfg *pCfg) {
  // The empty suffix creates the vnode root itself and is listed first, ahead of its children.
  static const char *const subDirs[] = {"", "/cfg", "/wal", "/tq", "/tsdb", "/meta"};

  int32_t code = 0;
  char    path[PATH_MAX + 20] = {0};

  for (size_t i = 0; i < sizeof(subDirs) / sizeof(subDirs[0]); ++i) {
    snprintf(path, sizeof(path), "%s/vnode%d%s", tsVnodeDir, vgId, subDirs[i]);
    if (taosMkDir(path) < 0) {
      code = TAOS_SYSTEM_ERROR(errno);  // capture errno before anything else can change it
      vError("vgId:%d, failed to create %s since %s", vgId, path, tstrerror(code));
      return code;
    }
  }

  code = vnodeWriteCfg(vgId, pCfg);
  if (code != 0) {
    vError("vgId:%d, failed to save vnode cfg since %s", vgId, tstrerror(code));
    return code;
  }

  return vnodeOpenVnode(vgId);
}

308
/*
 * Apply configuration pCfg to pVnode. The new config is written to disk, adopted in memory,
 * and then the WAL and sync components whose section changed are reconfigured (tsdb and meta
 * changes are still TODO).
 *
 * Consumes the caller's reference: pVnode is released on every return path.
 * Returns 0 on success, or the error from vnodeWriteCfg()/walAlter().
 */
int32_t vnodeAlterVnode(SVnode *pVnode, SVnodeCfg *pCfg) {
  int32_t code = 0;
  int32_t vgId = pVnode->vgId;

  bool walChanged = (memcmp(&pCfg->wal, &pVnode->cfg.wal, sizeof(SWalCfg)) != 0);
  bool tsdbChanged = (memcmp(&pCfg->tsdb, &pVnode->cfg.tsdb, sizeof(STsdbCfg)) != 0);
  bool metaChanged = (memcmp(&pCfg->meta, &pVnode->cfg.meta, sizeof(SMetaCfg)) != 0);
  bool syncChanged = (memcmp(&pCfg->sync, &pVnode->cfg.sync, sizeof(SSyncCluster)) != 0);

  if (!walChanged && !tsdbChanged && !metaChanged && !syncChanged) {
    vDebug("vgId:%d, nothing changed", vgId);
    vnodeRelease(pVnode);
    return code;
  }

  code = vnodeWriteCfg(vgId, pCfg);
  if (code != 0) {
    vError("vgId:%d, failed to write alter msg to file since %s", vgId, tstrerror(code));
    vnodeRelease(pVnode);
    return code;
  }

  // NOTE(review): the new config is adopted (and already persisted) before walAlter() runs, so
  // a walAlter() failure presumably leaves cfg.wal describing settings the WAL does not use.
  pVnode->cfg = *pCfg;

  if (walChanged) {
    code = walAlter(pVnode->pWal, &pVnode->cfg.wal);
    if (code != 0) {
      // A failed alteration is an error, not debug noise.
      vError("vgId:%d, failed to alter wal since %s", vgId, tstrerror(code));
      vnodeRelease(pVnode);
      return code;
    }
  }

  if (tsdbChanged) {
    // todo
  }

  if (metaChanged) {
    // todo
  }

  if (syncChanged) {
    syncReconfig(pVnode->pSync, &pVnode->cfg.sync);
  }

  vnodeRelease(pVnode);
  return code;
}
H
hzcheng 已提交
356

357 358 359 360 361
/*
 * Mark pVnode as dropped and persist the flag with vnodeWriteCfg().
 * On success the vnode is unregistered via vnodeCleanupVnode(); if persisting fails the
 * in-memory flag is rolled back. A vnode already flagged as dropped is left untouched.
 *
 * Always consumes the caller's reference to pVnode.
 */
int32_t vnodeDropVnode(SVnode *pVnode) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (pVnode->cfg.dropped) {
    vInfo("vgId:%d, already set drop flag, ref:%d", pVnode->vgId, pVnode->refCount);
  } else {
    pVnode->cfg.dropped = 1;
    code = vnodeWriteCfg(pVnode->vgId, &pVnode->cfg);
    if (code != 0) {
      vError("vgId:%d, failed to set drop flag since %s", pVnode->vgId, tstrerror(code));
      pVnode->cfg.dropped = 0;
    } else {
      vInfo("vgId:%d, set drop flag, ref:%d", pVnode->vgId, pVnode->refCount);
      vnodeCleanupVnode(pVnode);
    }
  }

  vnodeRelease(pVnode);
  return code;
}
377

378 379 380
/* Placeholder: vnode synchronization is not implemented yet; always reports success. */
int32_t vnodeSyncVnode(SVnode *pVnode) {
  (void)pVnode;  // unused until implemented
  return TSDB_CODE_SUCCESS;
}
381

382 383
/* Placeholder: vnode compaction is not implemented yet; always reports success. */
int32_t vnodeCompactVnode(SVnode *pVnode) {
  (void)pVnode;  // unused until implemented
  return TSDB_CODE_SUCCESS;
}
385

386
static void *vnodeOpenVnodeFunc(void *param) {
387 388 389 390 391 392 393 394 395 396
  SOpenVnodeThread *pThread = param;

  vDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("vnodeOpenVnode");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
    int32_t vgId = pThread->vnodeList[v];

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", vgId,
397
             tsVnode.openVnodes, tsVnode.totalVnodes);
398 399
    // (*vnodeInst()->fp.ReportStartup)("open-vnodes", stepDesc);

400
    if (vnodeOpenVnode(vgId) < 0) {
401 402 403 404 405 406 407
      vError("vgId:%d, failed to open vnode by thread:%d", vgId, pThread->threadIndex);
      pThread->failed++;
    } else {
      vDebug("vgId:%d, is opened by thread:%d", vgId, pThread->threadIndex);
      pThread->opened++;
    }

408
    atomic_add_fetch_32(&tsVnode.openVnodes, 1);
409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455
  }

  vDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
         pThread->failed);
  return NULL;
}

/*
 * Collect the vgIds of the "vnode<N>" directories under tsVnodeDir into vnodeList, which must
 * have room for TSDB_MAX_VNODES entries. *numOfVnodes is always set to the number found.
 *
 * NOTE: the directory scan is compiled out (#if 0), so this currently reports zero vnodes.
 */
static int32_t vnodeGetVnodeListFromDisk(int32_t vnodeList[], int32_t *numOfVnodes) {
  // Define the output on every path, not only inside the (disabled) scan.
  *numOfVnodes = 0;
  (void)vnodeList;

#if 0
  DIR *dir = opendir(tsVnodeDir);
  if (dir == NULL) return TSDB_CODE_DND_NO_WRITE_ACCESS;

  struct dirent *de = NULL;
  while ((de = readdir(dir)) != NULL) {
    if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) {
      continue;
    }
    if (de->d_type & DT_DIR) {
      if (strncmp("vnode", de->d_name, 5) != 0) {
        continue;
      }
      int32_t vnode = atoi(de->d_name + 5);
      if (vnode == 0) {
        continue;
      }

      // Check capacity before storing so all TSDB_MAX_VNODES slots are usable
      // (the previous increment-then-check rejected the last slot).
      if (*numOfVnodes >= TSDB_MAX_VNODES) {
        vError("vgId:%d, too many vnode directory in disk, exist:%d max:%d", vnode, *numOfVnodes + 1,
               TSDB_MAX_VNODES);
        closedir(dir);
        return TSDB_CODE_DND_TOO_MANY_VNODES;
      }
      vnodeList[(*numOfVnodes)++] = vnode;
    }
  }
  closedir(dir);
#endif
  return TSDB_CODE_SUCCESS;
}

static int32_t vnodeOpenVnodes() {
  int32_t vnodeList[TSDB_MAX_VNODES] = {0};
  int32_t numOfVnodes = 0;
  int32_t status = vnodeGetVnodeListFromDisk(vnodeList, &numOfVnodes);

  if (status != TSDB_CODE_SUCCESS) {
    vInfo("failed to get vnode list from disk since code:%d", status);
    return status;
  }

456
  tsVnode.totalVnodes = numOfVnodes;
457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478

  int32_t threadNum = tsNumOfCores;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SOpenVnodeThread *threads = calloc(threadNum, sizeof(SOpenVnodeThread));
  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].vnodeList = calloc(vnodesPerThread, sizeof(int32_t));
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t           t = v % threadNum;
    SOpenVnodeThread *pThread = &threads[t];
    pThread->vnodeList[pThread->vnodeNum++] = vnodeList[v];
  }

  vInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes);

  for (int32_t t = 0; t < threadNum; ++t) {
    SOpenVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

479 480
    pThread->threadId = taosCreateThread(vnodeOpenVnodeFunc, pThread);
    if (pThread->threadId == NULL) {
481 482 483 484 485 486 487 488
      vError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
    }
  }

  int32_t openVnodes = 0;
  int32_t failedVnodes = 0;
  for (int32_t t = 0; t < threadNum; ++t) {
    SOpenVnodeThread *pThread = &threads[t];
489 490 491
    taosDestoryThread(pThread->threadId);
    pThread->threadId = NULL;

492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507
    openVnodes += pThread->opened;
    failedVnodes += pThread->failed;
    free(pThread->vnodeList);
  }

  free(threads);
  vInfo("there are total vnodes:%d, opened:%d", numOfVnodes, openVnodes);

  if (failedVnodes != 0) {
    vError("there are total vnodes:%d, failed:%d", numOfVnodes, failedVnodes);
    return -1;
  }

  return TSDB_CODE_SUCCESS;
}

508 509
/*
 * Copy the SVnode pointers registered in tsVnode.hash into vnodeList, which must have room for
 * TSDB_MAX_VNODES entries. *numOfVnodes is incremented once per stored pointer (callers pass 0)
 * and never exceeds TSDB_MAX_VNODES; extra vnodes are logged and skipped. No references are
 * taken on the returned vnodes.
 */
static int32_t vnodeGetVnodeList(SVnode *vnodeList[], int32_t *numOfVnodes) {
  void *pIter = taosHashIterate(tsVnode.hash, NULL);
  while (pIter) {
    SVnode **pVnode = pIter;
    if (*pVnode) {
      // Check capacity before storing. The iterator must still be advanced for skipped entries:
      // the previous `continue` here skipped taosHashIterate() and spun forever.
      if (*numOfVnodes < TSDB_MAX_VNODES) {
        vnodeList[(*numOfVnodes)++] = *pVnode;
      } else {
        vError("vgId:%d, too many open vnodes, exist:%d max:%d", (*pVnode)->vgId, *numOfVnodes + 1, TSDB_MAX_VNODES);
      }
    }

    pIter = taosHashIterate(tsVnode.hash, pIter);
  }

  return TSDB_CODE_SUCCESS;
}

static void vnodeCleanupVnodes() {
529
  SVnode* vnodeList[TSDB_MAX_VNODES] = {0};
530 531 532 533 534 535 536 537 538 539
  int32_t numOfVnodes = 0;

  int32_t code = vnodeGetVnodeList(vnodeList, &numOfVnodes);

  if (code != TSDB_CODE_SUCCESS) {
    vInfo("failed to get dnode list since code %d", code);
    return;
  }

  for (int32_t i = 0; i < numOfVnodes; ++i) {
540
    vnodeCleanupVnode(vnodeList[i]);
541 542 543 544 545
  }

  vInfo("total vnodes:%d are all closed", numOfVnodes);
}

546 547 548 549 550 551 552 553 554 555
static void vnodeIncRef(void *ptNode) {
  assert(ptNode != NULL);

  SVnode **ppVnode = (SVnode **)ptNode;
  assert(ppVnode);
  assert(*ppVnode);

  SVnode *pVnode = *ppVnode;
  atomic_add_fetch_32(&pVnode->refCount, 1);
  vTrace("vgId:%d, get vnode, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
556 557
}

558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576
/*
 * Look up vnode vgId regardless of its status and return it with an extra reference, or NULL
 * with terrno = TSDB_CODE_VND_INVALID_VGROUP_ID.
 *
 * NOTE(review): the lookup is commented out, so pVnode stays NULL and every call currently
 * fails; vnodeAcquire() and everything built on it are therefore inert.
 */
SVnode *vnodeAcquireInAllState(int32_t vgId) {
  SVnode *pVnode = NULL;

  // taosHashGetClone(tsVnode.hash, &vgId, sizeof(int32_t), vnodeIncRef, (void*)&pVnode);

  if (pVnode != NULL) {
    return pVnode;
  }

  vDebug("vgId:%d, can't accquire since not exist", vgId);
  terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
  return NULL;
}

/*
 * Acquire vnode vgId only if it is in TAOS_VN_STATUS_READY. On success the caller owns one
 * reference and must call vnodeRelease(). Returns NULL with terrno set otherwise.
 */
SVnode *vnodeAcquire(int32_t vgId) {
  SVnode *pVnode = vnodeAcquireInAllState(vgId);
  if (pVnode == NULL) {
    return NULL;
  }

  if (!vnodeInStatus(pVnode, TAOS_VN_STATUS_READY)) {
    vDebug("vgId:%d, can't accquire since not in ready status", vgId);
    vnodeRelease(pVnode);
    terrno = TSDB_CODE_VND_INVALID_TSDB_STATE;
    return NULL;
  }

  return pVnode;
}

585 586
/*
 * Drop one reference to pVnode (NULL is ignored). When the count reaches zero the vnode is
 * destroyed with vnodeDestroyVnode() and must not be used again.
 */
void vnodeRelease(SVnode *pVnode) {
  if (pVnode == NULL) return;

  // Read vgId while this caller still holds its reference. After the decrement another thread
  // may drop the last reference and free pVnode, so it must not be dereferenced here afterwards.
  int32_t vgId = pVnode->vgId;
  int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);

  vTrace("vgId:%d, release vnode, refCount:%d pVnode:%p", vgId, refCount, pVnode);
  assert(refCount >= 0);

  if (refCount <= 0) {
    vDebug("vgId:%d, vnode will be destroyed, refCount:%d pVnode:%p", vgId, refCount, pVnode);
    vnodeDestroyVnode(pVnode);
    int32_t count = taosHashGetSize(tsVnode.hash);
    vDebug("vgId:%d, vnode is destroyed, vnodes:%d", vgId, count);
  }
}

/*
 * Create the vnode registry hash (int keys) and open all vnodes found on disk.
 * Returns -1 if the hash cannot be created, otherwise the result of vnodeOpenVnodes().
 */
int32_t vnodeInitMain() {
  tsVnode.hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);

  if (tsVnode.hash != NULL) {
    vInfo("vnode main is initialized");
    return vnodeOpenVnodes();
  }

  vError("failed to init vnode mgmt");
  return -1;
}

void vnodeCleanupMain() {
  vnodeCleanupVnodes();
615 616
  taosHashCleanup(tsVnode.hash);
  tsVnode.hash = NULL;
617 618 619 620 621 622 623 624 625
}

/*
 * Append pVnode's load entry to pStatus->load and advance pStatus->openVnodes. Does nothing
 * once TSDB_MAX_VNODES entries are present. Multi-byte fields are converted to network byte
 * order; storage/write statistics are not collected yet and are reported as zero.
 */
static void vnodeBuildVloadMsg(SVnode *pVnode, SStatusMsg *pStatus) {
  if (pStatus->openVnodes >= TSDB_MAX_VNODES) {
    return;
  }

  int64_t totalStorage = 0;
  int64_t compStorage = 0;
  int64_t pointsWritten = 0;
  // if (pVnode->tsdb) {
  //   tsdbReportStat(pVnode->tsdb, &pointsWritten, &totalStorage, &compStorage);
  // }

  SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes];
  pStatus->openVnodes++;

  pLoad->vgId = htonl(pVnode->vgId);
  pLoad->totalStorage = htobe64(totalStorage);
  pLoad->compStorage = htobe64(compStorage);
  pLoad->pointsWritten = htobe64(pointsWritten);
  pLoad->status = pVnode->status;
  pLoad->role = pVnode->role;
}

639 640
/* Append a load entry to pStatus for every vnode registered in tsVnode.hash. */
void vnodeGetStatus(SStatusMsg *pStatus) {
  for (void *pIter = taosHashIterate(tsVnode.hash, NULL); pIter != NULL; pIter = taosHashIterate(tsVnode.hash, pIter)) {
    SVnode *pVnode = *(SVnode **)pIter;
    if (pVnode != NULL) {
      vnodeBuildVloadMsg(pVnode, pStatus);
    }
  }
}

650
/*
 * Apply the access states in pAccess[0..numOfVnodes) to the matching READY vnodes.
 * Each entry's vgId arrives in network byte order and is converted to host order in place.
 * Vnodes that cannot be acquired are skipped.
 */
void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) {
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    // Decoding a wire value: network-to-host (ntohl), not host-to-network.
    pAccess[i].vgId = ntohl(pAccess[i].vgId);
    SVnode *pVnode = vnodeAcquire(pAccess[i].vgId);
    if (pVnode == NULL) {
      continue;
    }

    pVnode->accessState = pAccess[i].accessState;
    if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) {
      vDebug("vgId:%d, access state is set to %d", pAccess[i].vgId, pVnode->accessState);
    }
    vnodeRelease(pVnode);
  }
}