mndVgroup.c 25.1 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndVgroup.h"
S
Shengliang Guan 已提交
18
#include "mndDb.h"
S
Shengliang Guan 已提交
19
#include "mndDnode.h"
S
Shengliang Guan 已提交
20
#include "mndMnode.h"
S
Shengliang Guan 已提交
21 22
#include "mndShow.h"
#include "mndTrans.h"
S
Shengliang Guan 已提交
23

S
Shengliang Guan 已提交
24 25
#define VGROUP_VER_NUMBER   1
#define VGROUP_RESERVE_SIZE 64
S
Shengliang Guan 已提交
26

S
Shengliang Guan 已提交
27 28 29
static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw);
static int32_t  mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
static int32_t  mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
S
Shengliang Guan 已提交
30
static int32_t  mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
S
Shengliang Guan 已提交
31

S
Shengliang Guan 已提交
32 33 34 35
static int32_t mndProcessCreateVnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessAlterVnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessDropVnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessCompactVnodeRsp(SRpcMsg *pRsp);
S
Shengliang Guan 已提交
36

S
Shengliang Guan 已提交
37
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
S
Shengliang Guan 已提交
38
static void    mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
S
Shengliang Guan 已提交
39
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
S
Shengliang Guan 已提交
40 41 42
static void    mndCancelGetNextVnode(SMnode *pMnode, void *pIter);

// Registers what the vgroup module contributes to the mnode: the vnode
// response handlers, the show/retrieve hooks for the vgroup and vnode
// tables, and the SDB_VGROUP table callbacks.
// Returns the result of sdbSetTable().
int32_t mndInitVgroup(SMnode *pMnode) {
  // Responses to vnode operations.
  mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndProcessCreateVnodeRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_ALTER_VNODE_RSP, mndProcessAlterVnodeRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndProcessDropVnodeRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_VNODE_RSP, mndProcessCompactVnodeRsp);

  // Show-table support for vgroups and vnodes.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndRetrieveVnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndCancelGetNextVnode);

  // SDB table descriptor: rows are keyed by the int32 vgroup id.
  SSdbTable vgroupTable = {0};
  vgroupTable.sdbType = SDB_VGROUP;
  vgroupTable.keyType = SDB_KEY_INT32;
  vgroupTable.encodeFp = (SdbEncodeFp)mndVgroupActionEncode;
  vgroupTable.decodeFp = (SdbDecodeFp)mndVgroupActionDecode;
  vgroupTable.insertFp = (SdbInsertFp)mndVgroupActionInsert;
  vgroupTable.updateFp = (SdbUpdateFp)mndVgroupActionUpdate;
  vgroupTable.deleteFp = (SdbDeleteFp)mndVgroupActionDelete;

  return sdbSetTable(pMnode->pSdb, vgroupTable);
}

// Counterpart of mndInitVgroup(); currently there is nothing to tear down.
void mndCleanupVgroup(SMnode *pMnode) {
  (void)pMnode;
}

S
Shengliang Guan 已提交
68
// Serializes a vgroup object into a freshly allocated SDB raw record.
// Field order: vgId, createdTime, updateTime, version, hashBegin, hashEnd,
// dbName, dbUid, replica, one dnodeId per replica, then a reserved area.
// Returns the raw record, or NULL with terrno set on failure.
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
  int32_t dataPos = 0;

  // Assume failure until every field has been written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, VGROUP_VER_NUMBER, sizeof(SVgObj) + VGROUP_RESERVE_SIZE);
  if (pRaw == NULL) goto _OVER;

  SDB_SET_INT32(pRaw, dataPos, pVgroup->vgId, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->version, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pVgroup->replica, _OVER)

  // Only the dnode id of each replica is persisted.
  for (int32_t r = 0; r < pVgroup->replica; ++r) {
    SDB_SET_INT32(pRaw, dataPos, pVgroup->vnodeGid[r].dnodeId, _OVER)
  }

  SDB_SET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("vgId:%d, encode to raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
    return pRaw;
  }

  mError("vgId:%d, failed to encode to raw:%p since %s", pVgroup->vgId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}

S
Shengliang Guan 已提交
104
// Rebuilds a vgroup row from an SDB raw record written by
// mndVgroupActionEncode(). Fields are read in the same order they were
// written. A vgroup with a single replica has that replica marked as leader.
// Returns the new row, or NULL with terrno set on failure.
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
  // Declared and initialised before the first `goto _OVER` so the error path
  // never reads an indeterminate pointer.
  SSdbRow *pRow = NULL;
  SVgObj  *pVgroup = NULL;
  int32_t  dataPos = 0;
  int8_t   sver = 0;

  // Assume failure until every field has been read.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (sver != VGROUP_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SVgObj));
  if (pRow == NULL) goto _OVER;

  pVgroup = sdbGetRowObj(pRow);
  if (pVgroup == NULL) goto _OVER;

  SDB_GET_INT32(pRaw, dataPos, &pVgroup->vgId, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->updateTime, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->version, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashBegin, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, _OVER)
  // NOTE(review): replica comes straight from the stored record and is not
  // range-checked against the capacity of vnodeGid[] here - confirm whether
  // an upper-bound check is needed.
  for (int8_t i = 0; i < pVgroup->replica; ++i) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
    SDB_GET_INT32(pRaw, dataPos, &pVgid->dnodeId, _OVER)
    if (pVgroup->replica == 1) {
      pVgid->role = TAOS_SYNC_STATE_LEADER;
    }
  }
  SDB_GET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno != 0) {
    // pVgroup is still NULL when the failure happened before the row existed.
    mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup == NULL ? 0 : pVgroup->vgId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("vgId:%d, decode from raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
  return pRow;
}

// SDB insert callback for vgroup rows: only traces the event. Always returns 0.
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
  (void)pSdb;
  mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup);
  return 0;
}

// SDB delete callback for vgroup rows: only traces the event. Always returns 0.
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) {
  (void)pSdb;
  mTrace("vgId:%d, perform delete action, row:%p", pVgroup->vgId, pVgroup);
  return 0;
}

S
Shengliang Guan 已提交
163 164 165 166 167 168 169 170
// SDB update callback: copies the mutable vgroup fields (update time, version,
// hash range, replica count and the replica table) from pNew into pOld.
// Always returns 0.
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
  (void)pSdb;
  mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);

  // Replica layout.
  pOld->replica = pNew->replica;
  memcpy(pOld->vnodeGid, pNew->vnodeGid, TSDB_MAX_REPLICA * sizeof(SVnodeGid));

  // Hash range.
  pOld->hashBegin = pNew->hashBegin;
  pOld->hashEnd = pNew->hashEnd;

  // Bookkeeping.
  pOld->version = pNew->version;
  pOld->updateTime = pNew->updateTime;

  return 0;
}

// Looks up a vgroup by id via sdbAcquire(). On a miss reported as
// TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to
// TSDB_CODE_MND_VGROUP_NOT_EXIST. Release the result with mndReleaseVgroup().
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId) {
  SVgObj *pVgroup = sdbAcquire(pMnode->pSdb, SDB_VGROUP, &vgId);
  if (pVgroup != NULL) {
    return pVgroup;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
  }
  return NULL;
}

// Hands a vgroup obtained from mndAcquireVgroup() back to the SDB.
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
  sdbRelease(pMnode->pSdb, pVgroup);
}

S
Shengliang Guan 已提交
188 189 190 191 192 193 194
// Builds the serialized create-vnode request for the replica of pVgroup that
// lives on pDnode.
//   pContLen - receives the length of the returned buffer on success.
// Returns a buffer allocated with taosMemoryMalloc(), or NULL on failure.
// Failure cases: a replica's dnode cannot be acquired, pDnode is not one of
// the vgroup's replicas (TSDB_CODE_MND_APP_ERROR), or sizing/allocation fails
// (TSDB_CODE_OUT_OF_MEMORY).
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SCreateVnodeReq req = {0};

  // Identity of the vnode and its database.
  req.vgId = pVgroup->vgId;
  req.dnodeId = pDnode->id;
  req.dbUid = pDb->uid;
  memcpy(req.db, pDb->name, TSDB_DB_FNAME_LEN);

  // Vgroup-level attributes.
  req.vgVersion = pVgroup->version;
  req.replica = pVgroup->replica;
  req.hashBegin = pVgroup->hashBegin;
  req.hashEnd = pVgroup->hashEnd;
  req.selfIndex = -1;  // resolved in the replica loop below

  // Database configuration.
  req.numOfStables = pDb->cfg.numOfStables;
  req.buffer = pDb->cfg.buffer;
  req.pageSize = pDb->cfg.pageSize;
  req.pages = pDb->cfg.pages;
  req.daysPerFile = pDb->cfg.daysPerFile;
  req.daysToKeep0 = pDb->cfg.daysToKeep0;
  req.daysToKeep1 = pDb->cfg.daysToKeep1;
  req.daysToKeep2 = pDb->cfg.daysToKeep2;
  req.minRows = pDb->cfg.minRows;
  req.maxRows = pDb->cfg.maxRows;
  req.fsyncPeriod = pDb->cfg.fsyncPeriod;
  req.walLevel = pDb->cfg.walLevel;
  req.precision = pDb->cfg.precision;
  req.compression = pDb->cfg.compression;
  req.strict = pDb->cfg.strict;
  req.cacheLastRow = pDb->cfg.cacheLastRow;
  req.hashMethod = pDb->cfg.hashMethod;
  req.numOfRetensions = pDb->cfg.numOfRetensions;
  req.pRetensions = pDb->cfg.pRetensions;

  // Fill in the endpoint of every replica and find the slot of pDnode.
  for (int32_t idx = 0; idx < pVgroup->replica; ++idx) {
    SVnodeGid *pGid = &pVgroup->vnodeGid[idx];
    SDnodeObj *pPeer = mndAcquireDnode(pMnode, pGid->dnodeId);
    if (pPeer == NULL) {
      return NULL;
    }

    SReplica *pReplica = &req.replicas[idx];
    pReplica->id = pPeer->id;
    pReplica->port = pPeer->port;
    memcpy(pReplica->fqdn, pPeer->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pPeer);

    if (pGid->dnodeId == pDnode->id) {
      req.selfIndex = idx;
    }
  }

  if (req.selfIndex == -1) {
    terrno = TSDB_CODE_MND_APP_ERROR;
    return NULL;
  }

  // First pass sizes the message; a negative size and a failed allocation
  // are both reported as out-of-memory.
  int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &req);
  void   *pReq = (contLen < 0) ? NULL : taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  tSerializeSCreateVnodeReq(pReq, contLen, &req);
  *pContLen = contLen;
  return pReq;
}

259
// Builds the alter-vnode message for pVgroup: an SMsgHead (total length and
// vgId, both passed through htonl) followed by the serialized SAlterVnodeReq.
//   pContLen - receives the total message length (header + body) on success.
// Returns a buffer allocated with taosMemoryMalloc(), or NULL on failure
// (a replica's dnode cannot be acquired, or sizing/allocation fails with
// TSDB_CODE_OUT_OF_MEMORY).
void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SAlterVnodeReq alterReq = {0};
  alterReq.vgVersion = pVgroup->version;
  alterReq.buffer = pDb->cfg.buffer;
  alterReq.pages = pDb->cfg.pages;
  alterReq.pageSize = pDb->cfg.pageSize;
  alterReq.daysPerFile = pDb->cfg.daysPerFile;
  alterReq.daysToKeep0 = pDb->cfg.daysToKeep0;
  alterReq.daysToKeep1 = pDb->cfg.daysToKeep1;
  alterReq.daysToKeep2 = pDb->cfg.daysToKeep2;
  alterReq.fsyncPeriod = pDb->cfg.fsyncPeriod;
  alterReq.walLevel = pDb->cfg.walLevel;
  alterReq.strict = pDb->cfg.strict;
  alterReq.cacheLastRow = pDb->cfg.cacheLastRow;
  alterReq.replica = pVgroup->replica;
  alterReq.selfIndex = -1;  // never resolved here; see the disabled check below

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SReplica  *pReplica = &alterReq.replicas[v];
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pVgidDnode == NULL) {
      return NULL;
    }

    pReplica->id = pVgidDnode->id;
    pReplica->port = pVgidDnode->port;
    memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
    mndReleaseDnode(pMnode, pVgidDnode);
  }

#if 0
  if (alterReq.selfIndex == -1) {
    terrno = TSDB_CODE_MND_APP_ERROR;
    return NULL;
  }
#endif

  // Size of the serialized body only (no header).
  int32_t bodyLen = tSerializeSAlterVnodeReq(NULL, 0, &alterReq);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The body is written after the header, so only bodyLen bytes are available
  // there; passing the full contLen would overstate the buffer size.
  tSerializeSAlterVnodeReq((char *)pReq + sizeof(SMsgHead), bodyLen, &alterReq);
  *pContLen = contLen;
  return pReq;
}

L
Liu Jicong 已提交
319
// Builds the serialized drop-vnode request for the replica of pVgroup that
// lives on pDnode.
//   pContLen - receives the length of the returned buffer on success.
// Returns a buffer allocated with taosMemoryMalloc(), or NULL with terrno set
// to TSDB_CODE_OUT_OF_MEMORY when sizing or allocation fails.
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
  SDropVnodeReq req = {0};
  req.vgId = pVgroup->vgId;
  req.dnodeId = pDnode->id;
  req.dbUid = pDb->uid;
  memcpy(req.db, pDb->name, TSDB_DB_FNAME_LEN);

  // First pass sizes the message; a negative size and a failed allocation
  // are both reported as out-of-memory.
  int32_t contLen = tSerializeSDropVnodeReq(NULL, 0, &req);
  void   *pReq = (contLen < 0) ? NULL : taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  tSerializeSDropVnodeReq(pReq, contLen, &req);
  *pContLen = contLen;
  return pReq;
}

S
Shengliang Guan 已提交
343 344 345 346 347 348 349 350 351 352 353 354
// sdbTraverse callback over SDB_DNODE: zeroes the dnode's vnode counter.
// Returns true unconditionally.
static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  ((SDnodeObj *)pObj)->numOfVnodes = 0;
  return true;
}

// sdbTraverse callback over SDB_DNODE: recomputes the dnode's vnode count
// (plus one when the dnode also hosts an mnode) and pushes the dnode into the
// array passed as p1 when it is online and supports at least one vnode.
// Returns true unconditionally.
static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SDnodeObj *pDnode = pObj;
  SArray    *pCandidates = p1;

  bool online = mndIsDnodeOnline(pMnode, pDnode, taosGetTimestampMs());
  bool isMnode = mndIsMnode(pMnode, pDnode->id);
  pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);

  // Logged before the mnode adjustment, so "vnodes" is the raw count.
  mDebug("dnode:%d, vnodes:%d support_vnodes:%d is_mnode:%d online:%d", pDnode->id, pDnode->numOfVnodes,
         pDnode->numOfSupportVnodes, isMnode, online);

  if (isMnode) {
    pDnode->numOfVnodes += 1;
  }

  if (!online || pDnode->numOfSupportVnodes <= 0) {
    return true;
  }

  taosArrayPush(pCandidates, pDnode);
  return true;
}

S
Shengliang Guan 已提交
371
// Builds an array of SDnodeObj entries for the dnodes accepted by
// mndBuildDnodesArrayFp(), after resetting every dnode's vnode counter.
// Returns NULL with terrno = TSDB_CODE_OUT_OF_MEMORY if the array cannot be
// created. The result is released with taosArrayDestroy() by mndAllocVgroup().
SArray *mndBuildDnodesArray(SMnode *pMnode) {
  SArray *pDnodes = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeObj));
  if (pDnodes == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SSdb *pSdb = pMnode->pSdb;
  sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
  sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pDnodes, NULL, NULL);
  return pDnodes;
}

// Sort comparator ordering dnodes by load score, where the score is
// numOfVnodes / numOfSupportVnodes, lowest score first.
// Returns -1, 0 or 1 so that the ordering is consistent in both directions;
// a comparator that never returns a negative value gives the sort
// contradictory answers for (a, b) and (b, a).
static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
  float d1Score = (float)pDnode1->numOfVnodes / pDnode1->numOfSupportVnodes;
  float d2Score = (float)pDnode2->numOfVnodes / pDnode2->numOfSupportVnodes;

  if (d1Score > d2Score) return 1;
  if (d1Score < d2Score) return -1;
  return 0;
}

// Assigns a dnode to every replica slot of pVgroup, taking the first
// `replica` entries of pArray after sorting it with mndCompareDnodeVnodes.
// A single-replica vgroup gets role LEADER, otherwise every replica starts as
// FOLLOWER. Each chosen array entry has its numOfVnodes incremented.
// Returns 0 on success, or -1 with terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES
// when there are fewer candidates than replicas or a chosen dnode already
// holds more vnodes than it supports.
static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);

  int32_t size = taosArrayGetSize(pArray);
  if (size < pVgroup->replica) {
    mError("db:%s, vgId:%d, no enough online dnodes:%d to alloc %d replica", pVgroup->dbName, pVgroup->vgId, size,
           pVgroup->replica);
    terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
    return -1;
  }

  for (int32_t v = 0; v < pVgroup->replica; ++v) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
    SDnodeObj *pDnode = taosArrayGet(pArray, v);
    if (pDnode == NULL || pDnode->numOfVnodes > pDnode->numOfSupportVnodes) {
      terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
      return -1;
    }

    pVgid->dnodeId = pDnode->id;
    if (pVgroup->replica == 1) {
      pVgid->role = TAOS_SYNC_STATE_LEADER;
    } else {
      pVgid->role = TAOS_SYNC_STATE_FOLLOWER;
    }

    mInfo("db:%s, vgId:%d, vn:%d dnode:%d is alloced", pVgroup->dbName, pVgroup->vgId, v, pVgid->dnodeId);
    // Account for the new vnode so later calls on the same array see it.
    pDnode->numOfVnodes++;
  }

  return 0;
}
428

S
Shengliang Guan 已提交
429
// Allocates and initialises the vgroups of a database.
// The uint32 hash space [0, UINT32_MAX] is split into numOfVgroups contiguous
// ranges (the last one absorbs the remainder), each vgroup gets a fresh id
// starting at max(sdbGetMaxId(), 2), and replicas are placed on dnodes via
// mndGetAvailableDnode().
//   ppVgroups - on success receives an array of pDb->cfg.numOfVgroups
//               SVgObj allocated with taosMemoryCalloc(); untouched on failure.
// Returns 0 on success, -1 with terrno set on failure.
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
  int32_t code = -1;
  SArray *pArray = NULL;
  SVgObj *pVgroups = NULL;

  // Guard the division below; a non-positive vgroup count cannot be allocated.
  if (pDb->cfg.numOfVgroups <= 0) {
    terrno = TSDB_CODE_MND_APP_ERROR;
    return -1;
  }

  const uint32_t numOfVgroups = (uint32_t)pDb->cfg.numOfVgroups;
  const uint32_t hashMin = 0;
  const uint32_t hashMax = UINT32_MAX;
  const uint32_t hashInterval = (hashMax - hashMin) / numOfVgroups;
  int32_t        maxVgId = 0;

  pVgroups = taosMemoryCalloc(pDb->cfg.numOfVgroups, sizeof(SVgObj));
  if (pVgroups == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  pArray = mndBuildDnodesArray(pMnode);
  if (pArray == NULL) goto _OVER;

  mInfo("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
        pDb->cfg.numOfVgroups, pDb->cfg.numOfVgroups * pDb->cfg.replications);

  maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
  if (maxVgId < 2) maxVgId = 2;

  for (uint32_t v = 0; v < numOfVgroups; v++) {
    SVgObj *pVgroup = &pVgroups[v];
    pVgroup->vgId = maxVgId++;
    pVgroup->createdTime = taosGetTimestampMs();
    // Use this vgroup's own creation time, not that of the first array element.
    pVgroup->updateTime = pVgroup->createdTime;
    pVgroup->version = 1;
    pVgroup->hashBegin = hashMin + hashInterval * v;
    if (v == numOfVgroups - 1) {
      // The last range absorbs whatever the integer division left over.
      pVgroup->hashEnd = hashMax;
    } else {
      pVgroup->hashEnd = hashMin + hashInterval * (v + 1) - 1;
    }

    memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
    pVgroup->dbUid = pDb->uid;
    pVgroup->replica = pDb->cfg.replications;

    if (mndGetAvailableDnode(pMnode, pVgroup, pArray) != 0) {
      terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
      goto _OVER;
    }
  }

  *ppVgroups = pVgroups;
  code = 0;

  mInfo("db:%s, %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);

_OVER:
  if (code != 0) taosMemoryFree(pVgroups);
  taosArrayDestroy(pArray);
  return code;
}

S
Shengliang Guan 已提交
490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526
// Adds replicas to pVgroup until slots 0..2 are filled, starting at slot 1.
// Candidates come from pArray, sorted with mndCompareDnodeVnodes; dnodes that
// already host one of the vgroup's filled slots are skipped. New replicas get
// role FOLLOWER and the chosen array entry's numOfVnodes is incremented.
// NOTE(review): starting at slot 1 presumes the vgroup currently has exactly
// one replica in slot 0 - confirm against callers.
// Returns 0 once slot 2 is filled, otherwise -1 with
// terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES.
int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);

  const int32_t numOfDnodes = (int32_t)taosArrayGetSize(pArray);
  for (int32_t i = 0; i < numOfDnodes; ++i) {
    SDnodeObj *pDnode = taosArrayGet(pArray, i);
    if (pDnode == NULL) continue;
    mDebug("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes);
  }

  int32_t maxPos = 1;
  for (int32_t d = 0; d < numOfDnodes; ++d) {
    SDnodeObj *pDnode = taosArrayGet(pArray, d);
    // Validate the entry before its fields are read below.
    if (pDnode == NULL) {
      terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
      return -1;
    }

    bool used = false;
    for (int32_t vn = 0; vn < maxPos; ++vn) {
      if (pDnode->id == pVgroup->vnodeGid[vn].dnodeId) {
        used = true;
        break;
      }
    }
    if (used) continue;

    if (pDnode->numOfVnodes > pDnode->numOfSupportVnodes) {
      terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
      return -1;
    }

    SVnodeGid *pVgid = &pVgroup->vnodeGid[maxPos];
    pVgid->dnodeId = pDnode->id;
    pVgid->role = TAOS_SYNC_STATE_FOLLOWER;
    pDnode->numOfVnodes++;

    mInfo("db:%s, vgId:%d, vn:%d dnode:%d is added", pVgroup->dbName, pVgroup->vgId, maxPos, pVgid->dnodeId);
    maxPos++;
    if (maxPos == 3) return 0;
  }

  terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
  return -1;
}

S
Shengliang Guan 已提交
529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568
// Removes two replicas from pVgroup. pArray is sorted with
// mndCompareDnodeVnodes and walked from its last entry backwards; every
// replica slot hosted on the visited dnode is cleared until two have been
// removed. The removed entries are copied to *del1 and *del2, and the
// surviving replica is moved into slot 0.
// Returns 0 on success, -1 when fewer than two replicas could be removed.
int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *del1, SVnodeGid *del2) {
  int32_t numRemoved = 0;

  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
  for (int32_t idx = 0; idx < taosArrayGetSize(pArray); ++idx) {
    SDnodeObj *pEntry = taosArrayGet(pArray, idx);
    mDebug("dnode:%d, equivalent vnodes:%d", pEntry->id, pEntry->numOfVnodes);
  }

  // Both loops stop as soon as the second replica has been removed.
  for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0 && numRemoved < 2; --d) {
    SDnodeObj *pDnode = taosArrayGet(pArray, d);

    for (int32_t slot = 0; slot < TSDB_MAX_REPLICA && numRemoved < 2; ++slot) {
      SVnodeGid *pVgid = &pVgroup->vnodeGid[slot];
      if (pVgid->dnodeId != pDnode->id) continue;

      if (numRemoved == 0) {
        *del1 = *pVgid;
      } else if (numRemoved == 1) {
        *del2 = *pVgid;
      }

      mInfo("db:%s, vgId:%d, vn:%d dnode:%d is removed", pVgroup->dbName, pVgroup->vgId, slot, pVgid->dnodeId);
      memset(pVgid, 0, sizeof(SVnodeGid));
      numRemoved++;
      pDnode->numOfVnodes--;
    }
  }

  if (numRemoved != 2) return -1;

  // Compact: whatever is left in slots 1.. is moved into slot 0.
  for (int32_t slot = 1; slot < TSDB_MAX_REPLICA; ++slot) {
    SVnodeGid *pVgid = &pVgroup->vnodeGid[slot];
    if (pVgid->dnodeId == 0) continue;
    memcpy(&pVgroup->vnodeGid[0], pVgid, sizeof(SVnodeGid));
    memset(pVgid, 0, sizeof(SVnodeGid));
  }

  mInfo("db:%s, vgId:%d, dnode:%d is keeped", pVgroup->dbName, pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId);
  return 0;
}

L
Liu Jicong 已提交
572
// Builds the endpoint set of a vgroup from the fqdn/port of each replica's
// dnode. Replicas whose dnode cannot be acquired are skipped. inUse is set to
// the position of the replica whose role is LEADER, if any.
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
  SEpSet  epset = {0};
  int32_t v = 0;

  while (v < pVgroup->replica) {
    const SVnodeGid *pVgid = &pVgroup->vnodeGid[v++];
    SDnodeObj       *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
    if (pDnode != NULL) {
      // The leader's index is the slot the next endpoint is about to occupy.
      if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
        epset.inUse = epset.numOfEps;
      }
      addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port);
      mndReleaseDnode(pMnode, pDnode);
    }
  }

  return epset;
}

S
Shengliang Guan 已提交
591
// Handler for TDMT_DND_CREATE_VNODE_RSP: forwards the response to
// mndTransProcessRsp() and always returns 0.
static int32_t mndProcessCreateVnodeRsp(SRpcMsg *pRsp) {
  const int32_t code = 0;
  mndTransProcessRsp(pRsp);
  return code;
}

S
Shengliang Guan 已提交
596
// Handler for TDMT_VND_ALTER_VNODE_RSP: forwards the response to
// mndTransProcessRsp() and always returns 0.
static int32_t mndProcessAlterVnodeRsp(SRpcMsg *pRsp) {
  const int32_t code = 0;
  mndTransProcessRsp(pRsp);
  return code;
}

S
Shengliang Guan 已提交
601
// Handler for TDMT_DND_DROP_VNODE_RSP: forwards the response to
// mndTransProcessRsp() and always returns 0.
static int32_t mndProcessDropVnodeRsp(SRpcMsg *pRsp) {
  const int32_t code = 0;
  mndTransProcessRsp(pRsp);
  return code;
}

S
Shengliang Guan 已提交
606
// Handler for TDMT_VND_COMPACT_VNODE_RSP: the response is ignored.
static int32_t mndProcessCompactVnodeRsp(SRpcMsg *pRsp) {
  (void)pRsp;
  return 0;
}
S
Shengliang Guan 已提交
607

S
Shengliang Guan 已提交
608 609 610 611 612 613 614
// sdbTraverse callback over SDB_VGROUP. For vgroups whose dbUid equals the
// int64 pointed to by p1, raises *p2 (int8 replica maximum) to the vgroup's
// replica count and increments *p3 (int32 vgroup counter).
// Returns true unconditionally.
static bool mndGetVgroupMaxReplicaFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  const SVgObj *pVgroup = pObj;
  const int64_t wantedUid = *(int64_t *)p1;

  if (pVgroup->dbUid != wantedUid) {
    return true;
  }

  int8_t  *pMaxReplica = p2;
  int32_t *pCount = p3;
  *pMaxReplica = TMAX(*pMaxReplica, pVgroup->replica);
  *pCount += 1;
  return true;
}

S
Shengliang Guan 已提交
622
// Computes, for the database named dbName, the largest replica count among
// its vgroups (*pReplica, at least 1) and the number of vgroups
// (*pNumOfVgroups).
// Returns 0 on success, or -1 with terrno = TSDB_CODE_MND_DB_NOT_SELECTED
// when the database cannot be acquired.
static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pReplica, int32_t *pNumOfVgroups) {
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    return -1;
  }

  *pNumOfVgroups = 0;
  *pReplica = 1;
  sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVgroupMaxReplicaFp, &pDb->uid, pReplica, pNumOfVgroups);

  mndReleaseDb(pMnode, pDb);
  return 0;
}

S
Shengliang Guan 已提交
637 638
// Show-table retrieve hook for vgroups: fills up to `rows` rows of pBlock,
// continuing from the iterator stored in pShow->pIter. When pShow->db is
// non-empty only vgroups of that database are emitted.
// Columns per row: vgId, db name, numOfTables, then three (dnodeId, role)
// pairs - NULL for slots beyond the replica count - followed by three NULL
// columns.
// Returns the number of rows written; returns 0 when the requested database
// cannot be acquired.
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  int32_t numOfRows = 0;
  SVgObj *pVgroup = NULL;
  int32_t cols = 0;

  SDbObj *pDb = NULL;
  if (strlen(pShow->db) > 0) {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (pDb == NULL) {
      return 0;
    }
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    if (pDb != NULL && pVgroup->dbUid != pDb->uid) {
      // Skipped rows must be released too, like the emitted ones below.
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    cols = 0;
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->vgId, false);

    SName name = {0};
    char  db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB);
    tNameGetDbName(&name, varDataVal(db));
    varDataSetLen(db, strlen(varDataVal(db)));

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)db, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->numOfTables, false);

    // default 3 replica
    for (int32_t i = 0; i < 3; ++i) {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      if (i < pVgroup->replica) {
        colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->vnodeGid[i].dnodeId, false);

        char buf1[20] = {0};

        // A dnode that cannot be acquired is reported as offline; an acquired
        // one is released again once its state has been read.
        bool       isOffLine = true;
        SDnodeObj *pDnodeObj = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
        if (pDnodeObj != NULL) {
          isOffLine = !mndIsDnodeOnline(pMnode, pDnodeObj, taosGetTimestampMs());
          mndReleaseDnode(pMnode, pDnodeObj);
        }
        const char *role = isOffLine ? "OFFLINE" : syncStr(pVgroup->vnodeGid[i].role);

        STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes);

        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataAppend(pColInfo, numOfRows, (const char *)buf1, false);
      } else {
        colDataAppendNULL(pColInfo, numOfRows);
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        colDataAppendNULL(pColInfo, numOfRows);
      }
    }

    // The three trailing columns are always emitted as NULL.
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppendNULL(pColInfo, numOfRows);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataAppendNULL(pColInfo, numOfRows);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
    colDataAppendNULL(pColInfo, numOfRows);

    numOfRows++;
    sdbRelease(pSdb, pVgroup);
  }

  if (pDb != NULL) {
    mndReleaseDb(pMnode, pDb);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}

// Show-table free-iterator hook for vgroups: cancels the pending SDB fetch.
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}

S
Shengliang Guan 已提交
725 726 727 728
// sdbTraverse callback over SDB_VGROUP: adds to *p2 (int32 counter) the number
// of replicas of this vgroup hosted on the dnode whose id is *p1 (int32).
// Returns true unconditionally.
static bool mndGetVnodesNumFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  const SVgObj *pVgroup = pObj;
  const int32_t wantedDnode = *(int32_t *)p1;
  int32_t      *pCount = (int32_t *)p2;

  int32_t v = 0;
  while (v < pVgroup->replica) {
    if (pVgroup->vnodeGid[v].dnodeId == wantedDnode) {
      *pCount += 1;
    }
    ++v;
  }

  return true;
}

// Counts the vnodes hosted on the given dnode by traversing all vgroups.
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
  int32_t total = 0;
  SSdb   *pSdb = pMnode->pSdb;
  sdbTraverse(pSdb, SDB_VGROUP, mndGetVnodesNumFp, &dnodeId, &total, NULL);
  return total;
}

S
Shengliang Guan 已提交
745 746
// Show-table retrieve hook for vnodes: emits one row per vgroup replica, up to
// `rows` rows, continuing from the iterator stored in pShow->pIter.
// Columns per row: vgId, db name, a zero placeholder, the replica's role
// string, and the vgroup's replica count.
// NOTE(review): when the block fills up in the middle of a vgroup, the
// remaining replicas of that vgroup are not emitted by this call - confirm
// whether the next call is expected to pick them up.
// Returns the number of rows written.
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  int32_t numOfRows = 0;

  for (;;) {
    if (numOfRows >= rows) break;

    pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
    if (pShow->pIter == NULL) break;

    for (int32_t r = 0; r < pVgroup->replica && numOfRows < rows; ++r) {
      SVnodeGid       *pVgid = &pVgroup->vnodeGid[r];
      SColumnInfoData *pColInfo = NULL;
      int32_t          cols = 0;

      // Column: vgroup id.
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->vgId, false);

      // Column: database name, extracted from the vgroup's dbName.
      SName name = {0};
      char  db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB);
      tNameGetDbName(&name, varDataVal(db));
      varDataSetLen(db, strlen(varDataVal(db)));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)db, false);

      // Column: constant zero.
      uint32_t val = 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&val, false);

      // Column: role of this replica as a var-string.
      char buf[20] = {0};
      STR_TO_VARSTR(buf, syncStr(pVgid->role));
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)buf, false);

      // Column: replica count of the vgroup.
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->replica, false);  // onlines

      numOfRows++;
    }

    sdbRelease(pSdb, pVgroup);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}

// Show-table free-iterator hook for vnodes: cancels the pending SDB fetch.
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}