dndVnodes.c 31.2 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "dndVnodes.h"
#include "dndTransport.h"

20 21 22 23 24
typedef struct {
  int32_t  vgId;
  int32_t  vgVersion;
  int8_t   dropped;
  uint64_t dbUid;
25
  char     db[TSDB_DB_FNAME_LEN];
26 27 28
  char     path[PATH_MAX + 20];
} SWrapperCfg;

S
Shengliang Guan 已提交
29
typedef struct {
30 31 32 33 34 35
  int32_t     vgId;
  int32_t     refCount;
  int32_t     vgVersion;
  int8_t      dropped;
  int8_t      accessState;
  uint64_t    dbUid;
H
more  
Hongze Cheng 已提交
36 37 38
  char *      db;
  char *      path;
  SVnode *    pImpl;
39 40 41 42
  STaosQueue *pWriteQ;
  STaosQueue *pSyncQ;
  STaosQueue *pApplyQ;
  STaosQueue *pQueryQ;
H
more  
Hongze Cheng 已提交
43
  STaosQueue *pFetchQ;
S
Shengliang Guan 已提交
44 45 46
} SVnodeObj;

typedef struct {
47 48 49 50
  int32_t      vnodeNum;
  int32_t      opened;
  int32_t      failed;
  int32_t      threadIndex;
51
  pthread_t    thread;
H
more  
Hongze Cheng 已提交
52
  SDnode *     pDnode;
53
  SWrapperCfg *pCfgs;
S
Shengliang Guan 已提交
54 55
} SVnodeThread;

S
Shengliang Guan 已提交
56 57
static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode);
static void    dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode);
S
Shengliang Guan 已提交
58

59 60
static void    dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
static void    dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
61 62 63
static void    dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
static void    dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
static void    dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
S
Shengliang Guan 已提交
64 65 66 67
void           dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void           dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void           dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void           dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
68
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg);
S
Shengliang Guan 已提交
69

H
more  
Hongze Cheng 已提交
70
static SVnodeObj * dndAcquireVnode(SDnode *pDnode, int32_t vgId);
S
Shengliang Guan 已提交
71
static void        dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode);
72 73
static int32_t     dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl);
static void        dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode);
S
Shengliang Guan 已提交
74
static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes);
75
static int32_t     dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
S
Shengliang Guan 已提交
76 77 78 79 80 81 82
static int32_t     dndWriteVnodesToFile(SDnode *pDnode);

static int32_t dndOpenVnodes(SDnode *pDnode);
static void    dndCloseVnodes(SDnode *pDnode);

// Looks up the vnode registered under vgId and takes one reference on it.
// Returns NULL (with terrno = TSDB_CODE_VND_INVALID_VGROUP_ID) when the id is
// not in the hash. A successful call must be paired with dndReleaseVnode().
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  SVnodeObj   *pFound = NULL;
  int32_t      refs = 0;

  // Lookup and reference bump happen under the same read latch.
  taosRLockLatch(&pMgmt->latch);
  taosHashGetClone(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pFound);
  if (pFound != NULL) {
    refs = atomic_add_fetch_32(&pFound->refCount, 1);
  } else {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
  }
  taosRUnLockLatch(&pMgmt->latch);

  if (pFound == NULL) {
    return NULL;
  }

  dTrace("vgId:%d, acquire vnode, refCount:%d", pFound->vgId, refs);
  return pFound;
}

// Drops one reference previously taken on pVnode. A NULL pVnode is ignored.
static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
  if (pVnode != NULL) {
    SVnodesMgmt *pMgmt = &pDnode->vmgmt;

    taosRLockLatch(&pMgmt->latch);
    int32_t remaining = atomic_sub_fetch_32(&pVnode->refCount, 1);
    taosRUnLockLatch(&pMgmt->latch);

    dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, remaining);
  }
}

112
// Creates the wrapper object for an already opened vnode (pImpl), allocates
// its queues and registers it in the vnode hash under pCfg->vgId.
//
// Returns 0 on success. On failure a non-zero value is returned, terrno is
// set to TSDB_CODE_OUT_OF_MEMORY and everything allocated here is released
// again. pImpl is never closed by this function; it stays owned by the caller.
static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  int32_t      code = -1;

  SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj));
  if (pVnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pVnode->vgId = pCfg->vgId;
  pVnode->refCount = 0;
  pVnode->dropped = 0;
  pVnode->accessState = TSDB_VN_ALL_ACCCESS;
  pVnode->pImpl = pImpl;
  pVnode->vgVersion = pCfg->vgVersion;
  pVnode->dbUid = pCfg->dbUid;
  pVnode->db = tstrdup(pCfg->db);
  pVnode->path = tstrdup(pCfg->path);

  if (pVnode->path == NULL || pVnode->db == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OPEN_VNODE_FAILED;
  }

  if (dndAllocVnodeQueue(pDnode, pVnode) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OPEN_VNODE_FAILED;
  }

  taosWLockLatch(&pMgmt->latch);
  code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
  taosWUnLockLatch(&pMgmt->latch);

  if (code == 0) {
    return 0;
  }

  // The object never became visible through the hash, so it can be torn down
  // directly, queues included.
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  dndFreeVnodeQueue(pDnode, pVnode);

OPEN_VNODE_FAILED:
  free(pVnode->path);
  free(pVnode->db);
  free(pVnode);
  return code;
}

150
// Unregisters, drains and destroys one vnode wrapper.
//
// The caller must hold one reference on pVnode; it is released here. The
// function then polls every 10 ms until no reference is left and all five
// queues are empty, frees the queues, closes the vnode and, if the vnode is
// marked dropped, destroys its directory. pVnode is freed before returning.
static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;

  // Take the entry out of the hash first so it can no longer be looked up.
  taosWLockLatch(&pMgmt->latch);
  taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
  taosWUnLockLatch(&pMgmt->latch);

  dndReleaseVnode(pDnode, pVnode);
  while (pVnode->refCount > 0) {
    taosMsleep(10);
  }

  // Wait for the queues in the same order as they are listed here.
  STaosQueue *pending[] = {pVnode->pWriteQ, pVnode->pSyncQ, pVnode->pApplyQ, pVnode->pQueryQ, pVnode->pFetchQ};
  for (size_t q = 0; q < sizeof(pending) / sizeof(pending[0]); ++q) {
    while (!taosQueueEmpty(pending[q])) {
      taosMsleep(10);
    }
  }

  dndFreeVnodeQueue(pDnode, pVnode);
  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;

  dDebug("vgId:%d, vnode is closed", pVnode->vgId);

  if (pVnode->dropped) {
    dDebug("vgId:%d, vnode is destroyed for dropped:%d", pVnode->vgId, pVnode->dropped);
    vnodeDestroy(pVnode->path);
  }

  free(pVnode->path);
  free(pVnode->db);
  free(pVnode);
}

// Takes a snapshot of all vnodes currently in the hash.
//
// Every returned vnode has had its refCount incremented, so the caller must
// call dndReleaseVnode() on each entry (or hand it to dndCloseVnode()) and
// free() the returned array. *numOfVnodes receives the number of entries.
//
// NOTE(review): the calloc result is not checked here; with a non-empty hash
// an allocation failure is still dereferenced. Left as is because this
// signature gives callers no way to tell "empty" from "failed".
static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  taosRLockLatch(&pMgmt->latch);

  int32_t     num = 0;
  int32_t     size = taosHashGetSize(pMgmt->hash);
  SVnodeObj **pVnodes = calloc(size, sizeof(SVnodeObj *));

  void *pIter = taosHashIterate(pMgmt->hash, NULL);
  while (pIter != NULL) {
    SVnodeObj **ppVnode = pIter;
    SVnodeObj  *pVnode = *ppVnode;

    if (pVnode == NULL || num >= size) {
      // Stop the walk. Leaving pIter unchanged here would re-enter the loop
      // with the same iterator forever.
      taosHashCancelIterate(pMgmt->hash, pIter);
      break;
    }

    int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
    dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount);
    pVnodes[num] = pVnode;
    num++;
    pIter = taosHashIterate(pMgmt->hash, pIter);
  }

  taosRUnLockLatch(&pMgmt->latch);
  *numOfVnodes = num;

  return pVnodes;
}

209 210 211 212
// Reads "<vnodes dir>/vnodes.json" and converts its "vnodes" array into an
// array of SWrapperCfg.
//
// On success returns 0, stores the entry count in *numOfVnodes and, when the
// count is positive, stores a calloc'ed array in *ppCfgs (the caller frees
// it). A missing file is not an error: 0 is returned and the outputs are left
// untouched. On any other failure TSDB_CODE_DND_VNODE_READ_FILE_ERROR is
// returned, terrno is set to the same value and nothing is handed to the
// caller.
//
// At most 30000 bytes of the file are read.
static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) {
  int32_t      code = TSDB_CODE_DND_VNODE_READ_FILE_ERROR;
  int32_t      len = 0;
  int32_t      maxLen = 30000;
  char        *content = calloc(1, maxLen + 1);
  cJSON       *root = NULL;
  FILE        *fp = NULL;
  char         file[PATH_MAX + 20] = {0};
  SWrapperCfg *pCfgs = NULL;

  snprintf(file, sizeof(file), "%s/vnodes.json", pDnode->dir.vnodes);

  if (content == NULL) {
    dError("failed to read %s since out of memory", file);
    goto PARSE_VNODE_OVER;
  }

  fp = fopen(file, "r");
  if (fp == NULL) {
    dDebug("file %s not exist", file);
    code = 0;
    goto PARSE_VNODE_OVER;
  }

  len = (int32_t)fread(content, 1, maxLen, fp);
  if (len <= 0) {
    dError("failed to read %s since content is null", file);
    goto PARSE_VNODE_OVER;
  }

  content[len] = 0;
  root = cJSON_Parse(content);
  if (root == NULL) {
    dError("failed to read %s since invalid json format", file);
    goto PARSE_VNODE_OVER;
  }

  cJSON *vnodes = cJSON_GetObjectItem(root, "vnodes");
  if (!vnodes || vnodes->type != cJSON_Array) {
    dError("failed to read %s since vnodes not found", file);
    goto PARSE_VNODE_OVER;
  }

  int32_t vnodesNum = cJSON_GetArraySize(vnodes);
  if (vnodesNum > 0) {
    pCfgs = calloc(vnodesNum, sizeof(SWrapperCfg));
    if (pCfgs == NULL) {
      dError("failed to read %s since out of memory", file);
      goto PARSE_VNODE_OVER;
    }

    for (int32_t i = 0; i < vnodesNum; ++i) {
      cJSON       *vnode = cJSON_GetArrayItem(vnodes, i);
      SWrapperCfg *pCfg = &pCfgs[i];

      cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId");
      if (!vgId || vgId->type != cJSON_Number) {
        dError("failed to read %s since vgId not found", file);
        goto PARSE_VNODE_OVER;
      }
      pCfg->vgId = vgId->valueint;
      snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCfg->vgId);

      cJSON *dropped = cJSON_GetObjectItem(vnode, "dropped");
      if (!dropped || dropped->type != cJSON_Number) {
        dError("failed to read %s since dropped not found", file);
        goto PARSE_VNODE_OVER;
      }
      pCfg->dropped = dropped->valueint;

      cJSON *vgVersion = cJSON_GetObjectItem(vnode, "vgVersion");
      if (!vgVersion || vgVersion->type != cJSON_Number) {
        dError("failed to read %s since vgVersion not found", file);
        goto PARSE_VNODE_OVER;
      }
      pCfg->vgVersion = vgVersion->valueint;

      // dbUid is persisted as a string because it is a 64-bit value.
      cJSON *dbUid = cJSON_GetObjectItem(vnode, "dbUid");
      if (!dbUid || dbUid->type != cJSON_String) {
        dError("failed to read %s since dbUid not found", file);
        goto PARSE_VNODE_OVER;
      }
      pCfg->dbUid = atoll(dbUid->valuestring);

      cJSON *db = cJSON_GetObjectItem(vnode, "db");
      if (!db || db->type != cJSON_String) {
        dError("failed to read %s since db not found", file);
        goto PARSE_VNODE_OVER;
      }
      tstrncpy(pCfg->db, db->valuestring, TSDB_DB_FNAME_LEN);
    }

    *ppCfgs = pCfgs;
  }

  *numOfVnodes = vnodesNum;
  code = 0;
  dInfo("succcessed to read file %s", file);

PARSE_VNODE_OVER:
  free(content);
  if (root != NULL) cJSON_Delete(root);
  if (fp != NULL) fclose(fp);

  if (code != 0) {
    // The array was never published through *ppCfgs, so it must be freed
    // here. The caller reports terrstr(), hence terrno is set as well.
    free(pCfgs);
    terrno = code;
  }

  return code;
}

// Persists the current vnode list as JSON.
//
// The list is written to "<vnodes dir>/vnodes.json.bak", flushed and synced,
// and then renamed to "<vnodes dir>/vnodes.json". Returns the result of the
// rename on success. Returns -1 with terrno set when the temporary file
// cannot be opened or written; in that case no rename is attempted and the
// existing vnodes.json is left untouched.
//
// The JSON is streamed straight to the file, so its size is not limited by
// any fixed-size buffer.
static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
  char file[PATH_MAX + 20] = {0};
  char realfile[PATH_MAX + 20] = {0};
  snprintf(file, sizeof(file), "%s/vnodes.json.bak", pDnode->dir.vnodes);
  snprintf(realfile, sizeof(realfile), "%s/vnodes.json", pDnode->dir.vnodes);

  FILE *fp = fopen(file, "w");
  if (fp == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to write %s since %s", file, terrstr());
    return -1;
  }

  int32_t     numOfVnodes = 0;
  SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes);

  fputs("{\n", fp);
  fputs("  \"vnodes\": [\n", fp);
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    SVnodeObj *pVnode = pVnodes[i];
    fputs("    {\n", fp);
    fprintf(fp, "      \"vgId\": %d,\n", pVnode->vgId);
    fprintf(fp, "      \"dropped\": %d,\n", pVnode->dropped);
    fprintf(fp, "      \"vgVersion\": %d,\n", pVnode->vgVersion);
    fprintf(fp, "      \"dbUid\": \"%" PRIu64 "\",\n", pVnode->dbUid);
    fprintf(fp, "      \"db\": \"%s\"\n", pVnode->db);
    // No trailing comma after the last array element.
    fputs((i < numOfVnodes - 1) ? "    },\n" : "    }\n", fp);
  }
  fputs("  ]\n", fp);
  fputs("}\n", fp);

  // The snapshot is no longer needed; give back the references it holds.
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    dndReleaseVnode(pDnode, pVnodes[i]);
  }
  free(pVnodes);

  // Push stdio's buffer to the kernel before syncing, otherwise the sync
  // would not cover the data just written.
  int32_t failed = (fflush(fp) != 0) || ferror(fp);
  int32_t savedErrno = errno;
  if (!failed) {
    taosFsyncFile(fileno(fp));
  }
  if (fclose(fp) != 0 && !failed) {
    failed = 1;
    savedErrno = errno;
  }

  if (failed) {
    terrno = TAOS_SYSTEM_ERROR(savedErrno);
    dError("failed to write %s since %s", file, terrstr());
    return -1;
  }

  terrno = 0;
  dDebug("successed to write %s", realfile);
  return taosRenameFile(file, realfile);
}

static void *dnodeOpenVnodeFunc(void *param) {
  SVnodeThread *pThread = param;
H
more  
Hongze Cheng 已提交
370 371
  SDnode *      pDnode = pThread->pDnode;
  SVnodesMgmt * pMgmt = &pDnode->vmgmt;
S
Shengliang Guan 已提交
372 373 374 375 376

  dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("open-vnodes");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
377
    SWrapperCfg *pCfg = &pThread->pCfgs[v];
S
Shengliang Guan 已提交
378 379

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
380
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
S
Shengliang Guan 已提交
381 382 383
             pMgmt->openVnodes, pMgmt->totalVnodes);
    dndReportStartup(pDnode, "open-vnodes", stepDesc);

H
more  
Hongze Cheng 已提交
384
    SVnode *pImpl = vnodeOpen(pCfg->path, NULL, pCfg->vgId);
S
Shengliang Guan 已提交
385
    if (pImpl == NULL) {
386
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
S
Shengliang Guan 已提交
387 388
      pThread->failed++;
    } else {
389
      dndOpenVnode(pDnode, pCfg, pImpl);
390
      dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
S
Shengliang Guan 已提交
391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408
      pThread->opened++;
    }

    atomic_add_fetch_32(&pMgmt->openVnodes, 1);
  }

  dDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
         pThread->failed);
  return NULL;
}

// Initializes the vnode hash and opens every vnode listed in vnodes.json,
// spreading the work over worker threads.
//
// The thread count is taken from pDnode->opt.numOfCores and clamped to at
// least 1. Returns 0 when pMgmt->openVnodes equals pMgmt->totalVnodes after
// all threads have been joined, -1 otherwise or on allocation/read failure.
static int32_t dndOpenVnodes(SDnode *pDnode) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  taosInitRWLatch(&pMgmt->latch);

  pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  if (pMgmt->hash == NULL) {
    dError("failed to init vnode hash");
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  if (dndGetVnodesFromFile(pDnode, &pCfgs, &numOfVnodes) != 0) {
    dInfo("failed to get vnode list from disk since %s", terrstr());
    return -1;
  }

  pMgmt->totalVnodes = numOfVnodes;

  // A non-positive core count would otherwise divide by zero below.
  int32_t threadNum = pDnode->opt.numOfCores;
  if (threadNum < 1) threadNum = 1;
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    free(pCfgs);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pDnode = pDnode;
    threads[t].pCfgs = calloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      // threads[] is zero-initialized, so freeing every slot is safe.
      for (int32_t k = 0; k < threadNum; ++k) {
        free(threads[k].pCfgs);
      }
      free(threads);
      free(pCfgs);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  // Round-robin distribution: no thread gets more than vnodesPerThread items.
  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    pthread_attr_t thAttr;
    pthread_attr_init(&thAttr);
    pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (pthread_create(&pThread->thread, &thAttr, dnodeOpenVnodeFunc, pThread) != 0) {
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
    }

    pthread_attr_destroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      pthread_join(pThread->thread, NULL);
    }
    free(pThread->pCfgs);
  }
  free(threads);
  free(pCfgs);

  if (pMgmt->openVnodes != pMgmt->totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->totalVnodes, pMgmt->openVnodes);
    return -1;
  } else {
    dInfo("total vnodes:%d open successfully", pMgmt->totalVnodes);
    return 0;
  }
}

// Closes every vnode currently registered in the hash and then destroys the
// hash itself.
static void dndCloseVnodes(SDnode *pDnode) {
  dInfo("start to close all vnodes");
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;

  // The snapshot holds one reference per vnode; dndCloseVnode consumes it.
  int32_t     total = 0;
  SVnodeObj **ppList = dndGetVnodesFromHash(pDnode, &total);

  int32_t idx = 0;
  while (idx < total) {
    dndCloseVnode(pDnode, ppList[idx]);
    ++idx;
  }

  if (ppList != NULL) {
    free(ppList);
  }

  if (pMgmt->hash != NULL) {
    taosHashCleanup(pMgmt->hash);
    pMgmt->hash = NULL;
  }

  dInfo("total vnodes:%d are all closed", total);
}

S
Shengliang Guan 已提交
496 497
// Byte-swaps the multi-byte fields of the create-vnode request in place
// (inside pReq->pCont) and returns a typed pointer to that same buffer.
// The conversion is destructive, so it must run exactly once per message.
static SCreateVnodeReq *dndParseCreateVnodeReq(SRpcMsg *pReq) {
  SCreateVnodeReq *pCreate = pReq->pCont;

  pCreate->vgId = htonl(pCreate->vgId);
  pCreate->dnodeId = htonl(pCreate->dnodeId);
  pCreate->dbUid = htobe64(pCreate->dbUid);
  pCreate->vgVersion = htonl(pCreate->vgVersion);
  pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize);
  pCreate->totalBlocks = htonl(pCreate->totalBlocks);
  pCreate->daysPerFile = htonl(pCreate->daysPerFile);
  pCreate->daysToKeep0 = htonl(pCreate->daysToKeep0);
  pCreate->daysToKeep1 = htonl(pCreate->daysToKeep1);
  pCreate->daysToKeep2 = htonl(pCreate->daysToKeep2);
  pCreate->minRows = htonl(pCreate->minRows);
  pCreate->maxRows = htonl(pCreate->maxRows);
  pCreate->commitTime = htonl(pCreate->commitTime);
  pCreate->fsyncPeriod = htonl(pCreate->fsyncPeriod);

  // Only the first `replica` entries of the replica table are converted.
  int idx = 0;
  while (idx < pCreate->replica) {
    SReplica *pEntry = pCreate->replicas + idx;
    pEntry->id = htonl(pEntry->id);
    pEntry->port = htons(pEntry->port);
    idx++;
  }

  return pCreate;
}
S
Shengliang Guan 已提交
520

S
Shengliang Guan 已提交
521
// Fills *pCfg from an already byte-swapped create-vnode request. Fields that
// the request does not carry are set to the fixed values written below.
static void dndGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
  const int32_t vgId = pCreate->vgId;

  // Top-level settings. All three sizes come from cacheBlockSize.
  pCfg->vgId = vgId;
  pCfg->wsize = pCreate->cacheBlockSize;
  pCfg->ssize = pCreate->cacheBlockSize;
  pCfg->lsize = pCreate->cacheBlockSize;
  pCfg->isHeapAllocator = true;
  pCfg->ttl = 4;
  pCfg->keep = pCreate->daysToKeep0;
  pCfg->isWeak = true;

  // tsdb settings.
  // NOTE(review): keep1 is fed from daysToKeep2 and keep2 from daysToKeep0,
  // while daysToKeep1 is unused. Kept exactly as found; confirm the mapping.
  pCfg->tsdbCfg.keep = pCreate->daysToKeep0;
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep2;
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep0;
  pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize;

  // meta settings.
  pCfg->metaCfg.lruSize = pCreate->cacheBlockSize;

  // wal settings.
  pCfg->walCfg.vgId = vgId;
  pCfg->walCfg.fsyncPeriod = pCreate->fsyncPeriod;
  pCfg->walCfg.level = pCreate->walLevel;
  pCfg->walCfg.retentionPeriod = 10;
  pCfg->walCfg.retentionSize = 128;
  pCfg->walCfg.rollPeriod = 128;
  pCfg->walCfg.segSize = 128;
}

S
Shengliang Guan 已提交
545
// Builds the wrapper configuration for a vnode that is about to be created:
// identity fields are copied from the request, `dropped` is cleared, and the
// path is "<vnodes dir>/vnode<vgId>".
static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
  pCfg->vgId = pCreate->vgId;
  pCfg->vgVersion = pCreate->vgVersion;
  pCfg->dbUid = pCreate->dbUid;
  pCfg->dropped = 0;

  // Copies the full fixed-size name field, not just up to a terminator.
  memcpy(pCfg->db, pCreate->db, TSDB_DB_FNAME_LEN);
  snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCreate->vgId);
}

554
// Byte-swaps vgId of the drop-vnode request in place and returns the typed
// view of pReq->pCont.
static SDropVnodeReq *dndParseDropVnodeReq(SRpcMsg *pReq) {
  SDropVnodeReq *pBody = pReq->pCont;
  int32_t        hostVgId = htonl(pBody->vgId);

  pBody->vgId = hostVgId;
  return pBody;
}

560
// Byte-swaps vgId of the auth-vnode request in place and returns the typed
// view of pReq->pCont.
static SAuthVnodeReq *dndParseAuthVnodeReq(SRpcMsg *pReq) {
  SAuthVnodeReq *pBody = pReq->pCont;
  int32_t        hostVgId = htonl(pBody->vgId);

  pBody->vgId = hostVgId;
  return pBody;
}

S
Shengliang Guan 已提交
566 567
// Handles a create-vnode request: opens the vnode on disk, registers it with
// the dnode and persists the updated vnode list.
//
// Returns 0 on success. Returns non-zero on failure with terrno describing
// the cause (TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED when the vgId is already
// registered). On failure after the vnode was opened, the vnode is closed and
// its directory destroyed again.
int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SCreateVnodeReq *pCreate = dndParseCreateVnodeReq(pReq);
  dDebug("vgId:%d, create vnode req is received", pCreate->vgId);

  SVnodeCfg vnodeCfg = {0};
  dndGenerateVnodeCfg(pCreate, &vnodeCfg);

  SWrapperCfg wrapperCfg = {0};
  dndGenerateWrapperCfg(pDnode, pCreate, &wrapperCfg);

  SVnodeObj *pVnode = dndAcquireVnode(pDnode, pCreate->vgId);
  if (pVnode != NULL) {
    dDebug("vgId:%d, already exist", pCreate->vgId);
    dndReleaseVnode(pDnode, pVnode);
    terrno = TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED;
    return -1;
  }

  SVnode *pImpl = vnodeOpen(wrapperCfg.path, NULL /*pCfg*/, pCreate->vgId);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr());
    return -1;
  }

  int32_t code = dndOpenVnode(pDnode, &wrapperCfg, pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to open vnode since %s", pCreate->vgId, terrstr());
    // Keep the terrno set by dndOpenVnode across the cleanup calls instead of
    // replacing it with the bare return value.
    int32_t savedErrno = terrno;
    vnodeClose(pImpl);
    vnodeDestroy(wrapperCfg.path);
    terrno = savedErrno;
    return code;
  }

  code = dndWriteVnodesToFile(pDnode);
  if (code != 0) {
    int32_t savedErrno = terrno;

    // The vnode is already registered in the hash. Closing pImpl directly
    // would leave a wrapper with a dangling pImpl behind, so undo the
    // registration through the regular close path. With `dropped` set that
    // path closes pImpl and also destroys the vnode directory.
    pVnode = dndAcquireVnode(pDnode, pCreate->vgId);
    if (pVnode != NULL) {
      pVnode->dropped = 1;
      dndCloseVnode(pDnode, pVnode);
    }

    terrno = savedErrno;
    return code;
  }

  return 0;
}

S
Shengliang Guan 已提交
610 611
// Handles an alter-vnode request. The request body is parsed with the
// create-vnode parser. If the requested vgVersion equals the current one
// nothing is done; otherwise the vnode is altered and the new version is
// persisted, rolling the in-memory version back if persisting fails.
// Returns 0 on success, non-zero on failure.
int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SAlterVnodeReq *pAlter = (SAlterVnodeReq *)dndParseCreateVnodeReq(pReq);
  dDebug("vgId:%d, alter vnode req is received", pAlter->vgId);

  SVnodeCfg vnodeCfg = {0};
  dndGenerateVnodeCfg(pAlter, &vnodeCfg);

  SVnodeObj *pVnode = dndAcquireVnode(pDnode, pAlter->vgId);
  if (pVnode == NULL) {
    dDebug("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr());
    return -1;
  }

  if (pAlter->vgVersion == pVnode->vgVersion) {
    dndReleaseVnode(pDnode, pVnode);
    dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", pAlter->vgId);
    return 0;
  }

  int32_t result;
  if (vnodeAlter(pVnode->pImpl, &vnodeCfg) != 0) {
    dError("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr());
    result = -1;
  } else {
    // Publish the new version first so it is the one written to disk.
    const int32_t previous = pVnode->vgVersion;
    pVnode->vgVersion = pAlter->vgVersion;
    result = dndWriteVnodesToFile(pDnode);
    if (result != 0) {
      pVnode->vgVersion = previous;
    }
  }

  dndReleaseVnode(pDnode, pVnode);
  return result;
}

S
Shengliang Guan 已提交
646
// Handles a drop-vnode request: marks the vnode dropped, persists that state,
// closes the vnode (which destroys its directory because it is marked
// dropped) and persists the vnode list once more.
// Returns 0 on success, -1 when the vnode is unknown or the first persist
// fails.
int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SDropVnodeReq *pDrop = dndParseDropVnodeReq(pReq);

  const int32_t vgId = pDrop->vgId;
  dDebug("vgId:%d, drop vnode req is received", vgId);

  SVnodeObj *pTarget = dndAcquireVnode(pDnode, vgId);
  if (pTarget == NULL) {
    dDebug("vgId:%d, failed to drop since %s", vgId, terrstr());
    terrno = TSDB_CODE_DND_VNODE_NOT_DEPLOYED;
    return -1;
  }

  pTarget->dropped = 1;
  if (dndWriteVnodesToFile(pDnode) == 0) {
    // dndCloseVnode consumes the reference taken above and frees pTarget.
    dndCloseVnode(pDnode, pTarget);
    dndWriteVnodesToFile(pDnode);
    return 0;
  }

  // Persisting failed: undo the mark and give the reference back.
  pTarget->dropped = 0;
  dndReleaseVnode(pDnode, pTarget);
  return -1;
}

S
Shengliang Guan 已提交
672
// Handles an auth-vnode request by copying the requested accessState into the
// vnode wrapper. Returns 0 on success, -1 when the vnode is unknown.
int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SAuthVnodeReq *pAuth = (SAuthVnodeReq *)dndParseAuthVnodeReq(pReq);

  const int32_t vgId = pAuth->vgId;
  dDebug("vgId:%d, auth vnode req is received", vgId);

  SVnodeObj *pTarget = dndAcquireVnode(pDnode, vgId);
  if (pTarget != NULL) {
    pTarget->accessState = pAuth->accessState;
    dndReleaseVnode(pDnode, pTarget);
    return 0;
  }

  dDebug("vgId:%d, failed to auth since %s", vgId, terrstr());
  return -1;
}

S
Shengliang Guan 已提交
689
// Handles a sync-vnode request (parsed with the drop-vnode parser) by calling
// vnodeSync on the target vnode. Returns 0 on success, -1 when the vnode is
// unknown or vnodeSync fails.
int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SSyncVnodeReq *pSync = (SSyncVnodeReq *)dndParseDropVnodeReq(pReq);

  const int32_t vgId = pSync->vgId;
  dDebug("vgId:%d, sync vnode req is received", vgId);

  SVnodeObj *pTarget = dndAcquireVnode(pDnode, vgId);
  if (pTarget == NULL) {
    dDebug("vgId:%d, failed to sync since %s", vgId, terrstr());
    return -1;
  }

  int32_t result = 0;
  if (vnodeSync(pTarget->pImpl) != 0) {
    dError("vgId:%d, failed to sync vnode since %s", vgId, terrstr());
    result = -1;
  }

  dndReleaseVnode(pDnode, pTarget);
  return result;
}

S
Shengliang Guan 已提交
711
// Handles a compact-vnode request (parsed with the drop-vnode parser) by
// calling vnodeCompact on the target vnode. Returns 0 on success, -1 when the
// vnode is unknown or vnodeCompact fails.
int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SCompactVnodeReq *pCompact = (SCompactVnodeReq *)dndParseDropVnodeReq(pReq);

  const int32_t vgId = pCompact->vgId;
  dDebug("vgId:%d, compact vnode req is received", vgId);

  SVnodeObj *pTarget = dndAcquireVnode(pDnode, vgId);
  if (pTarget == NULL) {
    dDebug("vgId:%d, failed to compact since %s", vgId, terrstr());
    return -1;
  }

  int32_t result = 0;
  if (vnodeCompact(pTarget->pImpl) != 0) {
    dError("vgId:%d, failed to compact vnode since %s", vgId, terrstr());
    result = -1;
  }

  dndReleaseVnode(pDnode, pTarget);
  return result;
}

733 734 735
// Query-queue handler: forwards one message to vnodeProcessQueryReq.
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) {
  // The callee takes a response out-parameter; its value is not used here.
  SRpcMsg *pIgnoredRsp = NULL;

  vnodeProcessQueryReq(pVnode->pImpl, pMsg, &pIgnoredRsp);
}

738 739 740
// Fetch-queue handler: forwards one message to vnodeProcessFetchReq.
static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) {
  // The callee takes a response out-parameter; its value is not used here.
  SRpcMsg *pIgnoredRsp = NULL;

  vnodeProcessFetchReq(pVnode->pImpl, pMsg, &pIgnoredRsp);
}

743
// Write-queue handler for a batch of numOfMsgs messages.
//
// The messages are pulled out of qall into an array, passed as a batch to
// vnodeProcessWMsgs, then applied one by one with vnodeApplyWMsg. Each
// message gets a response, and finally every message and its content buffer
// is freed.
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
  SArray *pBatch = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));

  // Step 1: collect the batch.
  int32_t taken = 0;
  while (taken < numOfMsgs) {
    SRpcMsg *pItem = NULL;
    taosGetQitem(qall, (void **)&pItem);
    void *pSlot = taosArrayPush(pBatch, &pItem);
    assert(pSlot != NULL);
    ++taken;
  }

  // Step 2: hand the whole batch to the vnode.
  vnodeProcessWMsgs(pVnode->pImpl, pBatch);

  // Step 3: apply each message and answer it.
  for (size_t idx = 0; idx < numOfMsgs; idx++) {
    SRpcMsg *pReply = NULL;
    SRpcMsg *pItem = *(SRpcMsg **)taosArrayGet(pBatch, idx);
    int32_t  code = vnodeApplyWMsg(pVnode->pImpl, pItem, &pReply);

    if (pReply == NULL) {
      // No response was produced: build one carrying only the result code.
      if (code != 0) code = terrno;
      SRpcMsg rpcRsp = {.handle = pItem->handle, .ahandle = pItem->ahandle, .code = code};
      rpcSendResponse(&rpcRsp);
      continue;
    }

    pReply->ahandle = pItem->ahandle;
    rpcSendResponse(pReply);
    free(pReply);
  }

  // Step 4: release the messages.
  for (size_t idx = 0; idx < numOfMsgs; idx++) {
    SRpcMsg *pItem = *(SRpcMsg **)taosArrayGet(pBatch, idx);
    rpcFreeCont(pItem->pCont);
    taosFreeQitem(pItem);
  }

  taosArrayDestroy(pBatch);
}

779
// Apply-queue handler: pulls numOfMsgs messages from qall and passes each to
// vnodeApplyWMsg. Results and responses are ignored here.
static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
  SRpcMsg *pItem = NULL;
  int32_t  remaining = numOfMsgs;

  while (remaining-- > 0) {
    taosGetQitem(qall, (void **)&pItem);

    // todo
    SRpcMsg *pReply = NULL;
    (void)vnodeApplyWMsg(pVnode->pImpl, pItem, &pReply);
  }
}

791
// Sync-queue handler: pulls numOfMsgs messages from qall and passes each to
// vnodeProcessSyncReq. Results and responses are ignored here.
static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
  SRpcMsg *pItem = NULL;
  int32_t  remaining = numOfMsgs;

  while (remaining-- > 0) {
    taosGetQitem(qall, (void **)&pItem);

    // todo
    SRpcMsg *pReply = NULL;
    (void)vnodeProcessSyncReq(pVnode->pImpl, pItem, &pReply);
  }
}

803
/*
 * Copies `pRpcMsg` into a freshly allocated queue item and writes that item
 * to `pQueue`.
 *
 * On any failure (NULL queue, item allocation failure, queue write failure):
 *   - an error response carrying the failure code is sent when the low bit of
 *     `pRpcMsg->msgType` is set (presumably marks request-type messages --
 *     confirm against the message type table);
 *   - the message content is released with rpcFreeCont().
 *
 * Returns 0 on success, otherwise the TSDB error code describing the failure.
 */
static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg) {
  int32_t code = 0;

  if (pQueue == NULL) {
    code = TSDB_CODE_MSG_NOT_PROCESSED;
  } else {
    SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg));
    if (pMsg == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      *pMsg = *pRpcMsg;
      if (taosWriteQitem(pQueue, pMsg) != 0) {
        // The item was not handed over to the queue, so release it here
        // instead of leaking it (assumes a failed write leaves ownership
        // with the caller).
        taosFreeQitem(pMsg);
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }

  if (code != TSDB_CODE_SUCCESS) {
    if (pRpcMsg->msgType & 1u) {
      SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code};
      rpcSendResponse(&rsp);
    }
    rpcFreeCont(pRpcMsg->pCont);
  }

  // The function is declared int32_t; falling off the end without a return
  // is undefined behavior as soon as a caller uses the value.
  return code;
}

/*
 * Reads the message head at the start of `pMsg->pCont`, converts its
 * `contLen` and `vgId` fields in place, and acquires the vnode with that
 * vgId.
 *
 * When no such vnode can be acquired, an error response with
 * TSDB_CODE_VND_INVALID_VGROUP_ID is sent if the low bit of `pMsg->msgType`
 * is set, and the message content is released with rpcFreeCont().
 *
 * Returns the acquired vnode, or NULL on failure. A non-NULL result has to be
 * handed back through dndReleaseVnode() by the caller.
 */
static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
  SMsgHead *pHead = pMsg->pCont;

  // Incoming fields are converted from network to host order, so ntohl() is
  // the correct-direction call (the result is identical to htonl()).
  // The conversion rewrites the head in place.
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);

  SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId);
  if (pVnode == NULL) {
    if (pMsg->msgType & 1u) {
      SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
      rpcSendResponse(&rsp);
    }
    rpcFreeCont(pMsg->pCont);
  }

  return pVnode;
}

/*
 * Routes an incoming write message to the write queue of the vnode named in
 * the message head. Nothing is enqueued when the vnode cannot be acquired.
 * `pEpSet` is not used.
 */
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) {
    return;
  }

  dndWriteRpcMsgToVnodeQueue(pTarget->pWriteQ, pMsg);
  dndReleaseVnode(pDnode, pTarget);
}

/*
 * Routes an incoming sync message to the sync queue of the vnode named in
 * the message head. Nothing is enqueued when the vnode cannot be acquired.
 * `pEpSet` is not used.
 */
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) {
    return;
  }

  dndWriteRpcMsgToVnodeQueue(pTarget->pSyncQ, pMsg);
  dndReleaseVnode(pDnode, pTarget);
}

/*
 * Routes an incoming query message to the query queue of the vnode named in
 * the message head. Nothing is enqueued when the vnode cannot be acquired.
 * `pEpSet` is not used.
 */
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) {
    return;
  }

  dndWriteRpcMsgToVnodeQueue(pTarget->pQueryQ, pMsg);
  dndReleaseVnode(pDnode, pTarget);
}

/*
 * Routes an incoming fetch message to the fetch queue of the vnode named in
 * the message head. Nothing is enqueued when the vnode cannot be acquired.
 * `pEpSet` is not used.
 */
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) {
    return;
  }

  dndWriteRpcMsgToVnodeQueue(pTarget->pFetchQ, pMsg);
  dndReleaseVnode(pDnode, pTarget);
}

878
/*
 * Writes `pMsg` directly into the apply queue of vnode `vgId`.
 *
 * Returns -1 when the vnode cannot be acquired; otherwise returns whatever
 * taosWriteQitem() reported. The vnode reference is released before
 * returning.
 */
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) {
  SVnodeObj *pTarget = dndAcquireVnode(pDnode, vgId);
  if (pTarget == NULL) {
    return -1;
  }

  int32_t writeCode = taosWriteQitem(pTarget->pApplyQ, pMsg);
  dndReleaseVnode(pDnode, pTarget);

  return writeCode;
}

S
Shengliang Guan 已提交
887
/*
 * Configures and starts the four vnode worker pools (query, fetch, write,
 * sync), sizing each from the dnode's core-count options.
 *
 * Returns 0 when every pool initialized, or -1 as soon as one of them fails.
 * Pools initialized before the failing one are not torn down here.
 */
static int32_t dndInitVnodeWorkers(SDnode *pDnode) {
  SVnodesMgmt *pVnodes = &pDnode->vmgmt;

  // Query pool: min == max, derived from the configured share of cores, at least 1.
  int32_t      queryThreads = MAX((int32_t)(pDnode->opt.numOfCores * pDnode->opt.ratioOfQueryCores), 1);
  SWorkerPool *pQueryPool = &pVnodes->queryPool;
  pQueryPool->name = "vnode-query";
  pQueryPool->min = queryThreads;
  pQueryPool->max = queryThreads;
  if (tWorkerInit(pQueryPool) != 0) {
    return -1;
  }

  // Fetch pool: at most 4 threads, minimum bounded by the core count.
  int32_t      fetchMax = 4;
  int32_t      fetchMin = MIN(fetchMax, pDnode->opt.numOfCores);
  SWorkerPool *pFetchPool = &pVnodes->fetchPool;
  pFetchPool->name = "vnode-fetch";
  pFetchPool->min = fetchMin;
  pFetchPool->max = fetchMax;
  if (tWorkerInit(pFetchPool) != 0) {
    return -1;
  }

  // Write pool: one thread per core, at least 1.
  SMWorkerPool *pWritePool = &pVnodes->writePool;
  pWritePool->name = "vnode-write";
  pWritePool->max = MAX(pDnode->opt.numOfCores, 1);
  if (tMWorkerInit(pWritePool) != 0) {
    return -1;
  }

  // Sync pool: half the cores, at least 1.
  SMWorkerPool *pSyncPool = &pVnodes->syncPool;
  pSyncPool->name = "vnode-sync";
  pSyncPool->max = MAX(pDnode->opt.numOfCores / 2, 1);
  if (tMWorkerInit(pSyncPool) != 0) {
    return -1;
  }

  dDebug("vnode workers is initialized");
  return 0;
}

S
Shengliang Guan 已提交
923
/*
 * Shuts down the vnode worker pools in the order fetch, query, write, sync
 * and logs completion.
 */
static void dndCleanupVnodeWorkers(SDnode *pDnode) {
  SVnodesMgmt *pVnodes = &pDnode->vmgmt;

  // Single-item worker pools.
  tWorkerCleanup(&pVnodes->fetchPool);
  tWorkerCleanup(&pVnodes->queryPool);

  // Multi-item (batch) worker pools.
  tMWorkerCleanup(&pVnodes->writePool);
  tMWorkerCleanup(&pVnodes->syncPool);

  dDebug("vnode workers is closed");
}

S
Shengliang Guan 已提交
932
/*
 * Allocates the five message queues of `pVnode` (write, apply, sync, fetch,
 * query) from the matching worker pools and binds each to its handler.
 * The apply queue shares the write pool.
 *
 * Returns 0 when all queues were allocated. If any allocation returned NULL,
 * sets terrno to TSDB_CODE_OUT_OF_MEMORY and returns -1; queues that were
 * allocated successfully are left in place for the caller to free.
 */
static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
  SVnodesMgmt *pVnodes = &pDnode->vmgmt;

  // Batch queues: handlers receive a whole set of items per call.
  pVnode->pWriteQ = tMWorkerAllocQueue(&pVnodes->writePool, pVnode, (FProcessItems)dndProcessVnodeWriteQueue);
  pVnode->pApplyQ = tMWorkerAllocQueue(&pVnodes->writePool, pVnode, (FProcessItems)dndProcessVnodeApplyQueue);
  pVnode->pSyncQ = tMWorkerAllocQueue(&pVnodes->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue);

  // Single-item queues: handlers receive one item per call.
  pVnode->pFetchQ = tWorkerAllocQueue(&pVnodes->fetchPool, pVnode, (FProcessItem)dndProcessVnodeFetchQueue);
  pVnode->pQueryQ = tWorkerAllocQueue(&pVnodes->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue);

  if (pVnode->pApplyQ != NULL && pVnode->pWriteQ != NULL && pVnode->pSyncQ != NULL && pVnode->pFetchQ != NULL &&
      pVnode->pQueryQ != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

S
Shengliang Guan 已提交
950
/*
 * Returns all five queues of `pVnode` to their worker pools and then clears
 * the queue pointers stored in the vnode object.
 */
static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
  SVnodesMgmt *pVnodes = &pDnode->vmgmt;

  // Hand every queue back to the pool it was allocated from.
  tWorkerFreeQueue(&pVnodes->queryPool, pVnode->pQueryQ);
  tWorkerFreeQueue(&pVnodes->fetchPool, pVnode->pFetchQ);
  tMWorkerFreeQueue(&pVnodes->writePool, pVnode->pWriteQ);
  tMWorkerFreeQueue(&pVnodes->writePool, pVnode->pApplyQ);
  tMWorkerFreeQueue(&pVnodes->syncPool, pVnode->pSyncQ);

  // Drop the now-stale pointers.
  pVnode->pWriteQ = NULL;
  pVnode->pApplyQ = NULL;
  pVnode->pSyncQ = NULL;
  pVnode->pFetchQ = NULL;
  pVnode->pQueryQ = NULL;
}

/*
 * Brings up the vnode subsystem of a dnode: starts the worker pools, then
 * opens the vnodes.
 *
 * Returns 0 on success and -1 on failure. When the worker pools fail to
 * start, terrno is set to TSDB_CODE_OUT_OF_MEMORY before the error is logged.
 */
int32_t dndInitVnodes(SDnode *pDnode) {
  dInfo("dnode-vnodes start to init");

  int32_t workerCode = dndInitVnodeWorkers(pDnode);
  if (workerCode != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to init vnode workers since %s", terrstr());
    return -1;
  }

  int32_t openCode = dndOpenVnodes(pDnode);
  if (openCode != 0) {
    dError("failed to open vnodes since %s", terrstr());
    return -1;
  }

  dInfo("dnode-vnodes is initialized");
  return 0;
}

/*
 * Tears down the vnode subsystem of a dnode: closes the vnodes first, then
 * stops the worker pools, logging before and after.
 */
void dndCleanupVnodes(SDnode *pDnode) {
  dInfo("dnode-vnodes start to clean up");

  // Close the vnodes before stopping the worker pools.
  dndCloseVnodes(pDnode);
  dndCleanupVnodeWorkers(pDnode);

  dInfo("dnode-vnodes is cleaned up");
}

/*
 * Fills `pLoads` with one load record per vnode registered in the dnode's
 * vnode hash, under the vnode manager's read latch.
 *
 * Each record is obtained from vnodeGetLoad() and its fields are then
 * byte-swapped with htonl()/htobe64(). `pLoads->num` is set to the number of
 * records actually written and is left in host byte order.
 *
 * NOTE(review): no bound on `pLoads->data` is checked here -- confirm that
 * its capacity covers the maximum number of vnodes.
 */
void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pLoads) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  int32_t      numOfLoads = 0;

  taosRLockLatch(&pMgmt->latch);

  // The iterator advances in the for-increment so that `continue` cannot skip
  // it; skipping an entry without advancing would spin forever while holding
  // the latch.
  for (void *pIter = taosHashIterate(pMgmt->hash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->hash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL) continue;

    SVnodeLoad *pLoad = &pLoads->data[numOfLoads++];

    vnodeGetLoad(pVnode->pImpl, pLoad);
    pLoad->vgId = htonl(pLoad->vgId);
    pLoad->totalStorage = htobe64(pLoad->totalStorage);
    pLoad->compStorage = htobe64(pLoad->compStorage);
    pLoad->pointsWritten = htobe64(pLoad->pointsWritten);
    pLoad->tablesNum = htobe64(pLoad->tablesNum);
  }

  // Report only the records that were filled in, so a skipped entry never
  // exposes an uninitialized slot.
  pLoads->num = numOfLoads;

  taosRUnLockLatch(&pMgmt->latch);
}