未验证 提交 0e6481e8 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #20190 from taosdata/feat/auto_retention

feat: auto retentiion
......@@ -15,6 +15,7 @@ target_sources(
"src/vnd/vnodeSync.c"
"src/vnd/vnodeSnapshot.c"
"src/vnd/vnodeCompact.c"
"src/vnd/vnodeRetention.c"
# meta
"src/meta/metaOpen.c"
......
......@@ -180,7 +180,6 @@ int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
int32_t tsdbFinishCommit(STsdb* pTsdb);
int32_t tsdbRollbackCommit(STsdb* pTsdb);
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp);
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows);
......
......@@ -595,6 +595,7 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq2 *pMsg, STbUidStore *pStore) {
return 0;
}
#if 0
/**
* @brief retention of rsma1/rsma2
*
......@@ -618,6 +619,7 @@ int32_t smaDoRetention(SSma *pSma, int64_t now) {
_end:
return code;
}
#endif
static void tdBlockDataDestroy(SArray *pBlockArr) {
for (int32_t i = 0; i < taosArrayGetSize(pBlockArr); ++i) {
......
......@@ -57,32 +57,6 @@ typedef struct {
SBlockData sData;
} STsdbCompactor;
static int32_t tsdbCommitCompact(STsdbCompactor *pCompactor) {
int32_t code = 0;
int32_t lino = 0;
STsdb *pTsdb = pCompactor->pTsdb;
code = tsdbFSPrepareCommit(pTsdb, &pCompactor->fs);
TSDB_CHECK_CODE(code, lino, _exit);
taosThreadRwlockWrlock(&pTsdb->rwLock);
code = tsdbFSCommit(pTsdb);
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosThreadRwlockUnlock(&pTsdb->rwLock);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tsdbAbortCompact(STsdbCompactor *pCompactor) {
int32_t code = 0;
int32_t lino = 0;
......@@ -660,8 +634,31 @@ _exit:
if (code) {
tsdbAbortCompact(pCompactor);
} else {
tsdbCommitCompact(pCompactor);
tsdbFSPrepareCommit(pTsdb, &pCompactor->fs);
}
tsdbEndCompact(pCompactor);
return code;
}
int32_t tsdbCommitCompact(STsdb *pTsdb) {
int32_t code = 0;
int32_t lino = 0;
taosThreadRwlockWrlock(&pTsdb->rwLock);
code = tsdbFSCommit(pTsdb);
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosThreadRwlockUnlock(&pTsdb->rwLock);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
}
return code;
}
......@@ -15,7 +15,7 @@
#include "tsdb.h"
static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
static bool tsdbShouldDoRetentionImpl(STsdb *pTsdb, int64_t now) {
for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
......@@ -38,19 +38,21 @@ static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
return false;
}
bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
bool should;
taosThreadRwlockRdlock(&pTsdb->rwLock);
should = tsdbShouldDoRetentionImpl(pTsdb, now);
taosThreadRwlockUnlock(&pTsdb->rwLock);
return should;
}
int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
int32_t code = 0;
if (!tsdbShouldDoRetention(pTsdb, now)) {
return code;
}
// do retention
int32_t lino = 0;
STsdbFS fs = {0};
code = tsdbFSCopy(pTsdb, &fs);
if (code) goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); iSet++) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet);
......@@ -60,8 +62,10 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
if (expLevel < 0) {
taosMemoryFree(pSet->pHeadF);
taosMemoryFree(pSet->pDataF);
taosMemoryFree(pSet->aSttF[0]);
taosMemoryFree(pSet->pSmaF);
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
taosMemoryFree(pSet->aSttF[iStt]);
}
taosArrayRemove(fs.aDFileSet, iSet);
iSet--;
} else {
......@@ -78,35 +82,33 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
fSet.diskId = did;
code = tsdbDFileSetCopy(pTsdb, pSet, &fSet);
if (code) goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbFSUpsertFSet(&fs, &fSet);
if (code) goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
// do change fs
code = tsdbFSPrepareCommit(pTsdb, &fs);
if (code) goto _err;
taosThreadRwlockWrlock(&pTsdb->rwLock);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbFSCommit(pTsdb);
_exit:
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
goto _err;
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
}
taosThreadRwlockUnlock(&pTsdb->rwLock);
tsdbFSDestroy(&fs);
_exit:
return code;
}
_err:
tsdbError("vgId:%d, tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
ASSERT(0);
// tsdbFSRollback(pTsdb->pFS);
return code;
static int32_t tsdbCommitRetentionImpl(STsdb *pTsdb) { return tsdbFSCommit(pTsdb); }
int32_t tsdbCommitRetention(STsdb *pTsdb) {
taosThreadRwlockWrlock(&pTsdb->rwLock);
tsdbCommitRetentionImpl(pTsdb);
taosThreadRwlockUnlock(&pTsdb->rwLock);
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
return 0;
}
\ No newline at end of file
......@@ -35,9 +35,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2
return -1;
}
if (pMsg) {
arrSize = taosArrayGetSize(pMsg->aSubmitTbData);
}
arrSize = taosArrayGetSize(pMsg->aSubmitTbData);
// scan and convert
if (tsdbScanAndConvertSubmitMsg(pTsdb, pMsg) < 0) {
......
......@@ -15,6 +15,8 @@
#include "vnd.h"
extern int32_t tsdbCommitCompact(STsdb *pTsdb);
static int32_t vnodeCompactTask(void *param) {
int32_t code = 0;
int32_t lino = 0;
......@@ -33,8 +35,11 @@ static int32_t vnodeCompactTask(void *param) {
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
}
vnodeCommitInfo(dir);
tsdbCommitCompact(pVnode->pTsdb);
_exit:
tsem_post(&pInfo->pVnode->canCommit);
taosMemoryFree(pInfo);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnd.h"
typedef struct {
SVnode *pVnode;
int64_t now;
int64_t commitID;
SVnodeInfo info;
} SRetentionInfo;
extern bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now);
extern int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now);
extern int32_t tsdbCommitRetention(STsdb *pTsdb);
static int32_t vnodePrepareRentention(SVnode *pVnode, SRetentionInfo *pInfo) {
int32_t code = 0;
int32_t lino = 0;
tsem_wait(&pVnode->canCommit);
pInfo->commitID = ++pVnode->state.commitID;
char dir[TSDB_FILENAME_LEN] = {0};
if (pVnode->pTfs) {
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
}
if (vnodeLoadInfo(dir, &pInfo->info) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
tsem_post(&pVnode->canCommit);
} else {
vInfo("vgId:%d %s done", TD_VID(pVnode), __func__);
}
return code;
}
static int32_t vnodeRetentionTask(void *param) {
int32_t code = 0;
int32_t lino = 0;
SRetentionInfo *pInfo = (SRetentionInfo *)param;
SVnode *pVnode = pInfo->pVnode;
char dir[TSDB_FILENAME_LEN] = {0};
if (pVnode->pTfs) {
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
}
// save info
pInfo->info.state.commitID = pInfo->commitID;
if (vnodeSaveInfo(dir, &pInfo->info) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
// do job
code = tsdbDoRetention(pInfo->pVnode->pTsdb, pInfo->now);
TSDB_CHECK_CODE(code, lino, _exit);
// commit info
vnodeCommitInfo(dir);
// commit sub-job
tsdbCommitRetention(pVnode->pTsdb);
_exit:
if (code) {
vError("vgId:%d %s failed at line %d since %s", TD_VID(pInfo->pVnode), __func__, lino, tstrerror(code));
} else {
vInfo("vgId:%d %s done", TD_VID(pInfo->pVnode), __func__);
}
tsem_post(&pInfo->pVnode->canCommit);
taosMemoryFree(pInfo);
return code;
}
int32_t vnodeAsyncRentention(SVnode *pVnode, int64_t now) {
int32_t code = 0;
int32_t lino = 0;
if (!tsdbShouldDoRetention(pVnode->pTsdb, now)) return code;
SRetentionInfo *pInfo = (SRetentionInfo *)taosMemoryCalloc(1, sizeof(*pInfo));
if (pInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pInfo->pVnode = pVnode;
pInfo->now = now;
code = vnodePrepareRentention(pVnode, pInfo);
TSDB_CHECK_CODE(code, lino, _exit);
vnodeScheduleTask(vnodeRetentionTask, pInfo);
_exit:
if (code) {
vError("vgId:%d %s failed at line %d since %s", TD_VID(pInfo->pVnode), __func__, lino, tstrerror(code));
if (pInfo) taosMemoryFree(pInfo);
} else {
vInfo("vgId:%d %s done", TD_VID(pInfo->pVnode), __func__);
}
return 0;
}
\ No newline at end of file
......@@ -586,6 +586,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
}
extern int32_t vnodeAsyncRentention(SVnode *pVnode, int64_t now);
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
int32_t code = 0;
SVTrimDbReq trimReq = {0};
......@@ -598,12 +599,16 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq,
vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);
// process
// process
#if 0
code = tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp);
if (code) goto _exit;
code = smaDoRetention(pVnode->pSma, trimReq.timestamp);
if (code) goto _exit;
#else
vnodeAsyncRentention(pVnode, trimReq.timestamp);
#endif
_exit:
return code;
......@@ -635,6 +640,10 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p
ret = smaDoRetention(pVnode->pSma, ttlReq.timestamp);
if (ret) goto end;
#else
vnodeAsyncRentention(pVnode, ttlReq.timestamp);
tsem_wait(&pVnode->canCommit);
tsem_post(&pVnode->canCommit);
#endif
end:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册