diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 17851bc4a87c585224777bf88a18c83d853f48ff..842c8a44e17abb55b1796cca5c7090cab89fbadc 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -219,6 +219,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_VGROUP_NOT_IN_DNODE TAOS_DEF_ERROR_CODE(0, 0x0391) #define TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE TAOS_DEF_ERROR_CODE(0, 0x0392) #define TSDB_CODE_MND_VGROUP_UN_CHANGED TAOS_DEF_ERROR_CODE(0, 0x0393) +#define TSDB_CODE_MND_HAS_OFFLINE_DNODE TAOS_DEF_ERROR_CODE(0, 0x0394) // mnode-stable #define TSDB_CODE_MND_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03A0) diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index d8f2dae2343f224b53ab4d1d994808517fc4c5e4..5244bc657b6f6d56ef38b2dfb7fb5dd8f49eaaf4 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -38,6 +38,7 @@ static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter); static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq); static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq); +static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq); int32_t mndInitVgroup(SMnode *pMnode) { SSdbTable table = { @@ -1165,4 +1166,159 @@ _OVER: return code; } -static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) { return 0; } \ No newline at end of file +static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) { return 0; } + +static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, + SDnodeObj *pSrc, SDnodeObj *pDst) { + SVgObj newVg = {0}; + memcpy(&newVg, pVgroup, sizeof(SVgObj)); + mInfo("vgId:%d, vgroup info before balance, replica:%d", newVg.vgId, newVg.replica); + for (int32_t i = 0; i < newVg.replica; ++i) { + mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId); + } + + if (mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pDst->id) != 0) return -1; + if (mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pSrc->id) != 0) return -1; + + SSdbRaw *pRaw = mndVgroupActionEncode(&newVg); + if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) { + sdbFreeRaw(pRaw); + return -1; + } + sdbSetRawStatus(pRaw, SDB_STATUS_READY); + + mInfo("vgId:%d, vgroup info after balance, replica:%d", newVg.vgId, newVg.replica); + for (int32_t i = 0; i < newVg.replica; ++i) { + mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId); + } + return 0; +} + +static int32_t mndBalanceVgroupBetweenDnode(SMnode *pMnode, STrans *pTrans, SDnodeObj *pSrc, SDnodeObj *pDst) { + void *pIter = NULL; + int32_t code = -1; + + while (1) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + bool existInSrc = false; + bool existInDst = false; + for (int32_t i = 0; i < pVgroup->replica; ++i) { + SVnodeGid *pGid = &pVgroup->vnodeGid[i]; + if (pGid->dnodeId == pSrc->id) existInSrc = true; + if (pGid->dnodeId == pDst->id) existInDst = true; + } + + if (!existInSrc || existInDst) { + sdbRelease(pMnode->pSdb, pVgroup); + } + + SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName); + code = mndSetBalanceVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, pSrc, pDst); + mndReleaseDb(pMnode, pDb); + sdbRelease(pMnode->pSdb, pVgroup); + break; + } + + return code; +} + +static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) { + int32_t code = -1; + int32_t numOfVgroups = 0; + STrans *pTrans = NULL; + + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq); + if (pTrans == NULL) goto _OVER; + mndTransSetSerial(pTrans); + mDebug("trans:%d, used to balance vgroup", pTrans->id); + + while (1) { + taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes); + SDnodeObj *pSrc = taosArrayGet(pArray, 0); + SDnodeObj *pDst = taosArrayGet(pArray, taosArrayGetSize(pArray) - 1); + + float srcScore = (float)(pSrc->numOfVnodes - 1) / pSrc->numOfSupportVnodes; + float dstScore = (float)(pDst->numOfVnodes + 1) / pDst->numOfSupportVnodes; + if (srcScore + 0.0001 < dstScore) { + mDebug("trans:%d, balance vgroup from dnode:%d to dnode:%d", pTrans->id, pSrc->id, pDst->id); + code = mndBalanceVgroupBetweenDnode(pMnode, pTrans, pSrc, pDst); + if (code == 0) { + numOfVgroups++; + continue; + } else { + mError("trans:%d, failed to balance vgroup from dnode:%d to dnode:%d", pTrans->id, pSrc->id, pDst->id); + return -1; + } + } else { + mDebug("trans:%d, no vgroup need to balance vgroup any more", pTrans->id); + break; + } + } + + if (numOfVgroups <= 0) { + mDebug("no need to balance vgroup"); + code = 0; + } else { + mDebug("start to balance vgroup, numOfVgroups:%d", numOfVgroups); + if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; + code = TSDB_CODE_ACTION_IN_PROGRESS; + } + +_OVER: + mndTransDrop(pTrans); + return code; +} + +static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + int32_t code = -1; + SUserObj *pUser = NULL; + SArray *pArray = NULL; + void *pIter = NULL; + int64_t curMs = taosGetTimestampMs(); + + mDebug("start to balance vgroup"); + pUser = mndAcquireUser(pMnode, pReq->conn.user); + if (pUser == NULL) { + terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; + goto _OVER; + } + + if (mndCheckNodeAuth(pUser) != 0) goto _OVER; + + while (1) { + SDnodeObj *pDnode = NULL; + pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode); + if (pIter == NULL) break; + if (!mndIsDnodeOnline(pDnode, curMs)) { + terrno = TSDB_CODE_MND_HAS_OFFLINE_DNODE; + mError("failed to balance vgroup since %s, dnode:%d", terrstr(), pDnode->id); + sdbRelease(pMnode->pSdb, pDnode); + goto _OVER; + } + + sdbRelease(pMnode->pSdb, pDnode); + } + + pArray = mndBuildDnodesArray(pMnode, 0); + if (pArray == NULL) goto _OVER; + + if (taosArrayGetSize(pArray) < 2) { + mDebug("no need to balance vgroup since dnode num less than 2"); + code = 0; + } else { + code = mndBalanceVgroup(pMnode, pReq, pArray); + } + +_OVER: + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { + mError("failed to balance vgroup since %s", terrstr()); + } + + mndReleaseUser(pMnode, pUser); + taosArrayDestroy(pArray); + return code; +} \ No newline at end of file diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 8d7bc0851b355aad8221cd2c2d97097e234af039..c14c279d8151d80a5009f70a1c3ca0b1142e794f 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -222,7 +222,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_INDEX_NOT_EXIST, "Index not exist") // mnode-vgroup TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE, "Vgroup already in dnode") TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_IN_DNODE, "Vgroup not in dnode") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_EXIST, "VGroup does not exist") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_EXIST, "Vgroup does not exist") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_UN_CHANGED, "Vgroup distribution has not changed") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_HAS_OFFLINE_DNODE, "Offline dnode exists") // mnode-stable TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_ALREADY_EXIST, "STable already exists")