vmInt.c 10.4 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "vmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang 已提交
19
/*
 * Look up the vnode object stored under vgId in pMgmt->hash and, if one is
 * found, increment its reference count.
 *
 * The lookup and the increment are done while holding pMgmt->lock in read
 * mode.
 *
 * Returns the vnode object, or NULL with terrno set to
 * TSDB_CODE_VND_INVALID_VGROUP_ID when the lookup leaves the result NULL.
 * A non-NULL result should be handed back through vmReleaseVnode().
 */
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
  SVnodeObj *pFound = NULL;

  taosThreadRwlockRdlock(&pMgmt->lock);

  // pFound keeps its NULL initializer if the lookup writes nothing into it.
  taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pFound);
  if (pFound != NULL) {
    (void)atomic_add_fetch_32(&pFound->refCount, 1);
  } else {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
  }

  taosThreadRwlockUnlock(&pMgmt->lock);

  return pFound;
}

S
Shengliang 已提交
35
/*
 * Decrement the reference count of pVnode.
 *
 * A NULL pVnode is accepted and ignored. The decrement is performed while
 * holding pMgmt->lock in read mode.
 */
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  if (pVnode != NULL) {
    taosThreadRwlockRdlock(&pMgmt->lock);
    (void)atomic_sub_fetch_32(&pVnode->refCount, 1);
    taosThreadRwlockUnlock(&pMgmt->lock);
  }
}

S
Shengliang 已提交
44
/*
 * Create the management-side object for an already opened vnode and insert
 * it into pMgmt->hash under its vgId.
 *
 * pMgmt  vnode management context that owns the hash table and its lock
 * pCfg   supplies vgId, vgVersion and path for the new object
 * pImpl  opened vnode handle, stored in the new object
 *
 * Returns 0 on success. Returns -1 with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when the object, its path copy, or its queues
 * cannot be allocated; in those cases everything allocated here is freed
 * again. pImpl is never closed by this function, so on failure it still
 * belongs to the caller. Otherwise the result of taosHashPut() is returned.
 */
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
  if (pVnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pVnode->vgId = pCfg->vgId;
  pVnode->vgVersion = pCfg->vgVersion;
  pVnode->refCount = 0;
  pVnode->dropped = 0;
  pVnode->path = tstrdup(pCfg->path);
  pVnode->pImpl = pImpl;

  if (pVnode->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // The object is not visible to anyone yet, so it must be freed here.
    taosMemoryFree(pVnode);
    return -1;
  }

  if (vmAllocQueue(pMgmt, pVnode) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // NOTE(review): assumes vmAllocQueue() leaves nothing behind that still
    // references pVnode when it fails - confirm against its implementation.
    taosMemoryFree(pVnode->path);
    taosMemoryFree(pVnode);
    return -1;
  }

  // The table stores the pointer itself, keyed by vgId.
  taosThreadRwlockWrlock(&pMgmt->lock);
  int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
  taosThreadRwlockUnlock(&pMgmt->lock);

  // NOTE(review): if taosHashPut() fails, pVnode and its queues stay allocated
  // but unreachable; releasing them needs vmFreeQueue() semantics to be confirmed.
  return code;
}

S
Shengliang 已提交
75
/*
 * Take a vnode out of service and free its management object.
 *
 * Steps, in order:
 *   1. remove the entry for pVnode->vgId from pMgmt->hash (write lock held);
 *   2. call vmReleaseVnode() on pVnode, which decrements its refCount;
 *   3. poll every 10 ms until refCount is no longer positive and each of the
 *      write, sync, apply, query and fetch queues reports empty;
 *   4. free the queues and close the vnode handle;
 *   5. if the vnode is marked dropped, destroy it via vnodeDestroy();
 *   6. free the path copy and the object itself.
 *
 * pVnode must not be used after this call returns.
 */
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  taosThreadRwlockWrlock(&pMgmt->lock);
  taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
  taosThreadRwlockUnlock(&pMgmt->lock);

  vmReleaseVnode(pMgmt, pVnode);

  // Wait for outstanding references to go away.
  while (pVnode->refCount > 0) {
    taosMsleep(10);
  }

  // Wait for each queue to drain, one after another.
  while (!taosQueueEmpty(pVnode->pWriteQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pSyncQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pApplyQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pQueryQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pFetchQ)) {
    taosMsleep(10);
  }

  vmFreeQueue(pMgmt, pVnode);
  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;

  dDebug("vgId:%d, vnode is closed", pVnode->vgId);

  if (pVnode->dropped) {
    char path[TSDB_FILENAME_LEN] = {0};

    dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
    vnodeDestroy(path, pMgmt->pTfs);
  }

  taosMemoryFree(pVnode->path);
  taosMemoryFree(pVnode);
}

S
Shengliang Guan 已提交
106
static void *vmOpenVnodeInThread(void *param) {
S
shm  
Shengliang Guan 已提交
107
  SVnodeThread *pThread = param;
S
Shengliang 已提交
108
  SVnodeMgmt   *pMgmt = pThread->pMgmt;
H
Hongze Cheng 已提交
109
  char          path[TSDB_FILENAME_LEN];
S
shm  
Shengliang Guan 已提交
110 111 112 113 114 115 116 117 118 119

  dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("open-vnodes");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
    SWrapperCfg *pCfg = &pThread->pCfgs[v];

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
             pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
S
Shengliang 已提交
120 121
    tmsgReportStartup("vnode-open", stepDesc);

H
Hongze Cheng 已提交
122
    snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
S
Shengliang 已提交
123
    SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
S
shm  
Shengliang Guan 已提交
124 125 126 127
    if (pImpl == NULL) {
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
      pThread->failed++;
    } else {
S
shm  
Shengliang Guan 已提交
128
      vmOpenVnode(pMgmt, pCfg, pImpl);
S
shm  
Shengliang Guan 已提交
129 130
      dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
      pThread->opened++;
S
Shengliang Guan 已提交
131
      atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
S
shm  
Shengliang Guan 已提交
132 133 134
    }
  }

S
Shengliang Guan 已提交
135
  dDebug("thread:%d, numOfVnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
S
shm  
Shengliang Guan 已提交
136 137 138 139
         pThread->failed);
  return NULL;
}

S
Shengliang 已提交
140
/*
 * Create the vnode hash table, read the list of vnodes through
 * vmGetVnodeListFromFile(), and open all of them using worker threads.
 *
 * The configs are distributed round-robin over threadNum threads (currently
 * 1); each thread runs vmOpenVnodeInThread() and is joined before returning.
 *
 * Returns 0 when the number of opened vnodes equals the number listed.
 * Returns -1 when the hash table cannot be created, the list cannot be read,
 * a bookkeeping allocation fails (terrno = TSDB_CODE_OUT_OF_MEMORY), or at
 * least one vnode could not be opened. pMgmt->hash is left in place on
 * failure; vmCloseVnodes() is the function that cleans it up.
 */
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
  pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  if (pMgmt->hash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to init vnode hash since %s", terrstr());
    return -1;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  if (vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) {
    dInfo("failed to get vnode list from disk since %s", terrstr());
    return -1;
  }

  pMgmt->state.totalVnodes = numOfVnodes;

  int32_t threadNum = 1;
  // +1 so that every per-thread array has room for the round-robin remainder
  // and is never requested with a size of zero.
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to allocate threads to open vnodes since %s", terrstr());
    if (pCfgs != NULL) taosMemoryFree(pCfgs);
    return -1;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pMgmt = pMgmt;
    threads[t].pCfgs = taosMemoryCalloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      dError("failed to allocate vnode cfgs for thread:%d since %s", t, terrstr());
      // No thread has been started yet; undo the allocations made so far.
      for (int32_t i = 0; i < t; ++i) {
        taosMemoryFree(threads[i].pCfgs);
      }
      taosMemoryFree(threads);
      if (pCfgs != NULL) taosMemoryFree(pCfgs);
      return -1;
    }
  }

  // Hand out the configs round-robin.
  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    TdThreadAttr thAttr;
    taosThreadAttrInit(&thAttr);
    taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
    }

    taosThreadAttrDestroy(&thAttr);
  }

  // Join every thread that was given work and whose handle passes the
  // validity check, then free that thread's config array.
  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      taosThreadJoin(pThread->thread, NULL);
      taosThreadClear(&pThread->thread);
    }
    taosMemoryFree(pThread->pCfgs);
  }
  taosMemoryFree(threads);
  taosMemoryFree(pCfgs);

  if (pMgmt->state.openVnodes != pMgmt->state.totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
    return -1;
  } else {
    dInfo("total vnodes:%d open successfully", pMgmt->state.totalVnodes);
    return 0;
  }
}

S
Shengliang 已提交
209
/*
 * Close every vnode returned by vmGetVnodeListFromHash() and then tear down
 * the hash table itself.
 *
 * The array returned by vmGetVnodeListFromHash() is freed here when it is
 * non-NULL. pMgmt->hash is reset to NULL after cleanup.
 */
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
  dInfo("start to close all vnodes");

  int32_t     numOfVnodes = 0;
  SVnodeObj **ppList = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);

  int32_t idx = 0;
  while (idx < numOfVnodes) {
    vmCloseVnode(pMgmt, ppList[idx]);
    ++idx;
  }

  if (ppList != NULL) {
    taosMemoryFree(ppList);
  }

  if (pMgmt->hash != NULL) {
    taosHashCleanup(pMgmt->hash);
    pMgmt->hash = NULL;
  }

  dInfo("total vnodes:%d are all closed", numOfVnodes);
}

S
Shengliang 已提交
231
/*
 * Shut down the vnode manager and free it.
 *
 * Closes all vnodes, stops the workers, cleans up the vnode module, closes
 * the tiered file system handle, destroys the manager lock and finally frees
 * pMgmt. pMgmt must not be used afterwards.
 *
 * A NULL pMgmt is accepted and ignored: vmInit() reaches its failure path
 * with a NULL manager when the initial allocation fails, and the steps below
 * would dereference it.
 */
static void vmCleanup(SVnodeMgmt *pMgmt) {
  if (pMgmt == NULL) return;

  vmCloseVnodes(pMgmt);
  vmStopWorker(pMgmt);
  vnodeCleanup();
  tfsClose(pMgmt->pTfs);
  taosThreadRwlockDestroy(&pMgmt->lock);
  taosMemoryFree(pMgmt);
}
S
shm  
Shengliang Guan 已提交
239

S
Shengliang Guan 已提交
240
/*
 * Allocate and initialize the vnode manager.
 *
 * pInput   supplies pData, path, name and the message callbacks; the queue
 *          callbacks (putToQueueFp, qsizeFp) and the mgmt pointer in the
 *          copied callback set are replaced with this module's own.
 * pOutput  receives the new manager in pOutput->pMgmt on success only.
 *
 * Initialization order: tiered file system, WAL, sync, vnode module,
 * workers, vnodes from disk, udfc. When tsDiskCfg is empty, a single primary
 * level-0 disk at tsDataDir is used instead.
 *
 * Returns 0 on success and -1 on failure. On failure everything is torn down
 * through vmCleanup(), unless the manager itself could not be allocated, in
 * which case terrno is set to TSDB_CODE_OUT_OF_MEMORY and there is nothing
 * to clean up.
 */
static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  int32_t code = -1;

  SVnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SVnodeMgmt));
  if (pMgmt == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  pMgmt->pData = pInput->pData;
  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;
  pMgmt->msgCb = pInput->msgCb;
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
  pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
  pMgmt->msgCb.mgmt = pMgmt;
  taosThreadRwlockInit(&pMgmt->lock, NULL);

  // Fallback disk configuration, used only when no disks are configured.
  SDiskCfg dCfg = {0};
  tstrncpy(dCfg.dir, tsDataDir, TSDB_FILENAME_LEN);
  dCfg.level = 0;
  dCfg.primary = 1;
  SDiskCfg *pDisks = tsDiskCfg;
  int32_t   numOfDisks = tsDiskCfgNum;
  if (numOfDisks <= 0 || pDisks == NULL) {
    pDisks = &dCfg;
    numOfDisks = 1;
  }

  pMgmt->pTfs = tfsOpen(pDisks, numOfDisks);
  if (pMgmt->pTfs == NULL) {
    dError("failed to init tfs since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-tfs", "initialized");

  if (walInit() != 0) {
    dError("failed to init wal since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-wal", "initialized");

  if (syncInit() != 0) {
    dError("failed to open sync since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-sync", "initialized");

  if (vnodeInit(tsNumOfCommitThreads) != 0) {
    dError("failed to init vnode since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-commit", "initialized");

  if (vmStartWorker(pMgmt) != 0) {
    dError("failed to init workers since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-worker", "initialized");

  if (vmOpenVnodes(pMgmt) != 0) {
    dError("failed to open vnode since %s", terrstr());
    goto _OVER;
  }
  tmsgReportStartup("vnode-vnodes", "initialized");

  if (udfcOpen() != 0) {
    dError("failed to open udfc in vnode");
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code == 0) {
    pOutput->pMgmt = pMgmt;
  } else {
    dError("failed to init vnodes-mgmt since %s", terrstr());
    // pMgmt is NULL when the very first allocation failed; vmCleanup()
    // would dereference it, so only clean up a manager that exists.
    if (pMgmt != NULL) {
      vmCleanup(pMgmt);
    }
  }

  return code;
}

S
Shengliang 已提交
321
/*
 * Report whether the vnode manager is needed: *required is set to true
 * exactly when tsNumOfSupportVnodes is greater than zero.
 *
 * pInput is not read. Always returns 0.
 */
static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
  if (tsNumOfSupportVnodes > 0) {
    *required = true;
  } else {
    *required = false;
  }

  return 0;
}
S
shm  
Shengliang Guan 已提交
325

S
Shengliang 已提交
326
/*
 * Start every vnode currently returned by vmGetVnodeListFromHash().
 *
 * All vnodes are started first; only afterwards is vmReleaseVnode() called
 * on each list entry (presumably to drop references taken while the list was
 * built - confirm against vmGetVnodeListFromHash). The list array is freed
 * when non-NULL.
 *
 * Always returns 0.
 */
static int32_t vmStart(SVnodeMgmt *pMgmt) {
  int32_t     vnodeCount = 0;
  SVnodeObj **ppList = vmGetVnodeListFromHash(pMgmt, &vnodeCount);

  // Pass 1: start all vnodes.
  for (int32_t idx = 0; idx < vnodeCount; ++idx) {
    vnodeStart(ppList[idx]->pImpl);
  }

  // Pass 2: release all list entries.
  for (int32_t idx = 0; idx < vnodeCount; ++idx) {
    vmReleaseVnode(pMgmt, ppList[idx]);
  }

  if (ppList != NULL) {
    taosMemoryFree(ppList);
  }

  return 0;
}

S
Shengliang 已提交
347
/*
 * Stop hook of the vnode manager. Intentionally does nothing: per the
 * original note, stopping is processed inside the vnode.
 */
static void vmStop(SVnodeMgmt *pMgmt) {
  (void)pMgmt;  // unused
}

S
Shengliang 已提交
351 352 353 354 355 356 357 358
SMgmtFunc vmGetMgmtFunc() {
  SMgmtFunc mgmtFunc = {0};
  mgmtFunc.openFp = vmInit;
  mgmtFunc.closeFp = (NodeCloseFp)vmCleanup;
  mgmtFunc.startFp = (NodeStartFp)vmStart;
  mgmtFunc.stopFp = (NodeStopFp)vmStop;
  mgmtFunc.requiredFp = vmRequire;
  mgmtFunc.getHandlesFp = vmGetMsgHandles;
S
shm  
Shengliang Guan 已提交
359

S
Shengliang 已提交
360 361
  return mgmtFunc;
}