提交 dc2d5541 编写于 作者: H Hongze Cheng

refact tsdb commit

上级 e843619b
......@@ -55,6 +55,7 @@ typedef struct {
#define FS_CURRENT_STATUS(pfs) ((pfs)->cstatus)
#define FS_NEW_STATUS(pfs) ((pfs)->nstatus)
#define FS_IN_TXN(pfs) (pfs)->intxn
#define FS_TXN_VERSION(pfs) ((pfs)->nstatus->meta.version)
typedef struct {
int direction;
......@@ -72,7 +73,7 @@ STsdbFS *tsdbNewFS(int keep, int days);
void * tsdbFreeFS(STsdbFS *pfs);
int tsdbOpenFS(STsdbFS *pFs, int keep, int days);
void tsdbCloseFS(STsdbFS *pFs);
uint32_t tsdbStartFSTxn(STsdbFS *pfs);
void tsdbStartFSTxn(STsdbFS *pfs, int64_t pointsAdd, int64_t storageAdd);
int tsdbEndFSTxn(STsdbFS *pfs);
int tsdbEndFSTxnWithError(STsdbFS *pfs);
void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta);
......
......@@ -286,6 +286,12 @@ typedef struct {
TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(s, ftype)); \
} \
} while (0);
#define TSDB_FSET_FSYNC(s) \
do { \
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \
TSDB_FILE_FSYNC(TSDB_DFILE_IN_SET(s, ftype)); \
} \
} while (0);
void tsdbInitDFileSet(SDFileSet* pSet, SDiskID did, int vid, int fid, uint32_t ver);
void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet);
......
......@@ -54,6 +54,8 @@ typedef struct {
SList* actList;
SList* extraBuffList;
SList* bufBlockList;
int64_t pointsAdd; // TODO
int64_t storageAdd; // TODO
} SMemTable;
enum { TSDB_UPDATE_META, TSDB_DROP_META };
......
......@@ -89,7 +89,6 @@ struct STsdbRepo {
#define REPO_ID(r) (r)->config.tsdbId
#define REPO_CFG(r) (&((r)->config))
#define REPO_FS(r) ((r)->fs)
#define REPO_FS_VERSION(r) // TODO
#define IS_REPO_LOCKED(r) (r)->repoLocked
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
......
......@@ -25,7 +25,6 @@ typedef struct {
} SRtn;
typedef struct {
uint32_t version;
SRtn rtn; // retention snapshot
SFSIter fsIter; // tsdb file iterator
int niters; // memory iterators
......@@ -53,6 +52,7 @@ typedef struct {
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh))
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
#define TSDB_COMMIT_DEFAULT_ROWS(ch) (TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock * 4 / 5)
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
static int tsdbCommitMeta(STsdbRepo *pRepo);
static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen);
......@@ -87,6 +87,8 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError);
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo);
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows, int8_t update);
static int tsdbApplyRtn(STsdbRepo *pRepo);
static int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn);
void *tsdbCommitData(STsdbRepo *pRepo) {
if (tsdbStartCommit(pRepo) < 0) {
......@@ -131,7 +133,7 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) {
ASSERT(pOMFile != NULL || listNEles(pMem->actList) > 0);
if (listNEles(pMem->actList) <= 0) {
// no
// no meta data to commit, just keep the old meta file
tsdbUpdateMFile(pfs, pOMFile);
return 0;
} else {
......@@ -140,7 +142,7 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) {
// Create a new meta file
did.level = TFS_PRIMARY_LEVEL;
did.id = TFS_PRIMARY_ID;
tsdbInitMFile(&mf, did, REPO_ID(pRepo), pfs->nstatus->meta.version);
tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)));
if (tsdbCreateMFile(&mf) < 0) {
return -1;
......@@ -176,6 +178,7 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) {
return -1;
}
TSDB_FILE_FSYNC(&mf);
tsdbCloseMFile(&mf);
tsdbUpdateMFile(pfs, &mf);
......@@ -266,17 +269,20 @@ static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) {
return 0;
}
// =================== Commit Time-Series Data
static int tsdbCommitTSData(STsdbRepo *pRepo) {
SMemTable *pMem = pRepo->imem;
SCommitH commith = {0};
STsdbFS * pfs = REPO_FS(pRepo);
SDFileSet *pSet = NULL;
SDFileSet nSet;
int fid;
if (pMem->numOfRows <= 0) return 0;
if (pMem->numOfRows <= 0) {
// No memory data, just apply retention on each file on disk
if (tsdbApplyRtn(pRepo) < 0) {
return -1;
}
return 0;
}
// Resource initialization
if (tsdbInitCommitH(&commith, pRepo) < 0) {
......@@ -285,9 +291,8 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) {
// Skip expired memory data and expired FSET
tsdbSeekCommitIter(&commith, commith.rtn.minKey);
while (true) {
pSet = tsdbFSIterNext(&(commith.fsIter));
if (pSet == NULL || pSet->fid >= commith.rtn.minFid) break;
while ((pSet = tsdbFSIterNext(&(commith.fsIter)))) {
if (pSet->fid >= commith.rtn.minFid) break;
}
// Loop to commit to each file
......@@ -299,33 +304,11 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) {
if (pSet && (fid == TSDB_IVLD_FID || pSet->fid < fid)) {
// Only has existing FSET but no memory data to commit in this
// existing FSET, only check if file in correct retention
int level, id;
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(commith.rtn)), &level, &id);
if (level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
if (tsdbApplyRtnOnFSet(pRepo, pSet, &(commith.rtn)) < 0) {
tsdbDestroyCommitH(&commith);
return -1;
}
if (level > TSDB_FSET_LEVEL(pSet)) {
// Need to move the FSET to higher level
if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
tsdbDestroyCommitH(&commith);
return -1;
}
if (tsdbUpdateDFileSet(pfs, &nSet) < 0) {
tsdbDestroyCommitH(&commith);
return -1;
}
} else {
if (tsdbUpdateDFileSet(pfs, pSet) < 0) {
tsdbDestroyCommitH(&commith);
return -1;
}
}
pSet = tsdbFSIterNext(&(commith.fsIter));
} else {
// Has memory data to commit
......@@ -358,10 +341,12 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) {
static int tsdbStartCommit(STsdbRepo *pRepo) {
SMemTable *pMem = pRepo->imem;
ASSERT(pMem->numOfRows > 0 || listNEles(pMem->actList) > 0);
tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64 " meta rows: %d",
REPO_ID(pRepo), pMem->keyFirst, pMem->keyLast, pMem->numOfRows, listNEles(pMem->actList));
if (tsdbStartFSTxn(REPO_FS(pRepo)) < 0) return -1;
tsdbStartFSTxn(REPO_FS(pRepo), pMem->pointsAdd, pMem->storageAdd);
pRepo->code = TSDB_CODE_SUCCESS;
return 0;
......@@ -429,10 +414,10 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
return -1;
}
// Close commit file
tsdbCloseCommitFile(pCommith, false);
if (tsdbUpdateDFileSet(REPO_FS(pRepo), &(pCommith->wSet)) < 0) {
// TODO
return -1;
}
......@@ -507,18 +492,9 @@ static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
memset(pCommith, 0, sizeof(*pCommith));
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(pCommith), ftype));
}
pCommith->version = REPO_FS_VERSION(pRepo) + 1;
tsdbGetRtnSnap(pRepo, &(pCommith->rtn));
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(pCommith), ftype);
TSDB_FILE_SET_CLOSED(pDFile);
}
TSDB_FSET_SET_CLOSED(TSDB_COMMIT_WRITE_FSET(pCommith));
// Init read handle
if (tsdbInitReadH(&(pCommith->readh), pRepo) < 0) {
......@@ -1365,12 +1341,12 @@ static void tsdbResetCommitTable(SCommitH *pCommith) {
}
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
int level, id;
SDiskID did;
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith);
tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &level, &id);
if (level == TFS_UNDECIDED_LEVEL) {
tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
return -1;
}
......@@ -1385,54 +1361,33 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
if (tsdbLoadBlockIdx(&(pCommith->readh)) < 0) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
return -1;
} else {
pCommith->isRFileSet = false;
}
// Set and open commit FSET
if (pSet == NULL || level > TSDB_FSET_LEVEL(pSet)) {
// Create new FSET
did.level = level;
did.id = id;
tsdbInitDFileSet(pWSet, did, TSDB_COMMIT_REPO_ID(pCommith), fid, pCommith->version);
if (tsdbOpenDFileSet(pWSet, O_WRONLY | O_CREAT) < 0) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
remove(TSDB_FILE_FULL_NAME(TSDB_DFILE_IN_SET(pWSet, ftype)));
}
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
}
return -1;
}
if (tsdbUpdateDFileSetHeader(pWSet) < 0) {
tsdbCloseDFileSet(pWSet);
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
remove(TSDB_FILE_FULL_NAME(TSDB_DFILE_IN_SET(pWSet, ftype)));
}
if (pSet == NULL || did.level > TSDB_FSET_LEVEL(pSet)) {
// Create a new FSET to write data
tsdbInitDFileSet(pWSet, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)));
if (tsdbCreateDFileSet(pWSet) < 0) {
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
}
return -1;
}
// TODO: update file info;
pCommith->isDFileSame = false;
pCommith->isLFileSame = false;
} else {
level = TSDB_FSET_LEVEL(pSet);
id = TSDB_FSET_ID(pSet);
did.level = TSDB_FSET_LEVEL(pSet);
did.id = TSDB_FSET_ID(pSet);
// TSDB_FILE_HEAD
SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
did.level = level;
did.id = id;
tsdbInitDFile(pWHeadf, did, TSDB_COMMIT_REPO_ID(pCommith), fid, pCommith->version, TSDB_FILE_HEAD);
tsdbInitDFile(pWHeadf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
if (tsdbCreateDFile(pWHeadf) < 0) {
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
......@@ -1446,7 +1401,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
tsdbInitDFileEx(pWHeadf, pRDataf);
if (tsdbOpenDFile(pWDataf, O_WRONLY) < 0) {
tsdbCloseDFile(pWHeadf);
remove(TSDB_FILE_FULL_NAME(pWHeadf));
tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
......@@ -1460,17 +1415,26 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
if (pRLastf->info.size < 32 * 1024) {
tsdbInitDFileEx(pWLastf, pRLastf);
pCommith->isLFileSame = true;
if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) {
tsdbCloseDFileSet(pWSet);
tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
} else {
tsdbInitDFile(pWLastf, did, TSDB_COMMIT_REPO_ID(pCommith), fid, pCommith->version, TSDB_FILE_LAST);
tsdbInitDFile(pWLastf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST);
pCommith->isLFileSame = false;
}
if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) {
tsdbCloseDFile(pWDataf);
tsdbCloseDFile(pWHeadf);
remove(TSDB_FILE_FULL_NAME(pWHeadf));
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
if (tsdbCreateDFile(pWLastf) < 0) {
tsdbCloseDFileSet(pWSet);
tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
}
}
......@@ -1484,10 +1448,7 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) {
}
if (!hasError) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(pCommith), ftype);
fsync(TSDB_FILE_FD(pDFile));
}
TSDB_FSET_FSYNC(TSDB_COMMIT_WRITE_FSET(pCommith));
}
tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith));
}
......@@ -1508,4 +1469,59 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p
}
return false;
}
static int tsdbApplyRtn(STsdbRepo *pRepo) {
SRtn rtn;
SFSIter fsiter;
STsdbFS * pfs = REPO_FS(pRepo);
SDFileSet *pSet;
// Get retentioni snapshot
tsdbGetRtnSnap(pRepo, &rtn);
tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD);
while ((pSet = tsdbFSIterNext(&fsiter))) {
if (pSet->fid < rtn.minFid) continue;
if (tsdbApplyRtnOnFSet(pRepo, pSet, &rtn) < 0) {
return -1;
}
}
return 0;
}
static int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn) {
SDiskID did;
SDFileSet nSet;
STsdbFS * pfs = REPO_FS(pRepo);
ASSERT(pSet->fid >= pRtn->minFid);
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, pRtn), &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
return -1;
}
if (did.level > TSDB_FSET_LEVEL(pSet)) {
// Need to move the FSET to higher level
tsdbInitDFileSet(&nSet, did, REPO_ID(pRepo), pSet->fid, FS_TXN_VERSION(pfs));
if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
return -1;
}
if (tsdbUpdateDFileSet(pfs, &nSet) < 0) {
return -1;
}
} else {
// On a correct level
if (tsdbUpdateDFileSet(pfs, pSet) < 0) {
return -1;
}
}
return 0;
}
\ No newline at end of file
......@@ -232,13 +232,15 @@ void tsdbCloseFS(STsdbFS *pFs) {
}
// Start a new transaction to modify the file system
uint32_t tsdbStartFSTxn(STsdbFS *pfs) {
void tsdbStartFSTxn(STsdbFS *pfs, int64_t pointsAdd, int64_t storageAdd) {
ASSERT(pfs->intxn == false);
pfs->intxn = true;
tsdbResetFSStatus(pfs->nstatus);
return pfs->cstatus->meta.version + 1;
pfs->nstatus->meta = pfs->cstatus->meta;
pfs->nstatus->meta.version = pfs->cstatus->meta.version + 1;
pfs->nstatus->meta.totalPoints = pfs->cstatus->meta.totalPoints + pointsAdd;
pfs->nstatus->meta.version = pfs->cstatus->meta.totalStorage += storageAdd;
}
void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta) { pfs->nstatus->meta = *pMeta; }
......@@ -268,7 +270,8 @@ int tsdbEndFSTxn(STsdbFS *pfs) {
}
int tsdbEndFSTxnWithError(STsdbFS *pfs) {
// TODO
tsdbApplyFSTxnOnDisk(pfs->nstatus, pfs->cstatus);
// TODO: if mf change, reload pfs->metaCache
pfs->intxn = false;
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册