mgmtVgroup.c 21.6 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#define _DEFAULT_SOURCE
17
#include "os.h"
S
slguan 已提交
18 19 20
#include "taoserror.h"
#include "tlog.h"
#include "tstatus.h"
S
slguan 已提交
21
#include "mnode.h"
S
slguan 已提交
22 23
#include "mgmtBalance.h"
#include "mgmtDb.h"
S
slguan 已提交
24
#include "mgmtDClient.h"
S
slguan 已提交
25
#include "mgmtDnode.h"
S
slguan 已提交
26
#include "mgmtDServer.h"
27
#include "mgmtProfile.h"
S
slguan 已提交
28
#include "mgmtSdb.h"
S
slguan 已提交
29
#include "mgmtShell.h"
S
#1177  
slguan 已提交
30
#include "mgmtTable.h"
S
slguan 已提交
31
#include "mgmtVgroup.h"
H
hzcheng 已提交
32

S
slguan 已提交
33
static void   *tsVgroupSdb = NULL;
S
slguan 已提交
34 35
static int32_t tsVgUpdateSize = 0;

H
hjxilinx 已提交
36
static int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
S
slguan 已提交
37
static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
38
static void    mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg);
39
static void    mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg);
S
slguan 已提交
40
static void    mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) ;
41

42 43
static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle);
static void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
S
slguan 已提交
44

S
slguan 已提交
45 46
/*
 * SDB destroy callback for a vgroup row.
 *
 * Releases the id pool and the table list owned by the vgroup stored in
 * pOper->pObj, then frees the vgroup object itself.
 *
 * Returns TSDB_CODE_SUCCESS unconditionally.
 */
static int32_t mgmtVgroupActionDestroy(SSdbOperDesc *pOper) {
  SVgObj *pVg = pOper->pObj;

  if (pVg->idPool != NULL) {
    taosIdPoolCleanUp(pVg->idPool);
    pVg->idPool = NULL;
  }

  if (pVg->tableList != NULL) {
    tfree(pVg->tableList);
  }

  tfree(pOper->pObj);

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
/*
 * SDB insert callback for a vgroup row.
 *
 * Binds the vgroup in pOper->pObj to its database, allocates the per-vgroup
 * table list and id pool (both sized by the database's maxSessions), fills in
 * the ip addresses of every vnode from its dnode, and links the vgroup into
 * the database's vgroup list.
 *
 * Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_DB when the database
 * named by the vgroup does not exist, and -1 when an allocation fails.
 */
static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) {
  SVgObj *pVgroup = pOper->pObj;
  SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
  if (pDb == NULL) {
    return TSDB_CODE_INVALID_DB;
  }

  pVgroup->pDb = pDb;
  pVgroup->prev = NULL;
  pVgroup->next = NULL;

  int32_t size = sizeof(STableInfo *) * pDb->cfg.maxSessions;
  pVgroup->tableList = (STableInfo **)calloc(pDb->cfg.maxSessions, sizeof(STableInfo *));
  if (pVgroup->tableList == NULL) {
    mError("vgroup:%d, failed to malloc(size:%d) for the tableList of vgroups", pVgroup->vgId, size);
    return -1;
  }

  pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions);
  if (pVgroup->idPool == NULL) {
    mError("vgroup:%d, failed to taosInitIdPool for vgroups", pVgroup->vgId);
    tfree(pVgroup->tableList);
    return -1;
  }

  for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
    pVgroup->vnodeGid[i].vnode = pVgroup->vgId;

    // The dnode may no longer be registered; keep whatever ip values the row
    // already carries instead of dereferencing a NULL dnode.
    SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId);
    if (pDnode == NULL) {
      mError("vgroup:%d, dnode:%d of vnode index:%d not found, ip is not refreshed", pVgroup->vgId,
             pVgroup->vnodeGid[i].dnodeId, i);
      continue;
    }

    pVgroup->vnodeGid[i].privateIp = pDnode->privateIp;
    pVgroup->vnodeGid[i].publicIp = pDnode->publicIp;
  }

  mgmtAddVgroupIntoDb(pVgroup);

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
95 96 97 98 99
/*
 * SDB delete callback for a vgroup row: unlinks the vgroup from its database
 * when it is attached to one. Always returns TSDB_CODE_SUCCESS.
 */
static int32_t mgmtVgroupActionDelete(SSdbOperDesc *pOper) {
  SVgObj *pVg = pOper->pObj;

  // A vgroup that is not attached to a database has nothing to unlink.
  if (pVg->pDb == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  mgmtRemoveVgroupFromDb(pVg);
  return TSDB_CODE_SUCCESS;
}
S
slguan 已提交
104

S
slguan 已提交
105 106
/*
 * SDB update callback for a vgroup row.
 *
 * When the owning database's maxSessions differs from the current id pool
 * size, the table list and the id pool are resized to match. Slots added by
 * growing the table list are zeroed so they read as "no table".
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_SERV_OUT_OF_MEMORY when the table
 * list cannot be resized (the old list and id pool are left untouched).
 */
static int32_t mgmtVgroupActionUpdate(SSdbOperDesc *pOper) {
  SVgObj *pVgroup = pOper->pObj;
  SDbObj *pDb = pVgroup->pDb;

  int32_t oldTables = taosIdPoolMaxSize(pVgroup->idPool);
  int32_t tables = oldTables;

  if (pDb != NULL) {
    tables = pDb->cfg.maxSessions;
    if (tables != oldTables) {
      mPrint("vgroup:%d tables change from %d to %d", pVgroup->vgId, oldTables, tables);

      // Resize through a temporary so the existing list survives a failure.
      int32_t      size = sizeof(STableInfo *) * tables;
      STableInfo **tableList = (STableInfo **)realloc(pVgroup->tableList, size);
      if (tableList == NULL && size > 0) {
        mError("vgroup:%d, failed to realloc(size:%d) for the tableList of vgroups", pVgroup->vgId, size);
        return TSDB_CODE_SERV_OUT_OF_MEMORY;
      }

      // realloc leaves the grown tail uninitialized; empty slots must be NULL.
      if (tableList != NULL && tables > oldTables && oldTables >= 0) {
        memset(tableList + oldTables, 0, sizeof(STableInfo *) * (size_t)(tables - oldTables));
      }

      pVgroup->tableList = tableList;
      taosUpdateIdPool(pVgroup->idPool, tables);
    }
  }

  // When the vgroup has no database attached, the current pool size is reported.
  mTrace("vgroup:%d, is updated, tables:%d numOfVnode:%d", pVgroup->vgId, tables, pVgroup->numOfVnodes);
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
123 124 125
/*
 * SDB encode callback: serializes the vgroup in pOper->pObj by copying its
 * first tsVgUpdateSize bytes into pOper->rowData and recording that length in
 * pOper->rowSize.
 *
 * Returns -1 when the row buffer (pOper->maxRowSize) is smaller than
 * tsVgUpdateSize, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t mgmtVgroupActionEncode(SSdbOperDesc *pOper) {
  if (pOper->maxRowSize < tsVgUpdateSize) {
    return -1;
  }

  SVgObj *pVg = pOper->pObj;
  memcpy(pOper->rowData, pVg, tsVgUpdateSize);
  pOper->rowSize = tsVgUpdateSize;

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
134 135 136
/*
 * SDB decode callback: allocates a zeroed SVgObj, fills its first
 * tsVgUpdateSize bytes from pOper->rowData and hands it back in pOper->pObj.
 *
 * Returns TSDB_CODE_SERV_OUT_OF_MEMORY when the allocation fails,
 * TSDB_CODE_SUCCESS otherwise.
 */
static int32_t mgmtVgroupActionDecode(SSdbOperDesc *pOper) {
  SVgObj *pDecoded = (SVgObj *)calloc(1, sizeof(SVgObj));
  if (pDecoded == NULL) {
    return TSDB_CODE_SERV_OUT_OF_MEMORY;
  }

  memcpy(pDecoded, pOper->rowData, tsVgUpdateSize);
  pOper->pObj = pDecoded;

  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
143
int32_t mgmtInitVgroups() {
S
slguan 已提交
144 145
  SVgObj tObj;
  tsVgUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
H
hzcheng 已提交
146

S
slguan 已提交
147 148 149 150
  SSdbTableDesc tableDesc = {
    .tableName    = "vgroups",
    .hashSessions = TSDB_MAX_VGROUPS,
    .maxRowSize   = tsVgUpdateSize,
S
slguan 已提交
151
    .keyType      = SDB_KEY_TYPE_AUTO,
S
slguan 已提交
152 153 154 155 156 157 158 159 160
    .insertFp     = mgmtVgroupActionInsert,
    .deleteFp     = mgmtVgroupActionDelete,
    .updateFp     = mgmtVgroupActionUpdate,
    .encodeFp     = mgmtVgroupActionEncode,
    .decodeFp     = mgmtVgroupActionDecode,
    .destroyFp    = mgmtVgroupActionDestroy,
  };

  tsVgroupSdb = sdbOpenTable(&tableDesc);
S
slguan 已提交
161
  if (tsVgroupSdb == NULL) {
S
slguan 已提交
162
    mError("failed to init vgroups data");
H
hzcheng 已提交
163 164 165
    return -1;
  }

S
slguan 已提交
166 167
  mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta);
  mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups);
168
  mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp);
169
  mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mgmtProcessDropVnodeRsp);
S
slguan 已提交
170
  mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mgmtProcessVnodeCfgMsg);
S
slguan 已提交
171

H
hzcheng 已提交
172 173 174 175
  mTrace("vgroup is initialized");
  return 0;
}

S
slguan 已提交
176 177 178
/* Looks up a vgroup by id in the vgroup SDB table; returns what sdbGetRow() yields. */
SVgObj *mgmtGetVgroup(int32_t vgId) {
  SVgObj *pFound = (SVgObj *)sdbGetRow(tsVgroupSdb, &vgId);
  return pFound;
}
H
hzcheng 已提交
179

S
slguan 已提交
180 181 182
/* Returns the head of the database's vgroup list (pDb->pHead). */
SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) {
  SVgObj *pFirst = pDb->pHead;
  return pFirst;
}
S
slguan 已提交
183

184 185 186
/*
 * Creates a new vgroup for the database referenced by pMsg->pDb.
 *
 * The vgroup is allocated, given vnodes through mgmtAllocVnodes(), inserted
 * into the vgroup SDB table and finally announced to its dnodes with a create
 * message that carries pMsg as the async handle.
 *
 * On any failure a simple response with the matching error code is sent to
 * pMsg->thandle and pMsg is freed. On success pMsg stays alive: it is passed
 * on as the handle of the create messages.
 */
void mgmtCreateVgroup(SQueuedMsg *pMsg) {
  SDbObj *pDb = pMsg->pDb;
  if (pDb == NULL) {
    mError("failed to create vgroup, db not found");
    mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB);
    mgmtFreeQueuedMsg(pMsg);
    return;
  }

  SVgObj *pVgroup = (SVgObj *)calloc(1, sizeof(SVgObj));
  if (pVgroup == NULL) {
    mError("db:%s, failed to create vgroup, out of memory", pDb->name);
    mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY);
    mgmtFreeQueuedMsg(pMsg);
    return;
  }

  strcpy(pVgroup->dbName, pDb->name);
  pVgroup->numOfVnodes = pDb->cfg.replications;
  pVgroup->createdTime = taosGetTimestampMs();
  if (mgmtAllocVnodes(pVgroup) != 0) {
    mError("db:%s, no enough dnode to alloc %d vnodes to vgroup", pDb->name, pVgroup->numOfVnodes);
    free(pVgroup);
    mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_ENOUGH_DNODES);
    mgmtFreeQueuedMsg(pMsg);
    return;
  }

  SSdbOperDesc oper = {
    .type = SDB_OPER_TYPE_GLOBAL,
    .table = tsVgroupSdb,
    .pObj = pVgroup,
    .rowSize = sizeof(SVgObj)
  };

  int32_t code = sdbInsertRow(&oper);
  if (code != TSDB_CODE_SUCCESS) {
    tfree(pVgroup);
    mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SDB_ERROR);
    mgmtFreeQueuedMsg(pMsg);
    return;
  }

  mPrint("vgroup:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
  for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
    mPrint("vgroup:%d, index:%d, dnode:%d vnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].vnode);
  }

  // The response handler counts replies against 'expected' and reads the
  // vgroup back from 'ahandle'.
  pMsg->ahandle = pVgroup;
  pMsg->expected = pVgroup->numOfVnodes;
  mgmtSendCreateVgroupMsg(pVgroup, pMsg);
}

S
slguan 已提交
231 232 233 234 235 236
/*
 * Drops a vgroup.
 *
 * With a non-NULL ahandle only the drop messages are sent to the vnodes,
 * carrying ahandle. With a NULL ahandle the drop messages are sent without a
 * handle and the vgroup row is deleted from the SDB right away.
 */
void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle) {
  if (ahandle != NULL) {
    mgmtSendDropVgroupMsg(pVgroup, ahandle);
    return;
  }

  mTrace("vgroup:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes);
  mgmtSendDropVgroupMsg(pVgroup, NULL);

  SSdbOperDesc oper = {
    .type = SDB_OPER_TYPE_GLOBAL,
    .table = tsVgroupSdb,
    .pObj = pVgroup
  };
  sdbDeleteRow(&oper);
}

S
slguan 已提交
246 247 248
/* Closes the vgroup SDB table opened by mgmtInitVgroups(). */
void mgmtCleanUpVgroups() {
  sdbCloseTable(tsVgroupSdb);
}
H
hzcheng 已提交
249

H
hjxilinx 已提交
250
int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
S
slguan 已提交
251 252 253 254 255 256
  SDbObj *pDb = mgmtGetDb(pShow->db);
  if (pDb == NULL) {
    return TSDB_CODE_DB_NOT_SELECTED;
  }

  int32_t cols = 0;
H
hjxilinx 已提交
257
  SSchema *pSchema = pMeta->schema;
S
slguan 已提交
258 259 260 261 262 263 264 265 266 267 268 269 270 271 272

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "vgId");
  pSchema[cols].bytes = htons(pShow->bytes[cols]);
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "tables");
  pSchema[cols].bytes = htons(pShow->bytes[cols]);
  cols++;

  pShow->bytes[cols] = 9;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
S
slguan 已提交
273
  strcpy(pSchema[cols].name, "vgroup status");
S
slguan 已提交
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312
  pSchema[cols].bytes = htons(pShow->bytes[cols]);
  cols++;

  int32_t maxReplica = 0;
  SVgObj  *pVgroup   = NULL;
  STableInfo *pTable = NULL;
  if (pShow->payloadLen > 0 ) {
    pTable = mgmtGetTable(pShow->payload);
    if (NULL == pTable) {
      return TSDB_CODE_INVALID_TABLE_ID;
    }

    pVgroup = mgmtGetVgroup(pTable->vgId);
    if (NULL == pVgroup) return TSDB_CODE_INVALID_TABLE_ID;

    maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
  } else {
    SVgObj *pVgroup = pDb->pHead;
    while (pVgroup != NULL) {
      maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
      pVgroup = pVgroup->next;
    }
  }

  for (int32_t i = 0; i < maxReplica; ++i) {
    pShow->bytes[cols] = 16;
    pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
    strcpy(pSchema[cols].name, "ip");
    pSchema[cols].bytes = htons(pShow->bytes[cols]);
    cols++;

    pShow->bytes[cols] = 2;
    pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
    strcpy(pSchema[cols].name, "vnode");
    pSchema[cols].bytes = htons(pShow->bytes[cols]);
    cols++;

    pShow->bytes[cols] = 9;
    pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
S
slguan 已提交
313
    strcpy(pSchema[cols].name, "vnode status");
S
slguan 已提交
314 315 316 317 318
    pSchema[cols].bytes = htons(pShow->bytes[cols]);
    cols++;

    pShow->bytes[cols] = 16;
    pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
S
slguan 已提交
319
    strcpy(pSchema[cols].name, "public ip");
S
slguan 已提交
320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338
    pSchema[cols].bytes = htons(pShow->bytes[cols]);
    cols++;
  }

  pMeta->numOfColumns = htons(cols);
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];

  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];

  if (NULL == pTable) {
    pShow->numOfRows = pDb->numOfVgroups;
    pShow->pNode = pDb->pHead;
  } else {
    pShow->numOfRows = 1;
    pShow->pNode = pVgroup;
  }
L
[1361]  
lihui 已提交
339

H
hzcheng 已提交
340 341 342
  return 0;
}

S
slguan 已提交
343
/*
 * Returns a status string for one vnode of a vgroup.
 *
 * "null" when the vnode's dnode is not found or does not report a load entry
 * for this vgroup, "offline" when the dnode is offline, otherwise the string
 * taosGetVnodeStatusStr() gives for the status reported in the dnode's load
 * table.
 */
char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) {
  SDnodeObj *pDnode = mgmtGetDnode(pVnode->dnodeId);
  if (pDnode == NULL) {
    // pDnode is NULL here, so the id must come from the vnode descriptor.
    mError("vgroup:%d, not exist in dnode:%d", pVgroup->vgId, pVnode->dnodeId);
    return "null";
  }

  if (pDnode->status == TSDB_DN_STATUS_OFFLINE) {
    return "offline";
  }

  for (int i = 0; i < pDnode->openVnodes; ++i) {
    if (pDnode->vload[i].vgId == pVgroup->vgId) {
      return (char *)taosGetVnodeStatusStr(pDnode->vload[i].status);
    }
  }

  return "null";
}

S
slguan 已提交
363
/*
 * Fills up to 'rows' rows of the "show vgroups" result into 'data'.
 *
 * 'data' is laid out column by column: the cell of column c in row r lives at
 * data + offset[c] * rows + bytes[c] * r. Rows are taken from the vgroup list
 * starting at pShow->pNode, which is advanced as rows are produced.
 *
 * The number of replica column groups is derived from pShow->numOfColumns, as
 * laid out by mgmtGetVgroupMeta() (3 fixed columns plus 4 per replica), so
 * this function never writes a column the schema does not describe.
 *
 * Returns the number of rows written, 0 when the database is not found.
 */
int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
  int32_t numOfRows = 0;
  char    ipstr[20];
  char   *pWrite;

  SDbObj *pDb = mgmtGetDb(pShow->db);
  if (pDb == NULL) return 0;

  // Must match the schema: recomputing the count over all vgroups of the db
  // could exceed the columns defined when the show is limited to one table.
  int32_t maxReplica = (pShow->numOfColumns - 3) / 4;

  while (numOfRows < rows) {
    SVgObj *pVgroup = (SVgObj *)pShow->pNode;
    if (pVgroup == NULL) break;
    pShow->pNode = (void *)pVgroup->next;

    int32_t cols = 0;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int32_t *)pWrite = pVgroup->vgId;
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int32_t *)pWrite = pVgroup->numOfTables;
    cols++;

    // NOTE(review): the strcpy calls below are not bounded by the cell width
    // (pShow->bytes[cols]) - confirm every status string fits its column.
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    strcpy(pWrite, taosGetVgroupLbStatusStr(pVgroup->lbStatus));
    cols++;

    for (int32_t i = 0; i < maxReplica; ++i) {
      tinet_ntoa(ipstr, pVgroup->vnodeGid[i].privateIp);
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      strcpy(pWrite, ipstr);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int16_t *)pWrite = pVgroup->vnodeGid[i].vnode;
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      if (pVgroup->vnodeGid[i].dnodeId != 0) {
        char *vnodeStatus = mgmtGetVnodeStatus(pVgroup, pVgroup->vnodeGid + i);
        strcpy(pWrite, vnodeStatus);
      } else {
        // Replica slot without a dnode assigned.
        strcpy(pWrite, "null");
      }
      cols++;

      tinet_ntoa(ipstr, pVgroup->vnodeGid[i].publicIp);
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      strcpy(pWrite, ipstr);
      cols++;
    }

    numOfRows++;
  }

  pShow->numOfReads += numOfRows;
  return numOfRows;
}

/*
 * Registers a table in its vgroup.
 *
 * When the table's sid is non-negative and its slot is free, the slot is
 * filled, the sid is marked in the id pool, and the table counters of the
 * vgroup and of its database are incremented. Afterwards the vgroup is
 * re-queued in the database: at the tail once it holds maxSessions tables or
 * more, through mgmtAddVgroupIntoDb() otherwise.
 */
void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable) {
  SDbObj *pDb = pVgroup->pDb;

  if (pTable->sid >= 0 && pVgroup->tableList[pTable->sid] == NULL) {
    pVgroup->tableList[pTable->sid] = pTable;
    taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid);
    pVgroup->numOfTables++;
    pDb->numOfTables++;
  }

  if (pVgroup->numOfTables < pDb->cfg.maxSessions) {
    mgmtAddVgroupIntoDb(pVgroup);
  } else {
    mgmtAddVgroupIntoDbTail(pVgroup);
  }
}

/*
 * Unregisters a table from its vgroup.
 *
 * When the table's sid is non-negative and its slot is occupied, the slot is
 * cleared, the sid is returned to the id pool, and the table counters of the
 * vgroup and of its database are decremented. Afterwards the vgroup is
 * re-queued in the database: at the tail while it still holds maxSessions
 * tables or more, through mgmtAddVgroupIntoDb() otherwise.
 */
void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable) {
  SDbObj *pDb = pVgroup->pDb;

  if (pTable->sid >= 0 && pVgroup->tableList[pTable->sid] != NULL) {
    pVgroup->tableList[pTable->sid] = NULL;
    taosFreeId(pVgroup->idPool, pTable->sid);
    pVgroup->numOfTables--;
    pDb->numOfTables--;
  }

  if (pVgroup->numOfTables < pDb->cfg.maxSessions) {
    mgmtAddVgroupIntoDb(pVgroup);
  } else {
    mgmtAddVgroupIntoDbTail(pVgroup);
  }
}
S
slguan 已提交
458

S
slguan 已提交
459
/*
 * Builds the create-vnode message for a vgroup.
 *
 * The message is allocated with rpcMallocCont(), its cfg is copied from the
 * vgroup's database and converted to network byte order, and one peer
 * descriptor is filled per vnode of the vgroup.
 *
 * Returns NULL when the vgroup has no database or the allocation fails.
 */
SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) {
  SDbObj *pDb = pVgroup->pDb;
  if (pDb == NULL) {
    return NULL;
  }

  SMDCreateVnodeMsg *pMsg = rpcMallocCont(sizeof(SMDCreateVnodeMsg));
  if (pMsg == NULL) {
    return NULL;
  }

  // Start from the database configuration, then byte-swap it field by field.
  pMsg->cfg = pDb->cfg;

  SVnodeCfg *pCfg = &pMsg->cfg;
  pCfg->vgId                         = htonl(pVgroup->vgId);
  pCfg->maxSessions                  = htonl(pCfg->maxSessions);
  pCfg->cacheBlockSize               = htonl(pCfg->cacheBlockSize);
  pCfg->cacheNumOfBlocks.totalBlocks = htonl(pCfg->cacheNumOfBlocks.totalBlocks);
  pCfg->daysPerFile                  = htonl(pCfg->daysPerFile);
  pCfg->daysToKeep1                  = htonl(pCfg->daysToKeep1);
  pCfg->daysToKeep2                  = htonl(pCfg->daysToKeep2);
  pCfg->daysToKeep                   = htonl(pCfg->daysToKeep);
  pCfg->commitTime                   = htonl(pCfg->commitTime);
  pCfg->blocksPerTable               = htons(pCfg->blocksPerTable);
  pCfg->replications                 = (char) pVgroup->numOfVnodes;
  pCfg->rowsInFileBlock              = htonl(pCfg->rowsInFileBlock);

  SVnodeDesc *pPeers = pMsg->vpeerDesc;
  for (int32_t idx = 0; idx < pVgroup->numOfVnodes; ++idx) {
    SVnodeGid *pGid = &pVgroup->vnodeGid[idx];
    pPeers[idx].vnode = htonl(pGid->vnode);
    pPeers[idx].ip    = htonl(pGid->privateIp);
  }

  return pMsg;
}

/*
 * Finds the vgroup that a dnode reports in slot 'vnode' of its load table.
 * Returns NULL when 'vnode' is outside [0, TSDB_MAX_VNODES), when the dnode
 * is not found, or when no vgroup has the reported id.
 */
SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode) {
  if (vnode < 0 || vnode >= TSDB_MAX_VNODES) return NULL;

  SDnodeObj *pDnode = mgmtGetDnode(dnode);
  if (pDnode == NULL) return NULL;

  return mgmtGetVgroup(pDnode->vload[vnode].vgId);
}

/*
 * Builds an ip set holding the private ip of every vnode of the vgroup, with
 * the first entry in use and tsDnodeMnodePort as the port.
 */
SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) {
  SRpcIpSet ipSet = {.inUse = 0};  // every other member starts out zeroed

  ipSet.numOfIps = pVgroup->numOfVnodes;
  ipSet.port = tsDnodeMnodePort;

  for (int idx = 0; idx < pVgroup->numOfVnodes; ++idx) {
    ipSet.ip[idx] = pVgroup->vnodeGid[idx].privateIp;
  }

  return ipSet;
}

SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip) {
518 519 520 521
  SRpcIpSet ipSet = {
    .ip[0]    = ip,
    .numOfIps = 1,
    .inUse    = 0,
S
slguan 已提交
522
    .port     = tsDnodeMnodePort
523
  };
S
slguan 已提交
524
  return ipSet;
S
slguan 已提交
525 526
}

S
slguan 已提交
527
/*
 * Sends a create-vnode message for the vgroup to the dnode(s) in ipSet,
 * passing ahandle along as the rpc handle. When the message cannot be built,
 * it is still sent with a NULL body and a zero length.
 */
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) {
  mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, pVgroup->vgId, ahandle);

  SMDCreateVnodeMsg *pBody = mgmtBuildCreateVnodeMsg(pVgroup);

  SRpcMsg rpcMsg = {
    .handle  = ahandle,
    .pCont   = pBody,
    .code    = 0,
    .msgType = TSDB_MSG_TYPE_MD_CREATE_VNODE
  };
  rpcMsg.contLen = (pBody != NULL) ? sizeof(SMDCreateVnodeMsg) : 0;

  mgmtSendMsgToDnode(ipSet, &rpcMsg);
}

/* Sends a create-vnode message to the private ip of each vnode of the vgroup. */
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
  mTrace("vgroup:%d, send create all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);

  int32_t idx = 0;
  while (idx < pVgroup->numOfVnodes) {
    SRpcIpSet target = mgmtGetIpSetFromIp(pVgroup->vnodeGid[idx].privateIp);
    mgmtSendCreateVnodeMsg(pVgroup, &target, ahandle);
    ++idx;
  }
}

/*
 * Handles one create-vnode response from a dnode.
 *
 * rpcMsg->handle is the queued message that triggered the vgroup creation; it
 * counts received and successful responses. Nothing more happens until all
 * expected responses have arrived. Then:
 *  - if every vnode was created, a copy of the original request is put back on
 *    the shell queue so it can be processed again;
 *  - otherwise the vgroup row is deleted from the SDB and the error code of
 *    the most recent failed response is sent to the client.
 * The queued message is freed in both cases.
 */
static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
  if (rpcMsg->handle == NULL) return;

  SQueuedMsg *queueMsg = rpcMsg->handle;
  queueMsg->received++;
  if (rpcMsg->code == TSDB_CODE_SUCCESS) {
    queueMsg->successed++;
  } else {
    // Remember the failure: the final response may be a success even though an
    // earlier vnode failed, and the client must not be told "success" then.
    queueMsg->code = rpcMsg->code;
  }

  SVgObj *pVgroup = queueMsg->ahandle;
  mTrace("vgroup:%d, create vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p",
         pVgroup->vgId, tstrerror(rpcMsg->code), queueMsg->received, queueMsg->successed, queueMsg->expected,
         queueMsg->thandle, rpcMsg->handle);

  if (queueMsg->received != queueMsg->expected) return;

  if (queueMsg->received == queueMsg->successed) {
    SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg));
    void       *pCont = (newMsg != NULL) ? rpcMallocCont(queueMsg->contLen) : NULL;

    if (newMsg == NULL || pCont == NULL) {
      mError("vgroup:%d, failed to requeue msg after vnodes are created, out of memory", pVgroup->vgId);
      free(newMsg);
      mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY);
    } else {
      newMsg->msgType = queueMsg->msgType;
      newMsg->thandle = queueMsg->thandle;
      newMsg->pDb     = queueMsg->pDb;
      newMsg->pUser   = queueMsg->pUser;
      newMsg->contLen = queueMsg->contLen;
      newMsg->pCont   = pCont;
      memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen);
      queueMsg->pCont = NULL;
      mgmtAddToShellQueue(newMsg);
    }
  } else {
    SSdbOperDesc oper = {
      .type = SDB_OPER_TYPE_GLOBAL,
      .table = tsVgroupSdb,
      .pObj = pVgroup
    };
    int32_t vgId = pVgroup->vgId;  // the row may be destroyed by the delete
    if (sdbDeleteRow(&oper) != 0) {
      mError("vgroup:%d, failed to delete from sdb after vnode creation failed", vgId);
    }

    mgmtSendSimpleResp(queueMsg->thandle, queueMsg->code);
  }

  mgmtFreeQueuedMsg(queueMsg);
}

S
slguan 已提交
593
static SMDDropVnodeMsg *mgmtBuildDropVnodeMsg(int32_t vgId) {
594 595 596
  SMDDropVnodeMsg *pDrop = rpcMallocCont(sizeof(SMDDropVnodeMsg));
  if (pDrop == NULL) return NULL;

S
slguan 已提交
597
  pDrop->vgId = htonl(vgId);
598 599 600
  return pDrop;
}

S
slguan 已提交
601 602 603
/*
 * Sends a drop-vnode message for vgId to the dnode(s) in ipSet, passing
 * ahandle along as the rpc handle. When the message cannot be built, it is
 * still sent with a NULL body and a zero length.
 */
void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) {
  mTrace("vgroup:%d, send drop vnode msg, ahandle:%p", vgId, ahandle);

  SMDDropVnodeMsg *pBody = mgmtBuildDropVnodeMsg(vgId);

  SRpcMsg rpcMsg = {
    .handle  = ahandle,
    .pCont   = pBody,
    .code    = 0,
    .msgType = TSDB_MSG_TYPE_MD_DROP_VNODE
  };
  rpcMsg.contLen = (pBody != NULL) ? sizeof(SMDDropVnodeMsg) : 0;

  mgmtSendMsgToDnode(ipSet, &rpcMsg);
}

/* Sends a drop-vnode message to the private ip of each vnode of the vgroup. */
static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
  mTrace("vgroup:%d, send drop all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);

  int32_t idx = 0;
  while (idx < pVgroup->numOfVnodes) {
    SRpcIpSet target = mgmtGetIpSetFromIp(pVgroup->vnodeGid[idx].privateIp);
    mgmtSendDropVnodeMsg(pVgroup->vgId, &target, ahandle);
    ++idx;
  }
}

/*
 * Handles one drop-vnode response from a dnode.
 *
 * rpcMsg->handle is the queued message that triggered the drop; it counts
 * received and successful responses. Once all expected responses have
 * arrived, the vgroup row is deleted from the SDB and a copy of the original
 * request is put back on the shell queue. The queued message is then freed.
 */
static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
  mTrace("drop vnode msg is received");
  if (rpcMsg->handle == NULL) return;

  SQueuedMsg *queueMsg = rpcMsg->handle;
  queueMsg->received++;
  if (rpcMsg->code == TSDB_CODE_SUCCESS) {
    queueMsg->code = rpcMsg->code;
    queueMsg->successed++;
  }

  SVgObj *pVgroup = queueMsg->ahandle;
  mTrace("vgroup:%d, drop vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p",
         pVgroup->vgId, tstrerror(rpcMsg->code), queueMsg->received, queueMsg->successed, queueMsg->expected,
         queueMsg->thandle, rpcMsg->handle);

  if (queueMsg->received != queueMsg->expected) return;

  SSdbOperDesc oper = {
    .type = SDB_OPER_TYPE_GLOBAL,
    .table = tsVgroupSdb,
    .pObj = pVgroup
  };
  int32_t vgId = pVgroup->vgId;  // the row may be destroyed by the delete
  if (sdbDeleteRow(&oper) != 0) {
    mError("vgroup:%d, failed to delete from sdb after vnodes are dropped", vgId);
  }

  SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg));
  void       *pCont = (newMsg != NULL) ? rpcMallocCont(queueMsg->contLen) : NULL;

  if (newMsg == NULL || pCont == NULL) {
    mError("vgroup:%d, failed to requeue msg after vnodes are dropped, out of memory", vgId);
    free(newMsg);
    mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY);
  } else {
    newMsg->msgType = queueMsg->msgType;
    newMsg->thandle = queueMsg->thandle;
    newMsg->pDb     = queueMsg->pDb;
    newMsg->pUser   = queueMsg->pUser;
    newMsg->contLen = queueMsg->contLen;
    newMsg->pCont   = pCont;
    memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen);
    mgmtAddToShellQueue(newMsg);

    queueMsg->pCont = NULL;
  }

  mgmtFreeQueuedMsg(queueMsg);
}

/*
 * Handles a vnode configuration request from a dnode.
 *
 * The dnode and vnode fields of the request are converted in place with
 * htonl(). When the (dnode, vnode) pair maps to a vgroup, a success response
 * is sent and a create-vnode message for that vgroup is sent to the dnode;
 * otherwise TSDB_CODE_NOT_ACTIVE_VNODE is returned.
 */
static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) {
  if (mgmtCheckRedirect(rpcMsg->handle)) return;

  SDMConfigVnodeMsg *pReq = (SDMConfigVnodeMsg *) rpcMsg->pCont;
  pReq->dnode = htonl(pReq->dnode);
  pReq->vnode = htonl(pReq->vnode);

  SVgObj *pVgroup = mgmtGetVgroupByVnode(pReq->dnode, pReq->vnode);
  if (pVgroup != NULL) {
    mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SUCCESS);

    SRpcIpSet target = mgmtGetIpSetFromIp(pReq->dnode);
    mgmtSendCreateVnodeMsg(pVgroup, &target, NULL);
    return;
  }

  mTrace("dnode:%s, vnode:%d, no vgroup info", taosIpStr(pReq->dnode), pReq->vnode);
  mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_VNODE);
}

S
slguan 已提交
684 685 686 687 688
/*
 * Deletes from the SDB every vgroup that belongs to the database pDropDb.
 *
 * A vgroup belongs to the database when its dbName equals pDropDb->name
 * exactly; a prefix match is not enough, otherwise dropping "db" would also
 * drop the vgroups of "db2".
 */
void mgmtDropAllVgroups(SDbObj *pDropDb) {
  void   *pNode = NULL;
  int32_t numOfVgroups = 0;

  while (1) {
    SVgObj *pVgroup = NULL;
    pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup);
    if (pVgroup == NULL) break;

    if (strcmp(pDropDb->name, pVgroup->dbName) != 0) continue;

    SSdbOperDesc oper = {
      .type = SDB_OPER_TYPE_LOCAL,
      .table = tsVgroupSdb,
      .pObj = pVgroup,
    };
    sdbDeleteRow(&oper);
    numOfVgroups++;

    // The row under the cursor is gone; restart the scan from the beginning,
    // as the original code did.
    pNode = NULL;
  }

  mTrace("db:%s, all vgroups:%d is dropped from sdb", pDropDb->name, numOfVgroups);
}
710

S
slguan 已提交
711 712 713 714 715 716 717 718 719 720
/*
 * Applies a database alteration to one vgroup.
 *
 * When the vgroup already has as many vnodes as the database asks replicas,
 * ahandle is put on the shell queue. The branch for a differing replica count
 * is not implemented yet and does nothing.
 */
void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle) {
  assert(ahandle != NULL);

  if (pVgroup->numOfVnodes == pVgroup->pDb->cfg.replications) {
    mgmtAddToShellQueue(ahandle);
    return;
  }

  // TODO:
  // mgmtSendAlterVgroupMsg(pVgroup, NULL);
}