From 9d5acf18123ebd582c3df09189d2f84ede0bdb11 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 26 Jul 2022 13:26:52 +0800 Subject: [PATCH] enh: add wal options to db --- include/common/tmsg.h | 4 ++++ source/common/src/tmsg.c | 9 +++++++++ source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 7 +++++++ source/dnode/mnode/impl/inc/mndDef.h | 6 +++++- source/dnode/mnode/impl/src/mndDb.c | 20 ++++++++++++++++++++ source/dnode/mnode/impl/src/mndVgroup.c | 4 ++++ 6 files changed, 49 insertions(+), 1 deletion(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0d0eb841bc..9d4f77d1ef 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1154,6 +1154,10 @@ typedef struct { int32_t numOfRetensions; SArray* pRetensions; // SRetention void* pTsma; + int32_t walRetentionPeriod; + int32_t walRetentionSize; + int32_t walRollPeriod; + int32_t walSegmentSize; } SCreateVnodeReq; int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index f87d336ec2..d4e56ec742 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3750,6 +3750,10 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR uint32_t tsmaLen = (uint32_t)(htonl(((SMsgHead *)pReq->pTsma)->contLen)); if (tEncodeBinary(&encoder, (const uint8_t *)pReq->pTsma, tsmaLen) < 0) return -1; } + if (tEncodeI32(&encoder, pReq->walRetentionPeriod) < 0) return -1; + if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1; + if (tEncodeI32(&encoder, pReq->walRollPeriod) < 0) return -1; + if (tEncodeI32(&encoder, pReq->walSegmentSize) < 0) return -1; tEndEncode(&encoder); @@ -3818,6 +3822,11 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq * if (tDecodeBinary(&decoder, (uint8_t **)&pReq->pTsma, NULL) < 0) return -1; } + if (tDecodeI32(&decoder, &pReq->walRetentionPeriod) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->walRetentionSize) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->walRollPeriod) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->walSegmentSize) < 0) return -1; + tEndDecode(&decoder); tDecoderClear(&decoder); return 0; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 0471e2b850..159b9ca568 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -160,6 +160,13 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { } pCfg->walCfg.vgId = pCreate->vgId; + pCfg->walCfg.fsyncPeriod = pCreate->fsyncPeriod; + pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod; + pCfg->walCfg.rollPeriod = pCreate->walRetentionSize; + pCfg->walCfg.retentionSize = pCreate->walRollPeriod; + pCfg->walCfg.segSize = pCreate->walSegmentSize; + pCfg->walCfg.level = pCreate->walLevel; + pCfg->hashBegin = pCreate->hashBegin; pCfg->hashEnd = pCreate->hashEnd; pCfg->hashMethod = pCreate->hashMethod; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index c9997fa3d5..e7f78b34c5 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -302,9 +302,13 @@ typedef struct { int8_t strict; int8_t hashMethod; // default is 1 int8_t cacheLast; + int8_t schemaless; int32_t numOfRetensions; SArray* pRetensions; - int8_t schemaless; + int32_t walRetentionPeriod; + int32_t walRetentionSize; + int32_t walRollPeriod; + int32_t walSegmentSize; } SDbCfg; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 064ef9b40a..d183b519e8 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -120,6 +120,10 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) { SDB_SET_INT8(pRaw, dataPos, pRetension->keepUnit, _OVER) } SDB_SET_INT8(pRaw, dataPos, pDb->cfg.schemaless, _OVER) + SDB_SET_INT32(pRaw, dataPos, pDb->cfg.walRetentionPeriod, _OVER) + SDB_SET_INT32(pRaw, dataPos, pDb->cfg.walRetentionSize, _OVER) + SDB_SET_INT32(pRaw, dataPos, pDb->cfg.walRollPeriod, _OVER) + SDB_SET_INT32(pRaw, dataPos, pDb->cfg.walSegmentSize, _OVER) SDB_SET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER) @@ -199,6 +203,10 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) { } } SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.schemaless, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.walRetentionPeriod, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.walRetentionSize, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.walRollPeriod, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.walSegmentSize, _OVER) SDB_GET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER) taosInitRWLatch(&pDb->lock); @@ -318,6 +326,10 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) { terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; return -1; } + if (pCfg->walRetentionPeriod < TSDB_DB_MIN_WAL_RETENTION_PERIOD) return -1; + if (pCfg->walRetentionSize < TSDB_DB_MIN_WAL_RETENTION_SIZE) return -1; + if (pCfg->walRollPeriod < TSDB_DB_MIN_WAL_ROLL_PERIOD) return -1; + if (pCfg->walSegmentSize < TSDB_DB_MIN_WAL_SEGMENT_SIZE) return -1; terrno = 0; return terrno; @@ -345,6 +357,10 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { if (pCfg->cacheLastSize <= 0) pCfg->cacheLastSize = TSDB_DEFAULT_CACHE_SIZE; if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0; if (pCfg->schemaless < 0) pCfg->schemaless = TSDB_DB_SCHEMALESS_OFF; + if (pCfg->walRetentionPeriod < 0) pCfg->walRetentionPeriod = TSDB_DEFAULT_DB_WAL_RETENTION_PERIOD; + if (pCfg->walRetentionSize < 0) pCfg->walRetentionSize = TSDB_DEFAULT_DB_WAL_RETENTION_SIZE; + if (pCfg->walRollPeriod < 0) pCfg->walRollPeriod = TSDB_DEFAULT_DB_WAL_ROLL_PERIOD; + if (pCfg->walSegmentSize < 0) pCfg->walSegmentSize = TSDB_DEFAULT_DB_WAL_SEGMENT_SIZE; } static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { @@ -457,6 +473,10 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, .cacheLast = pCreate->cacheLast, .hashMethod = 1, .schemaless = pCreate->schemaless, + .walRetentionPeriod = pCreate->walRetentionPeriod, + .walRetentionSize = pCreate->walRetentionSize, + .walRollPeriod = pCreate->walRollPeriod, + .walSegmentSize = pCreate->walSegmentSize, }; dbObj.cfg.numOfRetensions = pCreate->numOfRetensions; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 3eb3a6cd1f..4625b2ab01 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -230,6 +230,10 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg createReq.standby = standby; createReq.isTsma = pVgroup->isTsma; createReq.pTsma = pVgroup->pTsma; + createReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod; + createReq.walRetentionSize = pDb->cfg.walRetentionSize; + createReq.walRollPeriod = pDb->cfg.walRollPeriod; + createReq.walSegmentSize = pDb->cfg.walSegmentSize; for (int32_t v = 0; v < pVgroup->replica; ++v) { SReplica *pReplica = &createReq.replicas[v]; -- GitLab