mgmtVgroup.c 14.5 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#define _DEFAULT_SOURCE
17
#include "os.h"
H
hzcheng 已提交
18

S
slguan 已提交
19
#include "mnode.h"
H
hzcheng 已提交
20
#include "tschemautil.h"
S
slguan 已提交
21
#include "tlog.h"
H
hjxilinx 已提交
22
#include "vnodeStatus.h"
H
hzcheng 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41

/* Handle of the "vgroups" sdb table; opened by mgmtInitVgroups(). */
void *vgSdb = NULL;

/* Byte length of the SVgObj prefix that precedes updateEnd (computed in mgmtInitVgroups()). */
int tsVgUpdateSize;

/* Handles of the other management tables (declared extern: defined in another translation unit). */
extern void *dbSdb;
extern void *acctSdb;
extern void *userSdb;
extern void *dnodeSdb;

/* Handler table consulted by mgmtVgroupAction(), indexed by sdb action type. */
void *(*mgmtVgroupActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize);

/* Vgroup-table handlers for each sdb action; all share the sdb callback signature. */
void *mgmtVgroupActionInsert(void *row, char *str, int size, int *ssize);
void *mgmtVgroupActionDelete(void *row, char *str, int size, int *ssize);
void *mgmtVgroupActionUpdate(void *row, char *str, int size, int *ssize);
void *mgmtVgroupActionEncode(void *row, char *str, int size, int *ssize);
void *mgmtVgroupActionDecode(void *row, char *str, int size, int *ssize);
void *mgmtVgroupActionBeforeBatchUpdate(void *row, char *str, int size, int *ssize);
void *mgmtVgroupActionBatchUpdate(void *row, char *str, int size, int *ssize);
void *mgmtVgroupActionAfterBatchUpdate(void *row, char *str, int size, int *ssize);
void *mgmtVgroupActionReset(void *row, char *str, int size, int *ssize);
void *mgmtVgroupActionDestroy(void *row, char *str, int size, int *ssize);
S
slguan 已提交
42
/* Forward declaration. NOTE(review): not referenced by the code in this section; presumably reports whether pVnode of pVgroup on pDnode is ready — confirm against its definition. */
bool mgmtCheckVnodeReady(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode);
S
#949  
slguan 已提交
43
/* Forward declaration: returns the text that mgmtRetrieveVgroups() copies into the "vnode status" column. */
char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode);
H
hzcheng 已提交
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58

/*
 * Register the vgroup table's handler for every sdb action type in
 * mgmtVgroupActionFp. Slots not assigned here are left as they were.
 */
void mgmtVgroupActionInit() {
  void *(**fp)(void *row, char *str, int size, int *ssize) = mgmtVgroupActionFp;

  fp[SDB_TYPE_INSERT]              = mgmtVgroupActionInsert;
  fp[SDB_TYPE_DELETE]              = mgmtVgroupActionDelete;
  fp[SDB_TYPE_UPDATE]              = mgmtVgroupActionUpdate;
  fp[SDB_TYPE_ENCODE]              = mgmtVgroupActionEncode;
  fp[SDB_TYPE_DECODE]              = mgmtVgroupActionDecode;
  fp[SDB_TYPE_BEFORE_BATCH_UPDATE] = mgmtVgroupActionBeforeBatchUpdate;
  fp[SDB_TYPE_BATCH_UPDATE]        = mgmtVgroupActionBatchUpdate;
  fp[SDB_TYPE_AFTER_BATCH_UPDATE]  = mgmtVgroupActionAfterBatchUpdate;
  fp[SDB_TYPE_RESET]               = mgmtVgroupActionReset;
  fp[SDB_TYPE_DESTROY]             = mgmtVgroupActionDestroy;
}

void *mgmtVgroupAction(char action, void *row, char *str, int size, int *ssize) {
S
slguan 已提交
59 60
  if (mgmtVgroupActionFp[(uint8_t)action] != NULL) {
    return (*(mgmtVgroupActionFp[(uint8_t)action]))(row, str, size, ssize);
H
hzcheng 已提交
61 62 63 64 65 66 67 68 69 70
  }
  return NULL;
}

int mgmtInitVgroups() {
  void *  pNode = NULL;
  SVgObj *pVgroup = NULL;

  mgmtVgroupActionInit();

S
slguan 已提交
71 72 73
  SVgObj tObj;
  tsVgUpdateSize = tObj.updateEnd - (char *)&tObj;

H
hzcheng 已提交
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
  vgSdb = sdbOpenTable(tsMaxVGroups, sizeof(SVgObj), "vgroups", SDB_KEYTYPE_AUTO, mgmtDirectory, mgmtVgroupAction);
  if (vgSdb == NULL) {
    mError("failed to init vgroup data");
    return -1;
  }

  while (1) {
    pNode = sdbFetchRow(vgSdb, pNode, (void **)&pVgroup);
    if (pVgroup == NULL) break;

    SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
    if (pDb == NULL) continue;

    pVgroup->prev = NULL;
    pVgroup->next = NULL;
    int size = sizeof(STabObj *) * pDb->cfg.maxSessions;
    pVgroup->meterList = (STabObj **)malloc(size);
S
slguan 已提交
91 92 93 94 95
    if (pVgroup->meterList == NULL) {
      mError("failed to malloc(size:%d) for the meterList of vgroups", size);
      return -1;
    }
    
H
hzcheng 已提交
96 97 98
    memset(pVgroup->meterList, 0, size);

    pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions);
S
slguan 已提交
99 100 101 102 103 104
    if (pVgroup->idPool == NULL) {
      mError("failed to taosInitIdPool for vgroups");
      free(pVgroup->meterList);
      return -1;
    }
    
H
hzcheng 已提交
105
    taosIdPoolReinit(pVgroup->idPool);
S
slguan 已提交
106 107 108 109 110 111 112 113 114 115

    if (tsIsCluster) {
      /*
       * Upgrade from open source version to cluster version for the first time
       */
      if (pVgroup->vnodeGid[0].publicIp == 0) {
        pVgroup->vnodeGid[0].publicIp = inet_addr(tsPublicIp);
        pVgroup->vnodeGid[0].ip = inet_addr(tsPrivateIp);
        sdbUpdateRow(vgSdb, pVgroup, tsVgUpdateSize, 1);
      }
S
slguan 已提交
116
    }
H
hzcheng 已提交
117

S
slguan 已提交
118
    mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId);
H
hzcheng 已提交
119 120 121 122 123 124 125 126 127 128 129 130 131
  }

  mTrace("vgroup is initialized");
  return 0;
}

/* Look up a vgroup row by id in the vgroup sdb table (callers treat NULL as "not found"). */
SVgObj *mgmtGetVgroup(int vgId) {
  SVgObj *pFound = (SVgObj *)sdbGetRow(vgSdb, &vgId);
  return pFound;
}

void mgmtProcessVgTimer(void *handle, void *tmrId) {
  SDbObj *pDb = (SDbObj *)handle;
  if (pDb == NULL) return;

  if (pDb->vgStatus > TSDB_VG_STATUS_IN_PROGRESS) {
S
slguan 已提交
132
    mTrace("db:%s, set vgroup status from %d to ready", pDb->name, pDb->vgStatus);
H
hzcheng 已提交
133 134 135 136 137 138 139 140 141 142 143 144 145 146
    pDb->vgStatus = TSDB_VG_STATUS_READY;
  }

  pDb->vgTimer = NULL;
}

/*
 * Create a vgroup for pDb with pDb->cfg.replications vnodes.
 * The steps are: allocate the vnodes with mgmtAllocVnodes(), insert the row into
 * the vgroup sdb table, then send the vpeers message.
 *
 * When vnode allocation fails, the db is marked TSDB_VG_STATUS_FULL. A timer
 * (mgmtProcessVgTimer, 5000 — presumably ms) is then armed to reset that status.
 *
 * Returns the new vgroup, or NULL on failure (out of memory or no free vnodes).
 */
SVgObj *mgmtCreateVgroup(SDbObj *pDb) {
  SVgObj *pVgroup = (SVgObj *)calloc(1, sizeof(SVgObj));
  if (pVgroup == NULL) {
    // Previously the unchecked malloc result was passed straight to memset.
    mError("db:%s, failed to malloc(size:%d) for vgroup", pDb->name, (int)sizeof(SVgObj));
    return NULL;
  }

  strcpy(pVgroup->dbName, pDb->name);
  pVgroup->numOfVnodes = pDb->cfg.replications;
  pVgroup->createdTime = taosGetTimestampMs();

  // based on load balance, create a new one
  if (mgmtAllocVnodes(pVgroup) != 0) {
    mError("db:%s, no enough free dnode to alloc %d vnodes", pDb->name, pVgroup->numOfVnodes);
    free(pVgroup);
    pDb->vgStatus = TSDB_VG_STATUS_FULL;
    taosTmrReset(mgmtProcessVgTimer, 5000, pDb, mgmtTmr, &pDb->vgTimer);
    return NULL;
  }

  sdbInsertRow(vgSdb, pVgroup, 0);

  mTrace("vgroup:%d, vgroup is created, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
  for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
    mTrace("vgroup:%d, dnode:%s vnode:%d is created", pVgroup->vgId, taosIpStr(pVgroup->vnodeGid[i].ip), pVgroup->vnodeGid[i].vnode);
  }

  mgmtSendVPeersMsg(pVgroup);

  return pVgroup;
}

/*
 * Drop a vgroup of pDb. When numOfMeters > 0, every meter still referenced from
 * meterList (slots 0 .. pDb->cfg.maxSessions-1) is dropped first. The vgroup is
 * then announced via mgmtSendFreeVnodeMsg() and its sdb row deleted.
 * Always returns 0.
 *
 * NOTE(review): meterList is re-checked for NULL on every slot on purpose;
 * mgmtDropMeter() may modify this vgroup while the loop runs — confirm.
 */
int mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) {
  if (pVgroup->numOfMeters > 0) {
    for (int slot = 0; slot < pDb->cfg.maxSessions; ++slot) {
      if (pVgroup->meterList == NULL) continue;

      STabObj *pMeter = pVgroup->meterList[slot];
      if (pMeter != NULL) {
        mgmtDropMeter(pDb, pMeter->meterId, 0);
      }
    }
  }

  mTrace("vgroup:%d, db:%s replica:%d is deleted", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
  mgmtSendFreeVnodeMsg(pVgroup);
  sdbDeleteRow(vgSdb, pVgroup);

  return 0;
}

/*
 * Walk every vgroup row and finish restoring its id-pool state. For each vgroup:
 *   - rebuild the pool's free list;
 *   - recompute numOfMeters from the pool;
 *   - add that count to the owning db's numOfTables;
 *   - link the vgroup into the db's list, at the tail when it holds at least
 *     maxSessions - 1 meters.
 *
 * Vgroups without an id pool (mgmtInitVgroups() leaves none when the db is
 * missing) or whose db cannot be found are skipped.
 */
void mgmtSetVgroupIdPool() {
  void *  pNode = NULL;
  SVgObj *pVgroup = NULL;

  while (1) {
    pNode = sdbFetchRow(vgSdb, pNode, (void **)&pVgroup);
    if (pVgroup == NULL) break;

    // Skip instead of stopping: one vgroup without a pool must not hide all later ones.
    if (pVgroup->idPool == NULL) continue;

    taosIdPoolSetFreeList(pVgroup->idPool);
    pVgroup->numOfMeters = taosIdPoolNumOfUsed(pVgroup->idPool);

    SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
    if (pDb == NULL) continue;  // was dereferenced unconditionally

    pDb->numOfTables += pVgroup->numOfMeters;
    if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1) {
      mgmtAddVgroupIntoDbTail(pDb, pVgroup);
    } else {
      mgmtAddVgroupIntoDb(pDb, pVgroup);
    }
  }
}

/* Close the vgroup sdb table opened by mgmtInitVgroups(). */
void mgmtCleanUpVgroups() {
  sdbCloseTable(vgSdb);
}

int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
  int cols = 0;

S
slguan 已提交
215 216 217 218
  SDbObj *pDb = NULL;
  if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);

  if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED;
H
hzcheng 已提交
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233

  SSchema *pSchema = tsGetSchema(pMeta);

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "vgId");
  pSchema[cols].bytes = htons(pShow->bytes[cols]);
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "meters");
  pSchema[cols].bytes = htons(pShow->bytes[cols]);
  cols++;

S
slguan 已提交
234 235 236
  pShow->bytes[cols] = 9;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "vgroup status");
H
hzcheng 已提交
237 238 239
  pSchema[cols].bytes = htons(pShow->bytes[cols]);
  cols++;

L
[1361]  
lihui 已提交
240 241
  int      maxReplica = 0;
  SVgObj  *pVgroup    = NULL;
S
slguan 已提交
242
  STabObj *pTable     = NULL;
L
[1361]  
lihui 已提交
243
  if (pShow->payloadLen > 0 ) {
S
slguan 已提交
244 245
    pTable = mgmtGetTable(pShow->payload);
    if (NULL == pTable) {
L
[1361]  
lihui 已提交
246 247 248
      return TSDB_CODE_INVALID_METER_ID;
    }

S
slguan 已提交
249
    pVgroup = mgmtGetVgroup(pTable->gid.vgId);
L
[1361]  
lihui 已提交
250 251
    if (NULL == pVgroup) return TSDB_CODE_INVALID_METER_ID;
    
S
slguan 已提交
252
    maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
L
[1361]  
lihui 已提交
253 254 255 256 257 258
  } else {
    SVgObj *pVgroup = pDb->pHead;
    while (pVgroup != NULL) {
      maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
      pVgroup = pVgroup->next;
    }
S
slguan 已提交
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
  }

  for (int i = 0; i < maxReplica; ++i) {
    pShow->bytes[cols] = 16;
    pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
    strcpy(pSchema[cols].name, "ip");
    pSchema[cols].bytes = htons(pShow->bytes[cols]);
    cols++;

    pShow->bytes[cols] = 2;
    pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
    strcpy(pSchema[cols].name, "vnode");
    pSchema[cols].bytes = htons(pShow->bytes[cols]);
    cols++;

    pShow->bytes[cols] = 9;
    pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
    strcpy(pSchema[cols].name, "vnode status");
    pSchema[cols].bytes = htons(pShow->bytes[cols]);
    cols++;

    pShow->bytes[cols] = 16;
    pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
    strcpy(pSchema[cols].name, "public ip");
    pSchema[cols].bytes = htons(pShow->bytes[cols]);
    cols++;
  }

H
hzcheng 已提交
287 288 289 290 291 292 293 294
  pMeta->numOfColumns = htons(cols);
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];

  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];

S
slguan 已提交
295
  if (NULL == pTable) {
L
[1361]  
lihui 已提交
296 297 298 299 300 301 302
    pShow->numOfRows = pDb->numOfVgroups;
    pShow->pNode = pDb->pHead;
  } else {
    pShow->numOfRows = 1;
    pShow->pNode = pVgroup;
  }

H
hzcheng 已提交
303 304 305 306 307 308 309 310 311 312
  return 0;
}

/*
 * Copy the NUL-terminated string src into a fixed-width result cell of `size`
 * bytes, truncating so the copy (terminator included) never leaves the cell.
 */
static void mgmtVgroupCopyCell(char *dst, const char *src, int size) {
  if (size <= 0) return;

  size_t len = strlen(src);
  if (len > (size_t)size - 1) len = (size_t)size - 1;
  memcpy(dst, src, len);
  dst[len] = '\0';
}

/*
 * Write up to `rows` rows of SHOW VGROUPS output into the column-major buffer
 * `data`, continuing from pShow->pNode (positioned by mgmtGetVgroupMeta()).
 *
 * The number of replica column groups is derived from pShow->numOfColumns, so it
 * always matches the schema mgmtGetVgroupMeta() declared. The meta step may have
 * limited the schema to a single table's vgroup. Recomputing the count over the
 * whole database could then exceed the declared columns and write through
 * unset offsets.
 *
 * Returns the number of rows written (0 if the current database no longer exists).
 */
int mgmtRetrieveVgroups(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
  // Layout built by mgmtGetVgroupMeta(): 3 fixed columns, then 4 per replica slot.
  const int fixedCols = 3;
  const int colsPerReplica = 4;

  int     numOfRows = 0;
  SVgObj *pVgroup = NULL;
  char *  pWrite;
  int     cols = 0;
  char    ipstr[20];

  SDbObj *pDb = NULL;
  if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
  if (pDb == NULL) return 0;  // previously assert(); the db may have been dropped meanwhile

  int maxReplica = (pShow->numOfColumns - fixedCols) / colsPerReplica;

  while (numOfRows < rows) {
    pVgroup = (SVgObj *)pShow->pNode;
    if (pVgroup == NULL) break;
    pShow->pNode = (void *)pVgroup->next;

    cols = 0;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int32_t *)pWrite = pVgroup->vgId;
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int32_t *)pWrite = pVgroup->numOfMeters;
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    mgmtVgroupCopyCell(pWrite, taosGetVgroupLbStatusStr(pVgroup->lbStatus), pShow->bytes[cols]);
    cols++;

    for (int i = 0; i < maxReplica; ++i) {
      tinet_ntoa(ipstr, pVgroup->vnodeGid[i].ip);
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      mgmtVgroupCopyCell(pWrite, ipstr, pShow->bytes[cols]);
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int16_t *)pWrite = pVgroup->vnodeGid[i].vnode;
      cols++;

      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      if (pVgroup->vnodeGid[i].ip != 0) {
        char *vnodeStatus = mgmtGetVnodeStatus(pVgroup, pVgroup->vnodeGid + i);
        mgmtVgroupCopyCell(pWrite, vnodeStatus, pShow->bytes[cols]);
      } else {
        mgmtVgroupCopyCell(pWrite, "null", pShow->bytes[cols]);
      }
      cols++;

      tinet_ntoa(ipstr, pVgroup->vnodeGid[i].publicIp);
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      mgmtVgroupCopyCell(pWrite, ipstr, pShow->bytes[cols]);
      cols++;
    }

    numOfRows++;
  }

  pShow->numOfReads += numOfRows;
  return numOfRows;
}

void *mgmtVgroupActionInsert(void *row, char *str, int size, int *ssize) {
  SVgObj *pVgroup = (SVgObj *)row;
  SDbObj *pDb = mgmtGetDb(pVgroup->dbName);

  if (pDb == NULL) return NULL;

  int tsize = sizeof(STabObj *) * pDb->cfg.maxSessions;
  pVgroup->meterList = (STabObj **)malloc(tsize);
  memset(pVgroup->meterList, 0, tsize);
  pVgroup->numOfMeters = 0;
  pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions);
  mgmtAddVgroupIntoDb(pDb, pVgroup);
S
slguan 已提交
389
  mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId);
H
hzcheng 已提交
390 391 392 393 394 395 396 397 398

  return NULL;
}

void *mgmtVgroupActionDelete(void *row, char *str, int size, int *ssize) {
  SVgObj *pVgroup = (SVgObj *)row;
  SDbObj *pDb = mgmtGetDb(pVgroup->dbName);

  if (pDb != NULL) mgmtRemoveVgroupFromDb(pDb, pVgroup);
S
slguan 已提交
399
  mgmtUnSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes);
H
hzcheng 已提交
400 401 402 403 404 405 406 407
  tfree(pVgroup->meterList);

  return NULL;
}

void *mgmtVgroupActionUpdate(void *row, char *str, int size, int *ssize) {
  mgmtVgroupActionReset(row, str, size, ssize);
  SVgObj *pVgroup = (SVgObj *)row;
S
slguan 已提交
408 409 410 411 412 413 414 415 416 417 418
  int oldTables = taosIdPoolMaxSize(pVgroup->idPool);

  SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
  if (pDb != NULL) {
    if (pDb->cfg.maxSessions != oldTables) {
      mPrint("vgroup:%d tables change from %d to %d", pVgroup->vgId, oldTables, pDb->cfg.maxSessions);
      taosUpdateIdPool(pVgroup->idPool, pDb->cfg.maxSessions);
      int size = sizeof(STabObj *) * pDb->cfg.maxSessions;
      pVgroup->meterList = (STabObj **)realloc(pVgroup->meterList, size);
    }
  }
H
hzcheng 已提交
419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465

  mTrace("vgroup:%d update, numOfVnode:%d", pVgroup->vgId, pVgroup->numOfVnodes);

  return NULL;
}
/*
 * sdb encode action: copy the vgroup's bytes up to updateEnd into str.
 * *ssize receives the number of bytes written, or -1 when the buffer capacity
 * `size` is too small. Always returns NULL.
 */
void *mgmtVgroupActionEncode(void *row, char *str, int size, int *ssize) {
  SVgObj *pVgroup = (SVgObj *)row;
  int     encodedLen = pVgroup->updateEnd - (char *)pVgroup;

  if (encodedLen > size) {
    *ssize = -1;
    return NULL;
  }

  memcpy(str, pVgroup, encodedLen);
  *ssize = encodedLen;
  return NULL;
}
/*
 * sdb decode action: allocate a zeroed SVgObj and fill the bytes before
 * updateEnd from str. size is not consulted.
 * Returns the new object (caller owns it), or NULL if allocation fails.
 */
void *mgmtVgroupActionDecode(void *row, char *str, int size, int *ssize) {
  SVgObj *pVgroup = (SVgObj *)calloc(1, sizeof(SVgObj));

  if (pVgroup != NULL) {
    int prefixLen = pVgroup->updateEnd - (char *)pVgroup;
    memcpy(pVgroup, str, prefixLen);
  }

  return (void *)pVgroup;
}
/* sdb before-batch-update action: the vgroup table does nothing here. Always returns NULL. */
void *mgmtVgroupActionBeforeBatchUpdate(void *row, char *str, int size, int *ssize) {
  (void)row;
  (void)str;
  (void)size;
  (void)ssize;
  return NULL;
}
/* sdb batch-update action: the vgroup table does nothing here. Always returns NULL. */
void *mgmtVgroupActionBatchUpdate(void *row, char *str, int size, int *ssize) {
  (void)row;
  (void)str;
  (void)size;
  (void)ssize;
  return NULL;
}
/* sdb after-batch-update action: the vgroup table does nothing here. Always returns NULL. */
void *mgmtVgroupActionAfterBatchUpdate(void *row, char *str, int size, int *ssize) {
  (void)row;
  (void)str;
  (void)size;
  (void)ssize;
  return NULL;
}
/*
 * sdb reset action: overwrite the row's bytes before updateEnd with the image in
 * str. size is not consulted. Always returns NULL.
 */
void *mgmtVgroupActionReset(void *row, char *str, int size, int *ssize) {
  SVgObj *pTarget = (SVgObj *)row;
  int     prefixLen = pTarget->updateEnd - (char *)pTarget;

  memcpy(row, str, prefixLen);
  return NULL;
}
/*
 * sdb destroy action: release the vgroup's id pool and meter list, then free
 * the row itself. Always returns NULL.
 */
void *mgmtVgroupActionDestroy(void *row, char *str, int size, int *ssize) {
  SVgObj *pVgroup = (SVgObj *)row;

  if (pVgroup->idPool != NULL) {
    taosIdPoolCleanUp(pVgroup->idPool);
    pVgroup->idPool = NULL;
  }

  // tfree is applied unconditionally; releasing a NULL list is a no-op.
  tfree(pVgroup->meterList);
  tfree(row);

  return NULL;
}