vmInt.c 18.7 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "vmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang 已提交
19
/*
 * Look up the vnode registered under vgId and take a reference on it.
 *
 * Returns the vnode with its refCount incremented, or NULL with terrno set to
 * TSDB_CODE_VND_INVALID_VGROUP_ID when vgId is not in pMgmt->hash or the vnode
 * is marked as dropped. A non-NULL result must be handed back with vmReleaseVnode().
 */
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pVnode = NULL;

  taosThreadRwlockRdlock(&pMgmt->lock);
  taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
  if (pVnode == NULL || pVnode->dropped) {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
    pVnode = NULL;
  } else {
    // Taken while the read lock is held: vmCloseVnode removes the entry only under
    // the write lock, so the lookup and the increment cannot interleave with removal.
    atomic_add_fetch_32(&pVnode->refCount, 1);
  }
  taosThreadRwlockUnlock(&pMgmt->lock);

  return pVnode;
}

S
Shengliang 已提交
36
/*
 * Drop one reference previously taken with vmAcquireVnode() (or handed out by
 * vmGetVnodeListFromHash()). A NULL pVnode is ignored. The object is not freed
 * here; vmCloseVnode() waits for refCount to drop to 0 before tearing it down.
 */
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  if (pVnode == NULL) return;

  taosThreadRwlockRdlock(&pMgmt->lock);
  atomic_sub_fetch_32(&pVnode->refCount, 1);
  taosThreadRwlockUnlock(&pMgmt->lock);
}

S
Shengliang 已提交
45
/*
 * Wrap an already opened SVnode in a new SVnodeObj, allocate its queues with
 * vmAllocQueue() and register it in pMgmt->hash under pCfg->vgId.
 *
 * Returns 0 on success. On failure a non-zero value is returned and every
 * resource allocated here is released again. pImpl is never closed by this
 * function; on failure it remains owned by the caller.
 */
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pVnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pVnode->vgId = pCfg->vgId;
  pVnode->vgVersion = pCfg->vgVersion;
  pVnode->refCount = 0;
  pVnode->dropped = 0;
  pVnode->path = tstrdup(pCfg->path);
  pVnode->pImpl = pImpl;

  if (pVnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pVnode);
    return -1;
  }

  if (vmAllocQueue(pMgmt, pVnode) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pVnode->path);
    taosMemoryFree(pVnode);
    return -1;
  }

  taosThreadRwlockWrlock(&pMgmt->lock);
  int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
  taosThreadRwlockUnlock(&pMgmt->lock);

  if (code != 0) {
    // The vnode never became reachable through the hash, so no one else can hold a
    // reference. Undo vmAllocQueue() with the same sequence vmCloseVnode() uses
    // instead of leaking the workers, the queues and the object itself.
    dError("vgId:%d, failed to put vnode into hash", pVnode->vgId);
    tMultiWorkerCleanup(&pVnode->pWriteW);
    tMultiWorkerCleanup(&pVnode->pSyncW);
    tMultiWorkerCleanup(&pVnode->pSyncCtrlW);
    tMultiWorkerCleanup(&pVnode->pApplyW);
    vmFreeQueue(pMgmt, pVnode);
    taosMemoryFree(pVnode->path);
    taosMemoryFree(pVnode);
  }

  return code;
}

S
Shengliang Guan 已提交
79
/*
 * Unregister a vnode and shut it down completely; pVnode is freed on return.
 *
 * The vnode is removed from pMgmt->hash and one reference (expected to be held
 * by the caller) is released. The function then blocks until refCount reaches 0
 * and every worker/queue has drained. If commitAndRemoveWal is true, data is
 * committed before vnodeClose() and the WAL directory is removed and recreated
 * afterwards. If the vnode is marked dropped, vnodeDestroy() is called on its
 * directory.
 */
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) {
  // Scratch buffer shared by the WAL and destroy paths below (previously the WAL
  // branch declared a second `path` that shadowed this one).
  char path[TSDB_FILENAME_LEN] = {0};

  vnodeProposeCommitOnNeed(pVnode->pImpl);

  taosThreadRwlockWrlock(&pMgmt->lock);
  taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
  taosThreadRwlockUnlock(&pMgmt->lock);
  vmReleaseVnode(pMgmt, pVnode);

  dInfo("vgId:%d, pre close", pVnode->vgId);
  vnodePreClose(pVnode->pImpl);

  dInfo("vgId:%d, wait for vnode ref become 0", pVnode->vgId);
  while (pVnode->refCount > 0) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
        pVnode->pWriteW.queue->threadId);
  tMultiWorkerCleanup(&pVnode->pWriteW);

  dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
        pVnode->pSyncW.queue->threadId);
  tMultiWorkerCleanup(&pVnode->pSyncW);

  dInfo("vgId:%d, wait for vnode sync ctrl queue:%p is empty, thread:%08" PRId64, pVnode->vgId,
        pVnode->pSyncCtrlW.queue, pVnode->pSyncCtrlW.queue->threadId);
  tMultiWorkerCleanup(&pVnode->pSyncCtrlW);

  dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
        pVnode->pApplyW.queue->threadId);
  tMultiWorkerCleanup(&pVnode->pApplyW);

  dInfo("vgId:%d, wait for vnode query queue:%p is empty", pVnode->vgId, pVnode->pQueryQ);
  while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
        pVnode->pFetchQ->threadId);
  while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);

  dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ);
  while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);

  dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);

  dInfo("vgId:%d, post close", pVnode->vgId);
  vnodePostClose(pVnode->pImpl);

  vmFreeQueue(pMgmt, pVnode);

  if (commitAndRemoveWal) {
    dInfo("vgId:%d, commit data", pVnode->vgId);
    vnodeSyncCommit(pVnode->pImpl);
    vnodeBegin(pVnode->pImpl);
    dInfo("vgId:%d, commit data finished", pVnode->vgId);
  }

  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;
  dInfo("vgId:%d, vnode is closed", pVnode->vgId);

  if (commitAndRemoveWal) {
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
    dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
    tfsRmdir(pMgmt->pTfs, path);
    tfsMkdir(pMgmt->pTfs, path);
  }

  if (pVnode->dropped) {
    dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
    vnodeDestroy(path, pMgmt->pTfs);
  }

  taosMemoryFree(pVnode->path);
  taosMemoryFree(pVnode);
}

S
Shengliang Guan 已提交
157
static void *vmOpenVnodeInThread(void *param) {
S
shm  
Shengliang Guan 已提交
158
  SVnodeThread *pThread = param;
S
Shengliang 已提交
159
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
H
Hongze Cheng 已提交
160
  char          path[TSDB_FILENAME_LEN];
S
shm  
Shengliang Guan 已提交
161

162
  dInfo("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
S
shm  
Shengliang Guan 已提交
163 164 165 166 167 168 169 170
  setThreadName("open-vnodes");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
    SWrapperCfg *pCfg = &pThread->pCfgs[v];

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
S
Shengliang 已提交
171 172
    tmsgReportStartup("vnode-open", stepDesc);

H
Hongze Cheng 已提交
173
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
S
Shengliang 已提交
174
    SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
S
shm  
Shengliang Guan 已提交
175 176 177 178
    if (pImpl == NULL) {
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
      pThread->failed++;
    } else {
S
shm  
Shengliang Guan 已提交
179
      vmOpenVnode(pMgmt, pCfg, pImpl);
180
      dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
S
shm  
Shengliang Guan 已提交
181
      pThread->opened++;
S
Shengliang Guan 已提交
182
      atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
S
shm  
Shengliang Guan 已提交
183 184 185
    }
  }

186 187
  dInfo("thread:%d, numOfVnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
        pThread->failed);
S
shm  
Shengliang Guan 已提交
188 189 190
  return NULL;
}

S
Shengliang 已提交
191
/*
 * Create pMgmt->hash and open every vnode listed by vmGetVnodeListFromFile().
 *
 * The vnodes are distributed round-robin over max(tsNumOfCores / 2, 1) threads
 * running vmOpenVnodeInThread(). Returns 0 when every listed vnode was opened,
 * otherwise -1 (terrno is set to TSDB_CODE_OUT_OF_MEMORY on allocation failures).
 */
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
  pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->hash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to init vnode hash since %s", terrstr());
    return -1;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  if (vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) {
    dError("failed to get vnode list from disk since %s", terrstr());
    return -1;
  }

  pMgmt->state.totalVnodes = numOfVnodes;

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  // ceil(numOfVnodes / threadNum) <= this, so round-robin never overflows a slot array
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to alloc %d threads to open vnodes since %s", threadNum, terrstr());
    taosMemoryFree(pCfgs);
    return -1;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      dError("thread:%d, failed to alloc vnode cfgs since %s", t, terrstr());
      for (int32_t i = 0; i < t; ++i) {
        taosMemoryFree(threads[i].pCfgs);
      }
      taosMemoryFree(threads);
      taosMemoryFree(pCfgs);
      return -1;
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    SVnodeThread *pThread = &threads[v % threadNum];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("open %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
    }

    taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->pCfgs);
  }
  taosMemoryFree(threads);
  taosMemoryFree(pCfgs);

  if (pMgmt->state.openVnodes != pMgmt->state.totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
    return -1;
  }

  dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
  return 0;
}

261 262 263 264 265 266 267 268 269 270 271 272 273 274 275
static void *vmCloseVnodeInThread(void *param) {
  SVnodeThread *pThread = param;
  SVnodeMgmt   *pMgmt = pThread->pMgmt;

  dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("close-vnodes");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
    SVnodeObj *pVnode = pThread->ppVnodes[v];

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId,
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
    tmsgReportStartup("vnode-close", stepDesc);

S
Shengliang Guan 已提交
276
    vmCloseVnode(pMgmt, pVnode, false);
277 278 279 280 281 282
  }

  dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
  return NULL;
}

S
Shengliang 已提交
283
/*
 * Close every vnode registered in pMgmt->hash, then destroy the hash.
 *
 * The management worker is stopped first. The vnodes are then spread round-robin
 * over max(tsNumOfCores / 2, 1) threads running vmCloseVnodeInThread(). If the
 * thread bookkeeping cannot be allocated, or a thread cannot be created, the
 * affected vnodes are closed in the calling thread so that none is left open.
 */
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
  dInfo("start to close all vnodes");
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
  dInfo("vnodes mgmt worker is stopped");

  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    dError("failed to alloc threads to close vnodes, close them in current thread");
  } else {
    for (int32_t t = 0; t < threadNum; ++t) {
      threads[t].threadIndex = t;
      threads[t].pMgmt = pMgmt;
      threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnodeObj *));
    }
  }

  pMgmt->state.openVnodes = 0;

  for (int32_t v = 0; v < numOfVnodes && ppVnodes != NULL; ++v) {
    SVnodeObj *pVnode = ppVnodes[v];
    if (pVnode == NULL) continue;

    SVnodeThread *pThread = (threads != NULL) ? &threads[v % threadNum] : NULL;
    if (pThread != NULL && pThread->ppVnodes != NULL) {
      pThread->ppVnodes[pThread->vnodeNum++] = pVnode;
    } else {
      // No per-thread slot could be allocated for this vnode: close it right here
      // instead of silently leaving it open.
      vmCloseVnode(pMgmt, pVnode, false);
    }
  }

  dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);

  if (threads != NULL) {
    for (int32_t t = 0; t < threadNum; ++t) {
      SVnodeThread *pThread = &threads[t];
      if (pThread->vnodeNum == 0) continue;

      TdThreadAttr thAttr;
      taosThreadAttrInit(&thAttr);
      taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
      if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
        dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno));
        for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
          vmCloseVnode(pMgmt, pThread->ppVnodes[v], false);
        }
        // Everything was handled inline. A zero count also keeps the join loop
        // below away from a thread handle that was never successfully created.
        pThread->vnodeNum = 0;
      }

      taosThreadAttrDestroy(&thAttr);
    }

    for (int32_t t = 0; t < threadNum; ++t) {
      SVnodeThread *pThread = &threads[t];
      if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
        taosThreadJoin(pThread->thread, NULL);
        taosThreadClear(&pThread->thread);
      }
      taosMemoryFree(pThread->ppVnodes);
    }
    taosMemoryFree(threads);
  }

  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (pMgmt->hash != NULL) {
    taosHashCleanup(pMgmt->hash);
    pMgmt->hash = NULL;
  }

  dInfo("total vnodes:%d are all closed", numOfVnodes);
}

S
Shengliang 已提交
349
/*
 * closeFp of the vnode management module (also used by vmInit() on failure).
 *
 * Closes all vnodes, stops the workers, calls vnodeCleanup(), closes tfs,
 * destroys the rwlock and frees pMgmt itself. A NULL pMgmt is ignored, which
 * covers the case where vmInit() failed before pMgmt could be allocated.
 */
static void vmCleanup(SVnodeMgmt *pMgmt) {
  if (pMgmt == NULL) return;

  vmCloseVnodes(pMgmt);
  vmStopWorker(pMgmt);
  vnodeCleanup();
  tfsClose(pMgmt->pTfs);
  taosThreadRwlockDestroy(&pMgmt->lock);
  taosMemoryFree(pMgmt);
}
S
shm  
Shengliang Guan 已提交
357

S
Shengliang Guan 已提交
358
/*
 * Call vnodeSyncCheckTimeout() on every vnode currently listed in pMgmt->hash.
 * Each listed vnode is handed back with vmReleaseVnode() right after its check,
 * and the list array returned by vmGetVnodeListFromHash() is freed at the end.
 */
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
  int32_t     count = 0;
  SVnodeObj **list = vmGetVnodeListFromHash(pMgmt, &count);
  if (list == NULL) return;

  for (int32_t idx = 0; idx < count; ++idx) {
    SVnodeObj *pObj = list[idx];
    vnodeSyncCheckTimeout(pObj->pImpl);
    vmReleaseVnode(pMgmt, pObj);
  }

  taosMemoryFree(list);
}
S
Shengliang Guan 已提交
371 372 373 374 375 376 377 378 379 380 381 382 383

static void *vmThreadFp(void *param) {
  SVnodeMgmt *pMgmt = param;
  int64_t     lastTime = 0;
  setThreadName("vnode-timer");

  while (1) {
    lastTime++;
    taosMsleep(100);
    if (pMgmt->stop) break;
    if (lastTime % 10 != 0) continue;

    int64_t sec = lastTime / 10;
S
Shengliang Guan 已提交
384
    if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) {
S
Shengliang Guan 已提交
385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412
      vmCheckSyncTimeout(pMgmt);
    }
  }

  return NULL;
}

/*
 * Start the joinable "vnode-timer" thread (vmThreadFp) and store its handle in
 * pMgmt->thread. Returns 0 on success, -1 if the thread could not be created.
 * The thread attribute object is destroyed on both paths.
 */
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;

  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
    // log before destroying the attribute so errno is reported as left by the create call
    dError("failed to create vnode timer thread since %s", strerror(errno));
    code = -1;
  }

  taosThreadAttrDestroy(&thAttr);
  return code;
}

/*
 * Ask the timer thread to exit by setting pMgmt->stop, then join it when the
 * stored handle is valid and clear the handle afterwards.
 */
static void vmCleanupTimer(SVnodeMgmt *pMgmt) {
  pMgmt->stop = true;
  if (!taosCheckPthreadValid(pMgmt->thread)) return;

  taosThreadJoin(pMgmt->thread, NULL);
  taosThreadClear(&pMgmt->thread);
}

S
Shengliang Guan 已提交
413
/*
 * openFp of the vnode management module.
 *
 * Allocates the SVnodeMgmt and wires its message callbacks. It then initializes,
 * in order: tfs, wal (walInit), sync (syncInit), vnodeInit(), the worker pools,
 * every vnode found on disk and the udf client (udfcOpen).
 *
 * On success the SVnodeMgmt is stored in pOutput->pMgmt and 0 is returned. On
 * failure -1 is returned, and vmCleanup() is called if pMgmt had been allocated.
 */
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  int32_t code = -1;

  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
  if (pMgmt == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  pMgmt->pData = pInput->pData;
  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;
  pMgmt->msgCb = pInput->msgCb;
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
  pMgmt->msgCb.mgmt = pMgmt;
  taosThreadRwlockInit(&pMgmt->lock, NULL);

  // Fallback single primary disk at tsDataDir, used when no disks are configured.
  SDiskCfg dCfg = {0};
  tstrncpy(dCfg.dir, tsDataDir, TSDB_FILENAME_LEN);
  dCfg.level = 0;
  dCfg.primary = 1;
  SDiskCfg *pDisks = tsDiskCfg;
  int32_t   numOfDisks = tsDiskCfgNum;
  if (numOfDisks <= 0 || pDisks == NULL) {
    pDisks = &dCfg;
    numOfDisks = 1;
  }

  pMgmt->pTfs = tfsOpen(pDisks, numOfDisks);
  if (pMgmt->pTfs == NULL) {
    dError("failed to init tfs since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-tfs", "initialized");

  if (walInit() != 0) {
    dError("failed to init wal since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-wal", "initialized");

  if (syncInit() != 0) {
    dError("failed to open sync since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-sync", "initialized");

  if (vnodeInit(tsNumOfCommitThreads) != 0) {
    dError("failed to init vnode since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-commit", "initialized");

  if (vmStartWorker(pMgmt) != 0) {
    dError("failed to init workers since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-worker", "initialized");

  if (vmOpenVnodes(pMgmt) != 0) {
    dError("failed to open vnode since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-vnodes", "initialized");

  if (udfcOpen() != 0) {
    dError("failed to open udfc in vnode");
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code == 0) {
    pOutput->pMgmt = pMgmt;
  } else {
    dError("failed to init vnodes-mgmt since %s", terrstr());
    // pMgmt is NULL when the very first allocation failed; nothing to tear down then
    if (pMgmt != NULL) vmCleanup(pMgmt);
  }

  return code;
}

S
Shengliang 已提交
494
/*
 * requiredFp of the vnode management module: the module is required exactly when
 * tsNumOfSupportVnodes is positive. pInput is not consulted. Always returns 0.
 */
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
  (void)pInput;  // unused
  *required = (tsNumOfSupportVnodes > 0);
  return 0;
}
S
shm  
Shengliang Guan 已提交
498

499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518
/*
 * Thread entry used by vmStartVnodes().
 *
 * Calls vnodeStart() for every vnode in pThread->ppVnodes. It counts successes in
 * pThread->opened (and the shared pMgmt->state.openVnodes) and failures in
 * pThread->failed. Always returns NULL.
 */
static void *vmRestoreVnodeInThread(void *param) {
  SVnodeThread *pThread = param;
  SVnodeMgmt   *pMgmt = pThread->pMgmt;

  dInfo("thread:%d, start to restore %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("restore-vnodes");

  for (int32_t idx = 0; idx < pThread->vnodeNum; ++idx) {
    SVnodeObj *pObj = pThread->ppVnodes[idx];

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pObj->vgId,
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
    tmsgReportStartup("vnode-restore", stepDesc);

    if (vnodeStart(pObj->pImpl) != 0) {
      dError("vgId:%d, failed to restore vnode by thread:%d", pObj->vgId, pThread->threadIndex);
      pThread->failed++;
      continue;
    }

    dInfo("vgId:%d, is restored by thread:%d", pObj->vgId, pThread->threadIndex);
    pThread->opened++;
    atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
  }

  dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
        pThread->failed);
  return NULL;
}

/*
 * startFp of the vnode management module.
 *
 * Calls vnodeStart() on every vnode listed by vmGetVnodeListFromHash(). The work
 * is spread round-robin over max(tsNumOfCores / 2, 1) threads running
 * vmRestoreVnodeInThread(). Each listed vnode is then handed back with
 * vmReleaseVnode(), and the timer thread is started. Individual restore failures
 * are only logged by the worker threads.
 *
 * Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if the thread bookkeeping
 * cannot be allocated; otherwise returns the result of vmInitTimer().
 */
static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  int32_t       code = 0;
  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) code = -1;

  for (int32_t t = 0; code == 0 && t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnodeObj *));
    if (threads[t].ppVnodes == NULL) code = -1;
  }

  if (code != 0) {
    // Without the bookkeeping arrays the vnodes cannot be handed to restore threads;
    // report the failure instead of dereferencing NULL or skipping vnodes silently.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to alloc threads to restore %d vnodes since %s", numOfVnodes, terrstr());
  } else {
    for (int32_t v = 0; v < numOfVnodes && ppVnodes != NULL; ++v) {
      if (ppVnodes[v] == NULL) continue;
      SVnodeThread *pThread = &threads[v % threadNum];
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    }

    pMgmt->state.openVnodes = 0;
    dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);

    for (int32_t t = 0; t < threadNum; ++t) {
      SVnodeThread *pThread = &threads[t];
      if (pThread->vnodeNum == 0) continue;

      TdThreadAttr thAttr;
      taosThreadAttrInit(&thAttr);
      taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
      if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
        dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(errno));
      }

      taosThreadAttrDestroy(&thAttr);
    }

    for (int32_t t = 0; t < threadNum; ++t) {
      SVnodeThread *pThread = &threads[t];
      if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
        taosThreadJoin(pThread->thread, NULL);
        taosThreadClear(&pThread->thread);
      }
    }
  }

  if (threads != NULL) {
    for (int32_t t = 0; t < threadNum; ++t) {
      if (threads[t].ppVnodes != NULL) taosMemoryFree(threads[t].ppVnodes);
    }
    taosMemoryFree(threads);
  }

  for (int32_t i = 0; i < numOfVnodes; ++i) {
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
    vmReleaseVnode(pMgmt, ppVnodes[i]);
  }

  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (code != 0) return code;
  return vmInitTimer(pMgmt);
}

S
Shengliang Guan 已提交
592
/* stopFp of the vnode management module: stops and joins the timer thread. */
static void vmStop(SVnodeMgmt *pMgmt) {
  vmCleanupTimer(pMgmt);
}
593

S
Shengliang 已提交
594 595 596 597
SMgmtFunc vmGetMgmtFunc() {
  SMgmtFunc mgmtFunc = {0};
  mgmtFunc.openFp = vmInit;
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
598
  mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
S
Shengliang 已提交
599 600 601
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
  mgmtFunc.requiredFp = vmRequire;
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
S
shm  
Shengliang Guan 已提交
602

S
Shengliang 已提交
603 604
  return mgmtFunc;
}