提交 8e3084ec 编写于 作者: H Hongze Cheng

feat: auto retention

上级 77295df8
...@@ -15,6 +15,7 @@ target_sources( ...@@ -15,6 +15,7 @@ target_sources(
"src/vnd/vnodeSync.c" "src/vnd/vnodeSync.c"
"src/vnd/vnodeSnapshot.c" "src/vnd/vnodeSnapshot.c"
"src/vnd/vnodeCompact.c" "src/vnd/vnodeCompact.c"
"src/vnd/vnodeRetention.c"
# meta # meta
"src/meta/metaOpen.c" "src/meta/metaOpen.c"
......
...@@ -180,7 +180,6 @@ int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo); ...@@ -180,7 +180,6 @@ int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo); int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
int32_t tsdbFinishCommit(STsdb* pTsdb); int32_t tsdbFinishCommit(STsdb* pTsdb);
int32_t tsdbRollbackCommit(STsdb* pTsdb); int32_t tsdbRollbackCommit(STsdb* pTsdb);
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp);
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows); int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows);
......
...@@ -595,6 +595,7 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq2 *pMsg, STbUidStore *pStore) { ...@@ -595,6 +595,7 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq2 *pMsg, STbUidStore *pStore) {
return 0; return 0;
} }
#if 0
/** /**
* @brief retention of rsma1/rsma2 * @brief retention of rsma1/rsma2
* *
...@@ -618,6 +619,7 @@ int32_t smaDoRetention(SSma *pSma, int64_t now) { ...@@ -618,6 +619,7 @@ int32_t smaDoRetention(SSma *pSma, int64_t now) {
_end: _end:
return code; return code;
} }
#endif
static void tdBlockDataDestroy(SArray *pBlockArr) { static void tdBlockDataDestroy(SArray *pBlockArr) {
for (int32_t i = 0; i < taosArrayGetSize(pBlockArr); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pBlockArr); ++i) {
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "tsdb.h" #include "tsdb.h"
static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) { static bool tsdbShouldDoRetentionImpl(STsdb *pTsdb, int64_t now) {
for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) { for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet); SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
...@@ -38,19 +38,21 @@ static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) { ...@@ -38,19 +38,21 @@ static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
return false; return false;
} }
bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
bool should;
taosThreadRwlockRdlock(&pTsdb->rwLock);
should = tsdbShouldDoRetentionImpl(pTsdb, now);
taosThreadRwlockUnlock(&pTsdb->rwLock);
return should;
}
int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
if (!tsdbShouldDoRetention(pTsdb, now)) {
return code;
}
// do retention
STsdbFS fs = {0}; STsdbFS fs = {0};
code = tsdbFSCopy(pTsdb, &fs); code = tsdbFSCopy(pTsdb, &fs);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); iSet++) { for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); iSet++) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet);
...@@ -60,8 +62,10 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { ...@@ -60,8 +62,10 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
if (expLevel < 0) { if (expLevel < 0) {
taosMemoryFree(pSet->pHeadF); taosMemoryFree(pSet->pHeadF);
taosMemoryFree(pSet->pDataF); taosMemoryFree(pSet->pDataF);
taosMemoryFree(pSet->aSttF[0]);
taosMemoryFree(pSet->pSmaF); taosMemoryFree(pSet->pSmaF);
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
taosMemoryFree(pSet->aSttF[iStt]);
}
taosArrayRemove(fs.aDFileSet, iSet); taosArrayRemove(fs.aDFileSet, iSet);
iSet--; iSet--;
} else { } else {
...@@ -78,35 +82,33 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { ...@@ -78,35 +82,33 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
fSet.diskId = did; fSet.diskId = did;
code = tsdbDFileSetCopy(pTsdb, pSet, &fSet); code = tsdbDFileSetCopy(pTsdb, pSet, &fSet);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbFSUpsertFSet(&fs, &fSet); code = tsdbFSUpsertFSet(&fs, &fSet);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
// do change fs // do change fs
code = tsdbFSPrepareCommit(pTsdb, &fs); code = tsdbFSPrepareCommit(pTsdb, &fs);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
taosThreadRwlockWrlock(&pTsdb->rwLock);
code = tsdbFSCommit(pTsdb); _exit:
if (code) { if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
goto _err; } else {
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
} }
taosThreadRwlockUnlock(&pTsdb->rwLock);
tsdbFSDestroy(&fs); tsdbFSDestroy(&fs);
_exit:
return code; return code;
}
_err: static int32_t tsdbCommitRetentionImpl(STsdb *pTsdb) { return tsdbFSCommit(pTsdb); }
tsdbError("vgId:%d, tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
ASSERT(0); int32_t tsdbCommitRetention(STsdb *pTsdb) {
// tsdbFSRollback(pTsdb->pFS); taosThreadRwlockWrlock(&pTsdb->rwLock);
return code; tsdbCommitRetentionImpl(pTsdb);
taosThreadRwlockUnlock(&pTsdb->rwLock);
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
return 0;
} }
\ No newline at end of file
...@@ -35,9 +35,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2 ...@@ -35,9 +35,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2
return -1; return -1;
} }
if (pMsg) {
arrSize = taosArrayGetSize(pMsg->aSubmitTbData); arrSize = taosArrayGetSize(pMsg->aSubmitTbData);
}
// scan and convert // scan and convert
if (tsdbScanAndConvertSubmitMsg(pTsdb, pMsg) < 0) { if (tsdbScanAndConvertSubmitMsg(pTsdb, pMsg) < 0) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnd.h"
typedef struct {
SVnode *pVnode;
int64_t now;
int64_t commitID;
SVnodeInfo info;
} SRetentionInfo;
extern bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now);
extern int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now);
extern int32_t tsdbCommitRetention(STsdb *pTsdb);
static int32_t vnodePrepareRentention(SVnode *pVnode, SRetentionInfo *pInfo) {
int32_t code = 0;
int32_t lino = 0;
tsem_wait(&pVnode->canCommit);
pInfo->commitID = ++pVnode->state.commitID;
char dir[TSDB_FILENAME_LEN] = {0};
if (pVnode->pTfs) {
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
}
if (vnodeLoadInfo(dir, &pInfo->info) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
tsem_post(&pVnode->canCommit);
} else {
vInfo("vgId:%d %s done", TD_VID(pVnode), __func__);
}
return code;
}
static int32_t vnodeRetentionTask(void *param) {
int32_t code = 0;
int32_t lino = 0;
SRetentionInfo *pInfo = (SRetentionInfo *)param;
SVnode *pVnode = pInfo->pVnode;
char dir[TSDB_FILENAME_LEN] = {0};
if (pVnode->pTfs) {
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
}
// save info
pInfo->info.state.commitID = pInfo->commitID;
if (vnodeSaveInfo(dir, &pInfo->info) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
// do job
code = tsdbDoRetention(pInfo->pVnode->pTsdb, pInfo->now);
TSDB_CHECK_CODE(code, lino, _exit);
// commit info
vnodeCommitInfo(dir);
// commit sub-job
tsdbCommitRetention(pVnode->pTsdb);
_exit:
if (code) {
vError("vgId:%d %s failed at line %d since %s", TD_VID(pInfo->pVnode), __func__, lino, tstrerror(code));
} else {
vInfo("vgId:%d %s done", TD_VID(pInfo->pVnode), __func__);
}
tsem_post(&pInfo->pVnode->canCommit);
taosMemoryFree(pInfo);
return code;
}
int32_t vnodeAsyncRentention(SVnode *pVnode, int64_t now) {
int32_t code = 0;
int32_t lino = 0;
if (!tsdbShouldDoRetention(pVnode->pTsdb, now)) return code;
SRetentionInfo *pInfo = (SRetentionInfo *)taosMemoryCalloc(1, sizeof(*pInfo));
if (pInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pInfo->pVnode = pVnode;
pInfo->now = now;
code = vnodePrepareRentention(pVnode, pInfo);
TSDB_CHECK_CODE(code, lino, _exit);
vnodeScheduleTask(vnodeRetentionTask, pInfo);
_exit:
if (code) {
vError("vgId:%d %s failed at line %d since %s", TD_VID(pInfo->pVnode), __func__, lino, tstrerror(code));
if (pInfo) taosMemoryFree(pInfo);
} else {
vInfo("vgId:%d %s done", TD_VID(pInfo->pVnode), __func__);
}
return 0;
}
\ No newline at end of file
...@@ -586,6 +586,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) { ...@@ -586,6 +586,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
pMetaRsp->precision = pVnode->config.tsdbCfg.precision; pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
} }
extern int32_t vnodeAsyncRentention(SVnode *pVnode, int64_t now);
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
int32_t code = 0; int32_t code = 0;
SVTrimDbReq trimReq = {0}; SVTrimDbReq trimReq = {0};
...@@ -598,12 +599,16 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -598,12 +599,16 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq,
vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp); vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);
// process // process
#if 0
code = tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp); code = tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp);
if (code) goto _exit; if (code) goto _exit;
code = smaDoRetention(pVnode->pSma, trimReq.timestamp); code = smaDoRetention(pVnode->pSma, trimReq.timestamp);
if (code) goto _exit; if (code) goto _exit;
#else
vnodeAsyncRentention(pVnode, trimReq.timestamp);
#endif
_exit: _exit:
return code; return code;
...@@ -635,6 +640,8 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p ...@@ -635,6 +640,8 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p
ret = smaDoRetention(pVnode->pSma, ttlReq.timestamp); ret = smaDoRetention(pVnode->pSma, ttlReq.timestamp);
if (ret) goto end; if (ret) goto end;
#else
vnodeAsyncRentention(pVnode, ttlReq.timestamp);
#endif #endif
end: end:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册