vmInt.c 15.7 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "vmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang 已提交
19
/*
 * Look up the vnode registered under vgId and take a reference on it.
 *
 * Both the hash lookup and the reference increment happen while holding the management read lock. Removal from
 * the hash (vmCloseVnode) takes the write lock, so a vnode cannot disappear between being found and being pinned.
 *
 * Returns the vnode on success; the caller must balance it with vmReleaseVnode.
 * Returns NULL with terrno = TSDB_CODE_VND_INVALID_VGROUP_ID when vgId is unknown or the vnode is marked dropped.
 */
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pVnode = NULL;

  taosThreadRwlockRdlock(&pMgmt->lock);
  taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
  if (pVnode == NULL || pVnode->dropped) {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
    pVnode = NULL;
  } else {
    // The post-increment count used to be stored in a local consumed only by a commented-out trace; discard it
    // explicitly instead of keeping an unused variable.
    (void)atomic_add_fetch_32(&pVnode->refCount, 1);
  }
  taosThreadRwlockUnlock(&pMgmt->lock);

  return pVnode;
}

S
Shengliang 已提交
36
/*
 * Drop one reference on pVnode, e.g. one taken with vmAcquireVnode (vmStartVnodes also uses it for the entries
 * returned by vmGetVnodeListFromHash). A NULL pVnode is ignored.
 *
 * The decrement is done under the management read lock, mirroring vmAcquireVnode.
 */
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  if (pVnode == NULL) return;

  taosThreadRwlockRdlock(&pMgmt->lock);
  // The resulting count was only used by a commented-out trace; no local is kept for it.
  (void)atomic_sub_fetch_32(&pVnode->refCount, 1);
  taosThreadRwlockUnlock(&pMgmt->lock);
}

S
Shengliang 已提交
45
/*
 * Wrap an already opened vnode implementation in a new SVnodeObj, allocate its worker queues and register it in
 * pMgmt->hash under pCfg->vgId. The object starts with refCount 0 and dropped 0.
 *
 * Ownership: on success the object keeps pImpl. On failure every allocation made here (object, path copy, queues)
 * is released, and pImpl is NOT closed, so the caller remains responsible for it.
 *
 * Returns 0 on success. On failure returns -1 (allocation failures, with terrno set) or the taosHashPut result.
 */
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pVnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pVnode->vgId = pCfg->vgId;
  pVnode->vgVersion = pCfg->vgVersion;
  pVnode->refCount = 0;
  pVnode->dropped = 0;
  pVnode->path = tstrdup(pCfg->path);
  pVnode->pImpl = pImpl;

  if (pVnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pVnode);
    return -1;
  }

  if (vmAllocQueue(pMgmt, pVnode) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pVnode->path);
    taosMemoryFree(pVnode);
    return -1;
  }

  taosThreadRwlockWrlock(&pMgmt->lock);
  int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
  taosThreadRwlockUnlock(&pMgmt->lock);

  if (code != 0) {
    // The object never became reachable through the hash, so no one else can hold a reference to it.
    // Tear it down here instead of leaking the queues, the path copy and the object itself.
    dError("vgId:%d, failed to put vnode into hash, code:%d", pVnode->vgId, code);
    vmFreeQueue(pMgmt, pVnode);
    taosMemoryFree(pVnode->path);
    taosMemoryFree(pVnode);
  }

  return code;
}

S
Shengliang 已提交
79
/*
 * Tear down one vnode and free its SVnodeObj; pVnode must not be used after this call.
 *
 * The steps run strictly in this order:
 *   1. vnodePreClose(), then remove the vnode from pMgmt->hash under the write lock so new lookups fail;
 *   2. drop the caller's reference and poll (10 ms) until refCount reaches 0;
 *   3. poll until the write, sync, apply, query, fetch and stream queues are empty, then free the queues;
 *   4. vnodeClose() the implementation; if the vnode was marked dropped, vnodeDestroy() its directory;
 *   5. free the path copy and the object.
 */
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  vnodePreClose(pVnode->pImpl);

  // Unpublish first so that vmAcquireVnode can no longer hand out new references.
  taosThreadRwlockWrlock(&pMgmt->lock);
  taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
  taosThreadRwlockUnlock(&pMgmt->lock);

  vmReleaseVnode(pMgmt, pVnode);

  dTrace("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
  while (pVnode->refCount > 0) {
    taosMsleep(10);
  }

  dTrace("vgId:%d, wait for vnode queue is empty", pVnode->vgId);
  while (!taosQueueEmpty(pVnode->pWriteQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pSyncQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pApplyQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pQueryQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pFetchQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pStreamQ)) {
    taosMsleep(10);
  }
  dTrace("vgId:%d, vnode queue is empty", pVnode->vgId);

  vmFreeQueue(pMgmt, pVnode);

  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;
  dDebug("vgId:%d, vnode is closed", pVnode->vgId);

  if (pVnode->dropped) {
    char vnodePath[TSDB_FILENAME_LEN] = {0};

    dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
    snprintf(vnodePath, sizeof(vnodePath), "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
    vnodeDestroy(vnodePath, pMgmt->pTfs);
  }

  taosMemoryFree(pVnode->path);
  taosMemoryFree(pVnode);
}

S
Shengliang Guan 已提交
116
static void *vmOpenVnodeInThread(void *param) {
S
shm  
Shengliang Guan 已提交
117
  SVnodeThread *pThread = param;
S
Shengliang 已提交
118
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
H
Hongze Cheng 已提交
119
  char          path[TSDB_FILENAME_LEN];
S
shm  
Shengliang Guan 已提交
120 121 122 123 124 125 126 127 128 129

  dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("open-vnodes");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
    SWrapperCfg *pCfg = &pThread->pCfgs[v];

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
S
Shengliang 已提交
130 131
    tmsgReportStartup("vnode-open", stepDesc);

H
Hongze Cheng 已提交
132
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
S
Shengliang 已提交
133
    SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
S
shm  
Shengliang Guan 已提交
134 135 136 137
    if (pImpl == NULL) {
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
      pThread->failed++;
    } else {
S
shm  
Shengliang Guan 已提交
138
      vmOpenVnode(pMgmt, pCfg, pImpl);
S
shm  
Shengliang Guan 已提交
139 140
      dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
      pThread->opened++;
S
Shengliang Guan 已提交
141
      atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
S
shm  
Shengliang Guan 已提交
142 143 144
    }
  }

S
Shengliang Guan 已提交
145
  dDebug("thread:%d, numOfVnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
S
shm  
Shengliang Guan 已提交
146 147 148 149
         pThread->failed);
  return NULL;
}

S
Shengliang 已提交
150
/*
 * Create pMgmt->hash, read the vnode list from disk and open every listed vnode in parallel.
 *
 * It uses tsNumOfCores / 2 threads (at least one) and deals the vnodes out round-robin.
 * pMgmt->state.totalVnodes is set to the number of vnodes found on disk.
 *
 * Returns 0 when every listed vnode was opened. Otherwise returns -1, with terrno set for allocation failures and
 * the cause logged. The hash is left in place on failure; vmCleanup/vmCloseVnodes dispose of it.
 */
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
  pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->hash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to init vnode hash since %s", terrstr());
    return -1;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  if (vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) {
    dError("failed to get vnode list from disk since %s", terrstr());
    return -1;
  }

  pMgmt->state.totalVnodes = numOfVnodes;

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to alloc threads to open vnodes since %s", terrstr());
    taosMemoryFree(pCfgs);
    return -1;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      // Without this array the dealing loop below would write through NULL.
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      dError("thread:%d, failed to alloc vnode cfgs since %s", t, terrstr());
      for (int32_t i = 0; i < t; ++i) taosMemoryFree(threads[i].pCfgs);
      taosMemoryFree(threads);
      taosMemoryFree(pCfgs);
      return -1;
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    SVnodeThread *pThread = &threads[v % threadNum];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("open %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
      // The handle contents are unspecified after a failed create; clear it so the join loop skips this slot.
      // Its vnodes stay unopened, which the openVnodes/totalVnodes check below reports.
      taosThreadClear(&pThread->thread);
    }

    taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->pCfgs);
  }
  taosMemoryFree(threads);
  taosMemoryFree(pCfgs);

  if (pMgmt->state.openVnodes != pMgmt->state.totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
    return -1;
  }

  dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
  return 0;
}

220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241
static void *vmCloseVnodeInThread(void *param) {
  SVnodeThread *pThread = param;
  SVnodeMgmt   *pMgmt = pThread->pMgmt;

  dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("close-vnodes");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
    SVnodeObj *pVnode = pThread->ppVnodes[v];

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId,
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
    tmsgReportStartup("vnode-close", stepDesc);

    vmCloseVnode(pMgmt, pVnode);
  }

  dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
  return NULL;
}

S
Shengliang 已提交
242
/*
 * Close every vnode currently registered in pMgmt->hash, then destroy the hash.
 *
 * It uses tsNumOfCores / 2 threads (at least one) and deals the vnodes out round-robin.
 * pMgmt->state.openVnodes is reset to 0 before the close threads start.
 *
 * Robustness: if the thread array or a per-thread vnode array cannot be allocated, or a close thread cannot be
 * created, the affected vnodes are closed synchronously in the calling thread instead of being silently skipped.
 * NULL entries in the hash snapshot are ignored.
 */
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
  dInfo("start to close all vnodes");

  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    // No thread bookkeeping: every vnode takes the synchronous fallback in the dealing loop below.
    dError("failed to alloc threads to close vnodes, close them one by one");
    threadNum = 0;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnodeObj *));
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    if (ppVnodes == NULL || ppVnodes[v] == NULL) continue;

    SVnodeThread *pThread = (threadNum > 0) ? &threads[v % threadNum] : NULL;
    if (pThread != NULL && pThread->ppVnodes != NULL) {
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    } else {
      // No slot to hand this vnode to a worker thread; close it here rather than leaving it open.
      vmCloseVnode(pMgmt, ppVnodes[v]);
    }
  }

  pMgmt->state.openVnodes = 0;
  dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno));
      // Close this share synchronously. Setting vnodeNum = 0 keeps the join loop away from the handle of the
      // thread that never started.
      for (int32_t v = 0; v < pThread->vnodeNum; ++v) vmCloseVnode(pMgmt, pThread->ppVnodes[v]);
      pThread->vnodeNum = 0;
    }

    taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->ppVnodes);
  }
  taosMemoryFree(threads);
  taosMemoryFree(ppVnodes);

  if (pMgmt->hash != NULL) {
    taosHashCleanup(pMgmt->hash);
    pMgmt->hash = NULL;
  }

  dInfo("total vnodes:%d are all closed", numOfVnodes);
}

S
Shengliang 已提交
306
/*
 * Module close hook: release everything owned by the vnode management module.
 *
 * It closes all vnodes, stops the workers, calls vnodeCleanup(), closes tfs, destroys the lock and frees pMgmt.
 * A NULL pMgmt is ignored: vmInit reaches its cleanup path with pMgmt == NULL when its initial allocation fails,
 * and the unguarded version dereferenced it.
 */
static void vmCleanup(SVnodeMgmt *pMgmt) {
  if (pMgmt == NULL) return;

  vmCloseVnodes(pMgmt);
  vmStopWorker(pMgmt);
  vnodeCleanup();
  tfsClose(pMgmt->pTfs);
  taosThreadRwlockDestroy(&pMgmt->lock);
  taosMemoryFree(pMgmt);
}
S
shm  
Shengliang Guan 已提交
314

S
Shengliang Guan 已提交
315
/*
 * Module open hook.
 *
 * Steps, each reported through tmsgReportStartup once completed:
 *   1. allocate an SVnodeMgmt and route its msgCb queue callbacks to this module;
 *   2. open tfs on tsDiskCfg, or on tsDataDir as the single primary level-0 disk when no disks are configured;
 *   3. walInit, syncInit, vnodeInit(tsNumOfCommitThreads);
 *   4. start the workers and open all vnodes found on disk;
 *   5. open the UDF client.
 *
 * On success pOutput->pMgmt receives the new SVnodeMgmt and 0 is returned. On failure everything set up so far is
 * released through vmCleanup and -1 is returned.
 */
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  int32_t code = -1;

  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
  if (pMgmt == NULL) {
    // Set terrno so the "failed to init" log below reports the real cause.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  pMgmt->pData = pInput->pData;
  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;
  pMgmt->msgCb = pInput->msgCb;
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
  pMgmt->msgCb.mgmt = pMgmt;
  taosThreadRwlockInit(&pMgmt->lock, NULL);

  // Fallback disk layout: tsDataDir as the only, primary, level-0 disk.
  SDiskCfg dCfg = {0};
  tstrncpy(dCfg.dir, tsDataDir, TSDB_FILENAME_LEN);
  dCfg.level = 0;
  dCfg.primary = 1;
  SDiskCfg *pDisks = tsDiskCfg;
  int32_t   numOfDisks = tsDiskCfgNum;
  if (numOfDisks <= 0 || pDisks == NULL) {
    pDisks = &dCfg;
    numOfDisks = 1;
  }

  pMgmt->pTfs = tfsOpen(pDisks, numOfDisks);
  if (pMgmt->pTfs == NULL) {
    dError("failed to init tfs since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-tfs", "initialized");

  if (walInit() != 0) {
    dError("failed to init wal since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-wal", "initialized");

  if (syncInit() != 0) {
    dError("failed to open sync since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-sync", "initialized");

  if (vnodeInit(tsNumOfCommitThreads) != 0) {
    dError("failed to init vnode since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-commit", "initialized");

  if (vmStartWorker(pMgmt) != 0) {
    dError("failed to init workers since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-worker", "initialized");

  if (vmOpenVnodes(pMgmt) != 0) {
    dError("failed to open vnode since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-vnodes", "initialized");

  if (udfcOpen() != 0) {
    dError("failed to open udfc in vnode");
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code == 0) {
    pOutput->pMgmt = pMgmt;
  } else {
    dError("failed to init vnodes-mgmt since %s", terrstr());
    // pMgmt is NULL when the very first allocation failed; there is nothing to clean up then.
    if (pMgmt != NULL) vmCleanup(pMgmt);
  }

  return code;
}

S
Shengliang 已提交
396
/*
 * Report in *required whether this dnode needs the vnode management module: it does exactly when
 * tsNumOfSupportVnodes is positive. pInput is not consulted. Always returns 0.
 */
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
  (void)pInput;
  bool hasVnodeSlots = (tsNumOfSupportVnodes > 0);
  *required = hasVnodeSlots;
  return 0;
}
S
shm  
Shengliang Guan 已提交
400

401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432
static void *vmRestoreVnodeInThread(void *param) {
  SVnodeThread *pThread = param;
  SVnodeMgmt   *pMgmt = pThread->pMgmt;

  dInfo("thread:%d, start to restore %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("restore-vnodes");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
    SVnodeObj *pVnode = pThread->ppVnodes[v];

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pVnode->vgId,
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
    tmsgReportStartup("vnode-restore", stepDesc);

    int32_t code = vnodeStart(pVnode->pImpl);
    if (code != 0) {
      dError("vgId:%d, failed to restore vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
      pThread->failed++;
    } else {
      dDebug("vgId:%d, is restored by thread:%d", pVnode->vgId, pThread->threadIndex);
      pThread->opened++;
      atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
    }
  }

  dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
        pThread->failed);
  return NULL;
}

/*
 * Module start hook: start (restore) every vnode registered in pMgmt->hash in parallel.
 *
 * It uses tsNumOfCores / 2 threads (at least one) and deals the vnodes out round-robin.
 * pMgmt->state.openVnodes is reset to 0 and then counts successful restores.
 *
 * Returns 0 once all restore threads have finished; individual restore failures are only logged.
 * Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if the thread bookkeeping cannot be allocated. The unchecked
 * version dereferenced NULL or silently skipped vnodes in that case.
 * On every path the references behind vmGetVnodeListFromHash's list are released.
 */
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
  int32_t       code = 0;
  int32_t       numOfVnodes = 0;
  SVnodeObj   **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
  SVnodeThread *threads = NULL;

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    code = -1;
    goto _OVER;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnodeObj *));
    if (threads[t].ppVnodes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto _OVER;
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    if (ppVnodes == NULL || ppVnodes[v] == NULL) continue;
    SVnodeThread *pThread = &threads[v % threadNum];
    pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
  }

  pMgmt->state.openVnodes = 0;
  dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(errno));
      // The handle contents are unspecified after a failed create; clear it so the join loop skips this slot.
      taosThreadClear(&pThread->thread);
    }

    taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
  }

_OVER:
  if (code != 0) {
    dError("failed to start vnodes since %s", terrstr());
  }

  if (threads != NULL) {
    for (int32_t t = 0; t < threadNum; ++t) taosMemoryFree(threads[t].ppVnodes);
    taosMemoryFree(threads);
  }

  for (int32_t i = 0; i < numOfVnodes; ++i) {
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
    vmReleaseVnode(pMgmt, ppVnodes[i]);
  }
  taosMemoryFree(ppVnodes);

  return code;
}

S
Shengliang 已提交
494
/*
 * Module stop hook. Deliberately does nothing: stopping is handled inside each vnode.
 */
static void vmStop(SVnodeMgmt *pMgmt) {
  (void)pMgmt;
}

S
Shengliang 已提交
498 499 500 501
SMgmtFunc vmGetMgmtFunc() {
  SMgmtFunc mgmtFunc = {0};
  mgmtFunc.openFp = vmInit;
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
502
  mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
S
Shengliang 已提交
503 504 505
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
  mgmtFunc.requiredFp = vmRequire;
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
S
shm  
Shengliang Guan 已提交
506

S
Shengliang 已提交
507 508
  return mgmtFunc;
}