vmHandle.c 23.6 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "vmInt.h"
S
shm  
Shengliang Guan 已提交
18

C
Cary Xu 已提交
19
/*
 * Take a load snapshot of every vnode registered in pMgmt->hash.
 *
 * pInfo->pVloads is set to a newly created array of SVnodeLoad; it is left
 * NULL (and nothing else happens) when the array cannot be created. The hash
 * is walked while holding pMgmt->lock for reading. When isReset is true,
 * vnodeResetLoad() is called for each vnode right after its load was read.
 */
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
  if (pInfo->pVloads == NULL) return;

  taosThreadRwlockRdlock(&pMgmt->lock);

  void *pIter = taosHashIterate(pMgmt->hash, NULL);
  while (pIter) {
    SVnodeObj *pVnode = *(SVnodeObj **)pIter;

    // An empty slot is skipped, but the iterator must still be advanced below;
    // a bare `continue` here would spin forever while holding the read lock.
    if (pVnode != NULL) {
      SVnodeLoad vload = {0};
      vnodeGetLoad(pVnode->pImpl, &vload);
      if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
      taosArrayPush(pInfo->pVloads, &vload);
    }

    pIter = taosHashIterate(pMgmt->hash, pIter);
  }

  taosThreadRwlockUnlock(&pMgmt->lock);
}

S
Shengliang Guan 已提交
41
/*
 * Aggregate the per-vnode loads into pInfo->vstat and mirror the same totals
 * into pMgmt->state, then fill pInfo->tfs via tfsGetMonitorInfo().
 *
 * The loads are fetched with isReset == true. Returns without touching pInfo
 * when the load array could not be obtained.
 */
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
  SMonVloadInfo loadInfo = {0};
  vmGetVnodeLoads(pMgmt, &loadInfo, true);

  SArray *pLoads = loadInfo.pVloads;
  if (pLoads == NULL) return;

  int32_t vnodeCnt = 0;
  int32_t leaderCnt = 0;
  int64_t selectReqs = 0;
  int64_t insertReqs = 0;
  int64_t insertOkReqs = 0;
  int64_t batchInsertReqs = 0;
  int64_t batchInsertOkReqs = 0;

  // The array is not modified while summing, so its size is read once.
  const size_t numLoads = taosArrayGetSize(pLoads);
  for (size_t idx = 0; idx < numLoads; ++idx) {
    SVnodeLoad *pEntry = taosArrayGet(pLoads, idx);

    selectReqs += pEntry->numOfSelectReqs;
    insertReqs += pEntry->numOfInsertReqs;
    insertOkReqs += pEntry->numOfInsertSuccessReqs;
    batchInsertReqs += pEntry->numOfBatchInsertReqs;
    batchInsertOkReqs += pEntry->numOfBatchInsertSuccessReqs;

    if (pEntry->syncState == TAOS_SYNC_STATE_LEADER) {
      leaderCnt++;
    }
    vnodeCnt++;
  }

  // Totals reported to the monitor.
  pInfo->vstat.totalVnodes = vnodeCnt;
  pInfo->vstat.masterNum = leaderCnt;
  pInfo->vstat.numOfSelectReqs = selectReqs;
  pInfo->vstat.numOfInsertReqs = insertReqs;                        // delta
  pInfo->vstat.numOfInsertSuccessReqs = insertOkReqs;               // delta
  pInfo->vstat.numOfBatchInsertReqs = batchInsertReqs;              // delta
  pInfo->vstat.numOfBatchInsertSuccessReqs = batchInsertOkReqs;     // delta

  // Same totals cached on the management object.
  pMgmt->state.totalVnodes = vnodeCnt;
  pMgmt->state.masterNum = leaderCnt;
  pMgmt->state.numOfSelectReqs = selectReqs;
  pMgmt->state.numOfInsertReqs = insertReqs;
  pMgmt->state.numOfInsertSuccessReqs = insertOkReqs;
  pMgmt->state.numOfBatchInsertReqs = batchInsertReqs;
  pMgmt->state.numOfBatchInsertSuccessReqs = batchInsertOkReqs;

  tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs);
  taosArrayDestroy(pLoads);
}

S
shm  
Shengliang Guan 已提交
86
/*
 * Build a vnode configuration from a create-vnode request.
 *
 * pCfg is first overwritten with vnodeCfgDefault and then every field carried
 * by pCreate is copied over it. Page sizes are scaled by 1024 and the buffer
 * size by 1024 * 1024, as the expressions below show.
 */
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
  memcpy(pCfg, &vnodeCfgDefault, sizeof(*pCfg));

  /* identity and storage sizing */
  pCfg->vgId = pCreate->vgId;
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
  pCfg->dbId = pCreate->dbUid;
  pCfg->szPage = pCreate->pageSize * 1024;
  pCfg->szCache = pCreate->pages;
  pCfg->cacheLast = pCreate->cacheLast;
  pCfg->cacheLastSize = pCreate->cacheLastSize;
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
  pCfg->isWeak = true;
  pCfg->isTsma = pCreate->isTsma;

  /* tsdb */
  pCfg->tsdbCfg.compression = pCreate->compression;
  pCfg->tsdbCfg.precision = pCreate->precision;
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
  pCfg->tsdbCfg.minRows = pCreate->minRows;
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;

  /* retentions: the first entry alone decides whether isRsma is switched on */
  const size_t numRetentions = taosArrayGetSize(pCreate->pRetensions);
  for (size_t r = 0; r < numRetentions; ++r) {
    SRetention *pDst = &pCfg->tsdbCfg.retentions[r];
    memcpy(pDst, taosArrayGet(pCreate->pRetensions, r), sizeof(SRetention));
    if (r == 0 && pDst->freq > 0 && pDst->keep > 0) {
      pCfg->isRsma = 1;
    }
  }

  /* wal */
  pCfg->walCfg.vgId = pCreate->vgId;
  pCfg->walCfg.fsyncPeriod = pCreate->walFsyncPeriod;
  pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
  pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
  pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
  pCfg->walCfg.segSize = pCreate->walSegmentSize;
  pCfg->walCfg.level = pCreate->walLevel;

  /* hashing and misc */
  pCfg->sttTrigger = pCreate->sstTrigger;
  pCfg->hashBegin = pCreate->hashBegin;
  pCfg->hashEnd = pCreate->hashEnd;
  pCfg->hashMethod = pCreate->hashMethod;
  pCfg->hashPrefix = pCreate->hashPrefix;
  pCfg->hashSuffix = pCreate->hashSuffix;
  pCfg->tsdbPageSize = pCreate->tsdbPageSize * 1024;

  /* sync: node table is cleared, then one entry per replica is filled in */
  pCfg->standby = 0;
  pCfg->syncCfg.myIndex = pCreate->selfIndex;
  pCfg->syncCfg.replicaNum = pCreate->replica;
  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));

  int32_t n = 0;
  while (n < pCreate->replica) {
    SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[n];
    pNode->nodeId = pCreate->replicas[n].id;
    pNode->nodePort = pCreate->replicas[n].port;
    tstrncpy(pNode->nodeFqdn, pCreate->replicas[n].fqdn, TSDB_FQDN_LEN);
    tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    ++n;
  }
}
S
shm  
Shengliang Guan 已提交
143

S
Shengliang 已提交
144
/*
 * Fill the wrapper configuration for a vnode that is about to be created:
 * ids come from the request, the dropped flag is cleared, and the path is
 * "<pMgmt->path><TD_DIRSEP>vnode<vgId>".
 */
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
  const int32_t vgId = pCreate->vgId;

  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, vgId);
  pCfg->dropped = 0;
  pCfg->vgVersion = pCreate->vgVersion;
  pCfg->vgId = vgId;
}

C
Cary Xu 已提交
151 152 153 154 155 156 157 158 159
/*
 * For a tsma vnode, let smaGetTSmaDays() adjust pCfg->tsdbCfg.days based on
 * the tsma payload attached to the request. Non-tsma requests return 0
 * without touching pCfg.
 *
 * The payload starts with an SMsgHead; the body handed to smaGetTSmaDays()
 * is what follows the head, and its length is the byte-swapped contLen minus
 * the head size.
 */
static int32_t vmTsmaAdjustDays(SVnodeCfg *pCfg, SCreateVnodeReq *pReq) {
  if (!pReq->isTsma) {
    return 0;
  }

  SMsgHead *pHead = pReq->pTsma;
  uint32_t  bodyLen = (uint32_t)(htonl(pHead->contLen) - sizeof(SMsgHead));
  return smaGetTSmaDays(pCfg, POINTER_SHIFT(pHead, sizeof(SMsgHead)), bodyLen, &pCfg->tsdbCfg.days);
}

K
kailixu 已提交
160
#if 0
C
Cary Xu 已提交
161 162 163 164 165 166 167 168
/*
 * NOTE: compiled out by the surrounding `#if 0`.
 *
 * For a tsma request, forwards the payload that follows the SMsgHead (length
 * taken from the head's contLen minus the head size) to
 * vnodeProcessCreateTSma(). Returns 0 for non-tsma requests.
 */
static int32_t vmTsmaProcessCreate(SVnode *pVnode, SCreateVnodeReq *pReq) {
  if (pReq->isTsma) {
    SMsgHead *smaMsg = pReq->pTsma;
    uint32_t  contLen = (uint32_t)(htonl(smaMsg->contLen) - sizeof(SMsgHead));
    return vnodeProcessCreateTSma(pVnode, POINTER_SHIFT(smaMsg, sizeof(SMsgHead)), contLen);
  }
  return 0;
}
K
kailixu 已提交
169
#endif
C
Cary Xu 已提交
170

S
Shengliang Guan 已提交
171
/*
 * Handle a create-vnode request.
 *
 * Steps: deserialize the request, check that the replica addressed by
 * selfIndex is this dnode, build the vnode/wrapper configs, create and open
 * the vnode, register it with the manager, start it, and persist the vnode
 * list.
 *
 * Returns 0 on success and also when the vnode already exists (terrno is then
 * TSDB_CODE_VND_ALREADY_EXIST). Returns -1 with terrno set for malformed
 * requests. For later failures the error code is returned and stored in
 * terrno; whatever was opened/created so far is closed and destroyed.
 */
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SCreateVnodeReq req = {0};
  SVnodeCfg       vnodeCfg = {0};
  SWrapperCfg     wrapperCfg = {0};
  // Declared (and NULL-initialized) up front: several `goto _OVER` paths run
  // before the vnode is opened and the cleanup must not see garbage here.
  SVnode         *pImpl = NULL;
  int32_t         code = -1;
  char            path[TSDB_FILENAME_LEN] = {0};

  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  dInfo("vgId:%d, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64
        ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
        ", days:%d keep0:%d keep1:%d keep2:%d tsma:%d precision:%d compression:%d minRows:%d maxRows:%d"
        ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
        ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d strict:%d",
        req.vgId, req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024,
        req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize, req.tsdbPageSize * 1024, req.db, req.dbUid,
        req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2, req.isTsma, req.precision, req.compression,
        req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
        req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix,
        req.hashSuffix, req.replica, req.selfIndex, req.strict);

  // selfIndex comes off the wire and is used as an array index below; reject
  // out-of-range values the same way the alter-replica handler does.
  if (req.replica <= 0 || req.selfIndex < 0 || req.selfIndex >= req.replica) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to create vnode since invalid msg", req.vgId);
    tFreeSCreateVnodeReq(&req);
    return -1;
  }

  for (int32_t i = 0; i < req.replica; ++i) {
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
          req.replicas[i].id);
  }

  SReplica *pReplica = &req.replicas[req.selfIndex];
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", req.vgId, pReplica->id, pReplica->fqdn,
           pReplica->port);
    tFreeSCreateVnodeReq(&req);  // the request is fully deserialized at this point
    return -1;
  }

  vmGenerateVnodeCfg(&req, &vnodeCfg);

  if (vmTsmaAdjustDays(&vnodeCfg, &req) < 0) {
    dError("vgId:%d, failed to adjust tsma days since %s", req.vgId, terrstr());
    code = terrno;
    goto _OVER;
  }

  vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode != NULL) {
    dInfo("vgId:%d, already exist", req.vgId);
    tFreeSCreateVnodeReq(&req);
    vmReleaseVnode(pMgmt, pVnode);
    terrno = TSDB_CODE_VND_ALREADY_EXIST;
    return 0;
  }

  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
  if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) {
    // req is released once, at _OVER.
    dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr());
    code = terrno;
    goto _OVER;
  }

  pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
    code = terrno;
    goto _OVER;
  }

  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
    code = terrno;
    goto _OVER;
  }

#if 0
  code = vmTsmaProcessCreate(pImpl, &req);
  if (code != 0) {
    dError("vgId:%d, failed to create tsma since %s", req.vgId, terrstr());
    code = terrno;
    goto _OVER;
  }
#endif

  code = vnodeStart(pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
    goto _OVER;
  }

  code = vmWriteVnodeListToFile(pMgmt);
  if (code != 0) {
    code = terrno;
    goto _OVER;
  }

_OVER:
  if (code != 0) {
    if (pImpl != NULL) vnodeClose(pImpl);
    // path is only filled in right before vnodeCreate(); never hand an empty
    // path to vnodeDestroy().
    if (path[0] != '\0') vnodeDestroy(path, pMgmt->pTfs);
  } else {
    dInfo("vgId:%d, vnode is created", req.vgId);
  }

  tFreeSCreateVnodeReq(&req);
  terrno = code;
  return code;
}

284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304
/*
 * Handle a disable-vnode-write request: copy the request's `disable` value
 * onto the vnode identified by the request's vgId.
 *
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_INVALID_MSG when
 * the message cannot be deserialized, or terrno = TSDB_CODE_VND_NOT_EXIST
 * when the vnode cannot be acquired.
 */
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDisableVnodeWriteReq disableReq = {0};

  if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &disableReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  dInfo("vgId:%d, vnode write disable:%d", disableReq.vgId, disableReq.disable);

  SVnodeObj *pTarget = vmAcquireVnode(pMgmt, disableReq.vgId);
  if (pTarget != NULL) {
    pTarget->disable = disableReq.disable;
    vmReleaseVnode(pMgmt, pTarget);
    return 0;
  }

  // The log line is emitted before terrno is overwritten, so it reports
  // whatever error the failed acquire left behind.
  dError("vgId:%d, failed to disable write since %s", disableReq.vgId, terrstr());
  terrno = TSDB_CODE_VND_NOT_EXIST;
  return -1;
}

305 306 307 308 309 310 311
/*
 * Handle an alter-hash-range request.
 *
 * The source vnode (req.srcVgId) is closed, vnodeAlterHashRange() is run from
 * the source directory to the destination directory, and the destination
 * vnode (req.dstVgId) is then opened, registered with the manager and
 * started. The wrapper config for the new vnode keeps the dropped flag,
 * version and path of the source vnode object but carries the destination
 * vgId.
 *
 * Returns 0 on success, -1 on failure (terrno is set explicitly for an
 * undecodable message and for a missing source vnode).
 */
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeHashRangeReq req = {0};
  if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t srcVgId = req.srcVgId;
  int32_t dstVgId = req.dstVgId;
  dInfo("vgId:%d, start to alter vnode hashrange[%u, %u), dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
        req.dstVgId);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, srcVgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  // Capture what is needed from the vnode object before it is closed.
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = dstVgId,
      .vgVersion = pVnode->vgVersion,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));

  dInfo("vgId:%d, close vnode", srcVgId);
  vmCloseVnode(pMgmt, pVnode, true);

  char srcPath[TSDB_FILENAME_LEN] = {0};
  char dstPath[TSDB_FILENAME_LEN] = {0};
  snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
  snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);

  dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
  if (vnodeAlterHashRange(srcPath, dstPath, &req, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, open vnode", dstVgId);
  SVnode *pImpl = vnodeOpen(dstPath, pMgmt->pTfs, pMgmt->msgCb);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
    // Registration failed, so nothing else references pImpl: close it here,
    // as the create-vnode handler does in the same situation.
    vnodeClose(pImpl);
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
  return 0;
}

/*
 * Handle an alter-vnode-replica request.
 *
 * After validating the request (replica count, selfIndex range, and that the
 * replica at selfIndex is this dnode), the vnode is closed,
 * vnodeAlterReplica() is run on its directory, and the vnode is reopened,
 * registered with the manager and started again.
 *
 * Returns 0 on success, -1 on failure. terrno is set explicitly to
 * TSDB_CODE_INVALID_MSG for malformed requests and to
 * TSDB_CODE_VND_NOT_EXIST when the vnode cannot be acquired.
 */
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SAlterVnodeReplicaReq alterReq = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t vgId = alterReq.vgId;
  dInfo("vgId:%d, start to alter vnode, replica:%d selfIndex:%d strict:%d", vgId, alterReq.replica, alterReq.selfIndex,
        alterReq.strict);
  for (int32_t i = 0; i < alterReq.replica; ++i) {
    SReplica *pReplica = &alterReq.replicas[i];
    // The last field is the dnode id, not the port.
    dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
  }

  if (alterReq.replica <= 0 || alterReq.selfIndex < 0 || alterReq.selfIndex >= alterReq.replica) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, failed to alter replica since invalid msg", vgId);
    return -1;
  }

  SReplica *pReplica = &alterReq.replicas[alterReq.selfIndex];
  if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
      strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
           pReplica->port);
    return -1;
  }

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  dInfo("vgId:%d, start to close vnode", vgId);
  // Capture what is needed from the vnode object before it is closed.
  SWrapperCfg wrapperCfg = {
      .dropped = pVnode->dropped,
      .vgId = pVnode->vgId,
      .vgVersion = pVnode->vgVersion,
  };
  tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
  vmCloseVnode(pMgmt, pVnode, false);

  char path[TSDB_FILENAME_LEN] = {0};
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);

  dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
  if (vnodeAlterReplica(path, &alterReq, pMgmt->pTfs) < 0) {
    dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  dInfo("vgId:%d, start to open vnode", vgId);
  SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
  if (pImpl == NULL) {
    dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
    return -1;
  }

  if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
    dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
    // Registration failed, so nothing else references pImpl: close it here,
    // as the create-vnode handler does in the same situation.
    vnodeClose(pImpl);
    return -1;
  }

  if (vnodeStart(pImpl) != 0) {
    dError("vgId:%d, failed to start sync since %s", vgId, terrstr());
    return -1;
  }

  dInfo("vgId:%d, vnode config is altered", vgId);
  return 0;
}

S
Shengliang Guan 已提交
442
/*
 * Handle a drop-vnode request.
 *
 * The vnode is first flagged as dropped and the vnode list is written out; if
 * that write fails the flag is rolled back and the request fails. Otherwise
 * the vnode is closed and the list is written a second time.
 *
 * Returns 0 on success. Returns -1 when the message cannot be decoded or
 * targets another dnode (terrno = TSDB_CODE_INVALID_MSG), when the vnode
 * cannot be acquired (terrno = TSDB_CODE_VND_NOT_EXIST), or when the first
 * list write fails.
 */
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SDropVnodeReq dropReq = {0};
  if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t vgId = dropReq.vgId;
  dInfo("vgId:%d, start to drop vnode", vgId);

  if (dropReq.dnodeId != pMgmt->pData->dnodeId) {
    terrno = TSDB_CODE_INVALID_MSG;
    dError("vgId:%d, dnodeId:%d not matched with local dnode", dropReq.vgId, dropReq.dnodeId);
    return -1;
  }

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
  if (pVnode == NULL) {
    dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  pVnode->dropped = 1;
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    pVnode->dropped = 0;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  vmCloseVnode(pMgmt, pVnode, false);

  // Best effort: the dropped flag was already persisted by the write above,
  // so a failure here is reported in the log but does not fail the request.
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
    dError("vgId:%d, failed to write vnode list after drop since %s", vgId, terrstr());
  }

  dInfo("vgId:%d, is dropped", vgId);
  return 0;
}

S
Shengliang Guan 已提交
479
SArray *vmGetMsgHandles() {
S
Shengliang 已提交
480
  int32_t code = -1;
S
Shengliang Guan 已提交
481
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
S
Shengliang 已提交
482 483
  if (pArray == NULL) goto _OVER;

S
Shengliang Guan 已提交
484
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
D
dapan1121 已提交
485
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
D
dapan1121 已提交
486
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
D
dapan1121 已提交
487
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
C
Cary Xu 已提交
488
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
C
Cary Xu 已提交
489
  if (dmSetMgmtHandle(pArray, TDMT_VND_EXEC_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
D
dapan1121 已提交
490
  if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
D
dapan1121 已提交
491
  if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
S
Shengliang Guan 已提交
492 493 494
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
D
dapan1121 已提交
495
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
D
dapan1121 已提交
496
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
S
Shengliang Guan 已提交
497
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
D
dapan1121 已提交
498 499
  if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
S
Shengliang Guan 已提交
500
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
wmmhello's avatar
wmmhello 已提交
501
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
S
Shengliang Guan 已提交
502 503 504 505 506 507 508 509
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT_RSMA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
L
Liu Jicong 已提交
510 511 512 513 514 515
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
516
  if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
517
  if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
H
Hongze Cheng 已提交
518
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
D
dapan1121 已提交
519
  if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
520

L
Liu Jicong 已提交
521
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
S
Shengliang Guan 已提交
522
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
L
Liu Jicong 已提交
523 524 525 526 527
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
L
Liu Jicong 已提交
528
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_RECOVER_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
529 530 531
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
S
Shengliang Guan 已提交
532

S
Shengliang Guan 已提交
533
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
S
Shengliang Guan 已提交
534
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
535
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
536
  if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
537
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
S
Shengliang Guan 已提交
538
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
S
Shengliang Guan 已提交
539
  if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
S
Shengliang Guan 已提交
540 541 542
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;

543 544
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
545
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
546 547 548 549
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
550
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
551
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
552 553
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
554 555 556
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;

557
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
558
  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
S
Shengliang 已提交
559 560 561 562 563 564 565 566 567 568

  code = 0;

_OVER:
  if (code != 0) {
    taosArrayDestroy(pArray);
    return NULL;
  } else {
    return pArray;
  }
S
shm  
Shengliang Guan 已提交
569
}