提交 6b6b7d8d 编写于 作者: B Benguang Zhao

enh: alloc disk of vnode primary dir in vmAllocPrimaryDisk

上级 b21e6034
......@@ -69,6 +69,13 @@ void tfsUpdateSize(STfs *pTfs);
*/
SDiskSize tfsGetSize(STfs *pTfs);
/**
* @brief Get the number of disks at level of multi-tier storage.
*
* @param pTfs
* @return int32_t
*/
int32_t tfsGetDisksAtLevel(STfs *pTfs, int32_t level);
/**
* @brief Get level of multi-tier storage.
*
......@@ -162,6 +169,16 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname);
*/
int32_t tfsRename(STfs *pTfs, const char *orname, const char *nrname);
/**
* @brief Search fname in level of tfs
*
* @param pTfs The fs object.
* @param level The level to search on
* @param fname The relative file name to be searched
* @param int32_t diskId for successs, -1 for failure
*/
int32_t tfsSearch(STfs *pTfs, int32_t level, const char *fname);
/**
* @brief Init file object in tfs.
*
......
......@@ -15,12 +15,62 @@
#define _DEFAULT_SOURCE
#include "vmInt.h"
#include "tfs.h"
#include "vnd.h"
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
STfs *pTfs = pMgmt->pTfs;
int32_t diskId = 0;
if (!pTfs) {
return diskId;
}
// search fs
char vnodePath[TSDB_FILENAME_LEN] = {0};
snprintf(vnodePath, TSDB_FILENAME_LEN - 1, "vnode%svnode%d", TD_DIRSEP, vgId);
char fname[TSDB_FILENAME_LEN] = {0};
char fnameTmp[TSDB_FILENAME_LEN] = {0};
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME);
snprintf(fnameTmp, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME_TMP);
diskId = tfsSearch(pTfs, 0, fname);
if (diskId >= 0) {
return diskId;
}
diskId = tfsSearch(pTfs, 0, fnameTmp);
if (diskId >= 0) {
return diskId;
}
// alloc
return 0;
int32_t disks[TFS_MAX_DISKS_PER_TIER] = {0};
int32_t numOfVnodes = 0;
SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
for (int32_t v = 0; v < numOfVnodes; v++) {
SVnodeObj *pVnode = ppVnodes[v];
disks[pVnode->diskPrimary] += 1;
}
int32_t minVal = INT_MAX;
int32_t ndisk = tfsGetDisksAtLevel(pTfs, 0);
diskId = 0;
for (int32_t id = 0; id < ndisk; id++) {
if (minVal > disks[id]) {
minVal = disks[id];
diskId = id;
}
}
for (int32_t i = 0; i < numOfVnodes; ++i) {
if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
vmReleaseVnode(pMgmt, ppVnodes[i]);
}
if (ppVnodes != NULL) {
taosMemoryFree(ppVnodes);
}
dInfo("vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d", vgId, diskId, ndisk, numOfVnodes);
return diskId;
}
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
......
......@@ -93,6 +93,7 @@ typedef struct SQueryNode SQueryNode;
#define VNODE_BUFPOOL_SEGMENTS 3
#define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
// vnd.h
typedef int32_t (*_query_reseek_func_t)(void* pQHandle);
......
......@@ -16,8 +16,6 @@
#include "vnd.h"
#include "vnodeInt.h"
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
static int vnodeCommitImpl(SCommitInfo *pInfo);
......
......@@ -38,10 +38,11 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs
}
// create vnode env
vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
if (taosMkDir(dir)) {
if ((pTfs) ? tfsMkdir(pTfs, path) : taosMkDir(path)) {
vError("vgId:%d, failed to mkdir since %s, dir: %s", pCfg->vgId, strerror(errno), path);
return TAOS_SYSTEM_ERROR(errno);
}
vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
if (pCfg) {
info.config = *pCfg;
......@@ -339,6 +340,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
pVnode->state.applied = info.state.committed;
pVnode->state.applyTerm = info.state.commitTerm;
pVnode->pTfs = pTfs;
pVnode->diskPrimary = diskPrimary;
pVnode->msgCb = msgCb;
taosThreadMutexInit(&pVnode->lock, NULL);
pVnode->blocked = false;
......
......@@ -113,6 +113,15 @@ SDiskSize tfsGetSize(STfs *pTfs) {
return size;
}
int32_t tfsGetDisksAtLevel(STfs *pTfs, int32_t level) {
if (level < 0 || level >= pTfs->nlevel) {
return 0;
}
STfsTier *pTier = TFS_TIER_AT(pTfs, level);
return pTier->ndisk;
}
int32_t tfsGetLevel(STfs *pTfs) { return pTfs->nlevel; }
int32_t tfsAllocDisk(STfs *pTfs, int32_t expLevel, SDiskID *pDiskId) {
......@@ -277,7 +286,7 @@ int32_t tfsMkdir(STfs *pTfs, const char *rname) {
STfsTier *pTier = TFS_TIER_AT(pTfs, level);
for (int32_t id = 0; id < pTier->ndisk; id++) {
SDiskID did = {.id = id, .level = level};
if (tfsMkdirAt(pTfs, rname, did) < 0) {
if (tfsMkdirRecurAt(pTfs, rname, did) < 0) {
return -1;
}
}
......@@ -335,6 +344,23 @@ int32_t tfsRename(STfs *pTfs, const char *orname, const char *nrname) {
return 0;
}
int32_t tfsSearch(STfs *pTfs, int32_t level, const char *fname) {
if (level < 0 || level >= pTfs->nlevel) {
return -1;
}
char path[TMPNAME_LEN] = {0};
STfsTier *pTier = TFS_TIER_AT(pTfs, level);
for (int32_t id = 0; id < pTier->ndisk; id++) {
STfsDisk *pDisk = pTier->disks[id];
snprintf(path, TMPNAME_LEN - 1, "%s%s%s", pDisk->path, TD_DIRSEP, fname);
if (taosCheckExistFile(path)) {
return id;
}
}
return -1;
}
STfsDir *tfsOpendir(STfs *pTfs, const char *rname) {
STfsDir *pDir = taosMemoryCalloc(1, sizeof(STfsDir));
if (pDir == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册