vnodeMain.c 20.0 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
17
#include "thash.h"
18 19
#include "tthread.h"
#include "vnodeFile.h"
20 21 22 23 24
#include "vnodeMain.h"
#include "vnodeMgmt.h"
#include "vnodeRead.h"
#include "vnodeWrite.h"

25 26 27 28 29 30 31 32 33
// Lifecycle state kept in SVnode::status. The vnodeSet*Status() helpers in this
// file change it while holding SVnode::statusMutex.
typedef enum _VN_STATUS {
  TAOS_VN_STATUS_INIT = 0,      // set unconditionally by vnodeSetInitStatus()
  TAOS_VN_STATUS_READY = 1,     // entered from INIT or UPDATING (vnodeSetReadyStatus)
  TAOS_VN_STATUS_CLOSING = 2,   // entered from INIT or READY (vnodeSetClosingStatus)
  TAOS_VN_STATUS_UPDATING = 3   // entered from READY (vnodeSetUpdatingStatus)
} EVnodeStatus;

// Printable status names; the array index matches the EVnodeStatus value.
char *vnodeStatus[] = {"init", "ready", "closing", "updating"};

34
// Per-thread work item used by vnodeOpenVnodes()/vnodeOpenVnodeFunc().
typedef struct {
  pthread_t *threadId;     // handle returned by taosCreateThread(), NULL when no thread was started
  int32_t    threadIndex;  // position of this entry in the thread array (used in log messages)
  int32_t    failed;       // number of vnodes this thread failed to open
  int32_t    opened;       // number of vnodes this thread opened successfully
  int32_t    vnodeNum;     // number of valid entries in vnodeList
  int32_t   *vnodeList;    // vgIds assigned to this thread
} SOpenVnodeThread;

// File-scope registry of vnodes.
static struct {
  SHashObj *hash;         // vgId (int32_t key) -> SVnode* ; filled by vnodeOpenVnode()
  int32_t   openVnodes;   // bumped atomically after every open attempt in vnodeOpenVnodeFunc()
  int32_t   totalVnodes;  // number of vnodes scheduled for opening, set by vnodeOpenVnodes()
} tsVnode;
48

49 50 51 52 53
/*
 * Force the vnode into TAOS_VN_STATUS_INIT regardless of its current status.
 * The write happens under pVnode->statusMutex. Always returns true.
 */
static bool vnodeSetInitStatus(SVnode *pVnode) {
  pthread_mutex_t *pLock = &pVnode->statusMutex;

  pthread_mutex_lock(pLock);
  pVnode->status = TAOS_VN_STATUS_INIT;
  pthread_mutex_unlock(pLock);

  return true;
}

56 57 58
/*
 * Move the vnode to TAOS_VN_STATUS_READY if it is currently INIT or UPDATING.
 * The check and the write are done atomically under pVnode->statusMutex.
 *
 * Returns true when the status was changed, false when the transition is not allowed.
 */
static bool vnodeSetReadyStatus(SVnode *pVnode) {
  pthread_mutex_lock(&pVnode->statusMutex);

  bool allowed = (pVnode->status == TAOS_VN_STATUS_INIT) || (pVnode->status == TAOS_VN_STATUS_UPDATING);
  if (allowed) {
    pVnode->status = TAOS_VN_STATUS_READY;
  }

  pthread_mutex_unlock(&pVnode->statusMutex);
  return allowed;
}

69 70 71
/*
 * Move the vnode to TAOS_VN_STATUS_UPDATING, which is only allowed from READY.
 * The check and the write are done atomically under pVnode->statusMutex.
 *
 * Returns true when the status was changed, false otherwise.
 */
static bool vnodeSetUpdatingStatus(SVnode *pVnode) {
  pthread_mutex_lock(&pVnode->statusMutex);

  bool allowed = (pVnode->status == TAOS_VN_STATUS_READY);
  if (allowed) {
    pVnode->status = TAOS_VN_STATUS_UPDATING;
  }

  pthread_mutex_unlock(&pVnode->statusMutex);
  return allowed;
}
81

82 83 84
/*
 * Move the vnode to TAOS_VN_STATUS_CLOSING if it is currently INIT or READY.
 * The check and the write are done atomically under pVnode->statusMutex.
 *
 * Returns true when the status was changed, false when the vnode was in any other status.
 */
static bool vnodeSetClosingStatus(SVnode *pVnode) {
  bool changed = false;

  pthread_mutex_lock(&pVnode->statusMutex);
  switch (pVnode->status) {
    case TAOS_VN_STATUS_INIT:
    case TAOS_VN_STATUS_READY:
      pVnode->status = TAOS_VN_STATUS_CLOSING;
      changed = true;
      break;
    default:
      break;
  }
  pthread_mutex_unlock(&pVnode->statusMutex);

  return changed;
}
94

95 96 97
/*
 * Report whether the vnode's status equals `status`.
 * The status field is read under pVnode->statusMutex.
 */
static bool vnodeInStatus(SVnode *pVnode, EVnodeStatus status) {
  pthread_mutex_lock(&pVnode->statusMutex);
  bool matched = (pVnode->status == status);
  pthread_mutex_unlock(&pVnode->statusMutex);

  return matched;
}

107 108 109
/*
 * Tear down a vnode object and free it.
 *
 * Stops the sync instance, closes the WAL, frees the write/query/fetch queues,
 * destroys the status mutex and finally frees pVnode itself. The remaining
 * sub-objects (query, meta, tsdb, tq, allocator) and the "dropped" handling are
 * still placeholders. pVnode must not be used after this call.
 */
static void vnodeDestroyVnode(SVnode *pVnode) {
  if (pVnode->pSync != NULL) {
    syncStop(pVnode->pSync);
    pVnode->pSync = NULL;
  }

  if (pVnode->pQuery) {
    // todo
  }

  if (pVnode->pMeta) {
    // todo
  }

  if (pVnode->pTsdb) {
    // todo
  }

  if (pVnode->pTQ) {
    // todo
  }

  if (pVnode->pWal) {
    walClose(pVnode->pWal);
    pVnode->pWal = NULL;
  }

  if (pVnode->allocator) {
    // todo
  }

  if (pVnode->pWriteQ) {
    vnodeFreeWriteQueue(pVnode->pWriteQ);
    pVnode->pWriteQ = NULL;
  }

  if (pVnode->pQueryQ) {
    vnodeFreeQueryQueue(pVnode->pQueryQ);
    pVnode->pQueryQ = NULL;
  }

  if (pVnode->pFetchQ) {
    vnodeFreeFetchQueue(pVnode->pFetchQ);
    pVnode->pFetchQ = NULL;
  }

  if (pVnode->dropped) {
    // todo
  }

  pthread_mutex_destroy(&pVnode->statusMutex);
  free(pVnode);
}
163

164 165 166 167
/*
 * Retire a vnode: try to switch it to CLOSING (the result is ignored), remove it
 * from the tsVnode.hash registry, then drop one reference via vnodeRelease().
 */
static void vnodeCleanupVnode(SVnode *pVnode) {
  (void)vnodeSetClosingStatus(pVnode);

  // Unregister first so the vnode can no longer be found through the hash.
  taosHashRemove(tsVnode.hash, &pVnode->vgId, sizeof(int32_t));

  vnodeRelease(pVnode);
}

S
Shengliang Guan 已提交
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
/*
 * Sync log-store callback: append the buffer at `index` to the WAL of the vnode
 * stored in logStore->pData. Returns whatever walWrite() returns.
 */
static inline int32_t vnodeLogWrite(struct SSyncLogStore *logStore, SyncIndex index, SSyncBuffer *pBuf) {
  // vnode status can be checked here
  SVnode *pOwner = logStore->pData;
  int32_t len = (int32_t)pBuf->len;
  return walWrite(pOwner->pWal, index, pBuf->data, len);
}

/*
 * Sync log-store callback: forward a commit of `index` to the WAL of the vnode
 * stored in logStore->pData. Returns whatever walCommit() returns.
 */
static inline int32_t vnodeLogCommit(struct SSyncLogStore *logStore, SyncIndex index) {
  // vnode status can be checked here
  SVnode *pOwner = logStore->pData;
  return walCommit(pOwner->pWal, index);
}

/*
 * Sync log-store callback: forward a prune request for `index` to the WAL of the
 * vnode stored in logStore->pData. Returns whatever walPrune() returns.
 */
static inline int32_t vnodeLogPrune(struct SSyncLogStore *logStore, SyncIndex index) {
  // vnode status can be checked here
  SVnode *pOwner = logStore->pData;
  return walPrune(pOwner->pWal, index);
}

/*
 * Sync log-store callback: forward a rollback to `index` to the WAL of the vnode
 * stored in logStore->pData. Returns whatever walRollback() returns.
 */
static inline int32_t vnodeLogRollback(struct SSyncLogStore *logStore, SyncIndex index) {
  // vnode status can be checked here
  SVnode *pOwner = logStore->pData;
  return walRollback(pOwner->pWal, index);
}

/*
 * Sync state-manager callback: persist *pState for the vnode stored in
 * stateMng->pData by delegating to vnodeSaveState().
 */
static inline int32_t vnodeSaveServerState(struct SStateManager *stateMng, SSyncServerState *pState) {
  SVnode *pOwner = stateMng->pData;
  int32_t vgId = pOwner->vgId;
  return vnodeSaveState(vgId, pState);
}

/*
 * Sync state-manager callback: load the persisted server state of the vnode
 * stored in stateMng->pData into *pState.
 *
 * This used to call vnodeSaveState(), i.e. a "read" wrote the caller's buffer to
 * disk instead of filling it.
 * NOTE(review): vnodeReadState() is assumed to be the read counterpart declared
 * next to vnodeSaveState() in vnodeFile.h - confirm the name and signature.
 */
static inline int32_t vnodeReadServerState(struct SStateManager *stateMng, SSyncServerState *pState) {
  SVnode *pVnode = stateMng->pData;
  return vnodeReadState(pVnode->vgId, pState);
}

/* Sync FSM callback stub: applying a log entry is not implemented; always returns 0. */
static inline int32_t vnodeApplyLog(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) {
  const int32_t result = 0;
  return result;
}

/* Sync FSM callback stub: cluster-change notification is ignored; always returns 0. */
static inline int32_t vnodeOnClusterChanged(struct SSyncFSM *fsm, const SSyncCluster *cluster, void *pData) {
  return 0;
}

/* Sync FSM callback stub: no snapshot is produced and no output is written; always returns 0. */
static inline int32_t vnodeGetSnapshot(struct SSyncFSM *fsm, SSyncBuffer **ppBuf, int32_t *objId, bool *isLast) {
  const int32_t result = 0;
  return result;
}

/* Sync FSM callback stub: the snapshot buffer is ignored; always returns 0. */
static inline int32_t vnodeApplySnapshot(struct SSyncFSM *fsm, SSyncBuffer *pBuf, int32_t objId, bool isLast) {
  const int32_t result = 0;
  return result;
}

/* Sync FSM callback stub: nothing is done when restore completes; always returns 0. */
static inline int32_t vnodeOnRestoreDone(struct SSyncFSM *fsm) {
  return 0;
}

/* Sync FSM callback stub: rollback notification is ignored. */
static inline void vnodeOnRollback(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf) {
  // intentionally empty
}

/* Sync FSM callback stub: role-change notification is ignored. */
static inline void vnodeOnRoleChanged(struct SSyncFSM *fsm, const SNodesRole *pRole) {
  // intentionally empty
}

220 221
static int32_t vnodeOpenVnode(int32_t vgId) {
  int32_t code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
222

223
  SVnode *pVnode = calloc(sizeof(SVnode), 1);
224
  if (pVnode == NULL) {
S
TD-2324  
Shengliang Guan 已提交
225
    vError("vgId:%d, failed to open vnode since no enough memory", vgId);
226 227 228
    return TAOS_SYSTEM_ERROR(errno);
  }

229
  pVnode->vgId = vgId;
230 231 232 233
  pVnode->accessState = TAOS_VN_STATUS_INIT;
  pVnode->status = TSDB_VN_ALL_ACCCESS;
  pVnode->refCount = 1;
  pVnode->role = TAOS_SYNC_ROLE_CANDIDATE;
S
TD-2289  
Shengliang Guan 已提交
234
  pthread_mutex_init(&pVnode->statusMutex, NULL);
S
Shengliang Guan 已提交
235

S
Shengliang Guan 已提交
236 237 238
  vDebug("vgId:%d, vnode is opened", pVnode->vgId);
  taosHashPut(tsVnode.hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnode *));

239
  code = vnodeReadCfg(vgId, &pVnode->cfg);
240
  if (code != TSDB_CODE_SUCCESS) {
S
Shengliang Guan 已提交
241
    vError("vgId:%d, failed to read config file, set cfgVersion to 0", pVnode->vgId);
242 243
    pVnode->cfg.dropped = 1;
    vnodeCleanupVnode(pVnode);
S
Shengliang Guan 已提交
244
    return 0;
245
  }
246

S
Shengliang Guan 已提交
247
  code = vnodeSaveState(vgId, &pVnode->term);
248
  if (code != TSDB_CODE_SUCCESS) {
249 250 251 252
    vError("vgId:%d, failed to read term file since %s", pVnode->vgId, tstrerror(code));
    pVnode->cfg.dropped = 1;
    vnodeCleanupVnode(pVnode);
    return code;
253 254
  }

255 256 257 258 259
  pVnode->pWriteQ = vnodeAllocWriteQueue(pVnode);
  pVnode->pQueryQ = vnodeAllocQueryQueue(pVnode);
  pVnode->pFetchQ = vnodeAllocFetchQueue(pVnode);
  if (pVnode->pWriteQ == NULL || pVnode->pQueryQ == NULL || pVnode->pFetchQ == NULL) {
    vnodeCleanupVnode(pVnode);
260 261
    return terrno;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
262

263 264 265 266 267
  char path[PATH_MAX + 20];
  snprintf(path, sizeof(path), "%s/vnode%d/wal", tsVnodeDir, vgId);
  pVnode->pWal = walOpen(path, &pVnode->cfg.wal);
  if (pVnode->pWal == NULL) {
    vnodeCleanupVnode(pVnode);
J
jtao1735 已提交
268 269 270
    return terrno;
  }

S
Shengliang Guan 已提交
271 272 273
  // create sync node
  SSyncInfo syncInfo = {0};
  syncInfo.vgId = vgId;
S
Shengliang Guan 已提交
274
  syncInfo.snapshotIndex = 0;  // todo, from tsdb
S
Shengliang Guan 已提交
275 276
  memcpy(&syncInfo.syncCfg, &pVnode->cfg.sync, sizeof(SSyncCluster));
  syncInfo.fsm.pData = pVnode;
S
Shengliang Guan 已提交
277 278 279 280 281 282 283
  syncInfo.fsm.applyLog = vnodeApplyLog;
  syncInfo.fsm.onClusterChanged = vnodeOnClusterChanged;
  syncInfo.fsm.getSnapshot = vnodeGetSnapshot;
  syncInfo.fsm.applySnapshot = vnodeApplySnapshot;
  syncInfo.fsm.onRestoreDone = vnodeOnRestoreDone;
  syncInfo.fsm.onRollback = vnodeOnRollback;
  syncInfo.fsm.onRoleChanged = vnodeOnRoleChanged;
S
Shengliang Guan 已提交
284
  syncInfo.logStore.pData = pVnode;
S
Shengliang Guan 已提交
285 286 287 288
  syncInfo.logStore.logWrite = vnodeLogWrite;
  syncInfo.logStore.logCommit = vnodeLogCommit;
  syncInfo.logStore.logPrune = vnodeLogPrune;
  syncInfo.logStore.logRollback = vnodeLogRollback;
S
Shengliang Guan 已提交
289
  syncInfo.stateManager.pData = pVnode;
S
Shengliang Guan 已提交
290 291
  syncInfo.stateManager.saveServerState = vnodeSaveServerState;
  syncInfo.stateManager.readServerState = vnodeReadServerState;
S
Shengliang Guan 已提交
292 293 294 295 296 297

  pVnode->pSync = syncStart(&syncInfo);
  if (pVnode->pSync == NULL) {
    vnodeCleanupVnode(pVnode);
    return terrno;
  }
298

299 300 301
  vnodeSetReadyStatus(pVnode);
  return TSDB_CODE_SUCCESS;
}
302

303 304 305
/*
 * Create the on-disk layout for a new vnode, store its configuration and open it.
 *
 * Directories are created under "<tsVnodeDir>/vnode<vgId>" in the order listed in
 * subDirs (the vnode root first). Creation stops at the first failure.
 *
 * Returns the system error of the failing taosMkDir(), the error of
 * vnodeWriteCfg(), or otherwise the result of vnodeOpenVnode().
 */
int32_t vnodeCreateVnode(int32_t vgId, SVnodeCfg *pCfg) {
  static const char *const subDirs[] = {"", "/cfg", "/wal", "/tq", "/tsdb", "/meta"};
  const int32_t            numOfDirs = (int32_t)(sizeof(subDirs) / sizeof(subDirs[0]));

  int32_t code = 0;
  char    path[PATH_MAX + 20] = {0};

  for (int32_t i = 0; i < numOfDirs; ++i) {
    snprintf(path, sizeof(path), "%s/vnode%d%s", tsVnodeDir, vgId, subDirs[i]);
    if (taosMkDir(path) < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      vError("vgId:%d, failed to create since %s", vgId, tstrerror(code));
      return code;
    }
  }

  code = vnodeWriteCfg(vgId, pCfg);
  if (code != 0) {
    vError("vgId:%d, failed to save vnode cfg since %s", vgId, tstrerror(code));
    return code;
  }

  return vnodeOpenVnode(vgId);
}

358
/*
 * Apply a new configuration to an already acquired vnode.
 *
 * The wal/tsdb/meta/sync sections of *pCfg are compared bytewise with the current
 * configuration. If nothing differs the call is a no-op. Otherwise the new
 * configuration is written to disk, copied into pVnode->cfg and pushed to the
 * components that changed (WAL and sync today; tsdb and meta are placeholders).
 *
 * The function consumes one reference: vnodeRelease(pVnode) is called on every path.
 *
 * Returns 0 on success or the error code of vnodeWriteCfg()/walAlter().
 */
int32_t vnodeAlterVnode(SVnode * pVnode, SVnodeCfg *pCfg) {
  int32_t code = 0;
  int32_t vgId = pVnode->vgId;

  bool walChanged = (memcmp(&pCfg->wal, &pVnode->cfg.wal, sizeof(SWalCfg)) != 0);
  bool tsdbChanged = (memcmp(&pCfg->tsdb, &pVnode->cfg.tsdb, sizeof(STsdbCfg)) != 0);
  bool metaChanged = (memcmp(&pCfg->meta, &pVnode->cfg.meta, sizeof(SMetaCfg)) != 0);
  bool syncChanged = (memcmp(&pCfg->sync, &pVnode->cfg.sync, sizeof(SSyncCluster)) != 0);

  if (!walChanged && !tsdbChanged && !metaChanged && !syncChanged) {
    vDebug("vgId:%d, nothing changed", vgId);
    vnodeRelease(pVnode);
    return code;
  }

  code = vnodeWriteCfg(pVnode->vgId, pCfg);
  if (code != 0) {
    vError("vgId:%d, failed to write alter msg to file since %s", vgId, tstrerror(code));
    vnodeRelease(pVnode);
    return code;
  }

  // From here on the in-memory configuration is the new one, even if a component
  // below rejects it.
  pVnode->cfg = *pCfg;

  if (walChanged) {
    code = walAlter(pVnode->pWal, &pVnode->cfg.wal);
    if (code != 0) {
      // A failure, so log it at error level like the other failure paths.
      vError("vgId:%d, failed to alter wal since %s", vgId, tstrerror(code));
      vnodeRelease(pVnode);
      return code;
    }
  }

  if (tsdbChanged) {
    // todo
  }

  if (metaChanged) {
    // todo
  }

  if (syncChanged) {
    syncReconfig(pVnode->pSync, &pVnode->cfg.sync);
  }

  vnodeRelease(pVnode);
  return code;
}
H
hzcheng 已提交
406

407 408 409 410 411
/*
 * Mark an acquired vnode as dropped and retire it.
 *
 * If the drop flag is already set nothing else happens. Otherwise the flag is set
 * and the configuration is written to disk; on success the vnode goes through
 * vnodeCleanupVnode(), on failure the flag is cleared again.
 *
 * The function consumes one reference: vnodeRelease(pVnode) is called on every path.
 *
 * Returns TSDB_CODE_SUCCESS when already dropped, otherwise the result of
 * vnodeWriteCfg().
 */
int32_t vnodeDropVnode(SVnode *pVnode) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (pVnode->cfg.dropped) {
    vInfo("vgId:%d, already set drop flag, ref:%d", pVnode->vgId, pVnode->refCount);
  } else {
    pVnode->cfg.dropped = 1;
    code = vnodeWriteCfg(pVnode->vgId, &pVnode->cfg);
    if (code != 0) {
      vError("vgId:%d, failed to set drop flag since %s", pVnode->vgId, tstrerror(code));
      pVnode->cfg.dropped = 0;  // undo: the flag never reached the disk
    } else {
      vInfo("vgId:%d, set drop flag, ref:%d", pVnode->vgId, pVnode->refCount);
      vnodeCleanupVnode(pVnode);
    }
  }

  vnodeRelease(pVnode);
  return code;
}
427

428 429 430
/* Placeholder: does nothing with pVnode and always returns TSDB_CODE_SUCCESS. */
int32_t vnodeSyncVnode(SVnode *pVnode) {
  (void)pVnode;
  return TSDB_CODE_SUCCESS;
}
431

432 433
/* Placeholder: does nothing with pVnode and always returns TSDB_CODE_SUCCESS. */
int32_t vnodeCompactVnode(SVnode *pVnode) {
  (void)pVnode;
  return TSDB_CODE_SUCCESS;
}
435

436
static void *vnodeOpenVnodeFunc(void *param) {
437 438 439 440 441 442 443 444 445 446
  SOpenVnodeThread *pThread = param;

  vDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("vnodeOpenVnode");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
    int32_t vgId = pThread->vnodeList[v];

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", vgId,
447
             tsVnode.openVnodes, tsVnode.totalVnodes);
448 449
    // (*vnodeInst()->fp.ReportStartup)("open-vnodes", stepDesc);

450
    if (vnodeOpenVnode(vgId) < 0) {
451 452 453 454 455 456 457
      vError("vgId:%d, failed to open vnode by thread:%d", vgId, pThread->threadIndex);
      pThread->failed++;
    } else {
      vDebug("vgId:%d, is opened by thread:%d", vgId, pThread->threadIndex);
      pThread->opened++;
    }

458
    atomic_add_fetch_32(&tsVnode.openVnodes, 1);
459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505
  }

  vDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
         pThread->failed);
  return NULL;
}

/*
 * Collect the vgIds of the vnode directories found under tsVnodeDir.
 *
 * The directory scan is currently compiled out (#if 0), so this function returns
 * TSDB_CODE_SUCCESS without touching vnodeList or *numOfVnodes; callers must
 * initialise both themselves.
 */
static int32_t vnodeGetVnodeListFromDisk(int32_t vnodeList[], int32_t *numOfVnodes) {
// Disabled implementation: scans for "vnode<N>" sub-directories and stores N.
#if 0
  DIR *dir = opendir(tsVnodeDir);
  if (dir == NULL) return TSDB_CODE_DND_NO_WRITE_ACCESS;

  *numOfVnodes = 0;
  struct dirent *de = NULL;
  while ((de = readdir(dir)) != NULL) {
    if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) continue;
    if (de->d_type & DT_DIR) {
      if (strncmp("vnode", de->d_name, 5) != 0) continue;
      int32_t vnode = atoi(de->d_name + 5);
      if (vnode == 0) continue;

      (*numOfVnodes)++;

      if (*numOfVnodes >= TSDB_MAX_VNODES) {
        vError("vgId:%d, too many vnode directory in disk, exist:%d max:%d", vnode, *numOfVnodes, TSDB_MAX_VNODES);
        closedir(dir);
        return TSDB_CODE_DND_TOO_MANY_VNODES;
      } else {
        vnodeList[*numOfVnodes - 1] = vnode;
      }
    }
  }
  closedir(dir);
#endif
  return TSDB_CODE_SUCCESS;
}

static int32_t vnodeOpenVnodes() {
  int32_t vnodeList[TSDB_MAX_VNODES] = {0};
  int32_t numOfVnodes = 0;
  int32_t status = vnodeGetVnodeListFromDisk(vnodeList, &numOfVnodes);

  if (status != TSDB_CODE_SUCCESS) {
    vInfo("failed to get vnode list from disk since code:%d", status);
    return status;
  }

506
  tsVnode.totalVnodes = numOfVnodes;
507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528

  int32_t threadNum = tsNumOfCores;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SOpenVnodeThread *threads = calloc(threadNum, sizeof(SOpenVnodeThread));
  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].vnodeList = calloc(vnodesPerThread, sizeof(int32_t));
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t           t = v % threadNum;
    SOpenVnodeThread *pThread = &threads[t];
    pThread->vnodeList[pThread->vnodeNum++] = vnodeList[v];
  }

  vInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes);

  for (int32_t t = 0; t < threadNum; ++t) {
    SOpenVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

529 530
    pThread->threadId = taosCreateThread(vnodeOpenVnodeFunc, pThread);
    if (pThread->threadId == NULL) {
531 532 533 534 535 536 537 538
      vError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
    }
  }

  int32_t openVnodes = 0;
  int32_t failedVnodes = 0;
  for (int32_t t = 0; t < threadNum; ++t) {
    SOpenVnodeThread *pThread = &threads[t];
539 540 541
    taosDestoryThread(pThread->threadId);
    pThread->threadId = NULL;

542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557
    openVnodes += pThread->opened;
    failedVnodes += pThread->failed;
    free(pThread->vnodeList);
  }

  free(threads);
  vInfo("there are total vnodes:%d, opened:%d", numOfVnodes, openVnodes);

  if (failedVnodes != 0) {
    vError("there are total vnodes:%d, failed:%d", numOfVnodes, failedVnodes);
    return -1;
  }

  return TSDB_CODE_SUCCESS;
}

558 559
/*
 * Copy the SVnode pointers currently stored in tsVnode.hash into vnodeList.
 *
 * vnodeList must have room for TSDB_MAX_VNODES entries. *numOfVnodes is used as
 * the running count: it must be initialised by the caller and, on return, equals
 * the number of entries actually stored. Vnodes that do not fit are logged and
 * skipped; the iteration still runs to the end.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t vnodeGetVnodeList(SVnode *vnodeList[], int32_t *numOfVnodes) {
  void *pIter = taosHashIterate(tsVnode.hash, NULL);
  while (pIter) {
    SVnode **pVnode = pIter;
    if (*pVnode) {
      if (*numOfVnodes >= TSDB_MAX_VNODES) {
        // Do not `continue` here: the iterator must still be advanced below,
        // and the count must not run ahead of what was stored.
        vError("vgId:%d, too many open vnodes, exist:%d max:%d", (*pVnode)->vgId, *numOfVnodes, TSDB_MAX_VNODES);
      } else {
        vnodeList[*numOfVnodes] = (*pVnode);
        (*numOfVnodes)++;
      }
    }

    pIter = taosHashIterate(tsVnode.hash, pIter);
  }

  return TSDB_CODE_SUCCESS;
}

static void vnodeCleanupVnodes() {
579
  SVnode* vnodeList[TSDB_MAX_VNODES] = {0};
580 581 582 583 584 585 586 587 588 589
  int32_t numOfVnodes = 0;

  int32_t code = vnodeGetVnodeList(vnodeList, &numOfVnodes);

  if (code != TSDB_CODE_SUCCESS) {
    vInfo("failed to get dnode list since code %d", code);
    return;
  }

  for (int32_t i = 0; i < numOfVnodes; ++i) {
590
    vnodeCleanupVnode(vnodeList[i]);
591 592 593 594 595
  }

  vInfo("total vnodes:%d are all closed", numOfVnodes);
}

596 597 598 599 600 601 602 603 604 605
/*
 * Take one more reference on a vnode.
 *
 * `ptNode` points to an SVnode* (the value layout used in tsVnode.hash). Both the
 * slot pointer and the vnode pointer stored in it are asserted to be non-NULL.
 */
static void vnodeIncRef(void *ptNode) {
  SVnode **ppVnode = (SVnode **)ptNode;
  assert(ppVnode != NULL);
  assert(*ppVnode != NULL);

  SVnode *pVnode = *ppVnode;
  atomic_add_fetch_32(&pVnode->refCount, 1);
  vTrace("vgId:%d, get vnode, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
}

608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626
/*
 * Look up a vnode by vgId regardless of its status and take a reference on it.
 *
 * The hash lookup is currently commented out, so pVnode stays NULL and this
 * function always fails: it sets terrno to TSDB_CODE_VND_INVALID_VGROUP_ID and
 * returns NULL.
 */
SVnode *vnodeAcquireInAllState(int32_t vgId) {
  SVnode *pVnode = NULL;

  // taosHashGetClone(tsVnode.hash, &vgId, sizeof(int32_t), vnodeIncRef, (void*)&pVnode);
  if (pVnode != NULL) {
    return pVnode;
  }

  vDebug("vgId:%d, can't accquire since not exist", vgId);
  terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
  return NULL;
}

/*
 * Acquire a vnode by vgId, but only if it is in READY status.
 *
 * Returns the vnode (with the reference taken by vnodeAcquireInAllState()) or
 * NULL. When the vnode exists but is not READY, the reference is released again
 * and terrno is set to TSDB_CODE_VND_INVALID_TSDB_STATE.
 */
SVnode *vnodeAcquire(int32_t vgId) {
  SVnode *pVnode = vnodeAcquireInAllState(vgId);
  if (pVnode == NULL) {
    return NULL;
  }

  if (!vnodeInStatus(pVnode, TAOS_VN_STATUS_READY)) {
    vDebug("vgId:%d, can't accquire since not in ready status", vgId);
    vnodeRelease(pVnode);
    terrno = TSDB_CODE_VND_INVALID_TSDB_STATE;
    return NULL;
  }

  return pVnode;
}

635 636
/*
 * Drop one reference on a vnode; NULL is accepted and ignored.
 *
 * When the reference count reaches zero the vnode is destroyed with
 * vnodeDestroyVnode() and must not be used by the caller afterwards.
 */
void vnodeRelease(SVnode *pVnode) {
  if (pVnode == NULL) return;

  // Read vgId while we still own a reference: once the count has been
  // decremented another thread may release the last reference and free pVnode.
  int32_t vgId = pVnode->vgId;
  int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);

  vTrace("vgId:%d, release vnode, refCount:%d pVnode:%p", vgId, refCount, pVnode);
  assert(refCount >= 0);

  if (refCount <= 0) {
    vDebug("vgId:%d, vnode will be destroyed, refCount:%d pVnode:%p", vgId, refCount, pVnode);
    vnodeDestroyVnode(pVnode);
    int32_t count = taosHashGetSize(tsVnode.hash);
    vDebug("vgId:%d, vnode is destroyed, vnodes:%d", vgId, count);
  }
}

/*
 * Create the vnode registry hash (tsVnode.hash) and open the vnodes.
 *
 * Returns -1 when the hash cannot be created, otherwise the result of
 * vnodeOpenVnodes().
 */
int32_t vnodeInitMain() {
  tsVnode.hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);

  if (tsVnode.hash != NULL) {
    vInfo("vnode main is initialized");
    return vnodeOpenVnodes();
  }

  vError("failed to init vnode mgmt");
  return -1;
}

/*
 * Shut the module down: retire all registered vnodes, then destroy the registry
 * hash and clear the pointer to it.
 */
void vnodeCleanupMain() {
  // Vnodes are retired first because the cleanup walks the hash.
  vnodeCleanupVnodes();

  taosHashCleanup(tsVnode.hash);
  tsVnode.hash = NULL;
}

/*
 * Append one load record for pVnode to the status message.
 *
 * Nothing is written when pStatus->openVnodes has already reached
 * TSDB_MAX_VNODES. The storage and points counters are reported as 0 because
 * the tsdb statistics call is commented out. vgId is converted with htonl(), the
 * 64-bit counters with htobe64(); status and role are copied as they are.
 */
static void vnodeBuildVloadMsg(SVnode *pVnode, SStatusMsg *pStatus) {
  if (pStatus->openVnodes >= TSDB_MAX_VNODES) {
    return;
  }

  int64_t pointsWritten = 0;
  int64_t totalStorage = 0;
  int64_t compStorage = 0;

  // if (pVnode->tsdb) {
  //   tsdbReportStat(pVnode->tsdb, &pointsWritten, &totalStorage, &compStorage);
  // }

  SVnodeLoad *pEntry = &pStatus->load[pStatus->openVnodes];
  pStatus->openVnodes++;

  pEntry->vgId = htonl(pVnode->vgId);
  pEntry->totalStorage = htobe64(totalStorage);
  pEntry->compStorage = htobe64(compStorage);
  pEntry->pointsWritten = htobe64(pointsWritten);
  pEntry->status = pVnode->status;
  pEntry->role = pVnode->role;
}

689 690
/*
 * Fill pStatus with one load record per vnode registered in tsVnode.hash
 * (see vnodeBuildVloadMsg()). Empty hash slots are skipped.
 */
void vnodeGetStatus(SStatusMsg *pStatus) {
  for (void *pIter = taosHashIterate(tsVnode.hash, NULL); pIter != NULL;
       pIter = taosHashIterate(tsVnode.hash, pIter)) {
    SVnode *pVnode = *(SVnode **)pIter;
    if (pVnode != NULL) {
      vnodeBuildVloadMsg(pVnode, pStatus);
    }
  }
}

700
/*
 * Apply the access states in pAccess[0..numOfVnodes) to the matching vnodes.
 *
 * Each entry's vgId is byte-swapped in place before the lookup (the array is
 * modified). Entries whose vnode cannot be acquired with vnodeAcquire() are
 * skipped silently.
 * NOTE(review): assumes the entries arrive in network byte order - confirm
 * against the sender.
 */
void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) {
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    // ntohl() states the direction of this conversion; it yields the same value
    // as the htonl() call it replaces.
    pAccess[i].vgId = ntohl(pAccess[i].vgId);
    SVnode *pVnode = vnodeAcquire(pAccess[i].vgId);
    if (pVnode != NULL) {
      pVnode->accessState = pAccess[i].accessState;
      if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) {
        vDebug("vgId:%d, access state is set to %d", pAccess[i].vgId, pVnode->accessState);
      }
      vnodeRelease(pVnode);
    }
  }
}