提交 f79c72d5 编写于 作者: C Cary Xu

truncate

上级 d25a2997
......@@ -836,9 +836,9 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v
*(int8_t *)pWrite = pVgroup->compact;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int8_t *)pWrite = pVgroup->truncate;
cols++;
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// *(int8_t *)pWrite = pVgroup->truncate;
// cols++;
mnodeDecVgroupRef(pVgroup);
numOfRows++;
......
......@@ -17,14 +17,13 @@
extern int32_t tsTsdbMetaCompactRatio;
#define TSDB_MAX_SUBBLOCKS 8
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
if (key < 0) {
return (int)((key + 1) / tsTickPerDay[precision] / days - 1);
} else {
return (int)((key / tsTickPerDay[precision] / days));
}
}
typedef struct {
SRtn rtn; // retention snapshot
SFSIter fsIter; // tsdb file iterator
......
......@@ -49,7 +49,7 @@ static void tsdbEndTruncate(STsdbRepo *pRepo, int eno);
static int tsdbTruncateMeta(STsdbRepo *pRepo);
static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param);
static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet);
static bool tsdbShouldTruncate(STruncateH *pTruncateH);
static bool tsdbShouldTruncate(STsdbRepo *pRepo, STruncateH *pTruncateH);
static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo);
static void tsdbDestroyTruncateH(STruncateH *pTruncateH);
static int tsdbInitCompTbArray(STruncateH *pTruncateH);
......@@ -133,12 +133,14 @@ static int tsdbAsyncTruncate(STsdbRepo *pRepo, void *param) {
// avoid repeated input of commands by end users in a short period of time
if (pRepo->truncateState != TSDB_NO_TRUNCATE) {
tsdbInfo("vgId:%d retry later as tsdb in truncating state", REPO_ID(pRepo));
return 0;
return -1;
}
pRepo->truncateState = TSDB_WAITING_TRUNCATE;
// flush the mem data to disk synchronously(have impact on the compression rate)
tsdbSyncCommit(pRepo);
if (tsdbSyncCommit(pRepo) < 0) {
return -1;
}
// truncate
tsem_wait(&(pRepo->readyToCommit));
......@@ -175,6 +177,7 @@ static int tsdbTruncateMeta(STsdbRepo *pRepo) {
}
static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param) {
STsdbCfg * pCfg = REPO_CFG(pRepo);
STruncateH compactH;
SDFileSet *pSet = NULL;
STruncateTblMsg *pMsg = (STruncateTblMsg *)param;
......@@ -186,6 +189,9 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param) {
return -1;
}
int sFid = TSDB_KEY_FID(pMsg->span[0].skey, pCfg->daysPerFile, pCfg->precision);
int eFid = TSDB_KEY_FID(pMsg->span[0].ekey, pCfg->daysPerFile, pCfg->precision);
while ((pSet = tsdbFSIterNext(&(compactH.fsIter)))) {
// remove expired files
if (pSet->fid < compactH.rtn.minFid) {
......@@ -194,6 +200,11 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param) {
continue;
}
if ((pSet->fid != sFid) && (pSet->fid != eFid)) {
continue;
}
#if 0
if (TSDB_FSET_LEVEL(pSet) == TFS_MAX_LEVEL) {
tsdbDebug("vgId:%d FSET %d on level %d, should not truncate", REPO_ID(pRepo), pSet->fid, TFS_MAX_LEVEL);
......@@ -225,7 +236,7 @@ static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet) {
return -1;
}
if (!tsdbShouldTruncate(pTruncateH)) {
if (!tsdbShouldTruncate(pRepo, pTruncateH)) {
tsdbDebug("vgId:%d no need to compact FSET %d", REPO_ID(pRepo), pSet->fid);
if (tsdbApplyRtnOnFSet(TSDB_TRUNCATE_REPO(pTruncateH), pSet, &(pTruncateH->rtn)) < 0) {
tsdbTruncateFSetEnd(pTruncateH);
......@@ -234,12 +245,14 @@ static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet) {
} else {
// Create new fset as compacted fset
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(pTruncateH->rtn)), &(did.level), &(did.id));
#if 1 // how to make the decision?
if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbTruncateFSetEnd(pTruncateH);
return -1;
}
#endif
tsdbInitDFileSet(TSDB_TRUNCATE_WSET(pTruncateH), did, REPO_ID(pRepo), TSDB_FSET_FID(pSet),
FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_LATEST_FSET_VER);
......@@ -265,61 +278,27 @@ static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet) {
return 0;
}
static bool tsdbShouldTruncate(STruncateH *pTruncateH) {
if (tsdbForceTruncateFile) {
return true;
}
STsdbRepo * pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SReadH * pReadh = &(pTruncateH->readh);
STableTruncateH *pTh;
SBlock * pBlock;
int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
SDFile * pDataF = TSDB_READ_DATA_FILE(pReadh);
SDFile * pLastF = TSDB_READ_LAST_FILE(pReadh);
int tblocks = 0; // total blocks
int nSubBlocks = 0; // # of blocks with sub-blocks
int nSmallBlocks = 0; // # of blocks with rows < defaultRows
int64_t tsize = 0;
for (size_t i = 0; i < taosArrayGetSize(pTruncateH->tbArray); i++) {
pTh = (STableTruncateH *)taosArrayGet(pTruncateH->tbArray, i);
if (pTh->pTable == NULL || pTh->pBlkIdx == NULL) continue;
for (size_t bidx = 0; bidx < pTh->pBlkIdx->numOfBlocks; bidx++) {
tblocks++;
pBlock = pTh->pInfo->blocks + bidx;
if (pBlock->numOfRows < defaultRows) {
nSmallBlocks++;
}
static bool tsdbShouldTruncate(STsdbRepo *pRepo, STruncateH *pTruncateH) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
TSKEY minKey, midKey, maxKey, now;
if (pBlock->numOfSubBlocks > 1) {
nSubBlocks++;
for (int k = 0; k < pBlock->numOfSubBlocks; k++) {
SBlock *iBlock = ((SBlock *)POINTER_SHIFT(pTh->pInfo, pBlock->offset)) + k;
tsize = tsize + iBlock->len;
}
} else if (pBlock->numOfSubBlocks == 1) {
tsize += pBlock->len;
} else {
ASSERT(0);
}
}
}
now = taosGetTimestamp(pCfg->precision);
minKey = now - pCfg->keep * tsTickPerDay[pCfg->precision];
midKey = now - pCfg->keep2 * tsTickPerDay[pCfg->precision];
maxKey = now - pCfg->keep1 * tsTickPerDay[pCfg->precision];
return (((nSubBlocks * 1.0 / tblocks) > 0.33) || ((nSmallBlocks * 1.0 / tblocks) > 0.33) ||
(tsize * 1.0 / (pDataF->info.size + pLastF->info.size - 2 * TSDB_FILE_HEAD_SIZE) < 0.85));
pRtn->minKey = minKey;
pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision));
}
static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo) {
static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo, ) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
memset(pTruncateH, 0, sizeof(*pTruncateH));
TSDB_FSET_SET_CLOSED(TSDB_TRUNCATE_WSET(pTruncateH));
pTruncateH->
TSDB_FSET_SET_CLOSED(TSDB_TRUNCATE_WSET(pTruncateH));
tsdbGetRtnSnap(pRepo, &(pTruncateH->rtn));
tsdbFSIterInit(&(pTruncateH->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD);
......
......@@ -117,7 +117,7 @@ int32_t vnodeDrop(int32_t vgId) {
int32_t vnodeCompact(int32_t vgId) {
void *pVnode = vnodeAcquire(vgId);
if (pVnode != NULL) {
vDebug("vgId:%d, compact vnode msg is received", vgId);
printf("vgId:%d, compact vnode msg is received\n", vgId);
//not care success or not
tsdbCompact(((SVnodeObj*)pVnode)->tsdb);
vnodeRelease(pVnode);
......@@ -125,17 +125,31 @@ int32_t vnodeCompact(int32_t vgId) {
vInfo("vgId:%d, vnode not exist, can't compact it", vgId);
return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
printf("vgId:%d, compact vnode msg is finished\n", vgId);
return TSDB_CODE_SUCCESS;
}
int32_t vnodeTruncate(STruncateTblMsg *pMsg) {
int32_t vgId = pMsg->vgId;
// build test data
// pMsg->vgId = 2;
// pMsg->uid = 562949986978701;
// pMsg->nSpan = 1;
// pMsg->span = malloc(pMsg->nSpan * sizeof(STimeWindow));
int32_t vgId = 2;
void * pVnode = vnodeAcquire(vgId);
if (pVnode != NULL) {
vDebug("vgId:%d, truncate table %s msg is received", vgId, pMsg->tableFname);
// not care success or not
void *param = NULL;
tsdbTruncate(((SVnodeObj *)pVnode)->tsdb, param);
STruncateTblMsg *param = (STruncateTblMsg *)calloc(1, sizeof(STruncateTblMsg) + pMsg->nSpan * sizeof(STimeWindow));
param->vgId = 2;
param->uid = 562949986978794;
param->nSpan = 1;
param->span[0].skey = 0;
param->span[0].ekey = 1;
if (tsdbTruncate(((SVnodeObj *)pVnode)->tsdb, param) < 0) {
tfree(param);
}
vnodeRelease(pVnode);
} else {
vInfo("vgId:%d, vnode not exist, can't truncate table %s in it", vgId, pMsg->tableFname);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册