vmInt.c 11.1 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "vmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang 已提交
19
/*
 * Look up the vnode registered under vgId and take one reference on it.
 *
 * Both the hash lookup and the reference-count increment are performed while
 * the management read latch is held.
 *
 * Returns the vnode object on success. When no entry is found, terrno is set
 * to TSDB_CODE_VND_INVALID_VGROUP_ID and NULL is returned.
 */
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pFound = NULL;
  int32_t    refs = 0;

  taosRLockLatch(&pMgmt->latch);
  taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pFound);
  if (pFound != NULL) {
    refs = atomic_add_fetch_32(&pFound->refCount, 1);
  } else {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
  }
  taosRUnLockLatch(&pMgmt->latch);

  if (pFound == NULL) {
    return NULL;
  }

  dTrace("vgId:%d, acquire vnode, refCount:%d", pFound->vgId, refs);
  return pFound;
}

S
Shengliang 已提交
39
/*
 * Drop one reference previously taken on pVnode (no-op when pVnode is NULL).
 *
 * The decrement is done atomically while the management read latch is held.
 */
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  if (pVnode == NULL) return;

  // Read the id before giving up the reference: vmCloseVnode() frees the object
  // once refCount is no longer positive, so pVnode must not be touched after
  // the decrement below.
  const int32_t vgId = pVnode->vgId;

  taosRLockLatch(&pMgmt->latch);
  int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
  taosRUnLockLatch(&pMgmt->latch);

  dTrace("vgId:%d, release vnode, refCount:%d", vgId, refCount);
}

S
Shengliang 已提交
48
/*
 * Create the management-side object for an already opened vnode and register
 * it in pMgmt->hash under pCfg->vgId.
 *
 * pMgmt  management context owning the hash, latch and worker pools
 * pCfg   configuration supplying vgId, vgVersion, dbUid, db and path
 * pImpl  opened vnode implementation; stored in the new object on success
 *
 * Returns 0 on success. On failure returns non-zero and releases everything
 * this function allocated; pImpl is NOT closed here and stays with the caller.
 * Allocation failures set terrno to TSDB_CODE_OUT_OF_MEMORY.
 */
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
  int32_t code = -1;

  SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pVnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pVnode->vgId = pCfg->vgId;
  pVnode->refCount = 0;
  pVnode->vgVersion = pCfg->vgVersion;
  pVnode->dropped = 0;
  pVnode->accessState = TSDB_VN_ALL_ACCCESS;
  pVnode->dbUid = pCfg->dbUid;
  pVnode->db = tstrdup(pCfg->db);
  pVnode->path = tstrdup(pCfg->path);
  pVnode->pImpl = pImpl;

  if (pVnode->path == NULL || pVnode->db == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  if (vmAllocQueue(pMgmt, pVnode) != 0) {
    // NOTE(review): queues that vmAllocQueue may have created before failing are
    // not released here, because its partial-failure state is not visible from
    // this file -- confirm whether vmFreeQueue is safe to call in that case.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  taosWLockLatch(&pMgmt->latch);
  code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
  taosWUnLockLatch(&pMgmt->latch);

  if (code == 0) {
    return 0;
  }

  // The object never became reachable through the hash, so undo the queue
  // allocation before releasing it.
  vmFreeQueue(pMgmt, pVnode);

_OVER:
  taosMemoryFree(pVnode->path);
  taosMemoryFree(pVnode->db);
  taosMemoryFree(pVnode);
  return code;
}

S
Shengliang 已提交
82
/*
 * Unregister a vnode, wait until it is idle, then close and free it.
 *
 * Steps, in order:
 *   1. remove the entry from pMgmt->hash under the write latch;
 *   2. drop one reference through vmReleaseVnode();
 *   3. poll every 10 ms until refCount is no longer positive and each of the
 *      write/sync/apply/query/fetch/merge queues reports empty;
 *   4. free the queues and close the vnode implementation;
 *   5. if the vnode is flagged as dropped, call vnodeDestroy() on
 *      "vnode<sep>vnode<vgId>";
 *   6. free the path, db and the object itself.
 *
 * pVnode must not be used by the caller afterwards.
 */
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  taosWLockLatch(&pMgmt->latch);
  taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
  taosWUnLockLatch(&pMgmt->latch);

  vmReleaseVnode(pMgmt, pVnode);

  // Wait for outstanding references to go away.
  while (pVnode->refCount > 0) {
    taosMsleep(10);
  }

  // Wait for every message queue of this vnode to drain.
  while (!taosQueueEmpty(pVnode->pWriteQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pSyncQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pApplyQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pQueryQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pFetchQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pMergeQ)) {
    taosMsleep(10);
  }

  vmFreeQueue(pMgmt, pVnode);
  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;

  dDebug("vgId:%d, vnode is closed", pVnode->vgId);

  if (pVnode->dropped) {
    char vnodeDir[TSDB_FILENAME_LEN] = {0};

    dDebug("vgId:%d, vnode is destroyed for dropped:%d", pVnode->vgId, pVnode->dropped);
    snprintf(vnodeDir, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
    vnodeDestroy(vnodeDir, pMgmt->pTfs);
  }

  taosMemoryFree(pVnode->path);
  taosMemoryFree(pVnode->db);
  taosMemoryFree(pVnode);
}

S
Shengliang Guan 已提交
115
static void *vmOpenVnodeInThread(void *param) {
S
shm  
Shengliang Guan 已提交
116
  SVnodeThread *pThread = param;
S
Shengliang 已提交
117
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
H
Hongze Cheng 已提交
118
  char          path[TSDB_FILENAME_LEN];
S
shm  
Shengliang Guan 已提交
119 120 121 122 123 124 125 126 127 128

  dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("open-vnodes");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
    SWrapperCfg *pCfg = &pThread->pCfgs[v];

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
S
Shengliang 已提交
129 130
    tmsgReportStartup("vnode-open", stepDesc);

H
Hongze Cheng 已提交
131
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
S
Shengliang 已提交
132
    SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
S
shm  
Shengliang Guan 已提交
133 134 135 136
    if (pImpl == NULL) {
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
      pThread->failed++;
    } else {
S
shm  
Shengliang Guan 已提交
137
      vmOpenVnode(pMgmt, pCfg, pImpl);
S
shm  
Shengliang Guan 已提交
138 139
      dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
      pThread->opened++;
S
Shengliang Guan 已提交
140
      atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
S
shm  
Shengliang Guan 已提交
141 142 143 144 145 146 147 148
    }
  }

  dDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
         pThread->failed);
  return NULL;
}

S
Shengliang 已提交
149
/*
 * Create the vnode hash, read the vnode list from disk and open every listed
 * vnode using worker threads.
 *
 * The configurations are distributed round-robin over threadNum threads
 * (currently fixed to 1); each thread runs vmOpenVnodeInThread() and is joined
 * before the result is evaluated.
 *
 * Returns 0 when the number of opened vnodes equals the number listed on disk,
 * -1 otherwise (including hash creation, list loading or allocation failure).
 */
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
  pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  if (pMgmt->hash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to init vnode hash since %s", terrstr());
    return -1;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  if (vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) {
    dInfo("failed to get vnode list from disk since %s", terrstr());
    return -1;
  }

  pMgmt->state.totalVnodes = numOfVnodes;

  int32_t threadNum = 1;  // tsNumOfCores;
  // "+ 1" leaves room for the remainder of the round-robin distribution below.
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to allocate vnode open threads since %s", terrstr());
    taosMemoryFree(pCfgs);
    return -1;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      dError("thread:%d, failed to allocate vnode cfgs since %s", t, terrstr());
      // Release the per-thread arrays allocated so far, then the shared state.
      for (int32_t i = 0; i < t; ++i) {
        taosMemoryFree(threads[i].pCfgs);
      }
      taosMemoryFree(threads);
      taosMemoryFree(pCfgs);
      return -1;
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
      // Not fatal here: the vnodes assigned to this thread stay unopened, which
      // the opened/total comparison at the end reports as a failure.
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
    }

    taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      taosThreadJoin(pThread->thread, NULL);
    }
    taosMemoryFree(pThread->pCfgs);
  }
  taosMemoryFree(threads);
  taosMemoryFree(pCfgs);

  if (pMgmt->state.openVnodes != pMgmt->state.totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
    return -1;
  } else {
    dInfo("total vnodes:%d open successfully", pMgmt->state.totalVnodes);
    return 0;
  }
}

S
Shengliang 已提交
217
/*
 * Close every vnode currently returned by vmGetVnodeListFromHash(), release
 * the returned list, and then destroy the vnode hash itself.
 */
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
  dInfo("start to close all vnodes");

  int32_t     total = 0;
  SVnodeObj **ppList = vmGetVnodeListFromHash(pMgmt, &total);

  int32_t idx = 0;
  while (idx < total) {
    vmCloseVnode(pMgmt, ppList[idx]);
    idx++;
  }

  if (ppList != NULL) {
    taosMemoryFree(ppList);
  }

  if (pMgmt->hash != NULL) {
    taosHashCleanup(pMgmt->hash);
    pMgmt->hash = NULL;
  }

  dInfo("total vnodes:%d are all closed", total);
}

S
Shengliang 已提交
239
/*
 * Tear down the vnode manager: close all vnodes, stop the workers, clean up
 * the vnode module, close the tfs handle and free pMgmt.
 *
 * A NULL pMgmt is accepted and ignored; vmInit() reaches this function with
 * NULL when the manager allocation itself failed.
 */
static void vmCleanup(SVnodeMgmt *pMgmt) {
  if (pMgmt == NULL) return;

  dInfo("vnode-mgmt start to cleanup");
  vmCloseVnodes(pMgmt);
  vmStopWorker(pMgmt);
  vnodeCleanup();
  tfsClose(pMgmt->pTfs);
  taosMemoryFree(pMgmt);

  dInfo("vnode-mgmt is cleaned up");
}
S
shm  
Shengliang Guan 已提交
249

S
Shengliang 已提交
250
/*
 * Allocate and initialize the vnode manager.
 *
 * Copies the relevant fields of pInput, installs the per-queue message
 * callbacks, then brings up, in order: tfs, wal, sync, the vnode module, the
 * worker pools, all vnodes found on disk, and the udf client. Each completed
 * stage is reported through tmsgReportStartup().
 *
 * Returns 0 and stores the manager in pOutput->pMgmt on success. On any
 * failure returns -1, leaves pOutput untouched, and releases whatever was set
 * up through vmCleanup().
 */
static int32_t vmInit(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  dInfo("vnode-mgmt start to init");
  int32_t code = -1;

  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
  if (pMgmt == NULL) {
    // Set terrno so the failure message below reports the real cause.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;
  pMgmt->dnodeId = pInput->dnodeId;
  pMgmt->msgCb = pInput->msgCb;
  pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToWriteQueue;
  pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)vmPutRpcMsgToSyncQueue;
  pMgmt->msgCb.queueFps[APPLY_QUEUE] = (PutToQueueFp)vmPutRpcMsgToApplyQueue;
  pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)vmPutRpcMsgToQueryQueue;
  pMgmt->msgCb.queueFps[FETCH_QUEUE] = (PutToQueueFp)vmPutRpcMsgToFetchQueue;
  pMgmt->msgCb.queueFps[MERGE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToMergeQueue;
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
  pMgmt->msgCb.pMgmt = pMgmt;
  taosInitRWLatch(&pMgmt->latch);

  // Fallback single-disk configuration, used only when no disk list is set.
  SDiskCfg dCfg = {0};
  tstrncpy(dCfg.dir, pInput->dataDir, TSDB_FILENAME_LEN);
  dCfg.level = 0;
  dCfg.primary = 1;
  SDiskCfg *pDisks = tsDiskCfg;
  int32_t   numOfDisks = tsDiskCfgNum;
  if (numOfDisks <= 0 || pDisks == NULL) {
    pDisks = &dCfg;
    numOfDisks = 1;
  }

  pMgmt->pTfs = tfsOpen(pDisks, numOfDisks);
  if (pMgmt->pTfs == NULL) {
    dError("failed to init tfs since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-tfs", "initialized");

  if (walInit() != 0) {
    dError("failed to init wal since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-wal", "initialized");

  if (syncInit() != 0) {
    dError("failed to open sync since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-sync", "initialized");

  if (vnodeInit(tsNumOfCommitThreads) != 0) {
    dError("failed to init vnode since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-commit", "initialized");

  if (vmStartWorker(pMgmt) != 0) {
    dError("failed to init workers since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-worker", "initialized");

  if (vmOpenVnodes(pMgmt) != 0) {
    dError("failed to open vnode since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-vnodes", "initialized");

  if (udfcOpen() != 0) {
    dError("failed to open udfc in vnode");
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code == 0) {
    pOutput->pMgmt = pMgmt;
    dInfo("vnodes-mgmt is initialized");
  } else {
    dError("failed to init vnodes-mgmt since %s", terrstr());
    // Nothing to tear down when the manager itself could not be allocated.
    if (pMgmt != NULL) {
      vmCleanup(pMgmt);
    }
  }

  return code;
}

S
Shengliang 已提交
338 339
/*
 * Report whether the vnode manager is required: *required is set to true when
 * pInput->supportVnodes is positive, false otherwise. Always returns 0.
 */
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
  if (pInput->supportVnodes > 0) {
    *required = true;
  } else {
    *required = false;
  }
  return 0;
}
S
shm  
Shengliang Guan 已提交
342

S
Shengliang 已提交
343
/*
 * Start every vnode registered in pMgmt->hash by calling vnodeStart() on its
 * implementation. The whole iteration runs under the management read latch.
 * Always returns 0.
 */
static int32_t vmStart(SVnodeMgmt *pMgmt) {
  dDebug("vnode-mgmt start to run");
  taosRLockLatch(&pMgmt->latch);

  void *pIter = taosHashIterate(pMgmt->hash, NULL);
  while (pIter != NULL) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode != NULL) {
      vnodeStart(pVnode->pImpl);
    }
    // Advance unconditionally: skipping an empty entry without moving the
    // iterator would spin forever while holding the read latch.
    pIter = taosHashIterate(pMgmt->hash, pIter);
  }

  taosRUnLockLatch(&pMgmt->latch);
  return 0;
}

S
Shengliang 已提交
361
/*
 * Stop hook of the vnode manager. Intentionally empty: stopping is processed
 * inside the vnode.
 */
static void vmStop(SVnodeMgmt *pMgmt) {
  (void)pMgmt;  // unused
}

S
Shengliang 已提交
365 366 367 368 369 370 371 372
SMgmtFunc vmGetMgmtFunc() {
  SMgmtFunc mgmtFunc = {0};
  mgmtFunc.openFp = vmInit;
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
  mgmtFunc.startFp = (NodeStartFp)vmStart;
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
  mgmtFunc.requiredFp = vmRequire;
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
S
shm  
Shengliang Guan 已提交
373

S
Shengliang 已提交
374 375
  return mgmtFunc;
}