dndVnodes.c 31.8 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "dndVnodes.h"
#include "dndTransport.h"

20 21 22 23 24
typedef struct {
  int32_t  vgId;
  int32_t  vgVersion;
  int8_t   dropped;
  uint64_t dbUid;
25
  char     db[TSDB_DB_FNAME_LEN];
26 27 28
  char     path[PATH_MAX + 20];
} SWrapperCfg;

S
Shengliang Guan 已提交
29
typedef struct {
30 31 32 33 34 35
  int32_t     vgId;
  int32_t     refCount;
  int32_t     vgVersion;
  int8_t      dropped;
  int8_t      accessState;
  uint64_t    dbUid;
H
more  
Hongze Cheng 已提交
36 37 38
  char *      db;
  char *      path;
  SVnode *    pImpl;
39 40 41 42
  STaosQueue *pWriteQ;
  STaosQueue *pSyncQ;
  STaosQueue *pApplyQ;
  STaosQueue *pQueryQ;
H
more  
Hongze Cheng 已提交
43
  STaosQueue *pFetchQ;
S
Shengliang Guan 已提交
44 45 46
} SVnodeObj;

typedef struct {
47 48 49 50
  int32_t      vnodeNum;
  int32_t      opened;
  int32_t      failed;
  int32_t      threadIndex;
51
  pthread_t    thread;
H
more  
Hongze Cheng 已提交
52
  SDnode *     pDnode;
53
  SWrapperCfg *pCfgs;
S
Shengliang Guan 已提交
54 55
} SVnodeThread;

S
Shengliang Guan 已提交
56 57
static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode);
static void    dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode);
S
Shengliang Guan 已提交
58

59 60
static void    dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
static void    dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
61 62 63
static void    dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
static void    dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
static void    dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
S
Shengliang Guan 已提交
64 65 66 67
void           dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void           dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void           dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void           dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
68
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg);
S
Shengliang Guan 已提交
69

H
more  
Hongze Cheng 已提交
70
static SVnodeObj * dndAcquireVnode(SDnode *pDnode, int32_t vgId);
S
Shengliang Guan 已提交
71
static void        dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode);
72 73
static int32_t     dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl);
static void        dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode);
S
Shengliang Guan 已提交
74
static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes);
75
static int32_t     dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
S
Shengliang Guan 已提交
76 77 78 79 80 81 82
static int32_t     dndWriteVnodesToFile(SDnode *pDnode);

static int32_t dndOpenVnodes(SDnode *pDnode);
static void    dndCloseVnodes(SDnode *pDnode);

/*
 * Look up the vnode registered under `vgId` and take one reference on it.
 *
 * Returns the vnode wrapper, or NULL with terrno set to
 * TSDB_CODE_VND_INVALID_VGROUP_ID when the id is not in the hash.
 * A non-NULL result must be handed back with dndReleaseVnode().
 */
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  SVnodeObj   *pFound = NULL;
  int32_t      refs = 0;

  // The lookup and the reference bump happen under the same read latch.
  taosRLockLatch(&pMgmt->latch);
  taosHashGetClone(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pFound);
  if (pFound != NULL) {
    refs = atomic_add_fetch_32(&pFound->refCount, 1);
  } else {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
  }
  taosRUnLockLatch(&pMgmt->latch);

  if (pFound == NULL) {
    return NULL;
  }

  dTrace("vgId:%d, acquire vnode, refCount:%d", pFound->vgId, refs);
  return pFound;
}

/*
 * Drop one reference previously taken on `pVnode`. Passing NULL is a no-op.
 */
static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
  if (pVnode != NULL) {
    SVnodesMgmt *pMgmt = &pDnode->vmgmt;

    taosRLockLatch(&pMgmt->latch);
    int32_t remaining = atomic_sub_fetch_32(&pVnode->refCount, 1);
    taosRUnLockLatch(&pMgmt->latch);

    dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, remaining);
  }
}

112
/*
 * Create the wrapper object for an already opened vnode implementation and
 * register it in the vnode hash.
 *
 * pDnode - owning dnode
 * pCfg   - wrapper configuration (vgId, version, db name, path) to copy from
 * pImpl  - handle returned by vnodeOpen(); stored in the wrapper
 *
 * Returns 0 on success. On failure a non-zero value is returned, terrno is set
 * to TSDB_CODE_OUT_OF_MEMORY and everything allocated here is released again.
 * `pImpl` is NOT closed on failure; it stays owned by the caller.
 */
static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  SVnodeObj   *pVnode = calloc(1, sizeof(SVnodeObj));
  if (pVnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pVnode->vgId = pCfg->vgId;
  pVnode->refCount = 0;
  pVnode->dropped = 0;
  pVnode->accessState = TSDB_VN_ALL_ACCCESS;
  pVnode->pImpl = pImpl;
  pVnode->vgVersion = pCfg->vgVersion;
  pVnode->dbUid = pCfg->dbUid;
  pVnode->db = tstrdup(pCfg->db);
  pVnode->path = tstrdup(pCfg->path);

  if (pVnode->path == NULL || pVnode->db == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _FREE_VNODE;
  }

  if (dndAllocVnodeQueue(pDnode, pVnode) != 0) {
    // The queues are not released here: it is not visible from this function
    // whether dndFreeVnodeQueue() tolerates a partially allocated set.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _FREE_VNODE;
  }

  taosWLockLatch(&pMgmt->latch);
  int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
  taosWUnLockLatch(&pMgmt->latch);

  if (code != 0) {
    // The wrapper never became reachable through the hash, so undo everything.
    dndFreeVnodeQueue(pDnode, pVnode);
    free(pVnode->path);
    free(pVnode->db);
    free(pVnode);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  }
  return code;

_FREE_VNODE:
  free(pVnode->path);
  free(pVnode->db);
  free(pVnode);
  return -1;
}

150
/*
 * Unregister a vnode, wait until it is idle, close it and free the wrapper.
 *
 * The caller must hold exactly one reference of its own on `pVnode`; it is
 * released here. When `pVnode->dropped` is set the on-disk data under
 * `pVnode->path` is destroyed as well. `pVnode` is invalid after this call.
 */
static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;

  // Remove from the hash first so that no new reference can be acquired.
  taosWLockLatch(&pMgmt->latch);
  taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
  taosWUnLockLatch(&pMgmt->latch);

  dndReleaseVnode(pDnode, pVnode);

  // Poll until every other holder has released the vnode ...
  while (pVnode->refCount > 0) {
    taosMsleep(10);
  }

  // ... and until each queue has been drained, in the order write, sync, apply, query, fetch.
  while (!taosQueueEmpty(pVnode->pWriteQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pSyncQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pApplyQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pQueryQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pFetchQ)) {
    taosMsleep(10);
  }

  dndFreeVnodeQueue(pDnode, pVnode);
  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;

  dDebug("vgId:%d, vnode is closed", pVnode->vgId);

  if (pVnode->dropped) {
    dDebug("vgId:%d, vnode is destroyed for dropped:%d", pVnode->vgId, pVnode->dropped);
    vnodeDestroy(pVnode->path);
  }

  free(pVnode->path);
  free(pVnode->db);
  free(pVnode);
}

/*
 * Take a snapshot of all vnodes currently in the hash.
 *
 * Every returned vnode has had its reference count incremented; the caller
 * must release each one (dndReleaseVnode or dndCloseVnode) and free() the
 * returned array.
 *
 * numOfVnodes - out: number of valid entries in the returned array
 *
 * Returns the array, or NULL with *numOfVnodes == 0 when the array could not
 * be allocated (terrno is set to TSDB_CODE_OUT_OF_MEMORY if the hash was not
 * empty).
 */
static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  taosRLockLatch(&pMgmt->latch);

  int32_t     num = 0;
  int32_t     size = taosHashGetSize(pMgmt->hash);
  SVnodeObj **pVnodes = calloc(size, sizeof(SVnodeObj *));
  if (pVnodes == NULL) {
    // calloc(0, ...) may legitimately return NULL; only a non-empty hash is an error.
    if (size > 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
    }
    taosRUnLockLatch(&pMgmt->latch);
    *numOfVnodes = 0;
    return NULL;
  }

  void *pIter = taosHashIterate(pMgmt->hash, NULL);
  while (pIter != NULL) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL || num >= size) {
      // Stop here: the iterator is cancelled, so the loop must not continue
      // with it (the original code kept spinning on the same iterator).
      taosHashCancelIterate(pMgmt->hash, pIter);
      break;
    }

    int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
    dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount);
    pVnodes[num++] = pVnode;

    pIter = taosHashIterate(pMgmt->hash, pIter);
  }

  taosRUnLockLatch(&pMgmt->latch);
  *numOfVnodes = num;

  return pVnodes;
}

209 210 211 212
/*
 * Load the persisted vnode list from "<vnodes dir>/vnodes.json".
 *
 * ppCfgs      - out: heap array with one SWrapperCfg per vnode; only written
 *               when at least one vnode was read. The caller frees it.
 * numOfVnodes - out: number of vnodes read; only written when the file exists
 *               and was parsed successfully.
 *
 * Returns 0 on success, including the case where the file does not exist.
 * On any read or parse error TSDB_CODE_DND_VNODE_READ_FILE_ERROR is returned
 * and also stored in terrno; no output parameter is modified in that case.
 */
static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) {
  int32_t      code = TSDB_CODE_DND_VNODE_READ_FILE_ERROR;
  int32_t      len = 0;
  int32_t      maxLen = 0;
  long         fileSize = 0;
  char        *content = NULL;
  cJSON       *root = NULL;
  FILE        *fp = NULL;
  char         file[PATH_MAX + 20] = {0};
  SWrapperCfg *pCfgs = NULL;

  snprintf(file, sizeof(file), "%s/vnodes.json", pDnode->dir.vnodes);

  fp = fopen(file, "r");
  if (fp == NULL) {
    dDebug("file %s not exist", file);
    code = 0;
    goto PRASE_VNODE_OVER;
  }

  // Size the buffer from the file itself instead of a fixed cap, so a list
  // written by dndWriteVnodesToFile() is never truncated on the way back in.
  if (fseek(fp, 0, SEEK_END) != 0 || (fileSize = ftell(fp)) < 0 || fileSize >= INT32_MAX ||
      fseek(fp, 0, SEEK_SET) != 0) {
    dError("failed to read %s since file size cannot be determined", file);
    goto PRASE_VNODE_OVER;
  }

  maxLen = (int32_t)fileSize;
  content = calloc(1, (size_t)maxLen + 1);
  if (content == NULL) {
    dError("failed to read %s since out of memory", file);
    goto PRASE_VNODE_OVER;
  }

  len = (int32_t)fread(content, 1, maxLen, fp);
  if (len <= 0) {
    dError("failed to read %s since content is null", file);
    goto PRASE_VNODE_OVER;
  }

  content[len] = 0;
  root = cJSON_Parse(content);
  if (root == NULL) {
    dError("failed to read %s since invalid json format", file);
    goto PRASE_VNODE_OVER;
  }

  cJSON *vnodes = cJSON_GetObjectItem(root, "vnodes");
  if (!vnodes || vnodes->type != cJSON_Array) {
    dError("failed to read %s since vnodes not found", file);
    goto PRASE_VNODE_OVER;
  }

  int32_t vnodesNum = cJSON_GetArraySize(vnodes);
  if (vnodesNum > 0) {
    pCfgs = calloc(vnodesNum, sizeof(SWrapperCfg));
    if (pCfgs == NULL) {
      dError("failed to read %s since out of memory", file);
      goto PRASE_VNODE_OVER;
    }

    for (int32_t i = 0; i < vnodesNum; ++i) {
      cJSON       *vnode = cJSON_GetArrayItem(vnodes, i);
      SWrapperCfg *pCfg = &pCfgs[i];

      cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId");
      if (!vgId || vgId->type != cJSON_Number) {
        dError("failed to read %s since vgId not found", file);
        goto PRASE_VNODE_OVER;
      }
      pCfg->vgId = vgId->valueint;
      snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCfg->vgId);

      cJSON *dropped = cJSON_GetObjectItem(vnode, "dropped");
      if (!dropped || dropped->type != cJSON_Number) {
        dError("failed to read %s since dropped not found", file);
        goto PRASE_VNODE_OVER;
      }
      pCfg->dropped = dropped->valueint;

      cJSON *vgVersion = cJSON_GetObjectItem(vnode, "vgVersion");
      if (!vgVersion || vgVersion->type != cJSON_Number) {
        dError("failed to read %s since vgVersion not found", file);
        goto PRASE_VNODE_OVER;
      }
      pCfg->vgVersion = vgVersion->valueint;

      cJSON *dbUid = cJSON_GetObjectItem(vnode, "dbUid");
      if (!dbUid || dbUid->type != cJSON_String) {
        dError("failed to read %s since dbUid not found", file);
        goto PRASE_VNODE_OVER;
      }
      // dbUid is written with PRIu64, so parse it as unsigned 64-bit.
      pCfg->dbUid = strtoull(dbUid->valuestring, NULL, 10);

      cJSON *db = cJSON_GetObjectItem(vnode, "db");
      if (!db || db->type != cJSON_String) {
        dError("failed to read %s since db not found", file);
        goto PRASE_VNODE_OVER;
      }
      tstrncpy(pCfg->db, db->valuestring, TSDB_DB_FNAME_LEN);
    }

    // Ownership of the array moves to the caller only after every entry parsed.
    *ppCfgs = pCfgs;
    pCfgs = NULL;
  }

  *numOfVnodes = vnodesNum;
  code = 0;
  dInfo("succcessed to read file %s", file);

PRASE_VNODE_OVER:
  free(content);
  if (root != NULL) cJSON_Delete(root);
  if (fp != NULL) fclose(fp);
  free(pCfgs);  // non-NULL only when parsing failed half-way

  if (code != 0) {
    terrno = code;  // the caller reports the failure through terrstr()
  }
  return code;
}

/*
 * Persist the list of vnodes currently in the hash.
 *
 * The list is written as JSON to "<vnodes dir>/vnodes.json.bak" and then
 * renamed over "<vnodes dir>/vnodes.json". The rename only happens after the
 * content was written and the file closed without error.
 *
 * Returns 0 on success. Returns -1 with terrno set when the file cannot be
 * created or written or memory is exhausted; otherwise returns the result of
 * taosRenameFile().
 */
static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  int32_t      code = -1;
  char        *content = NULL;
  char         file[PATH_MAX + 20] = {0};
  char         realfile[PATH_MAX + 20] = {0};
  snprintf(file, sizeof(file), "%s/vnodes.json.bak", pDnode->dir.vnodes);
  snprintf(realfile, sizeof(realfile), "%s/vnodes.json", pDnode->dir.vnodes);

  FILE *fp = fopen(file, "w");
  if (fp == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to write %s since %s", file, terrstr());
    return -1;
  }

  int32_t     numOfVnodes = 0;
  SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes);
  if (pVnodes == NULL) {
    // A NULL snapshot of a non-empty hash means the snapshot could not be
    // allocated; writing an empty list in that case would lose every vnode.
    taosRLockLatch(&pMgmt->latch);
    int32_t hashSize = taosHashGetSize(pMgmt->hash);
    taosRUnLockLatch(&pMgmt->latch);
    if (hashSize > 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      dError("failed to write %s since %s", file, terrstr());
      goto WRITE_VNODE_OVER;
    }
  }

  // Size the buffer from the data: the fixed text and numeric fields of one
  // entry stay below 256 bytes, the db name is added on top. With this bound
  // `maxLen - len` can never become negative in the snprintf calls below.
  int32_t len = 0;
  int32_t maxLen = 64;
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    maxLen += 256 + (int32_t)strlen(pVnodes[i]->db);
  }

  content = calloc(1, (size_t)maxLen + 1);
  if (content == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to write %s since %s", file, terrstr());
    goto WRITE_VNODE_OVER;
  }

  len += snprintf(content + len, maxLen - len, "{\n");
  len += snprintf(content + len, maxLen - len, "  \"vnodes\": [\n");
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    SVnodeObj *pVnode = pVnodes[i];
    len += snprintf(content + len, maxLen - len, "    {\n");
    len += snprintf(content + len, maxLen - len, "      \"vgId\": %d,\n", pVnode->vgId);
    len += snprintf(content + len, maxLen - len, "      \"dropped\": %d,\n", pVnode->dropped);
    len += snprintf(content + len, maxLen - len, "      \"vgVersion\": %d,\n", pVnode->vgVersion);
    len += snprintf(content + len, maxLen - len, "      \"dbUid\": \"%" PRIu64 "\",\n", pVnode->dbUid);
    len += snprintf(content + len, maxLen - len, "      \"db\": \"%s\"\n", pVnode->db);
    if (i < numOfVnodes - 1) {
      len += snprintf(content + len, maxLen - len, "    },\n");
    } else {
      len += snprintf(content + len, maxLen - len, "    }\n");
    }
  }
  len += snprintf(content + len, maxLen - len, "  ]\n");
  len += snprintf(content + len, maxLen - len, "}\n");

  if (fwrite(content, 1, len, fp) != (size_t)len) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to write %s since %s", file, terrstr());
    goto WRITE_VNODE_OVER;
  }

  taosFsyncFile(fileno(fp));

  int32_t closeCode = fclose(fp);
  fp = NULL;
  if (closeCode != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to write %s since %s", file, terrstr());
    goto WRITE_VNODE_OVER;
  }

  code = 0;

WRITE_VNODE_OVER:
  if (fp != NULL) fclose(fp);
  free(content);

  for (int32_t i = 0; i < numOfVnodes; ++i) {
    dndReleaseVnode(pDnode, pVnodes[i]);
  }
  free(pVnodes);

  if (code != 0) {
    // Keep the previous vnodes.json untouched when the new content is incomplete.
    return -1;
  }

  terrno = 0;
  code = taosRenameFile(file, realfile);
  if (code == 0) {
    dDebug("successed to write %s", realfile);
  } else {
    dError("failed to rename %s to %s", file, realfile);
  }
  return code;
}

static void *dnodeOpenVnodeFunc(void *param) {
  SVnodeThread *pThread = param;
H
more  
Hongze Cheng 已提交
370 371
  SDnode *      pDnode = pThread->pDnode;
  SVnodesMgmt * pMgmt = &pDnode->vmgmt;
S
Shengliang Guan 已提交
372 373 374 375 376

  dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("open-vnodes");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
377
    SWrapperCfg *pCfg = &pThread->pCfgs[v];
S
Shengliang Guan 已提交
378 379

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
380
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
S
Shengliang Guan 已提交
381 382 383
             pMgmt->openVnodes, pMgmt->totalVnodes);
    dndReportStartup(pDnode, "open-vnodes", stepDesc);

S
Shengliang Guan 已提交
384
    SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pDnode->pTfs, .vgId = pCfg->vgId};
S
Shengliang Guan 已提交
385
    SVnode *  pImpl = vnodeOpen(pCfg->path, &cfg);
S
Shengliang Guan 已提交
386
    if (pImpl == NULL) {
387
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
S
Shengliang Guan 已提交
388 389
      pThread->failed++;
    } else {
390
      dndOpenVnode(pDnode, pCfg, pImpl);
391
      dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
S
Shengliang Guan 已提交
392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
      pThread->opened++;
    }

    atomic_add_fetch_32(&pMgmt->openVnodes, 1);
  }

  dDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
         pThread->failed);
  return NULL;
}

/*
 * Initialise the vnode hash and open every vnode listed in vnodes.json.
 *
 * The vnodes are distributed round-robin over worker threads running
 * dnodeOpenVnodeFunc(); this function waits for all of them.
 *
 * Returns 0 when pMgmt->openVnodes equals the number of vnodes listed on disk,
 * -1 otherwise (hash creation failure, unreadable list, out of memory, or a
 * mismatch between opened and total vnodes).
 */
static int32_t dndOpenVnodes(SDnode *pDnode) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  taosInitRWLatch(&pMgmt->latch);

  pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  if (pMgmt->hash == NULL) {
    dError("failed to init vnode hash");
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  if (dndGetVnodesFromFile(pDnode, &pCfgs, &numOfVnodes) != 0) {
    dInfo("failed to get vnode list from disk since %s", terrstr());
    return -1;
  }

  pMgmt->totalVnodes = numOfVnodes;

  int32_t threadNum = pDnode->env.numOfCores;
#if 1
  threadNum = 1;
#endif

  // Round-robin assignment gives each thread at most this many vnodes.
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    dError("failed to open vnodes since out of memory");
    free(pCfgs);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pDnode = pDnode;
    threads[t].pCfgs = calloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      dError("failed to open vnodes since out of memory");
      // Entries not reached yet are still NULL from calloc, so freeing all is safe.
      for (int32_t i = 0; i < threadNum; ++i) {
        free(threads[i].pCfgs);
      }
      free(threads);
      free(pCfgs);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    pthread_attr_t thAttr;
    pthread_attr_init(&thAttr);
    pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
    // pthread_create reports failure through its return value, not through errno.
    int32_t ret = pthread_create(&pThread->thread, &thAttr, dnodeOpenVnodeFunc, pThread);
    if (ret != 0) {
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(ret));
    }

    pthread_attr_destroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      pthread_join(pThread->thread, NULL);
    }
    free(pThread->pCfgs);
  }
  free(threads);
  free(pCfgs);

  if (pMgmt->openVnodes != pMgmt->totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->totalVnodes, pMgmt->openVnodes);
    return -1;
  } else {
    dInfo("total vnodes:%d open successfully", pMgmt->totalVnodes);
    return 0;
  }
}

/*
 * Close every vnode that is currently registered and tear down the vnode hash.
 */
static void dndCloseVnodes(SDnode *pDnode) {
  dInfo("start to close all vnodes");
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;

  // The snapshot holds one reference per vnode; dndCloseVnode() consumes it.
  int32_t     total = 0;
  SVnodeObj **ppList = dndGetVnodesFromHash(pDnode, &total);

  int32_t idx = 0;
  while (idx < total) {
    dndCloseVnode(pDnode, ppList[idx]);
    ++idx;
  }

  if (ppList != NULL) {
    free(ppList);
  }

  if (pMgmt->hash != NULL) {
    taosHashCleanup(pMgmt->hash);
    pMgmt->hash = NULL;
  }

  dInfo("total vnodes:%d are all closed", total);
}

S
Shengliang Guan 已提交
501 502
/*
 * Byte-swap the multi-byte fields of a create-vnode request in place
 * (htonl / htons / htobe64 on each field) and return the request body.
 *
 * The conversion modifies pReq->pCont directly, so it must run exactly once
 * per message.
 */
static SCreateVnodeReq *dndParseCreateVnodeReq(SRpcMsg *pReq) {
  SCreateVnodeReq *pBody = pReq->pCont;

  pBody->vgId = htonl(pBody->vgId);
  pBody->dnodeId = htonl(pBody->dnodeId);
  pBody->dbUid = htobe64(pBody->dbUid);
  pBody->vgVersion = htonl(pBody->vgVersion);
  pBody->cacheBlockSize = htonl(pBody->cacheBlockSize);
  pBody->totalBlocks = htonl(pBody->totalBlocks);
  pBody->daysPerFile = htonl(pBody->daysPerFile);
  pBody->daysToKeep0 = htonl(pBody->daysToKeep0);
  pBody->daysToKeep1 = htonl(pBody->daysToKeep1);
  pBody->daysToKeep2 = htonl(pBody->daysToKeep2);
  pBody->minRows = htonl(pBody->minRows);
  pBody->maxRows = htonl(pBody->maxRows);
  pBody->commitTime = htonl(pBody->commitTime);
  pBody->fsyncPeriod = htonl(pBody->fsyncPeriod);

  // NOTE(review): `replica` comes straight from the message and is not checked
  // against the capacity of `replicas` here - confirm it is validated elsewhere.
  for (int idx = 0; idx < pBody->replica; ++idx) {
    SReplica *pItem = &pBody->replicas[idx];
    pItem->id = htonl(pItem->id);
    pItem->port = htons(pItem->port);
  }

  return pBody;
}
S
Shengliang Guan 已提交
525

S
Shengliang Guan 已提交
526
/*
 * Fill the vnode implementation configuration `pCfg` from an already parsed
 * create/alter request `pCreate`.
 */
static void dndGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
  pCfg->vgId = pCreate->vgId;

  // All buffer sizes are taken from the request's cacheBlockSize.
  pCfg->wsize = pCreate->cacheBlockSize;
  pCfg->ssize = pCreate->cacheBlockSize;
  pCfg->lsize = pCreate->cacheBlockSize;

  pCfg->isHeapAllocator = true;
  pCfg->ttl = 4;
  pCfg->keep = pCreate->daysToKeep0;
  pCfg->isWeak = true;

  // NOTE(review): keep1 is fed from daysToKeep2 and keep2 from daysToKeep0,
  // while daysToKeep1 is unused - confirm this mapping is intended.
  pCfg->tsdbCfg.keep = pCreate->daysToKeep0;
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep2;
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep0;
  pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize;

  pCfg->metaCfg.lruSize = pCreate->cacheBlockSize;

  pCfg->walCfg.vgId = pCreate->vgId;
  pCfg->walCfg.level = pCreate->walLevel;
  pCfg->walCfg.fsyncPeriod = pCreate->fsyncPeriod;
  pCfg->walCfg.retentionPeriod = 10;
  pCfg->walCfg.retentionSize = 128;
  pCfg->walCfg.rollPeriod = 128;
  pCfg->walCfg.segSize = 128;
}

S
Shengliang Guan 已提交
550
/*
 * Fill the wrapper configuration `pCfg` for a vnode about to be created from
 * the parsed request `pCreate`. The path is "<vnodes dir>/vnode<vgId>".
 */
static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
  // Bounded copy that always terminates the name: pCfg->db is later duplicated
  // with tstrdup() and printed with "%s", both of which need a NUL terminator.
  tstrncpy(pCfg->db, pCreate->db, TSDB_DB_FNAME_LEN);
  pCfg->dbUid = pCreate->dbUid;
  pCfg->dropped = 0;
  snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCreate->vgId);
  pCfg->vgId = pCreate->vgId;
  pCfg->vgVersion = pCreate->vgVersion;
}

559
/*
 * Byte-swap the vgId of a drop-vnode request in place and return the request
 * body. The conversion modifies pReq->pCont, so it must run once per message.
 */
static SDropVnodeReq *dndParseDropVnodeReq(SRpcMsg *pReq) {
  SDropVnodeReq *pBody = pReq->pCont;

  pBody->vgId = htonl(pBody->vgId);

  return pBody;
}

565
/*
 * Byte-swap the vgId of an auth-vnode request in place and return the request
 * body. The conversion modifies pReq->pCont, so it must run once per message.
 */
static SAuthVnodeReq *dndParseAuthVnodeReq(SRpcMsg *pReq) {
  SAuthVnodeReq *pBody = pReq->pCont;

  pBody->vgId = htonl(pBody->vgId);

  return pBody;
}

S
Shengliang Guan 已提交
571 572
/*
 * Handle a create-vnode request: open the vnode, register its wrapper and
 * persist the updated vnode list.
 *
 * Returns 0 on success. On failure a non-zero value is returned and terrno
 * describes the cause (TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED when the vgId is
 * already registered). A vnode that was created but could not be persisted is
 * removed again, including its on-disk data.
 */
int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SCreateVnodeReq *pCreate = dndParseCreateVnodeReq(pReq);
  dDebug("vgId:%d, create vnode req is received", pCreate->vgId);

  SVnodeCfg vnodeCfg = {0};
  dndGenerateVnodeCfg(pCreate, &vnodeCfg);

  SWrapperCfg wrapperCfg = {0};
  dndGenerateWrapperCfg(pDnode, pCreate, &wrapperCfg);

  SVnodeObj *pVnode = dndAcquireVnode(pDnode, pCreate->vgId);
  if (pVnode != NULL) {
    dDebug("vgId:%d, already exist", pCreate->vgId);
    dndReleaseVnode(pDnode, pVnode);
    terrno = TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED;
    return -1;
  }

  vnodeCfg.pDnode = pDnode;
  vnodeCfg.pTfs = pDnode->pTfs;
  SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr());
    return -1;
  }

  int32_t code = dndOpenVnode(pDnode, &wrapperCfg, pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to open vnode since %s", pCreate->vgId, terrstr());
    // Remember the real cause: the cleanup calls below may overwrite terrno,
    // and `code` itself is only a generic failure value.
    int32_t err = terrno;
    vnodeClose(pImpl);
    vnodeDestroy(wrapperCfg.path);
    terrno = (err != 0) ? err : code;
    return code;
  }

  code = dndWriteVnodesToFile(pDnode);
  if (code != 0) {
    int32_t err = terrno;
    dError("vgId:%d, failed to save vnode list since %s", pCreate->vgId, terrstr());

    // The wrapper is already in the hash and owns pImpl. Tear it down through
    // the regular drop path so that the hash entry, the queues, pImpl and the
    // on-disk data are all removed together instead of leaving a hash entry
    // that points at a closed vnode.
    pVnode = dndAcquireVnode(pDnode, pCreate->vgId);
    if (pVnode != NULL) {
      pVnode->dropped = 1;
      dndCloseVnode(pDnode, pVnode);
    }

    terrno = (err != 0) ? err : code;
    return code;
  }

  return 0;
}

S
Shengliang Guan 已提交
617 618
/*
 * Handle an alter-vnode request.
 *
 * Nothing is done when the requested vgVersion equals the current one.
 * Otherwise the new configuration is applied with vnodeAlter() and the new
 * version is persisted; the in-memory version is rolled back when persisting
 * fails.
 *
 * Returns 0 on success, -1 when the vnode is unknown or vnodeAlter() fails,
 * or the result of dndWriteVnodesToFile().
 */
int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SAlterVnodeReq *pAlter = (SAlterVnodeReq *)dndParseCreateVnodeReq(pReq);
  dDebug("vgId:%d, alter vnode req is received", pAlter->vgId);

  SVnodeCfg newCfg = {0};
  dndGenerateVnodeCfg(pAlter, &newCfg);

  SVnodeObj *pVnode = dndAcquireVnode(pDnode, pAlter->vgId);
  if (pVnode == NULL) {
    dDebug("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr());
    return -1;
  }

  if (pAlter->vgVersion == pVnode->vgVersion) {
    dndReleaseVnode(pDnode, pVnode);
    dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", pAlter->vgId);
    return 0;
  }

  int32_t result = -1;
  if (vnodeAlter(pVnode->pImpl, &newCfg) != 0) {
    dError("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr());
  } else {
    int32_t prevVersion = pVnode->vgVersion;
    pVnode->vgVersion = pAlter->vgVersion;

    result = dndWriteVnodesToFile(pDnode);
    if (result != 0) {
      // Persisting failed: keep memory consistent with what is on disk.
      pVnode->vgVersion = prevVersion;
    }
  }

  dndReleaseVnode(pDnode, pVnode);
  return result;
}

S
Shengliang Guan 已提交
653
/*
 * Handle a drop-vnode request.
 *
 * The vnode is first marked as dropped and the list is persisted, then the
 * vnode is closed (which also destroys its data because `dropped` is set) and
 * the list is persisted again.
 *
 * Returns 0 on success, -1 when the vnode is not deployed
 * (terrno = TSDB_CODE_DND_VNODE_NOT_DEPLOYED) or the first persist step fails.
 */
int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SDropVnodeReq *pDrop = dndParseDropVnodeReq(pReq);
  int32_t        vgId = pDrop->vgId;

  dDebug("vgId:%d, drop vnode req is received", vgId);

  SVnodeObj *pTarget = dndAcquireVnode(pDnode, vgId);
  if (pTarget == NULL) {
    dDebug("vgId:%d, failed to drop since %s", vgId, terrstr());
    terrno = TSDB_CODE_DND_VNODE_NOT_DEPLOYED;
    return -1;
  }

  // Record the drop on disk before touching the vnode itself.
  pTarget->dropped = 1;
  if (dndWriteVnodesToFile(pDnode) != 0) {
    pTarget->dropped = 0;
    dndReleaseVnode(pDnode, pTarget);
    return -1;
  }

  // dndCloseVnode() consumes the reference taken above and frees pTarget.
  dndCloseVnode(pDnode, pTarget);
  dndWriteVnodesToFile(pDnode);

  return 0;
}

S
Shengliang Guan 已提交
679
/*
 * Handle an auth-vnode request by storing the requested access state on the
 * vnode wrapper.
 *
 * Returns 0 on success, -1 when the vnode cannot be acquired.
 */
int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SAuthVnodeReq *pAuth = (SAuthVnodeReq *)dndParseAuthVnodeReq(pReq);
  int32_t        vgId = pAuth->vgId;

  dDebug("vgId:%d, auth vnode req is received", vgId);

  SVnodeObj *pTarget = dndAcquireVnode(pDnode, vgId);
  if (pTarget == NULL) {
    dDebug("vgId:%d, failed to auth since %s", vgId, terrstr());
    return -1;
  }

  pTarget->accessState = pAuth->accessState;
  dndReleaseVnode(pDnode, pTarget);
  return 0;
}

S
Shengliang Guan 已提交
696
/*
 * Handle a sync-vnode request by calling vnodeSync() on the target vnode.
 *
 * Returns 0 on success, -1 when the vnode cannot be acquired or vnodeSync()
 * fails.
 */
int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  // The request is parsed with the drop-request parser, which converts vgId.
  SSyncVnodeReq *pSync = (SSyncVnodeReq *)dndParseDropVnodeReq(pReq);
  int32_t        vgId = pSync->vgId;

  dDebug("vgId:%d, sync vnode req is received", vgId);

  SVnodeObj *pTarget = dndAcquireVnode(pDnode, vgId);
  if (pTarget == NULL) {
    dDebug("vgId:%d, failed to sync since %s", vgId, terrstr());
    return -1;
  }

  int32_t result = 0;
  if (vnodeSync(pTarget->pImpl) != 0) {
    dError("vgId:%d, failed to sync vnode since %s", vgId, terrstr());
    result = -1;
  }

  dndReleaseVnode(pDnode, pTarget);
  return result;
}

S
Shengliang Guan 已提交
718
/*
 * Handle a compact-vnode request by calling vnodeCompact() on the target vnode.
 *
 * Returns 0 on success, -1 when the vnode cannot be acquired or
 * vnodeCompact() fails.
 */
int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  // The request is parsed with the drop-request parser, which converts vgId.
  SCompactVnodeReq *pCompact = (SCompactVnodeReq *)dndParseDropVnodeReq(pReq);
  int32_t           vgId = pCompact->vgId;

  dDebug("vgId:%d, compact vnode req is received", vgId);

  SVnodeObj *pTarget = dndAcquireVnode(pDnode, vgId);
  if (pTarget == NULL) {
    dDebug("vgId:%d, failed to compact since %s", vgId, terrstr());
    return -1;
  }

  int32_t result = 0;
  if (vnodeCompact(pTarget->pImpl) != 0) {
    dError("vgId:%d, failed to compact vnode since %s", vgId, terrstr());
    result = -1;
  }

  dndReleaseVnode(pDnode, pTarget);
  return result;
}

S
Shengliang 已提交
740
/* Query-queue consumer: forwards one message to the vnode implementation. */
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) {
  vnodeProcessQueryMsg(pVnode->pImpl, pMsg);
}
S
Shengliang Guan 已提交
741

S
Shengliang 已提交
742
/* Fetch-queue consumer: forwards one message to the vnode implementation. */
static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) {
  vnodeProcessFetchMsg(pVnode->pImpl, pMsg);
}
S
Shengliang Guan 已提交
743

744
/*
 * Write-queue consumer for a batch of `numOfMsgs` messages.
 *
 * The batch is collected into an array and handed to vnodeProcessWMsgs(); then
 * each message is applied with vnodeApplyWMsg() and answered; finally the
 * message contents and queue items are freed.
 */
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
  SArray *pBatch = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));

  // Step 1: pull every message of the batch out of the queue.
  for (int32_t n = 0; n < numOfMsgs; ++n) {
    SRpcMsg *pItem = NULL;
    taosGetQitem(qall, (void **)&pItem);
    void *pSlot = taosArrayPush(pBatch, &pItem);
    assert(pSlot != NULL);
  }

  vnodeProcessWMsgs(pVnode->pImpl, pBatch);

  // Step 2: apply each message and send exactly one response for it.
  for (size_t n = 0; n < numOfMsgs; n++) {
    SRpcMsg *pReply = NULL;
    SRpcMsg *pItem = *(SRpcMsg **)taosArrayGet(pBatch, n);
    int32_t  code = vnodeApplyWMsg(pVnode->pImpl, pItem, &pReply);

    if (pReply == NULL) {
      // No response was produced: build one that carries the result code.
      if (code != 0) code = terrno;
      SRpcMsg rpcRsp = {.handle = pItem->handle, .ahandle = pItem->ahandle, .code = code};
      rpcSendResponse(&rpcRsp);
      continue;
    }

    pReply->ahandle = pItem->ahandle;
    rpcSendResponse(pReply);
    free(pReply);
  }

  // Step 3: release the message payloads and the queue items.
  for (size_t n = 0; n < numOfMsgs; n++) {
    SRpcMsg *pItem = *(SRpcMsg **)taosArrayGet(pBatch, n);
    rpcFreeCont(pItem->pCont);
    taosFreeQitem(pItem);
  }

  taosArrayDestroy(pBatch);
}

780
/*
 * Apply-queue consumer: applies each of the `numOfMsgs` queued messages with
 * vnodeApplyWMsg(). The result and any response are currently ignored.
 */
static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
  SRpcMsg *pItem = NULL;

  int32_t n = 0;
  while (n < numOfMsgs) {
    taosGetQitem(qall, (void **)&pItem);

    // todo
    // NOTE(review): neither pItem nor the response is released here - confirm ownership.
    SRpcMsg *pReply = NULL;
    (void)vnodeApplyWMsg(pVnode->pImpl, pItem, &pReply);

    ++n;
  }
}

792
// Batch handler for a vnode's sync queue: takes numOfMsgs items from qall
// one at a time and passes each to vnodeProcessSyncReq().
//
// todo: the return code and any response written to pRsp are ignored, and
// neither the message nor the response is released in this function.
static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
  SRpcMsg *pItem = NULL;

  for (int32_t left = numOfMsgs; left > 0; --left) {
    taosGetQitem(qall, (void **)&pItem);

    SRpcMsg *pRsp = NULL;
    (void)vnodeProcessSyncReq(pVnode->pImpl, pItem, &pRsp);
  }
}

S
Shengliang Guan 已提交
804
// Copies *pRpcMsg into a freshly allocated queue item and appends it to pQueue.
//
// Returns 0 on success, or:
//   TSDB_CODE_MSG_NOT_PROCESSED  when pQueue is NULL,
//   TSDB_CODE_OUT_OF_MEMORY      when the item cannot be allocated or queued.
//
// On failure with sendRsp == true, an error response is sent for messages
// whose msgType has its lowest bit set (presumably the request types - confirm
// against the message type table), and pRpcMsg->pCont is freed.
// On failure with sendRsp == false nothing is sent or freed, so the caller
// still owns pRpcMsg->pCont.
static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg, bool sendRsp) {
  int32_t code = 0;

  if (pQueue == NULL) {
    code = TSDB_CODE_MSG_NOT_PROCESSED;
  } else {
    SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg));
    if (pMsg == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      *pMsg = *pRpcMsg;
      if (taosWriteQitem(pQueue, pMsg) != 0) {
        // The item never reached the queue, so release it here to avoid a leak.
        taosFreeQitem(pMsg);
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }

  if (code != TSDB_CODE_SUCCESS && sendRsp) {
    if (pRpcMsg->msgType & 1u) {
      SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code};
      rpcSendResponse(&rsp);
    }
    rpcFreeCont(pRpcMsg->pCont);
  }

  return code;
}

// Decodes the message head in place and acquires the vnode it addresses.
//
// contLen and vgId in the SMsgHead at pMsg->pCont are converted from network
// to host byte order (the head is modified, so this must run only once per
// message). The vnode is then looked up by vgId.
//
// Returns the acquired vnode; the caller must release it with
// dndReleaseVnode(). If no vnode is found, the failure is logged, an
// TSDB_CODE_VND_INVALID_VGROUP_ID response is sent for messages whose msgType
// has its lowest bit set, pMsg->pCont is freed, and NULL is returned.
static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
  SMsgHead *pHead = pMsg->pCont;
  // Incoming fields are in network order: ntohl is the correct direction.
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);

  SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to acquire vnode while process req", pHead->vgId);
    if (pMsg->msgType & 1u) {
      SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
      rpcSendResponse(&rsp);
    }
    rpcFreeCont(pMsg->pCont);
  }

  return pVnode;
}

// Routes an incoming message to the write queue of the vnode named in its
// head. If the vnode cannot be acquired, dndAcquireVnodeFromMsg() has already
// dealt with the message and nothing more is done here. pEpSet is not used.
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) return;

  // Enqueue failures are answered inside the helper (sendRsp = true).
  (void)dndWriteRpcMsgToVnodeQueue(pTarget->pWriteQ, pMsg, true);
  dndReleaseVnode(pDnode, pTarget);
}

// Routes an incoming message to the sync queue of the vnode named in its
// head. If the vnode cannot be acquired, dndAcquireVnodeFromMsg() has already
// dealt with the message and nothing more is done here. pEpSet is not used.
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) return;

  // Enqueue failures are answered inside the helper (sendRsp = true).
  (void)dndWriteRpcMsgToVnodeQueue(pTarget->pSyncQ, pMsg, true);
  dndReleaseVnode(pDnode, pTarget);
}

// Routes an incoming message to the query queue of the vnode named in its
// head. If the vnode cannot be acquired, dndAcquireVnodeFromMsg() has already
// dealt with the message and nothing more is done here. pEpSet is not used.
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) return;

  // Enqueue failures are answered inside the helper (sendRsp = true).
  (void)dndWriteRpcMsgToVnodeQueue(pTarget->pQueryQ, pMsg, true);
  dndReleaseVnode(pDnode, pTarget);
}

// Routes an incoming message to the fetch queue of the vnode named in its
// head. If the vnode cannot be acquired, dndAcquireVnodeFromMsg() has already
// dealt with the message and nothing more is done here. pEpSet is not used.
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) return;

  // Enqueue failures are answered inside the helper (sendRsp = true).
  (void)dndWriteRpcMsgToVnodeQueue(pTarget->pFetchQ, pMsg, true);
  dndReleaseVnode(pDnode, pTarget);
}

S
Shengliang Guan 已提交
882 883 884 885 886 887 888
// Puts a request on the query queue of the vnode named in the message head.
//
// The head's vgId is used as-is: unlike dndAcquireVnodeFromMsg(), no
// byte-order conversion is applied here.
//
// Returns -1 when the vnode cannot be acquired, otherwise the result of
// dndWriteRpcMsgToVnodeQueue(). No response is sent and pMsg->pCont is not
// freed on failure (sendRsp = false).
int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pMsg) {
  SMsgHead  *pHead = pMsg->pCont;
  SVnodeObj *pTarget = dndAcquireVnode(pDnode, pHead->vgId);
  if (pTarget == NULL) {
    return -1;
  }

  int32_t result = dndWriteRpcMsgToVnodeQueue(pTarget->pQueryQ, pMsg, false);
  dndReleaseVnode(pDnode, pTarget);
  return result;
}

894
// Appends pMsg itself (no copy is made) to the apply queue of vnode vgId.
// Returns -1 when the vnode cannot be acquired, otherwise the result of
// taosWriteQitem().
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) {
  SVnodeObj *pTarget = dndAcquireVnode(pDnode, vgId);
  if (pTarget == NULL) {
    return -1;
  }

  int32_t result = taosWriteQitem(pTarget->pApplyQ, pMsg);
  dndReleaseVnode(pDnode, pTarget);
  return result;
}

S
Shengliang Guan 已提交
903
// Configures and starts the four vnode worker pools (query, fetch, write,
// sync), in that order. Thread limits are derived from the core count and
// the configured query-core ratio. Returns 0 on success, or -1 as soon as
// one pool fails to initialize (pools started before it are left as they are).
static int32_t dndInitVnodeWorkers(SDnode *pDnode) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;

  // Query pool: fixed size, at least one thread.
  int32_t       queryThreads = TMAX((int32_t)(pDnode->env.numOfCores * pDnode->cfg.ratioOfQueryCores), 1);
  SQWorkerPool *pQueryPool = &pMgmt->queryPool;
  pQueryPool->name = "vnode-query";
  pQueryPool->min = queryThreads;
  pQueryPool->max = queryThreads;
  if (tQWorkerInit(pQueryPool) != 0) {
    return -1;
  }

  // Fetch pool: at most 4 threads, minimum capped by the core count.
  int32_t       maxFetchThreads = 4;
  int32_t       minFetchThreads = TMIN(maxFetchThreads, pDnode->env.numOfCores);
  SFWorkerPool *pFetchPool = &pMgmt->fetchPool;
  pFetchPool->name = "vnode-fetch";
  pFetchPool->min = minFetchThreads;
  pFetchPool->max = maxFetchThreads;
  if (tFWorkerInit(pFetchPool) != 0) {
    return -1;
  }

  // Write pool: one thread per core, at least one.
  int32_t       maxWriteThreads = TMAX(pDnode->env.numOfCores, 1);
  SWWorkerPool *pWritePool = &pMgmt->writePool;
  pWritePool->name = "vnode-write";
  pWritePool->max = maxWriteThreads;
  if (tWWorkerInit(pWritePool) != 0) {
    return -1;
  }

  // Sync pool: half the cores, at least one.
  int32_t       maxSyncThreads = TMAX(pDnode->env.numOfCores / 2, 1);
  SWWorkerPool *pSyncPool = &pMgmt->syncPool;
  pSyncPool->name = "vnode-sync";
  pSyncPool->max = maxSyncThreads;
  if (tWWorkerInit(pSyncPool) != 0) {
    return -1;
  }

  dDebug("vnode workers is initialized");
  return 0;
}

S
Shengliang Guan 已提交
939
// Shuts down the vnode worker pools in the order fetch, query, write, sync.
static void dndCleanupVnodeWorkers(SDnode *pDnode) {
  SVnodesMgmt *pVnodes = &pDnode->vmgmt;

  tFWorkerCleanup(&pVnodes->fetchPool);
  tQWorkerCleanup(&pVnodes->queryPool);
  tWWorkerCleanup(&pVnodes->writePool);
  tWWorkerCleanup(&pVnodes->syncPool);

  dDebug("vnode workers is closed");
}

S
Shengliang Guan 已提交
948
// Allocates the five message queues of a vnode and binds each to its handler:
// write and apply on the write pool, sync on the sync pool, fetch on the
// fetch pool and query on the query pool.
//
// Returns 0 when every queue was allocated. If any allocation failed, sets
// terrno to TSDB_CODE_OUT_OF_MEMORY and returns -1; queues that were
// allocated are not released here.
static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
  SVnodesMgmt *pVnodes = &pDnode->vmgmt;

  pVnode->pWriteQ = tWWorkerAllocQueue(&pVnodes->writePool, pVnode, (FItems)dndProcessVnodeWriteQueue);
  pVnode->pApplyQ = tWWorkerAllocQueue(&pVnodes->writePool, pVnode, (FItems)dndProcessVnodeApplyQueue);
  pVnode->pSyncQ = tWWorkerAllocQueue(&pVnodes->syncPool, pVnode, (FItems)dndProcessVnodeSyncQueue);
  pVnode->pFetchQ = tFWorkerAllocQueue(&pVnodes->fetchPool, pVnode, (FItem)dndProcessVnodeFetchQueue);
  pVnode->pQueryQ = tQWorkerAllocQueue(&pVnodes->queryPool, pVnode, (FItem)dndProcessVnodeQueryQueue);

  bool allAllocated = pVnode->pApplyQ != NULL && pVnode->pWriteQ != NULL && pVnode->pSyncQ != NULL &&
                      pVnode->pFetchQ != NULL && pVnode->pQueryQ != NULL;
  if (allAllocated) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

S
Shengliang Guan 已提交
966
// Returns all five queues of a vnode to their worker pools (query, fetch,
// write, apply, sync - in that order) and then clears the vnode's queue
// pointers.
static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
  SVnodesMgmt *pVnodes = &pDnode->vmgmt;

  tQWorkerFreeQueue(&pVnodes->queryPool, pVnode->pQueryQ);
  tFWorkerFreeQueue(&pVnodes->fetchPool, pVnode->pFetchQ);
  tWWorkerFreeQueue(&pVnodes->writePool, pVnode->pWriteQ);
  tWWorkerFreeQueue(&pVnodes->writePool, pVnode->pApplyQ);
  tWWorkerFreeQueue(&pVnodes->syncPool, pVnode->pSyncQ);

  // Drop the now-stale pointers.
  pVnode->pWriteQ = NULL;
  pVnode->pApplyQ = NULL;
  pVnode->pSyncQ = NULL;
  pVnode->pFetchQ = NULL;
  pVnode->pQueryQ = NULL;
}

// Brings up the vnode subsystem of a dnode: starts the worker pools, then
// opens the vnodes. Returns 0 on success and -1 on the first failure; a
// worker-pool failure is reported as TSDB_CODE_OUT_OF_MEMORY via terrno.
int32_t dndInitVnodes(SDnode *pDnode) {
  dInfo("dnode-vnodes start to init");

  int32_t workersRc = dndInitVnodeWorkers(pDnode);
  if (workersRc != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to init vnode workers since %s", terrstr());
    return -1;
  }

  bool vnodesOpened = (dndOpenVnodes(pDnode) == 0);
  if (!vnodesOpened) {
    dError("failed to open vnodes since %s", terrstr());
    return -1;
  }

  dInfo("dnode-vnodes is initialized");
  return 0;
}

// Tears down the vnode subsystem of a dnode: the vnodes are closed first,
// then the worker pools are shut down.
void dndCleanupVnodes(SDnode *pDnode) {
  dInfo("dnode-vnodes start to clean up");

  dndCloseVnodes(pDnode);
  dndCleanupVnodeWorkers(pDnode);

  dInfo("dnode-vnodes is cleaned up");
}

// Collects the load of every vnode registered in the dnode into pLoads.
//
// Runs under the vnode manager's read latch. For each non-NULL entry of the
// vnode hash, vnodeGetLoad() fills the next slot of pLoads->data and the
// numeric fields are converted to big-endian / network byte order.
// pLoads->num is left in host order and holds the number of slots actually
// filled.
//
// NOTE(review): no bound is applied to the index into pLoads->data; this
// assumes the array is large enough for every vnode in the hash - confirm
// against the SVnodeLoads definition.
void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pLoads) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;

  taosRLockLatch(&pMgmt->latch);

  int32_t v = 0;
  void   *pIter = taosHashIterate(pMgmt->hash, NULL);
  while (pIter) {
    SVnodeObj **ppVnode = pIter;

    if (*ppVnode != NULL) {
      SVnodeObj  *pVnode = *ppVnode;
      SVnodeLoad *pLoad = &pLoads->data[v++];

      vnodeGetLoad(pVnode->pImpl, pLoad);
      pLoad->vgId = htonl(pLoad->vgId);
      pLoad->totalStorage = htobe64(pLoad->totalStorage);
      pLoad->compStorage = htobe64(pLoad->compStorage);
      pLoad->pointsWritten = htobe64(pLoad->pointsWritten);
      pLoad->tablesNum = htobe64(pLoad->tablesNum);
    }

    // Always advance, including past empty entries; skipping this step would
    // spin forever on the same entry while holding the latch.
    pIter = taosHashIterate(pMgmt->hash, pIter);
  }

  // Report only the entries that were filled in above.
  pLoads->num = v;

  taosRUnLockLatch(&pMgmt->latch);
}