vmInt.c 11.0 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "vmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang 已提交
19
/*
 * Look up the vnode registered under vgId and take one reference on it.
 *
 * Both the hash lookup and the refCount increment run while the manager's
 * read latch is held. Returns the vnode on success. When the lookup leaves
 * the pointer NULL, returns NULL and sets terrno to
 * TSDB_CODE_VND_INVALID_VGROUP_ID.
 */
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pFound = NULL;
  int32_t    refs = 0;

  taosRLockLatch(&pMgmt->latch);
  taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pFound);
  if (pFound != NULL) {
    refs = atomic_add_fetch_32(&pFound->refCount, 1);
  } else {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
  }
  taosRUnLockLatch(&pMgmt->latch);

  if (pFound == NULL) {
    return NULL;
  }

  // Tracing happens after the latch has been released.
  dTrace("vgId:%d, acquire vnode, refCount:%d", pFound->vgId, refs);
  return pFound;
}

S
Shengliang 已提交
39
/*
 * Drop one reference from pVnode. A NULL pVnode is ignored.
 *
 * The decrement is performed under the manager's read latch; the resulting
 * count is only used for the trace message.
 */
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  if (pVnode != NULL) {
    taosRLockLatch(&pMgmt->latch);
    const int32_t remaining = atomic_sub_fetch_32(&pVnode->refCount, 1);
    taosRUnLockLatch(&pMgmt->latch);

    dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, remaining);
  }
}

S
Shengliang 已提交
48
/*
 * Create the management object for an already opened vnode and publish it in
 * the manager's hash under pCfg->vgId.
 *
 * pMgmt  vnode manager that owns the hash and the latch
 * pCfg   wrapper config supplying vgId, vgVersion and path
 * pImpl  opened vnode implementation; stored in the new object. It is never
 *        closed here, so on failure the caller still owns it.
 *
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when
 * an allocation or vmAllocQueue() fails, or the non-zero taosHashPut() result.
 * Every failure path releases what this function allocated.
 */
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pVnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pVnode->vgId = pCfg->vgId;
  pVnode->refCount = 0;
  pVnode->vgVersion = pCfg->vgVersion;
  pVnode->dropped = 0;
  pVnode->accessState = TSDB_VN_ALL_ACCCESS;
  pVnode->pImpl = pImpl;

  pVnode->path = tstrdup(pCfg->path);
  if (pVnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pVnode);
    return -1;
  }

  if (vmAllocQueue(pMgmt, pVnode) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // NOTE(review): any queues vmAllocQueue() created before failing are not
    // released here; confirm whether it cleans up after itself.
    taosMemoryFree(pVnode->path);
    taosMemoryFree(pVnode);
    return -1;
  }

  taosWLockLatch(&pMgmt->latch);
  int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
  taosWUnLockLatch(&pMgmt->latch);

  if (code != 0) {
    // The object was never published, so nothing else can reach it: undo the
    // queue allocation and free it, mirroring the teardown in vmCloseVnode().
    vmFreeQueue(pMgmt, pVnode);
    taosMemoryFree(pVnode->path);
    taosMemoryFree(pVnode);
  }

  return code;
}

S
Shengliang 已提交
80
/*
 * Unpublish pVnode, wait until it is idle, then close and free it.
 *
 * Steps, in order: remove the entry from the manager's hash under the write
 * latch; drop one reference and poll until refCount reaches zero; poll until
 * each of the write/sync/apply/query/fetch/merge queues is empty; free the
 * queues; close the vnode implementation. If the vnode is marked dropped, its
 * on-disk data is destroyed as well. Finally the path string and the object
 * itself are freed, so pVnode must not be used after this call.
 */
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  taosWLockLatch(&pMgmt->latch);
  taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
  taosWUnLockLatch(&pMgmt->latch);

  // NOTE(review): presumably releases the reference the caller holds on
  // pVnode - confirm against the callers.
  vmReleaseVnode(pMgmt, pVnode);

  while (pVnode->refCount > 0) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pWriteQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pSyncQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pApplyQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pQueryQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pFetchQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pMergeQ)) {
    taosMsleep(10);
  }

  vmFreeQueue(pMgmt, pVnode);
  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;

  dDebug("vgId:%d, vnode is closed", pVnode->vgId);

  if (pVnode->dropped) {
    // Relative directory of this vnode, e.g. "vnode<sep>vnode<vgId>".
    char vnodeDir[TSDB_FILENAME_LEN] = {0};

    dDebug("vgId:%d, vnode is destroyed for dropped:%d", pVnode->vgId, pVnode->dropped);
    snprintf(vnodeDir, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
    vnodeDestroy(vnodeDir, pMgmt->pTfs);
  }

  taosMemoryFree(pVnode->path);
  taosMemoryFree(pVnode);
}

S
Shengliang Guan 已提交
112
static void *vmOpenVnodeInThread(void *param) {
S
shm  
Shengliang Guan 已提交
113
  SVnodeThread *pThread = param;
S
Shengliang 已提交
114
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
H
Hongze Cheng 已提交
115
  char          path[TSDB_FILENAME_LEN];
S
shm  
Shengliang Guan 已提交
116 117 118 119 120 121 122 123 124 125

  dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("open-vnodes");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
    SWrapperCfg *pCfg = &pThread->pCfgs[v];

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
S
Shengliang 已提交
126 127
    tmsgReportStartup("vnode-open", stepDesc);

H
Hongze Cheng 已提交
128
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
S
Shengliang 已提交
129
    SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
S
shm  
Shengliang Guan 已提交
130 131 132 133
    if (pImpl == NULL) {
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
      pThread->failed++;
    } else {
S
shm  
Shengliang Guan 已提交
134
      vmOpenVnode(pMgmt, pCfg, pImpl);
S
shm  
Shengliang Guan 已提交
135 136
      dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
      pThread->opened++;
S
Shengliang Guan 已提交
137
      atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
S
shm  
Shengliang Guan 已提交
138 139 140 141 142 143 144 145
    }
  }

  dDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
         pThread->failed);
  return NULL;
}

S
Shengliang 已提交
146
/*
 * Create the vnode hash, read the vnode list from disk and open every listed
 * vnode on worker threads.
 *
 * The configs are distributed round-robin over threadNum threads (currently
 * fixed at 1), each thread runs vmOpenVnodeInThread(), and all threads are
 * joined before the result is evaluated.
 *
 * Returns 0 when pMgmt->state.openVnodes equals pMgmt->state.totalVnodes.
 * Returns -1 if the hash cannot be created, the list cannot be read, a
 * bookkeeping allocation fails, or not every vnode was opened.
 */
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
  pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  if (pMgmt->hash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to init vnode hash since %s", terrstr());
    return -1;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  if (vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) {
    dInfo("failed to get vnode list from disk since %s", terrstr());
    return -1;
  }

  pMgmt->state.totalVnodes = numOfVnodes;

  int32_t threadNum = 1;  // tsNumOfCores;
  // "+ 1" keeps every per-thread array non-empty and large enough for the
  // remainder of the round-robin split below.
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to allocate threads to open vnodes since %s", terrstr());
    taosMemoryFree(pCfgs);
    return -1;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      dError("thread:%d, failed to allocate vnode configs since %s", t, terrstr());
      // Release the arrays of the threads prepared so far.
      for (int32_t i = 0; i < t; ++i) {
        taosMemoryFree(threads[i].pCfgs);
      }
      taosMemoryFree(threads);
      taosMemoryFree(pCfgs);
      return -1;
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
      // The vnodes assigned to this thread stay unopened; the final
      // opened-versus-total comparison turns that into a failure.
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
    }

    taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      taosThreadJoin(pThread->thread, NULL);
    }
    taosMemoryFree(pThread->pCfgs);
  }
  taosMemoryFree(threads);
  taosMemoryFree(pCfgs);

  if (pMgmt->state.openVnodes != pMgmt->state.totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
    return -1;
  } else {
    dInfo("total vnodes:%d open successfully", pMgmt->state.totalVnodes);
    return 0;
  }
}

S
Shengliang 已提交
214
/*
 * Close every vnode currently returned by vmGetVnodeListFromHash(), free the
 * returned list, and destroy the manager's hash if it exists.
 */
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
  dInfo("start to close all vnodes");

  int32_t     vnodeCount = 0;
  SVnodeObj **ppList = vmGetVnodeListFromHash(pMgmt, &vnodeCount);

  int32_t idx = 0;
  while (idx < vnodeCount) {
    vmCloseVnode(pMgmt, ppList[idx]);
    idx++;
  }

  if (ppList != NULL) {
    taosMemoryFree(ppList);
  }

  if (pMgmt->hash != NULL) {
    taosHashCleanup(pMgmt->hash);
    pMgmt->hash = NULL;
  }

  dInfo("total vnodes:%d are all closed", vnodeCount);
}

S
Shengliang 已提交
236
/*
 * Tear down the vnode manager: close all vnodes, stop the workers, clean up
 * the vnode module, close the tiered file system and free pMgmt itself.
 *
 * pMgmt may be NULL (vmInit() reaches its failure path with a NULL manager
 * when the manager allocation fails); in that case there is nothing to
 * release and the function returns immediately.
 */
static void vmCleanup(SVnodeMgmt *pMgmt) {
  if (pMgmt == NULL) return;

  dInfo("vnode-mgmt start to cleanup");
  vmCloseVnodes(pMgmt);
  vmStopWorker(pMgmt);
  vnodeCleanup();
  tfsClose(pMgmt->pTfs);
  taosMemoryFree(pMgmt);

  dInfo("vnode-mgmt is cleaned up");
}
S
shm  
Shengliang Guan 已提交
246

S
Shengliang 已提交
247
/*
 * Allocate and initialize the vnode manager.
 *
 * Copies path/name/dnodeId/msgCb from pInput, installs the vnode queue
 * callbacks, then brings up, in order: tfs, wal, sync, the vnode module,
 * the workers, all vnodes found on disk, and the udf client. Each completed
 * stage is reported through tmsgReportStartup().
 *
 * On success returns 0 and stores the manager in pOutput->pMgmt. On any
 * failure returns -1, leaves pOutput untouched and releases the manager via
 * vmCleanup() if it was allocated.
 */
static int32_t vmInit(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  dInfo("vnode-mgmt start to init");
  int32_t code = -1;

  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
  if (pMgmt == NULL) {
    // Make the failure reason visible to the terrstr() log below.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;
  pMgmt->dnodeId = pInput->dnodeId;
  pMgmt->msgCb = pInput->msgCb;
  pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToWriteQueue;
  pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)vmPutRpcMsgToSyncQueue;
  pMgmt->msgCb.queueFps[APPLY_QUEUE] = (PutToQueueFp)vmPutRpcMsgToApplyQueue;
  pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)vmPutRpcMsgToQueryQueue;
  pMgmt->msgCb.queueFps[FETCH_QUEUE] = (PutToQueueFp)vmPutRpcMsgToFetchQueue;
  pMgmt->msgCb.queueFps[MERGE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToMergeQueue;
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
  pMgmt->msgCb.pMgmt = pMgmt;
  taosInitRWLatch(&pMgmt->latch);

  // Fallback disk config used when the input does not provide any disks:
  // a single primary, level-0 disk rooted at the data directory.
  SDiskCfg dCfg = {0};
  tstrncpy(dCfg.dir, pInput->dataDir, TSDB_FILENAME_LEN);
  dCfg.level = 0;
  dCfg.primary = 1;
  SDiskCfg *pDisks = pInput->disks;
  int32_t   numOfDisks = pInput->numOfDisks;
  if (numOfDisks <= 0 || pDisks == NULL) {
    pDisks = &dCfg;
    numOfDisks = 1;
  }

  pMgmt->pTfs = tfsOpen(pDisks, numOfDisks);
  if (pMgmt->pTfs == NULL) {
    dError("failed to init tfs since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-tfs", "initialized");

  if (walInit() != 0) {
    dError("failed to init wal since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-wal", "initialized");

  if (syncInit() != 0) {
    dError("failed to open sync since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-sync", "initialized");

  if (vnodeInit(tsNumOfCommitThreads) != 0) {
    dError("failed to init vnode since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-commit", "initialized");

  if (vmStartWorker(pMgmt) != 0) {
    dError("failed to init workers since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-worker", "initialized");

  if (vmOpenVnodes(pMgmt) != 0) {
    dError("failed to open vnode since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-vnodes", "initialized");

  if (udfcOpen() != 0) {
    dError("failed to open udfc in vnode");
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code == 0) {
    pOutput->pMgmt = pMgmt;
    dInfo("vnodes-mgmt is initialized");
  } else {
    dError("failed to init vnodes-mgmt since %s", terrstr());
    // Nothing to tear down when the manager itself could not be allocated.
    if (pMgmt != NULL) {
      vmCleanup(pMgmt);
    }
  }

  return code;
}

S
Shengliang 已提交
335 336
/*
 * Report whether the vnode manager is needed: *required is set to true when
 * pInput->supportVnodes is positive, false otherwise. Always returns 0.
 */
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
  if (pInput->supportVnodes > 0) {
    *required = true;
  } else {
    *required = false;
  }
  return 0;
}
S
shm  
Shengliang Guan 已提交
339

S
Shengliang 已提交
340
/*
 * Start every vnode registered in the manager's hash.
 *
 * The hash is walked under the read latch and vnodeStart() is called on each
 * entry's implementation. NULL entries are skipped; the iterator is advanced
 * by the loop header, so skipping an entry cannot stall the walk. Always
 * returns 0.
 */
static int32_t vmStart(SVnodeMgmt *pMgmt) {
  dDebug("vnode-mgmt start to run");
  taosRLockLatch(&pMgmt->latch);

  for (void *pIter = taosHashIterate(pMgmt->hash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->hash, pIter)) {
    // Each hash value is a pointer to the vnode object.
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL) continue;

    vnodeStart(pVnode->pImpl);
  }

  taosRUnLockLatch(&pMgmt->latch);
  return 0;
}

S
Shengliang 已提交
358
/*
 * Stop hook of the vnode manager. Intentionally does nothing at this level:
 * per the original note, stopping is processed inside the vnode.
 */
static void vmStop(SVnodeMgmt *pMgmt) {
  (void)pMgmt;  // unused
}

S
Shengliang 已提交
362 363 364 365 366 367 368 369
SMgmtFunc vmGetMgmtFunc() {
  SMgmtFunc mgmtFunc = {0};
  mgmtFunc.openFp = vmInit;
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
  mgmtFunc.startFp = (NodeStartFp)vmStart;
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
  mgmtFunc.requiredFp = vmRequire;
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
S
shm  
Shengliang Guan 已提交
370

S
Shengliang 已提交
371 372
  return mgmtFunc;
}