未验证 提交 a5d8549c 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #20593 from taosdata/FEAT/TD-23258-3.0

feat: support WAL_RETENTION_PERIOD and WAL_RETENTION_SIZE for alter database
...@@ -1322,6 +1322,9 @@ typedef struct { ...@@ -1322,6 +1322,9 @@ typedef struct {
// 1st modification // 1st modification
int16_t sttTrigger; int16_t sttTrigger;
int32_t minRows; int32_t minRows;
// 2nd modification
int32_t walRetentionPeriod;
int32_t walRetentionSize;
} SAlterVnodeConfigReq; } SAlterVnodeConfigReq;
int32_t tSerializeSAlterVnodeConfigReq(void* buf, int32_t bufLen, SAlterVnodeConfigReq* pReq); int32_t tSerializeSAlterVnodeConfigReq(void* buf, int32_t bufLen, SAlterVnodeConfigReq* pReq);
......
...@@ -2219,12 +2219,12 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) { ...@@ -2219,12 +2219,12 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1; if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1;
if (tEncodeI8(&encoder, pReq->replications) < 0) return -1; if (tEncodeI8(&encoder, pReq->replications) < 0) return -1;
if (tEncodeI32(&encoder, pReq->sstTrigger) < 0) return -1; if (tEncodeI32(&encoder, pReq->sstTrigger) < 0) return -1;
if (tEncodeI32(&encoder, pReq->walRetentionPeriod) < 0) return -1;
if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1;
// 1st modification // 1st modification
if (tEncodeI32(&encoder, pReq->minRows) < 0) return -1; if (tEncodeI32(&encoder, pReq->minRows) < 0) return -1;
// 2nd modification
if (tEncodeI32(&encoder, pReq->walRetentionPeriod) < 0) return -1;
if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
...@@ -2252,13 +2252,6 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) { ...@@ -2252,13 +2252,6 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1; if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->replications) < 0) return -1; if (tDecodeI8(&decoder, &pReq->replications) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->sstTrigger) < 0) return -1; if (tDecodeI32(&decoder, &pReq->sstTrigger) < 0) return -1;
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI32(&decoder, &pReq->walRetentionPeriod) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->walRetentionSize) < 0) return -1;
} else {
pReq->walRetentionPeriod = -1;
pReq->walRetentionSize = -1;
}
// 1st modification // 1st modification
if (!tDecodeIsEnd(&decoder)) { if (!tDecodeIsEnd(&decoder)) {
...@@ -2266,6 +2259,15 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) { ...@@ -2266,6 +2259,15 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
} else { } else {
pReq->minRows = -1; pReq->minRows = -1;
} }
// 2nd modification
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI32(&decoder, &pReq->walRetentionPeriod) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->walRetentionSize) < 0) return -1;
} else {
pReq->walRetentionPeriod = -1;
pReq->walRetentionSize = -1;
}
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
...@@ -4196,7 +4198,9 @@ int32_t tSerializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeCon ...@@ -4196,7 +4198,9 @@ int32_t tSerializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeCon
// 1st modification // 1st modification
if (tEncodeI16(&encoder, pReq->sttTrigger) < 0) return -1; if (tEncodeI16(&encoder, pReq->sttTrigger) < 0) return -1;
if (tEncodeI32(&encoder, pReq->minRows) < 0) return -1; if (tEncodeI32(&encoder, pReq->minRows) < 0) return -1;
// 2nd modification
if (tEncodeI32(&encoder, pReq->walRetentionPeriod) < 0) return -1;
if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
...@@ -4235,6 +4239,14 @@ int32_t tDeserializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeC ...@@ -4235,6 +4239,14 @@ int32_t tDeserializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeC
if (tDecodeI32(&decoder, &pReq->minRows) < 0) return -1; if (tDecodeI32(&decoder, &pReq->minRows) < 0) return -1;
} }
// 2n modification
if (tDecodeIsEnd(&decoder)) {
pReq->walRetentionPeriod = -1;
pReq->walRetentionSize = -1;
} else {
if (tDecodeI32(&decoder, &pReq->walRetentionPeriod) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->walRetentionSize) < 0) return -1;
}
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
......
...@@ -275,6 +275,8 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) { ...@@ -275,6 +275,8 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) {
pOld->cfg.daysToKeep2 = pNew->cfg.daysToKeep2; pOld->cfg.daysToKeep2 = pNew->cfg.daysToKeep2;
pOld->cfg.walFsyncPeriod = pNew->cfg.walFsyncPeriod; pOld->cfg.walFsyncPeriod = pNew->cfg.walFsyncPeriod;
pOld->cfg.walLevel = pNew->cfg.walLevel; pOld->cfg.walLevel = pNew->cfg.walLevel;
pOld->cfg.walRetentionPeriod = pNew->cfg.walRetentionPeriod;
pOld->cfg.walRetentionSize = pNew->cfg.walRetentionSize;
pOld->cfg.strict = pNew->cfg.strict; pOld->cfg.strict = pNew->cfg.strict;
pOld->cfg.cacheLast = pNew->cfg.cacheLast; pOld->cfg.cacheLast = pNew->cfg.cacheLast;
pOld->cfg.replications = pNew->cfg.replications; pOld->cfg.replications = pNew->cfg.replications;
......
...@@ -325,6 +325,8 @@ static void *mndBuildAlterVnodeConfigReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pV ...@@ -325,6 +325,8 @@ static void *mndBuildAlterVnodeConfigReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pV
alterReq.cacheLast = pDb->cfg.cacheLast; alterReq.cacheLast = pDb->cfg.cacheLast;
alterReq.sttTrigger = pDb->cfg.sstTrigger; alterReq.sttTrigger = pDb->cfg.sstTrigger;
alterReq.minRows = pDb->cfg.minRows; alterReq.minRows = pDb->cfg.minRows;
alterReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
alterReq.walRetentionSize = pDb->cfg.walRetentionSize;
mInfo("vgId:%d, build alter vnode config req", pVgroup->vgId); mInfo("vgId:%d, build alter vnode config req", pVgroup->vgId);
int32_t contLen = tSerializeSAlterVnodeConfigReq(NULL, 0, &alterReq); int32_t contLen = tSerializeSAlterVnodeConfigReq(NULL, 0, &alterReq);
...@@ -2422,4 +2424,4 @@ int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, ...@@ -2422,4 +2424,4 @@ int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
STimeWindow tw) { STimeWindow tw) {
if (mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs, tw) != 0) return -1; if (mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs, tw) != 0) return -1;
return 0; return 0;
} }
\ No newline at end of file
...@@ -1477,10 +1477,11 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void ...@@ -1477,10 +1477,11 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void
} }
vInfo("vgId:%d, start to alter vnode config, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64 vInfo("vgId:%d, start to alter vnode config, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64
" cacheLast:%d cacheLastSize:%d days:%d keep0:%d keep1:%d keep2:%d fsync:%d level:%d", " cacheLast:%d cacheLastSize:%d days:%d keep0:%d keep1:%d keep2:%d fsync:%d level:%d walRetentionPeriod:%d "
"walRetentionSize:%d",
TD_VID(pVnode), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024, TD_VID(pVnode), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024,
req.cacheLast, req.cacheLastSize, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2, req.cacheLast, req.cacheLastSize, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
req.walFsyncPeriod, req.walLevel); req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize);
if (pVnode->config.cacheLastSize != req.cacheLastSize) { if (pVnode->config.cacheLastSize != req.cacheLastSize) {
pVnode->config.cacheLastSize = req.cacheLastSize; pVnode->config.cacheLastSize = req.cacheLastSize;
...@@ -1510,13 +1511,21 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void ...@@ -1510,13 +1511,21 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void
if (pVnode->config.walCfg.fsyncPeriod != req.walFsyncPeriod) { if (pVnode->config.walCfg.fsyncPeriod != req.walFsyncPeriod) {
pVnode->config.walCfg.fsyncPeriod = req.walFsyncPeriod; pVnode->config.walCfg.fsyncPeriod = req.walFsyncPeriod;
walChanged = true; walChanged = true;
} }
if (pVnode->config.walCfg.level != req.walLevel) { if (pVnode->config.walCfg.level != req.walLevel) {
pVnode->config.walCfg.level = req.walLevel; pVnode->config.walCfg.level = req.walLevel;
walChanged = true;
}
if (pVnode->config.walCfg.retentionPeriod != req.walRetentionPeriod) {
pVnode->config.walCfg.retentionPeriod = req.walRetentionPeriod;
walChanged = true;
}
if (pVnode->config.walCfg.retentionSize != req.walRetentionSize) {
pVnode->config.walCfg.retentionSize = req.walRetentionSize;
walChanged = true; walChanged = true;
} }
......
...@@ -179,17 +179,23 @@ _err: ...@@ -179,17 +179,23 @@ _err:
int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
if (pWal == NULL) return TSDB_CODE_APP_ERROR; if (pWal == NULL) return TSDB_CODE_APP_ERROR;
if (pWal->cfg.level == pCfg->level && pWal->cfg.fsyncPeriod == pCfg->fsyncPeriod) { if (pWal->cfg.level == pCfg->level && pWal->cfg.fsyncPeriod == pCfg->fsyncPeriod &&
wDebug("vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->cfg.vgId, pWal->cfg.level, pWal->cfg.retentionPeriod == pCfg->retentionPeriod && pWal->cfg.retentionSize == pCfg->retentionSize) {
pWal->cfg.fsyncPeriod, pCfg->level, pCfg->fsyncPeriod); wDebug("vgId:%d, walLevel:%d fsync:%d walRetentionPeriod:%d walRetentionSize:%" PRId64 " not change",
pWal->cfg.vgId, pWal->cfg.level, pWal->cfg.fsyncPeriod, pWal->cfg.retentionPeriod, pWal->cfg.retentionSize);
return 0; return 0;
} }
wInfo("vgId:%d, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->cfg.vgId, pWal->cfg.level, wInfo("vgId:%d, change old walLevel:%d fsync:%d walRetentionPeriod:%d walRetentionSize:%" PRId64
pWal->cfg.fsyncPeriod, pCfg->level, pCfg->fsyncPeriod); ", new walLevel:%d fsync:%d walRetentionPeriod:%d walRetentionSize:%" PRId64,
pWal->cfg.vgId, pWal->cfg.level, pWal->cfg.fsyncPeriod, pWal->cfg.retentionPeriod, pWal->cfg.retentionSize,
pCfg->level, pCfg->fsyncPeriod, pCfg->retentionPeriod, pCfg->retentionSize);
pWal->cfg.level = pCfg->level; pWal->cfg.level = pCfg->level;
pWal->cfg.fsyncPeriod = pCfg->fsyncPeriod; pWal->cfg.fsyncPeriod = pCfg->fsyncPeriod;
pWal->cfg.retentionPeriod = pCfg->retentionPeriod;
pWal->cfg.retentionSize = pCfg->retentionSize;
pWal->fsyncSeq = pCfg->fsyncPeriod / 1000; pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1; if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
......
...@@ -327,7 +327,7 @@ int32_t walEndSnapshot(SWal *pWal) { ...@@ -327,7 +327,7 @@ int32_t walEndSnapshot(SWal *pWal) {
wDebug("vgId:%d, wal check remove file %" PRId64 "(file size %" PRId64 " close ts %" PRId64 wDebug("vgId:%d, wal check remove file %" PRId64 "(file size %" PRId64 " close ts %" PRId64
"), new tot size %" PRId64, "), new tot size %" PRId64,
pWal->cfg.vgId, iter->firstVer, iter->fileSize, iter->closeTs, newTotSize); pWal->cfg.vgId, iter->firstVer, iter->fileSize, iter->closeTs, newTotSize);
if (((pWal->cfg.retentionSize == 0) || (pWal->cfg.retentionSize != -1 && newTotSize > pWal->cfg.retentionSize)) || if ((pWal->cfg.retentionSize != -1 && pWal->cfg.retentionSize != 0 && newTotSize > pWal->cfg.retentionSize) ||
((pWal->cfg.retentionPeriod == 0) || (pWal->cfg.retentionPeriod != -1 && iter->closeTs != -1 && ((pWal->cfg.retentionPeriod == 0) || (pWal->cfg.retentionPeriod != -1 && iter->closeTs != -1 &&
iter->closeTs + pWal->cfg.retentionPeriod < ts))) { iter->closeTs + pWal->cfg.retentionPeriod < ts))) {
// delete according to file size or close time // delete according to file size or close time
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册