mgmtVgroup.c 21.5 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#define _DEFAULT_SOURCE
17
#include "os.h"
S
slguan 已提交
18 19 20 21
#include "taoserror.h"
#include "tlog.h"
#include "tschemautil.h"
#include "tstatus.h"
S
slguan 已提交
22
#include "mnode.h"
S
slguan 已提交
23 24
#include "mgmtBalance.h"
#include "mgmtDb.h"
S
slguan 已提交
25
#include "mgmtDClient.h"
S
slguan 已提交
26
#include "mgmtDnode.h"
27
#include "mgmtProfile.h"
S
slguan 已提交
28
#include "mgmtShell.h"
S
#1177  
slguan 已提交
29
#include "mgmtTable.h"
S
slguan 已提交
30
#include "mgmtVgroup.h"
H
hzcheng 已提交
31

S
slguan 已提交
32 33 34 35 36 37 38 39 40 41 42 43
static void *tsVgroupSdb = NULL;
static int32_t tsVgUpdateSize = 0;

static void *(*mgmtVgroupActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtVgroupActionInsert(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtVgroupActionDelete(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtVgroupActionUpdate(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtVgroupActionEncode(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtVgroupActionDecode(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtVgroupActionReset(void *row, char *str, int32_t size, int32_t *ssize);
static void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t *ssize);

S
slguan 已提交
44 45
static int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
46
static void    mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg);
47
static void    mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg);
48

49 50
static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle);
static void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
S
slguan 已提交
51

S
slguan 已提交
52 53 54 55 56 57 58 59 60 61
static void mgmtVgroupActionInit() {
  SVgObj tObj;
  tsVgUpdateSize = tObj.updateEnd - (int8_t *)&tObj;

  mgmtVgroupActionFp[SDB_TYPE_INSERT]  = mgmtVgroupActionInsert;
  mgmtVgroupActionFp[SDB_TYPE_DELETE]  = mgmtVgroupActionDelete;
  mgmtVgroupActionFp[SDB_TYPE_UPDATE]  = mgmtVgroupActionUpdate;
  mgmtVgroupActionFp[SDB_TYPE_ENCODE]  = mgmtVgroupActionEncode;
  mgmtVgroupActionFp[SDB_TYPE_DECODE]  = mgmtVgroupActionDecode;
  mgmtVgroupActionFp[SDB_TYPE_RESET]   = mgmtVgroupActionReset;
H
hzcheng 已提交
62 63 64
  mgmtVgroupActionFp[SDB_TYPE_DESTROY] = mgmtVgroupActionDestroy;
}

S
slguan 已提交
65 66 67
static void *mgmtVgroupAction(char action, void *row, char *str, int32_t size, int32_t *ssize) {
  if (mgmtVgroupActionFp[(uint8_t) action] != NULL) {
    return (*(mgmtVgroupActionFp[(uint8_t) action]))(row, str, size, ssize);
H
hzcheng 已提交
68 69 70 71
  }
  return NULL;
}

S
slguan 已提交
72
int32_t mgmtInitVgroups() {
S
slguan 已提交
73
  void *pNode = NULL;
H
hzcheng 已提交
74 75 76 77
  SVgObj *pVgroup = NULL;

  mgmtVgroupActionInit();

S
slguan 已提交
78
  tsVgroupSdb = sdbOpenTable(tsMaxVGroups, tsVgUpdateSize, "vgroups", SDB_KEYTYPE_AUTO, tsMnodeDir, mgmtVgroupAction);
S
slguan 已提交
79
  if (tsVgroupSdb == NULL) {
S
slguan 已提交
80
    mError("failed to init vgroups data");
H
hzcheng 已提交
81 82 83 84
    return -1;
  }

  while (1) {
S
slguan 已提交
85
    pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup);
H
hzcheng 已提交
86 87 88 89 90 91 92
    if (pVgroup == NULL) break;

    SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
    if (pDb == NULL) continue;

    pVgroup->prev = NULL;
    pVgroup->next = NULL;
S
slguan 已提交
93

S
slguan 已提交
94 95 96 97
    int32_t size = sizeof(STableInfo *) * pDb->cfg.maxSessions;
    pVgroup->tableList = (STableInfo **)malloc(size);
    if (pVgroup->tableList == NULL) {
      mError("failed to malloc(size:%d) for the tableList of vgroups", size);
S
slguan 已提交
98 99 100
      return -1;
    }
    
S
slguan 已提交
101
    memset(pVgroup->tableList, 0, size);
H
hzcheng 已提交
102 103

    pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions);
S
slguan 已提交
104 105
    if (pVgroup->idPool == NULL) {
      mError("failed to taosInitIdPool for vgroups");
S
slguan 已提交
106
      free(pVgroup->tableList);
S
slguan 已提交
107 108 109
      return -1;
    }
    
H
hzcheng 已提交
110
    taosIdPoolReinit(pVgroup->idPool);
S
slguan 已提交
111

S
slguan 已提交
112
    if (tsIsCluster && pVgroup->vnodeGid[0].publicIp == 0) {
S
slguan 已提交
113 114 115
      pVgroup->vnodeGid[0].publicIp = inet_addr(tsPublicIp);
      pVgroup->vnodeGid[0].ip = inet_addr(tsPrivateIp);
      sdbUpdateRow(tsVgroupSdb, pVgroup, tsVgUpdateSize, 1);
S
slguan 已提交
116
    }
H
hzcheng 已提交
117

S
slguan 已提交
118
    mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId);
H
hzcheng 已提交
119 120
  }

S
slguan 已提交
121 122
  mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta);
  mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups);
123
  mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp);
124
  mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mgmtProcessDropVnodeRsp);
S
slguan 已提交
125

H
hzcheng 已提交
126 127 128 129
  mTrace("vgroup is initialized");
  return 0;
}

S
slguan 已提交
130 131 132
/* Look up a vgroup row by its vgId key; returns NULL when sdbGetRow finds nothing. */
SVgObj *mgmtGetVgroup(int32_t vgId) {
  SVgObj *pVgroup = (SVgObj *)sdbGetRow(tsVgroupSdb, &vgId);
  return pVgroup;
}
H
hzcheng 已提交
133

S
slguan 已提交
134 135 136 137 138 139
/*
 * Pick a vgroup of pDb for a new table: currently just the head of the db's vgroup list
 * (NULL if the db has none).
 * TODO: check if there is enough sids
 */
SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) {
  SVgObj *pHead = pDb->pHead;
  return pHead;
}
S
slguan 已提交
140

141 142 143
/*
 * Create a new vgroup for pMsg->pDb and ask every chosen dnode to create its vnode.
 *
 * On any local failure, a simple error response is sent on pMsg->thandle and nothing is persisted.
 * On success, the vgroup is added to the db and to the sdb table, and pMsg->ahandle/expected are set
 * so that mgmtProcessCreateVnodeRsp can collect the vnode responses.
 */
void mgmtCreateVgroup(SQueuedMsg *pMsg) {
  SDbObj *pDb = pMsg->pDb;
  if (pDb == NULL) {
    mError("failed to create vgroup, db not found");
    mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB);
    return;
  }

  SVgObj *pVgroup = (SVgObj *)calloc(1, sizeof(SVgObj));
  if (pVgroup == NULL) {
    mError("db:%s, failed to alloc memory for vgroup", pDb->name);
    mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY);
    return;
  }

  strcpy(pVgroup->dbName, pDb->name);
  pVgroup->numOfVnodes = pDb->cfg.replications;
  if (mgmtAllocVnodes(pVgroup) != 0) {
    mError("db:%s, no enough dnode to alloc %d vnodes to vgroup", pDb->name, pVgroup->numOfVnodes);
    free(pVgroup);
    mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_ENOUGH_DNODES);
    return;
  }

  pVgroup->createdTime = taosGetTimestampMs();
  pVgroup->tableList   = (STableInfo **)calloc(pDb->cfg.maxSessions, sizeof(STableInfo *));
  pVgroup->numOfTables = 0;
  pVgroup->idPool      = taosInitIdPool(pDb->cfg.maxSessions);
  if (pVgroup->tableList == NULL || pVgroup->idPool == NULL) {
    mError("db:%s, failed to alloc tableList or idPool for vgroup", pDb->name);
    // NOTE(review): like the mgmtAllocVnodes failure path above, this assumes mgmtAllocVnodes reserves
    // nothing that must be undone before the vgroup is bound with mgmtSetDnodeVgid -- confirm.
    if (pVgroup->idPool != NULL) taosIdPoolCleanUp(pVgroup->idPool);
    free(pVgroup->tableList);
    free(pVgroup);
    mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY);
    return;
  }

  mgmtAddVgroupIntoDb(pDb, pVgroup);
  sdbInsertRow(tsVgroupSdb, pVgroup, 0);

  // Bind the vnodes only after sdbInsertRow. The table is SDB_KEYTYPE_AUTO, and the insert presumably
  // assigns vgId (the row key), so binding earlier would record vgId 0 on the dnodes.
  // mgmtInitVgroups likewise binds using the vgId of a stored row.
  mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId);

  mPrint("vgroup:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
  for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
    mPrint("vgroup:%d, dnode:%s vnode:%d", pVgroup->vgId, taosIpStr(pVgroup->vnodeGid[i].ip), pVgroup->vnodeGid[i].vnode);
  }

  pMsg->ahandle  = pVgroup;
  pMsg->expected = pVgroup->numOfVnodes;
  mgmtSendCreateVgroupMsg(pVgroup, pMsg);
}

S
slguan 已提交
179 180 181 182 183 184 185
/*
 * Drop a vgroup.
 * Without an ahandle, the drop requests are sent with no response context and the sdb row is deleted
 * immediately. With an ahandle, only the requests are sent; the row is expected to be removed when the
 * responses arrive (mgmtProcessDropVnodeRsp deletes queueMsg->ahandle).
 */
void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle) {
  if (ahandle == NULL) {
    mTrace("vgroup:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes);
    mgmtSendDropVgroupMsg(pVgroup, NULL);
    sdbDeleteRow(tsVgroupSdb, pVgroup);
    return;
  }

  mgmtSendDropVgroupMsg(pVgroup, ahandle);
}

void mgmtSetVgroupIdPool() {
  void *  pNode = NULL;
  SVgObj *pVgroup = NULL;
  SDbObj *pDb;

  while (1) {
S
slguan 已提交
195
    pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup);
H
hzcheng 已提交
196 197 198
    if (pVgroup == NULL || pVgroup->idPool == 0) break;

    taosIdPoolSetFreeList(pVgroup->idPool);
S
slguan 已提交
199
    pVgroup->numOfTables = taosIdPoolNumOfUsed(pVgroup->idPool);
H
hzcheng 已提交
200 201

    pDb = mgmtGetDb(pVgroup->dbName);
S
slguan 已提交
202 203
    pDb->numOfTables += pVgroup->numOfTables;
    if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1)
H
hzcheng 已提交
204 205 206 207 208 209
      mgmtAddVgroupIntoDbTail(pDb, pVgroup);
    else
      mgmtAddVgroupIntoDb(pDb, pVgroup);
  }
}

S
slguan 已提交
210 211 212
/* Close the "vgroups" sdb table opened by mgmtInitVgroups(). */
void mgmtCleanUpVgroups() {
  void *pSdb = tsVgroupSdb;
  sdbCloseTable(pSdb);
}
H
hzcheng 已提交
213

S
slguan 已提交
214
int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
S
slguan 已提交
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302
  SDbObj *pDb = mgmtGetDb(pShow->db);
  if (pDb == NULL) {
    return TSDB_CODE_DB_NOT_SELECTED;
  }

  int32_t cols = 0;
  SSchema *pSchema = tsGetSchema(pMeta);

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "vgId");
  pSchema[cols].bytes = htons(pShow->bytes[cols]);
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "tables");
  pSchema[cols].bytes = htons(pShow->bytes[cols]);
  cols++;

  pShow->bytes[cols] = 9;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "vgroup status");
  pSchema[cols].bytes = htons(pShow->bytes[cols]);
  cols++;

  int32_t maxReplica = 0;
  SVgObj  *pVgroup   = NULL;
  STableInfo *pTable = NULL;
  if (pShow->payloadLen > 0 ) {
    pTable = mgmtGetTable(pShow->payload);
    if (NULL == pTable) {
      return TSDB_CODE_INVALID_TABLE_ID;
    }

    pVgroup = mgmtGetVgroup(pTable->vgId);
    if (NULL == pVgroup) return TSDB_CODE_INVALID_TABLE_ID;

    maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
  } else {
    SVgObj *pVgroup = pDb->pHead;
    while (pVgroup != NULL) {
      maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
      pVgroup = pVgroup->next;
    }
  }

  for (int32_t i = 0; i < maxReplica; ++i) {
    pShow->bytes[cols] = 16;
    pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
    strcpy(pSchema[cols].name, "ip");
    pSchema[cols].bytes = htons(pShow->bytes[cols]);
    cols++;

    pShow->bytes[cols] = 2;
    pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
    strcpy(pSchema[cols].name, "vnode");
    pSchema[cols].bytes = htons(pShow->bytes[cols]);
    cols++;

    pShow->bytes[cols] = 9;
    pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
    strcpy(pSchema[cols].name, "vnode status");
    pSchema[cols].bytes = htons(pShow->bytes[cols]);
    cols++;

    pShow->bytes[cols] = 16;
    pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
    strcpy(pSchema[cols].name, "public ip");
    pSchema[cols].bytes = htons(pShow->bytes[cols]);
    cols++;
  }

  pMeta->numOfColumns = htons(cols);
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];

  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];

  if (NULL == pTable) {
    pShow->numOfRows = pDb->numOfVgroups;
    pShow->pNode = pDb->pHead;
  } else {
    pShow->numOfRows = 1;
    pShow->pNode = pVgroup;
  }
L
[1361]  
lihui 已提交
303

H
hzcheng 已提交
304 305 306
  return 0;
}

S
slguan 已提交
307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
/*
 * Return a display string for the status of one vnode of pVgroup, as reported by its dnode's load info.
 * Returns "offline" if the dnode is offline. Returns "null" (and logs an error) if the dnode is
 * unknown, the vnode index is out of range, or the dnode's load entry belongs to a different
 * vgroup/vnode.
 */
char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) {
  SDnodeObj *pDnode = mgmtGetDnode(pVnode->ip);
  if (pDnode == NULL) {
    mError("dnode:%s, vgroup:%d, vnode:%d dnode not exist", taosIpStr(pVnode->ip), pVgroup->vgId, pVnode->vnode);
    return "null";
  }

  if (pDnode->status == TSDB_DN_STATUS_OFFLINE) {
    return "offline";
  }

  // Same bound as mgmtGetVgroupByVnode uses before indexing vload; never index past the table.
  if (pVnode->vnode < 0 || pVnode->vnode >= TSDB_MAX_VNODES) {
    mError("dnode:%s, vgroup:%d, vnode:%d invalid vnode index", taosIpStr(pVnode->ip), pVgroup->vgId, pVnode->vnode);
    return "null";
  }

  SVnodeLoad *vload = &pDnode->vload[pVnode->vnode];
  if (vload->vgId != pVgroup->vgId || vload->vnode != pVnode->vnode) {
    mError("dnode:%s, vgroup:%d, vnode:%d not same with dnode vgroup:%d vnode:%d",
           taosIpStr(pVnode->ip), pVgroup->vgId, pVnode->vnode, vload->vgId, vload->vnode);
    return "null";
  }

  return (char *)taosGetVnodeStatusStr(vload->status);
}

S
slguan 已提交
328
/*
 * Address of cell (row, col) in the retrieve buffer. Column col occupies its own contiguous region
 * starting at offset[col] * rows, with a stride of bytes[col] per row.
 */
static char *mgmtVgroupShowCell(SShowObj *pShow, char *data, int32_t rows, int32_t col, int32_t row) {
  return data + pShow->offset[col] * rows + pShow->bytes[col] * row;
}

/*
 * Fill up to `rows` rows of SHOW VGROUPS output into `data`, continuing from pShow->pNode and following
 * each vgroup's next link. Returns the number of rows written, or 0 if the db is unknown.
 */
int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
  SDbObj *pDb = mgmtGetDb(pShow->db);
  if (pDb == NULL) return 0;

  // mgmtGetVgroupMeta lays out 3 fixed columns plus 4 per replica. Take the replica count from that
  // layout instead of recomputing it over the whole db. For a single-table show the meta only describes
  // that table's vgroup, and vgroups can be added between meta and retrieve. Either way a recomputed
  // maximum could exceed the columns that exist, and the writes would run past them.
  int32_t replicaCols = pShow->numOfColumns - 3;
  int32_t maxReplica  = replicaCols > 0 ? replicaCols / 4 : 0;

  int32_t numOfRows = 0;
  char    ipstr[20];

  while (numOfRows < rows) {
    SVgObj *pVgroup = (SVgObj *)pShow->pNode;
    if (pVgroup == NULL) break;
    pShow->pNode = (void *)pVgroup->next;

    int32_t cols = 0;
    char   *pWrite;

    pWrite = mgmtVgroupShowCell(pShow, data, rows, cols++, numOfRows);
    *(int32_t *)pWrite = pVgroup->vgId;

    pWrite = mgmtVgroupShowCell(pShow, data, rows, cols++, numOfRows);
    *(int32_t *)pWrite = pVgroup->numOfTables;

    pWrite = mgmtVgroupShowCell(pShow, data, rows, cols++, numOfRows);
    strcpy(pWrite, taosGetVgroupLbStatusStr(pVgroup->lbStatus));

    for (int32_t i = 0; i < maxReplica; ++i) {
      tinet_ntoa(ipstr, pVgroup->vnodeGid[i].ip);
      pWrite = mgmtVgroupShowCell(pShow, data, rows, cols++, numOfRows);
      strcpy(pWrite, ipstr);

      pWrite = mgmtVgroupShowCell(pShow, data, rows, cols++, numOfRows);
      *(int16_t *)pWrite = pVgroup->vnodeGid[i].vnode;

      pWrite = mgmtVgroupShowCell(pShow, data, rows, cols++, numOfRows);
      if (pVgroup->vnodeGid[i].ip != 0) {
        char *vnodeStatus = mgmtGetVnodeStatus(pVgroup, pVgroup->vnodeGid + i);
        strcpy(pWrite, vnodeStatus);
      } else {
        strcpy(pWrite, "null");
      }

      tinet_ntoa(ipstr, pVgroup->vnodeGid[i].publicIp);
      pWrite = mgmtVgroupShowCell(pShow, data, rows, cols++, numOfRows);
      strcpy(pWrite, ipstr);
    }

    numOfRows++;
  }

  pShow->numOfReads += numOfRows;
  return numOfRows;
}

/* sdb insert action for vgroup rows: nothing to do here; always returns NULL. */
static void *mgmtVgroupActionInsert(void *row, char *str, int32_t size, int32_t *ssize) {
  (void)row;
  (void)str;
  (void)size;
  (void)ssize;
  return NULL;
}

S
slguan 已提交
400 401
static void *mgmtVgroupActionDelete(void *row, char *str, int32_t size, int32_t *ssize) {
  SVgObj *pVgroup = row;
H
hzcheng 已提交
402 403
  SDbObj *pDb = mgmtGetDb(pVgroup->dbName);

S
slguan 已提交
404 405 406 407
  if (pDb != NULL) {
    mgmtRemoveVgroupFromDb(pDb, pVgroup);
  }

S
slguan 已提交
408
  mgmtUnSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes);
S
slguan 已提交
409
  tfree(pVgroup->tableList);
H
hzcheng 已提交
410 411 412 413

  return NULL;
}

S
slguan 已提交
414
static void *mgmtVgroupActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) {
H
hzcheng 已提交
415
  mgmtVgroupActionReset(row, str, size, ssize);
S
slguan 已提交
416 417

  SVgObj  *pVgroup  = (SVgObj *) row;
S
slguan 已提交
418
  int32_t oldTables = taosIdPoolMaxSize(pVgroup->idPool);
S
slguan 已提交
419 420 421 422 423 424

  SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
  if (pDb != NULL) {
    if (pDb->cfg.maxSessions != oldTables) {
      mPrint("vgroup:%d tables change from %d to %d", pVgroup->vgId, oldTables, pDb->cfg.maxSessions);
      taosUpdateIdPool(pVgroup->idPool, pDb->cfg.maxSessions);
S
slguan 已提交
425 426
      int32_t size = sizeof(STableInfo *) * pDb->cfg.maxSessions;
      pVgroup->tableList = (STableInfo **)realloc(pVgroup->tableList, size);
S
slguan 已提交
427 428
    }
  }
H
hzcheng 已提交
429 430 431 432 433

  mTrace("vgroup:%d update, numOfVnode:%d", pVgroup->vgId, pVgroup->numOfVnodes);

  return NULL;
}
S
slguan 已提交
434

S
slguan 已提交
435 436 437
/*
 * sdb encode action: copy the first tsVgUpdateSize bytes of the vgroup into str.
 * *ssize receives the number of bytes written, or -1 when `size` is smaller than tsVgUpdateSize.
 */
static void *mgmtVgroupActionEncode(void *row, char *str, int32_t size, int32_t *ssize) {
  SVgObj *pVgroup = (SVgObj *)row;

  if (size >= tsVgUpdateSize) {
    memcpy(str, pVgroup, tsVgUpdateSize);
    *ssize = tsVgUpdateSize;
  } else {
    *ssize = -1;
  }

  return NULL;
}
S
slguan 已提交
446

S
slguan 已提交
447 448
static void *mgmtVgroupActionDecode(void *row, char *str, int32_t size, int32_t *ssize) {
  SVgObj *pVgroup = (SVgObj *) malloc(sizeof(SVgObj));
H
hzcheng 已提交
449 450 451
  if (pVgroup == NULL) return NULL;
  memset(pVgroup, 0, sizeof(SVgObj));

S
slguan 已提交
452 453
  int32_t tsVgUpdateSize = pVgroup->updateEnd - (int8_t *) pVgroup;
  memcpy(pVgroup, str, tsVgUpdateSize);
H
hzcheng 已提交
454

S
slguan 已提交
455
  return (void *) pVgroup;
H
hzcheng 已提交
456
}
S
slguan 已提交
457

S
slguan 已提交
458 459 460
/* sdb reset action: overwrite the first tsVgUpdateSize bytes of the row with str; later fields are untouched. */
static void *mgmtVgroupActionReset(void *row, char *str, int32_t size, int32_t *ssize) {
  SVgObj *pTarget = (SVgObj *)row;
  memcpy(pTarget, str, tsVgUpdateSize);

  return NULL;
}
S
slguan 已提交
463

S
slguan 已提交
464 465
static void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) {
  SVgObj *pVgroup = (SVgObj *) row;
H
hzcheng 已提交
466 467 468 469
  if (pVgroup->idPool) {
    taosIdPoolCleanUp(pVgroup->idPool);
    pVgroup->idPool = NULL;
  }
S
slguan 已提交
470
  if (pVgroup->tableList) tfree(pVgroup->tableList);
H
hzcheng 已提交
471 472
  tfree(row);
  return NULL;
S
slguan 已提交
473
}
S
slguan 已提交
474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490

/* Ask sdb to update the vgroup's row from its first tsVgUpdateSize bytes. */
void mgmtUpdateVgroup(SVgObj *pVgroup) {
  int32_t updateSize = tsVgUpdateSize;
  sdbUpdateRow(tsVgroupSdb, pVgroup, updateSize, 0);
}

/* Count pTable in the vgroup and, when it has a valid sid, record it in the table list at that slot. */
void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable) {
  if (pTable->sid >= 0) {
    pVgroup->tableList[pTable->sid] = pTable;
  }
  pVgroup->numOfTables++;
}

/*
 * Uncount pTable, clear its table-list slot when the sid is valid, and return the sid to the id pool.
 * taosFreeId is called regardless of the sid's sign.
 */
void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable) {
  --pVgroup->numOfTables;

  if (pTable->sid >= 0) {
    pVgroup->tableList[pTable->sid] = NULL;
  }

  taosFreeId(pVgroup->idPool, pTable->sid);
}
S
slguan 已提交
491

S
slguan 已提交
492
/*
 * Build a create-vnode request (allocated with rpcMallocCont) for `vnode` of pVgroup.
 * The db config is copied and its multi-byte fields converted to network byte order. Each replica's
 * vgId/ip/vnode goes into vpeerDesc. Returns NULL if the db is gone or the buffer cannot be allocated.
 */
SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode) {
  SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
  if (pDb == NULL) {
    return NULL;
  }

  SMDCreateVnodeMsg *pMsg = rpcMallocCont(sizeof(SMDCreateVnodeMsg));
  if (pMsg == NULL) {
    return NULL;
  }

  pMsg->vnode = htonl(vnode);
  pMsg->cfg   = pDb->cfg;

  // Convert the copied config in place.
  SVnodeCfg *pCfg = &pMsg->cfg;
  pCfg->vgId                         = htonl(pVgroup->vgId);
  pCfg->maxSessions                  = htonl(pCfg->maxSessions);
  pCfg->cacheBlockSize               = htonl(pCfg->cacheBlockSize);
  pCfg->cacheNumOfBlocks.totalBlocks = htonl(pCfg->cacheNumOfBlocks.totalBlocks);
  pCfg->daysPerFile                  = htonl(pCfg->daysPerFile);
  pCfg->daysToKeep1                  = htonl(pCfg->daysToKeep1);
  pCfg->daysToKeep2                  = htonl(pCfg->daysToKeep2);
  pCfg->daysToKeep                   = htonl(pCfg->daysToKeep);
  pCfg->commitTime                   = htonl(pCfg->commitTime);
  pCfg->blocksPerTable               = htons(pCfg->blocksPerTable);
  pCfg->replications                 = (char)pVgroup->numOfVnodes;
  pCfg->rowsInFileBlock              = htonl(pCfg->rowsInFileBlock);

  for (int32_t idx = 0; idx < pVgroup->numOfVnodes; ++idx) {
    SVnodeDesc *pDesc = &pMsg->vpeerDesc[idx];
    pDesc->vgId  = htonl(pVgroup->vgId);
    pDesc->ip    = htonl(pVgroup->vnodeGid[idx].ip);
    pDesc->vnode = htonl(pVgroup->vnodeGid[idx].vnode);
  }

  return pMsg;
}

/*
 * Find the vgroup that dnode `dnode` reports for vnode slot `vnode`.
 * Returns NULL for an out-of-range slot or an unknown dnode.
 */
SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode) {
  if (vnode >= 0 && vnode < TSDB_MAX_VNODES) {
    SDnodeObj *pDnode = mgmtGetDnode(dnode);
    if (pDnode != NULL) {
      int32_t vgId = pDnode->vload[vnode].vgId;
      return mgmtGetVgroup(vgId);
    }
  }

  return NULL;
}

/* Build an rpc ip set with one entry per vnode of pVgroup, all on tsDnodeMnodePort, starting at index 0. */
SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) {
  SRpcIpSet ipSet = {
    .port     = tsDnodeMnodePort,
    .inUse    = 0,
    .numOfIps = pVgroup->numOfVnodes,
  };

  for (int32_t idx = 0; idx < pVgroup->numOfVnodes; idx++) {
    ipSet.ip[idx] = pVgroup->vnodeGid[idx].ip;
  }

  return ipSet;
}

SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip) {
553 554 555 556
  SRpcIpSet ipSet = {
    .ip[0]    = ip,
    .numOfIps = 1,
    .inUse    = 0,
S
slguan 已提交
557
    .port     = tsDnodeMnodePort
558
  };
S
slguan 已提交
559
  return ipSet;
S
slguan 已提交
560 561 562 563 564 565
}

/*
 * Send a create-vnode request for `vnode` of pVgroup to ipSet. ahandle becomes the rpc handle.
 * If the request body cannot be built, the message is still sent with no content.
 */
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle) {
  mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, vnode, ahandle);

  SMDCreateVnodeMsg *pCreate = mgmtBuildCreateVnodeMsg(pVgroup, vnode);
  int32_t            contLen = (pCreate == NULL) ? 0 : sizeof(SMDCreateVnodeMsg);

  SRpcMsg rpcMsg = {
    .msgType = TSDB_MSG_TYPE_MD_CREATE_VNODE,
    .code    = 0,
    .handle  = ahandle,
    .pCont   = pCreate,
    .contLen = contLen,
  };
  mgmtSendMsgToDnode(ipSet, &rpcMsg);
}

/* Send one create-vnode request to the dnode of each vnode of pVgroup, all sharing the same ahandle. */
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
  mTrace("vgroup:%d, send create all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);

  SVnodeGid *pGid = pVgroup->vnodeGid;
  for (int32_t n = 0; n < pVgroup->numOfVnodes; ++n, ++pGid) {
    SRpcIpSet ipSet = mgmtGetIpSetFromIp(pGid->ip);
    mgmtSendCreateVnodeMsg(pVgroup, pGid->vnode, &ipSet, ahandle);
  }
}

/*
 * Handle one create-vnode response. The rpc handle is the SQueuedMsg set up by mgmtCreateVgroup.
 *
 * Responses are counted until `expected` have arrived. If all succeeded, a copy of the original shell
 * message is queued again via mgmtAddToShellQueue. Otherwise the vgroup row is deleted and the client
 * gets the first failure code seen. The queued message struct is freed once all responses are in.
 */
static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
  if (rpcMsg->handle == NULL) return;

  SQueuedMsg *queueMsg = rpcMsg->handle;
  queueMsg->received++;
  if (rpcMsg->code == TSDB_CODE_SUCCESS) {
    queueMsg->successed++;
  } else if (queueMsg->received - queueMsg->successed == 1) {
    // Keep the first failure. The old code stored only successes and replied with the LAST response's
    // code, which could be TSDB_CODE_SUCCESS even though the vgroup creation failed.
    queueMsg->code = rpcMsg->code;
  }

  SVgObj *pVgroup = queueMsg->ahandle;
  mTrace("vgroup:%d, create vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p",
         pVgroup->vgId, tstrerror(rpcMsg->code), queueMsg->received, queueMsg->successed, queueMsg->expected,
         queueMsg->thandle, rpcMsg->handle);

  if (queueMsg->received != queueMsg->expected) return;

  if (queueMsg->received == queueMsg->successed) {
    SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg));
    void       *pCont  = (newMsg == NULL) ? NULL : rpcMallocCont(queueMsg->contLen);
    if (pCont == NULL) {
      mError("vgroup:%d, failed to alloc memory to requeue msg, thandle:%p", pVgroup->vgId, queueMsg->thandle);
      free(newMsg);
      mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY);
    } else {
      newMsg->msgType = queueMsg->msgType;
      newMsg->thandle = queueMsg->thandle;
      newMsg->pDb     = queueMsg->pDb;
      newMsg->pUser   = queueMsg->pUser;
      newMsg->contLen = queueMsg->contLen;
      newMsg->pCont   = pCont;
      if (queueMsg->contLen > 0) memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen);
      mgmtAddToShellQueue(newMsg);
    }
  } else {
    sdbDeleteRow(tsVgroupSdb, pVgroup);
    mgmtSendSimpleResp(queueMsg->thandle, queueMsg->code);
  }

  // NOTE(review): only the struct is freed; whether queueMsg->pCont must be released too depends on
  // who allocated it -- confirm against the shell queue code.
  free(queueMsg);
}

/* Allocate (rpcMallocCont) a drop-vnode request carrying pVgroup's vgId in network order; NULL on failure. */
static SMDDropVnodeMsg *mgmtBuildDropVnodeMsg(SVgObj *pVgroup) {
  SMDDropVnodeMsg *pDrop = rpcMallocCont(sizeof(SMDDropVnodeMsg));
  if (pDrop != NULL) {
    pDrop->vgId = htonl(pVgroup->vgId);
  }
  return pDrop;
}

/*
 * Send a drop-vnode request for pVgroup to ipSet. ahandle becomes the rpc handle.
 * If the body cannot be built, the message is still sent with no content.
 */
static void mgmtSendDropVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) {
  mTrace("vgroup:%d, send drop vnode msg, ahandle:%p", pVgroup->vgId, ahandle);

  SMDDropVnodeMsg *pDrop   = mgmtBuildDropVnodeMsg(pVgroup);
  int32_t          contLen = (pDrop == NULL) ? 0 : sizeof(SMDDropVnodeMsg);

  SRpcMsg rpcMsg = {
    .msgType = TSDB_MSG_TYPE_MD_DROP_VNODE,
    .code    = 0,
    .handle  = ahandle,
    .pCont   = pDrop,
    .contLen = contLen,
  };
  mgmtSendMsgToDnode(ipSet, &rpcMsg);
}

/* Send one drop-vnode request to the dnode of each vnode of pVgroup, all sharing the same ahandle. */
static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
  mTrace("vgroup:%d, send drop all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);

  SVnodeGid *pGid = pVgroup->vnodeGid;
  for (int32_t n = 0; n < pVgroup->numOfVnodes; ++n, ++pGid) {
    SRpcIpSet ipSet = mgmtGetIpSetFromIp(pGid->ip);
    mgmtSendDropVnodeMsg(pVgroup, &ipSet, ahandle);
  }
}

/*
 * Handle one drop-vnode response. The rpc handle is the SQueuedMsg whose ahandle is the vgroup.
 *
 * Once `expected` responses have arrived, the vgroup row is deleted from sdb regardless of their
 * results. A copy of the original shell message is then queued again via mgmtAddToShellQueue, and the
 * queued message struct is freed.
 */
static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
  mTrace("drop vnode msg is received");
  if (rpcMsg->handle == NULL) return;

  SQueuedMsg *queueMsg = rpcMsg->handle;
  queueMsg->received++;
  if (rpcMsg->code == TSDB_CODE_SUCCESS) {
    queueMsg->successed++;
  } else if (queueMsg->received - queueMsg->successed == 1) {
    // Record the first failure, as mgmtProcessCreateVnodeRsp does.
    queueMsg->code = rpcMsg->code;
  }

  SVgObj *pVgroup = queueMsg->ahandle;
  mTrace("vgroup:%d, drop vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p",
         pVgroup->vgId, tstrerror(rpcMsg->code), queueMsg->received, queueMsg->successed, queueMsg->expected,
         queueMsg->thandle, rpcMsg->handle);

  if (queueMsg->received != queueMsg->expected) return;

  sdbDeleteRow(tsVgroupSdb, pVgroup);

  // Check both allocations; the old code dereferenced them unconditionally.
  SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg));
  void       *pCont  = (newMsg == NULL) ? NULL : rpcMallocCont(queueMsg->contLen);
  if (pCont == NULL) {
    mError("failed to alloc memory to requeue drop msg, thandle:%p", queueMsg->thandle);
    free(newMsg);
    mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY);
  } else {
    newMsg->msgType = queueMsg->msgType;
    newMsg->thandle = queueMsg->thandle;
    newMsg->pDb     = queueMsg->pDb;
    newMsg->pUser   = queueMsg->pUser;
    newMsg->contLen = queueMsg->contLen;
    newMsg->pCont   = pCont;
    if (queueMsg->contLen > 0) memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen);
    mgmtAddToShellQueue(newMsg);
  }

  free(queueMsg);
}