dndVnodes.c 32.0 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "dndVnodes.h"
#include "dndTransport.h"
19
#include "dndMgmt.h"
S
Shengliang Guan 已提交
20

21 22 23 24 25
typedef struct {
  int32_t  vgId;
  int32_t  vgVersion;
  int8_t   dropped;
  uint64_t dbUid;
26
  char     db[TSDB_DB_FNAME_LEN];
27 28 29
  char     path[PATH_MAX + 20];
} SWrapperCfg;

S
Shengliang Guan 已提交
30
typedef struct {
31 32 33 34 35 36
  int32_t     vgId;
  int32_t     refCount;
  int32_t     vgVersion;
  int8_t      dropped;
  int8_t      accessState;
  uint64_t    dbUid;
H
more  
Hongze Cheng 已提交
37 38 39
  char *      db;
  char *      path;
  SVnode *    pImpl;
40 41 42 43
  STaosQueue *pWriteQ;
  STaosQueue *pSyncQ;
  STaosQueue *pApplyQ;
  STaosQueue *pQueryQ;
H
more  
Hongze Cheng 已提交
44
  STaosQueue *pFetchQ;
S
Shengliang Guan 已提交
45 46 47
} SVnodeObj;

typedef struct {
48 49 50 51
  int32_t      vnodeNum;
  int32_t      opened;
  int32_t      failed;
  int32_t      threadIndex;
52
  pthread_t    thread;
H
more  
Hongze Cheng 已提交
53
  SDnode *     pDnode;
54
  SWrapperCfg *pCfgs;
S
Shengliang Guan 已提交
55 56
} SVnodeThread;

S
Shengliang Guan 已提交
57 58
static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode);
static void    dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode);
S
Shengliang Guan 已提交
59

60 61
static void    dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
static void    dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
62 63 64
static void    dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
static void    dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
static void    dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
S
Shengliang Guan 已提交
65 66 67 68
void           dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void           dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void           dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void           dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
69
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg);
S
Shengliang Guan 已提交
70

H
more  
Hongze Cheng 已提交
71
static SVnodeObj * dndAcquireVnode(SDnode *pDnode, int32_t vgId);
S
Shengliang Guan 已提交
72
static void        dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode);
73 74
static int32_t     dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl);
static void        dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode);
S
Shengliang Guan 已提交
75
static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes);
76
static int32_t     dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
S
Shengliang Guan 已提交
77 78 79 80 81 82 83
static int32_t     dndWriteVnodesToFile(SDnode *pDnode);

static int32_t dndOpenVnodes(SDnode *pDnode);
static void    dndCloseVnodes(SDnode *pDnode);

/*
 * Look up the vnode registered under vgId and take a reference on it.
 *
 * Returns the vnode object with its refCount incremented, or NULL with
 * terrno set to TSDB_CODE_VND_INVALID_VGROUP_ID when no vnode is registered
 * under that id. A successful acquire is paired with dndReleaseVnode().
 */
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) {
  SVnodesMgmt *pVnodesMgmt = &pDnode->vmgmt;
  SVnodeObj   *pFound = NULL;
  int32_t      refs = 0;

  // Lookup and increment both happen under the read latch.
  taosRLockLatch(&pVnodesMgmt->latch);
  taosHashGetClone(pVnodesMgmt->hash, &vgId, sizeof(int32_t), (void *)&pFound);
  if (pFound != NULL) {
    refs = atomic_add_fetch_32(&pFound->refCount, 1);
  } else {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
  }
  taosRUnLockLatch(&pVnodesMgmt->latch);

  if (pFound == NULL) {
    return NULL;
  }

  dTrace("vgId:%d, acquire vnode, refCount:%d", pFound->vgId, refs);
  return pFound;
}

/*
 * Drop one reference previously taken on pVnode. A NULL pVnode is ignored.
 *
 * The vgId is captured before the reference is dropped: dndCloseVnode()
 * polls refCount and frees the object once it reaches zero, so pVnode must
 * not be dereferenced after the decrement.
 */
static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
  if (pVnode == NULL) return;

  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  int32_t      vgId = pVnode->vgId;

  taosRLockLatch(&pMgmt->latch);
  int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
  taosRUnLockLatch(&pMgmt->latch);

  dTrace("vgId:%d, release vnode, refCount:%d", vgId, refCount);
}

113
/*
 * Wrap an already opened vnode (pImpl) in a SVnodeObj, allocate its queues
 * and register it in the vnode hash under pCfg->vgId.
 *
 * Returns 0 on success. On failure a non-zero value is returned with terrno
 * set to TSDB_CODE_OUT_OF_MEMORY; pImpl is never closed here and stays owned
 * by the caller.
 */
static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  SVnodeObj   *pVnode = calloc(1, sizeof(SVnodeObj));
  if (pVnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pVnode->vgId = pCfg->vgId;
  pVnode->refCount = 0;
  pVnode->dropped = 0;
  pVnode->accessState = TSDB_VN_ALL_ACCCESS;
  pVnode->pImpl = pImpl;
  pVnode->vgVersion = pCfg->vgVersion;
  pVnode->dbUid = pCfg->dbUid;
  pVnode->db = tstrdup(pCfg->db);
  pVnode->path = tstrdup(pCfg->path);

  if (pVnode->path == NULL || pVnode->db == NULL) {
    // Nothing else references the object yet, so release it completely.
    free(pVnode->path);
    free(pVnode->db);
    free(pVnode);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (dndAllocVnodeQueue(pDnode, pVnode) != 0) {
    // NOTE(review): dndAllocVnodeQueue() is not visible here, so it is unclear
    // whether a partially created queue may still reference pVnode. The object
    // is deliberately left allocated on this path - confirm and free it (plus
    // any created queues) once that contract is known.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  taosWLockLatch(&pMgmt->latch);
  int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
  taosWUnLockLatch(&pMgmt->latch);

  if (code != 0) {
    // The object never became reachable through the hash: undo everything
    // that was set up above, in the same order dndCloseVnode() uses.
    dndFreeVnodeQueue(pDnode, pVnode);
    free(pVnode->path);
    free(pVnode->db);
    free(pVnode);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  }
  return code;
}

151
/*
 * Unregister pVnode, wait until it is idle, then close and free it.
 *
 * The function drops one reference itself (via dndReleaseVnode), so the
 * caller is expected to hold a reference when calling. If the vnode is
 * marked as dropped its on-disk data is destroyed as well. pVnode is
 * invalid after this call.
 */
static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
  SVnodesMgmt *pVnodesMgmt = &pDnode->vmgmt;

  // Remove from the hash first so that no new reference can be acquired.
  taosWLockLatch(&pVnodesMgmt->latch);
  taosHashRemove(pVnodesMgmt->hash, &pVnode->vgId, sizeof(int32_t));
  taosWUnLockLatch(&pVnodesMgmt->latch);

  dndReleaseVnode(pDnode, pVnode);

  // Poll until all remaining references are gone and every queue is drained.
  while (pVnode->refCount > 0) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pWriteQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pSyncQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pApplyQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pQueryQ)) {
    taosMsleep(10);
  }
  while (!taosQueueEmpty(pVnode->pFetchQ)) {
    taosMsleep(10);
  }

  dndFreeVnodeQueue(pDnode, pVnode);
  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;

  dDebug("vgId:%d, vnode is closed", pVnode->vgId);

  if (pVnode->dropped) {
    dDebug("vgId:%d, vnode is destroyed for dropped:%d", pVnode->vgId, pVnode->dropped);
    vnodeDestroy(pVnode->path);
  }

  free(pVnode->path);
  free(pVnode->db);
  free(pVnode);
}

/*
 * Take a snapshot of all registered vnodes.
 *
 * Every vnode placed in the returned array has had its refCount incremented;
 * the caller must release (or close) each entry and free() the array.
 * *numOfVnodes receives the number of valid entries. When the array cannot be
 * allocated, NULL is returned, *numOfVnodes is 0 and, for a non-empty hash,
 * terrno is set to TSDB_CODE_OUT_OF_MEMORY.
 */
static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  int32_t      num = 0;

  taosRLockLatch(&pMgmt->latch);

  int32_t     size = taosHashGetSize(pMgmt->hash);
  SVnodeObj **pVnodes = calloc(size, sizeof(SVnodeObj *));

  if (pVnodes == NULL) {
    // calloc(0, ...) may legitimately return NULL; only a non-empty hash is an error.
    if (size > 0) terrno = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    void *pIter = taosHashIterate(pMgmt->hash, NULL);
    while (pIter != NULL) {
      SVnodeObj *pVnode = *(SVnodeObj **)pIter;
      if (pVnode == NULL || num >= size) {
        // Stop early. The iterator must be cancelled AND the loop left;
        // otherwise pIter stays non-NULL and the loop never terminates.
        taosHashCancelIterate(pMgmt->hash, pIter);
        break;
      }

      int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
      dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount);
      pVnodes[num] = pVnode;
      num++;
      pIter = taosHashIterate(pMgmt->hash, pIter);
    }
  }

  taosRUnLockLatch(&pMgmt->latch);
  *numOfVnodes = num;

  return pVnodes;
}

210 211 212 213
/*
 * Load the vnode list from "<vnodes dir>/vnodes.json".
 *
 * On success returns 0, stores the number of entries in *numOfVnodes and,
 * when that number is positive, stores a calloc'ed array of configs in
 * *ppCfgs (owned by the caller, released with free()). A missing file is not
 * an error: 0 is returned and the outputs are left untouched.
 *
 * On any other failure TSDB_CODE_DND_VNODE_READ_FILE_ERROR is returned,
 * terrno is set to the same code, and nothing is stored in *ppCfgs.
 */
static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) {
  int32_t      code = TSDB_CODE_DND_VNODE_READ_FILE_ERROR;
  long         fileSize = 0;
  size_t       len = 0;
  char        *content = NULL;
  cJSON       *root = NULL;
  FILE        *fp = NULL;
  char         file[PATH_MAX + 20] = {0};
  SWrapperCfg *pCfgs = NULL;

  snprintf(file, PATH_MAX + 20, "%s/vnodes.json", pDnode->dir.vnodes);

  fp = fopen(file, "r");
  if (fp == NULL) {
    dDebug("file %s not exist", file);
    code = 0;
    goto PRASE_VNODE_OVER;
  }

  // Size the buffer from the file itself so that a large vnode list written
  // by dndWriteVnodesToFile() can always be read back completely.
  if (fseek(fp, 0, SEEK_END) == 0) {
    fileSize = ftell(fp);
  }
  if (fileSize <= 0 || fseek(fp, 0, SEEK_SET) != 0) {
    dError("failed to read %s since content is null", file);
    goto PRASE_VNODE_OVER;
  }

  content = calloc(1, (size_t)fileSize + 1);
  if (content == NULL) {
    dError("failed to read %s since out of memory", file);
    goto PRASE_VNODE_OVER;
  }

  len = fread(content, 1, (size_t)fileSize, fp);
  if (len == 0) {
    dError("failed to read %s since content is null", file);
    goto PRASE_VNODE_OVER;
  }

  content[len] = 0;
  root = cJSON_Parse(content);
  if (root == NULL) {
    dError("failed to read %s since invalid json format", file);
    goto PRASE_VNODE_OVER;
  }

  cJSON *vnodes = cJSON_GetObjectItem(root, "vnodes");
  if (!vnodes || vnodes->type != cJSON_Array) {
    dError("failed to read %s since vnodes not found", file);
    goto PRASE_VNODE_OVER;
  }

  int32_t vnodesNum = cJSON_GetArraySize(vnodes);
  if (vnodesNum > 0) {
    pCfgs = calloc(vnodesNum, sizeof(SWrapperCfg));
    if (pCfgs == NULL) {
      dError("failed to read %s since out of memory", file);
      goto PRASE_VNODE_OVER;
    }

    for (int32_t i = 0; i < vnodesNum; ++i) {
      cJSON       *vnode = cJSON_GetArrayItem(vnodes, i);
      SWrapperCfg *pCfg = &pCfgs[i];

      cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId");
      if (!vgId || vgId->type != cJSON_Number) {
        dError("failed to read %s since vgId not found", file);
        goto PRASE_VNODE_OVER;
      }
      pCfg->vgId = vgId->valueint;
      snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCfg->vgId);

      cJSON *dropped = cJSON_GetObjectItem(vnode, "dropped");
      if (!dropped || dropped->type != cJSON_Number) {
        dError("failed to read %s since dropped not found", file);
        goto PRASE_VNODE_OVER;
      }
      pCfg->dropped = dropped->valueint;

      cJSON *vgVersion = cJSON_GetObjectItem(vnode, "vgVersion");
      if (!vgVersion || vgVersion->type != cJSON_Number) {
        dError("failed to read %s since vgVersion not found", file);
        goto PRASE_VNODE_OVER;
      }
      pCfg->vgVersion = vgVersion->valueint;

      cJSON *dbUid = cJSON_GetObjectItem(vnode, "dbUid");
      if (!dbUid || dbUid->type != cJSON_String) {
        dError("failed to read %s since dbUid not found", file);
        goto PRASE_VNODE_OVER;
      }
      // dbUid is an unsigned 64-bit value stored as a decimal string.
      pCfg->dbUid = strtoull(dbUid->valuestring, NULL, 10);

      cJSON *db = cJSON_GetObjectItem(vnode, "db");
      if (!db || db->type != cJSON_String) {
        dError("failed to read %s since db not found", file);
        goto PRASE_VNODE_OVER;
      }
      tstrncpy(pCfg->db, db->valuestring, TSDB_DB_FNAME_LEN);
    }

    *ppCfgs = pCfgs;
  }

  *numOfVnodes = vnodesNum;
  code = 0;
  dInfo("succcessed to read file %s", file);

PRASE_VNODE_OVER:
  if (code != 0) {
    // The array has not been handed to the caller on any failure path.
    free(pCfgs);
    terrno = code;
  }
  free(content);
  if (root != NULL) cJSON_Delete(root);
  if (fp != NULL) fclose(fp);

  return code;
}

/*
 * Persist the list of registered vnodes.
 *
 * The list is written to "<vnodes dir>/vnodes.json.bak", flushed and synced,
 * and then renamed over "<vnodes dir>/vnodes.json". The JSON is streamed
 * straight to the file, so its size is not limited by a fixed buffer.
 *
 * Returns the result of taosRenameFile() on success. Returns -1 with terrno
 * set when the temporary file cannot be opened or written, or when the vnode
 * snapshot cannot be taken; the real file is left untouched in those cases.
 */
static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
  char file[PATH_MAX + 20] = {0};
  char realfile[PATH_MAX + 20] = {0};
  snprintf(file, PATH_MAX + 20, "%s/vnodes.json.bak", pDnode->dir.vnodes);
  snprintf(realfile, PATH_MAX + 20, "%s/vnodes.json", pDnode->dir.vnodes);

  FILE *fp = fopen(file, "w");
  if (fp == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to write %s since %s", file, terrstr());
    return -1;
  }

  int32_t numOfVnodes = 0;
  terrno = 0;
  SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes);
  if (pVnodes == NULL && terrno != 0) {
    // The snapshot failed; never replace the real file with an empty list.
    dError("failed to write %s since %s", file, terrstr());
    fclose(fp);
    return -1;
  }

  int32_t failed = 0;
  int32_t savedErrno = 0;

  if (fprintf(fp, "{\n  \"vnodes\": [\n") < 0) {
    failed = 1;
    savedErrno = errno;
  }

  for (int32_t i = 0; i < numOfVnodes && !failed; ++i) {
    SVnodeObj  *pVnode = pVnodes[i];
    const char *closing = (i < numOfVnodes - 1) ? "    },\n" : "    }\n";
    int         n = fprintf(fp,
                            "    {\n"
                            "      \"vgId\": %d,\n"
                            "      \"dropped\": %d,\n"
                            "      \"vgVersion\": %d,\n"
                            "      \"dbUid\": \"%" PRIu64 "\",\n"
                            "      \"db\": \"%s\"\n"
                            "%s",
                            pVnode->vgId, pVnode->dropped, pVnode->vgVersion, pVnode->dbUid, pVnode->db, closing);
    if (n < 0) {
      failed = 1;
      savedErrno = errno;
    }
  }

  if (!failed && fprintf(fp, "  ]\n}\n") < 0) {
    failed = 1;
    savedErrno = errno;
  }

  // Push the stdio buffer to the descriptor before syncing it to disk.
  if (!failed && fflush(fp) != 0) {
    failed = 1;
    savedErrno = errno;
  }
  if (!failed) {
    taosFsyncFile(fileno(fp));
  }
  if (fclose(fp) != 0 && !failed) {
    failed = 1;
    savedErrno = errno;
  }

  // The snapshot holds one reference per vnode; always give them back.
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    dndReleaseVnode(pDnode, pVnodes[i]);
  }
  free(pVnodes);

  if (failed) {
    if (savedErrno == 0) savedErrno = EIO;
    terrno = TAOS_SYSTEM_ERROR(savedErrno);
    dError("failed to write %s since %s", file, terrstr());
    return -1;
  }

  terrno = 0;
  dDebug("successed to write %s", realfile);
  return taosRenameFile(file, realfile);
}

static void *dnodeOpenVnodeFunc(void *param) {
  SVnodeThread *pThread = param;
H
more  
Hongze Cheng 已提交
371 372
  SDnode *      pDnode = pThread->pDnode;
  SVnodesMgmt * pMgmt = &pDnode->vmgmt;
S
Shengliang Guan 已提交
373 374 375 376 377

  dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("open-vnodes");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
378
    SWrapperCfg *pCfg = &pThread->pCfgs[v];
S
Shengliang Guan 已提交
379 380

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
381
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
S
Shengliang Guan 已提交
382 383 384
             pMgmt->openVnodes, pMgmt->totalVnodes);
    dndReportStartup(pDnode, "open-vnodes", stepDesc);

D
dapan1121 已提交
385
    SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pDnode->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid};
S
Shengliang Guan 已提交
386
    SVnode *  pImpl = vnodeOpen(pCfg->path, &cfg);
S
Shengliang Guan 已提交
387
    if (pImpl == NULL) {
388
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
S
Shengliang Guan 已提交
389 390
      pThread->failed++;
    } else {
391
      dndOpenVnode(pDnode, pCfg, pImpl);
392
      dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
S
Shengliang Guan 已提交
393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410
      pThread->opened++;
    }

    atomic_add_fetch_32(&pMgmt->openVnodes, 1);
  }

  dDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
         pThread->failed);
  return NULL;
}

/*
 * Initialize the vnode hash and open every vnode listed in vnodes.json.
 *
 * The configs are distributed round-robin over worker threads (currently
 * forced to a single thread) which run dnodeOpenVnodeFunc(). Returns 0 when
 * the number of processed vnodes matches the number listed on disk, -1
 * otherwise or when setup fails (terrno is set for allocation failures).
 */
static int32_t dndOpenVnodes(SDnode *pDnode) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  taosInitRWLatch(&pMgmt->latch);

  pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  if (pMgmt->hash == NULL) {
    dError("failed to init vnode hash");
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  if (dndGetVnodesFromFile(pDnode, &pCfgs, &numOfVnodes) != 0) {
    dInfo("failed to get vnode list from disk since %s", terrstr());
    return -1;
  }

  pMgmt->totalVnodes = numOfVnodes;

  int32_t threadNum = pDnode->env.numOfCores;
#if 1
  threadNum = 1;
#endif

  // Round-robin assignment gives each thread at most this many configs.
  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    dError("failed to open vnodes since out of memory");
    free(pCfgs);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pDnode = pDnode;
    threads[t].pCfgs = calloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      dError("failed to open vnodes since out of memory");
      // threads[] is zero-initialized, so freeing untouched slots is a no-op.
      for (int32_t i = 0; i < threadNum; ++i) {
        free(threads[i].pCfgs);
      }
      free(threads);
      free(pCfgs);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    pthread_attr_t thAttr;
    pthread_attr_init(&thAttr);
    pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (pthread_create(&pThread->thread, &thAttr, dnodeOpenVnodeFunc, pThread) != 0) {
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
    }

    pthread_attr_destroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      pthread_join(pThread->thread, NULL);
    }
    free(pThread->pCfgs);
  }
  free(threads);
  free(pCfgs);

  if (pMgmt->openVnodes != pMgmt->totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->totalVnodes, pMgmt->openVnodes);
    return -1;
  } else {
    dInfo("total vnodes:%d open successfully", pMgmt->totalVnodes);
    return 0;
  }
}

/*
 * Close every registered vnode and tear down the vnode hash.
 * The snapshot taken from the hash holds one reference per vnode, which
 * dndCloseVnode() consumes.
 */
static void dndCloseVnodes(SDnode *pDnode) {
  dInfo("start to close all vnodes");
  SVnodesMgmt *pVnodesMgmt = &pDnode->vmgmt;

  int32_t     total = 0;
  SVnodeObj **ppList = dndGetVnodesFromHash(pDnode, &total);

  int32_t idx = 0;
  while (idx < total) {
    dndCloseVnode(pDnode, ppList[idx]);
    idx++;
  }

  // free(NULL) is a no-op, so no explicit NULL check is required.
  free(ppList);

  if (pVnodesMgmt->hash != NULL) {
    taosHashCleanup(pVnodesMgmt->hash);
    pVnodesMgmt->hash = NULL;
  }

  dInfo("total vnodes:%d are all closed", total);
}

S
Shengliang Guan 已提交
502 503
/*
 * Byte-swap the multi-byte fields of a create-vnode request in place and
 * return the request body. The conversion modifies pReq->pCont, so it must
 * be applied only once per message.
 */
static SCreateVnodeReq *dndParseCreateVnodeReq(SRpcMsg *pReq) {
  SCreateVnodeReq *pBody = pReq->pCont;

  // Identity fields.
  pBody->vgId = htonl(pBody->vgId);
  pBody->dnodeId = htonl(pBody->dnodeId);
  pBody->dbUid = htobe64(pBody->dbUid);
  pBody->vgVersion = htonl(pBody->vgVersion);

  // Storage options.
  pBody->cacheBlockSize = htonl(pBody->cacheBlockSize);
  pBody->totalBlocks = htonl(pBody->totalBlocks);
  pBody->daysPerFile = htonl(pBody->daysPerFile);
  pBody->daysToKeep0 = htonl(pBody->daysToKeep0);
  pBody->daysToKeep1 = htonl(pBody->daysToKeep1);
  pBody->daysToKeep2 = htonl(pBody->daysToKeep2);
  pBody->minRows = htonl(pBody->minRows);
  pBody->maxRows = htonl(pBody->maxRows);
  pBody->commitTime = htonl(pBody->commitTime);
  pBody->fsyncPeriod = htonl(pBody->fsyncPeriod);

  // Replica endpoints.
  int idx = 0;
  while (idx < pBody->replica) {
    SReplica *pEntry = &pBody->replicas[idx];
    pEntry->id = htonl(pEntry->id);
    pEntry->port = htons(pEntry->port);
    ++idx;
  }

  return pBody;
}
S
Shengliang Guan 已提交
526

S
Shengliang Guan 已提交
527
/*
 * Translate an (already byte-swapped) create/alter request into the SVnodeCfg
 * passed to the vnode layer. Several values are fixed constants here rather
 * than being taken from the request.
 */
static void dndGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
  // General vnode settings.
  pCfg->vgId = pCreate->vgId;
  pCfg->wsize = pCreate->cacheBlockSize;
  pCfg->ssize = pCreate->cacheBlockSize;
  pCfg->lsize = pCreate->cacheBlockSize;
  pCfg->isHeapAllocator = true;
  pCfg->ttl = 4;
  pCfg->keep = pCreate->daysToKeep0;
  pCfg->isWeak = true;

  // Time-series store settings.
  // NOTE(review): keep1 is taken from daysToKeep2 and keep2 from daysToKeep0,
  // while daysToKeep1 is unused - confirm this mapping is intentional.
  pCfg->tsdbCfg.keep = pCreate->daysToKeep0;
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep2;
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep0;
  pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize;

  // Meta store settings.
  pCfg->metaCfg.lruSize = pCreate->cacheBlockSize;

  // Write-ahead-log settings.
  pCfg->walCfg.fsyncPeriod = pCreate->fsyncPeriod;
  pCfg->walCfg.level = pCreate->walLevel;
  pCfg->walCfg.retentionPeriod = 10;
  pCfg->walCfg.retentionSize = 128;
  pCfg->walCfg.rollPeriod = 128;
  pCfg->walCfg.segSize = 128;
  pCfg->walCfg.vgId = pCreate->vgId;
}

S
Shengliang Guan 已提交
550
/*
 * Build the wrapper config for a vnode that is about to be created from a
 * create-vnode request. The vnode directory is "<vnodes dir>/vnode<vgId>".
 */
static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
  // The name arrives over the network; copy it as a bounded string, as the
  // file loader does, rather than as raw bytes, since pCfg->db is later
  // duplicated and printed as a C string.
  tstrncpy(pCfg->db, pCreate->db, TSDB_DB_FNAME_LEN);
  pCfg->dbUid = pCreate->dbUid;
  pCfg->dropped = 0;
  snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCreate->vgId);
  pCfg->vgId = pCreate->vgId;
  pCfg->vgVersion = pCreate->vgVersion;
}

559
/*
 * Byte-swap the vgId of a drop-vnode request in place and return the request
 * body. The conversion modifies pReq->pCont.
 */
static SDropVnodeReq *dndParseDropVnodeReq(SRpcMsg *pReq) {
  SDropVnodeReq *pBody = pReq->pCont;

  pBody->vgId = htonl(pBody->vgId);

  return pBody;
}

565
/*
 * Byte-swap the vgId of an auth-vnode request in place and return the request
 * body. The conversion modifies pReq->pCont.
 */
static SAuthVnodeReq *dndParseAuthVnodeReq(SRpcMsg *pReq) {
  SAuthVnodeReq *pBody = pReq->pCont;

  pBody->vgId = htonl(pBody->vgId);

  return pBody;
}

S
Shengliang Guan 已提交
571 572
/*
 * Handle a create-vnode request: open the vnode, register it and persist the
 * updated vnode list.
 *
 * Returns 0 on success. Returns non-zero with terrno set when the request
 * targets another dnode, the vnode already exists, or opening, registering
 * or persisting fails. On failure after the vnode was opened, the vnode is
 * closed and its data destroyed again.
 */
int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SCreateVnodeReq *pCreate = dndParseCreateVnodeReq(pReq);
  dDebug("vgId:%d, create vnode req is received", pCreate->vgId);

  SVnodeCfg vnodeCfg = {0};
  dndGenerateVnodeCfg(pCreate, &vnodeCfg);

  SWrapperCfg wrapperCfg = {0};
  dndGenerateWrapperCfg(pDnode, pCreate, &wrapperCfg);

  if (pCreate->dnodeId != dndGetDnodeId(pDnode)) {
    terrno = TSDB_CODE_DND_VNODE_INVALID_OPTION;
    dDebug("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr());
    return -1;
  }

  SVnodeObj *pVnode = dndAcquireVnode(pDnode, pCreate->vgId);
  if (pVnode != NULL) {
    dDebug("vgId:%d, already exist", pCreate->vgId);
    dndReleaseVnode(pDnode, pVnode);
    terrno = TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED;
    return -1;
  }

  vnodeCfg.pDnode = pDnode;
  vnodeCfg.pTfs = pDnode->pTfs;
  vnodeCfg.dbId = wrapperCfg.dbUid;
  SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr());
    return -1;
  }

  int32_t code = dndOpenVnode(pDnode, &wrapperCfg, pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to open vnode since %s", pCreate->vgId, terrstr());
    // Keep the error reported by dndOpenVnode(); cleanup must not mask it.
    int32_t openErr = terrno;
    vnodeClose(pImpl);
    vnodeDestroy(wrapperCfg.path);
    terrno = openErr;
    return code;
  }

  code = dndWriteVnodesToFile(pDnode);
  if (code != 0) {
    int32_t writeErr = (terrno != 0) ? terrno : code;

    // The vnode is already registered in the hash. Closing pImpl directly
    // would leave a registered object with a dangling pImpl, so unregister
    // through dndCloseVnode(), which also closes pImpl and, because the vnode
    // is marked dropped, destroys its data.
    pVnode = dndAcquireVnode(pDnode, pCreate->vgId);
    if (pVnode != NULL) {
      pVnode->dropped = 1;
      dndCloseVnode(pDnode, pVnode);
    }

    terrno = writeErr;
    return code;
  }

  return 0;
}

S
Shengliang Guan 已提交
624 625
/*
 * Handle an alter-vnode request. The request shares the create-request
 * layout and is parsed with the same helper.
 *
 * Returns 0 when the vnode was altered and persisted or when the version is
 * unchanged, -1 when the vnode is unknown or vnodeAlter() fails, and the
 * result of dndWriteVnodesToFile() otherwise. The in-memory version is rolled
 * back if persisting fails.
 */
int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SAlterVnodeReq *pAlter = (SAlterVnodeReq *)dndParseCreateVnodeReq(pReq);
  dDebug("vgId:%d, alter vnode req is received", pAlter->vgId);

  SVnodeCfg newCfg = {0};
  dndGenerateVnodeCfg(pAlter, &newCfg);

  SVnodeObj *pTarget = dndAcquireVnode(pDnode, pAlter->vgId);
  if (pTarget == NULL) {
    dDebug("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr());
    return -1;
  }

  if (pAlter->vgVersion == pTarget->vgVersion) {
    dndReleaseVnode(pDnode, pTarget);
    dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", pAlter->vgId);
    return 0;
  }

  if (vnodeAlter(pTarget->pImpl, &newCfg) != 0) {
    dError("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr());
    dndReleaseVnode(pDnode, pTarget);
    return -1;
  }

  // Publish the new version, then persist; undo if the write fails.
  int32_t prevVersion = pTarget->vgVersion;
  pTarget->vgVersion = pAlter->vgVersion;

  int32_t result = dndWriteVnodesToFile(pDnode);
  if (result != 0) {
    pTarget->vgVersion = prevVersion;
  }

  dndReleaseVnode(pDnode, pTarget);
  return result;
}

S
Shengliang Guan 已提交
660
/*
 * Handle a drop-vnode request: mark the vnode as dropped, persist that state,
 * close the vnode (which destroys its data) and persist the list again.
 *
 * Returns 0 on success, -1 when the vnode is not deployed (terrno is set to
 * TSDB_CODE_DND_VNODE_NOT_DEPLOYED) or when the first persist fails, in which
 * case the dropped flag is cleared again.
 */
int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SDropVnodeReq *pDrop = dndParseDropVnodeReq(pReq);

  int32_t vgId = pDrop->vgId;
  dDebug("vgId:%d, drop vnode req is received", vgId);

  SVnodeObj *pTarget = dndAcquireVnode(pDnode, vgId);
  if (pTarget == NULL) {
    dDebug("vgId:%d, failed to drop since %s", vgId, terrstr());
    terrno = TSDB_CODE_DND_VNODE_NOT_DEPLOYED;
    return -1;
  }

  pTarget->dropped = 1;
  int32_t persisted = dndWriteVnodesToFile(pDnode);
  if (persisted != 0) {
    pTarget->dropped = 0;
    dndReleaseVnode(pDnode, pTarget);
    return -1;
  }

  // dndCloseVnode() consumes the reference acquired above.
  dndCloseVnode(pDnode, pTarget);
  dndWriteVnodesToFile(pDnode);

  return 0;
}

S
Shengliang Guan 已提交
686
/*
 * Handle an auth-vnode request by storing the requested access state on the
 * vnode. Returns 0 on success, -1 when the vnode cannot be acquired.
 */
int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SAuthVnodeReq *pAuth = (SAuthVnodeReq *)dndParseAuthVnodeReq(pReq);

  int32_t vgId = pAuth->vgId;
  dDebug("vgId:%d, auth vnode req is received", vgId);

  SVnodeObj *pTarget = dndAcquireVnode(pDnode, vgId);
  if (pTarget != NULL) {
    pTarget->accessState = pAuth->accessState;
    dndReleaseVnode(pDnode, pTarget);
    return 0;
  }

  dDebug("vgId:%d, failed to auth since %s", vgId, terrstr());
  return -1;
}

S
Shengliang Guan 已提交
703
/*
 * Handle a sync-vnode request (parsed with the drop-request helper, which
 * only converts the vgId). Returns 0 on success, -1 when the vnode cannot be
 * acquired or vnodeSync() fails.
 */
int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SSyncVnodeReq *pSync = (SSyncVnodeReq *)dndParseDropVnodeReq(pReq);

  int32_t vgId = pSync->vgId;
  dDebug("vgId:%d, sync vnode req is received", vgId);

  SVnodeObj *pTarget = dndAcquireVnode(pDnode, vgId);
  if (pTarget == NULL) {
    dDebug("vgId:%d, failed to sync since %s", vgId, terrstr());
    return -1;
  }

  int32_t result = 0;
  if (vnodeSync(pTarget->pImpl) != 0) {
    dError("vgId:%d, failed to sync vnode since %s", vgId, terrstr());
    result = -1;
  }

  dndReleaseVnode(pDnode, pTarget);
  return result;
}

S
Shengliang Guan 已提交
725
/*
 * Handle a compact-vnode request (parsed with the drop-request helper, which
 * only converts the vgId). Returns 0 on success, -1 when the vnode cannot be
 * acquired or vnodeCompact() fails.
 */
int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SCompactVnodeReq *pCompact = (SCompactVnodeReq *)dndParseDropVnodeReq(pReq);

  int32_t vgId = pCompact->vgId;
  dDebug("vgId:%d, compact vnode req is received", vgId);

  SVnodeObj *pTarget = dndAcquireVnode(pDnode, vgId);
  if (pTarget == NULL) {
    dDebug("vgId:%d, failed to compact since %s", vgId, terrstr());
    return -1;
  }

  int32_t result = 0;
  if (vnodeCompact(pTarget->pImpl) != 0) {
    dError("vgId:%d, failed to compact vnode since %s", vgId, terrstr());
    result = -1;
  }

  dndReleaseVnode(pDnode, pTarget);
  return result;
}

S
Shengliang 已提交
747
/* Query-queue consumer: forward the message to the vnode implementation. */
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) {
  vnodeProcessQueryMsg(pVnode->pImpl, pMsg);
}
S
Shengliang Guan 已提交
748

S
Shengliang 已提交
749
/* Fetch-queue consumer: forward the message to the vnode implementation. */
static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) {
  vnodeProcessFetchMsg(pVnode->pImpl, pMsg);
}
S
Shengliang Guan 已提交
750

751
/*
 * Write-queue consumer: process a batch of numOfMsgs messages taken from qall.
 *
 * The batch is handed to vnodeProcessWMsgs() as a whole. Each message is then
 * applied with vnodeApplyWMsg() and answered, and finally every message body
 * and queue item is released. If the batch array cannot be allocated, every
 * message is answered with TSDB_CODE_OUT_OF_MEMORY and released instead.
 */
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
  SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
  if (pArray == NULL) {
    // Without the batch array nothing can be processed; drain the batch so
    // clients get an answer and the items are not leaked.
    for (int32_t i = 0; i < numOfMsgs; ++i) {
      SRpcMsg *pMsg = NULL;
      taosGetQitem(qall, (void **)&pMsg);
      if (pMsg == NULL) continue;

      SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = TSDB_CODE_OUT_OF_MEMORY};
      rpcSendResponse(&rpcRsp);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
    }
    return;
  }

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    SRpcMsg *pMsg = NULL;
    taosGetQitem(qall, (void **)&pMsg);
    void *ptr = taosArrayPush(pArray, &pMsg);
    assert(ptr != NULL);
  }

  vnodeProcessWMsgs(pVnode->pImpl, pArray);

  // Indices use the same signed type as numOfMsgs to avoid a signed/unsigned
  // comparison.
  for (int32_t i = 0; i < numOfMsgs; i++) {
    SRpcMsg *pRsp = NULL;
    SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
    int32_t  code = vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp);
    if (pRsp != NULL) {
      pRsp->ahandle = pMsg->ahandle;
      rpcSendResponse(pRsp);
      free(pRsp);
    } else {
      if (code != 0) code = terrno;
      SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
      rpcSendResponse(&rpcRsp);
    }
  }

  for (int32_t i = 0; i < numOfMsgs; i++) {
    SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }

  taosArrayDestroy(pArray);
}

787
/*
 * Apply-queue consumer: apply each of the numOfMsgs messages taken from qall
 * through vnodeApplyWMsg(). The result and any response are ignored.
 * NOTE(review): neither the queue items nor a returned response are released
 * here (the original is marked "todo") - confirm who owns them.
 */
static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
  SRpcMsg *pItem = NULL;
  int32_t  handled = 0;

  while (handled < numOfMsgs) {
    taosGetQitem(qall, (void **)&pItem);

    // todo
    SRpcMsg *pReply = NULL;
    (void)vnodeApplyWMsg(pVnode->pImpl, pItem, &pReply);

    ++handled;
  }
}

799
// Batch callback for a vnode's sync queue: takes `numOfMsgs` items off `qall`
// one at a time and passes each to vnodeProcessSyncReq(). The returned status
// and any response object are discarded.
// NOTE(review): neither the queue item nor pRsp is released here - confirm who owns them.
static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
  SRpcMsg *pItem = NULL;
  int32_t  handled = 0;

  while (handled < numOfMsgs) {
    taosGetQitem(qall, (void **)&pItem);

    // todo
    SRpcMsg *pRsp = NULL;
    (void)vnodeProcessSyncReq(pVnode->pImpl, pItem, &pRsp);

    handled++;
  }
}

S
Shengliang Guan 已提交
811
// Copies `*pRpcMsg` into a newly allocated queue item and writes it to `pQueue`.
//
// The copy is shallow: the queued item shares pRpcMsg->pCont with the caller's
// message.
//
// Returns 0 on success, TSDB_CODE_MSG_NOT_PROCESSED when `pQueue` is NULL, or
// TSDB_CODE_OUT_OF_MEMORY when the item cannot be allocated or written.
//
// On failure with `sendRsp` set, an error response is sent for messages whose
// msgType has the low bit set, and pRpcMsg->pCont is freed. With `sendRsp`
// clear, nothing is sent or freed and the caller keeps the content buffer.
static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg, bool sendRsp) {
  int32_t code = 0;

  if (pQueue == NULL) {
    code = TSDB_CODE_MSG_NOT_PROCESSED;
  } else {
    SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg));
    if (pMsg == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      *pMsg = *pRpcMsg;
      if (taosWriteQitem(pQueue, pMsg) != 0) {
        // The queue did not accept the item, so release it here to avoid a leak.
        taosFreeQitem(pMsg);
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }

  if (code != TSDB_CODE_SUCCESS && sendRsp) {
    if (pRpcMsg->msgType & 1u) {
      SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code};
      rpcSendResponse(&rsp);
    }
    rpcFreeCont(pRpcMsg->pCont);
  }

  return code;
}

// Resolves the vnode addressed by an incoming message.
//
// The message content starts with an SMsgHead whose contLen and vgId fields
// are byte-swapped in place before the lookup, so this must run at most once
// per message.
//
// Returns the acquired vnode (release it with dndReleaseVnode()), or NULL when
// the message has no content or the vnode cannot be acquired. On the NULL path
// an error response is sent for messages whose msgType has the low bit set,
// and the content buffer, if any, is freed.
static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
  SMsgHead *pHead = pMsg->pCont;
  if (pHead == NULL) {
    // Without a header there is no vgId to read; reject rather than dereference NULL.
    dError("failed to acquire vnode while process req since msg content is empty");
    if (pMsg->msgType & 1u) {
      SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
      rpcSendResponse(&rsp);
    }
    return NULL;
  }

  // htonl() is its own inverse, so it also converts network order to host order.
  pHead->contLen = htonl(pHead->contLen);
  pHead->vgId = htonl(pHead->vgId);

  SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to acquire vnode while process req", pHead->vgId);
    if (pMsg->msgType & 1u) {
      SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
      rpcSendResponse(&rsp);
    }
    rpcFreeCont(pMsg->pCont);
  }

  return pVnode;
}

// Routes a message to the write queue of the vnode named in its header.
// If the vnode cannot be acquired, dndAcquireVnodeFromMsg() has already dealt
// with the message and nothing more is done. `pEpSet` is not used.
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) return;

  (void)dndWriteRpcMsgToVnodeQueue(pTarget->pWriteQ, pMsg, true);
  dndReleaseVnode(pDnode, pTarget);
}

// Routes a message to the sync queue of the vnode named in its header.
// If the vnode cannot be acquired, dndAcquireVnodeFromMsg() has already dealt
// with the message and nothing more is done. `pEpSet` is not used.
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) return;

  (void)dndWriteRpcMsgToVnodeQueue(pTarget->pSyncQ, pMsg, true);
  dndReleaseVnode(pDnode, pTarget);
}

// Routes a message to the query queue of the vnode named in its header.
// If the vnode cannot be acquired, dndAcquireVnodeFromMsg() has already dealt
// with the message and nothing more is done. `pEpSet` is not used.
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) return;

  (void)dndWriteRpcMsgToVnodeQueue(pTarget->pQueryQ, pMsg, true);
  dndReleaseVnode(pDnode, pTarget);
}

// Routes a message to the fetch queue of the vnode named in its header.
// If the vnode cannot be acquired, dndAcquireVnodeFromMsg() has already dealt
// with the message and nothing more is done. `pEpSet` is not used.
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) return;

  (void)dndWriteRpcMsgToVnodeQueue(pTarget->pFetchQ, pMsg, true);
  dndReleaseVnode(pDnode, pTarget);
}

S
Shengliang Guan 已提交
889 890 891 892 893 894 895
// Enqueues `pMsg` on the query queue of the vnode named in its header.
//
// The header's vgId is used as stored; no byte-order conversion is applied here.
// No response is sent and the content buffer is not freed on failure (the
// message is written with sendRsp == false).
//
// Returns -1 if the vnode cannot be acquired, otherwise the result of
// dndWriteRpcMsgToVnodeQueue().
int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pMsg) {
  SMsgHead  *pHead = pMsg->pCont;
  int32_t    code = -1;
  SVnodeObj *pTarget = dndAcquireVnode(pDnode, pHead->vgId);

  if (pTarget != NULL) {
    code = dndWriteRpcMsgToVnodeQueue(pTarget->pQueryQ, pMsg, false);
    dndReleaseVnode(pDnode, pTarget);
  }

  return code;
}

901
// Writes `pMsg` itself (not a copy) to the apply queue of vnode `vgId`.
// Returns -1 if the vnode cannot be acquired, otherwise the result of
// taosWriteQitem().
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) {
  int32_t    code = -1;
  SVnodeObj *pTarget = dndAcquireVnode(pDnode, vgId);

  if (pTarget != NULL) {
    code = taosWriteQitem(pTarget->pApplyQ, pMsg);
    dndReleaseVnode(pDnode, pTarget);
  }

  return code;
}

S
Shengliang Guan 已提交
910
// Sizes and initializes the four vnode worker pools in this order:
// query, fetch, write, sync.
//
//   query: min == max == max(numOfCores * ratioOfQueryCores, 1)
//   fetch: max 4, min == min(4, numOfCores)
//   write: max == max(numOfCores, 1)
//   sync:  max == max(numOfCores / 2, 1)
//
// Returns 0 on success, or -1 as soon as one pool fails to initialize. Pools
// initialized before the failure are not cleaned up here.
static int32_t dndInitVnodeWorkers(SDnode *pDnode) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;

  // Query pool: fixed number of threads.
  int32_t       queryThreads = TMAX((int32_t)(pDnode->env.numOfCores * pDnode->cfg.ratioOfQueryCores), 1);
  SQWorkerPool *pQueryPool = &pMgmt->queryPool;
  pQueryPool->name = "vnode-query";
  pQueryPool->min = queryThreads;
  pQueryPool->max = queryThreads;
  if (tQWorkerInit(pQueryPool) != 0) return -1;

  // Fetch pool: at most four threads.
  int32_t       fetchMax = 4;
  int32_t       fetchMin = TMIN(fetchMax, pDnode->env.numOfCores);
  SFWorkerPool *pFetchPool = &pMgmt->fetchPool;
  pFetchPool->name = "vnode-fetch";
  pFetchPool->min = fetchMin;
  pFetchPool->max = fetchMax;
  if (tFWorkerInit(pFetchPool) != 0) return -1;

  // Write pool: up to one thread per core.
  SWWorkerPool *pWritePool = &pMgmt->writePool;
  pWritePool->name = "vnode-write";
  pWritePool->max = TMAX(pDnode->env.numOfCores, 1);
  if (tWWorkerInit(pWritePool) != 0) return -1;

  // Sync pool: up to one thread per two cores.
  SWWorkerPool *pSyncPool = &pMgmt->syncPool;
  pSyncPool->name = "vnode-sync";
  pSyncPool->max = TMAX(pDnode->env.numOfCores / 2, 1);
  if (tWWorkerInit(pSyncPool) != 0) return -1;

  dDebug("vnode workers is initialized");
  return 0;
}

S
Shengliang Guan 已提交
946
// Cleans up the vnode worker pools in this order: fetch, query, write, sync.
static void dndCleanupVnodeWorkers(SDnode *pDnode) {
  SVnodesMgmt *pPools = &pDnode->vmgmt;

  tFWorkerCleanup(&pPools->fetchPool);
  tQWorkerCleanup(&pPools->queryPool);
  tWWorkerCleanup(&pPools->writePool);
  tWWorkerCleanup(&pPools->syncPool);

  dDebug("vnode workers is closed");
}

S
Shengliang Guan 已提交
955
// Allocates the five queues of a vnode from the worker pools and binds each to
// its processing callback:
//   write and apply queues come from the write pool,
//   the sync queue from the sync pool,
//   the fetch queue from the fetch pool,
//   the query queue from the query pool.
//
// Returns 0 when all five were allocated. Otherwise sets terrno to
// TSDB_CODE_OUT_OF_MEMORY and returns -1; queues that were allocated stay
// attached to `pVnode`.
static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
  SVnodesMgmt *pPools = &pDnode->vmgmt;

  pVnode->pWriteQ = tWWorkerAllocQueue(&pPools->writePool, pVnode, (FItems)dndProcessVnodeWriteQueue);
  pVnode->pApplyQ = tWWorkerAllocQueue(&pPools->writePool, pVnode, (FItems)dndProcessVnodeApplyQueue);
  pVnode->pSyncQ = tWWorkerAllocQueue(&pPools->syncPool, pVnode, (FItems)dndProcessVnodeSyncQueue);
  pVnode->pFetchQ = tFWorkerAllocQueue(&pPools->fetchPool, pVnode, (FItem)dndProcessVnodeFetchQueue);
  pVnode->pQueryQ = tQWorkerAllocQueue(&pPools->queryPool, pVnode, (FItem)dndProcessVnodeQueryQueue);

  if (pVnode->pApplyQ != NULL && pVnode->pWriteQ != NULL && pVnode->pSyncQ != NULL && pVnode->pFetchQ != NULL &&
      pVnode->pQueryQ != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

S
Shengliang Guan 已提交
973
// Returns the five queues of a vnode to their worker pools (query, fetch,
// write, apply, sync, in that order) and then clears the queue pointers.
static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
  SVnodesMgmt *pPools = &pDnode->vmgmt;

  tQWorkerFreeQueue(&pPools->queryPool, pVnode->pQueryQ);
  tFWorkerFreeQueue(&pPools->fetchPool, pVnode->pFetchQ);
  tWWorkerFreeQueue(&pPools->writePool, pVnode->pWriteQ);
  tWWorkerFreeQueue(&pPools->writePool, pVnode->pApplyQ);
  tWWorkerFreeQueue(&pPools->syncPool, pVnode->pSyncQ);

  // Clear the pointers so no stale queue handle remains on the vnode.
  pVnode->pWriteQ = NULL;
  pVnode->pApplyQ = NULL;
  pVnode->pSyncQ = NULL;
  pVnode->pFetchQ = NULL;
  pVnode->pQueryQ = NULL;
}

// Initializes the vnode subsystem of a dnode: first the worker pools, then the
// vnodes themselves.
//
// Returns 0 on success and -1 on failure. A worker-pool failure is reported as
// TSDB_CODE_OUT_OF_MEMORY through terrno; a failure to open vnodes leaves
// terrno as set by dndOpenVnodes().
int32_t dndInitVnodes(SDnode *pDnode) {
  dInfo("dnode-vnodes start to init");

  int32_t workersRc = dndInitVnodeWorkers(pDnode);
  if (workersRc != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to init vnode workers since %s", terrstr());
    return -1;
  }

  int32_t openRc = dndOpenVnodes(pDnode);
  if (openRc != 0) {
    dError("failed to open vnodes since %s", terrstr());
    return -1;
  }

  dInfo("dnode-vnodes is initialized");
  return 0;
}

// Shuts down the vnode subsystem of a dnode: closes the vnodes first, then
// cleans up the worker pools.
void dndCleanupVnodes(SDnode *pDnode) {
  dInfo("dnode-vnodes start to clean up");

  dndCloseVnodes(pDnode);
  dndCleanupVnodeWorkers(pDnode);

  dInfo("dnode-vnodes is cleaned up");
}

// Fills `pLoads` with one load record per vnode in the management hash.
//
// The hash is walked under the read latch. Each record is obtained from
// vnodeGetLoad() and its vgId and counters are then converted to big-endian
// (htonl / htobe64). pLoads->num is left in host order and ends up as the
// number of records actually written.
//
// NOTE(review): pLoads->data is assumed large enough for every vnode in the
// hash; its capacity is not visible here - confirm against the caller.
void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pLoads) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;

  taosRLockLatch(&pMgmt->latch);
  pLoads->num = taosHashGetSize(pMgmt->hash);

  int32_t v = 0;
  // The iterator advances in the for-clause, so `continue` on an empty slot
  // cannot re-visit the same entry forever.
  for (void *pIter = taosHashIterate(pMgmt->hash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->hash, pIter)) {
    SVnodeObj **ppVnode = pIter;
    if (*ppVnode == NULL) continue;

    SVnodeObj  *pVnode = *ppVnode;
    SVnodeLoad *pLoad = &pLoads->data[v++];

    vnodeGetLoad(pVnode->pImpl, pLoad);
    pLoad->vgId = htonl(pLoad->vgId);
    pLoad->totalStorage = htobe64(pLoad->totalStorage);
    pLoad->compStorage = htobe64(pLoad->compStorage);
    pLoad->pointsWritten = htobe64(pLoad->pointsWritten);
    pLoad->tablesNum = htobe64(pLoad->tablesNum);
  }

  // Skipped slots leave no record behind, so report only what was filled in.
  pLoads->num = v;

  taosRUnLockLatch(&pMgmt->latch);
}