mndVgroup.c 23.6 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndVgroup.h"
S
Shengliang Guan 已提交
18
#include "mndDb.h"
S
Shengliang Guan 已提交
19
#include "mndDnode.h"
S
Shengliang Guan 已提交
20
#include "mndMnode.h"
S
Shengliang Guan 已提交
21 22
#include "mndShow.h"
#include "mndTrans.h"
S
Shengliang Guan 已提交
23

L
Liu Jicong 已提交
24
// Software version stamped on encoded vgroup rows; decode rejects any other value.
#define TSDB_VGROUP_VER_NUMBER   1
// Number of reserved bytes appended after the fields of every encoded vgroup row.
#define TSDB_VGROUP_RESERVE_SIZE 64

S
Shengliang Guan 已提交
27 28 29
static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw);
static int32_t  mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
static int32_t  mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
S
Shengliang Guan 已提交
30
static int32_t  mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
S
Shengliang Guan 已提交
31

S
Shengliang Guan 已提交
32 33 34 35 36 37 38 39
static int32_t mndProcessCreateVnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessAlterVnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessDropVnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessSyncVnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessCompactVnodeRsp(SNodeMsg *pRsp);

static int32_t mndGetVgroupMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
40
static void    mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
41 42
static int32_t mndGetVnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
43 44 45
static void    mndCancelGetNextVnode(SMnode *pMnode, void *pIter);

/*
 * Register the vgroup module with the mnode: the SDB_VGROUP sdb table and its
 * callbacks, the handlers for vnode responses coming back from dnodes, and the
 * "show vgroups" / "show vnodes" handlers.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitVgroup(SMnode *pMnode) {
  // Each callback slot must point at its matching action. updateFp and deleteFp
  // used to be cross-wired, so an sdb update ran the delete action (through a
  // function pointer of the wrong arity) and a delete ran the update action.
  SSdbTable table = {.sdbType = SDB_VGROUP,
                     .keyType = SDB_KEY_INT32,
                     .encodeFp = (SdbEncodeFp)mndVgroupActionEncode,
                     .decodeFp = (SdbDecodeFp)mndVgroupActionDecode,
                     .insertFp = (SdbInsertFp)mndVgroupActionInsert,
                     .updateFp = (SdbUpdateFp)mndVgroupActionUpdate,
                     .deleteFp = (SdbDeleteFp)mndVgroupActionDelete};

  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndProcessCreateVnodeRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_ALTER_VNODE_RSP, mndProcessAlterVnodeRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndProcessDropVnodeRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_SYNC_VNODE_RSP, mndProcessSyncVnodeRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_COMPACT_VNODE_RSP, mndProcessCompactVnodeRsp);

  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndGetVgroupMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);

  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndGetVnodeMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndRetrieveVnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndCancelGetNextVnode);

  return sdbSetTable(pMnode->pSdb, table);
}

// Counterpart of mndInitVgroup(); there is currently nothing to release here.
void mndCleanupVgroup(SMnode *pMnode) {
  (void)pMnode;
}

S
Shengliang Guan 已提交
72
/*
 * Serialize a vgroup object into a newly allocated sdb raw record.
 *
 * Field order (must stay in sync with mndVgroupActionDecode):
 *   vgId, createdTime, updateTime, version, hashBegin, hashEnd, dbName,
 *   dbUid, replica, one dnodeId per replica, then TSDB_VGROUP_RESERVE_SIZE
 *   reserved bytes.
 *
 * Returns the raw record, or NULL with terrno set on failure.
 */
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
  SSdbRaw *pRaw = NULL;
  int32_t  dataPos = 0;

  // Any failure below jumps to the label with this code still in terrno.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  pRaw = sdbAllocRaw(SDB_VGROUP, TSDB_VGROUP_VER_NUMBER, sizeof(SVgObj) + TSDB_VGROUP_RESERVE_SIZE);
  if (pRaw == NULL) goto VG_ENCODE_OVER;

  SDB_SET_INT32(pRaw, dataPos, pVgroup->vgId, VG_ENCODE_OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime, VG_ENCODE_OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime, VG_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->version, VG_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin, VG_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd, VG_ENCODE_OVER)
  SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, VG_ENCODE_OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid, VG_ENCODE_OVER)
  SDB_SET_INT8(pRaw, dataPos, pVgroup->replica, VG_ENCODE_OVER)

  // Only the dnode id of each replica is persisted; roles are not encoded.
  for (int8_t r = 0; r < pVgroup->replica; ++r) {
    SDB_SET_INT32(pRaw, dataPos, pVgroup->vnodeGid[r].dnodeId, VG_ENCODE_OVER)
  }

  SDB_SET_RESERVE(pRaw, dataPos, TSDB_VGROUP_RESERVE_SIZE, VG_ENCODE_OVER)
  SDB_SET_DATALEN(pRaw, dataPos, VG_ENCODE_OVER)

  terrno = 0;

VG_ENCODE_OVER:
  if (terrno == 0) {
    mTrace("vgId:%d, encode to raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
    return pRaw;
  }

  mError("vgId:%d, failed to encode to raw:%p since %s", pVgroup->vgId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

S
Shengliang Guan 已提交
108
/*
 * Deserialize an sdb raw record into a newly allocated row holding an SVgObj.
 * The field order must mirror mndVgroupActionEncode. Roles are not persisted;
 * a single-replica vgroup gets its only vnode marked TAOS_SYNC_STATE_LEADER.
 *
 * Returns the row, or NULL with terrno set (TSDB_CODE_SDB_INVALID_DATA_VER when
 * the record's software version is not TSDB_VGROUP_VER_NUMBER).
 */
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
  // Declared and NULL-initialised before the first goto. Previously the early
  // gotos jumped over their initialisers, so the error path freed and
  // dereferenced indeterminate pointers.
  SSdbRow *pRow = NULL;
  SVgObj  *pVgroup = NULL;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto VG_DECODE_OVER;

  if (sver != TSDB_VGROUP_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto VG_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SVgObj));
  if (pRow == NULL) goto VG_DECODE_OVER;

  pVgroup = sdbGetRowObj(pRow);
  if (pVgroup == NULL) goto VG_DECODE_OVER;

  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->vgId, VG_DECODE_OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->createdTime, VG_DECODE_OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->updateTime, VG_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->version, VG_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashBegin, VG_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, VG_DECODE_OVER)
  SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, VG_DECODE_OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, VG_DECODE_OVER)
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, VG_DECODE_OVER)
  // NOTE(review): replica is taken from the record as-is; a corrupt value larger
  // than TSDB_MAX_REPLICA would index past vnodeGid[]. Consider rejecting it.
  for (int8_t i = 0; i < pVgroup->replica; ++i) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
    SDB_GET_INT32(pRaw, dataPos, &pVgid->dnodeId, VG_DECODE_OVER)
    if (pVgroup->replica == 1) {
      pVgid->role = TAOS_SYNC_STATE_LEADER;
    }
  }
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_VGROUP_RESERVE_SIZE, VG_DECODE_OVER)

  terrno = 0;

VG_DECODE_OVER:
  if (terrno != 0) {
    // pVgroup is NULL when we failed before the row object was obtained.
    mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup == NULL ? 0 : pVgroup->vgId, pRaw, terrstr());
    tfree(pRow);
    return NULL;
  }

  mTrace("vgId:%d, decode from raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
  return pRow;
}

// sdb insert callback for SDB_VGROUP: only traces the insertion and reports success.
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
  const int32_t vgId = pVgroup->vgId;
  mTrace("vgId:%d, perform insert action, row:%p", vgId, pVgroup);
  return 0;
}

// sdb delete callback for SDB_VGROUP: only traces the deletion and reports success.
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) {
  const int32_t vgId = pVgroup->vgId;
  mTrace("vgId:%d, perform delete action, row:%p", vgId, pVgroup);
  return 0;
}

S
Shengliang Guan 已提交
167 168 169 170 171 172 173 174
/*
 * sdb update callback for SDB_VGROUP: copy the mutable fields of pNew into the
 * existing row pOld. vgId, createdTime, dbName and dbUid are left unchanged.
 * Always returns 0.
 */
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
  mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);

  // Replica layout: all TSDB_MAX_REPLICA slots are copied, not just `replica` of them.
  memcpy(pOld->vnodeGid, pNew->vnodeGid, sizeof(SVnodeGid) * TSDB_MAX_REPLICA);
  pOld->replica = pNew->replica;

  // Hash range and bookkeeping.
  pOld->hashBegin = pNew->hashBegin;
  pOld->hashEnd = pNew->hashEnd;
  pOld->version = pNew->version;
  pOld->updateTime = pNew->updateTime;

  return 0;
}

/*
 * Acquire a reference to vgroup vgId from the sdb; release it with mndReleaseVgroup().
 * Returns NULL on failure. A missing vgroup is reported as
 * TSDB_CODE_MND_VGROUP_NOT_EXIST; other sdb errors are left in terrno unchanged.
 */
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId) {
  SVgObj *pVgroup = sdbAcquire(pMnode->pSdb, SDB_VGROUP, &vgId);
  if (pVgroup != NULL) {
    return pVgroup;
  }

  // Translate the generic sdb "not there" code into the vgroup-specific one.
  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
  }
  return NULL;
}

// Drop a reference obtained from mndAcquireVgroup().
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
  sdbRelease(pMnode->pSdb, pVgroup);
}

S
Shengliang Guan 已提交
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
/*
 * Build the serialized "create vnode" request sent to pDnode for pVgroup.
 *
 * The request carries the db configuration, the vgroup's hash range and the
 * endpoints of every replica. selfIndex is the position of pDnode among the replicas.
 *
 * Returns a malloc'ed buffer (length in *pContLen), or NULL with terrno set when a
 * replica dnode cannot be acquired, pDnode is not one of the replicas
 * (TSDB_CODE_MND_APP_ERROR), or serialization/allocation fails.
 */
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SCreateVnodeReq createReq = {
      .vgId = pVgroup->vgId,
      .dnodeId = pDnode->id,
      .dbUid = pDb->uid,
      .vgVersion = pVgroup->version,
      .cacheBlockSize = pDb->cfg.cacheBlockSize,
      .totalBlocks = pDb->cfg.totalBlocks,
      .daysPerFile = pDb->cfg.daysPerFile,
      .daysToKeep0 = pDb->cfg.daysToKeep0,
      .daysToKeep1 = pDb->cfg.daysToKeep1,
      .daysToKeep2 = pDb->cfg.daysToKeep2,
      .minRows = pDb->cfg.minRows,
      .maxRows = pDb->cfg.maxRows,
      .commitTime = pDb->cfg.commitTime,
      .fsyncPeriod = pDb->cfg.fsyncPeriod,
      .walLevel = pDb->cfg.walLevel,
      .precision = pDb->cfg.precision,
      .compression = pDb->cfg.compression,
      .quorum = pDb->cfg.quorum,
      .update = pDb->cfg.update,
      .cacheLastRow = pDb->cfg.cacheLastRow,
      .replica = pVgroup->replica,
      .selfIndex = -1,  // filled in below once pDnode is found among the replicas
      .streamMode = pVgroup->streamMode,
      .hashBegin = pVgroup->hashBegin,
      .hashEnd = pVgroup->hashEnd,
      .hashMethod = pDb->hashMethod,
  };
  memcpy(createReq.db, pDb->name, TSDB_DB_FNAME_LEN);

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    const SVnodeGid *pVgid = &pVgroup->vnodeGid[v];

    SDnodeObj *pReplicaDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pReplicaDnode == NULL) return NULL;

    SReplica *pReplica = &createReq.replicas[v];
    pReplica->id = pReplicaDnode->id;
    pReplica->port = pReplicaDnode->port;
    memcpy(pReplica->fqdn, pReplicaDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pReplicaDnode);

    if (pVgid->dnodeId == pDnode->id) createReq.selfIndex = v;
  }

  if (createReq.selfIndex == -1) {
    terrno = TSDB_CODE_MND_APP_ERROR;
    return NULL;
  }

  // First pass computes the encoded length, second pass writes into the buffer.
  const int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &createReq);
  void         *pReq = (contLen < 0) ? NULL : malloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  tSerializeSCreateVnodeReq(pReq, contLen, &createReq);
  *pContLen = contLen;
  return pReq;
}

L
Liu Jicong 已提交
262
/*
 * Build the serialized "drop vnode" request asking pDnode to remove its vnode of pVgroup.
 *
 * Returns a malloc'ed buffer (length in *pContLen), or NULL with
 * terrno = TSDB_CODE_OUT_OF_MEMORY if serialization sizing or allocation fails.
 */
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SDropVnodeReq dropReq = {
      .dnodeId = pDnode->id,
      .vgId = pVgroup->vgId,
      .dbUid = pDb->uid,
  };
  memcpy(dropReq.db, pDb->name, TSDB_DB_FNAME_LEN);

  // Size first, then serialize into an exactly-sized buffer.
  const int32_t contLen = tSerializeSDropVnodeReq(NULL, 0, &dropReq);
  void         *pReq = (contLen < 0) ? NULL : malloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  tSerializeSDropVnodeReq(pReq, contLen, &dropReq);
  *pContLen = contLen;
  return pReq;
}

S
Shengliang Guan 已提交
286 287 288 289 290 291 292 293 294 295 296 297
// sdbTraverse callback: zero the vnode counter of each dnode; always continues traversal.
static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  ((SDnodeObj *)pObj)->numOfVnodes = 0;
  return true;
}

/*
 * sdbTraverse callback: recompute the dnode's vnode count and, if the dnode is
 * online and supports vnodes, push a copy of it into the SArray passed as p1.
 *
 * A dnode that also hosts an mnode is charged one extra vnode, so placement
 * prefers dnodes without an mnode. Always continues the traversal.
 */
static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pDnode = pObj;
  SArray    *pCandidates = p1;

  const bool    online = mndIsDnodeOnline(pMnode, pDnode, taosGetTimestampMs());
  const bool    isMnode = mndIsMnode(pMnode, pDnode->id);
  const int32_t vnodes = mndGetVnodesNum(pMnode, pDnode->id);

  // Logged before the mnode surcharge is applied.
  mDebug("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d", pDnode->id, vnodes,
         pDnode->numOfSupportVnodes, isMnode, online);

  pDnode->numOfVnodes = isMnode ? vnodes + 1 : vnodes;

  const bool usable = online && pDnode->numOfSupportVnodes > 0;
  if (usable) taosArrayPush(pCandidates, pDnode);

  return true;
}

S
Shengliang Guan 已提交
314
/*
 * Build an array of SDnodeObj copies for every online dnode that supports vnodes,
 * with numOfVnodes refreshed from the current vgroup table.
 *
 * Returns the array (caller destroys it), or NULL with
 * terrno = TSDB_CODE_OUT_OF_MEMORY.
 */
static SArray *mndBuildDnodesArray(SMnode *pMnode) {
  SSdb   *pSdb = pMnode->pSdb;
  SArray *pCandidates = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeObj));
  if (pCandidates == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // Pass 1 clears every dnode's counter; pass 2 recounts and collects candidates.
  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pCandidates, NULL, NULL);

  return pCandidates;
}

/*
 * qsort-style comparator: order dnodes by load ratio
 * numOfVnodes / numOfSupportVnodes, least loaded first.
 *
 * Returns a negative, zero or positive value when pDnode1's ratio is lower than,
 * equal to, or higher than pDnode2's. The previous version returned only 0 or 1
 * (never negative, and 1 for equal ratios). That breaks qsort's requirement of a
 * consistent ordering, so the sorted order was not guaranteed.
 *
 * The ratios are compared by 64-bit cross-multiplication, which is exact and
 * avoids division. This assumes numOfSupportVnodes > 0, which holds for arrays
 * built by mndBuildDnodesArrayFp.
 */
static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
  const int64_t lhs = (int64_t)pDnode1->numOfVnodes * pDnode2->numOfSupportVnodes;
  const int64_t rhs = (int64_t)pDnode2->numOfVnodes * pDnode1->numOfSupportVnodes;
  return (lhs > rhs) - (lhs < rhs);
}

/*
 * Place the replicas of pVgroup on dnodes taken from pArray.
 *
 * pArray is sorted with mndCompareDnodeVnodes and its first pVgroup->replica
 * entries are used, filling pVgroup->vnodeGid. Each chosen entry's numOfVnodes is
 * incremented so later calls on the same array see the added load. A lone replica
 * is marked leader; otherwise every replica starts as follower.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES when
 * there are fewer candidates than replicas or a candidate already holds more
 * vnodes than it supports.
 */
static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);

  const int32_t size = taosArrayGetSize(pArray);
  if (size < pVgroup->replica) {
    mError("db:%s, vgId:%d, no enough online dnodes:%d to alloc %d replica", pVgroup->dbName, pVgroup->vgId, size,
           pVgroup->replica);
    terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
    return -1;
  }

  for (int32_t slot = 0; slot < pVgroup->replica; ++slot) {
    SDnodeObj *pCandidate = taosArrayGet(pArray, slot);
    if (pCandidate == NULL || pCandidate->numOfVnodes > pCandidate->numOfSupportVnodes) {
      terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
      return -1;
    }

    SVnodeGid *pVgid = &pVgroup->vnodeGid[slot];
    pVgid->dnodeId = pCandidate->id;
    pVgid->role = (pVgroup->replica == 1) ? TAOS_SYNC_STATE_LEADER : TAOS_SYNC_STATE_FOLLOWER;

    mDebug("db:%s, vgId:%d, vn:%d dnode:%d is alloced", pVgroup->dbName, pVgroup->vgId, slot, pVgid->dnodeId);
    pCandidate->numOfVnodes++;
  }

  return 0;
}
371

S
Shengliang Guan 已提交
372
/*
 * Allocate and initialise pDb->cfg.numOfVgroups vgroup objects for pDb.
 *
 * Each vgroup gets:
 *   - a fresh vgId, counting up from max(sdb max vgroup id, 2);
 *   - an equal slice of the uint32 hash space, with the last vgroup extending to
 *     UINT32_MAX to absorb the remainder;
 *   - pDb->cfg.replications replicas placed by mndGetAvailableDnode.
 *
 * On success returns 0 and *ppVgroups receives the calloc'ed array; ownership
 * passes to the caller. On failure returns -1 with terrno set and leaves
 * *ppVgroups untouched.
 */
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
  int32_t code = -1;
  SArray *pArray = NULL;
  SVgObj *pVgroups = NULL;

  // The hash-interval computation below divides by numOfVgroups.
  if (pDb->cfg.numOfVgroups <= 0) {
    terrno = TSDB_CODE_MND_APP_ERROR;
    goto ALLOC_VGROUP_OVER;
  }

  pVgroups = calloc(pDb->cfg.numOfVgroups, sizeof(SVgObj));
  if (pVgroups == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto ALLOC_VGROUP_OVER;
  }

  pArray = mndBuildDnodesArray(pMnode);
  if (pArray == NULL) {
    goto ALLOC_VGROUP_OVER;
  }

  mDebug("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
         pDb->cfg.numOfVgroups, pDb->cfg.numOfVgroups * pDb->cfg.replications);

  int32_t  maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  uint32_t hashMin = 0;
  uint32_t hashMax = UINT32_MAX;
  uint32_t hashInterval = (hashMax - hashMin) / pDb->cfg.numOfVgroups;

  if (maxVgId < 2) maxVgId = 2;

  for (uint32_t v = 0; v < pDb->cfg.numOfVgroups; v++) {
    SVgObj *pVgroup = &pVgroups[v];
    pVgroup->vgId = maxVgId++;
    pVgroup->createdTime = taosGetTimestampMs();
    // Use this vgroup's own creation time. This used to read pVgroups->createdTime,
    // i.e. the first vgroup's timestamp, for every element.
    pVgroup->updateTime = pVgroup->createdTime;
    pVgroup->version = 1;
    pVgroup->streamMode = pDb->cfg.streamMode;
    pVgroup->hashBegin = hashMin + hashInterval * v;
    if (v == pDb->cfg.numOfVgroups - 1) {
      pVgroup->hashEnd = hashMax;
    } else {
      pVgroup->hashEnd = hashMin + hashInterval * (v + 1) - 1;
    }

    memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
    pVgroup->dbUid = pDb->uid;
    pVgroup->replica = pDb->cfg.replications;

    if (mndGetAvailableDnode(pMnode, pVgroup, pArray) != 0) {
      terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
      goto ALLOC_VGROUP_OVER;
    }
  }

  *ppVgroups = pVgroups;
  code = 0;

  mDebug("db:%s, %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);

ALLOC_VGROUP_OVER:
  if (code != 0) free(pVgroups);
  taosArrayDestroy(pArray);
  return code;
}

L
Liu Jicong 已提交
436
/*
 * Build the endpoint set of a vgroup: one entry per replica whose dnode can be
 * acquired, in replica order. inUse points at the leader's entry when a leader is
 * present; replicas whose dnode cannot be acquired are skipped.
 */
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
  SEpSet epset = {0};

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    const SVnodeGid *pVgid = pVgroup->vnodeGid + v;
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pDnode != NULL) {
      // The entry about to be appended lands at index numOfEps.
      if (pVgid->role == TAOS_SYNC_STATE_LEADER) epset.inUse = epset.numOfEps;
      addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port);
      mndReleaseDnode(pMnode, pDnode);
    }
  }

  return epset;
}

S
Shengliang Guan 已提交
455
// Hand a dnode's create-vnode response to the transaction module; always returns 0.
static int32_t mndProcessCreateVnodeRsp(SNodeMsg *pRsp) {
  mndTransProcessRsp(pRsp);
  return 0;
}

S
Shengliang Guan 已提交
460
// Hand a dnode's alter-vnode response to the transaction module; always returns 0.
static int32_t mndProcessAlterVnodeRsp(SNodeMsg *pRsp) {
  mndTransProcessRsp(pRsp);
  return 0;
}

S
Shengliang Guan 已提交
465
// Hand a dnode's drop-vnode response to the transaction module; always returns 0.
static int32_t mndProcessDropVnodeRsp(SNodeMsg *pRsp) {
  mndTransProcessRsp(pRsp);
  return 0;
}

S
Shengliang Guan 已提交
470
// Sync-vnode responses are currently ignored.
static int32_t mndProcessSyncVnodeRsp(SNodeMsg *pRsp) {
  (void)pRsp;
  return 0;
}
S
Shengliang Guan 已提交
471

S
Shengliang Guan 已提交
472
// Compact-vnode responses are currently ignored.
static int32_t mndProcessCompactVnodeRsp(SNodeMsg *pRsp) {
  (void)pRsp;
  return 0;
}
S
Shengliang Guan 已提交
473

S
Shengliang Guan 已提交
474 475 476 477 478 479 480
/*
 * sdbTraverse callback: for vgroups whose dbUid equals *(int64_t *)p1, raise
 * *(int8_t *)p2 to the vgroup's replica count if larger and increment
 * *(int32_t *)p3. Always continues the traversal.
 */
static bool mndGetVgroupMaxReplicaFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SVgObj       *pVgroup = pObj;
  const int64_t dbUid = *(const int64_t *)p1;

  if (pVgroup->dbUid != dbUid) return true;

  int8_t  *pMaxReplica = p2;
  int32_t *pCount = p3;
  if (pVgroup->replica > *pMaxReplica) *pMaxReplica = pVgroup->replica;
  *pCount += 1;

  return true;
}

S
Shengliang Guan 已提交
488
/*
 * For database dbName, report the largest replica count among its vgroups
 * (at least 1) in *pReplica and the number of its vgroups in *pNumOfVgroups.
 *
 * Returns 0, or -1 with terrno = TSDB_CODE_MND_DB_NOT_SELECTED if the db
 * cannot be acquired (outputs are then untouched).
 */
static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pReplica, int32_t *pNumOfVgroups) {
  SSdb   *pSdb = pMnode->pSdb;
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    return -1;
  }

  *pNumOfVgroups = 0;
  *pReplica = 1;
  sdbTraverse(pSdb, SDB_VGROUP, mndGetVgroupMaxReplicaFp, &pDb->uid, pReplica, pNumOfVgroups);

  mndReleaseDb(pMnode, pDb);
  return 0;
}

S
Shengliang Guan 已提交
503 504
/*
 * "show vgroups" meta handler: describe the result columns and size the show object.
 *
 * The layout must match exactly what mndRetrieveVgroups writes, in order:
 *   vgId, db_name, tables, status, onlines, then for each replica slot N
 *   vN_dnode and vN_status.
 * The old layout omitted db_name/status/onlines. Retrieve therefore wrote a
 * varstr into a 4-byte column and three columns past rowSize, overflowing the
 * result buffer.
 *
 * pShow->replica is set to the largest replica count among the db's vgroups and
 * pShow->numOfRows to the db's vgroup count.
 * Returns 0, or -1 with terrno set if the db cannot be acquired.
 */
static int32_t mndGetVgroupMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode *pMnode = pReq->pNode;

  if (mndGetVgroupMaxReplica(pMnode, pShow->db, &pShow->replica, &pShow->numOfRows) != 0) {
    return -1;
  }

  int32_t  cols = 0;
  SSchema *pSchema = pMeta->pSchemas;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "vgId");
  pSchema[cols].bytes = pShow->bytes[cols];
  cols++;

  // Retrieve writes the name produced by tNameGetDbName() into a char[TSDB_DB_NAME_LEN].
  pShow->bytes[cols] = TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "db_name");
  pSchema[cols].bytes = pShow->bytes[cols];
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "tables");
  pSchema[cols].bytes = pShow->bytes[cols];
  cols++;

  pShow->bytes[cols] = 12 + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "status");
  pSchema[cols].bytes = pShow->bytes[cols];
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "onlines");
  pSchema[cols].bytes = pShow->bytes[cols];
  cols++;

  for (int32_t i = 0; i < pShow->replica; ++i) {
    // Retrieve stores the dnode id as int16_t.
    pShow->bytes[cols] = 2;
    pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
    snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%d_dnode", i + 1);
    pSchema[cols].bytes = pShow->bytes[cols];
    cols++;

    pShow->bytes[cols] = 9 + VARSTR_HEADER_SIZE;
    pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
    snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%d_status", i + 1);
    pSchema[cols].bytes = pShow->bytes[cols];
    cols++;
  }

  pMeta->numOfColumns = cols;
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
  strcpy(pMeta->tbName, mndShowStr(pShow->type));

  return 0;
}

S
Shengliang Guan 已提交
554 555
/*
 * "show vgroups" retrieve handler: write up to `rows` vgroup rows into `data`
 * (column-major, each column sized for `rows` entries), resuming the sdb iteration
 * kept in pShow->pIter.
 *
 * When pShow->db is non-empty only vgroups of that db are emitted. If that db
 * cannot be acquired, nothing is written and 0 is returned.
 * Columns: vgId, db name, tables, status ("ready" placeholder), onlines (replica
 * count), then per replica its dnode id (int16) and role string.
 *
 * Returns the number of rows written.
 */
static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode *pMnode = pReq->pNode;
  SSdb   *pSdb = pMnode->pSdb;
  int32_t numOfRows = 0;
  SVgObj *pVgroup = NULL;
  int32_t cols = 0;
  char   *pWrite;

  SDbObj *pDb = NULL;
  if (strlen(pShow->db) > 0) {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (pDb == NULL) {
      return 0;
    }
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    if (pDb != NULL && pVgroup->dbUid != pDb->uid) {
      // Every fetched vgroup must be released, including the ones filtered out.
      // Skipping without this leaked one reference per filtered vgroup.
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    cols = 0;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int32_t *)pWrite = pVgroup->vgId;
    cols++;

    SName name = {0};
    char  db[TSDB_DB_NAME_LEN] = {0};
    tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT|T_NAME_DB);
    tNameGetDbName(&name, db);

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    STR_TO_VARSTR(pWrite, db);
    cols++;

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int32_t *)pWrite = pVgroup->numOfTables;
    cols++;

    // status
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    STR_TO_VARSTR(pWrite, "ready");  // TODO
    cols++;

    // onlines
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int32_t *)pWrite = pVgroup->replica;
    cols++;

    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int16_t *)pWrite = pVgroup->vnodeGid[i].dnodeId;
      cols++;

      const char *role = mndGetRoleStr(pVgroup->vnodeGid[i].role);
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, role, pShow->bytes[cols]);
      cols++;
    }

    numOfRows++;
    sdbRelease(pSdb, pVgroup);
  }

  if (pDb != NULL) {
    mndReleaseDb(pMnode, pDb);
  }

  mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
  pShow->numOfReads += numOfRows;
  return numOfRows;
}

// Abandon an in-progress "show vgroups" sdb iteration.
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}

S
Shengliang Guan 已提交
637 638 639 640
/*
 * sdbTraverse callback: add to *(int32_t *)p2 the number of replicas of this
 * vgroup hosted on dnode *(int32_t *)p1. Always continues the traversal.
 */
static bool mndGetVnodesNumFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  const SVgObj *pVgroup = pObj;
  const int32_t dnodeId = *(const int32_t *)p1;
  int32_t       matches = 0;

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    matches += (pVgroup->vnodeGid[v].dnodeId == dnodeId) ? 1 : 0;
  }

  *(int32_t *)p2 += matches;
  return true;
}

// Count the vnodes (vgroup replicas) placed on dnode dnodeId across all vgroups.
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
  int32_t total = 0;
  SSdb   *pSdb = pMnode->pSdb;
  sdbTraverse(pSdb, SDB_VGROUP, mndGetVnodesNumFp, &dnodeId, &total, NULL);
  return total;
}

S
Shengliang Guan 已提交
657 658
/*
 * "show vnodes" meta handler: describe the result columns and size the show object.
 *
 * The layout must match exactly what mndRetrieveVnodes writes, in order:
 *   vgId, db_name, tables, status (replica role), onlines.
 * Previously only vgId and status were declared. Retrieve then wrote the db name
 * varstr into the 14-byte status column and three more columns outside the
 * declared layout.
 *
 * An optional dnode id in pShow->payload (parsed with atoi, default 0) is stored
 * in pShow->replica, and pShow->numOfRows is set to that dnode's vnode count.
 * Always returns 0.
 */
static int32_t mndGetVnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode *pMnode = pReq->pNode;

  int32_t  cols = 0;
  SSchema *pSchema = pMeta->pSchemas;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "vgId");
  pSchema[cols].bytes = pShow->bytes[cols];
  cols++;

  // Retrieve writes the name produced by tNameGetDbName() into a char[TSDB_DB_NAME_LEN].
  pShow->bytes[cols] = TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "db_name");
  pSchema[cols].bytes = pShow->bytes[cols];
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "tables");
  pSchema[cols].bytes = pShow->bytes[cols];
  cols++;

  pShow->bytes[cols] = 12 + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  strcpy(pSchema[cols].name, "status");
  pSchema[cols].bytes = pShow->bytes[cols];
  cols++;

  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  strcpy(pSchema[cols].name, "onlines");
  pSchema[cols].bytes = pShow->bytes[cols];
  cols++;

  pMeta->numOfColumns = cols;
  pShow->numOfColumns = cols;

  pShow->offset[0] = 0;
  for (int32_t i = 1; i < cols; ++i) {
    pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
  }

  int32_t dnodeId = 0;
  if (pShow->payloadLen > 0) {
    dnodeId = atoi(pShow->payload);
  }

  // pShow->replica is reused here to carry the requested dnode id.
  pShow->replica = dnodeId;
  pShow->numOfRows = mndGetVnodesNum(pMnode, dnodeId);
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
  strcpy(pMeta->tbName, mndShowStr(pShow->type));

  return 0;
}

S
Shengliang Guan 已提交
697 698
/*
 * "show vnodes" retrieve handler: write one row per vgroup replica into `data`
 * (column-major, each column sized for `rows` entries), resuming the sdb iteration
 * kept in pShow->pIter. Vnodes of all dnodes are listed.
 *
 * Columns: vgId, db name, tables (always 0 for now), role string, onlines
 * (the vgroup's replica count).
 * NOTE(review): when the row budget runs out part-way through a vgroup, its
 * remaining replicas are presumably skipped, since the next fetch moves past it.
 *
 * Returns the number of rows written.
 */
static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode *pMnode = pReq->pNode;
  SSdb   *pSdb = pMnode->pSdb;
  int32_t numOfRows = 0;
  SVgObj *pVgroup = NULL;

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    // The db name is the same for every replica of the vgroup; parse it once.
    SName name = {0};
    char  db[TSDB_DB_NAME_LEN] = {0};
    tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT|T_NAME_DB);
    tNameGetDbName(&name, db);

    for (int32_t i = 0; i < pVgroup->replica && numOfRows < rows; ++i, ++numOfRows) {
      const SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
      int32_t          col = 0;
      char            *pCell = NULL;

      pCell = data + pShow->offset[col] * rows + pShow->bytes[col] * numOfRows;
      *(uint32_t *)pCell = pVgroup->vgId;
      col++;

      pCell = data + pShow->offset[col] * rows + pShow->bytes[col] * numOfRows;
      STR_TO_VARSTR(pCell, db);
      col++;

      pCell = data + pShow->offset[col] * rows + pShow->bytes[col] * numOfRows;
      *(uint32_t *)pCell = 0;  // todo: Tables
      col++;

      pCell = data + pShow->offset[col] * rows + pShow->bytes[col] * numOfRows;
      STR_TO_VARSTR(pCell, mndGetRoleStr(pVgid->role));
      col++;

      pCell = data + pShow->offset[col] * rows + pShow->bytes[col] * numOfRows;
      *(uint32_t *)pCell = pVgroup->replica;  // onlines
      col++;
    }

    sdbRelease(pSdb, pVgroup);
  }

  mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
  pShow->numOfReads += numOfRows;
  return numOfRows;
}

// Abandon an in-progress "show vnodes" sdb iteration.
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}