From 8bc0d4ba74a3fa8da3b1a8463610a0a986bcfc35 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 30 Jan 2023 16:33:17 +0800 Subject: [PATCH] enh: add alter vnode hashrange request --- include/common/tmsg.h | 11 ++++ source/common/src/tmsg.c | 34 +++++++++++ source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 3 +- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 17 +++++- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 5 +- source/dnode/mnode/impl/src/mndVgroup.c | 62 ++++++++++++++++++--- source/dnode/vnode/src/vnd/vnodeSvr.c | 14 ----- 7 files changed, 119 insertions(+), 27 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b820a7ee3a..c0e8cbae49 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1287,6 +1287,17 @@ typedef struct { int32_t tSerializeSDisableVnodeWriteReq(void* buf, int32_t bufLen, SDisableVnodeWriteReq* pReq); int32_t tDeserializeSDisableVnodeWriteReq(void* buf, int32_t bufLen, SDisableVnodeWriteReq* pReq); +typedef struct { + int32_t srcVgId; + int32_t dstVgId; + uint32_t hashBegin; + uint32_t hashEnd; + int64_t reserved; +} SAlterVnodeHashRangeReq; + +int32_t tSerializeSAlterVnodeHashRangeReq(void* buf, int32_t bufLen, SAlterVnodeHashRangeReq* pReq); +int32_t tDeserializeSAlterVnodeHashRangeReq(void* buf, int32_t bufLen, SAlterVnodeHashRangeReq* pReq); + typedef struct { SMsgHead header; char dbFName[TSDB_DB_FNAME_LEN]; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index a8ce16a0d7..322143b71f 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4146,6 +4146,40 @@ int32_t tDeserializeSDisableVnodeWriteReq(void *buf, int32_t bufLen, SDisableVno return 0; } +int32_t tSerializeSAlterVnodeHashRangeReq(void *buf, int32_t bufLen, SAlterVnodeHashRangeReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pReq->srcVgId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->dstVgId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->hashBegin) < 0) return -1; + if (tEncodeI32(&encoder, pReq->hashEnd) < 0) return -1; + if (tEncodeI64(&encoder, pReq->reserved) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSAlterVnodeHashRangeReq(void *buf, int32_t bufLen, SAlterVnodeHashRangeReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->srcVgId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->dstVgId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->hashBegin) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->hashEnd) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->reserved) < 0) return -1; + + tEndDecode(&decoder); + tDecoderClear(&decoder); + return 0; +} + int32_t tSerializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 2453743df2..cd49997d49 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -87,8 +87,9 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); SArray *vmGetMsgHandles(); int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); // vmFile.c int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index db257646ba..cba2c99701 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -302,7 +302,20 @@ int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return 0; } -int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { +int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { + SAlterVnodeHashRangeReq req = {0}; + if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + dInfo("vgId:%d, alter hashrange msg will be processed, dstVgId:%d, begin:%u, end:%u", req.srcVgId, req.dstVgId, + req.hashBegin, req.hashEnd); + + return 0; +} + +int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { SAlterVnodeReplicaReq alterReq = {0}; if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -473,7 +486,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index ae68398fbb..600dd3671d 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -41,11 +41,14 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { code = vmProcessDropVnodeReq(pMgmt, pMsg); break; case TDMT_VND_ALTER_REPLICA: - code = vmProcessAlterVnodeReq(pMgmt, pMsg); + code = vmProcessAlterVnodeReplicaReq(pMgmt, pMsg); break; case TDMT_VND_DISABLE_WRITE: code = vmProcessDisableVnodeWriteReq(pMgmt, pMsg); break; + case TDMT_VND_ALTER_HASHRANGE: + code = vmProcessAlterHashRangeReq(pMgmt, pMsg); + break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; dGError("msg:%p, not processed in vnode-mgmt queue", pMsg); diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 703eff9a73..4aed049a22 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -420,6 +420,33 @@ static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t v return pReq; } +static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dstVgId, int32_t *pContLen) { + SAlterVnodeHashRangeReq alterReq = { + .srcVgId = pVgroup->vgId, + .dstVgId = dstVgId, + .hashBegin = pVgroup->hashBegin, + .hashEnd = pVgroup->hashEnd, + }; + + mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, begin:%u, end:%u", pVgroup->vgId, dstVgId, + pVgroup->hashBegin, pVgroup->hashEnd); + int32_t contLen = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &alterReq); + if (contLen < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + void *pReq = taosMemoryMalloc(contLen); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + tSerializeSAlterVnodeHashRangeReq(pReq, contLen, &alterReq); + *pContLen = contLen; + return pReq; +} + void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) { SDropVnodeReq dropReq = {0}; dropReq.dnodeId = pDnode->id; @@ -1077,7 +1104,25 @@ int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD return 0; } -int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { return 0; } +static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dstVgId) { + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + + int32_t contLen = 0; + void *pReq = mndBuildAlterVnodeHashRangeReq(pMnode, pVgroup, dstVgId, &contLen); + if (pReq == NULL) return -1; + + action.pCont = pReq; + action.contLen = contLen; + action.msgType = TDMT_VND_ALTER_HASHRANGE; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + + return 0; +} int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { STransAction action = {0}; @@ -1854,12 +1899,6 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId); } - pRaw = mndVgroupActionEncode(pVgroup); - if (pRaw == NULL) goto _OVER; - if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER; - (void)sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING); - pRaw = NULL; - SVgObj newVg2 = {0}; memcpy(&newVg2, &newVg1, sizeof(SVgObj)); newVg1.replica = 1; @@ -1882,8 +1921,13 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId); } - if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER; - if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, pDb, &newVg2) != 0) goto _OVER; + int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); + if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, &newVg1, maxVgId) != 0) goto _OVER; + newVg1.vgId = maxVgId; + + maxVgId++; + if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, &newVg2, maxVgId) != 0) goto _OVER; + newVg2.vgId = maxVgId; // adjust vgroup replica if (pDb->cfg.replications != newVg1.replica) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 6abc144b91..313b9da9a5 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -24,7 +24,6 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); @@ -313,9 +312,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp case TDMT_VND_ALTER_CONFIRM: vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp); break; - case TDMT_VND_ALTER_HASHRANGE: - vnodeProcessAlterHashRangeReq(pVnode, version, pReq, len, pRsp); - break; case TDMT_VND_ALTER_CONFIG: vnodeProcessAlterConfigReq(pVnode, version, pReq, len, pRsp); break; @@ -1246,16 +1242,6 @@ static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void return 0; } -static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { - vInfo("vgId:%d, alter hashrange msg will be processed", TD_VID(pVnode)); - - // todo - // 1. stop work - // 2. adjust hash range / compact / remove wals / rename vgroups - // 3. reload sync - return 0; -} - static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { bool walChanged = false; bool tsdbChanged = false; -- GitLab