mndVgroup.c 23.7 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndVgroup.h"
S
Shengliang Guan 已提交
18
#include "mndDb.h"
S
Shengliang Guan 已提交
19
#include "mndDnode.h"
S
Shengliang Guan 已提交
20
#include "mndMnode.h"
S
Shengliang Guan 已提交
21 22
#include "mndShow.h"
#include "mndTrans.h"
S
Shengliang Guan 已提交
23

L
Liu Jicong 已提交
24
/* Soft version stamped on encoded vgroup rows; the decoder rejects any other value. */
#define TSDB_VGROUP_VER_NUMBER   1
/* Extra bytes reserved at the tail of every encoded vgroup row. */
#define TSDB_VGROUP_RESERVE_SIZE 64

S
Shengliang Guan 已提交
27 28 29
static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw);
static int32_t  mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
static int32_t  mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
S
Shengliang Guan 已提交
30
static int32_t  mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
S
Shengliang Guan 已提交
31

S
Shengliang Guan 已提交
32 33 34 35 36 37 38 39
static int32_t mndProcessCreateVnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessAlterVnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessDropVnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessSyncVnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessCompactVnodeRsp(SNodeMsg *pRsp);

static int32_t mndGetVgroupMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
40
static void    mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
41 42
static int32_t mndGetVnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
S
Shengliang Guan 已提交
43 44 45
static void    mndCancelGetNextVnode(SMnode *pMnode, void *pIter);

/**
 * Register everything the vgroup module contributes to the mnode: the SDB
 * table descriptor for vgroup rows, the handlers for vnode responses, and the
 * show handlers for the vgroup and vnode management tables.
 *
 * @param pMnode mnode instance the handlers are registered on
 * @return the result of sdbSetTable()
 */
int32_t mndInitVgroup(SMnode *pMnode) {
  /* Each callback must be bound to the slot matching its action: the update
   * slot gets the update action and the delete slot gets the delete action. */
  SSdbTable table = {.sdbType = SDB_VGROUP,
                     .keyType = SDB_KEY_INT32,
                     .encodeFp = (SdbEncodeFp)mndVgroupActionEncode,
                     .decodeFp = (SdbDecodeFp)mndVgroupActionDecode,
                     .insertFp = (SdbInsertFp)mndVgroupActionInsert,
                     .updateFp = (SdbUpdateFp)mndVgroupActionUpdate,
                     .deleteFp = (SdbDeleteFp)mndVgroupActionDelete};

  /* Handlers for vnode responses. */
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndProcessCreateVnodeRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_ALTER_VNODE_RSP, mndProcessAlterVnodeRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndProcessDropVnodeRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_SYNC_VNODE_RSP, mndProcessSyncVnodeRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_COMPACT_VNODE_RSP, mndProcessCompactVnodeRsp);

  /* Show handlers for the vgroup management table. */
  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndGetVgroupMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);

  /* Show handlers for the vnode management table. */
  mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndGetVnodeMeta);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndRetrieveVnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndCancelGetNextVnode);

  return sdbSetTable(pMnode->pSdb, table);
}

/* Counterpart of mndInitVgroup(); currently there is nothing to tear down. */
void mndCleanupVgroup(SMnode *pMnode) {
}

S
Shengliang Guan 已提交
72
/**
 * Serialize a vgroup object into a newly allocated SDB raw record.
 *
 * Field order: vgId, createdTime, updateTime, version, hashBegin, hashEnd,
 * dbName, dbUid, replica, one dnodeId per replica, then the reserved area.
 *
 * @param pVgroup vgroup to encode
 * @return the raw record on success; NULL (with terrno left non-zero) on failure
 */
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
  /* terrno is only cleared after the last field has been written. */
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, TSDB_VGROUP_VER_NUMBER, sizeof(SVgObj) + TSDB_VGROUP_RESERVE_SIZE);
  if (pRaw == NULL) goto ENCODE_OVER;

  int32_t dataPos = 0;

  /* Fixed header fields. */
  SDB_SET_INT32(pRaw, dataPos, pVgroup->vgId, ENCODE_OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime, ENCODE_OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime, ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->version, ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin, ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd, ENCODE_OVER)
  SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, ENCODE_OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid, ENCODE_OVER)
  SDB_SET_INT8(pRaw, dataPos, pVgroup->replica, ENCODE_OVER)

  /* Only the dnode id of each replica is persisted. */
  for (int8_t r = 0; r < pVgroup->replica; ++r) {
    SDB_SET_INT32(pRaw, dataPos, pVgroup->vnodeGid[r].dnodeId, ENCODE_OVER)
  }

  SDB_SET_RESERVE(pRaw, dataPos, TSDB_VGROUP_RESERVE_SIZE, ENCODE_OVER)
  SDB_SET_DATALEN(pRaw, dataPos, ENCODE_OVER)

  terrno = 0;

ENCODE_OVER:
  if (terrno == 0) {
    mTrace("vgId:%d, encode to raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
    return pRaw;
  }

  mError("vgId:%d, failed to encode to raw:%p since %s", pVgroup->vgId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

S
Shengliang Guan 已提交
108
/**
 * Rebuild a vgroup row from an SDB raw record written by
 * mndVgroupActionEncode().
 *
 * @param pRaw raw record to decode
 * @return the decoded row on success; NULL (with terrno left non-zero) when
 *         the soft version is unsupported, allocation fails, or a field
 *         cannot be read
 */
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
  /* Declared and nulled up front: the error path below is reachable through
   * goto before either pointer is assigned, and must not touch indeterminate
   * values. */
  SSdbRow *pRow = NULL;
  SVgObj  *pVgroup = NULL;
  int8_t   sver = 0;
  int32_t  dataPos = 0;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto VG_DECODE_OVER;

  if (sver != TSDB_VGROUP_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto VG_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SVgObj));
  if (pRow == NULL) goto VG_DECODE_OVER;

  pVgroup = sdbGetRowObj(pRow);
  if (pVgroup == NULL) goto VG_DECODE_OVER;

  SDB_GET_INT32(pRaw, dataPos, &pVgroup->vgId, VG_DECODE_OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->createdTime, VG_DECODE_OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->updateTime, VG_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->version, VG_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashBegin, VG_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, VG_DECODE_OVER)
  SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, VG_DECODE_OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, VG_DECODE_OVER)
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, VG_DECODE_OVER)
  for (int8_t i = 0; i < pVgroup->replica; ++i) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
    SDB_GET_INT32(pRaw, dataPos, &pVgid->dnodeId, VG_DECODE_OVER)
    /* Roles are not persisted; a single-replica vgroup is marked leader here. */
    if (pVgroup->replica == 1) {
      pVgid->role = TAOS_SYNC_STATE_LEADER;
    }
  }
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_VGROUP_RESERVE_SIZE, VG_DECODE_OVER)

  terrno = 0;

VG_DECODE_OVER:
  if (terrno != 0) {
    /* pVgroup may still be NULL when the failure happened before the row existed. */
    mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup == NULL ? 0 : pVgroup->vgId, pRaw, terrstr());
    tfree(pRow);
    return NULL;
  }

  mTrace("vgId:%d, decode from raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
  return pRow;
}

/* SDB insert callback for vgroup rows: only traces the event, always returns 0. */
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
  int32_t code = 0;
  mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup);
  return code;
}

/* SDB delete callback for vgroup rows: only traces the event, always returns 0. */
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) {
  int32_t code = 0;
  mTrace("vgId:%d, perform delete action, row:%p", pVgroup->vgId, pVgroup);
  return code;
}

S
Shengliang Guan 已提交
167 168 169 170 171 172 173 174
/**
 * SDB update callback for vgroup rows: copies the mutable fields of the new
 * row into the existing one.
 *
 * vgId, createdTime, dbName and dbUid are not copied.
 *
 * @return always 0
 */
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
  mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);

  /* Replica layout. */
  pOld->replica = pNew->replica;
  memcpy(pOld->vnodeGid, pNew->vnodeGid, TSDB_MAX_REPLICA * sizeof(SVnodeGid));

  /* Hash range. */
  pOld->hashBegin = pNew->hashBegin;
  pOld->hashEnd = pNew->hashEnd;

  /* Bookkeeping. */
  pOld->version = pNew->version;
  pOld->updateTime = pNew->updateTime;

  return 0;
}

/**
 * Acquire the vgroup row with the given id from the SDB.
 *
 * @return the vgroup, or NULL on failure. When the SDB reports
 *         TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to
 *         TSDB_CODE_MND_VGROUP_NOT_EXIST.
 */
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId) {
  SVgObj *pFound = sdbAcquire(pMnode->pSdb, SDB_VGROUP, &vgId);
  if (pFound != NULL) {
    return pFound;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
  }
  return NULL;
}

/* Hand a vgroup obtained via mndAcquireVgroup() back to the SDB. */
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
  sdbRelease(pMnode->pSdb, pVgroup);
}

S
Shengliang Guan 已提交
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
/**
 * Build the serialized create-vnode request sent to one dnode of a vgroup.
 *
 * @param pMnode   mnode used to look up the dnodes of every replica
 * @param pDnode   dnode the request is addressed to; it must host one of the
 *                 vgroup's replicas
 * @param pDb      database supplying the storage configuration
 * @param pVgroup  vgroup being created
 * @param pContLen receives the length of the serialized request
 * @return a malloc()ed buffer holding the request, or NULL on failure.
 *         terrno is set to TSDB_CODE_MND_APP_ERROR when pDnode is not among
 *         the replicas, and to TSDB_CODE_OUT_OF_MEMORY when sizing or
 *         allocation fails.
 */
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SCreateVnodeReq createReq = {0};

  /* Target and vgroup identity. */
  createReq.dnodeId = pDnode->id;
  createReq.vgId = pVgroup->vgId;
  createReq.vgVersion = pVgroup->version;
  createReq.replica = pVgroup->replica;
  createReq.streamMode = pVgroup->streamMode;
  createReq.hashBegin = pVgroup->hashBegin;
  createReq.hashEnd = pVgroup->hashEnd;

  /* Database identity. */
  memcpy(createReq.db, pDb->name, TSDB_DB_FNAME_LEN);
  createReq.dbUid = pDb->uid;
  createReq.hashMethod = pDb->hashMethod;

  /* Database configuration. */
  createReq.cacheBlockSize = pDb->cfg.cacheBlockSize;
  createReq.totalBlocks = pDb->cfg.totalBlocks;
  createReq.daysPerFile = pDb->cfg.daysPerFile;
  createReq.daysToKeep0 = pDb->cfg.daysToKeep0;
  createReq.daysToKeep1 = pDb->cfg.daysToKeep1;
  createReq.daysToKeep2 = pDb->cfg.daysToKeep2;
  createReq.minRows = pDb->cfg.minRows;
  createReq.maxRows = pDb->cfg.maxRows;
  createReq.commitTime = pDb->cfg.commitTime;
  createReq.fsyncPeriod = pDb->cfg.fsyncPeriod;
  createReq.walLevel = pDb->cfg.walLevel;
  createReq.precision = pDb->cfg.precision;
  createReq.compression = pDb->cfg.compression;
  createReq.quorum = pDb->cfg.quorum;
  createReq.update = pDb->cfg.update;
  createReq.cacheLastRow = pDb->cfg.cacheLastRow;
  createReq.numOfRetensions = pDb->cfg.numOfRetensions;
  createReq.pRetensions = pDb->cfg.pRetensions; /* pointer copy, not a deep copy */

  /* Stays -1 unless pDnode turns out to host one of the replicas. */
  createReq.selfIndex = -1;

  for (int32_t idx = 0; idx < pVgroup->replica; ++idx) {
    SVnodeGid *pGid = &pVgroup->vnodeGid[idx];
    SReplica  *pSlot = &createReq.replicas[idx];

    SDnodeObj *pPeer = mndAcquireDnode(pMnode, pGid->dnodeId);
    if (pPeer == NULL) {
      return NULL;
    }

    pSlot->id = pPeer->id;
    pSlot->port = pPeer->port;
    memcpy(pSlot->fqdn, pPeer->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pPeer);

    if (pGid->dnodeId == pDnode->id) {
      createReq.selfIndex = idx;
    }
  }

  if (createReq.selfIndex == -1) {
    terrno = TSDB_CODE_MND_APP_ERROR;
    return NULL;
  }

  /* First pass with a NULL buffer only computes the serialized length. */
  int32_t len = tSerializeSCreateVnodeReq(NULL, 0, &createReq);
  if (len < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pBuf = malloc(len);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  tSerializeSCreateVnodeReq(pBuf, len, &createReq);
  *pContLen = len;
  return pBuf;
}

L
Liu Jicong 已提交
264
/**
 * Build the serialized drop-vnode request for one dnode of a vgroup.
 *
 * @param pContLen receives the length of the serialized request
 * @return a malloc()ed buffer holding the request, or NULL with terrno set to
 *         TSDB_CODE_OUT_OF_MEMORY when sizing or allocation fails
 */
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SDropVnodeReq dropReq = {0};
  dropReq.vgId = pVgroup->vgId;
  dropReq.dnodeId = pDnode->id;
  dropReq.dbUid = pDb->uid;
  memcpy(dropReq.db, pDb->name, TSDB_DB_FNAME_LEN);

  /* First pass with a NULL buffer only computes the serialized length. */
  int32_t len = tSerializeSDropVnodeReq(NULL, 0, &dropReq);
  if (len < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  void *pBuf = malloc(len);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  tSerializeSDropVnodeReq(pBuf, len, &dropReq);
  *pContLen = len;
  return pBuf;
}

S
Shengliang Guan 已提交
288 289 290 291 292 293 294 295 296 297 298 299
/* sdbTraverse() callback: zero the vnode counter of each dnode; always returns true. */
static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  ((SDnodeObj *)pObj)->numOfVnodes = 0;
  return true;
}

/**
 * sdbTraverse() callback: recompute a dnode's vnode count and append the dnode
 * to the candidate array (p1) when it is online and supports at least one
 * vnode. A dnode that is also an mnode is counted as holding one extra vnode.
 *
 * @return always true
 */
static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SArray    *pCandidates = p1;
  SDnodeObj *pNode = pObj;

  bool isOnline = mndIsDnodeOnline(pMnode, pNode, taosGetTimestampMs());
  bool hostsMnode = mndIsMnode(pMnode, pNode->id);
  pNode->numOfVnodes = mndGetVnodesNum(pMnode, pNode->id);

  /* Logged before the mnode adjustment below. */
  mDebug("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d", pNode->id, pNode->numOfVnodes,
         pNode->numOfSupportVnodes, hostsMnode, isOnline);

  if (hostsMnode) {
    pNode->numOfVnodes += 1;
  }

  if (!isOnline || pNode->numOfSupportVnodes <= 0) {
    return true;
  }

  taosArrayPush(pCandidates, pNode);
  return true;
}

S
Shengliang Guan 已提交
316
/**
 * Collect the dnodes that can currently host vnodes.
 *
 * Runs two passes over SDB_DNODE: the first resets every vnode counter, the
 * second recounts and fills the array.
 *
 * @return a new array of SDnodeObj elements, or NULL with terrno set to
 *         TSDB_CODE_OUT_OF_MEMORY
 */
static SArray *mndBuildDnodesArray(SMnode *pMnode) {
  SSdb   *pSdb = pMnode->pSdb;
  SArray *pDnodes = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeObj));

  if (pDnodes != NULL) {
    sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
    sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pDnodes, NULL, NULL);
    return pDnodes;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}

/**
 * Sort comparator ordering dnodes by load, least loaded first.
 *
 * The load score is numOfVnodes / numOfSupportVnodes. The result is a proper
 * three-way comparison (negative, zero, positive): a sort comparator must
 * report equal elements as equal and must be antisymmetric, otherwise the
 * resulting order is unspecified.
 *
 * @return -1 if pDnode1 is less loaded, 1 if more loaded, 0 if equal
 */
static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
  float d1Score = (float)pDnode1->numOfVnodes / pDnode1->numOfSupportVnodes;
  float d2Score = (float)pDnode2->numOfVnodes / pDnode2->numOfSupportVnodes;

  if (d1Score < d2Score) return -1;
  if (d1Score > d2Score) return 1;
  return 0;
}

/**
 * Pick the dnodes that will host the replicas of one vgroup.
 *
 * The candidate array is sorted with mndCompareDnodeVnodes() and its first
 * `replica` entries are assigned to the vgroup. The vnode counter of every
 * chosen array entry is incremented.
 *
 * @param pMnode  unused here; kept for the existing call signature
 * @param pVgroup vgroup whose vnodeGid[] entries are filled in
 * @param pArray  candidate dnodes (SDnodeObj elements)
 * @return 0 on success; -1 with terrno set to TSDB_CODE_MND_NO_ENOUGH_DNODES
 *         when there are fewer candidates than replicas or a chosen dnode is
 *         already over its supported vnode count
 */
static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);

  int32_t size = taosArrayGetSize(pArray);
  if (size < pVgroup->replica) {
    mError("db:%s, vgId:%d, no enough online dnodes:%d to alloc %d replica", pVgroup->dbName, pVgroup->vgId, size,
           pVgroup->replica);
    terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
    return -1;
  }

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    SDnodeObj *pDnode = taosArrayGet(pArray, v);
    if (pDnode == NULL || pDnode->numOfVnodes > pDnode->numOfSupportVnodes) {
      terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
      return -1;
    }

    pVgid->dnodeId = pDnode->id;
    /* A single replica is the leader; otherwise every replica starts as follower. */
    if (pVgroup->replica == 1) {
      pVgid->role = TAOS_SYNC_STATE_LEADER;
    } else {
      pVgid->role = TAOS_SYNC_STATE_FOLLOWER;
    }

    mDebug("db:%s, vgId:%d, vn:%d dnode:%d is alloced", pVgroup->dbName, pVgroup->vgId, v, pVgid->dnodeId);
    pDnode->numOfVnodes++;
  }

  return 0;
}
373

S
Shengliang Guan 已提交
374
/**
 * Allocate and initialise all vgroups of a database.
 *
 * The uint32 hash space [0, UINT32_MAX] is split into numOfVgroups contiguous
 * ranges; the last vgroup absorbs the division remainder. Vgroup ids continue
 * from the current SDB maximum, but never start below 2. Each vgroup gets its
 * replicas placed through mndGetAvailableDnode().
 *
 * @param pMnode    mnode owning the SDB and the dnode list
 * @param pDb       database supplying numOfVgroups, replications, streamMode
 * @param ppVgroups receives a calloc()ed array of numOfVgroups vgroups on
 *                  success; left untouched on failure
 * @return 0 on success, -1 on failure with terrno set
 */
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
  int32_t code = -1;
  SArray *pArray = NULL;
  SVgObj *pVgroups = NULL;

  pVgroups = calloc(pDb->cfg.numOfVgroups, sizeof(SVgObj));
  if (pVgroups == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto ALLOC_VGROUP_OVER;
  }

  pArray = mndBuildDnodesArray(pMnode);
  if (pArray == NULL) {
    goto ALLOC_VGROUP_OVER;
  }

  mDebug("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
         pDb->cfg.numOfVgroups, pDb->cfg.numOfVgroups * pDb->cfg.replications);

  int32_t  maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  uint32_t hashMin = 0;
  uint32_t hashMax = UINT32_MAX;
  uint32_t hashInterval = (hashMax - hashMin) / pDb->cfg.numOfVgroups;

  if (maxVgId < 2) maxVgId = 2;

  for (uint32_t v = 0; v < pDb->cfg.numOfVgroups; v++) {
    SVgObj *pVgroup = &pVgroups[v];
    pVgroup->vgId = maxVgId++;
    pVgroup->createdTime = taosGetTimestampMs();
    /* Use this vgroup's own creation time, not that of the first array element. */
    pVgroup->updateTime = pVgroup->createdTime;
    pVgroup->version = 1;
    pVgroup->streamMode = pDb->cfg.streamMode;
    pVgroup->hashBegin = hashMin + hashInterval * v;
    if (v == pDb->cfg.numOfVgroups - 1) {
      /* The last range is extended to the end of the hash space. */
      pVgroup->hashEnd = hashMax;
    } else {
      pVgroup->hashEnd = hashMin + hashInterval * (v + 1) - 1;
    }

    memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
    pVgroup->dbUid = pDb->uid;
    pVgroup->replica = pDb->cfg.replications;

    if (mndGetAvailableDnode(pMnode, pVgroup, pArray) != 0) {
      terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
      goto ALLOC_VGROUP_OVER;
    }
  }

  *ppVgroups = pVgroups;
  code = 0;

  mDebug("db:%s, %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);

ALLOC_VGROUP_OVER:
  /* The vgroup array is only handed to the caller on success. */
  if (code != 0) free(pVgroups);
  taosArrayDestroy(pArray);
  return code;
}

L
Liu Jicong 已提交
438
/**
 * Build the endpoint set of a vgroup from the dnodes hosting its replicas.
 *
 * Replicas whose dnode cannot be acquired are skipped. inUse is set to the
 * slot of a replica whose role is leader.
 *
 * @return the endpoint set, by value
 */
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
  SEpSet result = {0};

  for (int32_t r = 0; r < pVgroup->replica; ++r) {
    const SVnodeGid *pGid = &pVgroup->vnodeGid[r];
    SDnodeObj       *pHost = mndAcquireDnode(pMnode, pGid->dnodeId);

    if (pHost != NULL) {
      /* Record the slot this endpoint is about to occupy. */
      if (pGid->role == TAOS_SYNC_STATE_LEADER) {
        result.inUse = result.numOfEps;
      }

      addEpIntoEpSet(&result, pHost->fqdn, pHost->port);
      mndReleaseDnode(pMnode, pHost);
    }
  }

  return result;
}

S
Shengliang Guan 已提交
457
/* Forward a create-vnode response to the transaction module; always returns 0. */
static int32_t mndProcessCreateVnodeRsp(SNodeMsg *pRsp) {
  int32_t code = 0;
  mndTransProcessRsp(pRsp);
  return code;
}

S
Shengliang Guan 已提交
462
/* Forward an alter-vnode response to the transaction module; always returns 0. */
static int32_t mndProcessAlterVnodeRsp(SNodeMsg *pRsp) {
  int32_t code = 0;
  mndTransProcessRsp(pRsp);
  return code;
}

S
Shengliang Guan 已提交
467
/* Forward a drop-vnode response to the transaction module; always returns 0. */
static int32_t mndProcessDropVnodeRsp(SNodeMsg *pRsp) {
  int32_t code = 0;
  mndTransProcessRsp(pRsp);
  return code;
}

S
Shengliang Guan 已提交
472
/* Sync-vnode responses need no processing; always returns 0. */
static int32_t mndProcessSyncVnodeRsp(SNodeMsg *pRsp) {
  return 0;
}
S
Shengliang Guan 已提交
473

S
Shengliang Guan 已提交
474
/* Compact-vnode responses need no processing; always returns 0. */
static int32_t mndProcessCompactVnodeRsp(SNodeMsg *pRsp) {
  return 0;
}
S
Shengliang Guan 已提交
475

S
Shengliang Guan 已提交
476 477 478 479 480 481 482
/**
 * sdbTraverse() callback: for vgroups belonging to the database uid in p1,
 * raise the replica maximum in p2 and bump the vgroup counter in p3.
 *
 * @return always true
 */
static bool mndGetVgroupMaxReplicaFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SVgObj *pVg = pObj;
  int64_t wantedUid = *(int64_t *)p1;

  /* Vgroups of other databases are ignored. */
  if (pVg->dbUid != wantedUid) {
    return true;
  }

  int8_t  *pMaxReplica = p2;
  int32_t *pCount = p3;

  *pMaxReplica = TMAX(*pMaxReplica, pVg->replica);
  (*pCount)++;
  return true;
}

S
Shengliang Guan 已提交
490
/**
 * Compute the largest replica count and the number of vgroups of a database.
 *
 * @param dbName        database to inspect
 * @param pReplica      receives the maximum replica count (at least 1)
 * @param pNumOfVgroups receives the number of vgroups of the database
 * @return 0 on success; -1 with terrno set to TSDB_CODE_MND_DB_NOT_SELECTED
 *         when the database cannot be acquired
 */
static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pReplica, int32_t *pNumOfVgroups) {
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    return -1;
  }

  /* Start values the traversal callback builds on. */
  *pNumOfVgroups = 0;
  *pReplica = 1;

  sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVgroupMaxReplicaFp, &pDb->uid, pReplica, pNumOfVgroups);

  mndReleaseDb(pMnode, pDb);
  return 0;
}

S
Shengliang Guan 已提交
505 506
/**
 * Describe the result schema of the vgroup show table.
 *
 * Declared columns: "vgId" (INT, 4 bytes), "tables" (INT, 4 bytes), then for
 * each replica i (1-based) "v<i>_dnode" (SMALLINT, 2 bytes) and "v<i>_status"
 * (BINARY, 9 + VARSTR_HEADER_SIZE bytes). The replica count and row count
 * come from mndGetVgroupMaxReplica() for pShow->db.
 *
 * NOTE(review): mndRetrieveVgroups() in this file writes db-name, status and
 * onlines cells that are not declared here — confirm which layout is
 * authoritative.
 *
 * @return 0 on success, -1 when the database lookup fails
 */
static int32_t mndGetVgroupMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode *pMnode = pReq->pNode;

  if (mndGetVgroupMaxReplica(pMnode, pShow->db, &pShow->replica, &pShow->numOfRows) != 0) {
    return -1;
  }

  SSchema *pSchema = pMeta->pSchemas;
  int32_t  cols = 0;

  /* vgId */
  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  pSchema[cols].bytes = pShow->bytes[cols];
  strcpy(pSchema[cols].name, "vgId");
  cols++;

  /* tables */
  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  pSchema[cols].bytes = pShow->bytes[cols];
  strcpy(pSchema[cols].name, "tables");
  cols++;

  /* One dnode/status column pair per replica. */
  for (int32_t r = 0; r < pShow->replica; ++r) {
    pShow->bytes[cols] = 2;
    pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
    pSchema[cols].bytes = pShow->bytes[cols];
    snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%d_dnode", r + 1);
    cols++;

    pShow->bytes[cols] = 9 + VARSTR_HEADER_SIZE;
    pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
    pSchema[cols].bytes = pShow->bytes[cols];
    snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%d_status", r + 1);
    cols++;
  }

  pShow->numOfColumns = cols;
  pMeta->numOfColumns = cols;

  /* Each column starts where the previous one ends. */
  pShow->offset[0] = 0;
  for (int32_t c = 1; c < cols; ++c) {
    pShow->offset[c] = pShow->offset[c - 1] + pShow->bytes[c - 1];
  }

  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
  strcpy(pMeta->tbName, mndShowStr(pShow->type));

  return 0;
}

S
Shengliang Guan 已提交
556 557
/**
 * Fill up to `rows` rows of the vgroup show table into `data`.
 *
 * When pShow->db is non-empty only vgroups of that database are returned.
 * Each row holds: vgId, db name, table count, status, onlines, then a
 * dnodeId/role pair per replica. Cells are addressed per column as
 * offset[col] * rows + bytes[col] * rowIndex; mndVacuumResult() is called
 * afterwards with the number of rows actually written.
 *
 * @return the number of rows written; 0 when the database filter names a
 *         database that cannot be acquired
 */
static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode *pMnode = pReq->pNode;
  SSdb   *pSdb = pMnode->pSdb;
  int32_t numOfRows = 0;
  SVgObj *pVgroup = NULL;
  int32_t cols = 0;
  char   *pWrite;

  SDbObj *pDb = NULL;
  if (strlen(pShow->db) > 0) {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (pDb == NULL) {
      return 0;
    }
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    if (pDb != NULL && pVgroup->dbUid != pDb->uid) {
      /* Every fetched row is released at the end of an iteration, so the
       * skip path has to release it as well or the reference leaks. */
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    cols = 0;

    /* vgId */
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int32_t *)pWrite = pVgroup->vgId;
    cols++;

    /* db name, extracted from the stored full name */
    SName name = {0};
    char  db[TSDB_DB_NAME_LEN] = {0};
    tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT|T_NAME_DB);
    tNameGetDbName(&name, db);

    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    STR_TO_VARSTR(pWrite, db);
    cols++;

    /* tables */
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int32_t *)pWrite = pVgroup->numOfTables;
    cols++;

    /* status */
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    STR_TO_VARSTR(pWrite, "ready"); // TODO
    cols++;

    /* onlines */
    pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
    *(int32_t *)pWrite = pVgroup->replica;
    cols++;

    /* one dnodeId/role pair per replica */
    for (int32_t i = 0; i < pVgroup->replica; ++i) {
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(int16_t *)pWrite = pVgroup->vnodeGid[i].dnodeId;
      cols++;

      const char *role = mndGetRoleStr(pVgroup->vnodeGid[i].role);
      pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_WITH_MAXSIZE_TO_VARSTR(pWrite, role, pShow->bytes[cols]);
      cols++;
    }

    numOfRows++;
    sdbRelease(pSdb, pVgroup);
  }

  if (pDb != NULL) {
    mndReleaseDb(pMnode, pDb);
  }

  mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
  pShow->numOfReads += numOfRows;
  return numOfRows;
}

/* Show free-iterator handler for vgroups: cancels the pending SDB fetch. */
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}

S
Shengliang Guan 已提交
639 640 641 642
/**
 * sdbTraverse() callback: add to the counter in p2 one for every replica of
 * this vgroup that is placed on the dnode id in p1.
 *
 * @return always true
 */
static bool mndGetVnodesNumFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SVgObj  *pVg = pObj;
  int32_t  wantedDnode = *(int32_t *)p1;
  int32_t *pCount = (int32_t *)p2;

  for (int32_t r = 0; r < pVg->replica; ++r) {
    if (pVg->vnodeGid[r].dnodeId != wantedDnode) continue;
    (*pCount)++;
  }

  return true;
}

/* Count the vnodes placed on the given dnode by traversing all vgroups. */
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
  int32_t count = 0;
  sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodesNumFp, &dnodeId, &count, NULL);
  return count;
}

S
Shengliang Guan 已提交
659 660
/**
 * Describe the result schema of the vnode show table.
 *
 * Declared columns: "vgId" (INT, 4 bytes) and "status" (BINARY,
 * 12 + VARSTR_HEADER_SIZE bytes). The dnode id is parsed from pShow->payload
 * with atoi() when a payload is present (0 otherwise) and stored in
 * pShow->replica; numOfRows is the vnode count of that dnode.
 *
 * @return always 0
 */
static int32_t mndGetVnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode  *pMnode = pReq->pNode;
  SSchema *pSchema = pMeta->pSchemas;
  int32_t  cols = 0;

  /* vgId */
  pShow->bytes[cols] = 4;
  pSchema[cols].type = TSDB_DATA_TYPE_INT;
  pSchema[cols].bytes = pShow->bytes[cols];
  strcpy(pSchema[cols].name, "vgId");
  cols++;

  /* status */
  pShow->bytes[cols] = 12 + VARSTR_HEADER_SIZE;
  pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
  pSchema[cols].bytes = pShow->bytes[cols];
  strcpy(pSchema[cols].name, "status");
  cols++;

  pShow->numOfColumns = cols;
  pMeta->numOfColumns = cols;

  /* Each column starts where the previous one ends. */
  pShow->offset[0] = 0;
  for (int32_t c = 1; c < cols; ++c) {
    pShow->offset[c] = pShow->offset[c - 1] + pShow->bytes[c - 1];
  }

  int32_t dnodeId = (pShow->payloadLen > 0) ? atoi(pShow->payload) : 0;

  /* The replica field is used here to carry the requested dnode id. */
  pShow->replica = dnodeId;
  pShow->numOfRows = mndGetVnodesNum(pMnode, dnodeId);
  pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
  strcpy(pMeta->tbName, mndShowStr(pShow->type));

  return 0;
}

S
Shengliang Guan 已提交
699 700
/**
 * Fill up to `rows` rows of the vnode show table into `data`, one row per
 * replica of every fetched vgroup.
 *
 * Each row holds: vgId, db name, a table count (currently always 0), the
 * replica's role string, and the vgroup's replica count. Cells are addressed
 * per column as offset[col] * rows + bytes[col] * rowIndex.
 *
 * NOTE(review): mndGetVnodeMeta() in this file declares only two columns
 * while five cells are written here — confirm which layout is authoritative.
 *
 * @return the number of rows written
 */
static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
  SMnode *pMnode = pReq->pNode;
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  int32_t numOfRows = 0;
  int32_t cols = 0;
  char   *pCell;

  for (;;) {
    if (numOfRows >= rows) break;

    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    /* Stops mid-vgroup once the output buffer is full. */
    for (int32_t r = 0; r < pVgroup->replica && numOfRows < rows; ++r) {
      SVnodeGid *pGid = &pVgroup->vnodeGid[r];
      cols = 0;

      /* vgId */
      pCell = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(uint32_t *)pCell = pVgroup->vgId;
      cols++;

      /* db name, extracted from the stored full name */
      SName name = {0};
      char  db[TSDB_DB_NAME_LEN] = {0};
      tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT|T_NAME_DB);
      tNameGetDbName(&name, db);

      pCell = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_TO_VARSTR(pCell, db);
      cols++;

      /* tables */
      pCell = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(uint32_t *)pCell = 0;  //todo: Tables
      cols++;

      /* role */
      pCell = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      STR_TO_VARSTR(pCell, mndGetRoleStr(pGid->role));
      cols++;

      /* onlines */
      pCell = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
      *(uint32_t *)pCell = pVgroup->replica;
      cols++;

      numOfRows++;
    }

    sdbRelease(pSdb, pVgroup);
  }

  mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
  pShow->numOfReads += numOfRows;
  return numOfRows;
}

/* Show free-iterator handler for vnodes: cancels the pending SDB fetch. */
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}