vmInt.c 21.5 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "vmInt.h"
18
#include "tfs.h"
19
#include "vnd.h"
S
shm  
Shengliang Guan 已提交
20

21
// Choose the level-0 (primary) tfs disk that should hold vnode `vgId`.
//
// If the vnode's info file (VND_INFO_FNAME) or its temporary variant
// (VND_INFO_FNAME_TMP) already exists on a level-0 disk, that disk id is
// returned so an existing vnode stays where it is. Otherwise the level-0 disk
// hosting the fewest vnodes currently in pMgmt->hash is chosen; on ties the
// lowest disk id wins.
//
// Returns 0 when pMgmt has no tfs.
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
  STfs   *pTfs = pMgmt->pTfs;
  int32_t diskId = 0;
  if (pTfs == NULL) {
    return diskId;
  }

  // search fs: reuse the disk that already holds this vnode's info file
  char vnodePath[TSDB_FILENAME_LEN] = {0};
  snprintf(vnodePath, TSDB_FILENAME_LEN - 1, "vnode%svnode%d", TD_DIRSEP, vgId);
  char fname[TSDB_FILENAME_LEN] = {0};
  char fnameTmp[TSDB_FILENAME_LEN] = {0};
  snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME);
  snprintf(fnameTmp, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME_TMP);

  diskId = tfsSearch(pTfs, 0, fname);
  if (diskId >= 0) {
    return diskId;
  }
  diskId = tfsSearch(pTfs, 0, fnameTmp);
  if (diskId >= 0) {
    return diskId;
  }

  // alloc: count the vnodes already placed on each level-0 disk
  int32_t     disks[TFS_MAX_DISKS_PER_TIER] = {0};
  int32_t     numOfVnodes = 0;
  SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
  for (int32_t v = 0; v < numOfVnodes; v++) {
    if (ppVnodes == NULL || ppVnodes[v] == NULL) continue;
    int32_t primary = ppVnodes[v]->diskPrimary;
    // diskPrimary comes from vnode config; never index outside the counter array
    if (primary < 0 || primary >= TFS_MAX_DISKS_PER_TIER) {
      dError("vgId:%d, invalid primary disk:%d, ignored when allocating disk", ppVnodes[v]->vgId, primary);
      continue;
    }
    disks[primary] += 1;
  }

  int32_t minVal = INT_MAX;
  int32_t ndisk = tfsGetDisksAtLevel(pTfs, 0);
  if (ndisk > TFS_MAX_DISKS_PER_TIER) ndisk = TFS_MAX_DISKS_PER_TIER;
  diskId = 0;
  for (int32_t id = 0; id < ndisk; id++) {
    if (minVal > disks[id]) {
      minVal = disks[id];
      diskId = id;
    }
  }

  for (int32_t i = 0; i < numOfVnodes; ++i) {
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
    vmReleaseVnode(pMgmt, ppVnodes[i]);
  }
  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  dInfo("vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d", vgId, diskId, ndisk, numOfVnodes);
  return diskId;
}

S
Shengliang 已提交
76
// Look up vnode `vgId` in pMgmt->hash and take a reference on it.
//
// Returns the vnode with refCount incremented. Returns NULL, with terrno set to
// TSDB_CODE_VND_INVALID_VGROUP_ID, when the vgroup is not in the hash or is
// marked dropped. A non-NULL result is expected to be handed back through
// vmReleaseVnode().
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pFound = NULL;

  taosThreadRwlockRdlock(&pMgmt->lock);
  taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pFound);
  bool usable = (pFound != NULL) && !pFound->dropped;
  if (usable) {
    atomic_add_fetch_32(&pFound->refCount, 1);
  } else {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
    pFound = NULL;
  }
  taosThreadRwlockUnlock(&pMgmt->lock);

  return pFound;
}

S
Shengliang 已提交
93
// Drop one reference on pVnode, for example one taken by vmAcquireVnode().
// A NULL pVnode is ignored. The atomic decrement is performed while holding
// pMgmt->lock in read mode.
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  if (pVnode != NULL) {
    taosThreadRwlockRdlock(&pMgmt->lock);
    atomic_sub_fetch_32(&pVnode->refCount, 1);
    taosThreadRwlockUnlock(&pMgmt->lock);
  }
}

S
Shengliang 已提交
102
// Wrap an already opened SVnode in a new SVnodeObj, allocate its workers and
// queues (vmAllocQueue), and register it in pMgmt->hash under pCfg->vgId.
//
// Ownership: pImpl is never closed here. On failure it remains the caller's
// responsibility. On success the SVnodeObj is reachable through pMgmt->hash.
//
// Returns 0 on success. Returns -1 (terrno = TSDB_CODE_OUT_OF_MEMORY) when an
// allocation fails, or the non-zero result of taosHashPut when registration
// fails. In every failure case everything allocated here is released.
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pVnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pVnode->vgId = pCfg->vgId;
  pVnode->vgVersion = pCfg->vgVersion;
  pVnode->diskPrimary = pCfg->diskPrimary;
  pVnode->refCount = 0;
  pVnode->dropped = 0;
  pVnode->path = taosStrdup(pCfg->path);
  pVnode->pImpl = pImpl;

  if (pVnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pVnode);
    return -1;
  }

  if (vmAllocQueue(pMgmt, pVnode) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pVnode->path);
    taosMemoryFree(pVnode);
    return -1;
  }

  taosThreadRwlockWrlock(&pMgmt->lock);
  int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
  taosThreadRwlockUnlock(&pMgmt->lock);

  if (code != 0) {
    // The vnode was never registered, so nothing else can reach it. Tear down
    // its workers and queues in the same order vmCloseVnode does, then free it.
    dError("vgId:%d, failed to put vnode into hash since %s", pVnode->vgId, terrstr());
    tMultiWorkerCleanup(&pVnode->pWriteW);
    tMultiWorkerCleanup(&pVnode->pSyncW);
    tMultiWorkerCleanup(&pVnode->pSyncRdW);
    tMultiWorkerCleanup(&pVnode->pApplyW);
    vmFreeQueue(pMgmt, pVnode);
    taosMemoryFree(pVnode->path);
    taosMemoryFree(pVnode);
  }

  return code;
}

S
Shengliang Guan 已提交
137
// Shut down a registered vnode and free its SVnodeObj.
//
// The sequence below is order-sensitive:
//   1. if this replica is leader, vnodeProposeCommitOnNeed(..., atExit = true);
//   2. remove the vnode from pMgmt->hash, then release the caller's reference;
//   3. vnodePreClose, then poll until refCount drops to 0;
//   4. clean up the write / sync / sync-read / apply multi-workers, and wait for
//      the query, fetch and stream queues to empty (tqNotifyClose is called
//      before waiting on the stream queue);
//   5. vnodePostClose, then vmFreeQueue;
//   6. if commitAndRemoveWal (vnode split): vnodeSyncCommit + vnodeBegin;
//   7. vnodeClose;
//   8. if commitAndRemoveWal: remove and recreate the vnode's wal directory;
//   9. if the vnode is marked dropped: vnodeDestroy its directory;
//  10. free path and the SVnodeObj.
//
// The caller is expected to hold one reference on pVnode, which step 2 drops.
// pVnode must not be used after this call.
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) {
  const int32_t vgId = pVnode->vgId;

  if (vnodeIsLeader(pVnode->pImpl)) {
    vnodeProposeCommitOnNeed(pVnode->pImpl, true);  // atExit
  }

  // Unregister first so vmAcquireVnode can no longer find it, then drop our ref.
  taosThreadRwlockWrlock(&pMgmt->lock);
  taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
  taosThreadRwlockUnlock(&pMgmt->lock);
  vmReleaseVnode(pMgmt, pVnode);

  dInfo("vgId:%d, pre close", vgId);
  vnodePreClose(pVnode->pImpl);

  dInfo("vgId:%d, wait for vnode ref become 0", vgId);
  while (pVnode->refCount > 0) {
    taosMsleep(10);
  }

  dInfo("vgId:%d, wait for vnode write queue:%p is empty, thread:%08" PRId64, vgId, pVnode->pWriteW.queue,
        pVnode->pWriteW.queue->threadId);
  tMultiWorkerCleanup(&pVnode->pWriteW);

  dInfo("vgId:%d, wait for vnode sync queue:%p is empty, thread:%08" PRId64, vgId, pVnode->pSyncW.queue,
        pVnode->pSyncW.queue->threadId);
  tMultiWorkerCleanup(&pVnode->pSyncW);

  dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, vgId, pVnode->pSyncRdW.queue,
        pVnode->pSyncRdW.queue->threadId);
  tMultiWorkerCleanup(&pVnode->pSyncRdW);

  dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, vgId, pVnode->pApplyW.queue,
        pVnode->pApplyW.queue->threadId);
  tMultiWorkerCleanup(&pVnode->pApplyW);

  dInfo("vgId:%d, wait for vnode query queue:%p is empty", vgId, pVnode->pQueryQ);
  while (!taosQueueEmpty(pVnode->pQueryQ)) {
    taosMsleep(10);
  }

  dInfo("vgId:%d, wait for vnode fetch queue:%p is empty, thread:%08" PRId64, vgId, pVnode->pFetchQ,
        pVnode->pFetchQ->threadId);
  while (!taosQueueEmpty(pVnode->pFetchQ)) {
    taosMsleep(10);
  }

  tqNotifyClose(pVnode->pImpl->pTq);
  dInfo("vgId:%d, wait for vnode stream queue:%p is empty", vgId, pVnode->pStreamQ);
  while (!taosQueueEmpty(pVnode->pStreamQ)) {
    taosMsleep(10);
  }

  dInfo("vgId:%d, all vnode queues is empty", vgId);

  dInfo("vgId:%d, post close", vgId);
  vnodePostClose(pVnode->pImpl);

  vmFreeQueue(pMgmt, pVnode);

  if (commitAndRemoveWal) {
    dInfo("vgId:%d, commit data for vnode split", vgId);
    vnodeSyncCommit(pVnode->pImpl);
    vnodeBegin(pVnode->pImpl);
    dInfo("vgId:%d, commit data finished", vgId);
  }

  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;
  dInfo("vgId:%d, vnode is closed", vgId);

  if (commitAndRemoveWal) {
    char walDir[TSDB_FILENAME_LEN] = {0};
    snprintf(walDir, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, vgId, TD_DIRSEP);
    dInfo("vgId:%d, remove all wals, path:%s", vgId, walDir);
    tfsRmdir(pMgmt->pTfs, walDir);
    tfsMkdir(pMgmt->pTfs, walDir);
  }

  if (pVnode->dropped) {
    dInfo("vgId:%d, vnode is destroyed, dropped:%d", vgId, pVnode->dropped);
    char vnodeDir[TSDB_FILENAME_LEN] = {0};
    snprintf(vnodeDir, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
    vnodeDestroy(vgId, vnodeDir, pMgmt->pTfs);
  }

  taosMemoryFree(pVnode->path);
  taosMemoryFree(pVnode);
}

218 219 220 221 222 223 224 225 226 227 228
// Complete a pending vgroup-id change recorded in pCfg (pCfg->toVgId != 0) by
// calling vnodeRestoreVgroupId with the "vnode/vnode<vgId>" source and
// "vnode/vnode<toVgId>" destination paths.
//
// On success, pCfg->vgId becomes the id returned by vnodeRestoreVgroupId and
// pCfg->toVgId is cleared. Returns 0 on success or when nothing is pending.
// Returns -1 when vnodeRestoreVgroupId returns a non-positive id.
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
  int32_t fromId = pCfg->vgId;
  int32_t toId = pCfg->toVgId;
  if (toId == 0) {
    return 0;
  }

  char fromPath[TSDB_FILENAME_LEN];
  char toPath[TSDB_FILENAME_LEN];
  snprintf(fromPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, fromId);
  snprintf(toPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, toId);

  int32_t restoredId = vnodeRestoreVgroupId(fromPath, toPath, fromId, toId, pCfg->diskPrimary, pTfs);
  if (restoredId <= 0) {
    dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, fromPath);
    return -1;
  }

  pCfg->vgId = restoredId;
  pCfg->toVgId = 0;
  return 0;
}

S
Shengliang Guan 已提交
241
static void *vmOpenVnodeInThread(void *param) {
S
shm  
Shengliang Guan 已提交
242
  SVnodeThread *pThread = param;
S
Shengliang 已提交
243
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
H
Hongze Cheng 已提交
244
  char          path[TSDB_FILENAME_LEN];
S
shm  
Shengliang Guan 已提交
245

246
  dInfo("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
S
shm  
Shengliang Guan 已提交
247 248 249 250 251 252 253 254
  setThreadName("open-vnodes");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
    SWrapperCfg *pCfg = &pThread->pCfgs[v];

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
S
Shengliang 已提交
255 256
    tmsgReportStartup("vnode-open", stepDesc);

257 258 259 260 261 262 263 264 265
    if (pCfg->toVgId) {
      if (vmRestoreVgroupId(pCfg, pMgmt->pTfs) != 0) {
        dError("vgId:%d, failed to restore vgroup id by thread:%d", pCfg->vgId, pThread->threadIndex);
        pThread->failed++;
        continue;
      }
      pThread->updateVnodesList = true;
    }

266
    int32_t diskPrimary = pCfg->diskPrimary;
H
Hongze Cheng 已提交
267
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
268

269
    SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
S
shm  
Shengliang Guan 已提交
270
    if (pImpl == NULL) {
271
      dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
S
shm  
Shengliang Guan 已提交
272
      pThread->failed++;
273
      continue;
S
shm  
Shengliang Guan 已提交
274
    }
275 276 277 278 279 280 281 282 283 284

    if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) {
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
      pThread->failed++;
      continue;
    }

    dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
    pThread->opened++;
    atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
S
shm  
Shengliang Guan 已提交
285 286
  }

287 288
  dInfo("thread:%d, numOfVnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
        pThread->failed);
S
shm  
Shengliang Guan 已提交
289 290 291
  return NULL;
}

S
Shengliang 已提交
292
// Create pMgmt->hash, load the vnode list from disk (vmGetVnodeListFromFile),
// and open every vnode in parallel on max(tsNumOfCores / 2, 1) threads.
// Configs are assigned to threads round-robin.
//
// Returns 0 on success. Returns -1 (terrno set) when the hash or the thread
// bookkeeping cannot be allocated, when the vnode list cannot be read, or when
// not every vnode was opened. If a vgroup id was rewritten while opening, the
// updated list is persisted with vmWriteVnodeListToFile.
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
  pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->hash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to init vnode hash since %s", terrstr());
    return -1;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  if (vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) {
    dError("failed to get vnode list from disk since %s", terrstr());
    return -1;
  }

  pMgmt->state.totalVnodes = numOfVnodes;

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to alloc threads to open vnodes since %s", terrstr());
    taosMemoryFree(pCfgs);
    return -1;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      dError("thread:%d, failed to alloc vnode cfgs since %s", t, terrstr());
      for (int32_t i = 0; i < t; ++i) {
        taosMemoryFree(threads[i].pCfgs);
      }
      taosMemoryFree(threads);
      taosMemoryFree(pCfgs);
      return -1;
    }
  }

  // round-robin keeps every thread at or below vnodesPerThread entries
  for (int32_t v = 0; v < numOfVnodes; ++v) {
    SVnodeThread *pThread = &threads[v % threadNum];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("open %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
    }

    taosThreadAttrDestroy(&thAttr);
  }

  bool updateVnodesList = false;

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->pCfgs);
    if (pThread->updateVnodesList) updateVnodesList = true;
  }
  taosMemoryFree(threads);
  taosMemoryFree(pCfgs);

  if (pMgmt->state.openVnodes != pMgmt->state.totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
    terrno = TSDB_CODE_VND_INIT_FAILED;
    return -1;
  }

  if (updateVnodesList && vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("failed to write vnode list since %s", terrstr());
    return -1;
  }

  dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
  return 0;
}

371 372 373 374 375 376 377 378 379 380 381 382 383 384 385
// Thread body used by vmCloseVnodes. It closes each vnode in
// pThread->ppVnodes[0 .. vnodeNum) with vmCloseVnode (without the WAL reset),
// reporting a startup/shutdown step through tmsgReportStartup before each one.
static void *vmCloseVnodeInThread(void *param) {
  SVnodeThread *pThread = param;
  SVnodeMgmt   *pMgmt = pThread->pMgmt;

  dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("close-vnodes");

  int32_t idx = 0;
  while (idx < pThread->vnodeNum) {
    SVnodeObj *pTarget = pThread->ppVnodes[idx];
    ++idx;

    char desc[TSDB_STEP_DESC_LEN] = {0};
    snprintf(desc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pTarget->vgId,
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
    tmsgReportStartup("vnode-close", desc);

    vmCloseVnode(pMgmt, pTarget, false);
  }

  dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
  return NULL;
}

S
Shengliang 已提交
393
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
S
shm  
Shengliang Guan 已提交
394
  dInfo("start to close all vnodes");
395 396
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
  dInfo("vnodes mgmt worker is stopped");
S
shm  
Shengliang Guan 已提交
397 398

  int32_t     numOfVnodes = 0;
399
  SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
S
shm  
Shengliang Guan 已提交
400

401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443
  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *));
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    }
  }

  pMgmt->state.openVnodes = 0;
  dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno));
    }

    taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->ppVnodes);
S
shm  
Shengliang Guan 已提交
444
  }
445
  taosMemoryFree(threads);
S
shm  
Shengliang Guan 已提交
446

447 448
  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
S
shm  
Shengliang Guan 已提交
449 450 451 452 453 454 455 456 457 458
  }

  if (pMgmt->hash != NULL) {
    taosHashCleanup(pMgmt->hash);
    pMgmt->hash = NULL;
  }

  dInfo("total vnodes:%d are all closed", numOfVnodes);
}

S
Shengliang 已提交
459
// Tear down the vnode management module. In order, it closes all vnodes, stops
// the module workers, calls vnodeCleanup, destroys pMgmt->lock, and frees pMgmt.
//
// A NULL pMgmt is ignored. vmInit's error path can reach this before pMgmt
// has been allocated, and every step below would otherwise dereference it.
static void vmCleanup(SVnodeMgmt *pMgmt) {
  if (pMgmt == NULL) return;

  vmCloseVnodes(pMgmt);
  vmStopWorker(pMgmt);
  vnodeCleanup();
  taosThreadRwlockDestroy(&pMgmt->lock);
  taosMemoryFree(pMgmt);
}
S
shm  
Shengliang Guan 已提交
466

S
Shengliang Guan 已提交
467
// Run vnodeSyncCheckTimeout on every vnode currently in pMgmt->hash. Each vnode
// is handed back with vmReleaseVnode after its check, and the list is freed.
static void vmCheckSyncTimeout(SVnodeMgmt *pMgmt) {
  int32_t     count = 0;
  SVnodeObj **list = vmGetVnodeListFromHash(pMgmt, &count);
  if (list == NULL) return;

  for (int32_t i = 0; i < count; ++i) {
    SVnodeObj *pCur = list[i];
    vnodeSyncCheckTimeout(pCur->pImpl);
    vmReleaseVnode(pMgmt, pCur);
  }
  taosMemoryFree(list);
}
S
Shengliang Guan 已提交
480 481 482 483 484 485 486 487 488 489 490 491 492

static void *vmThreadFp(void *param) {
  SVnodeMgmt *pMgmt = param;
  int64_t     lastTime = 0;
  setThreadName("vnode-timer");

  while (1) {
    lastTime++;
    taosMsleep(100);
    if (pMgmt->stop) break;
    if (lastTime % 10 != 0) continue;

    int64_t sec = lastTime / 10;
S
Shengliang Guan 已提交
493
    if (sec % (VNODE_TIMEOUT_SEC / 2) == 0) {
S
Shengliang Guan 已提交
494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521
      vmCheckSyncTimeout(pMgmt);
    }
  }

  return NULL;
}

// Start the joinable "vnode-timer" thread (vmThreadFp) and store its handle in
// pMgmt->thread.
//
// Returns 0 on success and -1 if the thread cannot be created. The thread
// attribute is destroyed on both paths. After a failed create the handle is
// cleared so vmCleanupTimer will not try to join it.
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
    dError("failed to create vnode timer thread since %s", strerror(errno));
    taosThreadClear(&pMgmt->thread);
    code = -1;
  }

  taosThreadAttrDestroy(&thAttr);
  return code;
}

// Set pMgmt->stop so the timer thread exits its loop. If the thread handle is
// valid, join it and clear the handle.
static void vmCleanupTimer(SVnodeMgmt *pMgmt) {
  pMgmt->stop = true;
  if (!taosCheckPthreadValid(pMgmt->thread)) return;

  taosThreadJoin(pMgmt->thread, NULL);
  taosThreadClear(&pMgmt->thread);
}

S
Shengliang Guan 已提交
522
// Initialize the vnode management module and publish it through pOutput->pMgmt.
//
// Steps, in order:
//   1. allocate SVnodeMgmt and install its message callbacks;
//   2. take tfs from pInput;
//   3. walInit, syncInit, vnodeInit(tsNumOfCommitThreads);
//   4. vmStartWorker, vmOpenVnodes, udfcOpen.
// Progress is reported with tmsgReportStartup after each stage.
//
// Returns 0 on success. If SVnodeMgmt cannot be allocated, sets terrno to
// TSDB_CODE_OUT_OF_MEMORY and returns -1 immediately (there is nothing to clean
// up). Any later failure returns -1 after releasing everything via vmCleanup.
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  int32_t code = -1;

  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
  if (pMgmt == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to init vnodes-mgmt since %s", terrstr());
    return -1;
  }

  pMgmt->pData = pInput->pData;
  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;
  pMgmt->msgCb = pInput->msgCb;
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
  pMgmt->msgCb.mgmt = pMgmt;
  taosThreadRwlockInit(&pMgmt->lock, NULL);

  pMgmt->pTfs = pInput->pTfs;
  if (pMgmt->pTfs == NULL) {
    dError("tfs is null.");
    goto _OVER;
  }
  tmsgReportStartup("vnode-tfs", "initialized");

  if (walInit() != 0) {
    dError("failed to init wal since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-wal", "initialized");

  if (syncInit() != 0) {
    dError("failed to open sync since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-sync", "initialized");

  if (vnodeInit(tsNumOfCommitThreads) != 0) {
    dError("failed to init vnode since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-commit", "initialized");

  if (vmStartWorker(pMgmt) != 0) {
    dError("failed to init workers since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-worker", "initialized");

  if (vmOpenVnodes(pMgmt) != 0) {
    dError("failed to open all vnodes since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-vnodes", "initialized");

  if (udfcOpen() != 0) {
    dError("failed to open udfc in vnode");
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code == 0) {
    pOutput->pMgmt = pMgmt;
  } else {
    dError("failed to init vnodes-mgmt since %s", terrstr());
    vmCleanup(pMgmt);
  }

  return code;
}

S
Shengliang 已提交
592
// Report the vnode module as required exactly when tsNumOfSupportVnodes > 0.
// pInput is not consulted. Always returns 0.
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
  bool hostsVnodes = (tsNumOfSupportVnodes > 0);
  *required = hostsVnodes;
  return 0;
}
S
shm  
Shengliang Guan 已提交
596

597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616
// Thread body used by vmStartVnodes. It calls vnodeStart on each vnode in
// pThread->ppVnodes[0 .. vnodeNum). Each success increments pThread->opened and
// pMgmt->state.openVnodes; each failure increments pThread->failed.
static void *vmRestoreVnodeInThread(void *param) {
  SVnodeThread *pThread = param;
  SVnodeMgmt   *pMgmt = pThread->pMgmt;

  dInfo("thread:%d, start to restore %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("restore-vnodes");

  for (int32_t i = 0; i < pThread->vnodeNum; ++i) {
    SVnodeObj *pTarget = pThread->ppVnodes[i];

    char desc[TSDB_STEP_DESC_LEN] = {0};
    snprintf(desc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been restored", pTarget->vgId,
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
    tmsgReportStartup("vnode-restore", desc);

    if (vnodeStart(pTarget->pImpl) == 0) {
      dInfo("vgId:%d, is restored by thread:%d", pTarget->vgId, pThread->threadIndex);
      pThread->opened++;
      atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
      continue;
    }

    dError("vgId:%d, failed to restore vnode by thread:%d", pTarget->vgId, pThread->threadIndex);
    pThread->failed++;
  }

  dInfo("thread:%d, numOfVnodes:%d, restored:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
        pThread->failed);
  return NULL;
}

static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
629
  int32_t     numOfVnodes = 0;
630
  SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
631

632 633 634 635 636 637 638 639 640
  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *));
641 642
  }

643 644 645
  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
S
Shengliang Guan 已提交
646
    if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
S
Shengliang Guan 已提交
647 648
      pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
    }
649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673
  }

  pMgmt->state.openVnodes = 0;
  dInfo("restore %d vnodes with %d threads", numOfVnodes, threadNum);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(errno));
    }

    taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
S
Shengliang Guan 已提交
674
    taosMemoryFree(pThread->ppVnodes);
675 676 677
  }
  taosMemoryFree(threads);

678
  for (int32_t i = 0; i < numOfVnodes; ++i) {
S
Shengliang Guan 已提交
679 680
    if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
    vmReleaseVnode(pMgmt, ppVnodes[i]);
681 682
  }

683 684
  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
685 686
  }

S
Shengliang Guan 已提交
687
  return vmInitTimer(pMgmt);
688 689
}

S
Shengliang Guan 已提交
690
// Stop hook for the vnode module: stops the timer thread only.
// Closing the vnodes is left to the close hook (vmCleanup).
static void vmStop(SVnodeMgmt *pMgmt) {
  vmCleanupTimer(pMgmt);
}
691

S
Shengliang 已提交
692 693 694 695
SMgmtFunc vmGetMgmtFunc() {
  SMgmtFunc mgmtFunc = {0};
  mgmtFunc.openFp = vmInit;
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
696
  mgmtFunc.startFp = (NodeStartFp)vmStartVnodes;
S
Shengliang 已提交
697 698 699
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
  mgmtFunc.requiredFp = vmRequire;
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
S
shm  
Shengliang Guan 已提交
700

S
Shengliang 已提交
701 702
  return mgmtFunc;
}