dndVnodes.c 30.9 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "dndVnodes.h"
18
#include "dndMgmt.h"
S
Shengliang Guan 已提交
19
#include "dndTransport.h"
S
Shengliang Guan 已提交
20
#include "sync.h"
S
Shengliang Guan 已提交
21

22 23 24 25 26
typedef struct {
  int32_t  vgId;
  int32_t  vgVersion;
  int8_t   dropped;
  uint64_t dbUid;
27
  char     db[TSDB_DB_FNAME_LEN];
28 29 30
  char     path[PATH_MAX + 20];
} SWrapperCfg;

S
Shengliang Guan 已提交
31
typedef struct {
32 33 34 35 36 37
  int32_t     vgId;
  int32_t     refCount;
  int32_t     vgVersion;
  int8_t      dropped;
  int8_t      accessState;
  uint64_t    dbUid;
S
Shengliang Guan 已提交
38 39 40
  char       *db;
  char       *path;
  SVnode     *pImpl;
41 42 43 44
  STaosQueue *pWriteQ;
  STaosQueue *pSyncQ;
  STaosQueue *pApplyQ;
  STaosQueue *pQueryQ;
H
more  
Hongze Cheng 已提交
45
  STaosQueue *pFetchQ;
S
Shengliang Guan 已提交
46 47 48
} SVnodeObj;

typedef struct {
49 50 51 52
  int32_t      vnodeNum;
  int32_t      opened;
  int32_t      failed;
  int32_t      threadIndex;
53
  pthread_t    thread;
S
Shengliang Guan 已提交
54
  SDnode      *pDnode;
55
  SWrapperCfg *pCfgs;
S
Shengliang Guan 已提交
56 57
} SVnodeThread;

S
Shengliang Guan 已提交
58 59
static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode);
static void    dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode);
S
Shengliang Guan 已提交
60

61 62
static void    dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
static void    dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
63 64 65
static void    dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
static void    dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
static void    dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
S
Shengliang Guan 已提交
66 67 68 69
void           dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void           dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void           dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void           dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
70
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg);
S
Shengliang Guan 已提交
71

S
Shengliang Guan 已提交
72
static SVnodeObj  *dndAcquireVnode(SDnode *pDnode, int32_t vgId);
S
Shengliang Guan 已提交
73
static void        dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode);
74 75
static int32_t     dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl);
static void        dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode);
S
Shengliang Guan 已提交
76
static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes);
77
static int32_t     dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
S
Shengliang Guan 已提交
78 79 80 81 82 83 84
static int32_t     dndWriteVnodesToFile(SDnode *pDnode);

static int32_t dndOpenVnodes(SDnode *pDnode);
static void    dndCloseVnodes(SDnode *pDnode);

/*
 * Looks up the vnode registered under `vgId` and takes one reference on it.
 *
 * Returns the vnode, or NULL with terrno = TSDB_CODE_VND_INVALID_VGROUP_ID when
 * no entry exists. Each successful call is balanced by dndReleaseVnode().
 */
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  SVnodeObj   *pFound = NULL;
  int32_t      refs = 0;

  // Lookup and refCount bump happen under the same read latch.
  taosRLockLatch(&pMgmt->latch);
  taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pFound);
  if (pFound != NULL) {
    refs = atomic_add_fetch_32(&pFound->refCount, 1);
  } else {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
  }
  taosRUnLockLatch(&pMgmt->latch);

  if (pFound == NULL) {
    return NULL;
  }

  dTrace("vgId:%d, acquire vnode, refCount:%d", pFound->vgId, refs);
  return pFound;
}

/*
 * Drops one reference on `pVnode`. Passing NULL is allowed and does nothing.
 */
static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
  if (pVnode != NULL) {
    SVnodesMgmt *pMgmt = &pDnode->vmgmt;

    taosRLockLatch(&pMgmt->latch);
    int32_t remaining = atomic_sub_fetch_32(&pVnode->refCount, 1);
    taosRUnLockLatch(&pMgmt->latch);

    dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, remaining);
  }
}

114
/*
 * Creates the SVnodeObj wrapper for an already opened vnode implementation and
 * registers it in the vnode hash under pCfg->vgId.
 *
 * pDnode  dnode that owns the vnode hash and queues
 * pCfg    settings copied into the wrapper (db and path are duplicated)
 * pImpl   handle returned by vnodeOpen(); it is stored but NOT closed here on
 *         failure, so the caller remains responsible for it
 *
 * Returns 0 on success. On failure returns non-zero with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY, and everything allocated here is released again.
 */
static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  SVnodeObj   *pVnode = calloc(1, sizeof(SVnodeObj));
  if (pVnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pVnode->vgId = pCfg->vgId;
  pVnode->refCount = 0;
  pVnode->dropped = 0;
  pVnode->accessState = TSDB_VN_ALL_ACCCESS;
  pVnode->pImpl = pImpl;
  pVnode->vgVersion = pCfg->vgVersion;
  pVnode->dbUid = pCfg->dbUid;
  pVnode->db = tstrdup(pCfg->db);
  pVnode->path = tstrdup(pCfg->path);

  if (pVnode->path == NULL || pVnode->db == NULL) {
    free(pVnode->path);
    free(pVnode->db);
    free(pVnode);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (dndAllocVnodeQueue(pDnode, pVnode) != 0) {
    // NOTE(review): queues that were allocated before the failure are not
    // released here because it is not visible whether dndFreeVnodeQueue()
    // tolerates NULL queues -- confirm and release them if it does.
    free(pVnode->path);
    free(pVnode->db);
    free(pVnode);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  taosWLockLatch(&pMgmt->latch);
  int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
  taosWUnLockLatch(&pMgmt->latch);

  if (code != 0) {
    // The wrapper never became reachable through the hash, so nobody else can
    // hold it: undo the queue allocation and free it.
    dndFreeVnodeQueue(pDnode, pVnode);
    free(pVnode->path);
    free(pVnode->db);
    free(pVnode);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  }
  return code;
}

152
/*
 * Shuts down one vnode and frees its wrapper.
 *
 * The vnode is removed from the hash, one reference is dropped (callers in this
 * file acquire one before calling), and the function then polls every 10 ms
 * until refCount reaches zero and all five queues are empty. After that the
 * queues are freed and the implementation is closed. If `dropped` is set the
 * vnode directory is destroyed as well.
 */
static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;

  taosWLockLatch(&pMgmt->latch);
  taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
  taosWUnLockLatch(&pMgmt->latch);

  dndReleaseVnode(pDnode, pVnode);
  while (pVnode->refCount > 0) {
    taosMsleep(10);
  }

  // Drain order: write, sync, apply, query, fetch.
  STaosQueue *drainOrder[] = {pVnode->pWriteQ, pVnode->pSyncQ, pVnode->pApplyQ, pVnode->pQueryQ, pVnode->pFetchQ};
  for (size_t q = 0; q < sizeof(drainOrder) / sizeof(drainOrder[0]); ++q) {
    while (!taosQueueEmpty(drainOrder[q])) {
      taosMsleep(10);
    }
  }

  dndFreeVnodeQueue(pDnode, pVnode);
  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;

  dDebug("vgId:%d, vnode is closed", pVnode->vgId);

  if (pVnode->dropped) {
    dDebug("vgId:%d, vnode is destroyed for dropped:%d", pVnode->vgId, pVnode->dropped);
    vnodeDestroy(pVnode->path);
  }

  free(pVnode->path);
  free(pVnode->db);
  free(pVnode);
}

/*
 * Takes a snapshot of every vnode currently in the hash, acquiring one
 * reference on each.
 *
 * numOfVnodes  out: number of entries stored in the returned array
 *
 * Returns a calloc'ed array the caller must free(), after releasing each entry
 * with dndReleaseVnode() (or closing it with dndCloseVnode()). Returns NULL with
 * *numOfVnodes == 0 when the array cannot be allocated; for a non-empty hash
 * terrno is then set to TSDB_CODE_OUT_OF_MEMORY.
 */
static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  int32_t      num = 0;

  taosRLockLatch(&pMgmt->latch);

  int32_t     size = taosHashGetSize(pMgmt->hash);
  SVnodeObj **pVnodes = calloc(size, sizeof(SVnodeObj *));

  if (pVnodes == NULL) {
    // calloc(0, ...) may legitimately return NULL; only a non-empty hash is an error.
    if (size > 0) terrno = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    void *pIter = taosHashIterate(pMgmt->hash, NULL);
    while (pIter != NULL) {
      SVnodeObj *pVnode = *(SVnodeObj **)pIter;
      if (pVnode == NULL || num >= size) {
        // Stop early. The iterator must be cancelled AND the loop left,
        // otherwise pIter stays non-NULL and the loop never terminates.
        taosHashCancelIterate(pMgmt->hash, pIter);
        break;
      }

      int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
      dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount);
      pVnodes[num++] = pVnode;
      pIter = taosHashIterate(pMgmt->hash, pIter);
    }
  }

  taosRUnLockLatch(&pMgmt->latch);
  *numOfVnodes = num;

  return pVnodes;
}

211 212 213 214
/*
 * Loads the vnode list from "<vnodes dir>/vnodes.json".
 *
 * ppCfgs       out: calloc'ed array of parsed entries, owned by the caller.
 *              Written only when at least one entry was parsed successfully.
 * numOfVnodes  out: number of entries; written only after a successful parse.
 *
 * Returns 0 on success and also when the file cannot be opened (treated as
 * "no vnodes yet"; the outputs are left untouched). On any other failure the
 * error code is returned and stored in terrno, and nothing is handed to the
 * caller.
 */
static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) {
  int32_t      code = TSDB_CODE_DND_VNODE_READ_FILE_ERROR;
  int32_t      len = 0;
  int32_t      cap = 30000;
  int32_t      vnodesNum = 0;
  char        *content = NULL;
  cJSON       *root = NULL;
  cJSON       *vnodes = NULL;
  TdFilePtr    pFile = NULL;
  char         file[PATH_MAX + 20] = {0};
  SWrapperCfg *pCfgs = NULL;

  snprintf(file, sizeof(file), "%s/vnodes.json", pDnode->dir.vnodes);

  pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    dDebug("file %s not exist", file);
    code = 0;
    goto PRASE_VNODE_OVER;
  }

  content = calloc(1, cap + 1);
  if (content == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to read %s since out of memory", file);
    goto PRASE_VNODE_OVER;
  }

  // Read until the file is exhausted, growing the buffer as needed: the writer
  // is not limited to the initial capacity used here.
  for (;;) {
    int32_t nread = (int32_t)taosReadFile(pFile, content + len, cap - len);
    if (nread <= 0) break;
    len += nread;
    if (len < cap) continue;

    char *bigger = realloc(content, (size_t)cap * 2 + 1);
    if (bigger == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      dError("failed to read %s since out of memory", file);
      goto PRASE_VNODE_OVER;
    }
    content = bigger;
    cap *= 2;
  }

  if (len <= 0) {
    dError("failed to read %s since content is null", file);
    goto PRASE_VNODE_OVER;
  }

  content[len] = 0;
  root = cJSON_Parse(content);
  if (root == NULL) {
    dError("failed to read %s since invalid json format", file);
    goto PRASE_VNODE_OVER;
  }

  vnodes = cJSON_GetObjectItem(root, "vnodes");
  if (!vnodes || vnodes->type != cJSON_Array) {
    dError("failed to read %s since vnodes not found", file);
    goto PRASE_VNODE_OVER;
  }

  vnodesNum = cJSON_GetArraySize(vnodes);
  if (vnodesNum > 0) {
    pCfgs = calloc(vnodesNum, sizeof(SWrapperCfg));
    if (pCfgs == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      dError("failed to read %s since out of memory", file);
      goto PRASE_VNODE_OVER;
    }

    for (int32_t i = 0; i < vnodesNum; ++i) {
      cJSON       *vnode = cJSON_GetArrayItem(vnodes, i);
      SWrapperCfg *pCfg = &pCfgs[i];

      cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId");
      if (!vgId || vgId->type != cJSON_Number) {
        dError("failed to read %s since vgId not found", file);
        goto PRASE_VNODE_OVER;
      }
      pCfg->vgId = vgId->valueint;
      snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCfg->vgId);

      cJSON *dropped = cJSON_GetObjectItem(vnode, "dropped");
      if (!dropped || dropped->type != cJSON_Number) {
        dError("failed to read %s since dropped not found", file);
        goto PRASE_VNODE_OVER;
      }
      pCfg->dropped = dropped->valueint;

      cJSON *vgVersion = cJSON_GetObjectItem(vnode, "vgVersion");
      if (!vgVersion || vgVersion->type != cJSON_Number) {
        dError("failed to read %s since vgVersion not found", file);
        goto PRASE_VNODE_OVER;
      }
      pCfg->vgVersion = vgVersion->valueint;

      cJSON *dbUid = cJSON_GetObjectItem(vnode, "dbUid");
      if (!dbUid || dbUid->type != cJSON_String) {
        dError("failed to read %s since dbUid not found", file);
        goto PRASE_VNODE_OVER;
      }
      // The value is written with PRIu64, so parse it as unsigned 64-bit.
      pCfg->dbUid = strtoull(dbUid->valuestring, NULL, 10);

      cJSON *db = cJSON_GetObjectItem(vnode, "db");
      if (!db || db->type != cJSON_String) {
        dError("failed to read %s since db not found", file);
        goto PRASE_VNODE_OVER;
      }
      tstrncpy(pCfg->db, db->valuestring, TSDB_DB_FNAME_LEN);
    }

    // Ownership of the array moves to the caller only once every entry parsed.
    *ppCfgs = pCfgs;
    pCfgs = NULL;
  }

  *numOfVnodes = vnodesNum;
  code = 0;
  dInfo("succcessed to read file %s", file);

PRASE_VNODE_OVER:
  if (code != 0) {
    terrno = code;  // the caller reports the failure through terrstr()
  }
  free(pCfgs);  // non-NULL only when parsing stopped half-way
  free(content);
  if (root != NULL) cJSON_Delete(root);
  if (pFile != NULL) taosCloseFile(&pFile);

  return code;
}

/*
 * Persists the current vnode list: the JSON document is written to
 * "<vnodes dir>/vnodes.json.bak", synced, and then renamed to "vnodes.json".
 *
 * Returns 0 on success. On failure returns non-zero with terrno set; every
 * reference taken on the vnodes is released and the file is closed on all
 * paths.
 *
 * NOTE(review): the return values of taosWriteFile()/taosFsyncFile() are still
 * not checked because their contracts are not visible here -- confirm and add
 * checks.
 */
static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
  char file[PATH_MAX + 20] = {0};
  char realfile[PATH_MAX + 20] = {0};
  snprintf(file, sizeof(file), "%s/vnodes.json.bak", pDnode->dir.vnodes);
  snprintf(realfile, sizeof(realfile), "%s/vnodes.json", pDnode->dir.vnodes);

  TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to write %s since %s", file, terrstr());
    return -1;
  }

  int32_t     numOfVnodes = 0;
  SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes);

  int32_t code = -1;
  int32_t len = 0;
  int32_t n = 0;
  bool    fits = false;

  // Size the buffer from the number of vnodes instead of a fixed 64 KiB, so a
  // large vnode list cannot run past the end of the buffer.
  int32_t maxLen = 65536;
  int32_t needLen = 64 + numOfVnodes * (256 + TSDB_DB_FNAME_LEN);
  if (needLen > maxLen) maxLen = needLen;

  char *content = calloc(1, maxLen + 1);
  if (content == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to write %s since %s", file, terrstr());
    goto WRITE_VNODE_OVER;
  }

  // Every snprintf result is validated before it is added to `len`: once `len`
  // exceeded `maxLen`, `maxLen - len` would turn into a huge size_t.
  n = snprintf(content + len, maxLen - len, "{\n  \"vnodes\": [\n");
  fits = (n >= 0 && n < maxLen - len);
  if (fits) len += n;

  for (int32_t i = 0; fits && i < numOfVnodes; ++i) {
    SVnodeObj *pVnode = pVnodes[i];
    n = snprintf(content + len, maxLen - len,
                 "    {\n"
                 "      \"vgId\": %d,\n"
                 "      \"dropped\": %d,\n"
                 "      \"vgVersion\": %d,\n"
                 "      \"dbUid\": \"%" PRIu64 "\",\n"
                 "      \"db\": \"%s\"\n"
                 "    }%s\n",
                 pVnode->vgId, pVnode->dropped, pVnode->vgVersion, pVnode->dbUid, pVnode->db,
                 (i < numOfVnodes - 1) ? "," : "");
    fits = (n >= 0 && n < maxLen - len);
    if (fits) len += n;
  }

  if (fits) {
    n = snprintf(content + len, maxLen - len, "  ]\n}\n");
    fits = (n >= 0 && n < maxLen - len);
    if (fits) len += n;
  }

  if (!fits) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to write %s since content exceeds buffer of %d bytes", file, maxLen);
    goto WRITE_VNODE_OVER;
  }

  taosWriteFile(pFile, content, len);
  taosFsyncFile(pFile);
  code = 0;

WRITE_VNODE_OVER:
  taosCloseFile(&pFile);
  free(content);

  for (int32_t i = 0; i < numOfVnodes; ++i) {
    dndReleaseVnode(pDnode, pVnodes[i]);
  }
  free(pVnodes);

  if (code != 0) {
    return -1;
  }

  terrno = 0;
  int32_t ret = taosRenameFile(file, realfile);
  if (ret != 0) {
    // Without this the caller would see a failure with terrno == 0.
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to rename %s to %s since %s", file, realfile, terrstr());
    return ret;
  }

  dDebug("successed to write %s", realfile);
  return 0;
}

static void *dnodeOpenVnodeFunc(void *param) {
  SVnodeThread *pThread = param;
S
Shengliang Guan 已提交
374 375
  SDnode       *pDnode = pThread->pDnode;
  SVnodesMgmt  *pMgmt = &pDnode->vmgmt;
S
Shengliang Guan 已提交
376 377 378 379 380

  dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("open-vnodes");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
381
    SWrapperCfg *pCfg = &pThread->pCfgs[v];
S
Shengliang Guan 已提交
382 383

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
384
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
S
Shengliang Guan 已提交
385 386 387
             pMgmt->openVnodes, pMgmt->totalVnodes);
    dndReportStartup(pDnode, "open-vnodes", stepDesc);

D
dapan1121 已提交
388
    SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pDnode->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid};
S
Shengliang Guan 已提交
389
    SVnode   *pImpl = vnodeOpen(pCfg->path, &cfg);
S
Shengliang Guan 已提交
390
    if (pImpl == NULL) {
391
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
S
Shengliang Guan 已提交
392 393
      pThread->failed++;
    } else {
394
      dndOpenVnode(pDnode, pCfg, pImpl);
395
      dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
S
Shengliang Guan 已提交
396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413
      pThread->opened++;
    }

    atomic_add_fetch_32(&pMgmt->openVnodes, 1);
  }

  dDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
         pThread->failed);
  return NULL;
}

/*
 * Initialises the vnode hash, loads the vnode list from disk and opens every
 * listed vnode on worker threads.
 *
 * Returns 0 when the number of processed vnodes equals the number read from
 * disk, -1 otherwise (hash/list/allocation failure, or a worker thread that
 * could not be started).
 */
static int32_t dndOpenVnodes(SDnode *pDnode) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  taosInitRWLatch(&pMgmt->latch);

  pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  if (pMgmt->hash == NULL) {
    dError("failed to init vnode hash");
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  if (dndGetVnodesFromFile(pDnode, &pCfgs, &numOfVnodes) != 0) {
    dInfo("failed to get vnode list from disk since %s", terrstr());
    return -1;
  }

  pMgmt->totalVnodes = numOfVnodes;

  int32_t threadNum = tsNumOfCores;
#if 1
  threadNum = 1;
#endif

  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    free(pCfgs);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to open vnodes since out of memory");
    return -1;
  }

  bool allocated = true;
  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pDnode = pDnode;
    threads[t].pCfgs = calloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) allocated = false;
  }

  if (!allocated) {
    // Entries that were never allocated are NULL (calloc'ed array), so freeing
    // all of them is safe.
    for (int32_t t = 0; t < threadNum; ++t) {
      free(threads[t].pCfgs);
    }
    free(threads);
    free(pCfgs);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to open vnodes since out of memory");
    return -1;
  }

  // Distribute the configurations round-robin over the threads.
  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    pthread_attr_t thAttr;
    pthread_attr_init(&thAttr);
    pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
    // pthread_create() reports failure through its return value, not errno.
    int32_t ret = pthread_create(&pThread->thread, &thAttr, dnodeOpenVnodeFunc, pThread);
    if (ret != 0) {
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(ret));
    }

    pthread_attr_destroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      pthread_join(pThread->thread, NULL);
    }
    free(pThread->pCfgs);
  }
  free(threads);
  free(pCfgs);

  if (pMgmt->openVnodes != pMgmt->totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->totalVnodes, pMgmt->openVnodes);
    return -1;
  } else {
    dInfo("total vnodes:%d open successfully", pMgmt->totalVnodes);
    return 0;
  }
}

/*
 * Closes every vnode currently registered in the hash and then destroys the
 * hash itself.
 */
static void dndCloseVnodes(SDnode *pDnode) {
  dInfo("start to close all vnodes");
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;

  // The snapshot holds one reference per vnode; dndCloseVnode() drops it.
  int32_t     count = 0;
  SVnodeObj **ppList = dndGetVnodesFromHash(pDnode, &count);

  int32_t idx = 0;
  while (idx < count) {
    dndCloseVnode(pDnode, ppList[idx]);
    ++idx;
  }

  free(ppList);  // free(NULL) is a no-op

  if (pMgmt->hash != NULL) {
    taosHashCleanup(pMgmt->hash);
    pMgmt->hash = NULL;
  }

  dInfo("total vnodes:%d are all closed", count);
}

S
Shengliang Guan 已提交
505
/*
 * Fills `pCfg` from a create/alter vnode request. Fields of `pCfg` that are not
 * assigned here keep whatever value the caller put there.
 */
static void dndGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
  // identity and hash range
  pCfg->vgId = pCreate->vgId;
  pCfg->hashBegin = pCreate->hashBegin;
  pCfg->hashEnd = pCreate->hashEnd;
  pCfg->hashMethod = pCreate->hashMethod;

  // general settings
  pCfg->wsize = pCreate->cacheBlockSize;
  pCfg->ssize = pCreate->cacheBlockSize;
  pCfg->lsize = pCreate->cacheBlockSize;
  pCfg->isHeapAllocator = true;
  pCfg->ttl = 4;
  pCfg->keep = pCreate->daysToKeep0;
  pCfg->streamMode = pCreate->streamMode;
  pCfg->isWeak = true;

  // tsdb
  // NOTE(review): keep1 takes daysToKeep2 while keep and keep2 both take
  // daysToKeep0 -- confirm this mapping is intended.
  pCfg->tsdbCfg.keep = pCreate->daysToKeep0;
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep2;
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep0;
  pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize;

  // meta
  pCfg->metaCfg.lruSize = pCreate->cacheBlockSize;

  // wal
  pCfg->walCfg.vgId = pCreate->vgId;
  pCfg->walCfg.fsyncPeriod = pCreate->fsyncPeriod;
  pCfg->walCfg.level = pCreate->walLevel;
  pCfg->walCfg.retentionPeriod = 10;
  pCfg->walCfg.retentionSize = 128;
  pCfg->walCfg.rollPeriod = 128;
  pCfg->walCfg.segSize = 128;
}

S
Shengliang Guan 已提交
532
/*
 * Fills the persistent wrapper configuration for a vnode that is about to be
 * created from `pCreate`. The vnode path is "<vnodes dir>/vnode<vgId>".
 */
static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
  // Bounded string copy (as used when loading vnodes.json) instead of a raw
  // memcpy, so pCfg->db is NUL-terminated before it is later duplicated with
  // tstrdup().
  tstrncpy(pCfg->db, pCreate->db, TSDB_DB_FNAME_LEN);
  pCfg->dbUid = pCreate->dbUid;
  pCfg->dropped = 0;
  snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCreate->vgId);
  pCfg->vgId = pCreate->vgId;
  pCfg->vgVersion = pCreate->vgVersion;
}

S
Shengliang Guan 已提交
541
/*
 * Handles a "create vnode" request: opens the vnode, registers its wrapper in
 * the hash and persists the updated vnode list.
 *
 * Returns 0 on success, non-zero with terrno set otherwise. On failure after
 * the vnode was opened, the vnode is closed and its directory destroyed again.
 */
int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SCreateVnodeReq createReq = {0};
  if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  dDebug("vgId:%d, create vnode req is received", createReq.vgId);

  SVnodeCfg vnodeCfg = {0};
  dndGenerateVnodeCfg(&createReq, &vnodeCfg);

  SWrapperCfg wrapperCfg = {0};
  dndGenerateWrapperCfg(pDnode, &createReq, &wrapperCfg);

  if (createReq.dnodeId != dndGetDnodeId(pDnode)) {
    terrno = TSDB_CODE_DND_VNODE_INVALID_OPTION;
    dDebug("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
    return -1;
  }

  SVnodeObj *pVnode = dndAcquireVnode(pDnode, createReq.vgId);
  if (pVnode != NULL) {
    dDebug("vgId:%d, already exist", createReq.vgId);
    dndReleaseVnode(pDnode, pVnode);
    terrno = TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED;
    return -1;
  }

  vnodeCfg.pDnode = pDnode;
  vnodeCfg.pTfs = pDnode->pTfs;
  vnodeCfg.dbId = wrapperCfg.dbUid;
  SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
    return -1;
  }

  int32_t code = dndOpenVnode(pDnode, &wrapperCfg, pImpl);
  if (code != 0) {
    // Keep the real error for the caller instead of overwriting it with `code`.
    int32_t err = (terrno != 0) ? terrno : code;
    dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr());
    vnodeClose(pImpl);
    vnodeDestroy(wrapperCfg.path);
    terrno = err;
    return code;
  }

  code = dndWriteVnodesToFile(pDnode);
  if (code != 0) {
    int32_t err = (terrno != 0) ? terrno : code;
    dError("vgId:%d, failed to persist vnode list since %s", createReq.vgId, terrstr());

    // The wrapper is already in the hash and owns pImpl. Closing pImpl directly
    // would leave a dangling handle reachable through the hash, so tear the
    // vnode down through the regular path: acquire, mark dropped, close.
    pVnode = dndAcquireVnode(pDnode, createReq.vgId);
    if (pVnode != NULL) {
      pVnode->dropped = 1;  // makes dndCloseVnode() destroy the vnode directory
      dndCloseVnode(pDnode, pVnode);
    }

    terrno = err;
    return code;
  }

  return 0;
}

S
Shengliang Guan 已提交
599
/*
 * Handles an "alter vnode" request. The new configuration is applied only when
 * the request carries a vgVersion different from the one currently recorded;
 * the new version is then persisted and rolled back in memory if persisting
 * fails.
 *
 * Returns 0 on success or when nothing had to change, non-zero otherwise.
 */
int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SAlterVnodeReq alterReq = {0};
  if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  dDebug("vgId:%d, alter vnode req is received", alterReq.vgId);

  SVnodeCfg newCfg = {0};
  dndGenerateVnodeCfg(&alterReq, &newCfg);

  SVnodeObj *pTarget = dndAcquireVnode(pDnode, alterReq.vgId);
  if (pTarget == NULL) {
    dDebug("vgId:%d, failed to alter vnode since %s", alterReq.vgId, terrstr());
    return -1;
  }

  if (alterReq.vgVersion == pTarget->vgVersion) {
    dndReleaseVnode(pDnode, pTarget);
    dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", alterReq.vgId);
    return 0;
  }

  if (vnodeAlter(pTarget->pImpl, &newCfg) != 0) {
    dError("vgId:%d, failed to alter vnode since %s", alterReq.vgId, terrstr());
    dndReleaseVnode(pDnode, pTarget);
    return -1;
  }

  // Record the new version, persist it, and restore the old one on failure.
  const int32_t prevVersion = pTarget->vgVersion;
  pTarget->vgVersion = alterReq.vgVersion;

  int32_t code = dndWriteVnodesToFile(pDnode);
  if (code != 0) {
    pTarget->vgVersion = prevVersion;
  }

  dndReleaseVnode(pDnode, pTarget);
  return code;
}

S
Shengliang Guan 已提交
640
/*
 * Handles a "drop vnode" request: marks the vnode as dropped, persists that
 * state, closes the vnode (which destroys its directory) and persists the list
 * once more without it.
 *
 * Returns 0 on success, -1 with terrno set otherwise.
 */
int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SDropVnodeReq dropReq = {0};
  if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  const int32_t vgId = dropReq.vgId;
  dDebug("vgId:%d, drop vnode req is received", vgId);

  SVnodeObj *pTarget = dndAcquireVnode(pDnode, vgId);
  if (pTarget == NULL) {
    dDebug("vgId:%d, failed to drop since %s", vgId, terrstr());
    terrno = TSDB_CODE_DND_VNODE_NOT_DEPLOYED;
    return -1;
  }

  // Persist the dropped flag first; undo it in memory when that fails.
  pTarget->dropped = 1;
  if (dndWriteVnodesToFile(pDnode) != 0) {
    pTarget->dropped = 0;
    dndReleaseVnode(pDnode, pTarget);
    return -1;
  }

  // dndCloseVnode() consumes the reference taken above and frees pTarget.
  dndCloseVnode(pDnode, pTarget);
  dndWriteVnodesToFile(pDnode);

  return 0;
}

S
Shengliang Guan 已提交
670
/*
 * Handles a "sync vnode" request by calling vnodeSync() on the target vnode.
 *
 * Returns 0 on success, -1 with terrno set when the request cannot be decoded,
 * the vnode does not exist, or vnodeSync() fails.
 */
int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SSyncVnodeReq syncReq = {0};
  // Reject undecodable requests, as the create/alter/drop handlers do, rather
  // than acting on a zero-initialised vgId.
  if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &syncReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t vgId = syncReq.vgId;
  dDebug("vgId:%d, sync vnode req is received", vgId);

  SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
  if (pVnode == NULL) {
    dDebug("vgId:%d, failed to sync since %s", vgId, terrstr());
    return -1;
  }

  if (vnodeSync(pVnode->pImpl) != 0) {
    dError("vgId:%d, failed to sync vnode since %s", vgId, terrstr());
    dndReleaseVnode(pDnode, pVnode);
    return -1;
  }

  dndReleaseVnode(pDnode, pVnode);
  return 0;
}

S
Shengliang Guan 已提交
693
/*
 * Handles a "compact vnode" request by calling vnodeCompact() on the target
 * vnode.
 *
 * Returns 0 on success, -1 with terrno set when the request cannot be decoded,
 * the vnode does not exist, or vnodeCompact() fails.
 */
int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SCompactVnodeReq compactReq = {0};
  // Reject undecodable requests, as the create/alter/drop handlers do, rather
  // than acting on a zero-initialised vgId.
  if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &compactReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t vgId = compactReq.vgId;
  dDebug("vgId:%d, compact vnode req is received", vgId);

  SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
  if (pVnode == NULL) {
    dDebug("vgId:%d, failed to compact since %s", vgId, terrstr());
    return -1;
  }

  if (vnodeCompact(pVnode->pImpl) != 0) {
    dError("vgId:%d, failed to compact vnode since %s", vgId, terrstr());
    dndReleaseVnode(pDnode, pVnode);
    return -1;
  }

  dndReleaseVnode(pDnode, pVnode);
  return 0;
}

S
Shengliang 已提交
716
/* Passes one message to vnodeProcessQueryMsg() together with this vnode's implementation handle. */
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) {
  vnodeProcessQueryMsg(pVnode->pImpl, pMsg);
}
S
Shengliang Guan 已提交
717

S
Shengliang 已提交
718
/* Passes one message to vnodeProcessFetchMsg() together with this vnode's implementation handle. */
static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) {
  vnodeProcessFetchMsg(pVnode->pImpl, pMsg);
}
S
Shengliang Guan 已提交
719

720
/*
 * Processes a batch of write messages: all messages are first handed to
 * vnodeProcessWMsgs() as one array, then each one is applied with
 * vnodeApplyWMsg() and answered, and finally every message is freed.
 *
 * qall       batch to drain
 * numOfMsgs  number of items to take from `qall`
 */
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
  SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
  if (pArray == NULL) {
    // The batch cannot be processed. Still drain it so that every request is
    // answered and freed instead of being leaked.
    for (int32_t i = 0; i < numOfMsgs; ++i) {
      SRpcMsg *pMsg = NULL;
      taosGetQitem(qall, (void **)&pMsg);
      if (pMsg == NULL) continue;

      SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = TSDB_CODE_OUT_OF_MEMORY};
      rpcSendResponse(&rpcRsp);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
    }
    return;
  }

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    SRpcMsg *pMsg = NULL;
    taosGetQitem(qall, (void **)&pMsg);
    void *ptr = taosArrayPush(pArray, &pMsg);
    assert(ptr != NULL);
  }

  vnodeProcessWMsgs(pVnode->pImpl, pArray);

  // Signed loop indices: numOfMsgs is an int32_t, so comparing it against a
  // size_t index would silently convert it to unsigned.
  for (int32_t i = 0; i < numOfMsgs; i++) {
    SRpcMsg *pRsp = NULL;
    SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
    int32_t  code = vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp);
    if (pRsp != NULL) {
      pRsp->ahandle = pMsg->ahandle;
      rpcSendResponse(pRsp);
      free(pRsp);
    } else {
      // No response object was produced: answer with the result code only.
      if (code != 0) code = terrno;
      SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
      rpcSendResponse(&rpcRsp);
    }
  }

  for (int32_t i = 0; i < numOfMsgs; i++) {
    SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }

  taosArrayDestroy(pArray);
}

756
/*
 * Takes `numOfMsgs` items from `qall` and applies each with vnodeApplyWMsg().
 * The result code and any response object are ignored.
 *
 * NOTE(review): neither the dequeued items nor the responses are freed here
 * (the original carries a "todo") -- confirm who owns them.
 */
static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
  SRpcMsg *pItem = NULL;

  for (int32_t left = numOfMsgs; left > 0; --left) {
    taosGetQitem(qall, (void **)&pItem);

    // todo
    SRpcMsg *pReply = NULL;
    (void)vnodeApplyWMsg(pVnode->pImpl, pItem, &pReply);
  }
}

768
/*
 * Takes `numOfMsgs` items from `qall` and passes each to vnodeProcessSyncReq().
 * The result code and any response object are ignored.
 *
 * NOTE(review): neither the dequeued items nor the responses are freed here
 * (the original carries a "todo") -- confirm who owns them.
 */
static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
  SRpcMsg *pItem = NULL;

  for (int32_t left = numOfMsgs; left > 0; --left) {
    taosGetQitem(qall, (void **)&pItem);

    // todo
    SRpcMsg *pReply = NULL;
    (void)vnodeProcessSyncReq(pVnode->pImpl, pItem, &pReply);
  }
}

S
Shengliang Guan 已提交
780
// Copy `pRpcMsg` into a newly allocated queue item and append it to `pQueue`.
//
//   pQueue   destination queue; NULL yields TSDB_CODE_MSG_NOT_PROCESSED
//   pRpcMsg  message to enqueue; it is copied by value, so the queued item
//            refers to the same pCont buffer
//   sendRsp  when true, a failure is finished off here: an error response is
//            sent if the low bit of msgType is set, and pCont is released.
//            When false the caller only receives the error code and pCont is
//            left untouched.
//
// Returns 0 on success, otherwise a TSDB_CODE_* error code.
static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg, bool sendRsp) {
  int32_t code = 0;

  if (pQueue == NULL) {
    code = TSDB_CODE_MSG_NOT_PROCESSED;
  } else {
    SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg));
    if (pMsg == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      *pMsg = *pRpcMsg;
      if (taosWriteQitem(pQueue, pMsg) != 0) {
        // The item was not queued, so nobody else will release it.
        // NOTE(review): assumes a failed taosWriteQitem() leaves ownership of
        // the item with the caller - confirm against the queue implementation.
        taosFreeQitem(pMsg);
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }

  if (code != TSDB_CODE_SUCCESS && sendRsp) {
    if (pRpcMsg->msgType & 1u) {
      SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code};
      rpcSendResponse(&rsp);
    }
    rpcFreeCont(pRpcMsg->pCont);
  }

  return code;
}

// Look up the vnode addressed by the header at the start of `pMsg->pCont`.
//
// The header's contLen and vgId are byte-swapped IN PLACE from network to host
// order before the lookup, so this must run only once per message.
//
// Returns the acquired vnode (the caller must hand it back with
// dndReleaseVnode()), or NULL when the vnode cannot be acquired. In the NULL
// case the message is finished here: an error response carrying
// TSDB_CODE_VND_INVALID_VGROUP_ID is sent if the low bit of msgType is set,
// and pCont is released.
static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
  SMsgHead *pHead = pMsg->pCont;
  // The converted values are consumed as host integers below, hence ntohl().
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);

  SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId);
  if (pVnode == NULL) {
    // Log before pCont is released: pHead points into that buffer.
    dError("vgId:%d, failed to acquire vnode while process req", pHead->vgId);
    if (pMsg->msgType & 1u) {
      SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
      rpcSendResponse(&rsp);
    }
    rpcFreeCont(pMsg->pCont);
  }

  return pVnode;
}

// Route a vnode write message: resolve the target vnode from the message
// header and put the message on that vnode's write queue. If the vnode cannot
// be acquired, dndAcquireVnodeFromMsg() has already disposed of the message.
// `pEpSet` is not used.
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) return;

  // sendRsp = true: an enqueue failure is answered and cleaned up by the callee
  (void)dndWriteRpcMsgToVnodeQueue(pTarget->pWriteQ, pMsg, true);
  dndReleaseVnode(pDnode, pTarget);
}

// Route a vnode sync message: resolve the target vnode from the message header
// and put the message on that vnode's sync queue. If the vnode cannot be
// acquired, dndAcquireVnodeFromMsg() has already disposed of the message.
// `pEpSet` is not used.
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) return;

  // sendRsp = true: an enqueue failure is answered and cleaned up by the callee
  (void)dndWriteRpcMsgToVnodeQueue(pTarget->pSyncQ, pMsg, true);
  dndReleaseVnode(pDnode, pTarget);
}

// Route a vnode query message: resolve the target vnode from the message
// header and put the message on that vnode's query queue. If the vnode cannot
// be acquired, dndAcquireVnodeFromMsg() has already disposed of the message.
// `pEpSet` is not used.
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) return;

  // sendRsp = true: an enqueue failure is answered and cleaned up by the callee
  (void)dndWriteRpcMsgToVnodeQueue(pTarget->pQueryQ, pMsg, true);
  dndReleaseVnode(pDnode, pTarget);
}

// Route a vnode fetch message: resolve the target vnode from the message
// header and put the message on that vnode's fetch queue. If the vnode cannot
// be acquired, dndAcquireVnodeFromMsg() has already disposed of the message.
// `pEpSet` is not used.
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) return;

  // sendRsp = true: an enqueue failure is answered and cleaned up by the callee
  (void)dndWriteRpcMsgToVnodeQueue(pTarget->pFetchQ, pMsg, true);
  dndReleaseVnode(pDnode, pTarget);
}

S
Shengliang Guan 已提交
858 859 860 861 862 863 864
// Put `pMsg` on the query queue of the vnode named in its header.
//
// The header's vgId is used exactly as stored; no byte-order conversion is
// applied here. The message is enqueued with sendRsp = false, so on failure
// no response is sent and pCont is not released by this call.
//
// Returns -1 when the vnode cannot be acquired, otherwise the result of
// dndWriteRpcMsgToVnodeQueue().
int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pMsg) {
  SMsgHead  *pHead = pMsg->pCont;
  int32_t    code = -1;
  SVnodeObj *pTarget = dndAcquireVnode(pDnode, pHead->vgId);

  if (pTarget != NULL) {
    code = dndWriteRpcMsgToVnodeQueue(pTarget->pQueryQ, pMsg, false);
    dndReleaseVnode(pDnode, pTarget);
  }

  return code;
}

870
// Append `pMsg` directly (without copying it) to the apply queue of vnode
// `vgId`.
//
// Returns -1 when the vnode cannot be acquired, otherwise the result of
// taosWriteQitem().
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) {
  int32_t    code = -1;
  SVnodeObj *pTarget = dndAcquireVnode(pDnode, vgId);

  if (pTarget != NULL) {
    code = taosWriteQitem(pTarget->pApplyQ, pMsg);
    dndReleaseVnode(pDnode, pTarget);
  }

  return code;
}

S
Shengliang Guan 已提交
879
// Configure and start the four worker pools that serve the vnode queues
// (query, fetch, write, sync). Thread counts are derived from tsNumOfCores
// and, for the query pool, tsRatioOfQueryCores.
//
// Returns 0 on success, or -1 as soon as one pool fails to initialize; pools
// that were already started are not torn down here.
static int32_t dndInitVnodeWorkers(SDnode *pDnode) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;

  // query pool: min == max, never fewer than one thread
  int32_t       queryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1);
  SQWorkerPool *pQueryPool = &pMgmt->queryPool;
  pQueryPool->name = "vnode-query";
  pQueryPool->min = queryThreads;
  pQueryPool->max = queryThreads;
  if (tQWorkerInit(pQueryPool) != 0) return -1;

  // fetch pool: at most four threads, starting with no more than the core count
  int32_t       fetchThreadsMax = 4;
  SFWorkerPool *pFetchPool = &pMgmt->fetchPool;
  pFetchPool->name = "vnode-fetch";
  pFetchPool->min = TMIN(fetchThreadsMax, tsNumOfCores);
  pFetchPool->max = fetchThreadsMax;
  if (tFWorkerInit(pFetchPool) != 0) return -1;

  // write pool: one thread per core, never fewer than one
  SWWorkerPool *pWritePool = &pMgmt->writePool;
  pWritePool->name = "vnode-write";
  pWritePool->max = TMAX(tsNumOfCores, 1);
  if (tWWorkerInit(pWritePool) != 0) return -1;

  // sync pool: half the cores, never fewer than one
  SWWorkerPool *pSyncPool = &pMgmt->syncPool;
  pSyncPool->name = "vnode-sync";
  pSyncPool->max = TMAX(tsNumOfCores / 2, 1);
  if (tWWorkerInit(pSyncPool) != 0) return -1;

  dDebug("vnode workers is initialized");
  return 0;
}

S
Shengliang Guan 已提交
915
// Shut down the vnode worker pools, in the order fetch, query, write, sync.
static void dndCleanupVnodeWorkers(SDnode *pDnode) {
  SVnodesMgmt *pVnodes = &pDnode->vmgmt;

  tFWorkerCleanup(&pVnodes->fetchPool);
  tQWorkerCleanup(&pVnodes->queryPool);
  tWWorkerCleanup(&pVnodes->writePool);
  tWWorkerCleanup(&pVnodes->syncPool);

  dDebug("vnode workers is closed");
}

S
Shengliang Guan 已提交
924
// Create the five message queues of `pVnode` (write, apply, sync, fetch,
// query) and bind each one to its worker pool and handler. The write and
// apply queues are both served by the write pool.
//
// Returns 0 when every queue was created. Otherwise sets terrno to
// TSDB_CODE_OUT_OF_MEMORY and returns -1; queues that were created stay
// attached to `pVnode`.
static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
  SVnodesMgmt  *pMgmt = &pDnode->vmgmt;
  SWWorkerPool *pWritePool = &pMgmt->writePool;

  pVnode->pWriteQ = tWWorkerAllocQueue(pWritePool, pVnode, (FItems)dndProcessVnodeWriteQueue);
  pVnode->pApplyQ = tWWorkerAllocQueue(pWritePool, pVnode, (FItems)dndProcessVnodeApplyQueue);
  pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)dndProcessVnodeSyncQueue);
  pVnode->pFetchQ = tFWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)dndProcessVnodeFetchQueue);
  pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)dndProcessVnodeQueryQueue);

  bool allCreated = pVnode->pApplyQ != NULL && pVnode->pWriteQ != NULL && pVnode->pSyncQ != NULL &&
                    pVnode->pFetchQ != NULL && pVnode->pQueryQ != NULL;
  if (allCreated) return 0;

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

S
Shengliang Guan 已提交
942
// Return every queue of `pVnode` to the pool it was allocated from and clear
// the corresponding pointers in `pVnode`.
static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
  SVnodesMgmt *pVnodes = &pDnode->vmgmt;

  tQWorkerFreeQueue(&pVnodes->queryPool, pVnode->pQueryQ);
  pVnode->pQueryQ = NULL;

  tFWorkerFreeQueue(&pVnodes->fetchPool, pVnode->pFetchQ);
  pVnode->pFetchQ = NULL;

  // write and apply queues both belong to the write pool
  tWWorkerFreeQueue(&pVnodes->writePool, pVnode->pWriteQ);
  pVnode->pWriteQ = NULL;

  tWWorkerFreeQueue(&pVnodes->writePool, pVnode->pApplyQ);
  pVnode->pApplyQ = NULL;

  tWWorkerFreeQueue(&pVnodes->syncPool, pVnode->pSyncQ);
  pVnode->pSyncQ = NULL;
}

// Bring up the vnode subsystem of a dnode: start the worker pools first, then
// open the vnodes.
//
// Returns 0 on success and -1 on failure. A worker-pool failure is always
// reported as TSDB_CODE_OUT_OF_MEMORY through terrno.
int32_t dndInitVnodes(SDnode *pDnode) {
  int32_t code = -1;

  dInfo("dnode-vnodes start to init");

  if (dndInitVnodeWorkers(pDnode) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to init vnode workers since %s", terrstr());
  } else if (dndOpenVnodes(pDnode) != 0) {
    dError("failed to open vnodes since %s", terrstr());
  } else {
    dInfo("dnode-vnodes is initialized");
    code = 0;
  }

  return code;
}

// Tear down the vnode subsystem of a dnode: the vnodes are closed first and
// the worker pools are cleaned up afterwards.
void dndCleanupVnodes(SDnode *pDnode) {
  dInfo("dnode-vnodes start to clean up");

  dndCloseVnodes(pDnode);
  dndCleanupVnodeWorkers(pDnode);

  dInfo("dnode-vnodes is cleaned up");
}

S
Shengliang Guan 已提交
981
// Append the load record of every vnode in the manager's hash to `pLoads`, and
// refresh the manager's totalVnodes / masterNum counters (masterNum counts the
// vnodes whose reported role is TAOS_SYNC_STATE_LEADER).
//
// The hash is walked under the manager's read latch; the counters are stored
// after the latch has been released.
void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  int32_t      totalVnodes = 0;
  int32_t      masterNum = 0;

  taosRLockLatch(&pMgmt->latch);

  // The iterator is advanced in the for-clause so that skipping an empty slot
  // with `continue` still moves on to the next entry instead of spinning on
  // the same one while holding the latch.
  for (void *pIter = taosHashIterate(pMgmt->hash, NULL); pIter != NULL; pIter = taosHashIterate(pMgmt->hash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL) continue;

    SVnodeLoad vload = {0};
    vnodeGetLoad(pVnode->pImpl, &vload);
    taosArrayPush(pLoads, &vload);

    totalVnodes++;
    if (vload.role == TAOS_SYNC_STATE_LEADER) masterNum++;
  }

  taosRUnLockLatch(&pMgmt->latch);

  pMgmt->totalVnodes = totalVnodes;
  pMgmt->masterNum = masterNum;
}