diff --git a/src/inc/tfs.h b/src/inc/tfs.h index 1f47006a5b9a29836fcc76320cc9c864702f8273..8b58374b947e6f3bd3bb33836c2a70d6ac9d3a74 100644 --- a/src/inc/tfs.h +++ b/src/inc/tfs.h @@ -40,6 +40,7 @@ int64_t tfsTotalSize(); int64_t tfsAvailSize(); void tfsIncDiskFile(int level, int id, int num); void tfsDecDiskFile(int level, int id, int num); +void tfsAllocDisk(int expLevel, int *level, int *id); const char *TFS_PRIMARY_PATH(); const char *TFS_DISK_PATH(int level, int id); diff --git a/src/tfs/src/tfs.c b/src/tfs/src/tfs.c index fc47b07973124fe05b5a93579876cf9bcc2ea75f..563adc88db49f146437b4d320aa07df018f9caa7 100644 --- a/src/tfs/src/tfs.c +++ b/src/tfs/src/tfs.c @@ -156,6 +156,30 @@ void tfsDecDiskFile(int level, int id, int num) { tfsUnLock(); } +/* Allocate an existing available tier level + */ +void tfsAllocDisk(int expLevel, int *level, int *id) { + *level = expLevel; + *id = TFS_UNDECIDED_ID; + + if (*level > TFS_NLEVEL()) { + *level = TFS_NLEVEL(); + } + + while (*level >= 0) { + *id = tfsAssignDisk(*level); + if (*id < 0) { + *level--; + continue; + } + + return; + } + + *level = TFS_UNDECIDED_LEVEL; + *id = TFS_UNDECIDED_ID; +} + const char *TFS_PRIMARY_PATH() { return DISK_DIR(TFS_PRIMARY_DISK()); } const char *TFS_DISK_PATH(int level, int id) { return DISK_DIR(TFS_DISK_AT(level, id)); } diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 669c736b7415330bf7dee3011f00e22c535e4404..397f5707ad76568e2433387cc83c445cc3b75169 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -24,9 +24,14 @@ typedef struct { } SRtn; typedef struct { - SRtn rtn; - SCommitIter *iters; - SRWHelper whelper; + SRtn rtn; // retention snapshot + int niters; + SCommitIter *iters; // memory iterators + SReadH readh; + SDFileSet * pWSet; + SArray * aBlkIdx; + SArray * aSupBlk; + SArray * aSubBlk; SDataCols * pDataCols; } SCommitH; @@ -210,59 +215,44 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS static int tsdbCommitToFile(STsdbRepo *pRepo, SDFileSet *pOldSet, SCommitH *pch, int fid) { SDFileSet rSet; SDFileSet wSet; - int level; + int level, id; - if (pOldSet && pOldSet->fid < pch->rtn.minFid) { // file is deleted + // ASSERT(pOldSet != NULL || fid != TSDB_IVLD_FID); + + // file should be deleted, do nothing and return + if (pOldSet && pOldSet->fid < pch->rtn.minFid) { ASSERT(fid == TSDB_IVLD_FID); return 0; } - // if (pOldSet) { - // ASSERT(fid == TSDB_IVLD_FID || pOldSet->fid == fid); - // if (true /* TODO: pOldSet not in correct level*/) { - // // TODO: Check if pOldSet is on correct level, if not, move it to correct level - // } else { - // tsdbInitDFile(TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_HEAD), REPO_ID(pRepo), fid, 0 /*TODO*/, 0 /*TODO*/, 0 - // /*TODO*/, - // NULL, TSDB_FILE_HEAD); - // // TODO: init data - // tsdbInitDFileWithOld(TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_DATA), TSDB_DFILE_IN_SET(pOldSet, TSDB_FILE_DATA)); - - // // TODO: init last file - // SDFile *pDFile = TSDB_DFILE_IN_SET(pOldSet, TSDB_FILE_LAST); - // if (pDFile->info->size < 32K) { - - // } else { - - // } - - // tsdbInitDFileWithOld(&oSet, pOldSet); - // pReadSet = &oSet; - // } - // } else { - // ASSERT(fid != TSDB_IVLD_FID); - - // // Create a new file group - // tsdbInitDFileSet(&nSet, REPO_ID(pRepo), fid, 0 /*TODO*/, tsdbGetFidLevel(fid, &(pch->rtn)), TFS_UNDECIDED_ID); - // tsdbOpenDFileSet(&nSet, O_WRONLY | O_CREAT); - // tsdbUpdateDFileSetHeader(&nSet); - // } - - { - // TODO: set rSet and wSet, the read file set and write file set - } + if (pOldSet == NULL) { + ASSERT(fid != TSDB_IVLD_FID); - if (fid == TSDB_IVLD_FID) { - // TODO: copy rSet as wSet - } else { - tsdbSetAndOpenCommitFSet(pch, &rSet, &wSet); + tfsAllocDisk(tsdbGetFidLevel(fid, &(pch->rtn)), &level, &id); + if (level == TFS_UNDECIDED_LEVEL) { + // terrno = TSDB_CODE_TDB_NO_INVALID_DISK; + return -1; + } - for (int i = 0; i < pMem->maxTable; i++) { - tsdbCommitTableData; - /* code */ + // wSet here is the file to write, no read set + tsdbInitDFileSet(&wSet, REPO_ID(pRepo), fid, 0 /*TODO*/, level, id); + } else { + tfsAllocDisk(tsdbGetFidLevel(pOldSet->fid, &(pch->rtn)), &level, &fid); + if (level == TFS_UNDECIDED_LEVEL) { + // terrno = TSDB_CODE_TDB_NO_INVALID_DISK; + return -1; } - tsdbCloseAndUnSetCommitFSet(pch); + if (level > TSDB_FSET_LEVEL(pOldSet)) { + // wSet here is the file to write, pOldSet here is the read set + tsdbInitDFileSet(&wSet, REPO_ID(pRepo), fid, 0 /*TODO*/, level, id); + } else { + // get wSet with pOldSet + } + // if (level == TSDB_FSET_LEVEL(pOldSet)) { + // } else { + // // TODO + // } } tsdbUpdateDFileSet(pRepo, &wSet);