From 141fcdd49b5899f81fb96320146e42d05922c26c Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 30 Jan 2023 17:46:42 +0800 Subject: [PATCH] enh: alter vnode hashrange in cfg file --- include/libs/tfs/tfs.h | 2 +- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 50 +++++++++++++++- source/dnode/vnode/inc/vnode.h | 3 +- source/dnode/vnode/src/vnd/vnodeBufPool.c | 8 +-- source/dnode/vnode/src/vnd/vnodeCommit.c | 18 +++--- source/dnode/vnode/src/vnd/vnodeOpen.c | 64 ++++++++++++++++++++- source/libs/tfs/src/tfs.c | 2 +- 7 files changed, 129 insertions(+), 18 deletions(-) diff --git a/include/libs/tfs/tfs.h b/include/libs/tfs/tfs.h index 3af75e0eaf..cbf1d60e35 100644 --- a/include/libs/tfs/tfs.h +++ b/include/libs/tfs/tfs.h @@ -150,7 +150,7 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname); * @param nrname The rel name of new file. * @return int32_t 0 for success, -1 for failure. */ -int32_t tfsRename(STfs *pTfs, char *orname, char *nrname); +int32_t tfsRename(STfs *pTfs, const char *orname, const char *nrname); /** * @brief Init file object in tfs. diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index cba2c99701..e110130909 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -309,9 +309,57 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return -1; } + int32_t srcVgId = req.srcVgId; + int32_t dstVgId = req.dstVgId; dInfo("vgId:%d, alter hashrange msg will be processed, dstVgId:%d, begin:%u, end:%u", req.srcVgId, req.dstVgId, req.hashBegin, req.hashEnd); + SVnodeObj *pVnode = vmAcquireVnode(pMgmt, srcVgId); + if (pVnode == NULL) { + dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr()); + terrno = TSDB_CODE_VND_NOT_EXIST; + return -1; + } + + dInfo("vgId:%d, start to close vnode", srcVgId); + SWrapperCfg wrapperCfg = { + .dropped = pVnode->dropped, + .vgId = pVnode->vgId, + .vgVersion = pVnode->vgVersion, + }; + tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path)); + vmCloseVnode(pMgmt, pVnode); + + char srcPath[TSDB_FILENAME_LEN] = {0}; + char dstPath[TSDB_FILENAME_LEN] = {0}; + snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId); + snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId); + + dInfo("vgId:%d, start to alter vnode hashrange at %s", srcVgId, srcPath); + if (vnodeAlterHashRange(srcPath, dstPath, &req, pMgmt->pTfs) < 0) { + dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr()); + return -1; + } + + dInfo("vgId:%d, start to open vnode", dstVgId); + SVnode *pImpl = vnodeOpen(dstPath, pMgmt->pTfs, pMgmt->msgCb); + if (pImpl == NULL) { + dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr()); + return -1; + } + + wrapperCfg.vgId = dstVgId; + if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) { + dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr()); + return -1; + } + + if (vnodeStart(pImpl) != 0) { + dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr()); + return -1; + } + + dInfo("vgId:%d, vnode hashrange is altered", dstVgId); return 0; } @@ -365,7 +413,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId); dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path); - if (vnodeAlter(path, &alterReq, pMgmt->pTfs) < 0) { + if (vnodeAlterReplica(path, &alterReq, pMgmt->pTfs) < 0) { dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr()); return -1; } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 7d368c76c5..fdb778e2bb 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -50,7 +50,8 @@ extern const SVnodeCfg vnodeCfgDefault; int32_t vnodeInit(int32_t nthreads); void vnodeCleanup(); int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); -int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs); +int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs); +int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); void vnodePreClose(SVnode *pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index b1575fb496..40112c5579 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -219,10 +219,10 @@ void vnodeBufPoolAddToFreeList(SVBufPool *pPool) { if (pPool->node.size != size) { SVBufPool *pNewPool = NULL; if (vnodeBufPoolCreate(pVnode, pPool->id, size, &pNewPool) < 0) { - vWarn("vgId:%d failed to change buffer pool of id %d size from %" PRId64 " to %" PRId64 " since %s", + vWarn("vgId:%d, failed to change buffer pool of id %d size from %" PRId64 " to %" PRId64 " since %s", TD_VID(pVnode), pPool->id, pPool->node.size, size, tstrerror(errno)); } else { - vInfo("vgId:%d buffer pool of id %d size changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->id, + vInfo("vgId:%d, buffer pool of id %d size changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->id, pPool->node.size, size); vnodeBufPoolDestroy(pPool); @@ -232,7 +232,7 @@ void vnodeBufPoolAddToFreeList(SVBufPool *pPool) { } // add to free list - vDebug("vgId:%d buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id); + vDebug("vgId:%d, buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id); vnodeBufPoolReset(pPool); pPool->freeNext = pVnode->freeList; pVnode->freeList = pPool; @@ -307,7 +307,7 @@ int32_t vnodeBufPoolRecycle(SVBufPool *pPool) { SVnode *pVnode = pPool->pVnode; - vDebug("vgId:%d recycle buffer pool %p of id %d", TD_VID(pVnode), pPool, pPool->id); + vDebug("vgId:%d, recycle buffer pool %p of id %d", TD_VID(pVnode), pPool, pPool->id); taosThreadMutexLock(&pPool->mutex); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index aeb90b7080..a3401926e6 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -28,10 +28,10 @@ static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) { if (pVnode->onRecycle == NULL) { if (pVnode->recycleHead == NULL) { - vDebug("vgId:%d no recyclable buffer pool", TD_VID(pVnode)); + vDebug("vgId:%d, no recyclable buffer pool", TD_VID(pVnode)); goto _exit; } else { - vDebug("vgId:%d buffer pool %p of id %d on recycle queue, try to recycle", TD_VID(pVnode), pVnode->recycleHead, + vDebug("vgId:%d, buffer pool %p of id %d on recycle queue, try to recycle", TD_VID(pVnode), pVnode->recycleHead, pVnode->recycleHead->id); pVnode->onRecycle = pVnode->recycleHead; @@ -50,7 +50,7 @@ static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) { _exit: if (code) { - vError("vgId:%d %s failed since %s", TD_VID(pVnode), __func__, tstrerror(code)); + vError("vgId:%d, %s failed since %s", TD_VID(pVnode), __func__, tstrerror(code)); } return code; } @@ -65,7 +65,7 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) { ++nTry; if (pVnode->freeList) { - vDebug("vgId:%d allocate free buffer pool on %d try, pPool:%p id:%d", TD_VID(pVnode), nTry, pVnode->freeList, + vDebug("vgId:%d, allocate free buffer pool on %d try, pPool:%p id:%d", TD_VID(pVnode), nTry, pVnode->freeList, pVnode->freeList->id); pVnode->inUse = pVnode->freeList; @@ -74,13 +74,13 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) { pVnode->inUse->freeNext = NULL; break; } else { - vDebug("vgId:%d no free buffer pool on %d try, try to recycle...", TD_VID(pVnode), nTry); + vDebug("vgId:%d, no free buffer pool on %d try, try to recycle...", TD_VID(pVnode), nTry); code = vnodeTryRecycleBufPool(pVnode); TSDB_CHECK_CODE(code, lino, _exit); if (pVnode->freeList == NULL) { - vDebug("vgId:%d no free buffer pool on %d try, wait %d ms...", TD_VID(pVnode), nTry, WAIT_TIME_MILI_SEC); + vDebug("vgId:%d, no free buffer pool on %d try, wait %d ms...", TD_VID(pVnode), nTry, WAIT_TIME_MILI_SEC); struct timeval tv; struct timespec ts; @@ -105,7 +105,7 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) { _exit: taosThreadMutexUnlock(&pVnode->mutex); if (code) { - vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); } return code; } @@ -140,7 +140,7 @@ int vnodeBegin(SVnode *pVnode) { _exit: if (code) { terrno = code; - vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); } return code; } @@ -351,7 +351,7 @@ static void vnodeReturnBufPool(SVnode *pVnode) { if (nRef == 0) { vnodeBufPoolAddToFreeList(pPool); } else if (nRef > 0) { - vDebug("vgId:%d buffer pool %p of id %d is added to recycle queue", TD_VID(pVnode), pPool, pPool->id); + vDebug("vgId:%d, buffer pool %p of id %d is added to recycle queue", TD_VID(pVnode), pPool, pPool->id); if (pVnode->recycleTail == NULL) { pPool->recyclePrev = pPool->recycleNext = NULL; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 61cb75b1da..4acaadfa20 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -58,7 +58,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { return 0; } -int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) { +int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) { SVnodeInfo info = {0}; char dir[TSDB_FILENAME_LEN] = {0}; int32_t ret = 0; @@ -107,6 +107,68 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) { return 0; } +int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs) { + SVnodeInfo info = {0}; + char dir[TSDB_FILENAME_LEN] = {0}; + int32_t ret = 0; + + if (pTfs) { + snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, srcPath); + } else { + snprintf(dir, TSDB_FILENAME_LEN, "%s", srcPath); + } + + ret = vnodeLoadInfo(dir, &info); + if (ret < 0) { + vError("vgId:%d, failed to read vnode config from %s since %s", pReq->srcVgId, srcPath, tstrerror(terrno)); + return -1; + } + + vInfo("vgId:%d, alter hashrange from [%u, %u) to [%u, %u)", pReq->srcVgId, + info.config.hashBegin, info.config.hashEnd, pReq->hashBegin, pReq->hashEnd); + info.config.vgId = pReq->dstVgId; + info.config.hashBegin = pReq->hashBegin; + info.config.hashEnd = pReq->hashEnd; + info.config.walCfg.vgId = pReq->dstVgId; + + SSyncCfg *pCfg = &info.config.syncCfg; + pCfg->myIndex = 0; + pCfg->replicaNum = 1; + memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo)); + + vInfo("vgId:%d, alter vnode replicas to 1", pReq->srcVgId); + SNodeInfo *pNode = &pCfg->nodeInfo[0]; + pNode->nodePort = tsServerPort; + tstrncpy(pNode->nodeFqdn, tsLocalFqdn, TSDB_FQDN_LEN); + (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); + vInfo("vgId:%d, ep:%s:%u dnode:%d", pReq->srcVgId, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId); + + info.config.syncCfg = *pCfg; + + ret = vnodeSaveInfo(dir, &info); + if (ret < 0) { + vError("vgId:%d, failed to save vnode config since %s", pReq->dstVgId, tstrerror(terrno)); + return -1; + } + + ret = vnodeCommitInfo(dir, &info); + if (ret < 0) { + vError("vgId:%d, failed to commit vnode config since %s", pReq->dstVgId, tstrerror(terrno)); + return -1; + } + + vInfo("vgId:%d, start to rename %s to %s", pReq->dstVgId, srcPath, dstPath); + ret = tfsRename(pTfs, srcPath, dstPath); + if (ret < 0) { + vError("vgId:%d, failed to rename vnode from %s to %s since %s", pReq->dstVgId, srcPath, dstPath, + tstrerror(terrno)); + return -1; + } + + vInfo("vgId:%d, vnode hashrange is altered", info.config.vgId); + return 0; +} + void vnodeDestroy(const char *path, STfs *pTfs) { vInfo("path:%s is removed while destroy vnode", path); tfsRmdir(pTfs, path); diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index 9a71fc7f30..d73b65c3b5 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -303,7 +303,7 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname) { return 0; } -int32_t tfsRename(STfs *pTfs, char *orname, char *nrname) { +int32_t tfsRename(STfs *pTfs, const char *orname, const char *nrname) { char oaname[TMPNAME_LEN] = "\0"; char naname[TMPNAME_LEN] = "\0"; -- GitLab