vmInt.c 13.3 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "vmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang 已提交
19
// Looks up the vnode object stored under vgId in pMgmt->hash and takes one
// reference on it. Both the lookup and the refCount increment run while
// pMgmt->lock is held in read mode.
//
// Returns the vnode object on success. When no entry is found, or the entry
// is flagged as dropped, terrno is set to TSDB_CODE_VND_INVALID_VGROUP_ID and
// NULL is returned. A successful acquire is balanced with vmReleaseVnode().
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pFound = NULL;

  taosThreadRwlockRdlock(&pMgmt->lock);

  taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pFound);
  if (pFound != NULL && !pFound->dropped) {
    atomic_add_fetch_32(&pFound->refCount, 1);
  } else {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
    pFound = NULL;
  }

  taosThreadRwlockUnlock(&pMgmt->lock);

  return pFound;
}

S
Shengliang 已提交
36
// Drops one reference from pVnode. A NULL pVnode is ignored. The atomic
// decrement of refCount is performed while pMgmt->lock is held in read mode.
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  if (pVnode != NULL) {
    taosThreadRwlockRdlock(&pMgmt->lock);
    atomic_sub_fetch_32(&pVnode->refCount, 1);
    taosThreadRwlockUnlock(&pMgmt->lock);
  }
}

S
Shengliang 已提交
45
// Creates the management-side object for an already opened vnode and
// registers it in pMgmt->hash under pCfg->vgId.
//
// pMgmt: vnode manager owning the hash and its rwlock.
// pCfg:  supplies vgId, vgVersion and path (the path is duplicated).
// pImpl: opened vnode implementation; only the pointer is stored. This
//        function never closes it, including on failure.
//
// Returns the result of taosHashPut() on the normal path. Returns -1 with
// terrno = TSDB_CODE_OUT_OF_MEMORY when an allocation or the queue setup
// fails; in those cases everything allocated here is released again.
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pVnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pVnode->vgId = pCfg->vgId;
  pVnode->vgVersion = pCfg->vgVersion;
  pVnode->refCount = 0;
  pVnode->dropped = 0;
  pVnode->pImpl = pImpl;

  pVnode->path = tstrdup(pCfg->path);
  if (pVnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pVnode);  // previously leaked on this path
    return -1;
  }

  if (vmAllocQueue(pMgmt, pVnode) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // previously both the path copy and the object leaked on this path
    taosMemoryFree(pVnode->path);
    taosMemoryFree(pVnode);
    return -1;
  }

  // The hash stores the pointer value itself, hence sizeof(SVnodeObj *).
  taosThreadRwlockWrlock(&pMgmt->lock);
  int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
  taosThreadRwlockUnlock(&pMgmt->lock);

  // NOTE(review): if taosHashPut fails, pVnode and its queues stay allocated
  // but unreachable; confirm the intended cleanup with vmFreeQueue's contract.
  return code;
}

S
Shengliang 已提交
76
// Shuts down one vnode and frees its management object.
//
// Sequence: vnodePreClose() on the implementation, removal from pMgmt->hash
// under the write lock, release of one reference, then polling (with
// taosMsleep(10) between checks) until refCount is no longer positive and
// until each of the write/sync/apply/query/fetch/stream queues reports empty.
// Afterwards the queues are freed and the implementation is closed. When the
// vnode is flagged as dropped, vnodeDestroy() is called on its
// "vnode<sep>vnode<vgId>" path. Finally the path copy and pVnode are freed,
// so pVnode must not be used after this call.
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  vnodePreClose(pVnode->pImpl);

  taosThreadRwlockWrlock(&pMgmt->lock);
  taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
  taosThreadRwlockUnlock(&pMgmt->lock);
  vmReleaseVnode(pMgmt, pVnode);

  dTrace("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
  while (pVnode->refCount > 0) {
    taosMsleep(10);
  }

  dTrace("vgId:%d, wait for vnode queue is empty", pVnode->vgId);
  while (!taosQueueEmpty(pVnode->pWriteQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pSyncQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pApplyQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pQueryQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pFetchQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pStreamQ)) {
    taosMsleep(10);
  }
  dTrace("vgId:%d, vnode queue is empty", pVnode->vgId);

  vmFreeQueue(pMgmt, pVnode);
  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;
  dDebug("vgId:%d, vnode is closed", pVnode->vgId);

  if (pVnode->dropped) {
    char vnodeDir[TSDB_FILENAME_LEN] = {0};

    dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
    snprintf(vnodeDir, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
    vnodeDestroy(vnodeDir, pMgmt->pTfs);
  }

  taosMemoryFree(pVnode->path);
  taosMemoryFree(pVnode);
}

S
Shengliang Guan 已提交
113
static void *vmOpenVnodeInThread(void *param) {
S
shm  
Shengliang Guan 已提交
114
  SVnodeThread *pThread = param;
S
Shengliang 已提交
115
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
H
Hongze Cheng 已提交
116
  char          path[TSDB_FILENAME_LEN];
S
shm  
Shengliang Guan 已提交
117 118 119 120 121 122 123 124 125 126

  dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("open-vnodes");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
    SWrapperCfg *pCfg = &pThread->pCfgs[v];

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
S
Shengliang 已提交
127 128
    tmsgReportStartup("vnode-open", stepDesc);

H
Hongze Cheng 已提交
129
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
S
Shengliang 已提交
130
    SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
S
shm  
Shengliang Guan 已提交
131 132 133 134
    if (pImpl == NULL) {
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
      pThread->failed++;
    } else {
S
shm  
Shengliang Guan 已提交
135
      vmOpenVnode(pMgmt, pCfg, pImpl);
S
shm  
Shengliang Guan 已提交
136 137
      dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
      pThread->opened++;
S
Shengliang Guan 已提交
138
      atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
S
shm  
Shengliang Guan 已提交
139 140 141
    }
  }

S
Shengliang Guan 已提交
142
  dDebug("thread:%d, numOfVnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
S
shm  
Shengliang Guan 已提交
143 144 145 146
         pThread->failed);
  return NULL;
}

S
Shengliang 已提交
147
// Creates the vnode hash, reads the vnode list via vmGetVnodeListFromFile()
// and opens all listed vnodes using a pool of joinable worker threads.
//
// The number of workers is tsNumOfCores / 2 (at least 1). Configs are dealt
// out round-robin, so each worker receives at most numOfVnodes / threadNum + 1
// entries, which is the capacity allocated per worker.
//
// Returns 0 when pMgmt->state.openVnodes equals pMgmt->state.totalVnodes after
// all workers have been joined, and -1 otherwise (hash creation failure, list
// read failure, allocation failure, or at least one vnode not opened).
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
  pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->hash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to init vnode hash since %s", terrstr());
    return -1;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  if (vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) {
    dInfo("failed to get vnode list from disk since %s", terrstr());
    return -1;
  }

  pMgmt->state.totalVnodes = numOfVnodes;

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to alloc vnode open threads since %s", terrstr());
    taosMemoryFree(pCfgs);
    return -1;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      dError("failed to alloc vnode cfgs for thread:%d since %s", t, terrstr());
      // Only the slots before t hold an allocation.
      for (int32_t i = 0; i < t; ++i) {
        taosMemoryFree(threads[i].pCfgs);
      }
      taosMemoryFree(threads);
      taosMemoryFree(pCfgs);
      return -1;
    }
  }

  // Round-robin distribution of the configs over the workers.
  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("open %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
      // The vnodes of this worker stay unopened, so the final count check
      // below reports the failure.
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
    }

    taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->pCfgs);
  }
  taosMemoryFree(threads);
  taosMemoryFree(pCfgs);

  if (pMgmt->state.openVnodes != pMgmt->state.totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
    return -1;
  } else {
    dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
    return 0;
  }
}

S
Shengliang 已提交
217
// Closes every vnode returned by vmGetVnodeListFromHash() through
// vmCloseVnode(), frees the returned list, then destroys pMgmt->hash and
// resets it to NULL.
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
  dInfo("start to close all vnodes");

  int32_t     total = 0;
  SVnodeObj **ppList = vmGetVnodeListFromHash(pMgmt, &total);

  int32_t idx = 0;
  while (idx < total) {
    vmCloseVnode(pMgmt, ppList[idx]);
    idx++;
  }

  if (ppList != NULL) {
    taosMemoryFree(ppList);
  }

  if (pMgmt->hash != NULL) {
    taosHashCleanup(pMgmt->hash);
    pMgmt->hash = NULL;
  }

  dInfo("total vnodes:%d are all closed", total);
}

S
Shengliang 已提交
239
// Tears down the vnode manager: closes all vnodes, stops the workers, cleans
// up the vnode module, closes the tfs handle, destroys the rwlock and frees
// pMgmt itself. pMgmt must not be used after this call.
//
// A NULL pMgmt is ignored: vmInit() reaches its failure path with a NULL
// manager when the initial allocation fails, and every step below
// dereferences pMgmt.
static void vmCleanup(SVnodeMgmt *pMgmt) {
  if (pMgmt == NULL) return;

  vmCloseVnodes(pMgmt);
  vmStopWorker(pMgmt);
  vnodeCleanup();
  tfsClose(pMgmt->pTfs);
  taosThreadRwlockDestroy(&pMgmt->lock);
  taosMemoryFree(pMgmt);
}
S
shm  
Shengliang Guan 已提交
247

S
Shengliang Guan 已提交
248
// Allocates and initializes the vnode manager.
//
// pInput:  supplies pData, path, name and the message callbacks. The copied
//          callbacks get putToQueueFp, qsizeFp and mgmt overridden to point at
//          this manager.
// pOutput: on success pOutput->pMgmt receives the new manager.
//
// Initialization order: tfs (tsDiskCfg, or a single primary level-0 disk at
// tsDataDir when no disks are configured), wal, sync, vnode module, workers,
// opening of all vnodes, udfc. Returns 0 on success and -1 on failure. On
// failure after the manager was allocated, vmCleanup() is run on it, which
// also frees it.
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  int32_t code = -1;

  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
  if (pMgmt == NULL) {
    // Nothing has been set up yet, so there is nothing to clean up; going
    // through _OVER would hand a NULL manager to vmCleanup().
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to init vnodes-mgmt since %s", terrstr());
    return -1;
  }

  pMgmt->pData = pInput->pData;
  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;
  pMgmt->msgCb = pInput->msgCb;
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
  pMgmt->msgCb.mgmt = pMgmt;
  taosThreadRwlockInit(&pMgmt->lock, NULL);

  // Fallback disk configuration used when no disks are configured.
  SDiskCfg dCfg = {0};
  tstrncpy(dCfg.dir, tsDataDir, TSDB_FILENAME_LEN);
  dCfg.level = 0;
  dCfg.primary = 1;
  SDiskCfg *pDisks = tsDiskCfg;
  int32_t   numOfDisks = tsDiskCfgNum;
  if (numOfDisks <= 0 || pDisks == NULL) {
    pDisks = &dCfg;
    numOfDisks = 1;
  }

  pMgmt->pTfs = tfsOpen(pDisks, numOfDisks);
  if (pMgmt->pTfs == NULL) {
    dError("failed to init tfs since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-tfs", "initialized");

  if (walInit() != 0) {
    dError("failed to init wal since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-wal", "initialized");

  if (syncInit() != 0) {
    dError("failed to open sync since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-sync", "initialized");

  if (vnodeInit(tsNumOfCommitThreads) != 0) {
    dError("failed to init vnode since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-commit", "initialized");

  if (vmStartWorker(pMgmt) != 0) {
    dError("failed to init workers since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-worker", "initialized");

  if (vmOpenVnodes(pMgmt) != 0) {
    dError("failed to open vnode since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-vnodes", "initialized");

  if (udfcOpen() != 0) {
    dError("failed to open udfc in vnode");
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code == 0) {
    pOutput->pMgmt = pMgmt;
  } else {
    dError("failed to init vnodes-mgmt since %s", terrstr());
    // NOTE(review): vmCleanup() runs the full teardown regardless of which
    // step failed; confirm each teardown call tolerates a skipped init.
    vmCleanup(pMgmt);
  }

  return code;
}

S
Shengliang 已提交
329
// Reports through *required whether tsNumOfSupportVnodes is greater than
// zero. pInput is not used. Always returns 0.
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
  if (tsNumOfSupportVnodes > 0) {
    *required = true;
  } else {
    *required = false;
  }
  return 0;
}
S
shm  
Shengliang Guan 已提交
333

334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365
// Thread entry point used by vmStartVnodes(): calls vnodeStart() on every
// vnode in pThread->ppVnodes[0 .. vnodeNum).
//
// param is the SVnodeThread describing this worker. A startup step is
// reported before each vnode. The worker's opened / failed counters record
// the outcomes, and pMgmt->state.openVnodes is incremented atomically for
// each vnode that started successfully. Always returns NULL.
static void *vmRestoreVnodeInThread(void *param) {
  SVnodeThread *pWorker = param;
  SVnodeMgmt   *pMgmt = pWorker->pMgmt;

  dInfo("thread:%d, start to restore %d vnodes", pWorker->threadIndex, pWorker->vnodeNum);
  setThreadName("restore-vnodes");

  for (int32_t idx = 0; idx < pWorker->vnodeNum; ++idx) {
    SVnodeObj *pVnode = pWorker->ppVnodes[idx];

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pVnode->vgId,
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
    tmsgReportStartup("vnode-restore", stepDesc);

    if (vnodeStart(pVnode->pImpl) != 0) {
      dError("vgId:%d, failed to restore vnode by thread:%d", pVnode->vgId, pWorker->threadIndex);
      pWorker->failed++;
      continue;
    }

    dDebug("vgId:%d, is restored by thread:%d", pVnode->vgId, pWorker->threadIndex);
    pWorker->opened++;
    atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
  }

  dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pWorker->threadIndex, pWorker->vnodeNum, pWorker->opened,
        pWorker->failed);
  return NULL;
}

// Starts (restores) all vnodes currently registered in the hash, using a pool
// of joinable worker threads running vmRestoreVnodeInThread().
//
// The vnode list comes from vmGetVnodeListFromHash(); every entry is released
// with vmReleaseVnode() and the list is freed before returning, on both the
// success and the failure path. The number of workers is tsNumOfCores / 2 (at
// least 1) and vnodes are dealt out round-robin. pMgmt->state.openVnodes is
// reset to 0 before the workers run.
//
// Returns 0 once all workers have been joined (individual vnode start
// failures are only logged by the workers), or -1 with
// terrno = TSDB_CODE_OUT_OF_MEMORY when the worker bookkeeping cannot be
// allocated.
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
  int32_t     code = 0;
  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to alloc vnode restore threads since %s", terrstr());
    code = -1;
    goto _OVER;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    // The array holds SVnodeObj pointers, so size it by that pointer type.
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnodeObj *));
    if (threads[t].ppVnodes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      dError("failed to alloc vnode list for thread:%d since %s", t, terrstr());
      // Only the slots before t hold an allocation.
      for (int32_t i = 0; i < t; ++i) {
        taosMemoryFree(threads[i].ppVnodes);
      }
      taosMemoryFree(threads);
      code = -1;
      goto _OVER;
    }
  }

  // Round-robin distribution of the vnodes over the workers.
  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
  }

  pMgmt->state.openVnodes = 0;
  dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(errno));
    }

    taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->ppVnodes);
  }
  taosMemoryFree(threads);

_OVER:
  // Release every listed vnode on both the success and the failure path.
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    SVnodeObj *pVnode = ppVnodes[i];
    vmReleaseVnode(pMgmt, pVnode);
  }

  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  return code;
}

S
Shengliang 已提交
425
// Stop hook registered in vmGetMgmtFunc(). Intentionally does nothing at the
// manager level: stopping is processed inside the vnode.
static void vmStop(SVnodeMgmt *pMgmt) {
  (void)pMgmt;
}

S
Shengliang 已提交
429 430 431 432
SMgmtFunc vmGetMgmtFunc() {
  SMgmtFunc mgmtFunc = {0};
  mgmtFunc.openFp = vmInit;
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
433
  mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
S
Shengliang 已提交
434 435 436
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
  mgmtFunc.requiredFp = vmRequire;
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
S
shm  
Shengliang Guan 已提交
437

S
Shengliang 已提交
438 439
  return mgmtFunc;
}