提交 b44eb59f 编写于 作者: H Hongze Cheng

tsdb retention fix done

上级 3652bf66
......@@ -89,7 +89,18 @@ typedef struct SLDataIter SLDataIter;
#define FILE_TO_LOGIC_OFFSET(OFFSET, PAGE) ((OFFSET) / (PAGE)*PAGE_CONTENT_SIZE(PAGE) + (OFFSET) % (PAGE))
#define PAGE_OFFSET(PGNO, PAGE) (((PGNO)-1) * (PAGE))
#define OFFSET_PGNO(OFFSET, PAGE) ((OFFSET) / (PAGE) + 1)
#define LOGIC_TO_FILE_SIZE(LSIZE, PAGE) OFFSET_PGNO(LOGIC_TO_FILE_OFFSET(LSIZE, PAGE), PAGE) * (PAGE)
static FORCE_INLINE int64_t tsdbLogicToFileSize(int64_t lSize, int32_t szPage) {
int64_t fOffSet = LOGIC_TO_FILE_OFFSET(lSize, szPage);
int64_t pgno = OFFSET_PGNO(fOffSet, szPage);
int32_t szPageCont = PAGE_CONTENT_SIZE(szPage);
if (fOffSet % szPageCont == 0) {
pgno--;
}
return pgno * szPage;
}
// tsdbUtil.c ==============================================================================================
// TSDBROW
......
......@@ -295,7 +295,7 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (size != LOGIC_TO_FILE_SIZE(pSet->pHeadF->size, TSDB_DEFAULT_PAGE_SIZE)) {
if (size != tsdbLogicToFileSize(pSet->pHeadF->size, pTsdb->pVnode->config.tsdbPageSize)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
......@@ -306,10 +306,10 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (size < LOGIC_TO_FILE_SIZE(pSet->pDataF->size, TSDB_DEFAULT_PAGE_SIZE)) {
if (size < tsdbLogicToFileSize(pSet->pDataF->size, pTsdb->pVnode->config.tsdbPageSize)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
} else if (size > LOGIC_TO_FILE_SIZE(pSet->pDataF->size, TSDB_DEFAULT_PAGE_SIZE)) {
} else if (size > tsdbLogicToFileSize(pSet->pDataF->size, pTsdb->pVnode->config.tsdbPageSize)) {
code = tsdbDFileRollback(pTsdb, pSet, TSDB_DATA_FILE);
if (code) goto _err;
}
......@@ -320,10 +320,10 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (size < LOGIC_TO_FILE_SIZE(pSet->pSmaF->size, TSDB_DEFAULT_PAGE_SIZE)) {
if (size < tsdbLogicToFileSize(pSet->pSmaF->size, pTsdb->pVnode->config.tsdbPageSize)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
} else if (size > LOGIC_TO_FILE_SIZE(pSet->pSmaF->size, TSDB_DEFAULT_PAGE_SIZE)) {
} else if (size > tsdbLogicToFileSize(pSet->pSmaF->size, pTsdb->pVnode->config.tsdbPageSize)) {
code = tsdbDFileRollback(pTsdb, pSet, TSDB_SMA_FILE);
if (code) goto _err;
}
......@@ -335,7 +335,7 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (size != LOGIC_TO_FILE_SIZE(pSet->aSttF[iStt]->size, TSDB_DEFAULT_PAGE_SIZE)) {
if (size != tsdbLogicToFileSize(pSet->aSttF[iStt]->size, pTsdb->pVnode->config.tsdbPageSize)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
......
......@@ -148,7 +148,7 @@ int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype) {
}
// ftruncate
if (taosFtruncateFile(pFD, LOGIC_TO_FILE_SIZE(size, TSDB_DEFAULT_PAGE_SIZE)) < 0) {
if (taosFtruncateFile(pFD, tsdbLogicToFileSize(size, pTsdb->pVnode->config.tsdbPageSize)) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
......
......@@ -611,28 +611,26 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
int32_t code = 0;
int64_t n;
int64_t size;
TdFilePtr pOutFD = NULL; // TODO
TdFilePtr PInFD = NULL; // TODO
TdFilePtr pOutFD = NULL;
TdFilePtr PInFD = NULL;
int32_t szPage = pTsdb->pVnode->config.szPage;
char fNameFrom[TSDB_FILENAME_LEN];
char fNameTo[TSDB_FILENAME_LEN];
// head
tsdbHeadFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pHeadF, fNameFrom);
tsdbHeadFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pHeadF, fNameTo);
pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pOutFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
if (PInFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pHeadF->size);
n = taosFSendFile(pOutFD, PInFD, 0, tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage));
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -643,44 +641,17 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
// data
tsdbDataFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pDataF, fNameFrom);
tsdbDataFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pDataF, fNameTo);
pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pOutFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
if (PInFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pDataF->size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
taosCloseFile(&pOutFD);
taosCloseFile(&PInFD);
// stt
tsdbSttFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->aSttF[0], fNameFrom);
tsdbSttFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->aSttF[0], fNameTo);
pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pOutFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
if (PInFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->aSttF[0]->size);
n = taosFSendFile(pOutFD, PInFD, 0, LOGIC_TO_FILE_OFFSET(pSetFrom->pDataF->size, szPage));
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -691,20 +662,17 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
// sma
tsdbSmaFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pSmaF, fNameFrom);
tsdbSmaFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pSmaF, fNameTo);
pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pOutFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
if (PInFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pSmaF->size);
n = taosFSendFile(pOutFD, PInFD, 0, tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage));
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -712,6 +680,29 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
taosCloseFile(&pOutFD);
taosCloseFile(&PInFD);
// stt
for (int8_t iStt = 0; iStt < pSetFrom->nSttF; iStt++) {
tsdbSttFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->aSttF[iStt], fNameFrom);
tsdbSttFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->aSttF[iStt], fNameTo);
pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pOutFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
if (PInFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
n = taosFSendFile(pOutFD, PInFD, 0, tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage));
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
taosCloseFile(&pOutFD);
taosCloseFile(&PInFD);
}
return code;
_err:
......
......@@ -82,8 +82,6 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
code = tsdbFSUpsertFSet(&fs, &fSet);
if (code) goto _err;
}
/* code */
}
// do change fs
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册