提交 89f4a7ad 编写于 作者: S Shengliang Guan

TD-10431 change numOfVgroups as config of db

上级 aa5f1c72
......@@ -235,6 +235,7 @@ do { \
#define TSDB_MAX_VNODES 512
#define TSDB_MIN_VNODES_PER_DB 1
#define TSDB_MAX_VNODES_PER_DB 4096
#define TSDB_DEFAULT_VN_PER_DB 2
#define TSDB_DNODE_ROLE_ANY 0
#define TSDB_DNODE_ROLE_MGMT 1
......
......@@ -182,6 +182,7 @@ typedef struct {
} SUserObj;
typedef struct {
int32_t numOfVgroups;
int32_t cacheBlockSize;
int32_t totalBlocks;
int32_t daysPerFile;
......@@ -209,7 +210,6 @@ typedef struct {
int64_t uid;
int32_t cfgVersion;
int32_t vgVersion;
int32_t numOfVgroups;
int8_t hashMethod; // default is 1
SDbCfg cfg;
} SDbObj;
......
......@@ -77,8 +77,8 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
SDB_SET_INT64(pRaw, dataPos, pDb->uid)
SDB_SET_INT32(pRaw, dataPos, pDb->cfgVersion)
SDB_SET_INT32(pRaw, dataPos, pDb->vgVersion)
SDB_SET_INT32(pRaw, dataPos, pDb->numOfVgroups)
SDB_SET_INT8(pRaw, dataPos, pDb->hashMethod)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.numOfVgroups)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.cacheBlockSize)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.totalBlocks)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysPerFile)
......@@ -124,8 +124,8 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->uid)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfgVersion)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->vgVersion)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->numOfVgroups)
SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->hashMethod)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.numOfVgroups)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.cacheBlockSize)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.totalBlocks)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysPerFile)
......@@ -163,7 +163,6 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOldDb, SDbObj *pNewDb) {
pOldDb->updateTime = pNewDb->createdTime;
pOldDb->cfgVersion = pNewDb->cfgVersion;
pOldDb->vgVersion = pNewDb->vgVersion;
pOldDb->numOfVgroups = pNewDb->numOfVgroups;
memcpy(&pOldDb->cfg, &pNewDb->cfg, sizeof(SDbCfg));
return 0;
}
......@@ -195,6 +194,7 @@ static int32_t mndCheckDbName(char *dbName, SUserObj *pUser) {
}
static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
if (pCfg->numOfVgroups < TSDB_MIN_VNODES_PER_DB || pCfg->numOfVgroups > TSDB_MAX_VNODES_PER_DB) return -1;
if (pCfg->cacheBlockSize < TSDB_MIN_CACHE_BLOCK_SIZE || pCfg->cacheBlockSize > TSDB_MAX_CACHE_BLOCK_SIZE) return -1;
if (pCfg->totalBlocks < TSDB_MIN_TOTAL_BLOCKS || pCfg->totalBlocks > TSDB_MAX_TOTAL_BLOCKS) return -1;
if (pCfg->daysPerFile < TSDB_MIN_DAYS_PER_FILE || pCfg->daysPerFile > TSDB_MAX_DAYS_PER_FILE) return -1;
......@@ -222,6 +222,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
}
static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->numOfVgroups < 0) pCfg->numOfVgroups = TSDB_DEFAULT_VN_PER_DB;
if (pCfg->cacheBlockSize < 0) pCfg->cacheBlockSize = TSDB_DEFAULT_CACHE_BLOCK_SIZE;
if (pCfg->totalBlocks < 0) pCfg->totalBlocks = TSDB_DEFAULT_TOTAL_BLOCKS;
if (pCfg->daysPerFile < 0) pCfg->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE;
......@@ -246,7 +247,7 @@ static int32_t mndSetRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgOb
if (pDbRaw == NULL || mndTransAppendRedolog(pTrans, pDbRaw) != 0) return -1;
sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING);
for (int v = 0; v < pDb->numOfVgroups; ++v) {
for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) {
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
if (pVgRaw == NULL || mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1;
sdbSetRawStatus(pVgRaw, SDB_STATUS_CREATING);
......@@ -260,7 +261,7 @@ static int32_t mndSetUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgOb
if (pDbRaw == NULL || mndTransAppendUndolog(pTrans, pDbRaw) != 0) return -1;
sdbSetRawStatus(pDbRaw, SDB_STATUS_DROPPED);
for (int v = 0; v < pDb->numOfVgroups; ++v) {
for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) {
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
if (pVgRaw == NULL || mndTransAppendUndolog(pTrans, pVgRaw) != 0) return -1;
sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED);
......@@ -274,7 +275,7 @@ static int32_t mndSetCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVg
if (pDbRaw == NULL || mndTransAppendCommitlog(pTrans, pDbRaw) != 0) return -1;
sdbSetRawStatus(pDbRaw, SDB_STATUS_READY);
for (int v = 0; v < pDb->numOfVgroups; ++v) {
for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) {
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
if (pVgRaw == NULL || mndTransAppendCommitlog(pTrans, pVgRaw) != 0) return -1;
sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
......@@ -298,11 +299,11 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat
dbObj.createdTime = taosGetTimestampMs();
dbObj.updateTime = dbObj.createdTime;
dbObj.uid = mndGenerateUid(dbObj.name, TSDB_FULL_DB_NAME_LEN);
dbObj.numOfVgroups = pCreate->numOfVgroups;
dbObj.hashMethod = 1;
dbObj.cfgVersion = 1;
dbObj.vgVersion = 1;
dbObj.cfg = (SDbCfg){.cacheBlockSize = pCreate->cacheBlockSize,
dbObj.cfg = (SDbCfg){.numOfVgroups = pCreate->numOfVgroups,
.cacheBlockSize = pCreate->cacheBlockSize,
.totalBlocks = pCreate->totalBlocks,
.daysPerFile = pCreate->daysPerFile,
.daysToKeep0 = pCreate->daysToKeep0,
......@@ -643,7 +644,7 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) {
return -1;
}
int32_t contLen = sizeof(SUseDbRsp) + pDb->numOfVgroups * sizeof(SVgroupInfo);
int32_t contLen = sizeof(SUseDbRsp) + pDb->cfg.numOfVgroups * sizeof(SVgroupInfo);
SUseDbRsp *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -654,7 +655,7 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) {
if (pUse->vgVersion < pDb->vgVersion) {
void *pIter = NULL;
while (vindex < pDb->numOfVgroups) {
while (vindex < pDb->cfg.numOfVgroups) {
SVgObj *pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
......@@ -888,7 +889,7 @@ static int32_t mndRetrieveDbs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pDb->numOfVgroups;
*(int16_t *)pWrite = pDb->cfg.numOfVgroups;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......
......@@ -156,11 +156,6 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
sdbRelease(pSdb, pVgroup);
}
static int32_t mndGetDefaultVgroupSize(SMnode *pMnode) {
// todo
return 2;
}
static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup) {
SSdb *pSdb = pMnode->pSdb;
int32_t allocedVnodes = 0;
......@@ -193,21 +188,7 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup) {
}
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
if (pDb->numOfVgroups != -1 &&
(pDb->numOfVgroups < TSDB_MIN_VNODES_PER_DB || pDb->numOfVgroups > TSDB_MAX_VNODES_PER_DB)) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
return -1;
}
if (pDb->numOfVgroups == -1) {
pDb->numOfVgroups = mndGetDefaultVgroupSize(pMnode);
if (pDb->numOfVgroups < 0) {
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
return -1;
}
}
SVgObj *pVgroups = calloc(pDb->numOfVgroups, sizeof(SVgObj));
SVgObj *pVgroups = calloc(pDb->cfg.numOfVgroups, sizeof(SVgObj));
if (pVgroups == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
......@@ -217,9 +198,9 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
uint32_t hashMin = 0;
uint32_t hashMax = UINT32_MAX;
uint32_t hashInterval = (hashMax - hashMin) / pDb->numOfVgroups;
uint32_t hashInterval = (hashMax - hashMin) / pDb->cfg.numOfVgroups;
for (uint32_t v = 0; v < pDb->numOfVgroups; v++) {
for (uint32_t v = 0; v < pDb->cfg.numOfVgroups; v++) {
SVgObj *pVgroup = &pVgroups[v];
pVgroup->vgId = maxVgId++;
pVgroup->createdTime = taosGetTimestampMs();
......@@ -227,7 +208,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
pVgroup->version = 1;
pVgroup->dbUid = pDb->uid;
pVgroup->hashBegin = hashMin + hashInterval * v;
if (v == pDb->numOfVgroups - 1) {
if (v == pDb->cfg.numOfVgroups - 1) {
pVgroup->hashEnd = hashMax;
} else {
pVgroup->hashEnd = hashMin + hashInterval * (v + 1) - 1;
......
# generate debug version:
# mkdir debug; cd debug; cmake -DCMAKE_BUILD_TYPE=Debug ..
# generate release version:
# mkdir release; cd release; cmake -DCMAKE_BUILD_TYPE=Release ..
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
PROJECT(TDengine)
SET(CMAKE_C_STANDARD 11)
SET(CMAKE_VERBOSE_MAKEFILE ON)
ADD_SUBDIRECTORY(examples/c)
#ADD_SUBDIRECTORY(examples/c)
ADD_SUBDIRECTORY(tsim)
ADD_SUBDIRECTORY(test/c)
ADD_SUBDIRECTORY(comparisonTest/tdengine)
#ADD_SUBDIRECTORY(test/c)
#ADD_SUBDIRECTORY(comparisonTest/tdengine)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC)
ADD_EXECUTABLE(tsim ${SRC})
TARGET_LINK_LIBRARIES(tsim taos_static trpc tutil pthread cJson)
aux_source_directory(src TSIM_SRC)
add_executable(tsim ${TSIM_SRC})
target_link_libraries(
tsim
PUBLIC taos
PUBLIC util
PUBLIC common
PUBLIC os
PUBLIC cJson
)
target_include_directories(
tsim
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
......@@ -219,7 +219,9 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
argp_parse(&argp, argc, argv, 0, 0, arguments);
if (arguments->abort) {
#ifndef _ALPINE
#if 0
error(10, 0, "ABORTED");
#endif
#else
abort();
#endif
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册