/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "dndVnodes.h"
#include "dndTransport.h"

20 21 22 23 24
typedef struct {
  int32_t  vgId;
  int32_t  vgVersion;
  int8_t   dropped;
  uint64_t dbUid;
25
  char     db[TSDB_DB_FNAME_LEN];
26 27 28
  char     path[PATH_MAX + 20];
} SWrapperCfg;

S
Shengliang Guan 已提交
29
typedef struct {
30 31 32 33 34 35
  int32_t     vgId;
  int32_t     refCount;
  int32_t     vgVersion;
  int8_t      dropped;
  int8_t      accessState;
  uint64_t    dbUid;
H
more  
Hongze Cheng 已提交
36 37 38
  char *      db;
  char *      path;
  SVnode *    pImpl;
39 40 41 42
  STaosQueue *pWriteQ;
  STaosQueue *pSyncQ;
  STaosQueue *pApplyQ;
  STaosQueue *pQueryQ;
H
more  
Hongze Cheng 已提交
43
  STaosQueue *pFetchQ;
S
Shengliang Guan 已提交
44 45 46
} SVnodeObj;

typedef struct {
47 48 49 50
  int32_t      vnodeNum;
  int32_t      opened;
  int32_t      failed;
  int32_t      threadIndex;
51
  pthread_t    thread;
H
more  
Hongze Cheng 已提交
52
  SDnode *     pDnode;
53
  SWrapperCfg *pCfgs;
S
Shengliang Guan 已提交
54 55
} SVnodeThread;

S
Shengliang Guan 已提交
56 57
static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode);
static void    dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode);
S
Shengliang Guan 已提交
58

59 60
static void    dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
static void    dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
61 62 63
static void    dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
static void    dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
static void    dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
S
Shengliang Guan 已提交
64 65 66 67
void           dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void           dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void           dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void           dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
68
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg);
S
Shengliang Guan 已提交
69

H
more  
Hongze Cheng 已提交
70
static SVnodeObj * dndAcquireVnode(SDnode *pDnode, int32_t vgId);
S
Shengliang Guan 已提交
71
static void        dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode);
72 73
static int32_t     dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl);
static void        dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode);
S
Shengliang Guan 已提交
74
static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes);
75
static int32_t     dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
S
Shengliang Guan 已提交
76 77 78 79 80 81 82
static int32_t     dndWriteVnodesToFile(SDnode *pDnode);

static int32_t dndOpenVnodes(SDnode *pDnode);
static void    dndCloseVnodes(SDnode *pDnode);

/*
 * Look up the vnode registered under vgId and take one reference on it.
 *
 * Returns the vnode, or NULL with terrno = TSDB_CODE_VND_INVALID_VGROUP_ID when the vgId
 * is not in the hash. Every non-NULL result must be paired with dndReleaseVnode().
 */
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  SVnodeObj   *pFound = NULL;
  int32_t      refs = 0;

  taosRLockLatch(&pMgmt->latch);
  taosHashGetClone(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pFound);
  if (pFound != NULL) {
    // The reference is taken while the latch is held, so the entry cannot be removed meanwhile.
    refs = atomic_add_fetch_32(&pFound->refCount, 1);
  } else {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
  }
  taosRUnLockLatch(&pMgmt->latch);

  if (pFound == NULL) {
    return NULL;
  }

  dTrace("vgId:%d, acquire vnode, refCount:%d", pFound->vgId, refs);
  return pFound;
}

/* Give back one reference obtained from dndAcquireVnode(); passing NULL is a no-op. */
static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
  if (pVnode != NULL) {
    SVnodesMgmt *pMgmt = &pDnode->vmgmt;

    taosRLockLatch(&pMgmt->latch);
    int32_t remaining = atomic_sub_fetch_32(&pVnode->refCount, 1);
    taosRUnLockLatch(&pMgmt->latch);

    dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, remaining);
  }
}

112
static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) {
S
Shengliang Guan 已提交
113
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
H
more  
Hongze Cheng 已提交
114
  SVnodeObj *  pVnode = calloc(1, sizeof(SVnodeObj));
S
Shengliang Guan 已提交
115 116 117 118 119
  if (pVnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

120
  pVnode->vgId = pCfg->vgId;
S
Shengliang Guan 已提交
121
  pVnode->refCount = 0;
S
Shengliang Guan 已提交
122 123 124
  pVnode->dropped = 0;
  pVnode->accessState = TSDB_VN_ALL_ACCCESS;
  pVnode->pImpl = pImpl;
125 126 127 128
  pVnode->vgVersion = pCfg->vgVersion;
  pVnode->dbUid = pCfg->dbUid;
  pVnode->db = tstrdup(pCfg->db);
  pVnode->path = tstrdup(pCfg->path);
S
Shengliang Guan 已提交
129

130
  if (pVnode->path == NULL || pVnode->db == NULL) {
S
Shengliang Guan 已提交
131 132 133 134
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
135 136
  if (dndAllocVnodeQueue(pDnode, pVnode) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
137 138 139 140
    return -1;
  }

  taosWLockLatch(&pMgmt->latch);
141
  int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
S
Shengliang Guan 已提交
142 143 144 145 146 147 148 149
  taosWUnLockLatch(&pMgmt->latch);

  if (code != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  }
  return code;
}

150
static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
S
Shengliang Guan 已提交
151 152 153 154 155 156 157 158 159 160 161 162 163
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  taosWLockLatch(&pMgmt->latch);
  taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
  taosWUnLockLatch(&pMgmt->latch);

  dndReleaseVnode(pDnode, pVnode);
  while (pVnode->refCount > 0) taosMsleep(10);
  while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10);
  while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10);
  while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10);
  while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
  while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);

S
Shengliang Guan 已提交
164
  dndFreeVnodeQueue(pDnode, pVnode);
S
Shengliang Guan 已提交
165 166 167
  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;

168 169
  dDebug("vgId:%d, vnode is closed", pVnode->vgId);

170 171 172
  free(pVnode->path);
  free(pVnode->db);
  free(pVnode);
S
Shengliang Guan 已提交
173 174 175 176 177 178 179 180 181 182 183 184 185
}

/*
 * Snapshot all vnodes currently in the hash, taking one reference on each.
 *
 * Returns a calloc'ed array (the caller frees it) and stores its length in *numOfVnodes.
 * Each entry must later be released with dndReleaseVnode() or closed with dndCloseVnode().
 * If the array cannot be allocated for a non-empty hash, NULL is returned, *numOfVnodes is
 * set to 0 and terrno is set to TSDB_CODE_OUT_OF_MEMORY.
 */
static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  taosRLockLatch(&pMgmt->latch);

  int32_t     num = 0;
  int32_t     size = taosHashGetSize(pMgmt->hash);
  SVnodeObj **pVnodes = calloc(size, sizeof(SVnodeObj *));
  if (pVnodes == NULL && size > 0) {
    taosRUnLockLatch(&pMgmt->latch);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    *numOfVnodes = 0;
    return NULL;
  }

  void *pIter = taosHashIterate(pMgmt->hash, NULL);
  while (pIter != NULL) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL || num >= size) {
      // Abandon the iteration and leave the loop. Without the break, pIter never changes
      // and the loop spins forever.
      taosHashCancelIterate(pMgmt->hash, pIter);
      break;
    }

    int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
    dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount);
    pVnodes[num] = pVnode;
    num++;

    pIter = taosHashIterate(pMgmt->hash, pIter);
  }

  taosRUnLockLatch(&pMgmt->latch);
  *numOfVnodes = num;

  return pVnodes;
}

204 205 206 207
static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) {
  int32_t      code = TSDB_CODE_DND_VNODE_READ_FILE_ERROR;
  int32_t      len = 0;
  int32_t      maxLen = 30000;
H
more  
Hongze Cheng 已提交
208 209 210
  char *       content = calloc(1, maxLen + 1);
  cJSON *      root = NULL;
  FILE *       fp = NULL;
211 212
  char         file[PATH_MAX + 20] = {0};
  SWrapperCfg *pCfgs = NULL;
S
Shengliang Guan 已提交
213 214 215 216

  snprintf(file, PATH_MAX + 20, "%s/vnodes.json", pDnode->dir.vnodes);

  fp = fopen(file, "r");
S
Shengliang Guan 已提交
217
  if (fp == NULL) {
S
Shengliang Guan 已提交
218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
    dDebug("file %s not exist", file);
    code = 0;
    goto PRASE_VNODE_OVER;
  }

  len = (int32_t)fread(content, 1, maxLen, fp);
  if (len <= 0) {
    dError("failed to read %s since content is null", file);
    goto PRASE_VNODE_OVER;
  }

  content[len] = 0;
  root = cJSON_Parse(content);
  if (root == NULL) {
    dError("failed to read %s since invalid json format", file);
    goto PRASE_VNODE_OVER;
  }

  cJSON *vnodes = cJSON_GetObjectItem(root, "vnodes");
  if (!vnodes || vnodes->type != cJSON_Array) {
    dError("failed to read %s since vnodes not found", file);
    goto PRASE_VNODE_OVER;
  }

  int32_t vnodesNum = cJSON_GetArraySize(vnodes);
243 244 245 246
  if (vnodesNum > 0) {
    pCfgs = calloc(vnodesNum, sizeof(SWrapperCfg));
    if (pCfgs == NULL) {
      dError("failed to read %s since out of memory", file);
S
Shengliang Guan 已提交
247 248 249
      goto PRASE_VNODE_OVER;
    }

250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288
    for (int32_t i = 0; i < vnodesNum; ++i) {
      cJSON       *vnode = cJSON_GetArrayItem(vnodes, i);
      SWrapperCfg *pCfg = &pCfgs[i];

      cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId");
      if (!vgId || vgId->type != cJSON_Number) {
        dError("failed to read %s since vgId not found", file);
        goto PRASE_VNODE_OVER;
      }
      pCfg->vgId = vgId->valueint;
      snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCfg->vgId);

      cJSON *dropped = cJSON_GetObjectItem(vnode, "dropped");
      if (!dropped || dropped->type != cJSON_Number) {
        dError("failed to read %s since dropped not found", file);
        goto PRASE_VNODE_OVER;
      }
      pCfg->dropped = dropped->valueint;

      cJSON *vgVersion = cJSON_GetObjectItem(vnode, "vgVersion");
      if (!vgVersion || vgVersion->type != cJSON_Number) {
        dError("failed to read %s since vgVersion not found", file);
        goto PRASE_VNODE_OVER;
      }
      pCfg->vgVersion = vgVersion->valueint;

      cJSON *dbUid = cJSON_GetObjectItem(vnode, "dbUid");
      if (!dbUid || dbUid->type != cJSON_String) {
        dError("failed to read %s since dbUid not found", file);
        goto PRASE_VNODE_OVER;
      }
      pCfg->dbUid = atoll(dbUid->valuestring);

      cJSON *db = cJSON_GetObjectItem(vnode, "db");
      if (!db || db->type != cJSON_String) {
        dError("failed to read %s since db not found", file);
        goto PRASE_VNODE_OVER;
      }
      tstrncpy(pCfg->db, db->valuestring, TSDB_DB_FNAME_LEN);
S
Shengliang Guan 已提交
289
    }
290

291
    *ppCfgs = pCfgs;
S
Shengliang Guan 已提交
292 293
  }

294
  *numOfVnodes = vnodesNum;
S
Shengliang Guan 已提交
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312
  code = 0;
  dInfo("succcessed to read file %s", file);

PRASE_VNODE_OVER:
  if (content != NULL) free(content);
  if (root != NULL) cJSON_Delete(root);
  if (fp != NULL) fclose(fp);

  return code;
}

/*
 * Persist the list of registered vnodes as JSON.
 *
 * The JSON is written to "<dir.vnodes>/vnodes.json.bak", fsync'ed and closed, then renamed
 * over "<dir.vnodes>/vnodes.json". Returns 0 on success (terrno = 0). Returns -1 on failure
 * with terrno set; in that case the previous vnodes.json is left in place.
 */
static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
  char file[PATH_MAX + 20] = {0};
  char realfile[PATH_MAX + 20] = {0};
  snprintf(file, sizeof(file), "%s/vnodes.json.bak", pDnode->dir.vnodes);
  snprintf(realfile, sizeof(realfile), "%s/vnodes.json", pDnode->dir.vnodes);

  FILE *fp = fopen(file, "w");
  if (fp == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to write %s since %s", file, terrstr());
    return -1;
  }

  int32_t     code = -1;
  int32_t     len = 0;
  int32_t     maxLen = 256;
  int32_t     sysErr = 0;
  bool        writeOk = false;
  char       *content = NULL;
  int32_t     numOfVnodes = 0;
  SVnodeObj **pVnodes = NULL;

  // Reset terrno so an allocation failure inside the snapshot can be told apart from an empty
  // hash. Persisting an empty list by mistake would drop every vnode on the next restart.
  terrno = 0;
  pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes);
  if (pVnodes == NULL && terrno == TSDB_CODE_OUT_OF_MEMORY) {
    dError("failed to write %s since %s", file, terrstr());
    goto _WRITE_OVER;
  }

  // Size the buffer from the data so the snprintf chain below can never run past it. One
  // entry needs under 160 bytes of fixed text and formatted integers, plus its db name. A
  // truncated snprintf returns the would-be length, which would make `maxLen - len` negative
  // and turn it into a huge size_t.
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    maxLen += 256 + (int32_t)strlen(pVnodes[i]->db);
  }

  content = calloc(1, maxLen + 1);
  if (content == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to write %s since %s", file, terrstr());
    goto _WRITE_OVER;
  }

  len += snprintf(content + len, maxLen - len, "{\n");
  len += snprintf(content + len, maxLen - len, "  \"vnodes\": [\n");
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    SVnodeObj *pVnode = pVnodes[i];
    len += snprintf(content + len, maxLen - len, "    {\n");
    len += snprintf(content + len, maxLen - len, "      \"vgId\": %d,\n", pVnode->vgId);
    len += snprintf(content + len, maxLen - len, "      \"dropped\": %d,\n", pVnode->dropped);
    len += snprintf(content + len, maxLen - len, "      \"vgVersion\": %d,\n", pVnode->vgVersion);
    len += snprintf(content + len, maxLen - len, "      \"dbUid\": \"%" PRIu64 "\",\n", pVnode->dbUid);
    len += snprintf(content + len, maxLen - len, "      \"db\": \"%s\"\n", pVnode->db);
    if (i < numOfVnodes - 1) {
      len += snprintf(content + len, maxLen - len, "    },\n");
    } else {
      len += snprintf(content + len, maxLen - len, "    }\n");
    }
  }
  len += snprintf(content + len, maxLen - len, "  ]\n");
  len += snprintf(content + len, maxLen - len, "}\n");

  // Only rename once the data is known to be fully written and flushed.
  writeOk = fwrite(content, 1, (size_t)len, fp) == (size_t)len;
  if (!writeOk) {
    sysErr = errno;
  } else if (taosFsyncFile(fileno(fp)) != 0) {
    writeOk = false;
    sysErr = errno;
  }
  if (fclose(fp) != 0 && writeOk) {
    writeOk = false;
    sysErr = errno;
  }
  fp = NULL;

  if (!writeOk) {
    terrno = TAOS_SYSTEM_ERROR(sysErr);
    dError("failed to write %s since %s", file, terrstr());
    goto _WRITE_OVER;
  }

  // NOTE(review): assumes taosRenameFile() leaves errno set on failure, like rename(2).
  if (taosRenameFile(file, realfile) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to rename %s to %s since %s", file, realfile, terrstr());
    goto _WRITE_OVER;
  }

  terrno = 0;
  code = 0;
  dDebug("successed to write %s", realfile);

_WRITE_OVER:
  if (fp != NULL) fclose(fp);
  free(content);
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    dndReleaseVnode(pDnode, pVnodes[i]);
  }
  free(pVnodes);

  return code;
}

static void *dnodeOpenVnodeFunc(void *param) {
  SVnodeThread *pThread = param;
H
more  
Hongze Cheng 已提交
365 366
  SDnode *      pDnode = pThread->pDnode;
  SVnodesMgmt * pMgmt = &pDnode->vmgmt;
S
Shengliang Guan 已提交
367 368 369 370 371

  dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("open-vnodes");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
372
    SWrapperCfg *pCfg = &pThread->pCfgs[v];
S
Shengliang Guan 已提交
373 374

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
375
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
S
Shengliang Guan 已提交
376 377 378
             pMgmt->openVnodes, pMgmt->totalVnodes);
    dndReportStartup(pDnode, "open-vnodes", stepDesc);

H
more  
Hongze Cheng 已提交
379
    SVnode *pImpl = vnodeOpen(pCfg->path, NULL, pCfg->vgId);
S
Shengliang Guan 已提交
380
    if (pImpl == NULL) {
381
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
S
Shengliang Guan 已提交
382 383
      pThread->failed++;
    } else {
384
      dndOpenVnode(pDnode, pCfg, pImpl);
385
      dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
S
Shengliang Guan 已提交
386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403
      pThread->opened++;
    }

    atomic_add_fetch_32(&pMgmt->openVnodes, 1);
  }

  dDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
         pThread->failed);
  return NULL;
}

/*
 * Startup path: initialise the vnode hash, read vnodes.json, and open the listed vnodes in
 * parallel on up to opt.numOfCores threads (at least one).
 *
 * Returns 0 when every listed vnode was opened. Returns -1 otherwise; terrno is set on the
 * setup/allocation failure paths.
 */
static int32_t dndOpenVnodes(SDnode *pDnode) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  taosInitRWLatch(&pMgmt->latch);

  pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  if (pMgmt->hash == NULL) {
    dError("failed to init vnode hash");
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  if (dndGetVnodesFromFile(pDnode, &pCfgs, &numOfVnodes) != 0) {
    dError("failed to get vnode list from disk since %s", terrstr());
    return -1;
  }

  pMgmt->totalVnodes = numOfVnodes;

  // threadNum is used as a divisor and modulus below, so it must never be 0.
  int32_t threadNum = pDnode->opt.numOfCores;
  if (threadNum < 1) {
    threadNum = 1;
  }
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to open vnodes since %s", terrstr());
    free(pCfgs);
    return -1;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pDnode = pDnode;
    threads[t].pCfgs = calloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      dError("failed to open vnodes since %s", terrstr());
      for (int32_t k = 0; k < t; ++k) {
        free(threads[k].pCfgs);
      }
      free(threads);
      free(pCfgs);
      return -1;
    }
  }

  // Round-robin the configs over the threads.
  for (int32_t v = 0; v < numOfVnodes; ++v) {
    SVnodeThread *pThread = &threads[v % threadNum];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    pthread_attr_t thAttr;
    pthread_attr_init(&thAttr);
    pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (pthread_create(&pThread->thread, &thAttr, dnodeOpenVnodeFunc, pThread) != 0) {
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
    }

    pthread_attr_destroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      pthread_join(pThread->thread, NULL);
    }
    free(pThread->pCfgs);
  }
  free(threads);
  free(pCfgs);

  if (pMgmt->openVnodes != pMgmt->totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->totalVnodes, pMgmt->openVnodes);
    return -1;
  } else {
    dInfo("total vnodes:%d open successfully", pMgmt->totalVnodes);
    return 0;
  }
}

/* Shutdown path: close every registered vnode, then destroy the vnode hash. */
static void dndCloseVnodes(SDnode *pDnode) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  int32_t      total = 0;
  SVnodeObj  **ppVnodes = dndGetVnodesFromHash(pDnode, &total);

  // Each snapshot entry carries one reference, which dndCloseVnode() consumes.
  for (int32_t idx = 0; idx < total; idx++) {
    dndCloseVnode(pDnode, ppVnodes[idx]);
  }
  free(ppVnodes);

  if (pMgmt->hash != NULL) {
    taosHashCleanup(pMgmt->hash);
    pMgmt->hash = NULL;
  }

  dInfo("total vnodes:%d are all closed", total);
}

S
Shengliang Guan 已提交
490 491
static SCreateVnodeReq *dndParseCreateVnodeReq(SRpcMsg *pReq) {
  SCreateVnodeReq *pCreate = pReq->pCont;
492 493 494
  pCreate->vgId = htonl(pCreate->vgId);
  pCreate->dnodeId = htonl(pCreate->dnodeId);
  pCreate->dbUid = htobe64(pCreate->dbUid);
495
  pCreate->vgVersion = htonl(pCreate->vgVersion);
496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511
  pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize);
  pCreate->totalBlocks = htonl(pCreate->totalBlocks);
  pCreate->daysPerFile = htonl(pCreate->daysPerFile);
  pCreate->daysToKeep0 = htonl(pCreate->daysToKeep0);
  pCreate->daysToKeep1 = htonl(pCreate->daysToKeep1);
  pCreate->daysToKeep2 = htonl(pCreate->daysToKeep2);
  pCreate->minRows = htonl(pCreate->minRows);
  pCreate->maxRows = htonl(pCreate->maxRows);
  pCreate->commitTime = htonl(pCreate->commitTime);
  pCreate->fsyncPeriod = htonl(pCreate->fsyncPeriod);
  for (int r = 0; r < pCreate->replica; ++r) {
    SReplica *pReplica = &pCreate->replicas[r];
    pReplica->id = htonl(pReplica->id);
    pReplica->port = htons(pReplica->port);
  }

512 513
  return pCreate;
}
S
Shengliang Guan 已提交
514

S
Shengliang Guan 已提交
515
static void dndGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
S
Shengliang Guan 已提交
516
  pCfg->vgId = pCreate->vgId;
517 518 519 520 521 522 523 524
  pCfg->wsize = pCreate->cacheBlockSize;
  pCfg->ssize = pCreate->cacheBlockSize;
  pCfg->wsize = pCreate->cacheBlockSize;
  pCfg->lsize = pCreate->cacheBlockSize;
  pCfg->isHeapAllocator = true;
  pCfg->ttl = 4;
  pCfg->keep = pCreate->daysToKeep0;
  pCfg->isWeak = true;
H
Hongze Cheng 已提交
525
  pCfg->tsdbCfg.keep = pCreate->daysToKeep0;
526 527 528 529 530 531 532 533 534 535 536
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep2;
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep0;
  pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize;
  pCfg->metaCfg.lruSize = pCreate->cacheBlockSize;
  pCfg->walCfg.fsyncPeriod = pCreate->fsyncPeriod;
  pCfg->walCfg.level = pCreate->walLevel;
  pCfg->walCfg.retentionPeriod = 10;
  pCfg->walCfg.retentionSize = 128;
  pCfg->walCfg.rollPeriod = 128;
  pCfg->walCfg.segSize = 128;
  pCfg->walCfg.vgId = pCreate->vgId;
537 538
}

S
Shengliang Guan 已提交
539
static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
540
  memcpy(pCfg->db, pCreate->db, TSDB_DB_FNAME_LEN);
541 542 543 544 545
  pCfg->dbUid = pCreate->dbUid;
  pCfg->dropped = 0;
  snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCreate->vgId);
  pCfg->vgId = pCreate->vgId;
  pCfg->vgVersion = pCreate->vgVersion;
S
Shengliang Guan 已提交
546 547
}

548
static SDropVnodeReq *dndParseDropVnodeReq(SRpcMsg *pReq) {
S
Shengliang Guan 已提交
549
  SDropVnodeReq *pDrop = pReq->pCont;
S
Shengliang Guan 已提交
550 551 552 553
  pDrop->vgId = htonl(pDrop->vgId);
  return pDrop;
}

554
static SAuthVnodeReq *dndParseAuthVnodeReq(SRpcMsg *pReq) {
S
Shengliang Guan 已提交
555
  SAuthVnodeReq *pAuth = pReq->pCont;
S
Shengliang Guan 已提交
556 557 558 559
  pAuth->vgId = htonl(pAuth->vgId);
  return pAuth;
}

S
Shengliang Guan 已提交
560 561
int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SCreateVnodeReq *pCreate = dndParseCreateVnodeReq(pReq);
562 563
  dDebug("vgId:%d, create vnode req is received", pCreate->vgId);

S
Shengliang Guan 已提交
564
  SVnodeCfg vnodeCfg = {0};
565
  dndGenerateVnodeCfg(pCreate, &vnodeCfg);
S
Shengliang Guan 已提交
566

567 568
  SWrapperCfg wrapperCfg = {0};
  dndGenerateWrapperCfg(pDnode, pCreate, &wrapperCfg);
S
Shengliang Guan 已提交
569

570
  SVnodeObj *pVnode = dndAcquireVnode(pDnode, pCreate->vgId);
S
Shengliang Guan 已提交
571
  if (pVnode != NULL) {
572
    dDebug("vgId:%d, already exist", pCreate->vgId);
S
Shengliang Guan 已提交
573
    dndReleaseVnode(pDnode, pVnode);
S
Shengliang Guan 已提交
574
    terrno = TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED;
575
    return -1;
S
Shengliang Guan 已提交
576 577
  }

H
more  
Hongze Cheng 已提交
578
  SVnode *pImpl = vnodeOpen(wrapperCfg.path, NULL /*pCfg*/, pCreate->vgId);
579
  if (pImpl == NULL) {
S
Shengliang Guan 已提交
580
    dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr());
581 582 583
    return -1;
  }

584
  int32_t code = dndOpenVnode(pDnode, &wrapperCfg, pImpl);
585
  if (code != 0) {
S
Shengliang Guan 已提交
586
    dError("vgId:%d, failed to open vnode since %s", pCreate->vgId, terrstr());
587 588 589 590 591 592 593 594 595 596 597 598
    vnodeClose(pImpl);
    vnodeDestroy(wrapperCfg.path);
    terrno = code;
    return code;
  }

  code = dndWriteVnodesToFile(pDnode);
  if (code != 0) {
    vnodeClose(pImpl);
    vnodeDestroy(wrapperCfg.path);
    terrno = code;
    return code;
S
Shengliang Guan 已提交
599 600 601 602 603
  }

  return 0;
}

S
Shengliang Guan 已提交
604 605
int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SAlterVnodeReq *pAlter = (SAlterVnodeReq *)dndParseCreateVnodeReq(pReq);
606
  dDebug("vgId:%d, alter vnode req is received", pAlter->vgId);
S
Shengliang Guan 已提交
607

608 609
  SVnodeCfg vnodeCfg = {0};
  dndGenerateVnodeCfg(pAlter, &vnodeCfg);
S
Shengliang Guan 已提交
610

611
  SVnodeObj *pVnode = dndAcquireVnode(pDnode, pAlter->vgId);
S
Shengliang Guan 已提交
612
  if (pVnode == NULL) {
613
    dDebug("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr());
S
Shengliang Guan 已提交
614
    return -1;
S
Shengliang Guan 已提交
615 616
  }

S
Shengliang Guan 已提交
617
  if (pAlter->vgVersion == pVnode->vgVersion) {
618 619 620 621 622
    dndReleaseVnode(pDnode, pVnode);
    dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", pAlter->vgId);
    return 0;
  }

S
Shengliang Guan 已提交
623
  if (vnodeAlter(pVnode->pImpl, &vnodeCfg) != 0) {
624
    dError("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr());
S
Shengliang Guan 已提交
625
    dndReleaseVnode(pDnode, pVnode);
S
Shengliang Guan 已提交
626
    return -1;
S
Shengliang Guan 已提交
627 628
  }

629
  int32_t oldVersion = pVnode->vgVersion;
S
Shengliang Guan 已提交
630
  pVnode->vgVersion = pAlter->vgVersion;
631 632 633 634 635
  int32_t code = dndWriteVnodesToFile(pDnode);
  if (code != 0) {
    pVnode->vgVersion = oldVersion;
  }

S
Shengliang Guan 已提交
636
  dndReleaseVnode(pDnode, pVnode);
637
  return code;
S
Shengliang Guan 已提交
638 639
}

S
Shengliang Guan 已提交
640
int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
641
  SDropVnodeReq *pDrop = dndParseDropVnodeReq(pReq);
S
Shengliang Guan 已提交
642 643 644 645 646 647 648

  int32_t vgId = pDrop->vgId;
  dDebug("vgId:%d, drop vnode req is received", vgId);

  SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
  if (pVnode == NULL) {
    dDebug("vgId:%d, failed to drop since %s", vgId, terrstr());
649 650
    terrno = TSDB_CODE_DND_VNODE_NOT_DEPLOYED;
    return -1;
S
Shengliang Guan 已提交
651 652
  }

653 654 655
  pVnode->dropped = 1;
  if (dndWriteVnodesToFile(pDnode) != 0) {
    pVnode->dropped = 0;
S
Shengliang Guan 已提交
656 657
    dndReleaseVnode(pDnode, pVnode);
    return -1;
S
Shengliang Guan 已提交
658 659
  }

660
  dndCloseVnode(pDnode, pVnode);
661 662 663 664
  vnodeClose(pVnode->pImpl);
  vnodeDestroy(pVnode->path);
  dndWriteVnodesToFile(pDnode);

S
Shengliang Guan 已提交
665 666 667
  return 0;
}

S
Shengliang Guan 已提交
668
int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
669
  SAuthVnodeReq *pAuth = (SAuthVnodeReq *)dndParseAuthVnodeReq(pReq);
S
Shengliang Guan 已提交
670 671 672 673 674 675 676

  int32_t vgId = pAuth->vgId;
  dDebug("vgId:%d, auth vnode req is received", vgId);

  SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
  if (pVnode == NULL) {
    dDebug("vgId:%d, failed to auth since %s", vgId, terrstr());
S
Shengliang Guan 已提交
677
    return -1;
S
Shengliang Guan 已提交
678 679 680 681 682 683 684
  }

  pVnode->accessState = pAuth->accessState;
  dndReleaseVnode(pDnode, pVnode);
  return 0;
}

S
Shengliang Guan 已提交
685
int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
686
  SSyncVnodeReq *pSync = (SSyncVnodeReq *)dndParseDropVnodeReq(pReq);
S
Shengliang Guan 已提交
687

S
Shengliang Guan 已提交
688 689
  int32_t vgId = pSync->vgId;
  dDebug("vgId:%d, sync vnode req is received", vgId);
S
Shengliang Guan 已提交
690 691 692

  SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
  if (pVnode == NULL) {
S
Shengliang Guan 已提交
693 694
    dDebug("vgId:%d, failed to sync since %s", vgId, terrstr());
    return -1;
S
Shengliang Guan 已提交
695 696 697
  }

  if (vnodeSync(pVnode->pImpl) != 0) {
S
Shengliang Guan 已提交
698
    dError("vgId:%d, failed to sync vnode since %s", vgId, terrstr());
S
Shengliang Guan 已提交
699
    dndReleaseVnode(pDnode, pVnode);
S
Shengliang Guan 已提交
700
    return -1;
S
Shengliang Guan 已提交
701 702 703 704 705 706
  }

  dndReleaseVnode(pDnode, pVnode);
  return 0;
}

S
Shengliang Guan 已提交
707
int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
708
  SCompactVnodeReq *pCompact = (SCompactVnodeReq *)dndParseDropVnodeReq(pReq);
S
Shengliang Guan 已提交
709 710 711 712 713 714 715

  int32_t vgId = pCompact->vgId;
  dDebug("vgId:%d, compact vnode req is received", vgId);

  SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
  if (pVnode == NULL) {
    dDebug("vgId:%d, failed to compact since %s", vgId, terrstr());
S
Shengliang Guan 已提交
716
    return -1;
S
Shengliang Guan 已提交
717 718 719 720 721
  }

  if (vnodeCompact(pVnode->pImpl) != 0) {
    dError("vgId:%d, failed to compact vnode since %s", vgId, terrstr());
    dndReleaseVnode(pDnode, pVnode);
S
Shengliang Guan 已提交
722
    return -1;
S
Shengliang Guan 已提交
723 724 725 726 727 728
  }

  dndReleaseVnode(pDnode, pVnode);
  return 0;
}

729 730 731
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) {
  SRpcMsg *pRsp = NULL;
  vnodeProcessQueryReq(pVnode->pImpl, pMsg, &pRsp);
S
Shengliang Guan 已提交
732 733
}

734 735 736
static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) {
  SRpcMsg *pRsp = NULL;
  vnodeProcessFetchReq(pVnode->pImpl, pMsg, &pRsp);
S
Shengliang Guan 已提交
737 738
}

739
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
740
  SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
S
Shengliang Guan 已提交
741 742

  for (int32_t i = 0; i < numOfMsgs; ++i) {
743 744 745
    SRpcMsg *pMsg = NULL;
    taosGetQitem(qall, (void **)&pMsg);
    void *ptr = taosArrayPush(pArray, &pMsg);
746
    assert(ptr != NULL);
S
Shengliang Guan 已提交
747 748
  }

H
more  
Hongze Cheng 已提交
749
  vnodeProcessWMsgs(pVnode->pImpl, pArray);
750 751 752 753

  for (size_t i = 0; i < numOfMsgs; i++) {
    SRpcMsg *pRsp = NULL;
    SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
H
more  
Hongze Cheng 已提交
754
    int32_t  code = vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp);
755
    if (pRsp != NULL) {
S
Shengliang Guan 已提交
756
      pRsp->ahandle = pMsg->ahandle;
757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772
      rpcSendResponse(pRsp);
      free(pRsp);
    } else {
      if (code != 0) code = terrno;
      SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
      rpcSendResponse(&rpcRsp);
    }
  }

  for (size_t i = 0; i < numOfMsgs; i++) {
    SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }

  taosArrayDestroy(pArray);
S
Shengliang Guan 已提交
773 774
}

775
static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
776 777
  SRpcMsg *pMsg = NULL;

S
Shengliang Guan 已提交
778 779
  for (int32_t i = 0; i < numOfMsgs; ++i) {
    taosGetQitem(qall, (void **)&pMsg);
780

S
Shengliang Guan 已提交
781
    // todo
782 783
    SRpcMsg *pRsp = NULL;
    (void)vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp);
S
Shengliang Guan 已提交
784 785 786
  }
}

787
static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
788 789
  SRpcMsg *pMsg = NULL;

S
Shengliang Guan 已提交
790 791
  for (int32_t i = 0; i < numOfMsgs; ++i) {
    taosGetQitem(qall, (void **)&pMsg);
792

S
Shengliang Guan 已提交
793
    // todo
794 795
    SRpcMsg *pRsp = NULL;
    (void)vnodeProcessSyncReq(pVnode->pImpl, pMsg, &pRsp);
S
Shengliang Guan 已提交
796 797 798
  }
}

799
/*
 * Copies *pRpcMsg into a newly allocated queue item and enqueues it on pQueue.
 *
 * On success the queued copy carries pRpcMsg->pCont onward. On any failure
 * the following happens:
 *   - pQueue is NULL             -> TSDB_CODE_MSG_NOT_PROCESSED
 *   - item allocation fails      -> TSDB_CODE_OUT_OF_MEMORY
 *   - taosWriteQitem fails       -> TSDB_CODE_OUT_OF_MEMORY
 * In each failure case, an error response carrying the code is sent when the
 * low bit of msgType is set (presumably request types that expect a reply),
 * and pRpcMsg->pCont is freed.
 *
 * Returns 0 on success, otherwise the error code described above.
 */
static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg) {
  int32_t code = 0;

  if (pQueue == NULL) {
    code = TSDB_CODE_MSG_NOT_PROCESSED;
  } else {
    SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg));
    if (pMsg == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      *pMsg = *pRpcMsg;
      if (taosWriteQitem(pQueue, pMsg) != 0) {
        // The queue did not accept the item; release the wrapper so it does not leak.
        // pMsg->pCont is the same buffer as pRpcMsg->pCont and is freed below.
        taosFreeQitem(pMsg);
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }

  if (code != TSDB_CODE_SUCCESS) {
    if (pRpcMsg->msgType & 1u) {
      SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code};
      rpcSendResponse(&rsp);
    }
    rpcFreeCont(pRpcMsg->pCont);
  }

  // Previously missing: the function is declared int32_t but fell off the end.
  return code;
}

/*
 * Converts the SMsgHead at the front of pMsg->pCont from network to host
 * byte order (in place), then acquires the vnode named by its vgId.
 *
 * Returns the acquired vnode, which the caller must release with
 * dndReleaseVnode. If no vnode matches, the function does two things and
 * returns NULL:
 *   - sends a TSDB_CODE_VND_INVALID_VGROUP_ID response, but only when the
 *     low bit of msgType is set;
 *   - frees the message content.
 */
static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
  SMsgHead *pHead = pMsg->pCont;
  pHead->contLen = htonl(pHead->contLen);
  pHead->vgId = htonl(pHead->vgId);

  SVnodeObj *pFound = dndAcquireVnode(pDnode, pHead->vgId);
  if (pFound != NULL) {
    return pFound;
  }

  if ((pMsg->msgType & 1u) != 0) {
    SRpcMsg errRsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
    rpcSendResponse(&errRsp);
  }
  rpcFreeCont(pMsg->pCont);

  return NULL;
}

/*
 * Routes an incoming write message to the target vnode's write queue.
 * If the vnode lookup fails, dndAcquireVnodeFromMsg has already replied to
 * the message and freed it, so there is nothing left to do. pEpSet is unused.
 */
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) return;

  (void)dndWriteRpcMsgToVnodeQueue(pTarget->pWriteQ, pMsg);
  dndReleaseVnode(pDnode, pTarget);
}

/*
 * Routes an incoming sync message to the target vnode's sync queue.
 * If the vnode lookup fails, dndAcquireVnodeFromMsg has already replied to
 * the message and freed it, so there is nothing left to do. pEpSet is unused.
 */
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) return;

  (void)dndWriteRpcMsgToVnodeQueue(pTarget->pSyncQ, pMsg);
  dndReleaseVnode(pDnode, pTarget);
}

/*
 * Routes an incoming query message to the target vnode's query queue.
 * If the vnode lookup fails, dndAcquireVnodeFromMsg has already replied to
 * the message and freed it, so there is nothing left to do. pEpSet is unused.
 */
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) return;

  (void)dndWriteRpcMsgToVnodeQueue(pTarget->pQueryQ, pMsg);
  dndReleaseVnode(pDnode, pTarget);
}

/*
 * Routes an incoming fetch message to the target vnode's fetch queue.
 * If the vnode lookup fails, dndAcquireVnodeFromMsg has already replied to
 * the message and freed it, so there is nothing left to do. pEpSet is unused.
 */
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) return;

  (void)dndWriteRpcMsgToVnodeQueue(pTarget->pFetchQ, pMsg);
  dndReleaseVnode(pDnode, pTarget);
}

874
/*
 * Pushes pMsg (which must already be a queue item) onto the apply queue of
 * vnode vgId.
 *
 * Returns -1 if the vnode cannot be acquired. Otherwise it returns
 * taosWriteQitem's result, after the vnode reference has been released.
 */
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) {
  int32_t    result = -1;
  SVnodeObj *pTarget = dndAcquireVnode(pDnode, vgId);

  if (pTarget != NULL) {
    result = taosWriteQitem(pTarget->pApplyQ, pMsg);
    dndReleaseVnode(pDnode, pTarget);
  }

  return result;
}

S
Shengliang Guan 已提交
883
/*
 * Initializes the worker pools shared by all vnodes:
 *   query: exactly max(numOfCores * ratioOfQueryCores, 1) threads
 *   fetch: between min(4, numOfCores) and 4 threads
 *   write: up to max(numOfCores, 1) threads
 *   sync:  up to max(numOfCores / 2, 1) threads
 *
 * Returns 0 on success, or -1 as soon as one pool fails to initialize.
 * Pools already initialized at that point are left as they are.
 */
static int32_t dndInitVnodeWorkers(SDnode *pDnode) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;

  /* query pool: fixed size */
  int32_t      queryThreads = MAX((int32_t)(pDnode->opt.numOfCores * pDnode->opt.ratioOfQueryCores), 1);
  SWorkerPool *pQueryPool = &pMgmt->queryPool;
  pQueryPool->name = "vnode-query";
  pQueryPool->min = queryThreads;
  pQueryPool->max = queryThreads;
  if (tWorkerInit(pQueryPool) != 0) return -1;

  /* fetch pool: capped at 4 threads */
  int32_t      fetchUpper = 4;
  int32_t      fetchLower = MIN(fetchUpper, pDnode->opt.numOfCores);
  SWorkerPool *pFetchPool = &pMgmt->fetchPool;
  pFetchPool->name = "vnode-fetch";
  pFetchPool->min = fetchLower;
  pFetchPool->max = fetchUpper;
  if (tWorkerInit(pFetchPool) != 0) return -1;

  /* write pool: one thread per core at most */
  int32_t       writeUpper = MAX(pDnode->opt.numOfCores, 1);
  SMWorkerPool *pWritePool = &pMgmt->writePool;
  pWritePool->name = "vnode-write";
  pWritePool->max = writeUpper;
  if (tMWorkerInit(pWritePool) != 0) return -1;

  /* sync pool: half the cores at most */
  int32_t       syncUpper = MAX(pDnode->opt.numOfCores / 2, 1);
  SMWorkerPool *pSyncPool = &pMgmt->syncPool;
  pSyncPool->name = "vnode-sync";
  pSyncPool->max = syncUpper;
  if (tMWorkerInit(pSyncPool) != 0) return -1;

  dDebug("vnode workers is initialized");
  return 0;
}

S
Shengliang Guan 已提交
919
/*
 * Tears down the shared vnode worker pools in this order: fetch, query,
 * write, then sync.
 */
static void dndCleanupVnodeWorkers(SDnode *pDnode) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;

  /* single-item worker pools */
  tWorkerCleanup(&pMgmt->fetchPool);
  tWorkerCleanup(&pMgmt->queryPool);

  /* multi-item worker pools */
  tMWorkerCleanup(&pMgmt->writePool);
  tMWorkerCleanup(&pMgmt->syncPool);

  dDebug("vnode workers is closed");
}

S
Shengliang Guan 已提交
928
/*
 * Allocates the five per-vnode queues from the shared worker pools:
 *   - write and apply queues come from the write pool;
 *   - the sync queue comes from the sync pool;
 *   - fetch and query queues come from their own single-item pools.
 *
 * Returns 0 when all five queues were allocated. Otherwise it sets terrno
 * to TSDB_CODE_OUT_OF_MEMORY and returns -1.
 *
 * NOTE(review): queues allocated before a failing one are not released here;
 * confirm the caller invokes dndFreeVnodeQueue on failure.
 */
static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;

  pVnode->pWriteQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeWriteQueue);
  pVnode->pApplyQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeApplyQueue);
  pVnode->pSyncQ = tMWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue);
  pVnode->pFetchQ = tWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FProcessItem)dndProcessVnodeFetchQueue);
  pVnode->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue);

  int32_t allReady = pVnode->pApplyQ != NULL && pVnode->pWriteQ != NULL && pVnode->pSyncQ != NULL &&
                     pVnode->pFetchQ != NULL && pVnode->pQueryQ != NULL;
  if (allReady) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

S
Shengliang Guan 已提交
946
/*
 * Returns each of the vnode's queues to the pool it was allocated from, then
 * clears the queue pointers so that stale handles are not reused.
 */
static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;

  /* single-item pools */
  tWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
  tWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);

  /* multi-item pools; write and apply share the write pool */
  tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
  tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
  tMWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);

  pVnode->pWriteQ = pVnode->pApplyQ = pVnode->pSyncQ = NULL;
  pVnode->pFetchQ = pVnode->pQueryQ = NULL;
}

/*
 * Brings up the vnode subsystem in two steps: first the shared worker pools,
 * then the vnodes themselves via dndOpenVnodes.
 *
 * Returns 0 on success and -1 on failure. If the worker pools fail, terrno is
 * overwritten with TSDB_CODE_OUT_OF_MEMORY before the error is logged.
 */
int32_t dndInitVnodes(SDnode *pDnode) {
  dInfo("dnode-vnodes start to init");

  int32_t rc = dndInitVnodeWorkers(pDnode);
  if (rc != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to init vnode workers since %s", terrstr());
    return -1;
  }

  rc = dndOpenVnodes(pDnode);
  if (rc != 0) {
    dError("failed to open vnodes since %s", terrstr());
    return -1;
  }

  dInfo("dnode-vnodes is initialized");
  return 0;
}

/*
 * Shuts down the vnode subsystem in the reverse order of dndInitVnodes:
 * first it closes every vnode, then it destroys the shared worker pools.
 */
void dndCleanupVnodes(SDnode *pDnode) {
  dInfo("dnode-vnodes start to clean up");

  dndCloseVnodes(pDnode);        /* vnodes first: they still own queues from the pools */
  dndCleanupVnodeWorkers(pDnode);

  dInfo("dnode-vnodes is cleaned up");
}

/*
 * Fills pLoads->data with one SVnodeLoad per vnode found in the manager's
 * hash. The vgId, storage, points and table counters are converted to
 * network byte order. The hash is walked while holding the manager's read
 * latch.
 *
 * pLoads->num is set to the number of entries actually written to
 * pLoads->data. It stays in host byte order.
 *
 * NOTE(review): assumes pLoads->data has room for every vnode in the hash —
 * confirm at the caller.
 */
void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pLoads) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;

  taosRLockLatch(&pMgmt->latch);

  int32_t v = 0;
  // The iterator is advanced in the for-increment, so a `continue` on an empty
  // slot still moves forward. Previously it spun forever while holding the latch.
  for (void *pIter = taosHashIterate(pMgmt->hash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->hash, pIter)) {
    SVnodeObj **ppVnode = pIter;
    if (ppVnode == NULL || *ppVnode == NULL) continue;

    SVnodeObj  *pVnode = *ppVnode;
    SVnodeLoad *pLoad = &pLoads->data[v++];

    vnodeGetLoad(pVnode->pImpl, pLoad);
    pLoad->vgId = htonl(pLoad->vgId);
    pLoad->totalStorage = htobe64(pLoad->totalStorage);
    pLoad->compStorage = htobe64(pLoad->compStorage);
    pLoad->pointsWritten = htobe64(pLoad->pointsWritten);
    pLoad->tablesNum = htobe64(pLoad->tablesNum);
  }

  // Report only the entries actually filled; skipped slots must not be counted.
  pLoads->num = v;

  taosRUnLockLatch(&pMgmt->latch);
}