vmInt.c 10.6 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "vmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang 已提交
19
// Look up the vnode object registered under vgId and take a reference on it.
//
// The lookup and the refCount increment both happen while holding the read
// side of pMgmt->lock.
//
// Returns the vnode object with its refCount incremented by one, or NULL with
// terrno set to TSDB_CODE_VND_INVALID_VGROUP_ID when no entry exists for vgId
// or the entry is flagged as dropped. A non-NULL result must be balanced by a
// call to vmReleaseVnode().
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pVnode = NULL;

  taosThreadRwlockRdlock(&pMgmt->lock);
  taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
  if (pVnode == NULL || pVnode->dropped) {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
    // No reference is taken on this path, so never hand the object out:
    // a caller that later releases it would drive refCount below zero.
    pVnode = NULL;
  } else {
    (void)atomic_add_fetch_32(&pVnode->refCount, 1);
  }
  taosThreadRwlockUnlock(&pMgmt->lock);

  return pVnode;
}

S
Shengliang 已提交
35
// Give back one reference on pVnode by decrementing its refCount.
//
// The decrement is performed while holding the read side of pMgmt->lock.
// Passing a NULL pVnode is allowed and does nothing.
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  if (pVnode != NULL) {
    taosThreadRwlockRdlock(&pMgmt->lock);
    (void)atomic_sub_fetch_32(&pVnode->refCount, 1);
    taosThreadRwlockUnlock(&pMgmt->lock);
  }
}

S
Shengliang 已提交
44
// Create the management object for an already opened vnode and register it
// in pMgmt->hash under its vgId.
//
// pCfg  supplies vgId, vgVersion and path (the path string is duplicated).
// pImpl is stored in the new object as-is; this function never closes it.
//
// Returns the result of taosHashPut() on the normal path, or -1 with terrno
// set to TSDB_CODE_OUT_OF_MEMORY when the object, its path copy, or its
// queues cannot be allocated. Everything allocated here is freed again on
// those early failure paths.
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pVnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pVnode->vgId = pCfg->vgId;
  pVnode->vgVersion = pCfg->vgVersion;
  pVnode->refCount = 0;
  pVnode->dropped = 0;
  pVnode->path = tstrdup(pCfg->path);
  pVnode->pImpl = pImpl;

  if (pVnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pVnode);  // was leaked before
    return -1;
  }

  if (vmAllocQueue(pMgmt, pVnode) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pVnode->path);  // was leaked before
    taosMemoryFree(pVnode);        // was leaked before
    return -1;
  }

  // Publish the object; the hash stores the pointer value, not a copy of the struct.
  taosThreadRwlockWrlock(&pMgmt->lock);
  int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
  taosThreadRwlockUnlock(&pMgmt->lock);

  // NOTE(review): if taosHashPut fails the object and its queues stay allocated
  // but unreachable; undoing vmAllocQueue here needs confirmation that
  // vmFreeQueue is safe at this point.
  return code;
}

S
Shengliang 已提交
75
// Shut down a single vnode and free its management object.
//
// Steps, in order:
//   1. remove the vgId entry from pMgmt->hash (under the write lock);
//   2. release one reference, then poll every 10 ms until refCount is 0;
//   3. poll every 10 ms until the write, sync, apply, query and fetch queues
//      are each empty;
//   4. free the queues and close the underlying vnode implementation;
//   5. if the vnode is flagged as dropped, destroy its on-disk directory;
//   6. free the path string and the object itself.
//
// pVnode must not be used after this call returns.
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  taosThreadRwlockWrlock(&pMgmt->lock);
  taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
  taosThreadRwlockUnlock(&pMgmt->lock);

  vmReleaseVnode(pMgmt, pVnode);
  while (pVnode->refCount > 0) {
    taosMsleep(10);
  }
  dTrace("vgId:%d, wait for vnode queue is empty", pVnode->vgId);

  while (!taosQueueEmpty(pVnode->pWriteQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pSyncQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pApplyQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pQueryQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pFetchQ)) {
    taosMsleep(10);
  }
  dTrace("vgId:%d, vnode queue is empty", pVnode->vgId);

  vmFreeQueue(pMgmt, pVnode);
  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;
  dDebug("vgId:%d, vnode is closed", pVnode->vgId);

  if (pVnode->dropped) {
    // The directory name is rebuilt from vgId rather than taken from pVnode->path.
    char vnodeDir[TSDB_FILENAME_LEN] = {0};
    dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
    snprintf(vnodeDir, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
    vnodeDestroy(vnodeDir, pMgmt->pTfs);
  }

  taosMemoryFree(pVnode->path);
  taosMemoryFree(pVnode);
}

S
Shengliang Guan 已提交
108
static void *vmOpenVnodeInThread(void *param) {
S
shm  
Shengliang Guan 已提交
109
  SVnodeThread *pThread = param;
S
Shengliang 已提交
110
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
H
Hongze Cheng 已提交
111
  char          path[TSDB_FILENAME_LEN];
S
shm  
Shengliang Guan 已提交
112 113 114 115 116 117 118 119 120 121

  dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("open-vnodes");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
    SWrapperCfg *pCfg = &pThread->pCfgs[v];

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
S
Shengliang 已提交
122 123
    tmsgReportStartup("vnode-open", stepDesc);

H
Hongze Cheng 已提交
124
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
S
Shengliang 已提交
125
    SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
S
shm  
Shengliang Guan 已提交
126 127 128 129
    if (pImpl == NULL) {
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
      pThread->failed++;
    } else {
S
shm  
Shengliang Guan 已提交
130
      vmOpenVnode(pMgmt, pCfg, pImpl);
S
shm  
Shengliang Guan 已提交
131 132
      dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
      pThread->opened++;
S
Shengliang Guan 已提交
133
      atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
S
shm  
Shengliang Guan 已提交
134 135 136
    }
  }

S
Shengliang Guan 已提交
137
  dDebug("thread:%d, numOfVnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
S
shm  
Shengliang Guan 已提交
138 139 140 141
         pThread->failed);
  return NULL;
}

S
Shengliang 已提交
142
// Create the vnode hash, read the vnode list from disk, and open all vnodes
// in parallel.
//
// The configs returned by vmGetVnodeListFromFile() are distributed
// round-robin over max(1, tsNumOfCores / 2) worker threads, each running
// vmOpenVnodeInThread(). All started threads are joined before returning.
//
// Returns 0 when pMgmt->state.openVnodes equals pMgmt->state.totalVnodes
// after the workers finish, otherwise -1. Also returns -1 when the hash, the
// vnode list, or the per-thread bookkeeping cannot be set up.
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
  pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (pMgmt->hash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to init vnode hash since %s", terrstr());
    return -1;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  if (vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) {
    dInfo("failed to get vnode list from disk since %s", terrstr());
    return -1;
  }

  pMgmt->state.totalVnodes = numOfVnodes;

  int32_t threadNum = tsNumOfCores / 2;
  if (threadNum < 1) threadNum = 1;
  // "+ 1" guarantees enough room per thread for the round-robin remainder.
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to allocate threads to open vnodes since %s", terrstr());
    taosMemoryFree(pCfgs);
    return -1;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      dError("failed to allocate vnode cfgs for thread:%d since %s", t, terrstr());
      // Undo the slices allocated so far; no thread has been started yet.
      for (int32_t i = 0; i < t; ++i) {
        taosMemoryFree(threads[i].pCfgs);
      }
      taosMemoryFree(threads);
      taosMemoryFree(pCfgs);
      return -1;
    }
  }

  // Round-robin distribution of configs over the worker slices.
  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
      // Vnodes assigned to a thread that failed to start are never opened,
      // so the openVnodes/totalVnodes comparison below fails.
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
    }

    taosThreadAttrDestroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->pCfgs);
  }
  taosMemoryFree(threads);
  taosMemoryFree(pCfgs);

  if (pMgmt->state.openVnodes != pMgmt->state.totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
    return -1;
  } else {
    dInfo("total vnodes:%d open successfully", pMgmt->state.totalVnodes);
    return 0;
  }
}

S
Shengliang 已提交
212
// Close every vnode currently listed in the hash, then destroy the hash.
//
// The list of vnodes is obtained from vmGetVnodeListFromHash() and each entry
// is passed to vmCloseVnode() in list order. The list array is freed
// afterwards, and pMgmt->hash is cleaned up and reset to NULL if it exists.
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
  dInfo("start to close all vnodes");

  int32_t     vnodeCount = 0;
  SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &vnodeCount);

  int32_t idx = 0;
  while (idx < vnodeCount) {
    vmCloseVnode(pMgmt, ppVnodes[idx]);
    ++idx;
  }

  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  if (pMgmt->hash != NULL) {
    taosHashCleanup(pMgmt->hash);
    pMgmt->hash = NULL;
  }

  dInfo("total vnodes:%d are all closed", vnodeCount);
}

S
Shengliang 已提交
234
// Tear down the vnode management module and free pMgmt.
//
// Closes all vnodes, stops the workers, cleans up the vnode library, closes
// the tiered file system handle, destroys the rwlock and finally frees pMgmt
// itself. A NULL pMgmt is accepted and ignored: vmInit() reaches its cleanup
// path with a NULL pointer when allocating the management object fails.
static void vmCleanup(SVnodeMgmt *pMgmt) {
  if (pMgmt == NULL) return;

  vmCloseVnodes(pMgmt);
  vmStopWorker(pMgmt);
  vnodeCleanup();
  tfsClose(pMgmt->pTfs);
  taosThreadRwlockDestroy(&pMgmt->lock);
  taosMemoryFree(pMgmt);
}
S
shm  
Shengliang Guan 已提交
242

S
Shengliang Guan 已提交
243
// Allocate and initialise the vnode management module.
//
// Copies the relevant fields of pInput into a new SVnodeMgmt, installs the
// vnode queue callbacks into its msgCb, then brings up (in order) tfs, wal,
// sync, the vnode library, the workers, all vnodes found on disk, and udfc.
// A startup step is reported through tmsgReportStartup() after each stage
// except udfc.
//
// On success stores the new object in pOutput->pMgmt and returns 0. On any
// failure logs the reason, releases what was set up via vmCleanup(), and
// returns -1; pOutput is left untouched in that case.
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  int32_t code = -1;

  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
  if (pMgmt == NULL) {
    // Set the reason explicitly so the failure log below does not report a stale terrno.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  pMgmt->pData = pInput->pData;
  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;
  pMgmt->msgCb = pInput->msgCb;
  // Override the queue callbacks so messages are routed to vnode queues.
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
  pMgmt->msgCb.mgmt = pMgmt;
  taosThreadRwlockInit(&pMgmt->lock, NULL);

  // Fallback single-disk configuration rooted at tsDataDir, used when no
  // explicit disk configuration is present.
  SDiskCfg dCfg = {0};
  tstrncpy(dCfg.dir, tsDataDir, TSDB_FILENAME_LEN);
  dCfg.level = 0;
  dCfg.primary = 1;
  SDiskCfg *pDisks = tsDiskCfg;
  int32_t   numOfDisks = tsDiskCfgNum;
  if (numOfDisks <= 0 || pDisks == NULL) {
    pDisks = &dCfg;
    numOfDisks = 1;
  }

  pMgmt->pTfs = tfsOpen(pDisks, numOfDisks);
  if (pMgmt->pTfs == NULL) {
    dError("failed to init tfs since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-tfs", "initialized");

  if (walInit() != 0) {
    dError("failed to init wal since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-wal", "initialized");

  if (syncInit() != 0) {
    dError("failed to open sync since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-sync", "initialized");

  if (vnodeInit(tsNumOfCommitThreads) != 0) {
    dError("failed to init vnode since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-commit", "initialized");

  if (vmStartWorker(pMgmt) != 0) {
    dError("failed to init workers since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-worker", "initialized");

  if (vmOpenVnodes(pMgmt) != 0) {
    dError("failed to open vnode since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-vnodes", "initialized");

  if (udfcOpen() != 0) {
    dError("failed to open udfc in vnode");
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code == 0) {
    pOutput->pMgmt = pMgmt;
  } else {
    dError("failed to init vnodes-mgmt since %s", terrstr());
    // Nothing to tear down when the management object itself could not be
    // allocated; calling vmCleanup() with NULL would dereference it.
    // NOTE(review): vmCleanup() runs every teardown step regardless of how far
    // initialisation got; confirm each step tolerates an uninitialised stage.
    if (pMgmt != NULL) {
      vmCleanup(pMgmt);
    }
  }

  return code;
}

S
Shengliang 已提交
324
// Report whether the vnode management module is needed on this node.
//
// Sets *required to true when tsNumOfSupportVnodes is greater than zero and
// to false otherwise. pInput is not consulted. Always returns 0.
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
  if (tsNumOfSupportVnodes > 0) {
    *required = true;
  } else {
    *required = false;
  }
  return 0;
}
S
shm  
Shengliang Guan 已提交
328

S
Shengliang 已提交
329
// Start every vnode currently listed in the hash.
//
// First calls vnodeStart() on each vnode's implementation, then, in a second
// pass, calls vmReleaseVnode() on each list entry (presumably balancing a
// reference taken by vmGetVnodeListFromHash -- confirm). The list array is
// freed before returning. Always returns 0.
static int32_t vmStart(SVnodeMgmt *pMgmt) {
  int32_t     vnodeCount = 0;
  SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &vnodeCount);

  // Pass 1: start all vnodes before any list entry is released.
  for (int32_t idx = 0; idx < vnodeCount; ++idx) {
    vnodeStart(ppVnodes[idx]->pImpl);
  }

  // Pass 2: release each list entry.
  for (int32_t idx = 0; idx < vnodeCount; ++idx) {
    vmReleaseVnode(pMgmt, ppVnodes[idx]);
  }

  if (ppVnodes != NULL) {
    taosMemoryFree(ppVnodes);
  }

  return 0;
}

S
Shengliang 已提交
350
// Stop hook for the vnode management module.
//
// Intentionally does nothing: stopping is processed inside the vnode.
static void vmStop(SVnodeMgmt *pMgmt) {
  (void)pMgmt;  // unused; process inside the vnode
}

S
Shengliang 已提交
354 355 356 357 358 359 360 361
SMgmtFunc vmGetMgmtFunc() {
  SMgmtFunc mgmtFunc = {0};
  mgmtFunc.openFp = vmInit;
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
  mgmtFunc.startFp = (NodeStartFp)vmStart;
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
  mgmtFunc.requiredFp = vmRequire;
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
S
shm  
Shengliang Guan 已提交
362

S
Shengliang 已提交
363 364
  return mgmtFunc;
}