提交 b21e6034 编写于 作者: B Benguang Zhao

enh: add the field diskPrimary to vnodeGetPrimaryDir, and entries of vnodes.json

上级 021fcf2a
...@@ -46,6 +46,7 @@ typedef struct { ...@@ -46,6 +46,7 @@ typedef struct {
int32_t vgId; int32_t vgId;
int32_t vgVersion; int32_t vgVersion;
int8_t dropped; int8_t dropped;
int32_t diskPrimary;
int32_t toVgId; int32_t toVgId;
char path[PATH_MAX + 20]; char path[PATH_MAX + 20];
} SWrapperCfg; } SWrapperCfg;
...@@ -56,6 +57,7 @@ typedef struct { ...@@ -56,6 +57,7 @@ typedef struct {
int32_t refCount; int32_t refCount;
int8_t dropped; int8_t dropped;
int8_t disable; int8_t disable;
int32_t diskPrimary;
int32_t toVgId; int32_t toVgId;
char *path; char *path;
SVnode *pImpl; SVnode *pImpl;
...@@ -81,6 +83,7 @@ typedef struct { ...@@ -81,6 +83,7 @@ typedef struct {
} SVnodeThread; } SVnodeThread;
// vmInt.c // vmInt.c
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId);
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId); SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId);
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl); int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl);
......
...@@ -71,6 +71,8 @@ static int32_t vmDecodeVnodeList(SJson *pJson, SVnodeMgmt *pMgmt, SWrapperCfg ** ...@@ -71,6 +71,8 @@ static int32_t vmDecodeVnodeList(SJson *pJson, SVnodeMgmt *pMgmt, SWrapperCfg **
if (code < 0) goto _OVER; if (code < 0) goto _OVER;
tjsonGetInt32ValueFromDouble(vnode, "vgVersion", pCfg->vgVersion, code); tjsonGetInt32ValueFromDouble(vnode, "vgVersion", pCfg->vgVersion, code);
if (code < 0) goto _OVER; if (code < 0) goto _OVER;
tjsonGetInt32ValueFromDouble(vnode, "diskPrimary", pCfg->diskPrimary, code);
if (code < 0) goto _OVER;
tjsonGetInt32ValueFromDouble(vnode, "toVgId", pCfg->toVgId, code); tjsonGetInt32ValueFromDouble(vnode, "toVgId", pCfg->toVgId, code);
if (code < 0) goto _OVER; if (code < 0) goto _OVER;
...@@ -167,6 +169,7 @@ static int32_t vmEncodeVnodeList(SJson *pJson, SVnodeObj **ppVnodes, int32_t num ...@@ -167,6 +169,7 @@ static int32_t vmEncodeVnodeList(SJson *pJson, SVnodeObj **ppVnodes, int32_t num
if (tjsonAddDoubleToObject(vnode, "vgId", pVnode->vgId) < 0) return -1; if (tjsonAddDoubleToObject(vnode, "vgId", pVnode->vgId) < 0) return -1;
if (tjsonAddDoubleToObject(vnode, "dropped", pVnode->dropped) < 0) return -1; if (tjsonAddDoubleToObject(vnode, "dropped", pVnode->dropped) < 0) return -1;
if (tjsonAddDoubleToObject(vnode, "vgVersion", pVnode->vgVersion) < 0) return -1; if (tjsonAddDoubleToObject(vnode, "vgVersion", pVnode->vgVersion) < 0) return -1;
if (tjsonAddDoubleToObject(vnode, "diskPrimary", pVnode->diskPrimary) < 0) return -1;
if (pVnode->toVgId && tjsonAddDoubleToObject(vnode, "toVgId", pVnode->toVgId) < 0) return -1; if (pVnode->toVgId && tjsonAddDoubleToObject(vnode, "toVgId", pVnode->toVgId) < 0) return -1;
if (tjsonAddItemToArray(vnodes, vnode) < 0) return -1; if (tjsonAddItemToArray(vnodes, vnode) < 0) return -1;
} }
......
...@@ -263,16 +263,19 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -263,16 +263,19 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return 0; return 0;
} }
int32_t diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
wrapperCfg.diskPrimary = diskPrimary;
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId); snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) { if (vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs) < 0) {
tFreeSCreateVnodeReq(&req); tFreeSCreateVnodeReq(&req);
dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr()); dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr());
code = terrno; code = terrno;
goto _OVER; goto _OVER;
} }
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb); SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr()); dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
code = terrno; code = terrno;
...@@ -399,6 +402,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -399,6 +402,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
dInfo("vgId:%d, start to close vnode", vgId); dInfo("vgId:%d, start to close vnode", vgId);
int32_t diskPrimary = pVnode->diskPrimary;
SWrapperCfg wrapperCfg = { SWrapperCfg wrapperCfg = {
.dropped = pVnode->dropped, .dropped = pVnode->dropped,
.vgId = pVnode->vgId, .vgId = pVnode->vgId,
...@@ -411,13 +415,13 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -411,13 +415,13 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId); snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path); dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
if (vnodeAlterReplica(path, &req, pMgmt->pTfs) < 0) { if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr()); dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
return -1; return -1;
} }
dInfo("vgId:%d, begin to open vnode", vgId); dInfo("vgId:%d, begin to open vnode", vgId);
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb); SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr()); dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
return -1; return -1;
...@@ -486,10 +490,12 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -486,10 +490,12 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
int32_t diskPrimary = pVnode->diskPrimary;
SWrapperCfg wrapperCfg = { SWrapperCfg wrapperCfg = {
.dropped = pVnode->dropped, .dropped = pVnode->dropped,
.vgId = dstVgId, .vgId = dstVgId,
.vgVersion = pVnode->vgVersion, .vgVersion = pVnode->vgVersion,
.diskPrimary = pVnode->diskPrimary,
}; };
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path)); tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
...@@ -509,13 +515,13 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -509,13 +515,13 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId); snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath); dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
if (vnodeAlterHashRange(srcPath, dstPath, &req, pMgmt->pTfs) < 0) { if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr()); dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
return -1; return -1;
} }
dInfo("vgId:%d, open vnode", dstVgId); dInfo("vgId:%d, open vnode", dstVgId);
SVnode *pImpl = vnodeOpen(dstPath, pMgmt->pTfs, pMgmt->msgCb); SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr()); dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
return -1; return -1;
...@@ -598,10 +604,12 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -598,10 +604,12 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
dInfo("vgId:%d, start to close vnode", vgId); dInfo("vgId:%d, start to close vnode", vgId);
int32_t diskPrimary = pVnode->diskPrimary;
SWrapperCfg wrapperCfg = { SWrapperCfg wrapperCfg = {
.dropped = pVnode->dropped, .dropped = pVnode->dropped,
.vgId = pVnode->vgId, .vgId = pVnode->vgId,
.vgVersion = pVnode->vgVersion, .vgVersion = pVnode->vgVersion,
.diskPrimary = pVnode->diskPrimary,
}; };
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path)); tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
vmCloseVnode(pMgmt, pVnode, false); vmCloseVnode(pMgmt, pVnode, false);
...@@ -610,13 +618,13 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -610,13 +618,13 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId); snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path); dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
if (vnodeAlterReplica(path, &alterReq, pMgmt->pTfs) < 0) { if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr()); dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
return -1; return -1;
} }
dInfo("vgId:%d, begin to open vnode", vgId); dInfo("vgId:%d, begin to open vnode", vgId);
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb); SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr()); dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
return -1; return -1;
......
...@@ -17,6 +17,12 @@ ...@@ -17,6 +17,12 @@
#include "vmInt.h" #include "vmInt.h"
#include "vnd.h" #include "vnd.h"
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
// search fs
// alloc
return 0;
}
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
SVnodeObj *pVnode = NULL; SVnodeObj *pVnode = NULL;
...@@ -52,6 +58,7 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { ...@@ -52,6 +58,7 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
pVnode->vgId = pCfg->vgId; pVnode->vgId = pCfg->vgId;
pVnode->vgVersion = pCfg->vgVersion; pVnode->vgVersion = pCfg->vgVersion;
pVnode->diskPrimary = pCfg->diskPrimary;
pVnode->refCount = 0; pVnode->refCount = 0;
pVnode->dropped = 0; pVnode->dropped = 0;
pVnode->path = taosStrdup(pCfg->path); pVnode->path = taosStrdup(pCfg->path);
...@@ -169,7 +176,8 @@ static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) { ...@@ -169,7 +176,8 @@ static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId); snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId); snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
int32_t vgId = vnodeRestoreVgroupId(srcPath, dstPath, srcVgId, dstVgId, pTfs); int32_t diskPrimary = pCfg->diskPrimary;
int32_t vgId = vnodeRestoreVgroupId(srcPath, dstPath, srcVgId, dstVgId, diskPrimary, pTfs);
if (vgId <= 0) { if (vgId <= 0) {
dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, srcPath); dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, srcPath);
return -1; return -1;
...@@ -205,9 +213,10 @@ static void *vmOpenVnodeInThread(void *param) { ...@@ -205,9 +213,10 @@ static void *vmOpenVnodeInThread(void *param) {
pThread->updateVnodesList = true; pThread->updateVnodesList = true;
} }
int32_t diskPrimary = pCfg->diskPrimary;
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId); snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb); SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->failed++; pThread->failed++;
......
...@@ -51,12 +51,14 @@ extern const SVnodeCfg vnodeCfgDefault; ...@@ -51,12 +51,14 @@ extern const SVnodeCfg vnodeCfgDefault;
int32_t vnodeInit(int32_t nthreads); int32_t vnodeInit(int32_t nthreads);
void vnodeCleanup(); void vnodeCleanup();
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs);
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs); int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t diskPrimary, STfs *pTfs);
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs); int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq,
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs); int32_t diskPrimary, STfs *pTfs);
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId,
int32_t diskPrimary, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb);
void vnodePreClose(SVnode *pVnode); void vnodePreClose(SVnode *pVnode);
void vnodePostClose(SVnode *pVnode); void vnodePostClose(SVnode *pVnode);
void vnodeSyncCheckTimeout(SVnode *pVnode); void vnodeSyncCheckTimeout(SVnode *pVnode);
......
...@@ -87,7 +87,7 @@ void vnodeBufPoolAddToFreeList(SVBufPool* pPool); ...@@ -87,7 +87,7 @@ void vnodeBufPoolAddToFreeList(SVBufPool* pPool);
int32_t vnodeBufPoolRecycle(SVBufPool* pPool); int32_t vnodeBufPoolRecycle(SVBufPool* pPool);
// vnodeOpen.c // vnodeOpen.c
int32_t vnodeGetPrimaryDir(const char* relPath, STfs* pTfs, char* buf, size_t bufLen); int32_t vnodeGetPrimaryDir(const char* relPath, int32_t diskPrimary, STfs* pTfs, char* buf, size_t bufLen);
// vnodeQuery.c // vnodeQuery.c
int32_t vnodeQueryOpen(SVnode* pVnode); int32_t vnodeQueryOpen(SVnode* pVnode);
......
...@@ -385,6 +385,7 @@ struct SVnode { ...@@ -385,6 +385,7 @@ struct SVnode {
SVState state; SVState state;
SVStatis statis; SVStatis statis;
STfs* pTfs; STfs* pTfs;
int32_t diskPrimary;
SMsgCb msgCb; SMsgCb msgCb;
// Buffer Pool // Buffer Pool
......
...@@ -41,7 +41,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) { ...@@ -41,7 +41,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
*ppMeta = NULL; *ppMeta = NULL;
// create handle // create handle
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, path, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN);
offset = strlen(path); offset = strlen(path);
snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VNODE_META_DIR); snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VNODE_META_DIR);
......
...@@ -26,7 +26,7 @@ void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputN ...@@ -26,7 +26,7 @@ void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputN
int32_t offset = 0; int32_t offset = 0;
// vnode // vnode
vnodeGetPrimaryDir(pVnode->path, pTfs, outputName, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pTfs, outputName, TSDB_FILENAME_LEN);
offset = strlen(outputName); offset = strlen(outputName);
// rsma // rsma
......
...@@ -59,7 +59,7 @@ typedef struct { ...@@ -59,7 +59,7 @@ typedef struct {
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) { static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
SVnode *pVnode = pTsdb->pVnode; SVnode *pVnode = pTsdb->pVnode;
vnodeGetPrimaryDir(pTsdb->path, pVnode->pTfs, path, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN);
int32_t offset = strlen(path); int32_t offset = strlen(path);
snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%scache.rdb", TD_DIRSEP); snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%scache.rdb", TD_DIRSEP);
......
...@@ -276,14 +276,14 @@ static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) { ...@@ -276,14 +276,14 @@ static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) {
// CURRENT // CURRENT
if (current) { if (current) {
vnodeGetPrimaryDir(pTsdb->path, pVnode->pTfs, current, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, current, TSDB_FILENAME_LEN);
offset = strlen(current); offset = strlen(current);
snprintf(current + offset, TSDB_FILENAME_LEN - offset - 1, "%sCURRENT", TD_DIRSEP); snprintf(current + offset, TSDB_FILENAME_LEN - offset - 1, "%sCURRENT", TD_DIRSEP);
} }
// CURRENT.t // CURRENT.t
if (current_t) { if (current_t) {
vnodeGetPrimaryDir(pTsdb->path, pVnode->pTfs, current_t, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, current_t, TSDB_FILENAME_LEN);
offset = strlen(current_t); offset = strlen(current_t);
snprintf(current_t + offset, TSDB_FILENAME_LEN - offset - 1, "%sCURRENT.t", TD_DIRSEP); snprintf(current_t + offset, TSDB_FILENAME_LEN - offset - 1, "%sCURRENT.t", TD_DIRSEP);
} }
......
...@@ -284,8 +284,9 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) { ...@@ -284,8 +284,9 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) {
// SDelFile =============================================== // SDelFile ===============================================
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) { void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) {
int32_t offset = 0; int32_t offset = 0;
SVnode *pVnode = pTsdb->pVnode;
vnodeGetPrimaryDir(pTsdb->path, pTsdb->pVnode->pTfs, fname, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, fname, TSDB_FILENAME_LEN);
offset = strlen(fname); offset = strlen(fname);
snprintf((char *)fname + offset, TSDB_FILENAME_LEN - offset - 1, "%sv%dver%" PRId64 ".del", TD_DIRSEP, snprintf((char *)fname + offset, TSDB_FILENAME_LEN - offset - 1, "%sv%dver%" PRId64 ".del", TD_DIRSEP,
TD_VID(pTsdb->pVnode), pFile->commitID); TD_VID(pTsdb->pVnode), pFile->commitID);
......
...@@ -290,7 +290,7 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { ...@@ -290,7 +290,7 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
pInfo->txn = metaGetTxn(pVnode->pMeta); pInfo->txn = metaGetTxn(pVnode->pMeta);
// save info // save info
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
vDebug("vgId:%d, save config while prepare commit", TD_VID(pVnode)); vDebug("vgId:%d, save config while prepare commit", TD_VID(pVnode));
if (vnodeSaveInfo(dir, &pInfo->info) < 0) { if (vnodeSaveInfo(dir, &pInfo->info) < 0) {
...@@ -428,7 +428,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { ...@@ -428,7 +428,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
return -1; return -1;
} }
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed); syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed);
...@@ -492,7 +492,7 @@ bool vnodeShouldRollback(SVnode *pVnode) { ...@@ -492,7 +492,7 @@ bool vnodeShouldRollback(SVnode *pVnode) {
char tFName[TSDB_FILENAME_LEN] = {0}; char tFName[TSDB_FILENAME_LEN] = {0};
int32_t offset = 0; int32_t offset = 0;
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, tFName, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, tFName, TSDB_FILENAME_LEN);
offset = strlen(tFName); offset = strlen(tFName);
snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP); snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);
...@@ -503,7 +503,7 @@ void vnodeRollback(SVnode *pVnode) { ...@@ -503,7 +503,7 @@ void vnodeRollback(SVnode *pVnode) {
char tFName[TSDB_FILENAME_LEN] = {0}; char tFName[TSDB_FILENAME_LEN] = {0};
int32_t offset = 0; int32_t offset = 0;
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, tFName, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, tFName, TSDB_FILENAME_LEN);
offset = strlen(tFName); offset = strlen(tFName);
snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP); snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);
......
...@@ -15,9 +15,11 @@ ...@@ -15,9 +15,11 @@
#include "vnd.h" #include "vnd.h"
int32_t vnodeGetPrimaryDir(const char *relPath, STfs *pTfs, char *buf, size_t bufLen) { int32_t vnodeGetPrimaryDir(const char *relPath, int32_t diskPrimary, STfs *pTfs, char *buf, size_t bufLen) {
if (pTfs) { if (pTfs) {
snprintf(buf, bufLen - 1, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, relPath); SDiskID diskId = {0};
diskId.id = diskPrimary;
snprintf(buf, bufLen - 1, "%s%s%s", tfsGetDiskPath(pTfs, diskId), TD_DIRSEP, relPath);
} else { } else {
snprintf(buf, bufLen - 1, "%s", relPath); snprintf(buf, bufLen - 1, "%s", relPath);
} }
...@@ -25,7 +27,7 @@ int32_t vnodeGetPrimaryDir(const char *relPath, STfs *pTfs, char *buf, size_t bu ...@@ -25,7 +27,7 @@ int32_t vnodeGetPrimaryDir(const char *relPath, STfs *pTfs, char *buf, size_t bu
return 0; return 0;
} }
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs) {
SVnodeInfo info = {0}; SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0}; char dir[TSDB_FILENAME_LEN] = {0};
...@@ -36,7 +38,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { ...@@ -36,7 +38,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
} }
// create vnode env // create vnode env
vnodeGetPrimaryDir(path, pTfs, dir, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
if (taosMkDir(dir)) { if (taosMkDir(dir)) {
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
...@@ -60,12 +62,12 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { ...@@ -60,12 +62,12 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
return 0; return 0;
} }
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) { int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t diskPrimary, STfs *pTfs) {
SVnodeInfo info = {0}; SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0}; char dir[TSDB_FILENAME_LEN] = {0};
int32_t ret = 0; int32_t ret = 0;
vnodeGetPrimaryDir(path, pTfs, dir, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
ret = vnodeLoadInfo(dir, &info); ret = vnodeLoadInfo(dir, &info);
if (ret < 0) { if (ret < 0) {
...@@ -133,7 +135,8 @@ static int32_t vnodeVgroupIdLen(int32_t vgId) { ...@@ -133,7 +135,8 @@ static int32_t vnodeVgroupIdLen(int32_t vgId) {
return strlen(tmp); return strlen(tmp);
} }
int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs) { int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId,
int32_t diskPrimary, STfs *pTfs) {
int32_t ret = 0; int32_t ret = 0;
char oldRname[TSDB_FILENAME_LEN] = {0}; char oldRname[TSDB_FILENAME_LEN] = {0};
...@@ -183,12 +186,13 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr ...@@ -183,12 +186,13 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr
return ret; return ret;
} }
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs) { int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq,
int32_t diskPrimary, STfs *pTfs) {
SVnodeInfo info = {0}; SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0}; char dir[TSDB_FILENAME_LEN] = {0};
int32_t ret = 0; int32_t ret = 0;
vnodeGetPrimaryDir(srcPath, pTfs, dir, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(srcPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
ret = vnodeLoadInfo(dir, &info); ret = vnodeLoadInfo(dir, &info);
if (ret < 0) { if (ret < 0) {
...@@ -232,7 +236,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod ...@@ -232,7 +236,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
} }
vInfo("vgId:%d, rename %s to %s", pReq->dstVgId, srcPath, dstPath); vInfo("vgId:%d, rename %s to %s", pReq->dstVgId, srcPath, dstPath);
ret = vnodeRenameVgroupId(srcPath, dstPath, pReq->srcVgId, pReq->dstVgId, pTfs); ret = vnodeRenameVgroupId(srcPath, dstPath, pReq->srcVgId, pReq->dstVgId, diskPrimary, pTfs);
if (ret < 0) { if (ret < 0) {
vError("vgId:%d, failed to rename vnode from %s to %s since %s", pReq->dstVgId, srcPath, dstPath, vError("vgId:%d, failed to rename vnode from %s to %s since %s", pReq->dstVgId, srcPath, dstPath,
tstrerror(terrno)); tstrerror(terrno));
...@@ -243,11 +247,12 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod ...@@ -243,11 +247,12 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
return 0; return 0;
} }
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs) { int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId,
int32_t diskPrimary, STfs *pTfs) {
SVnodeInfo info = {0}; SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0}; char dir[TSDB_FILENAME_LEN] = {0};
vnodeGetPrimaryDir(dstPath, pTfs, dir, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(dstPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
if (vnodeLoadInfo(dir, &info) == 0) { if (vnodeLoadInfo(dir, &info) == 0) {
if (info.config.vgId != dstVgId) { if (info.config.vgId != dstVgId) {
vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId); vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId);
...@@ -256,7 +261,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s ...@@ -256,7 +261,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
return dstVgId; return dstVgId;
} }
vnodeGetPrimaryDir(srcPath, pTfs, dir, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(srcPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
if (vnodeLoadInfo(dir, &info) < 0) { if (vnodeLoadInfo(dir, &info) < 0) {
vError("vgId:%d, failed to read vnode config from %s since %s", srcVgId, srcPath, tstrerror(terrno)); vError("vgId:%d, failed to read vnode config from %s since %s", srcVgId, srcPath, tstrerror(terrno));
return -1; return -1;
...@@ -271,7 +276,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s ...@@ -271,7 +276,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
} }
vInfo("vgId:%d, rename %s to %s", dstVgId, srcPath, dstPath); vInfo("vgId:%d, rename %s to %s", dstVgId, srcPath, dstPath);
if (vnodeRenameVgroupId(srcPath, dstPath, srcVgId, dstVgId, pTfs) < 0) { if (vnodeRenameVgroupId(srcPath, dstPath, srcVgId, dstVgId, diskPrimary, pTfs) < 0) {
vError("vgId:%d, failed to rename vnode from %s to %s since %s", dstVgId, srcPath, dstPath, tstrerror(terrno)); vError("vgId:%d, failed to rename vnode from %s to %s since %s", dstVgId, srcPath, dstPath, tstrerror(terrno));
return -1; return -1;
} }
...@@ -284,14 +289,14 @@ void vnodeDestroy(const char *path, STfs *pTfs) { ...@@ -284,14 +289,14 @@ void vnodeDestroy(const char *path, STfs *pTfs) {
tfsRmdir(pTfs, path); tfsRmdir(pTfs, path);
} }
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb) {
SVnode *pVnode = NULL; SVnode *pVnode = NULL;
SVnodeInfo info = {0}; SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0}; char dir[TSDB_FILENAME_LEN] = {0};
char tdir[TSDB_FILENAME_LEN * 2] = {0}; char tdir[TSDB_FILENAME_LEN * 2] = {0};
int32_t ret = 0; int32_t ret = 0;
vnodeGetPrimaryDir(path, pTfs, dir, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
info.config = vnodeCfgDefault; info.config = vnodeCfgDefault;
......
...@@ -35,7 +35,7 @@ static int32_t vnodePrepareRentention(SVnode *pVnode, SRetentionInfo *pInfo) { ...@@ -35,7 +35,7 @@ static int32_t vnodePrepareRentention(SVnode *pVnode, SRetentionInfo *pInfo) {
pInfo->commitID = ++pVnode->state.commitID; pInfo->commitID = ++pVnode->state.commitID;
char dir[TSDB_FILENAME_LEN] = {0}; char dir[TSDB_FILENAME_LEN] = {0};
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
if (vnodeLoadInfo(dir, &pInfo->info) < 0) { if (vnodeLoadInfo(dir, &pInfo->info) < 0) {
code = terrno; code = terrno;
...@@ -60,7 +60,7 @@ static int32_t vnodeRetentionTask(void *param) { ...@@ -60,7 +60,7 @@ static int32_t vnodeRetentionTask(void *param) {
SVnode *pVnode = pInfo->pVnode; SVnode *pVnode = pInfo->pVnode;
char dir[TSDB_FILENAME_LEN] = {0}; char dir[TSDB_FILENAME_LEN] = {0};
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
// save info // save info
pInfo->info.state.commitID = pInfo->commitID; pInfo->info.state.commitID = pInfo->commitID;
......
...@@ -86,6 +86,7 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) { ...@@ -86,6 +86,7 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) {
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) { int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) {
int32_t code = 0; int32_t code = 0;
SVnode *pVnode = pReader->pVnode;
// CONFIG ============== // CONFIG ==============
// FIXME: if commit multiple times and the config changed? // FIXME: if commit multiple times and the config changed?
...@@ -93,7 +94,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) ...@@ -93,7 +94,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
char fName[TSDB_FILENAME_LEN]; char fName[TSDB_FILENAME_LEN];
int32_t offset = 0; int32_t offset = 0;
vnodeGetPrimaryDir(pReader->pVnode->path, pReader->pVnode->pTfs, fName, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, fName, TSDB_FILENAME_LEN);
offset = strlen(fName); offset = strlen(fName);
snprintf(fName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME); snprintf(fName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME);
...@@ -343,7 +344,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * ...@@ -343,7 +344,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
.applyTerm = pWriter->info.state.commitTerm}; .applyTerm = pWriter->info.state.commitTerm};
pVnode->statis = pWriter->info.statis; pVnode->statis = pWriter->info.statis;
char dir[TSDB_FILENAME_LEN] = {0}; char dir[TSDB_FILENAME_LEN] = {0};
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
vnodeCommitInfo(dir); vnodeCommitInfo(dir);
} else { } else {
...@@ -381,7 +382,7 @@ _exit: ...@@ -381,7 +382,7 @@ _exit:
static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;
SVnode *pVnode = pWriter->pVnode;
SSnapDataHdr *pHdr = (SSnapDataHdr *)pData; SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;
// decode info // decode info
...@@ -395,10 +396,9 @@ static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_ ...@@ -395,10 +396,9 @@ static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_
// modify info as needed // modify info as needed
char dir[TSDB_FILENAME_LEN] = {0}; char dir[TSDB_FILENAME_LEN] = {0};
vnodeGetPrimaryDir(pWriter->pVnode->path, pWriter->pVnode->pTfs, dir, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
SVnodeStats vndStats = pWriter->info.config.vndStats; SVnodeStats vndStats = pWriter->info.config.vndStats;
SVnode *pVnode = pWriter->pVnode;
pWriter->info.config = pVnode->config; pWriter->info.config = pVnode->config;
pWriter->info.config.vndStats = vndStats; pWriter->info.config.vndStats = vndStats;
vDebug("vgId:%d, save config while write snapshot", pWriter->pVnode->config.vgId); vDebug("vgId:%d, save config while write snapshot", pWriter->pVnode->config.vgId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册