提交 b1e42fd5 编写于 作者: C Cary Xu

update

上级 f79c72d5
......@@ -54,4 +54,12 @@ static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
}
}
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
if (key < 0) {
return (int)((key + 1) / tsTickPerDay[precision] / days - 1);
} else {
return (int)((key / tsTickPerDay[precision] / days));
}
}
#endif /* _TD_TSDB_COMMIT_H_ */
\ No newline at end of file
......@@ -17,13 +17,7 @@
extern int32_t tsTsdbMetaCompactRatio;
#define TSDB_MAX_SUBBLOCKS 8
FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
if (key < 0) {
return (int)((key + 1) / tsTickPerDay[precision] / days - 1);
} else {
return (int)((key / tsTickPerDay[precision] / days));
}
}
typedef struct {
SRtn rtn; // retention snapshot
SFSIter fsIter; // tsdb file iterator
......
......@@ -49,7 +49,6 @@ static void tsdbEndTruncate(STsdbRepo *pRepo, int eno);
static int tsdbTruncateMeta(STsdbRepo *pRepo);
static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param);
static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet);
static bool tsdbShouldTruncate(STsdbRepo *pRepo, STruncateH *pTruncateH);
static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo);
static void tsdbDestroyTruncateH(STruncateH *pTruncateH);
static int tsdbInitCompTbArray(STruncateH *pTruncateH);
......@@ -178,34 +177,38 @@ static int tsdbTruncateMeta(STsdbRepo *pRepo) {
static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param) {
STsdbCfg * pCfg = REPO_CFG(pRepo);
STruncateH compactH;
SDFileSet *pSet = NULL;
STruncateH truncateH;
SDFileSet * pSet = NULL;
STruncateTblMsg *pMsg = (STruncateTblMsg *)param;
ASSERT(pMsg != NULL);
tsdbDebug("vgId:%d start to truncate TS data for %" PRIu64, REPO_ID(pRepo), pMsg->uid);
if (tsdbInitTruncateH(&compactH, pRepo) < 0) {
if (tsdbInitTruncateH(&truncateH, pRepo) < 0) {
return -1;
}
int sFid = TSDB_KEY_FID(pMsg->span[0].skey, pCfg->daysPerFile, pCfg->precision);
int eFid = TSDB_KEY_FID(pMsg->span[0].ekey, pCfg->daysPerFile, pCfg->precision);
ASSERT(sFid <= eFid);
while ((pSet = tsdbFSIterNext(&(compactH.fsIter)))) {
while ((pSet = tsdbFSIterNext(&(truncateH.fsIter)))) {
// remove expired files
if (pSet->fid < compactH.rtn.minFid) {
if (pSet->fid < truncateH.rtn.minFid) {
tsdbInfo("vgId:%d FSET %d on level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
continue;
}
if ((pSet->fid != sFid) && (pSet->fid != eFid)) {
if ((pSet->fid < sFid) || (pSet->fid > eFid)) {
tsdbDebug("vgId:%d no need to truncate FSET %d, sFid %d, eFid %d", REPO_ID(pRepo), pSet->fid, sFid, eFid);
if (tsdbApplyRtnOnFSet(pRepo, pSet, &(truncateH.rtn)) < 0) {
return -1;
}
continue;
}
#if 0
#if 0 // TODO: How to make the decision? The test case should cover this scenario.
if (TSDB_FSET_LEVEL(pSet) == TFS_MAX_LEVEL) {
tsdbDebug("vgId:%d FSET %d on level %d, should not truncate", REPO_ID(pRepo), pSet->fid, TFS_MAX_LEVEL);
tsdbUpdateDFileSet(REPO_FS(pRepo), pSet);
......@@ -213,14 +216,14 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param) {
}
#endif
if (tsdbTruncateFSet(&compactH, pSet) < 0) {
tsdbDestroyTruncateH(&compactH);
if (tsdbTruncateFSet(&truncateH, pSet) < 0) {
tsdbDestroyTruncateH(&truncateH);
tsdbError("vgId:%d failed to truncate FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
return -1;
}
}
tsdbDestroyTruncateH(&compactH);
tsdbDestroyTruncateH(&truncateH);
tsdbDebug("vgId:%d truncate TS data over", REPO_ID(pRepo));
return 0;
}
......@@ -229,35 +232,26 @@ static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet) {
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
SDiskID did;
tsdbDebug("vgId:%d start to compact FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid, TSDB_FSET_LEVEL(pSet),
tsdbDebug("vgId:%d start to truncate FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid, TSDB_FSET_LEVEL(pSet),
TSDB_FSET_ID(pSet));
if (tsdbTruncateFSetInit(pTruncateH, pSet) < 0) {
return -1;
}
if (!tsdbShouldTruncate(pRepo, pTruncateH)) {
tsdbDebug("vgId:%d no need to compact FSET %d", REPO_ID(pRepo), pSet->fid);
if (tsdbApplyRtnOnFSet(TSDB_TRUNCATE_REPO(pTruncateH), pSet, &(pTruncateH->rtn)) < 0) {
tsdbTruncateFSetEnd(pTruncateH);
return -1;
}
} else {
// Create new fset as compacted fset
// Create new fset as truncated fset
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(pTruncateH->rtn)), &(did.level), &(did.id));
#if 1 // how to make the decision?
if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbError("vgId:%d failed to truncate FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbTruncateFSetEnd(pTruncateH);
return -1;
}
#endif
tsdbInitDFileSet(TSDB_TRUNCATE_WSET(pTruncateH), did, REPO_ID(pRepo), TSDB_FSET_FID(pSet),
FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_LATEST_FSET_VER);
if (tsdbCreateDFileSet(TSDB_TRUNCATE_WSET(pTruncateH), true) < 0) {
tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbError("vgId:%d failed to truncate FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbTruncateFSetEnd(pTruncateH);
return -1;
}
......@@ -271,33 +265,17 @@ static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet) {
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(pTruncateH));
tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_TRUNCATE_WSET(pTruncateH));
tsdbDebug("vgId:%d FSET %d compact over", REPO_ID(pRepo), pSet->fid);
}
tsdbDebug("vgId:%d FSET %d truncate over", REPO_ID(pRepo), pSet->fid);
tsdbTruncateFSetEnd(pTruncateH);
return 0;
}
static bool tsdbShouldTruncate(STsdbRepo *pRepo, STruncateH *pTruncateH) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
TSKEY minKey, midKey, maxKey, now;
now = taosGetTimestamp(pCfg->precision);
minKey = now - pCfg->keep * tsTickPerDay[pCfg->precision];
midKey = now - pCfg->keep2 * tsTickPerDay[pCfg->precision];
maxKey = now - pCfg->keep1 * tsTickPerDay[pCfg->precision];
pRtn->minKey = minKey;
pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision));
}
static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo, ) {
static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
memset(pTruncateH, 0, sizeof(*pTruncateH));
pTruncateH->
TSDB_FSET_SET_CLOSED(TSDB_TRUNCATE_WSET(pTruncateH));
tsdbGetRtnSnap(pRepo, &(pTruncateH->rtn));
......@@ -470,7 +448,7 @@ static int tsdbTruncateFSetImpl(STruncateH *pTruncateH) {
}
tdFreeSchema(pSchema);
// Loop to compact each block data
// Loop to truncate each block data
for (int i = 0; i < pTh->pBlkIdx->numOfBlocks; i++) {
SBlock *pBlock = pTh->pInfo->blocks + i;
......
......@@ -145,8 +145,8 @@ int32_t vnodeTruncate(STruncateTblMsg *pMsg) {
param->vgId = 2;
param->uid = 562949986978794;
param->nSpan = 1;
param->span[0].skey = 0;
param->span[0].ekey = 1;
param->span[0].skey = 1637417678000;
param->span[0].ekey = 1637417679000;
if (tsdbTruncate(((SVnodeObj *)pVnode)->tsdb, param) < 0) {
tfree(param);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册