提交 9d5acf18 编写于 作者: S Shengliang Guan

enh: add wal options to db

上级 6f3bf5b7
...@@ -1154,6 +1154,10 @@ typedef struct { ...@@ -1154,6 +1154,10 @@ typedef struct {
int32_t numOfRetensions; int32_t numOfRetensions;
SArray* pRetensions; // SRetention SArray* pRetensions; // SRetention
void* pTsma; void* pTsma;
int32_t walRetentionPeriod;
int32_t walRetentionSize;
int32_t walRollPeriod;
int32_t walSegmentSize;
} SCreateVnodeReq; } SCreateVnodeReq;
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq); int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
......
...@@ -3750,6 +3750,10 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR ...@@ -3750,6 +3750,10 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
uint32_t tsmaLen = (uint32_t)(htonl(((SMsgHead *)pReq->pTsma)->contLen)); uint32_t tsmaLen = (uint32_t)(htonl(((SMsgHead *)pReq->pTsma)->contLen));
if (tEncodeBinary(&encoder, (const uint8_t *)pReq->pTsma, tsmaLen) < 0) return -1; if (tEncodeBinary(&encoder, (const uint8_t *)pReq->pTsma, tsmaLen) < 0) return -1;
} }
if (tEncodeI32(&encoder, pReq->walRetentionPeriod) < 0) return -1;
if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->walRollPeriod) < 0) return -1;
if (tEncodeI32(&encoder, pReq->walSegmentSize) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -3818,6 +3822,11 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq * ...@@ -3818,6 +3822,11 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tDecodeBinary(&decoder, (uint8_t **)&pReq->pTsma, NULL) < 0) return -1; if (tDecodeBinary(&decoder, (uint8_t **)&pReq->pTsma, NULL) < 0) return -1;
} }
if (tDecodeI32(&decoder, &pReq->walRetentionPeriod) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->walRetentionSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->walRollPeriod) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->walSegmentSize) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
......
...@@ -160,6 +160,13 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { ...@@ -160,6 +160,13 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
} }
pCfg->walCfg.vgId = pCreate->vgId; pCfg->walCfg.vgId = pCreate->vgId;
pCfg->walCfg.fsyncPeriod = pCreate->fsyncPeriod;
pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
pCfg->walCfg.rollPeriod = pCreate->walRetentionSize;
pCfg->walCfg.retentionSize = pCreate->walRollPeriod;
pCfg->walCfg.segSize = pCreate->walSegmentSize;
pCfg->walCfg.level = pCreate->walLevel;
pCfg->hashBegin = pCreate->hashBegin; pCfg->hashBegin = pCreate->hashBegin;
pCfg->hashEnd = pCreate->hashEnd; pCfg->hashEnd = pCreate->hashEnd;
pCfg->hashMethod = pCreate->hashMethod; pCfg->hashMethod = pCreate->hashMethod;
......
...@@ -302,9 +302,13 @@ typedef struct { ...@@ -302,9 +302,13 @@ typedef struct {
int8_t strict; int8_t strict;
int8_t hashMethod; // default is 1 int8_t hashMethod; // default is 1
int8_t cacheLast; int8_t cacheLast;
int8_t schemaless;
int32_t numOfRetensions; int32_t numOfRetensions;
SArray* pRetensions; SArray* pRetensions;
int8_t schemaless; int32_t walRetentionPeriod;
int32_t walRetentionSize;
int32_t walRollPeriod;
int32_t walSegmentSize;
} SDbCfg; } SDbCfg;
typedef struct { typedef struct {
......
...@@ -120,6 +120,10 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) { ...@@ -120,6 +120,10 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
SDB_SET_INT8(pRaw, dataPos, pRetension->keepUnit, _OVER) SDB_SET_INT8(pRaw, dataPos, pRetension->keepUnit, _OVER)
} }
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.schemaless, _OVER) SDB_SET_INT8(pRaw, dataPos, pDb->cfg.schemaless, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.walRetentionPeriod, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.walRetentionSize, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.walRollPeriod, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.walSegmentSize, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER) SDB_SET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER)
...@@ -199,6 +203,10 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) { ...@@ -199,6 +203,10 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
} }
} }
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.schemaless, _OVER) SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.schemaless, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.walRetentionPeriod, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.walRetentionSize, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.walRollPeriod, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.walSegmentSize, _OVER)
SDB_GET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER) SDB_GET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
taosInitRWLatch(&pDb->lock); taosInitRWLatch(&pDb->lock);
...@@ -318,6 +326,10 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) { ...@@ -318,6 +326,10 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
return -1; return -1;
} }
if (pCfg->walRetentionPeriod < TSDB_DB_MIN_WAL_RETENTION_PERIOD) return -1;
if (pCfg->walRetentionSize < TSDB_DB_MIN_WAL_RETENTION_SIZE) return -1;
if (pCfg->walRollPeriod < TSDB_DB_MIN_WAL_ROLL_PERIOD) return -1;
if (pCfg->walSegmentSize < TSDB_DB_MIN_WAL_SEGMENT_SIZE) return -1;
terrno = 0; terrno = 0;
return terrno; return terrno;
...@@ -345,6 +357,10 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { ...@@ -345,6 +357,10 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->cacheLastSize <= 0) pCfg->cacheLastSize = TSDB_DEFAULT_CACHE_SIZE; if (pCfg->cacheLastSize <= 0) pCfg->cacheLastSize = TSDB_DEFAULT_CACHE_SIZE;
if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0; if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0;
if (pCfg->schemaless < 0) pCfg->schemaless = TSDB_DB_SCHEMALESS_OFF; if (pCfg->schemaless < 0) pCfg->schemaless = TSDB_DB_SCHEMALESS_OFF;
if (pCfg->walRetentionPeriod < 0) pCfg->walRetentionPeriod = TSDB_DEFAULT_DB_WAL_RETENTION_PERIOD;
if (pCfg->walRetentionSize < 0) pCfg->walRetentionSize = TSDB_DEFAULT_DB_WAL_RETENTION_SIZE;
if (pCfg->walRollPeriod < 0) pCfg->walRollPeriod = TSDB_DEFAULT_DB_WAL_ROLL_PERIOD;
if (pCfg->walSegmentSize < 0) pCfg->walSegmentSize = TSDB_DEFAULT_DB_WAL_SEGMENT_SIZE;
} }
static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
...@@ -457,6 +473,10 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, ...@@ -457,6 +473,10 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
.cacheLast = pCreate->cacheLast, .cacheLast = pCreate->cacheLast,
.hashMethod = 1, .hashMethod = 1,
.schemaless = pCreate->schemaless, .schemaless = pCreate->schemaless,
.walRetentionPeriod = pCreate->walRetentionPeriod,
.walRetentionSize = pCreate->walRetentionSize,
.walRollPeriod = pCreate->walRollPeriod,
.walSegmentSize = pCreate->walSegmentSize,
}; };
dbObj.cfg.numOfRetensions = pCreate->numOfRetensions; dbObj.cfg.numOfRetensions = pCreate->numOfRetensions;
......
...@@ -230,6 +230,10 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg ...@@ -230,6 +230,10 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
createReq.standby = standby; createReq.standby = standby;
createReq.isTsma = pVgroup->isTsma; createReq.isTsma = pVgroup->isTsma;
createReq.pTsma = pVgroup->pTsma; createReq.pTsma = pVgroup->pTsma;
createReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
createReq.walRetentionSize = pDb->cfg.walRetentionSize;
createReq.walRollPeriod = pDb->cfg.walRollPeriod;
createReq.walSegmentSize = pDb->cfg.walSegmentSize;
for (int32_t v = 0; v < pVgroup->replica; ++v) { for (int32_t v = 0; v < pVgroup->replica; ++v) {
SReplica *pReplica = &createReq.replicas[v]; SReplica *pReplica = &createReq.replicas[v];
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册