vmInt.c 18.7 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "vmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang 已提交
19
/*
 * Look up the vnode registered under vgId and take a reference on it.
 *
 * The hash lookup runs under the manager's read lock. A vnode that is missing
 * from the hash, or that is already flagged as dropped, is treated as invalid:
 * terrno is set to TSDB_CODE_VND_INVALID_VGROUP_ID and NULL is returned.
 * On success the vnode's refCount is incremented and the vnode is returned;
 * the caller must balance this with vmReleaseVnode().
 */
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pFound = NULL;

  taosThreadRwlockRdlock(&pMgmt->lock);
  taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pFound);

  bool usable = (pFound != NULL) && !pFound->dropped;
  if (usable) {
    (void)atomic_add_fetch_32(&pFound->refCount, 1);
  } else {
    pFound = NULL;
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
  }
  taosThreadRwlockUnlock(&pMgmt->lock);

  return pFound;
}

S
Shengliang 已提交
36
/*
 * Drop one reference on a vnode, e.g. one taken by vmAcquireVnode() or handed
 * out by vmGetVnodeListFromHash(). Passing NULL is a no-op.
 *
 * The decrement is performed while holding the manager's read lock, mirroring
 * the locking used when the reference was taken.
 */
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  if (pVnode != NULL) {
    taosThreadRwlockRdlock(&pMgmt->lock);
    (void)atomic_sub_fetch_32(&pVnode->refCount, 1);
    taosThreadRwlockUnlock(&pMgmt->lock);
  }
}

S
Shengliang 已提交
45
/*
 * Wrap an already-opened vnode implementation (pImpl) in a new SVnodeObj,
 * allocate its worker queues and register it in pMgmt->hash under pCfg->vgId.
 *
 * Returns 0 on success. On failure returns non-zero (terrno set) after freeing
 * everything this function allocated. pImpl is never closed here: on failure
 * it remains owned by the caller.
 */
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pVnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pVnode->vgId = pCfg->vgId;
  pVnode->vgVersion = pCfg->vgVersion;
  pVnode->refCount = 0;
  pVnode->dropped = 0;
  pVnode->path = taosStrdup(pCfg->path);
  pVnode->pImpl = pImpl;

  if (pVnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pVnode);
    return -1;
  }

  if (vmAllocQueue(pMgmt, pVnode) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pVnode->path);
    taosMemoryFree(pVnode);
    return -1;
  }

  taosThreadRwlockWrlock(&pMgmt->lock);
  int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
  taosThreadRwlockUnlock(&pMgmt->lock);

  if (code != 0) {
    // The vnode never became reachable through the hash, so nobody else can hold
    // a reference to it. Tear down its workers and queues the same way
    // vmCloseVnode() does, then free the object instead of leaking it.
    dError("vgId:%d, failed to put vnode into hash since %s", pVnode->vgId, terrstr());
    tMultiWorkerCleanup(&pVnode->pWriteW);
    tMultiWorkerCleanup(&pVnode->pSyncW);
    tMultiWorkerCleanup(&pVnode->pSyncRdW);
    tMultiWorkerCleanup(&pVnode->pApplyW);
    vmFreeQueue(pMgmt, pVnode);
    taosMemoryFree(pVnode->path);
    taosMemoryFree(pVnode);
  }

  return code;
}

S
Shengliang Guan 已提交
79
/*
 * Unregister a vnode and shut it down completely, then free pVnode.
 *
 * Sequence:
 *   1. remove it from pMgmt->hash so it can no longer be looked up, and drop
 *      the caller's reference (the caller must hold exactly one);
 *   2. vnodePreClose(), then wait until every remaining reference is released;
 *   3. clean up the write/sync/sync-rd/apply multi-workers and poll the
 *      query/fetch/stream queues until they are empty;
 *   4. vnodePostClose(), free the queues and vnodeClose() the implementation.
 *
 * commitAndRemoveWal: when true, data is committed synchronously before the
 *   close (logged as the vnode-split path) and the vnode's WAL directory is
 *   removed and recreated afterwards.
 * If the vnode was flagged as dropped, its on-disk directory is destroyed.
 */
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) {
  // Single path buffer reused by both the WAL-removal and destroy steps below.
  char path[TSDB_FILENAME_LEN] = {0};

  taosThreadRwlockWrlock(&pMgmt->lock);
  taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
  taosThreadRwlockUnlock(&pMgmt->lock);
  vmReleaseVnode(pMgmt, pVnode);

  dInfo("vgId:%d, pre close", pVnode->vgId);
  vnodePreClose(pVnode->pImpl);

  dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
  while (pVnode->refCount > 0) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
        pVnode->pWriteW.queue->threadId);
  tMultiWorkerCleanup(&pVnode->pWriteW);

  dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
        pVnode->pSyncW.queue->threadId);
  tMultiWorkerCleanup(&pVnode->pSyncW);

  dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
        pVnode->pSyncRdW.queue->threadId);
  tMultiWorkerCleanup(&pVnode->pSyncRdW);

  dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
        pVnode->pApplyW.queue->threadId);
  tMultiWorkerCleanup(&pVnode->pApplyW);

  dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
  while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
        pVnode->pFetchQ->threadId);
  while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ);
  while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);

  dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);

  dInfo("vgId:%d, post close", pVnode->vgId);
  vnodePostClose(pVnode->pImpl);

  vmFreeQueue(pMgmt, pVnode);

  if (commitAndRemoveWal) {
    dInfo("vgId:%d, commit data for vnode split", pVnode->vgId);
    vnodeSyncCommit(pVnode->pImpl);
    vnodeBegin(pVnode->pImpl);
    dInfo("vgId:%d, commit data finished", pVnode->vgId);
  }

  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;
  dInfo("vgId:%d, vnode is closed", pVnode->vgId);

  if (commitAndRemoveWal) {
    // Reuse the function-level buffer rather than shadowing it with a second
    // local named `path`; snprintf fully overwrites and NUL-terminates it.
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
    dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
    tfsRmdir(pMgmt->pTfs, path);
    tfsMkdir(pMgmt->pTfs, path);
  }

  if (pVnode->dropped) {
    dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
    vnodeDestroy(path, pMgmt->pTfs);
  }

  taosMemoryFree(pVnode->path);
  taosMemoryFree(pVnode);
}

S
Shengliang Guan 已提交
155
static void *vmOpenVnodeInThread(void *param) {
S
shm  
Shengliang Guan 已提交
156
  SVnodeThread *pThread = param;
S
Shengliang 已提交
157
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
H
Hongze Cheng 已提交
158
  char          path[TSDB_FILENAME_LEN];
S
shm  
Shengliang Guan 已提交
159

160
  dInfo("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
S
shm  
Shengliang Guan 已提交
161 162 163 164 165 166 167 168
  setThreadName("open-vnodes");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
    SWrapperCfg *pCfg = &pThread->pCfgs[v];

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
S
Shengliang 已提交
169 170
    tmsgReportStartup("vnode-open", stepDesc);

H
Hongze Cheng 已提交
171
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
S
Shengliang 已提交
172
    SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
S
shm  
Shengliang Guan 已提交
173 174 175 176
    if (pImpl == NULL) {
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
      pThread->failed++;
    } else {
S
shm  
Shengliang Guan 已提交
177
      vmOpenVnode(pMgmt, pCfg, pImpl);
178
      dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
S
shm  
Shengliang Guan 已提交
179
      pThread->opened++;
S
Shengliang Guan 已提交
180
      atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
S
shm  
Shengliang Guan 已提交
181 182 183
    }
  }

184 185
  dInfo("thread:%d, numOfVnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
        pThread->failed);
S
shm  
Shengliang Guan 已提交
186 187 188
  return NULL;
}

S
Shengliang 已提交
189
/*
 * Create the vnode hash and open every vnode listed by
 * vmGetVnodeListFromFile(), spreading the work round-robin over
 * tsNumOfCores / 2 threads (at least one).
 *
 * Returns 0 only if every listed vnode was opened; otherwise returns -1
 * (terrno is set to TSDB_CODE_OUT_OF_MEMORY for allocation failures). Vnodes
 * that were opened stay registered in pMgmt->hash either way, so the caller's
 * cleanup path can close them.
 */
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
  pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->hash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to init vnode hash since %s", terrstr());
    return -1;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  if (vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) {
    dError("failed to get vnode list from disk since %s", terrstr());
    return -1;
  }

  pMgmt->state.totalVnodes = numOfVnodes;

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to alloc threads to open vnodes since %s", terrstr());
    taosMemoryFree(pCfgs);
    return -1;
  }

  bool allocFailed = false;
  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) allocFailed = true;
  }

  if (allocFailed) {
    // Leave every vnodeNum at 0 so no thread is started or joined below.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to alloc vnode cfgs for open threads since %s", terrstr());
  } else {
    for (int32_t v = 0; v < numOfVnodes; ++v) {
      int32_t       t = v % threadNum;
      SVnodeThread *pThread = &threads[t];
      pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
    }

    dInfo("open %d vnodes with %d threads", numOfVnodes, threadNum);

    for (int32_t t = 0; t < threadNum; ++t) {
      SVnodeThread *pThread = &threads[t];
      if (pThread->vnodeNum == 0) continue;

      TdThreadAttr thAttr;
      taosThreadAttrInit(&thAttr);
      taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
      if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
        dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
        // The handle's contents are unspecified after a failed create; clear it
        // so the join loop below does not join an invalid thread. The vnodes of
        // this thread stay unopened and are reported by the count check below.
        taosThreadClear(&pThread->thread);
      }

      taosThreadAttrDestroy(&thAttr);
    }
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->pCfgs);
  }
  taosMemoryFree(threads);
  taosMemoryFree(pCfgs);

  if (allocFailed) return -1;

  if (pMgmt->state.openVnodes != pMgmt->state.totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
    return -1;
  } else {
    dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
    return 0;
  }
}

259 260 261 262 263 264 265 266 267 268 269 270 271 272 273
static void *vmCloseVnodeInThread(void *param) {
  SVnodeThread *pThread = param;
  SVnodeMgmt   *pMgmt = pThread->pMgmt;

  dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("close-vnodes");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
    SVnodeObj *pVnode = pThread->ppVnodes[v];

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId,
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
    tmsgReportStartup("vnode-close", stepDesc);

S
Shengliang Guan 已提交
274
    vmCloseVnode(pMgmt, pVnode, false);
275 276 277 278 279 280
  }

  dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
  return NULL;
}

S
Shengliang 已提交
281
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
S
shm  
Shengliang Guan 已提交
282
  dInfo("start to close all vnodes");
283 284
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
  dInfo("vnodes mgmt worker is stopped");
S
shm  
Shengliang Guan 已提交
285 286

  int32_t     numOfVnodes = 0;
287
  SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
S
shm  
Shengliang Guan 已提交
288

289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331
  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *));
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    }
  }

  pMgmt->state.openVnodes = 0;
  dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno));
    }

    taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->ppVnodes);
S
shm  
Shengliang Guan 已提交
332
  }
333
  taosMemoryFree(threads);
S
shm  
Shengliang Guan 已提交
334

335 336
  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
S
shm  
Shengliang Guan 已提交
337 338 339 340 341 342 343 344 345 346
  }

  if (pMgmt->hash != NULL) {
    taosHashCleanup(pMgmt->hash);
    pMgmt->hash = NULL;
  }

  dInfo("total vnodes:%d are all closed", numOfVnodes);
}

S
Shengliang 已提交
347
/*
 * Tear down the vnode management module: close all vnodes, stop the workers,
 * call vnodeCleanup(), close pMgmt->pTfs, destroy the manager lock and free
 * pMgmt itself. A NULL pMgmt is ignored.
 */
static void vmCleanup(SVnodeMgmt *pMgmt) {
  // vmInit() jumps to its error path (which calls this) even when pMgmt itself
  // could not be allocated; dereferencing NULL below would crash there.
  if (pMgmt == NULL) return;

  vmCloseVnodes(pMgmt);
  vmStopWorker(pMgmt);
  vnodeCleanup();
  tfsClose(pMgmt->pTfs);
  taosThreadRwlockDestroy(&pMgmt->lock);
  taosMemoryFree(pMgmt);
}
S
shm  
Shengliang Guan 已提交
355

S
Shengliang Guan 已提交
356
/*
 * Run vnodeSyncCheckTimeout() on every vnode currently registered in the
 * manager. Each vnode obtained from vmGetVnodeListFromHash() is released
 * immediately after its check, and the returned array is freed.
 */
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
  int32_t     count = 0;
  SVnodeObj **ppList = vmGetVnodeListFromHash(pMgmt, &count);
  if (ppList == NULL) return;

  for (int32_t idx = 0; idx < count; ++idx) {
    SVnodeObj *pObj = ppList[idx];
    vnodeSyncCheckTimeout(pObj->pImpl);
    vmReleaseVnode(pMgmt, pObj);
  }
  taosMemoryFree(ppList);
}
S
Shengliang Guan 已提交
369 370 371 372 373 374 375 376 377 378 379 380 381

static void *vmThreadFp(void *param) {
  SVnodeMgmt *pMgmt = param;
  int64_t     lastTime = 0;
  setThreadName("vnode-timer");

  while (1) {
    lastTime++;
    taosMsleep(100);
    if (pMgmt->stop) break;
    if (lastTime % 10 != 0) continue;

    int64_t sec = lastTime / 10;
S
Shengliang Guan 已提交
382
    if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) {
S
Shengliang Guan 已提交
383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410
      vmCheckSyncTimeout(pMgmt);
    }
  }

  return NULL;
}

/*
 * Start the joinable vnode timer thread (vmThreadFp) and store its handle in
 * pMgmt->thread. Returns 0 on success, -1 if the thread could not be created.
 */
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
    dError("failed to create vnode timer thread since %s", strerror(errno));
    // The handle is unspecified after a failed create; clear it so that
    // vmCleanupTimer() does not try to join it.
    taosThreadClear(&pMgmt->thread);
    code = -1;
  }

  // Destroy the attribute on both paths (the original leaked it on failure).
  taosThreadAttrDestroy(&thAttr);
  return code;
}

/*
 * Stop the vnode timer thread: set the stop flag that vmThreadFp() polls and,
 * if the stored thread handle is valid, join the thread and clear the handle.
 */
static void vmCleanupTimer(SVnodeMgmt *pMgmt) {
  pMgmt->stop = true;
  if (!taosCheckPthreadValid(pMgmt->thread)) return;

  taosThreadJoin(pMgmt->thread, NULL);
  taosThreadClear(&pMgmt->thread);
}

S
Shengliang Guan 已提交
411
/*
 * Open hook of the vnode management module.
 *
 * Allocates the SVnodeMgmt, copies the inputs, installs the vnode queue
 * callbacks into its msgCb, then in order: opens tfs (falling back to a single
 * primary disk at tsDataDir when no disks are configured), walInit(),
 * syncInit(), vnodeInit(), starts the workers, opens all vnodes and opens udfc.
 * Each completed step is reported through tmsgReportStartup().
 *
 * Returns 0 and stores the manager in pOutput->pMgmt on success; on any
 * failure returns -1 after releasing the partially initialized manager.
 */
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  int32_t code = -1;

  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
  if (pMgmt == NULL) {
    // Set terrno so the error log below reports a meaningful reason.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  pMgmt->pData = pInput->pData;
  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;
  pMgmt->msgCb = pInput->msgCb;
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
  pMgmt->msgCb.mgmt = pMgmt;
  taosThreadRwlockInit(&pMgmt->lock, NULL);

  SDiskCfg dCfg = {0};
  tstrncpy(dCfg.dir, tsDataDir, TSDB_FILENAME_LEN);
  dCfg.level = 0;
  dCfg.primary = 1;
  SDiskCfg *pDisks = tsDiskCfg;
  int32_t   numOfDisks = tsDiskCfgNum;
  if (numOfDisks <= 0 || pDisks == NULL) {
    pDisks = &dCfg;
    numOfDisks = 1;
  }

  pMgmt->pTfs = tfsOpen(pDisks, numOfDisks);
  if (pMgmt->pTfs == NULL) {
    dError("failed to init tfs since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-tfs", "initialized");

  if (walInit() != 0) {
    dError("failed to init wal since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-wal", "initialized");

  if (syncInit() != 0) {
    dError("failed to open sync since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-sync", "initialized");

  if (vnodeInit(tsNumOfCommitThreads) != 0) {
    dError("failed to init vnode since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-commit", "initialized");

  if (vmStartWorker(pMgmt) != 0) {
    dError("failed to init workers since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-worker", "initialized");

  if (vmOpenVnodes(pMgmt) != 0) {
    dError("failed to open vnode since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-vnodes", "initialized");

  if (udfcOpen() != 0) {
    dError("failed to open udfc in vnode");
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code == 0) {
    pOutput->pMgmt = pMgmt;
  } else {
    dError("failed to init vnodes-mgmt since %s", terrstr());
    // pMgmt is NULL when its own allocation failed; nothing to clean up then.
    if (pMgmt != NULL) vmCleanup(pMgmt);
  }

  return code;
}

S
Shengliang 已提交
492
/*
 * Required hook: sets *required to true exactly when tsNumOfSupportVnodes is
 * greater than zero. pInput is not consulted. Always returns 0.
 */
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
  (void)pInput;
  bool supportsVnodes = (tsNumOfSupportVnodes > 0);
  *required = supportsVnodes;
  return 0;
}
S
shm  
Shengliang Guan 已提交
496

497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516
/*
 * Thread entry point used by vmStartVnodes(): call vnodeStart() on every vnode
 * assigned to this SVnodeThread.
 *
 * Successes bump pThread->opened and, atomically, pMgmt->state.openVnodes;
 * failures bump pThread->failed. Always returns NULL.
 */
static void *vmRestoreVnodeInThread(void *param) {
  SVnodeThread *pThread = param;
  SVnodeMgmt   *pMgmt = pThread->pMgmt;

  dInfo("thread:%d, start to restore %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("restore-vnodes");

  for (int32_t idx = 0; idx < pThread->vnodeNum; ++idx) {
    SVnodeObj *pObj = pThread->ppVnodes[idx];

    char desc[TSDB_STEP_DESC_LEN] = {0};
    snprintf(desc, sizeof(desc), "vgId:%d, start to restore, %d of %d have been restored", pObj->vgId,
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
    tmsgReportStartup("vnode-restore", desc);

    if (vnodeStart(pObj->pImpl) == 0) {
      dInfo("vgId:%d, is restored by thread:%d", pObj->vgId, pThread->threadIndex);
      pThread->opened++;
      atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
    } else {
      dError("vgId:%d, failed to restore vnode by thread:%d", pObj->vgId, pThread->threadIndex);
      pThread->failed++;
    }
  }

  dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
        pThread->failed);
  return NULL;
}

/*
 * Start hook: restore (vnodeStart) every vnode registered in pMgmt->hash,
 * spreading the work round-robin over tsNumOfCores / 2 threads (at least one),
 * then release the references taken by vmGetVnodeListFromHash() and start the
 * timer thread.
 *
 * Returns the result of vmInitTimer() on success. If the thread bookkeeping
 * cannot be allocated, no vnode is restored, terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and -1 is returned.
 */
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
  int32_t     code = 0;
  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    code = -1;
  } else {
    for (int32_t t = 0; t < threadNum; ++t) {
      threads[t].threadIndex = t;
      threads[t].pMgmt = pMgmt;
      threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnodeObj *));
      if (threads[t].ppVnodes == NULL) code = -1;
    }
  }

  if (code != 0) {
    // Fail instead of dereferencing NULL or silently skipping some vnodes.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to alloc threads to restore vnodes since %s", terrstr());
  } else {
    for (int32_t v = 0; v < numOfVnodes && ppVnodes != NULL; ++v) {
      SVnodeThread *pThread = &threads[v % threadNum];
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    }

    pMgmt->state.openVnodes = 0;
    dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);

    for (int32_t t = 0; t < threadNum; ++t) {
      SVnodeThread *pThread = &threads[t];
      if (pThread->vnodeNum == 0) continue;

      TdThreadAttr thAttr;
      taosThreadAttrInit(&thAttr);
      taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
      if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
        dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(errno));
        // The handle is unspecified after a failed create; clear it so the join
        // loop below skips it.
        taosThreadClear(&pThread->thread);
      }

      taosThreadAttrDestroy(&thAttr);
    }
  }

  if (threads != NULL) {
    for (int32_t t = 0; t < threadNum; ++t) {
      SVnodeThread *pThread = &threads[t];
      if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
        taosThreadJoin(pThread->thread, NULL);
        taosThreadClear(&pThread->thread);
      }
      taosMemoryFree(pThread->ppVnodes);
    }
    taosMemoryFree(threads);
  }

  // Drop the references handed out by vmGetVnodeListFromHash().
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
    vmReleaseVnode(pMgmt, ppVnodes[i]);
  }

  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (code != 0) return code;
  return vmInitTimer(pMgmt);
}

S
Shengliang Guan 已提交
590
/* Stop hook (registered as NodeStopFp): shuts down the vnode timer thread. */
static void vmStop(SVnodeMgmt *pMgmt) {
  vmCleanupTimer(pMgmt);
}
591

S
Shengliang 已提交
592 593 594 595
SMgmtFunc vmGetMgmtFunc() {
  SMgmtFunc mgmtFunc = {0};
  mgmtFunc.openFp = vmInit;
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
596
  mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
S
Shengliang 已提交
597 598 599
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
  mgmtFunc.requiredFp = vmRequire;
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
S
shm  
Shengliang Guan 已提交
600

S
Shengliang 已提交
601 602
  return mgmtFunc;
}