vmInt.c 18.1 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "vmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang 已提交
19
/*
 * Look up the vnode object registered under vgId and take a reference on it.
 *
 * The lookup and the refCount increment both happen while holding the read
 * side of pMgmt->lock. Returns the vnode object on success. Returns NULL and
 * sets terrno to TSDB_CODE_VND_INVALID_VGROUP_ID when no entry is found or
 * the entry is marked as dropped. A successful call is balanced by
 * vmReleaseVnode().
 */
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pFound = NULL;

  taosThreadRwlockRdlock(&pMgmt->lock);
  taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pFound);
  if (pFound != NULL && !pFound->dropped) {
    // usable vnode: pin it for the caller
    atomic_add_fetch_32(&pFound->refCount, 1);
  } else {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
    pFound = NULL;
  }
  taosThreadRwlockUnlock(&pMgmt->lock);

  return pFound;
}

S
Shengliang 已提交
36
/*
 * Drop one reference from pVnode. A NULL pVnode is ignored.
 *
 * The refCount decrement is atomic and is performed while holding the read
 * side of pMgmt->lock.
 */
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  if (pVnode != NULL) {
    taosThreadRwlockRdlock(&pMgmt->lock);
    atomic_sub_fetch_32(&pVnode->refCount, 1);
    taosThreadRwlockUnlock(&pMgmt->lock);
  }
}

S
Shengliang 已提交
45
/*
 * Wrap an opened vnode implementation (pImpl) in a new SVnodeObj, allocate
 * its queues and register it in pMgmt->hash under pCfg->vgId.
 *
 * Returns the result of taosHashPut() once the object has been built.
 * Returns -1 with terrno set to TSDB_CODE_OUT_OF_MEMORY when the object, the
 * copy of the path, or the queues cannot be allocated; in those cases the
 * partially built object is freed and pImpl is left untouched.
 *
 * NOTE(review): when taosHashPut() fails the object and its queues are not
 * released here - confirm whether callers are expected to handle that.
 */
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodeObj *pObj = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pObj->vgId = pCfg->vgId;
  pObj->vgVersion = pCfg->vgVersion;
  pObj->refCount = 0;
  pObj->dropped = 0;
  pObj->pImpl = pImpl;

  pObj->path = tstrdup(pCfg->path);
  if (pObj->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pObj);
    return -1;
  }

  if (vmAllocQueue(pMgmt, pObj) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pObj->path);
    taosMemoryFree(pObj);
    return -1;
  }

  // the hash stores the pointer value, keyed by the vgroup id
  taosThreadRwlockWrlock(&pMgmt->lock);
  int32_t putCode = taosHashPut(pMgmt->hash, &pObj->vgId, sizeof(int32_t), &pObj, sizeof(SVnodeObj *));
  taosThreadRwlockUnlock(&pMgmt->lock);

  return putCode;
}

S
Shengliang 已提交
79
/*
 * Close one vnode and free its management object.
 *
 * Sequence (order matters):
 *   1. propose a commit if needed and remove the vnode from pMgmt->hash;
 *   2. drop one reference via vmReleaseVnode() and run vnodePreClose();
 *   3. poll until refCount reaches 0;
 *   4. clean up the write/sync/sync-ctrl/apply workers and poll until the
 *      query/fetch/stream queues are empty;
 *   5. run vnodePostClose(), free the queues and close the implementation;
 *   6. if the vnode is marked dropped, destroy its on-disk directory;
 *   7. free the path copy and the object itself.
 *
 * pVnode must not be used by the caller after this returns.
 */
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  const int32_t vgId = pVnode->vgId;

  vnodeProposeCommitOnNeed(pVnode->pImpl);

  // unpublish first so that no new reference can be acquired through the hash
  taosThreadRwlockWrlock(&pMgmt->lock);
  taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
  taosThreadRwlockUnlock(&pMgmt->lock);
  vmReleaseVnode(pMgmt, pVnode);

  dInfo("vgId:%d, pre close", vgId);
  vnodePreClose(pVnode->pImpl);

  dInfo("vgId:%d, wait for vnode ref become 0", vgId);
  while (pVnode->refCount > 0) {
    taosMsleep(10);
  }

  dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, vgId, pVnode->pWriteW.queue,
        pVnode->pWriteW.queue->threadId);
  tMultiWorkerCleanup(&pVnode->pWriteW);

  dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, vgId, pVnode->pSyncW.queue,
        pVnode->pSyncW.queue->threadId);
  tMultiWorkerCleanup(&pVnode->pSyncW);

  dInfo("vgId:%d, wait for vnode sync ctrl queue:%p is empty, thread:%08" PRId64, vgId, pVnode->pSyncCtrlW.queue,
        pVnode->pSyncCtrlW.queue->threadId);
  tMultiWorkerCleanup(&pVnode->pSyncCtrlW);

  dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, vgId, pVnode->pApplyW.queue,
        pVnode->pApplyW.queue->threadId);
  tMultiWorkerCleanup(&pVnode->pApplyW);

  dInfo("vgId:%d, wait for vnode query queue:%p is empty", vgId, pVnode->pQueryQ);
  while (!taosQueueEmpty(pVnode->pQueryQ)) {
    taosMsleep(10);
  }

  dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, vgId, pVnode->pFetchQ,
        pVnode->pFetchQ->threadId);
  while (!taosQueueEmpty(pVnode->pFetchQ)) {
    taosMsleep(10);
  }

  dInfo("vgId:%d, wait for vnode stream queue:%p is empty", vgId, pVnode->pStreamQ);
  while (!taosQueueEmpty(pVnode->pStreamQ)) {
    taosMsleep(10);
  }

  dInfo("vgId:%d, all vnode queues is empty", vgId);

  dInfo("vgId:%d, post close", vgId);
  vnodePostClose(pVnode->pImpl);

  vmFreeQueue(pMgmt, pVnode);
  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;
  dInfo("vgId:%d, vnode is closed", vgId);

  if (pVnode->dropped) {
    // a dropped vnode also loses its data directory
    char path[TSDB_FILENAME_LEN] = {0};
    dInfo("vgId:%d, vnode is destroyed, dropped:%d", vgId, pVnode->dropped);
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
    vnodeDestroy(path, pMgmt->pTfs);
  }

  taosMemoryFree(pVnode->path);
  taosMemoryFree(pVnode);
}

S
Shengliang Guan 已提交
141
static void *vmOpenVnodeInThread(void *param) {
S
shm  
Shengliang Guan 已提交
142
  SVnodeThread *pThread = param;
S
Shengliang 已提交
143
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
H
Hongze Cheng 已提交
144
  char          path[TSDB_FILENAME_LEN];
S
shm  
Shengliang Guan 已提交
145

146
  dInfo("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
S
shm  
Shengliang Guan 已提交
147 148 149 150 151 152 153 154
  setThreadName("open-vnodes");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
    SWrapperCfg *pCfg = &pThread->pCfgs[v];

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
S
Shengliang 已提交
155 156
    tmsgReportStartup("vnode-open", stepDesc);

H
Hongze Cheng 已提交
157
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
S
Shengliang 已提交
158
    SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
S
shm  
Shengliang Guan 已提交
159 160 161 162
    if (pImpl == NULL) {
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
      pThread->failed++;
    } else {
S
shm  
Shengliang Guan 已提交
163
      vmOpenVnode(pMgmt, pCfg, pImpl);
164
      dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
S
shm  
Shengliang Guan 已提交
165
      pThread->opened++;
S
Shengliang Guan 已提交
166
      atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
S
shm  
Shengliang Guan 已提交
167 168 169
    }
  }

170 171
  dInfo("thread:%d, numOfVnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
        pThread->failed);
S
shm  
Shengliang Guan 已提交
172 173 174
  return NULL;
}

S
Shengliang 已提交
175
/*
 * Create the vnode hash, read the vnode list from disk and open all vnodes
 * using up to tsNumOfCores / 2 (at least 1) joinable worker threads.
 *
 * Vnode configs are dealt round-robin to the threads; each thread runs
 * vmOpenVnodeInThread(). Returns 0 when every listed vnode was opened,
 * -1 otherwise (hash/list/allocation failure, or opened != total).
 */
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
  pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->hash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to init vnode hash since %s", terrstr());
    return -1;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  if (vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) {
    dInfo("failed to get vnode list from disk since %s", terrstr());
    return -1;
  }

  pMgmt->state.totalVnodes = numOfVnodes;

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  // round-robin distribution gives a thread at most ceil(n / threadNum) entries
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to allocate %d vnode open threads since %s", threadNum, terrstr());
    taosMemoryFree(pCfgs);
    return -1;
  }

  bool allocFailed = false;
  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      allocFailed = true;
      break;
    }
  }

  if (allocFailed) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to allocate vnode cfgs for open threads since %s", terrstr());
    // slots that were never reached are still NULL from the calloc above
    for (int32_t t = 0; t < threadNum; ++t) {
      taosMemoryFree(threads[t].pCfgs);
    }
    taosMemoryFree(threads);
    taosMemoryFree(pCfgs);
    return -1;
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("open %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
      // vnodes of this thread stay unopened; the open/total check below reports it
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
    }

    taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->pCfgs);
  }
  taosMemoryFree(threads);
  taosMemoryFree(pCfgs);

  if (pMgmt->state.openVnodes != pMgmt->state.totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
    return -1;
  } else {
    dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
    return 0;
  }
}

245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266
/*
 * Thread entry point: close every vnode listed in pThread->ppVnodes.
 *
 * param is the SVnodeThread describing this worker. A startup step
 * description is reported before each vnode is handed to vmCloseVnode().
 * Always returns NULL.
 */
static void *vmCloseVnodeInThread(void *param) {
  SVnodeThread *pWorker = param;
  SVnodeMgmt   *pMgmt = pWorker->pMgmt;

  dInfo("thread:%d, start to close %d vnodes", pWorker->threadIndex, pWorker->vnodeNum);
  setThreadName("close-vnodes");

  int32_t idx = 0;
  while (idx < pWorker->vnodeNum) {
    SVnodeObj *pTarget = pWorker->ppVnodes[idx];
    char       desc[TSDB_STEP_DESC_LEN] = {0};

    snprintf(desc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pTarget->vgId,
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
    tmsgReportStartup("vnode-close", desc);

    vmCloseVnode(pMgmt, pTarget);
    ++idx;
  }

  dInfo("thread:%d, numOfVnodes:%d is closed", pWorker->threadIndex, pWorker->vnodeNum);
  return NULL;
}

S
Shengliang 已提交
267
/*
 * Stop the management worker, then close every vnode currently in the hash
 * and destroy the hash.
 *
 * Vnodes are dealt round-robin to up to tsNumOfCores / 2 (at least 1)
 * joinable threads running vmCloseVnodeInThread(). If the thread table
 * cannot be allocated, or a thread's vnode array cannot be allocated, the
 * affected vnodes are closed on the calling thread instead of being skipped.
 */
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
  dInfo("start to close all vnodes");
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
  dInfo("vnodes mgmt worker is stopped");

  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  pMgmt->state.openVnodes = 0;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    // no thread table: fall back to closing everything on this thread
    dError("failed to allocate vnode close threads, close %d vnodes in current thread", numOfVnodes);
    for (int32_t v = 0; v < numOfVnodes; ++v) {
      if (ppVnodes == NULL || ppVnodes[v] == NULL) continue;
      vmCloseVnode(pMgmt, ppVnodes[v]);
    }
  } else {
    for (int32_t t = 0; t < threadNum; ++t) {
      threads[t].threadIndex = t;
      threads[t].pMgmt = pMgmt;
      threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnodeObj *));
    }

    for (int32_t v = 0; v < numOfVnodes; ++v) {
      if (ppVnodes == NULL || ppVnodes[v] == NULL) continue;

      SVnodeThread *pThread = &threads[v % threadNum];
      if (pThread->ppVnodes != NULL) {
        pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
      } else {
        // this thread has no slot array, so close the vnode here rather than leave it open
        dError("vgId:%d, failed to schedule vnode close on thread:%d, close it in current thread", ppVnodes[v]->vgId,
               pThread->threadIndex);
        vmCloseVnode(pMgmt, ppVnodes[v]);
      }
    }

    dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);

    for (int32_t t = 0; t < threadNum; ++t) {
      SVnodeThread *pThread = &threads[t];
      if (pThread->vnodeNum == 0) continue;

      TdThreadAttr thAttr;
      taosThreadAttrInit(&thAttr);
      taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
      if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
        dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno));
      }

      taosThreadAttrDestroy(&thAttr);
    }

    for (int32_t t = 0; t < threadNum; ++t) {
      SVnodeThread *pThread = &threads[t];
      if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
        taosThreadJoin(pThread->thread, NULL);
        taosThreadClear(&pThread->thread);
      }
      taosMemoryFree(pThread->ppVnodes);
    }
    taosMemoryFree(threads);
  }

  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (pMgmt->hash != NULL) {
    taosHashCleanup(pMgmt->hash);
    pMgmt->hash = NULL;
  }

  dInfo("total vnodes:%d are all closed", numOfVnodes);
}

S
Shengliang 已提交
333
/*
 * Tear down the vnode management module and free pMgmt.
 *
 * Closes all vnodes, stops the workers, cleans up the vnode layer, closes
 * the tfs handle, destroys the rwlock and finally frees pMgmt itself.
 * A NULL pMgmt is ignored: vmInit() reaches its failure path with a NULL
 * pointer when the management object itself could not be allocated.
 */
static void vmCleanup(SVnodeMgmt *pMgmt) {
  if (pMgmt == NULL) return;

  vmCloseVnodes(pMgmt);
  vmStopWorker(pMgmt);
  vnodeCleanup();
  tfsClose(pMgmt->pTfs);
  taosThreadRwlockDestroy(&pMgmt->lock);
  taosMemoryFree(pMgmt);
}
S
shm  
Shengliang Guan 已提交
341

S
Shengliang Guan 已提交
342
/*
 * Run vnodeSyncCheckTimeout() on every vnode currently in the hash.
 *
 * Each entry returned by vmGetVnodeListFromHash() is released with
 * vmReleaseVnode() after the check, and the list itself is freed. A NULL
 * list or NULL entries are skipped, matching the guards in vmStartVnodes().
 */
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);

  for (int32_t i = 0; i < numOfVnodes; ++i) {
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
    SVnodeObj *pVnode = ppVnodes[i];
    vnodeSyncCheckTimeout(pVnode->pImpl);
    vmReleaseVnode(pMgmt, pVnode);
  }

  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }
}
S
Shengliang Guan 已提交
356 357 358 359 360 361 362 363 364 365 366 367 368

static void *vmThreadFp(void *param) {
  SVnodeMgmt *pMgmt = param;
  int64_t     lastTime = 0;
  setThreadName("vnode-timer");

  while (1) {
    lastTime++;
    taosMsleep(100);
    if (pMgmt->stop) break;
    if (lastTime % 10 != 0) continue;

    int64_t sec = lastTime / 10;
S
Shengliang Guan 已提交
369
    if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) {
S
Shengliang Guan 已提交
370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397
      vmCheckSyncTimeout(pMgmt);
    }
  }

  return NULL;
}

/*
 * Start the joinable timer thread (vmThreadFp) and store its handle in
 * pMgmt->thread.
 *
 * Returns 0 on success, -1 if the thread could not be created. The thread
 * attribute object is destroyed on both paths.
 */
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
    dError("failed to create vnode timer thread since %s", strerror(errno));
    code = -1;
  }

  // released on the failure path too; the early return used to skip it
  taosThreadAttrDestroy(&thAttr);
  return code;
}

/*
 * Ask the timer thread to stop by setting pMgmt->stop, then join and clear
 * the thread handle if it is valid.
 */
static void vmCleanupTimer(SVnodeMgmt *pMgmt) {
  pMgmt->stop = true;

  if (!taosCheckPthreadValid(pMgmt->thread)) {
    return;
  }

  taosThreadJoin(pMgmt->thread, NULL);
  taosThreadClear(&pMgmt->thread);
}

S
Shengliang Guan 已提交
398
/*
 * Create and initialise the vnode management module.
 *
 * Allocates the SVnodeMgmt, copies the inputs from pInput, overrides the
 * queue callbacks in msgCb, then brings up (in order) tfs, wal, sync, the
 * vnode layer, the workers, all vnodes found on disk, and udfc. When no
 * disk configuration is set (tsDiskCfgNum <= 0 or tsDiskCfg == NULL) a
 * single primary level-0 disk at tsDataDir is used.
 *
 * On success returns 0 and stores the module in pOutput->pMgmt. On failure
 * returns -1; a partially initialised module is released via vmCleanup().
 */
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  int32_t code = -1;

  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
  if (pMgmt == NULL) {
    // nothing has been set up yet, so there is nothing for vmCleanup() to undo
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to init vnodes-mgmt since %s", terrstr());
    return -1;
  }

  pMgmt->pData = pInput->pData;
  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;
  pMgmt->msgCb = pInput->msgCb;
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
  pMgmt->msgCb.mgmt = pMgmt;
  taosThreadRwlockInit(&pMgmt->lock, NULL);

  // default disk used only when no explicit disk configuration exists
  SDiskCfg dCfg = {0};
  tstrncpy(dCfg.dir, tsDataDir, TSDB_FILENAME_LEN);
  dCfg.level = 0;
  dCfg.primary = 1;
  SDiskCfg *pDisks = tsDiskCfg;
  int32_t   numOfDisks = tsDiskCfgNum;
  if (numOfDisks <= 0 || pDisks == NULL) {
    pDisks = &dCfg;
    numOfDisks = 1;
  }

  pMgmt->pTfs = tfsOpen(pDisks, numOfDisks);
  if (pMgmt->pTfs == NULL) {
    dError("failed to init tfs since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-tfs", "initialized");

  if (walInit() != 0) {
    dError("failed to init wal since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-wal", "initialized");

  if (syncInit() != 0) {
    dError("failed to open sync since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-sync", "initialized");

  if (vnodeInit(tsNumOfCommitThreads) != 0) {
    dError("failed to init vnode since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-commit", "initialized");

  if (vmStartWorker(pMgmt) != 0) {
    dError("failed to init workers since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-worker", "initialized");

  if (vmOpenVnodes(pMgmt) != 0) {
    dError("failed to open vnode since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-vnodes", "initialized");

  if (udfcOpen() != 0) {
    dError("failed to open udfc in vnode");
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code == 0) {
    pOutput->pMgmt = pMgmt;
  } else {
    dError("failed to init vnodes-mgmt since %s", terrstr());
    vmCleanup(pMgmt);
  }

  return code;
}

S
Shengliang 已提交
479
/*
 * Report whether the vnode module is needed: *required is set to true when
 * tsNumOfSupportVnodes is positive, false otherwise. pInput is not read.
 * Always returns 0.
 */
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
  if (tsNumOfSupportVnodes > 0) {
    *required = true;
  } else {
    *required = false;
  }
  return 0;
}
S
shm  
Shengliang Guan 已提交
483

484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503
/*
 * Thread entry point: start (restore) every vnode listed in
 * pThread->ppVnodes.
 *
 * param is the SVnodeThread describing this worker. vnodeStart() is called
 * per vnode; pThread->opened / pThread->failed are updated accordingly and
 * pMgmt->state.openVnodes is incremented atomically on each success.
 * Always returns NULL.
 */
static void *vmRestoreVnodeInThread(void *param) {
  SVnodeThread *pWorker = param;
  SVnodeMgmt   *pMgmt = pWorker->pMgmt;

  dInfo("thread:%d, start to restore %d vnodes", pWorker->threadIndex, pWorker->vnodeNum);
  setThreadName("restore-vnodes");

  for (int32_t idx = 0; idx < pWorker->vnodeNum; ++idx) {
    SVnodeObj *pTarget = pWorker->ppVnodes[idx];
    char       desc[TSDB_STEP_DESC_LEN] = {0};

    snprintf(desc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pTarget->vgId,
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
    tmsgReportStartup("vnode-restore", desc);

    if (vnodeStart(pTarget->pImpl) == 0) {
      dInfo("vgId:%d, is restored by thread:%d", pTarget->vgId, pWorker->threadIndex);
      pWorker->opened++;
      atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
    } else {
      dError("vgId:%d, failed to restore vnode by thread:%d", pTarget->vgId, pWorker->threadIndex);
      pWorker->failed++;
    }
  }

  dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pWorker->threadIndex, pWorker->vnodeNum, pWorker->opened,
        pWorker->failed);
  return NULL;
}

/*
 * Start (restore) every vnode currently in the hash, then start the timer
 * thread.
 *
 * Vnodes are dealt round-robin to up to tsNumOfCores / 2 (at least 1)
 * joinable threads running vmRestoreVnodeInThread(). Afterwards every
 * listed vnode is released with vmReleaseVnode() and the list is freed.
 *
 * Returns the result of vmInitTimer(), or -1 with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when the thread table or a per-thread vnode array
 * cannot be allocated (in which case no vnode is restored).
 */
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  bool          allocFailed = (threads == NULL);
  for (int32_t t = 0; !allocFailed && t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnodeObj *));
    if (threads[t].ppVnodes == NULL) allocFailed = true;
  }

  if (allocFailed) {
    // without complete work lists some vnodes would be silently left unrestored
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to allocate vnode restore threads since %s", terrstr());
    if (threads != NULL) {
      // slots that were never reached are still NULL from the calloc above
      for (int32_t t = 0; t < threadNum; ++t) {
        taosMemoryFree(threads[t].ppVnodes);
      }
      taosMemoryFree(threads);
    }
    // still release the listed vnodes and the list, as the normal path does
    for (int32_t i = 0; i < numOfVnodes; ++i) {
      if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
      vmReleaseVnode(pMgmt, ppVnodes[i]);
    }
    if (ppVnodes != NULL) {
      taosMemoryFree(ppVnodes);
    }
    return -1;
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    }
  }

  pMgmt->state.openVnodes = 0;
  dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(errno));
    }

    taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->ppVnodes);
  }
  taosMemoryFree(threads);

  for (int32_t i = 0; i < numOfVnodes; ++i) {
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
    vmReleaseVnode(pMgmt, ppVnodes[i]);
  }

  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  return vmInitTimer(pMgmt);
}

S
Shengliang Guan 已提交
577
/* Stop hook of the vnode module: shuts down the timer thread. */
static void vmStop(SVnodeMgmt *pMgmt) {
  vmCleanupTimer(pMgmt);
}
578

S
Shengliang 已提交
579 580 581 582
SMgmtFunc vmGetMgmtFunc() {
  SMgmtFunc mgmtFunc = {0};
  mgmtFunc.openFp = vmInit;
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
583
  mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
S
Shengliang 已提交
584 585 586
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
  mgmtFunc.requiredFp = vmRequire;
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
S
shm  
Shengliang Guan 已提交
587

S
Shengliang 已提交
588 589
  return mgmtFunc;
}