提交 141fcdd4 编写于 作者: S Shengliang Guan

enh: alter vnode hashrange in cfg file

上级 8bc0d4ba
...@@ -150,7 +150,7 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname); ...@@ -150,7 +150,7 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname);
* @param nrname The rel name of new file. * @param nrname The rel name of new file.
* @return int32_t 0 for success, -1 for failure. * @return int32_t 0 for success, -1 for failure.
*/ */
int32_t tfsRename(STfs *pTfs, char *orname, char *nrname); int32_t tfsRename(STfs *pTfs, const char *orname, const char *nrname);
/** /**
* @brief Init file object in tfs. * @brief Init file object in tfs.
......
...@@ -309,9 +309,57 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -309,9 +309,57 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
int32_t srcVgId = req.srcVgId;
int32_t dstVgId = req.dstVgId;
dInfo("vgId:%d, alter hashrange msg will be processed, dstVgId:%d, begin:%u, end:%u", req.srcVgId, req.dstVgId, dInfo("vgId:%d, alter hashrange msg will be processed, dstVgId:%d, begin:%u, end:%u", req.srcVgId, req.dstVgId,
req.hashBegin, req.hashEnd); req.hashBegin, req.hashEnd);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, srcVgId);
if (pVnode == NULL) {
dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST;
return -1;
}
dInfo("vgId:%d, start to close vnode", srcVgId);
SWrapperCfg wrapperCfg = {
.dropped = pVnode->dropped,
.vgId = pVnode->vgId,
.vgVersion = pVnode->vgVersion,
};
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
vmCloseVnode(pMgmt, pVnode);
char srcPath[TSDB_FILENAME_LEN] = {0};
char dstPath[TSDB_FILENAME_LEN] = {0};
snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
dInfo("vgId:%d, start to alter vnode hashrange at %s", srcVgId, srcPath);
if (vnodeAlterHashRange(srcPath, dstPath, &req, pMgmt->pTfs) < 0) {
dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
return -1;
}
dInfo("vgId:%d, start to open vnode", dstVgId);
SVnode *pImpl = vnodeOpen(dstPath, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
return -1;
}
wrapperCfg.vgId = dstVgId;
if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
return -1;
}
if (vnodeStart(pImpl) != 0) {
dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
return -1;
}
dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
return 0; return 0;
} }
...@@ -365,7 +413,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -365,7 +413,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId); snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path); dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
if (vnodeAlter(path, &alterReq, pMgmt->pTfs) < 0) { if (vnodeAlterReplica(path, &alterReq, pMgmt->pTfs) < 0) {
dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr()); dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
return -1; return -1;
} }
......
...@@ -50,7 +50,8 @@ extern const SVnodeCfg vnodeCfgDefault; ...@@ -50,7 +50,8 @@ extern const SVnodeCfg vnodeCfgDefault;
int32_t vnodeInit(int32_t nthreads); int32_t vnodeInit(int32_t nthreads);
void vnodeCleanup(); void vnodeCleanup();
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs); int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs);
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
void vnodePreClose(SVnode *pVnode); void vnodePreClose(SVnode *pVnode);
......
...@@ -219,10 +219,10 @@ void vnodeBufPoolAddToFreeList(SVBufPool *pPool) { ...@@ -219,10 +219,10 @@ void vnodeBufPoolAddToFreeList(SVBufPool *pPool) {
if (pPool->node.size != size) { if (pPool->node.size != size) {
SVBufPool *pNewPool = NULL; SVBufPool *pNewPool = NULL;
if (vnodeBufPoolCreate(pVnode, pPool->id, size, &pNewPool) < 0) { if (vnodeBufPoolCreate(pVnode, pPool->id, size, &pNewPool) < 0) {
vWarn("vgId:%d failed to change buffer pool of id %d size from %" PRId64 " to %" PRId64 " since %s", vWarn("vgId:%d, failed to change buffer pool of id %d size from %" PRId64 " to %" PRId64 " since %s",
TD_VID(pVnode), pPool->id, pPool->node.size, size, tstrerror(errno)); TD_VID(pVnode), pPool->id, pPool->node.size, size, tstrerror(errno));
} else { } else {
vInfo("vgId:%d buffer pool of id %d size changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->id, vInfo("vgId:%d, buffer pool of id %d size changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->id,
pPool->node.size, size); pPool->node.size, size);
vnodeBufPoolDestroy(pPool); vnodeBufPoolDestroy(pPool);
...@@ -232,7 +232,7 @@ void vnodeBufPoolAddToFreeList(SVBufPool *pPool) { ...@@ -232,7 +232,7 @@ void vnodeBufPoolAddToFreeList(SVBufPool *pPool) {
} }
// add to free list // add to free list
vDebug("vgId:%d buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id); vDebug("vgId:%d, buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id);
vnodeBufPoolReset(pPool); vnodeBufPoolReset(pPool);
pPool->freeNext = pVnode->freeList; pPool->freeNext = pVnode->freeList;
pVnode->freeList = pPool; pVnode->freeList = pPool;
...@@ -307,7 +307,7 @@ int32_t vnodeBufPoolRecycle(SVBufPool *pPool) { ...@@ -307,7 +307,7 @@ int32_t vnodeBufPoolRecycle(SVBufPool *pPool) {
SVnode *pVnode = pPool->pVnode; SVnode *pVnode = pPool->pVnode;
vDebug("vgId:%d recycle buffer pool %p of id %d", TD_VID(pVnode), pPool, pPool->id); vDebug("vgId:%d, recycle buffer pool %p of id %d", TD_VID(pVnode), pPool, pPool->id);
taosThreadMutexLock(&pPool->mutex); taosThreadMutexLock(&pPool->mutex);
......
...@@ -28,10 +28,10 @@ static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) { ...@@ -28,10 +28,10 @@ static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) {
if (pVnode->onRecycle == NULL) { if (pVnode->onRecycle == NULL) {
if (pVnode->recycleHead == NULL) { if (pVnode->recycleHead == NULL) {
vDebug("vgId:%d no recyclable buffer pool", TD_VID(pVnode)); vDebug("vgId:%d, no recyclable buffer pool", TD_VID(pVnode));
goto _exit; goto _exit;
} else { } else {
vDebug("vgId:%d buffer pool %p of id %d on recycle queue, try to recycle", TD_VID(pVnode), pVnode->recycleHead, vDebug("vgId:%d, buffer pool %p of id %d on recycle queue, try to recycle", TD_VID(pVnode), pVnode->recycleHead,
pVnode->recycleHead->id); pVnode->recycleHead->id);
pVnode->onRecycle = pVnode->recycleHead; pVnode->onRecycle = pVnode->recycleHead;
...@@ -50,7 +50,7 @@ static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) { ...@@ -50,7 +50,7 @@ static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) {
_exit: _exit:
if (code) { if (code) {
vError("vgId:%d %s failed since %s", TD_VID(pVnode), __func__, tstrerror(code)); vError("vgId:%d, %s failed since %s", TD_VID(pVnode), __func__, tstrerror(code));
} }
return code; return code;
} }
...@@ -65,7 +65,7 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) { ...@@ -65,7 +65,7 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) {
++nTry; ++nTry;
if (pVnode->freeList) { if (pVnode->freeList) {
vDebug("vgId:%d allocate free buffer pool on %d try, pPool:%p id:%d", TD_VID(pVnode), nTry, pVnode->freeList, vDebug("vgId:%d, allocate free buffer pool on %d try, pPool:%p id:%d", TD_VID(pVnode), nTry, pVnode->freeList,
pVnode->freeList->id); pVnode->freeList->id);
pVnode->inUse = pVnode->freeList; pVnode->inUse = pVnode->freeList;
...@@ -74,13 +74,13 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) { ...@@ -74,13 +74,13 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) {
pVnode->inUse->freeNext = NULL; pVnode->inUse->freeNext = NULL;
break; break;
} else { } else {
vDebug("vgId:%d no free buffer pool on %d try, try to recycle...", TD_VID(pVnode), nTry); vDebug("vgId:%d, no free buffer pool on %d try, try to recycle...", TD_VID(pVnode), nTry);
code = vnodeTryRecycleBufPool(pVnode); code = vnodeTryRecycleBufPool(pVnode);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (pVnode->freeList == NULL) { if (pVnode->freeList == NULL) {
vDebug("vgId:%d no free buffer pool on %d try, wait %d ms...", TD_VID(pVnode), nTry, WAIT_TIME_MILI_SEC); vDebug("vgId:%d, no free buffer pool on %d try, wait %d ms...", TD_VID(pVnode), nTry, WAIT_TIME_MILI_SEC);
struct timeval tv; struct timeval tv;
struct timespec ts; struct timespec ts;
...@@ -105,7 +105,7 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) { ...@@ -105,7 +105,7 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) {
_exit: _exit:
taosThreadMutexUnlock(&pVnode->mutex); taosThreadMutexUnlock(&pVnode->mutex);
if (code) { if (code) {
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
} }
return code; return code;
} }
...@@ -140,7 +140,7 @@ int vnodeBegin(SVnode *pVnode) { ...@@ -140,7 +140,7 @@ int vnodeBegin(SVnode *pVnode) {
_exit: _exit:
if (code) { if (code) {
terrno = code; terrno = code;
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
} }
return code; return code;
} }
...@@ -351,7 +351,7 @@ static void vnodeReturnBufPool(SVnode *pVnode) { ...@@ -351,7 +351,7 @@ static void vnodeReturnBufPool(SVnode *pVnode) {
if (nRef == 0) { if (nRef == 0) {
vnodeBufPoolAddToFreeList(pPool); vnodeBufPoolAddToFreeList(pPool);
} else if (nRef > 0) { } else if (nRef > 0) {
vDebug("vgId:%d buffer pool %p of id %d is added to recycle queue", TD_VID(pVnode), pPool, pPool->id); vDebug("vgId:%d, buffer pool %p of id %d is added to recycle queue", TD_VID(pVnode), pPool, pPool->id);
if (pVnode->recycleTail == NULL) { if (pVnode->recycleTail == NULL) {
pPool->recyclePrev = pPool->recycleNext = NULL; pPool->recyclePrev = pPool->recycleNext = NULL;
......
...@@ -58,7 +58,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { ...@@ -58,7 +58,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
return 0; return 0;
} }
int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) { int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) {
SVnodeInfo info = {0}; SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0}; char dir[TSDB_FILENAME_LEN] = {0};
int32_t ret = 0; int32_t ret = 0;
...@@ -107,6 +107,68 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) { ...@@ -107,6 +107,68 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) {
return 0; return 0;
} }
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs) {
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0};
int32_t ret = 0;
if (pTfs) {
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, srcPath);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", srcPath);
}
ret = vnodeLoadInfo(dir, &info);
if (ret < 0) {
vError("vgId:%d, failed to read vnode config from %s since %s", pReq->srcVgId, srcPath, tstrerror(terrno));
return -1;
}
vInfo("vgId:%d, alter hashrange from [%u, %u) to [%u, %u)", pReq->srcVgId,
info.config.hashBegin, info.config.hashEnd, pReq->hashBegin, pReq->hashEnd);
info.config.vgId = pReq->dstVgId;
info.config.hashBegin = pReq->hashBegin;
info.config.hashEnd = pReq->hashEnd;
info.config.walCfg.vgId = pReq->dstVgId;
SSyncCfg *pCfg = &info.config.syncCfg;
pCfg->myIndex = 0;
pCfg->replicaNum = 1;
memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo));
vInfo("vgId:%d, alter vnode replicas to 1", pReq->srcVgId);
SNodeInfo *pNode = &pCfg->nodeInfo[0];
pNode->nodePort = tsServerPort;
tstrncpy(pNode->nodeFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
vInfo("vgId:%d, ep:%s:%u dnode:%d", pReq->srcVgId, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
info.config.syncCfg = *pCfg;
ret = vnodeSaveInfo(dir, &info);
if (ret < 0) {
vError("vgId:%d, failed to save vnode config since %s", pReq->dstVgId, tstrerror(terrno));
return -1;
}
ret = vnodeCommitInfo(dir, &info);
if (ret < 0) {
vError("vgId:%d, failed to commit vnode config since %s", pReq->dstVgId, tstrerror(terrno));
return -1;
}
vInfo("vgId:%d, start to rename %s to %s", pReq->dstVgId, srcPath, dstPath);
ret = tfsRename(pTfs, srcPath, dstPath);
if (ret < 0) {
vError("vgId:%d, failed to rename vnode from %s to %s since %s", pReq->dstVgId, srcPath, dstPath,
tstrerror(terrno));
return -1;
}
vInfo("vgId:%d, vnode hashrange is altered", info.config.vgId);
return 0;
}
void vnodeDestroy(const char *path, STfs *pTfs) { void vnodeDestroy(const char *path, STfs *pTfs) {
vInfo("path:%s is removed while destroy vnode", path); vInfo("path:%s is removed while destroy vnode", path);
tfsRmdir(pTfs, path); tfsRmdir(pTfs, path);
......
...@@ -303,7 +303,7 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname) { ...@@ -303,7 +303,7 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname) {
return 0; return 0;
} }
int32_t tfsRename(STfs *pTfs, char *orname, char *nrname) { int32_t tfsRename(STfs *pTfs, const char *orname, const char *nrname) {
char oaname[TMPNAME_LEN] = "\0"; char oaname[TMPNAME_LEN] = "\0";
char naname[TMPNAME_LEN] = "\0"; char naname[TMPNAME_LEN] = "\0";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册