dndVnodes.c 30.7 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "dndVnodes.h"
18
#include "dndMgmt.h"
S
Shengliang Guan 已提交
19
#include "dndTransport.h"
S
Shengliang Guan 已提交
20

21 22 23 24 25
typedef struct {
  int32_t  vgId;
  int32_t  vgVersion;
  int8_t   dropped;
  uint64_t dbUid;
26
  char     db[TSDB_DB_FNAME_LEN];
27 28 29
  char     path[PATH_MAX + 20];
} SWrapperCfg;

S
Shengliang Guan 已提交
30
typedef struct {
31 32 33 34 35 36
  int32_t     vgId;
  int32_t     refCount;
  int32_t     vgVersion;
  int8_t      dropped;
  int8_t      accessState;
  uint64_t    dbUid;
S
Shengliang Guan 已提交
37 38 39
  char       *db;
  char       *path;
  SVnode     *pImpl;
40 41 42 43
  STaosQueue *pWriteQ;
  STaosQueue *pSyncQ;
  STaosQueue *pApplyQ;
  STaosQueue *pQueryQ;
H
more  
Hongze Cheng 已提交
44
  STaosQueue *pFetchQ;
S
Shengliang Guan 已提交
45 46 47
} SVnodeObj;

typedef struct {
48 49 50 51
  int32_t      vnodeNum;
  int32_t      opened;
  int32_t      failed;
  int32_t      threadIndex;
52
  pthread_t    thread;
S
Shengliang Guan 已提交
53
  SDnode      *pDnode;
54
  SWrapperCfg *pCfgs;
S
Shengliang Guan 已提交
55 56
} SVnodeThread;

S
Shengliang Guan 已提交
57 58
static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode);
static void    dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode);
S
Shengliang Guan 已提交
59

60 61
static void    dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
static void    dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
62 63 64
static void    dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
static void    dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
static void    dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs);
S
Shengliang Guan 已提交
65 66 67 68
void           dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void           dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void           dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void           dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
69
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg);
S
Shengliang Guan 已提交
70

S
Shengliang Guan 已提交
71
static SVnodeObj  *dndAcquireVnode(SDnode *pDnode, int32_t vgId);
S
Shengliang Guan 已提交
72
static void        dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode);
73 74
static int32_t     dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl);
static void        dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode);
S
Shengliang Guan 已提交
75
static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes);
76
static int32_t     dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
S
Shengliang Guan 已提交
77 78 79 80 81 82 83
static int32_t     dndWriteVnodesToFile(SDnode *pDnode);

static int32_t dndOpenVnodes(SDnode *pDnode);
static void    dndCloseVnodes(SDnode *pDnode);

// Looks up the vnode registered under `vgId` and takes a reference on it.
// Returns NULL and sets terrno to TSDB_CODE_VND_INVALID_VGROUP_ID when no such
// vnode is registered. A non-NULL result must be handed back to dndReleaseVnode.
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) {
  SVnodesMgmt *pVnodesMgmt = &pDnode->vmgmt;
  SVnodeObj   *pFound = NULL;
  int32_t      refs = 0;

  // The lookup and the reference bump happen under the same read latch.
  taosRLockLatch(&pVnodesMgmt->latch);
  taosHashGetClone(pVnodesMgmt->hash, &vgId, sizeof(int32_t), (void *)&pFound);
  if (pFound != NULL) {
    refs = atomic_add_fetch_32(&pFound->refCount, 1);
  } else {
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
  }
  taosRUnLockLatch(&pVnodesMgmt->latch);

  if (pFound == NULL) {
    return NULL;
  }

  dTrace("vgId:%d, acquire vnode, refCount:%d", pFound->vgId, refs);
  return pFound;
}

// Drops one reference previously taken on `pVnode`. A NULL `pVnode` is ignored.
static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
  if (pVnode != NULL) {
    SVnodesMgmt *pVnodesMgmt = &pDnode->vmgmt;
    int32_t      remaining = 0;

    taosRLockLatch(&pVnodesMgmt->latch);
    remaining = atomic_sub_fetch_32(&pVnode->refCount, 1);
    taosRUnLockLatch(&pVnodesMgmt->latch);

    dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, remaining);
  }
}

113
// Wraps an already opened vnode implementation (`pImpl`) in an SVnodeObj,
// allocates its queues and registers it in the vnode hash under pCfg->vgId.
//
// Returns 0 on success. On failure a non-zero value is returned, terrno is set
// to TSDB_CODE_OUT_OF_MEMORY and everything allocated here is released again.
// `pImpl` is never closed by this function: on failure it still belongs to the
// caller.
static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  SVnodeObj   *pVnode = calloc(1, sizeof(SVnodeObj));
  if (pVnode == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pVnode->vgId = pCfg->vgId;
  pVnode->refCount = 0;
  pVnode->dropped = 0;
  pVnode->accessState = TSDB_VN_ALL_ACCCESS;
  pVnode->pImpl = pImpl;
  pVnode->vgVersion = pCfg->vgVersion;
  pVnode->dbUid = pCfg->dbUid;
  pVnode->db = tstrdup(pCfg->db);
  pVnode->path = tstrdup(pCfg->path);

  if (pVnode->path == NULL || pVnode->db == NULL) {
    goto OPEN_VNODE_FAILED;
  }

  if (dndAllocVnodeQueue(pDnode, pVnode) != 0) {
    // NOTE(review): dndAllocVnodeQueue is not visible here; it is assumed to
    // release any queues it allocated before reporting failure - confirm.
    goto OPEN_VNODE_FAILED;
  }

  taosWLockLatch(&pMgmt->latch);
  int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
  taosWUnLockLatch(&pMgmt->latch);

  if (code != 0) {
    // The object never became reachable through the hash, so it is safe to
    // tear it down here.
    dndFreeVnodeQueue(pDnode, pVnode);
    free(pVnode->path);
    free(pVnode->db);
    free(pVnode);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  }
  return code;

OPEN_VNODE_FAILED:
  free(pVnode->path);
  free(pVnode->db);
  free(pVnode);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

151
// Unregisters `pVnode`, waits until it is idle, then closes and frees it.
// One reference is released on behalf of the caller; the function then polls
// until no reference is left and all five queues are empty. When the vnode is
// flagged as dropped its directory is destroyed as well.
static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
  SVnodesMgmt *pVnodesMgmt = &pDnode->vmgmt;

  // Remove the vnode from the hash first so it cannot be acquired any more.
  taosWLockLatch(&pVnodesMgmt->latch);
  taosHashRemove(pVnodesMgmt->hash, &pVnode->vgId, sizeof(int32_t));
  taosWUnLockLatch(&pVnodesMgmt->latch);

  dndReleaseVnode(pDnode, pVnode);
  while (pVnode->refCount > 0) {
    taosMsleep(10);
  }

  // Drain the queues in the same order as before: write, sync, apply, query, fetch.
  STaosQueue *queues[] = {pVnode->pWriteQ, pVnode->pSyncQ, pVnode->pApplyQ, pVnode->pQueryQ, pVnode->pFetchQ};
  for (size_t q = 0; q < sizeof(queues) / sizeof(queues[0]); ++q) {
    while (!taosQueueEmpty(queues[q])) {
      taosMsleep(10);
    }
  }

  dndFreeVnodeQueue(pDnode, pVnode);
  vnodeClose(pVnode->pImpl);
  pVnode->pImpl = NULL;

  dDebug("vgId:%d, vnode is closed", pVnode->vgId);

  if (pVnode->dropped) {
    dDebug("vgId:%d, vnode is destroyed for dropped:%d", pVnode->vgId, pVnode->dropped);
    vnodeDestroy(pVnode->path);
  }

  free(pVnode->path);
  free(pVnode->db);
  free(pVnode);
}

// Takes a snapshot of every vnode currently registered in the hash.
//
// Each returned vnode has had its refCount incremented, so the caller must call
// dndReleaseVnode (or dndCloseVnode) on every entry and then free() the array.
// `*numOfVnodes` receives the number of valid entries. The result may be NULL
// when there are no vnodes or when the array could not be allocated (terrno is
// then TSDB_CODE_OUT_OF_MEMORY); `*numOfVnodes` is 0 in both cases.
static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  taosRLockLatch(&pMgmt->latch);

  int32_t     num = 0;
  int32_t     size = taosHashGetSize(pMgmt->hash);
  SVnodeObj **pVnodes = calloc(size, sizeof(SVnodeObj *));
  if (pVnodes == NULL && size > 0) {
    taosRUnLockLatch(&pMgmt->latch);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    *numOfVnodes = 0;
    return NULL;
  }

  void *pIter = taosHashIterate(pMgmt->hash, NULL);
  while (pIter != NULL) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL || num >= size) {
      // Stop early. Without the break the loop would keep cancelling the same
      // iterator forever, because pIter is never advanced on this path.
      taosHashCancelIterate(pMgmt->hash, pIter);
      break;
    }

    int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
    dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount);
    pVnodes[num++] = pVnode;
    pIter = taosHashIterate(pMgmt->hash, pIter);
  }

  taosRUnLockLatch(&pMgmt->latch);
  *numOfVnodes = num;

  return pVnodes;
}

210 211 212 213
// Loads the vnode list from "<vnodes dir>/vnodes.json".
//
// On success returns 0, stores the number of entries in `*numOfVnodes` and,
// when that number is positive, stores a calloc'ed array in `*ppCfgs` that the
// caller must free(). A missing file is not an error: 0 is returned and the
// outputs are left untouched. On failure a non-zero error code is returned,
// terrno is set to the same code and nothing is handed to the caller.
static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) {
  int32_t      code = TSDB_CODE_DND_VNODE_READ_FILE_ERROR;
  int32_t      len = 0;
  int32_t      maxLen = 30000;
  char        *content = NULL;
  cJSON       *root = NULL;
  TdFilePtr    pFile = NULL;
  char         file[PATH_MAX + 20] = {0};
  SWrapperCfg *pCfgs = NULL;

  snprintf(file, PATH_MAX + 20, "%s/vnodes.json", pDnode->dir.vnodes);

  pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    dDebug("file %s not exist", file);
    code = 0;
    goto PRASE_VNODE_OVER;
  }

  content = malloc((size_t)maxLen + 1);
  if (content == NULL) {
    dError("failed to read %s since out of memory", file);
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto PRASE_VNODE_OVER;
  }

  // Read the whole file, growing the buffer as needed, so that a vnode list
  // larger than the initial buffer is not silently truncated.
  for (;;) {
    int64_t nread = taosReadFile(pFile, content + len, maxLen - len);
    if (nread < 0) {
      len = 0;  // read error: treat as unreadable
      break;
    }
    if (nread == 0) break;  // end of file
    len += (int32_t)nread;
    if (len < maxLen) continue;

    if (maxLen > INT32_MAX / 2) {
      len = 0;  // refuse to grow beyond what an int32_t length can describe
      break;
    }
    char *grown = realloc(content, (size_t)maxLen * 2 + 1);
    if (grown == NULL) {
      dError("failed to read %s since out of memory", file);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto PRASE_VNODE_OVER;
    }
    content = grown;
    maxLen *= 2;
  }

  if (len <= 0) {
    dError("failed to read %s since content is null", file);
    goto PRASE_VNODE_OVER;
  }

  content[len] = 0;
  root = cJSON_Parse(content);
  if (root == NULL) {
    dError("failed to read %s since invalid json format", file);
    goto PRASE_VNODE_OVER;
  }

  cJSON *vnodes = cJSON_GetObjectItem(root, "vnodes");
  if (!vnodes || vnodes->type != cJSON_Array) {
    dError("failed to read %s since vnodes not found", file);
    goto PRASE_VNODE_OVER;
  }

  int32_t vnodesNum = cJSON_GetArraySize(vnodes);
  if (vnodesNum > 0) {
    pCfgs = calloc(vnodesNum, sizeof(SWrapperCfg));
    if (pCfgs == NULL) {
      dError("failed to read %s since out of memory", file);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto PRASE_VNODE_OVER;
    }

    for (int32_t i = 0; i < vnodesNum; ++i) {
      cJSON       *vnode = cJSON_GetArrayItem(vnodes, i);
      SWrapperCfg *pCfg = &pCfgs[i];

      cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId");
      if (!vgId || vgId->type != cJSON_Number) {
        dError("failed to read %s since vgId not found", file);
        goto PRASE_VNODE_OVER;
      }
      pCfg->vgId = vgId->valueint;
      snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCfg->vgId);

      cJSON *dropped = cJSON_GetObjectItem(vnode, "dropped");
      if (!dropped || dropped->type != cJSON_Number) {
        dError("failed to read %s since dropped not found", file);
        goto PRASE_VNODE_OVER;
      }
      pCfg->dropped = dropped->valueint;

      cJSON *vgVersion = cJSON_GetObjectItem(vnode, "vgVersion");
      if (!vgVersion || vgVersion->type != cJSON_Number) {
        dError("failed to read %s since vgVersion not found", file);
        goto PRASE_VNODE_OVER;
      }
      pCfg->vgVersion = vgVersion->valueint;

      cJSON *dbUid = cJSON_GetObjectItem(vnode, "dbUid");
      if (!dbUid || dbUid->type != cJSON_String) {
        dError("failed to read %s since dbUid not found", file);
        goto PRASE_VNODE_OVER;
      }
      // The value is written with PRIu64, so parse it as unsigned 64-bit.
      pCfg->dbUid = strtoull(dbUid->valuestring, NULL, 10);

      cJSON *db = cJSON_GetObjectItem(vnode, "db");
      if (!db || db->type != cJSON_String) {
        dError("failed to read %s since db not found", file);
        goto PRASE_VNODE_OVER;
      }
      tstrncpy(pCfg->db, db->valuestring, TSDB_DB_FNAME_LEN);
    }

    // Ownership of the array moves to the caller from here on; no failure
    // path is reachable after this assignment.
    *ppCfgs = pCfgs;
  }

  *numOfVnodes = vnodesNum;
  code = 0;
  dInfo("succcessed to read file %s", file);

PRASE_VNODE_OVER:
  if (code != 0) {
    // The array was never published to the caller on a failure path.
    free(pCfgs);
    terrno = code;
  }
  free(content);
  if (root != NULL) cJSON_Delete(root);
  if (pFile != NULL) taosCloseFile(&pFile);

  return code;
}

// Persists the current vnode list to "<vnodes dir>/vnodes.json".
//
// The list is first written to "vnodes.json.bak", synced and closed, then
// renamed over the real file. Returns 0 on success; on failure a non-zero value
// is returned and terrno describes the error.
static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
  char file[PATH_MAX + 20] = {0};
  char realfile[PATH_MAX + 20] = {0};
  snprintf(file, PATH_MAX + 20, "%s/vnodes.json.bak", pDnode->dir.vnodes);
  snprintf(realfile, PATH_MAX + 20, "%s/vnodes.json", pDnode->dir.vnodes);

  TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to write %s since %s", file, terrstr());
    return -1;
  }

  int32_t     numOfVnodes = 0;
  SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes);

  // Size the buffer from the data instead of using a fixed length: each entry
  // needs well under 256 bytes of fixed text and numbers plus its db name, so
  // snprintf can never be asked to write past the end.
  int32_t maxLen = 128;
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    maxLen += 256 + (int32_t)strlen(pVnodes[i]->db);
  }

  int32_t code = 0;
  int32_t len = 0;
  char   *content = calloc(1, (size_t)maxLen + 1);
  if (content == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to write %s since %s", file, terrstr());
    code = -1;
  } else {
    len += snprintf(content + len, maxLen - len, "{\n");
    len += snprintf(content + len, maxLen - len, "  \"vnodes\": [\n");
    for (int32_t i = 0; i < numOfVnodes; ++i) {
      SVnodeObj *pVnode = pVnodes[i];
      len += snprintf(content + len, maxLen - len, "    {\n");
      len += snprintf(content + len, maxLen - len, "      \"vgId\": %d,\n", pVnode->vgId);
      len += snprintf(content + len, maxLen - len, "      \"dropped\": %d,\n", pVnode->dropped);
      len += snprintf(content + len, maxLen - len, "      \"vgVersion\": %d,\n", pVnode->vgVersion);
      len += snprintf(content + len, maxLen - len, "      \"dbUid\": \"%" PRIu64 "\",\n", pVnode->dbUid);
      len += snprintf(content + len, maxLen - len, "      \"db\": \"%s\"\n", pVnode->db);
      if (i < numOfVnodes - 1) {
        len += snprintf(content + len, maxLen - len, "    },\n");
      } else {
        len += snprintf(content + len, maxLen - len, "    }\n");
      }
    }
    len += snprintf(content + len, maxLen - len, "  ]\n");
    len += snprintf(content + len, maxLen - len, "}\n");

    // NOTE(review): assumes taosWriteFile returns the number of bytes written
    // on success - confirm against the os file API.
    if (taosWriteFile(pFile, content, len) != len) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      dError("failed to write %s since %s", file, terrstr());
      code = -1;
    } else {
      taosFsyncFile(pFile);
    }
  }

  taosCloseFile(&pFile);
  free(content);

  // Give back the references taken by dndGetVnodesFromHash.
  for (int32_t i = 0; i < numOfVnodes; ++i) {
    dndReleaseVnode(pDnode, pVnodes[i]);
  }
  free(pVnodes);

  if (code != 0) {
    return code;
  }

  terrno = 0;
  code = taosRenameFile(file, realfile);
  if (code != 0) {
    if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to rename %s to %s since %s", file, realfile, terrstr());
    return code;
  }

  dDebug("successed to write %s", realfile);
  return 0;
}

static void *dnodeOpenVnodeFunc(void *param) {
  SVnodeThread *pThread = param;
S
Shengliang Guan 已提交
373 374
  SDnode       *pDnode = pThread->pDnode;
  SVnodesMgmt  *pMgmt = &pDnode->vmgmt;
S
Shengliang Guan 已提交
375 376 377 378 379

  dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
  setThreadName("open-vnodes");

  for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
380
    SWrapperCfg *pCfg = &pThread->pCfgs[v];
S
Shengliang Guan 已提交
381 382

    char stepDesc[TSDB_STEP_DESC_LEN] = {0};
383
    snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
S
Shengliang Guan 已提交
384 385 386
             pMgmt->openVnodes, pMgmt->totalVnodes);
    dndReportStartup(pDnode, "open-vnodes", stepDesc);

D
dapan1121 已提交
387
    SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pDnode->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid};
S
Shengliang Guan 已提交
388
    SVnode   *pImpl = vnodeOpen(pCfg->path, &cfg);
S
Shengliang Guan 已提交
389
    if (pImpl == NULL) {
390
      dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
S
Shengliang Guan 已提交
391 392
      pThread->failed++;
    } else {
393
      dndOpenVnode(pDnode, pCfg, pImpl);
394
      dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
S
Shengliang Guan 已提交
395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412
      pThread->opened++;
    }

    atomic_add_fetch_32(&pMgmt->openVnodes, 1);
  }

  dDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
         pThread->failed);
  return NULL;
}

// Initialises the vnode hash and latch, loads the vnode list from disk and
// opens all vnodes using worker threads.
//
// Returns 0 when pMgmt->openVnodes equals the number of vnodes found on disk,
// -1 otherwise (hash or memory allocation failure, unreadable vnode list, or
// fewer vnodes opened than listed).
static int32_t dndOpenVnodes(SDnode *pDnode) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;
  taosInitRWLatch(&pMgmt->latch);

  pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  if (pMgmt->hash == NULL) {
    dError("failed to init vnode hash");
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SWrapperCfg *pCfgs = NULL;
  int32_t      numOfVnodes = 0;
  if (dndGetVnodesFromFile(pDnode, &pCfgs, &numOfVnodes) != 0) {
    dInfo("failed to get vnode list from disk since %s", terrstr());
    return -1;
  }

  pMgmt->totalVnodes = numOfVnodes;

  int32_t threadNum = tsNumOfCores;
#if 1
  threadNum = 1;
#endif

  int32_t vnodesPerThread = numOfVnodes / threadNum + 1;

  SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread));
  if (threads == NULL) {
    free(pCfgs);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    threads[t].threadIndex = t;
    threads[t].pDnode = pDnode;
    threads[t].pCfgs = calloc(vnodesPerThread, sizeof(SWrapperCfg));
    if (threads[t].pCfgs == NULL) {
      // Undo the per-thread arrays allocated so far before giving up.
      for (int32_t u = 0; u < t; ++u) {
        free(threads[u].pCfgs);
      }
      free(threads);
      free(pCfgs);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  // Distribute the vnodes over the threads round-robin.
  for (int32_t v = 0; v < numOfVnodes; ++v) {
    int32_t       t = v % threadNum;
    SVnodeThread *pThread = &threads[t];
    pThread->pCfgs[pThread->vnodeNum++] = pCfgs[v];
  }

  dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes);

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum == 0) continue;

    pthread_attr_t thAttr;
    pthread_attr_init(&thAttr);
    pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
    if (pthread_create(&pThread->thread, &thAttr, dnodeOpenVnodeFunc, pThread) != 0) {
      dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
    }

    pthread_attr_destroy(&thAttr);
  }

  for (int32_t t = 0; t < threadNum; ++t) {
    SVnodeThread *pThread = &threads[t];
    if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
      pthread_join(pThread->thread, NULL);
    }
    free(pThread->pCfgs);
  }
  free(threads);
  free(pCfgs);

  if (pMgmt->openVnodes != pMgmt->totalVnodes) {
    dError("there are total vnodes:%d, opened:%d", pMgmt->totalVnodes, pMgmt->openVnodes);
    return -1;
  } else {
    dInfo("total vnodes:%d open successfully", pMgmt->totalVnodes);
    return 0;
  }
}

// Closes every registered vnode and then tears down the vnode hash.
static void dndCloseVnodes(SDnode *pDnode) {
  dInfo("start to close all vnodes");
  SVnodesMgmt *pVnodesMgmt = &pDnode->vmgmt;

  // The snapshot holds one reference per vnode; dndCloseVnode releases it.
  int32_t     numOfVnodes = 0;
  SVnodeObj **ppSnapshot = dndGetVnodesFromHash(pDnode, &numOfVnodes);

  int32_t idx = 0;
  while (idx < numOfVnodes) {
    dndCloseVnode(pDnode, ppSnapshot[idx]);
    idx++;
  }

  if (ppSnapshot != NULL) {
    free(ppSnapshot);
  }

  if (pVnodesMgmt->hash != NULL) {
    taosHashCleanup(pVnodesMgmt->hash);
    pVnodesMgmt->hash = NULL;
  }

  dInfo("total vnodes:%d are all closed", numOfVnodes);
}

S
Shengliang Guan 已提交
504
// Fills `pCfg` from a create/alter vnode request. Fields not assigned below
// are left exactly as the caller initialised them.
static void dndGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
  // Identity and hash range of the vgroup.
  pCfg->vgId = pCreate->vgId;
  pCfg->hashBegin = pCreate->hashBegin;
  pCfg->hashEnd = pCreate->hashEnd;
  pCfg->hashMethod = pCreate->hashMethod;

  // All three sizes, and both cache sizes further down, come from cacheBlockSize.
  pCfg->wsize = pCreate->cacheBlockSize;
  pCfg->ssize = pCreate->cacheBlockSize;
  pCfg->lsize = pCreate->cacheBlockSize;

  // General options; several of them are fixed values rather than request fields.
  pCfg->isHeapAllocator = true;
  pCfg->ttl = 4;
  pCfg->keep = pCreate->daysToKeep0;
  pCfg->streamMode = pCreate->streamMode;
  pCfg->isWeak = true;

  // TSDB options.
  // NOTE(review): keep1 is fed from daysToKeep2 and keep2 from daysToKeep0;
  // confirm this mapping is intended.
  pCfg->tsdbCfg.keep = pCreate->daysToKeep0;
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep2;
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep0;
  pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize;

  // Meta options.
  pCfg->metaCfg.lruSize = pCreate->cacheBlockSize;

  // WAL options; retention, roll period and segment size are fixed values.
  pCfg->walCfg.vgId = pCreate->vgId;
  pCfg->walCfg.fsyncPeriod = pCreate->fsyncPeriod;
  pCfg->walCfg.level = pCreate->walLevel;
  pCfg->walCfg.retentionPeriod = 10;
  pCfg->walCfg.retentionSize = 128;
  pCfg->walCfg.rollPeriod = 128;
  pCfg->walCfg.segSize = 128;
}

S
Shengliang Guan 已提交
531
// Builds the wrapper config for a newly created vnode from the create request.
// The path is "<vnodes dir>/vnode<vgId>" and the vnode starts as not dropped.
static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
  pCfg->vgId = pCreate->vgId;
  pCfg->vgVersion = pCreate->vgVersion;
  pCfg->dropped = 0;
  pCfg->dbUid = pCreate->dbUid;
  memcpy(pCfg->db, pCreate->db, TSDB_DB_FNAME_LEN);
  snprintf(pCfg->path, sizeof(pCfg->path), "%s/vnode%d", pDnode->dir.vnodes, pCreate->vgId);
}

S
Shengliang Guan 已提交
540
// Handles a create-vnode request: opens the vnode, registers it and persists
// the updated vnode list.
//
// Returns 0 on success. On failure a non-zero value is returned and terrno
// holds the reason:
//   - TSDB_CODE_INVALID_MSG                  the request cannot be deserialized
//   - TSDB_CODE_DND_VNODE_INVALID_OPTION     the request targets another dnode
//   - TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED   a vnode with this vgId exists
//   - otherwise whatever the failing step reported
// When a later step fails, the partially created vnode is closed and its
// directory destroyed again.
int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SCreateVnodeReq createReq = {0};
  if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  dDebug("vgId:%d, create vnode req is received", createReq.vgId);

  SVnodeCfg vnodeCfg = {0};
  dndGenerateVnodeCfg(&createReq, &vnodeCfg);

  SWrapperCfg wrapperCfg = {0};
  dndGenerateWrapperCfg(pDnode, &createReq, &wrapperCfg);

  if (createReq.dnodeId != dndGetDnodeId(pDnode)) {
    terrno = TSDB_CODE_DND_VNODE_INVALID_OPTION;
    dDebug("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
    return -1;
  }

  SVnodeObj *pVnode = dndAcquireVnode(pDnode, createReq.vgId);
  if (pVnode != NULL) {
    dDebug("vgId:%d, already exist", createReq.vgId);
    dndReleaseVnode(pDnode, pVnode);
    terrno = TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED;
    return -1;
  }

  vnodeCfg.pDnode = pDnode;
  vnodeCfg.pTfs = pDnode->pTfs;
  vnodeCfg.dbId = wrapperCfg.dbUid;
  SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
    return -1;
  }

  int32_t code = dndOpenVnode(pDnode, &wrapperCfg, pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr());
    // Keep the error reported by dndOpenVnode instead of overwriting terrno
    // with the bare return value.
    int32_t reason = terrno;
    vnodeClose(pImpl);
    vnodeDestroy(wrapperCfg.path);
    terrno = (reason != 0) ? reason : code;
    return code;
  }

  code = dndWriteVnodesToFile(pDnode);
  if (code != 0) {
    int32_t reason = terrno;
    dError("vgId:%d, failed to persist vnode list since %s", createReq.vgId, terrstr());

    // The vnode is already registered in the hash, so it must be torn down
    // through dndCloseVnode. Closing pImpl directly would leave a registered
    // SVnodeObj pointing at a closed vnode.
    pVnode = dndAcquireVnode(pDnode, createReq.vgId);
    if (pVnode != NULL) {
      pVnode->dropped = 1;  // makes dndCloseVnode destroy the directory too
      dndCloseVnode(pDnode, pVnode);
    }

    terrno = (reason != 0) ? reason : code;
    return code;
  }

  return 0;
}

S
Shengliang Guan 已提交
598
// Handles an alter-vnode request.
// Returns 0 when the vnode was altered or already has the requested version,
// -1 when the request is malformed, the vnode is unknown or vnodeAlter fails,
// and the result of dndWriteVnodesToFile otherwise (the in-memory version is
// rolled back when persisting fails).
int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SAlterVnodeReq alterReq = {0};
  int32_t        parsed = tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &alterReq);
  if (parsed != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  dDebug("vgId:%d, alter vnode req is received", alterReq.vgId);

  SVnodeCfg newCfg = {0};
  dndGenerateVnodeCfg(&alterReq, &newCfg);

  SVnodeObj *pTarget = dndAcquireVnode(pDnode, alterReq.vgId);
  if (pTarget == NULL) {
    dDebug("vgId:%d, failed to alter vnode since %s", alterReq.vgId, terrstr());
    return -1;
  }

  // Nothing to do when the version has not changed.
  if (pTarget->vgVersion == alterReq.vgVersion) {
    dndReleaseVnode(pDnode, pTarget);
    dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", alterReq.vgId);
    return 0;
  }

  if (vnodeAlter(pTarget->pImpl, &newCfg) != 0) {
    dError("vgId:%d, failed to alter vnode since %s", alterReq.vgId, terrstr());
    dndReleaseVnode(pDnode, pTarget);
    return -1;
  }

  // Publish the new version, persist it, and roll back if persisting fails.
  const int32_t previousVersion = pTarget->vgVersion;
  pTarget->vgVersion = alterReq.vgVersion;

  const int32_t result = dndWriteVnodesToFile(pDnode);
  if (result != 0) {
    pTarget->vgVersion = previousVersion;
  }

  dndReleaseVnode(pDnode, pTarget);
  return result;
}

S
Shengliang Guan 已提交
639
// Handles a drop-vnode request: flags the vnode as dropped, persists that
// flag, closes the vnode (which also destroys its directory) and rewrites the
// vnode list. Returns 0 on success and -1 with terrno set on failure.
int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SDropVnodeReq dropReq = {0};
  int32_t       parsed = tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &dropReq);
  if (parsed != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  const int32_t vgId = dropReq.vgId;
  dDebug("vgId:%d, drop vnode req is received", vgId);

  SVnodeObj *pTarget = dndAcquireVnode(pDnode, vgId);
  if (pTarget == NULL) {
    dDebug("vgId:%d, failed to drop since %s", vgId, terrstr());
    terrno = TSDB_CODE_DND_VNODE_NOT_DEPLOYED;
    return -1;
  }

  // Record the dropped flag on disk first; undo it when that fails.
  pTarget->dropped = 1;
  int32_t persisted = dndWriteVnodesToFile(pDnode);
  if (persisted != 0) {
    pTarget->dropped = 0;
    dndReleaseVnode(pDnode, pTarget);
    return -1;
  }

  // dndCloseVnode consumes the reference taken above and frees the object.
  dndCloseVnode(pDnode, pTarget);
  dndWriteVnodesToFile(pDnode);

  return 0;
}

S
Shengliang Guan 已提交
669
// Handles a sync-vnode request by calling vnodeSync on the target vnode.
// Returns 0 on success and -1 on failure: terrno is TSDB_CODE_INVALID_MSG for
// a request that cannot be deserialized, otherwise whatever the vnode lookup
// or vnodeSync reported.
int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SSyncVnodeReq syncReq = {0};
  // Reject malformed requests instead of acting on a zeroed request, in line
  // with the create/alter/drop handlers.
  if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &syncReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t vgId = syncReq.vgId;
  dDebug("vgId:%d, sync vnode req is received", vgId);

  SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
  if (pVnode == NULL) {
    dDebug("vgId:%d, failed to sync since %s", vgId, terrstr());
    return -1;
  }

  if (vnodeSync(pVnode->pImpl) != 0) {
    dError("vgId:%d, failed to sync vnode since %s", vgId, terrstr());
    dndReleaseVnode(pDnode, pVnode);
    return -1;
  }

  dndReleaseVnode(pDnode, pVnode);
  return 0;
}

S
Shengliang Guan 已提交
692
// Handles a compact-vnode request by calling vnodeCompact on the target vnode.
// Returns 0 on success and -1 on failure: terrno is TSDB_CODE_INVALID_MSG for
// a request that cannot be deserialized, otherwise whatever the vnode lookup
// or vnodeCompact reported.
int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SCompactVnodeReq compactReq = {0};
  // Reject malformed requests instead of acting on a zeroed request, in line
  // with the create/alter/drop handlers.
  if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &compactReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t vgId = compactReq.vgId;
  dDebug("vgId:%d, compact vnode req is received", vgId);

  SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
  if (pVnode == NULL) {
    dDebug("vgId:%d, failed to compact since %s", vgId, terrstr());
    return -1;
  }

  if (vnodeCompact(pVnode->pImpl) != 0) {
    dError("vgId:%d, failed to compact vnode since %s", vgId, terrstr());
    dndReleaseVnode(pDnode, pVnode);
    return -1;
  }

  dndReleaseVnode(pDnode, pVnode);
  return 0;
}

S
Shengliang 已提交
715
// Query queue consumer: forwards the message to the vnode implementation.
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) {
  vnodeProcessQueryMsg(pVnode->pImpl, pMsg);
}
S
Shengliang Guan 已提交
716

S
Shengliang 已提交
717
// Fetch queue consumer: forwards the message to the vnode implementation.
static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) {
  vnodeProcessFetchMsg(pVnode->pImpl, pMsg);
}
S
Shengliang Guan 已提交
718

719
// Write queue consumer: takes `numOfMsgs` items from `qall`, passes them to the
// vnode as one batch, applies each of them, answers each request and finally
// frees the messages.
//
// A message that cannot be batched because memory is exhausted is answered
// with TSDB_CODE_OUT_OF_MEMORY and freed instead of aborting the process.
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
  SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    SRpcMsg *pMsg = NULL;
    taosGetQitem(qall, (void **)&pMsg);
    if (pMsg == NULL) continue;

    if (pArray == NULL || taosArrayPush(pArray, &pMsg) == NULL) {
      SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = TSDB_CODE_OUT_OF_MEMORY};
      rpcSendResponse(&rpcRsp);
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
    }
  }

  if (pArray == NULL) return;

  // Only the messages that made it into the array are processed below.
  int32_t numOfBatched = (int32_t)taosArrayGetSize(pArray);

  vnodeProcessWMsgs(pVnode->pImpl, pArray);

  for (int32_t i = 0; i < numOfBatched; i++) {
    SRpcMsg *pRsp = NULL;
    SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
    int32_t  code = vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp);
    if (pRsp != NULL) {
      pRsp->ahandle = pMsg->ahandle;
      rpcSendResponse(pRsp);
      free(pRsp);
    } else {
      // No response was produced: report the apply result, using terrno as
      // the error code on failure.
      if (code != 0) code = terrno;
      SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
      rpcSendResponse(&rpcRsp);
    }
  }

  for (int32_t i = 0; i < numOfBatched; i++) {
    SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }

  taosArrayDestroy(pArray);
}

755
// Apply queue consumer: takes `numOfMsgs` items from `qall` and applies each
// one to the vnode. The apply result and any response are ignored.
// NOTE(review): unlike the write queue consumer, the dequeued items and their
// payloads are not released here (original is marked "todo") - confirm who
// owns them.
static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
  SRpcMsg *pItem = NULL;
  int32_t  handled = 0;

  while (handled < numOfMsgs) {
    taosGetQitem(qall, (void **)&pItem);

    // todo
    SRpcMsg *pIgnoredRsp = NULL;
    (void)vnodeApplyWMsg(pVnode->pImpl, pItem, &pIgnoredRsp);

    ++handled;
  }
}

767
// Sync queue consumer: takes `numOfMsgs` items from `qall` and passes each one
// to vnodeProcessSyncReq. The result and any response are ignored.
// NOTE(review): the dequeued items and their payloads are not released here
// (original is marked "todo") - confirm who owns them.
static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
  SRpcMsg *pItem = NULL;
  int32_t  handled = 0;

  while (handled < numOfMsgs) {
    taosGetQitem(qall, (void **)&pItem);

    // todo
    SRpcMsg *pIgnoredRsp = NULL;
    (void)vnodeProcessSyncReq(pVnode->pImpl, pItem, &pIgnoredRsp);

    ++handled;
  }
}

S
Shengliang Guan 已提交
779
/*
 * Copies *pRpcMsg into a freshly allocated queue item and writes it to
 * pQueue.
 *
 * pQueue  - destination queue; NULL yields TSDB_CODE_MSG_NOT_PROCESSED
 * pRpcMsg - message to enqueue (copied by value into the queue item)
 * sendRsp - when true and enqueueing fails, an error response is sent for
 *           messages whose msgType has its lowest bit set, and
 *           pRpcMsg->pCont is released with rpcFreeCont(). When false the
 *           caller keeps responsibility for pRpcMsg->pCont on failure.
 *
 * Returns 0 on success, otherwise a TSDB_CODE_* error code.
 */
static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg, bool sendRsp) {
  int32_t code = 0;

  if (pQueue == NULL) {
    code = TSDB_CODE_MSG_NOT_PROCESSED;
  } else {
    SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg));
    if (pMsg == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      *pMsg = *pRpcMsg;
      if (taosWriteQitem(pQueue, pMsg) != 0) {
        // The item never reached the queue, so nobody else will release it.
        taosFreeQitem(pMsg);
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }

  if (code != TSDB_CODE_SUCCESS && sendRsp) {
    // presumably odd msgType values are requests that expect a reply - verify
    if (pRpcMsg->msgType & 1u) {
      // Carry ahandle along, as the other responses built in this file do.
      SRpcMsg rsp = {.handle = pRpcMsg->handle, .ahandle = pRpcMsg->ahandle, .code = code};
      rpcSendResponse(&rsp);
    }
    rpcFreeCont(pRpcMsg->pCont);
  }

  return code;
}

/*
 * Decodes the message head in pMsg->pCont and acquires the vnode whose
 * vgId it names.
 *
 * The contLen and vgId fields of the head are converted to host byte
 * order IN PLACE, so this must be applied only once per message.
 *
 * Returns the acquired vnode (release it with dndReleaseVnode()), or NULL
 * when no vnode is found. In the NULL case an error response carrying
 * TSDB_CODE_VND_INVALID_VGROUP_ID is sent for messages whose msgType has
 * its lowest bit set, and pMsg->pCont is released with rpcFreeCont(), so
 * the caller must not touch the content afterwards.
 */
static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
  SMsgHead *pHead = pMsg->pCont;

  // Wire -> host conversion: ntohl is the matching direction (the result
  // is the same as htonl, but the intent is now stated correctly).
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);

  SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to acquire vnode while process req", pHead->vgId);
    // presumably odd msgType values are requests that expect a reply - verify
    if (pMsg->msgType & 1u) {
      // Carry ahandle along, as the other responses built in this file do.
      SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
      rpcSendResponse(&rsp);
    }
    rpcFreeCont(pMsg->pCont);
  }

  return pVnode;
}

/*
 * Routes an incoming message to the write queue of the vnode named in its
 * header. If no vnode can be acquired, dndAcquireVnodeFromMsg() has
 * already dealt with the message and nothing more is done here.
 *
 * pEpSet is not used by this function.
 */
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) {
    return;
  }

  // sendRsp=true: the helper replies and frees the content on failure.
  (void)dndWriteRpcMsgToVnodeQueue(pTarget->pWriteQ, pMsg, true);
  dndReleaseVnode(pDnode, pTarget);
}

/*
 * Routes an incoming message to the sync queue of the vnode named in its
 * header. If no vnode can be acquired, dndAcquireVnodeFromMsg() has
 * already dealt with the message and nothing more is done here.
 *
 * pEpSet is not used by this function.
 */
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) {
    return;
  }

  // sendRsp=true: the helper replies and frees the content on failure.
  (void)dndWriteRpcMsgToVnodeQueue(pTarget->pSyncQ, pMsg, true);
  dndReleaseVnode(pDnode, pTarget);
}

/*
 * Routes an incoming message to the query queue of the vnode named in its
 * header. If no vnode can be acquired, dndAcquireVnodeFromMsg() has
 * already dealt with the message and nothing more is done here.
 *
 * pEpSet is not used by this function.
 */
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) {
    return;
  }

  // sendRsp=true: the helper replies and frees the content on failure.
  (void)dndWriteRpcMsgToVnodeQueue(pTarget->pQueryQ, pMsg, true);
  dndReleaseVnode(pDnode, pTarget);
}

/*
 * Routes an incoming message to the fetch queue of the vnode named in its
 * header. If no vnode can be acquired, dndAcquireVnodeFromMsg() has
 * already dealt with the message and nothing more is done here.
 *
 * pEpSet is not used by this function.
 */
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SVnodeObj *pTarget = dndAcquireVnodeFromMsg(pDnode, pMsg);
  if (pTarget == NULL) {
    return;
  }

  // sendRsp=true: the helper replies and frees the content on failure.
  (void)dndWriteRpcMsgToVnodeQueue(pTarget->pFetchQ, pMsg, true);
  dndReleaseVnode(pDnode, pTarget);
}

S
Shengliang Guan 已提交
857 858 859 860 861 862 863
/*
 * Places a request on the query queue of the vnode named in the message
 * head.
 *
 * The vgId is read from the head exactly as stored; no byte-order
 * conversion is applied on this path.
 *
 * Returns -1 when the vnode cannot be acquired, otherwise the result of
 * dndWriteRpcMsgToVnodeQueue(). The write is done with sendRsp=false, so
 * on failure no response is sent and pMsg->pCont is left to the caller.
 */
int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pMsg) {
  SMsgHead  *pHead = pMsg->pCont;
  SVnodeObj *pTarget = dndAcquireVnode(pDnode, pHead->vgId);

  if (pTarget == NULL) {
    return -1;
  }

  int32_t result = dndWriteRpcMsgToVnodeQueue(pTarget->pQueryQ, pMsg, false);
  dndReleaseVnode(pDnode, pTarget);

  return result;
}

869
/*
 * Writes pMsg directly onto the apply queue of vnode vgId. Unlike
 * dndWriteRpcMsgToVnodeQueue(), no copy is made: pMsg itself is passed to
 * taosWriteQitem().
 *
 * Returns -1 when the vnode cannot be acquired, otherwise the value
 * returned by taosWriteQitem().
 */
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) {
  SVnodeObj *pTarget = dndAcquireVnode(pDnode, vgId);

  if (pTarget == NULL) {
    return -1;
  }

  int32_t result = taosWriteQitem(pTarget->pApplyQ, pMsg);
  dndReleaseVnode(pDnode, pTarget);

  return result;
}

S
Shengliang Guan 已提交
878
/*
 * Configures and starts the four vnode worker pools in this order:
 * query, fetch, write, sync.
 *
 * Thread counts are derived from tsNumOfCores and tsRatioOfQueryCores:
 *   query : min == max == max(cores * ratio, 1)
 *   fetch : max == 4, min == min(4, cores)
 *   write : max == max(cores, 1)
 *   sync  : max == max(cores / 2, 1)
 *
 * Returns 0 when every pool is initialised, -1 as soon as one fails.
 * Pools initialised before the failing one are not torn down here.
 */
static int32_t dndInitVnodeWorkers(SDnode *pDnode) {
  SVnodesMgmt *pVmgmt = &pDnode->vmgmt;

  int32_t fetchMax = 4;
  int32_t fetchMin = TMIN(fetchMax, tsNumOfCores);
  int32_t queryMin = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1);
  int32_t queryMax = queryMin;
  int32_t writeMax = TMAX(tsNumOfCores, 1);
  int32_t syncMax = TMAX(tsNumOfCores / 2, 1);

  SQWorkerPool *pQueryPool = &pVmgmt->queryPool;
  pQueryPool->name = "vnode-query";
  pQueryPool->min = queryMin;
  pQueryPool->max = queryMax;
  if (tQWorkerInit(pQueryPool) != 0) {
    return -1;
  }

  SFWorkerPool *pFetchPool = &pVmgmt->fetchPool;
  pFetchPool->name = "vnode-fetch";
  pFetchPool->min = fetchMin;
  pFetchPool->max = fetchMax;
  if (tFWorkerInit(pFetchPool) != 0) {
    return -1;
  }

  SWWorkerPool *pWritePool = &pVmgmt->writePool;
  pWritePool->name = "vnode-write";
  pWritePool->max = writeMax;
  if (tWWorkerInit(pWritePool) != 0) {
    return -1;
  }

  SWWorkerPool *pSyncPool = &pVmgmt->syncPool;
  pSyncPool->name = "vnode-sync";
  pSyncPool->max = syncMax;
  if (tWWorkerInit(pSyncPool) != 0) {
    return -1;
  }

  dDebug("vnode workers is initialized");
  return 0;
}

S
Shengliang Guan 已提交
914
/*
 * Shuts down the vnode worker pools in the order fetch, query, write,
 * sync, then logs completion.
 */
static void dndCleanupVnodeWorkers(SDnode *pDnode) {
  tFWorkerCleanup(&pDnode->vmgmt.fetchPool);
  tQWorkerCleanup(&pDnode->vmgmt.queryPool);
  tWWorkerCleanup(&pDnode->vmgmt.writePool);
  tWWorkerCleanup(&pDnode->vmgmt.syncPool);

  dDebug("vnode workers is closed");
}

S
Shengliang Guan 已提交
923
/*
 * Allocates the five per-vnode queues and binds each one to its handler:
 *   write, apply -> write pool
 *   sync         -> sync pool
 *   fetch        -> fetch pool
 *   query        -> query pool
 *
 * Returns 0 when all five queues exist. If any allocation returned NULL,
 * terrno is set to TSDB_CODE_OUT_OF_MEMORY and -1 is returned; queues that
 * were allocated successfully are left in place on pVnode.
 */
static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
  SVnodesMgmt *pVmgmt = &pDnode->vmgmt;

  pVnode->pWriteQ = tWWorkerAllocQueue(&pVmgmt->writePool, pVnode, (FItems)dndProcessVnodeWriteQueue);
  pVnode->pApplyQ = tWWorkerAllocQueue(&pVmgmt->writePool, pVnode, (FItems)dndProcessVnodeApplyQueue);
  pVnode->pSyncQ = tWWorkerAllocQueue(&pVmgmt->syncPool, pVnode, (FItems)dndProcessVnodeSyncQueue);
  pVnode->pFetchQ = tFWorkerAllocQueue(&pVmgmt->fetchPool, pVnode, (FItem)dndProcessVnodeFetchQueue);
  pVnode->pQueryQ = tQWorkerAllocQueue(&pVmgmt->queryPool, pVnode, (FItem)dndProcessVnodeQueryQueue);

  bool allAllocated = pVnode->pApplyQ != NULL && pVnode->pWriteQ != NULL && pVnode->pSyncQ != NULL &&
                      pVnode->pFetchQ != NULL && pVnode->pQueryQ != NULL;
  if (allAllocated) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

S
Shengliang Guan 已提交
941
/*
 * Returns the five per-vnode queues to their worker pools (query, fetch,
 * write, apply, sync - in that order) and clears the pointers on pVnode
 * so they cannot be reused afterwards.
 */
static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
  SVnodesMgmt *pVmgmt = &pDnode->vmgmt;

  // Hand every queue back to the pool it came from.
  tQWorkerFreeQueue(&pVmgmt->queryPool, pVnode->pQueryQ);
  tFWorkerFreeQueue(&pVmgmt->fetchPool, pVnode->pFetchQ);
  tWWorkerFreeQueue(&pVmgmt->writePool, pVnode->pWriteQ);
  tWWorkerFreeQueue(&pVmgmt->writePool, pVnode->pApplyQ);
  tWWorkerFreeQueue(&pVmgmt->syncPool, pVnode->pSyncQ);

  // Drop the now-stale handles.
  pVnode->pWriteQ = NULL;
  pVnode->pApplyQ = NULL;
  pVnode->pSyncQ = NULL;
  pVnode->pFetchQ = NULL;
  pVnode->pQueryQ = NULL;
}

/*
 * Brings up the vnode subsystem of a dnode: starts the worker pools, then
 * opens the vnodes.
 *
 * Returns 0 on success and -1 on failure. A worker-pool failure is always
 * reported as TSDB_CODE_OUT_OF_MEMORY through terrno; for a failure in
 * dndOpenVnodes() terrno is left as that call set it.
 */
int32_t dndInitVnodes(SDnode *pDnode) {
  dInfo("dnode-vnodes start to init");

  int32_t workersRc = dndInitVnodeWorkers(pDnode);
  if (workersRc != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    dError("failed to init vnode workers since %s", terrstr());
    return -1;
  }

  int32_t openRc = dndOpenVnodes(pDnode);
  if (openRc != 0) {
    dError("failed to open vnodes since %s", terrstr());
    return -1;
  }

  dInfo("dnode-vnodes is initialized");
  return 0;
}

/*
 * Tears down the vnode subsystem of a dnode: closes the vnodes first and
 * stops the worker pools afterwards.
 */
void dndCleanupVnodes(SDnode *pDnode) {
  dInfo("dnode-vnodes start to clean up");

  // Vnodes are closed before the worker pools are stopped.
  dndCloseVnodes(pDnode);
  dndCleanupVnodeWorkers(pDnode);

  dInfo("dnode-vnodes is cleaned up");
}

S
Shengliang Guan 已提交
980
/*
 * Appends one SVnodeLoad entry to pLoads for every vnode registered in the
 * dnode's vnode hash. The hash is walked under the management read latch.
 *
 * pDnode - dnode whose vnodes are sampled
 * pLoads - output array; one SVnodeLoad is pushed per non-NULL vnode
 *
 * The result of taosArrayPush() is not checked, so a failed push silently
 * drops that vnode's entry.
 */
void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) {
  SVnodesMgmt *pMgmt = &pDnode->vmgmt;

  taosRLockLatch(&pMgmt->latch);

  // The iterator is advanced in the for-increment so that `continue` on an
  // empty slot still moves forward; advancing only at the end of the body
  // would spin forever (with the latch held) on the first NULL entry.
  for (void *pIter = taosHashIterate(pMgmt->hash, NULL); pIter != NULL;
       pIter = taosHashIterate(pMgmt->hash, pIter)) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;
    if (pVnode == NULL) continue;

    SVnodeLoad vload = {0};
    vnodeGetLoad(pVnode->pImpl, &vload);
    taosArrayPush(pLoads, &vload);
  }

  taosRUnLockLatch(&pMgmt->latch);
}