diff --git a/src/tsdb/inc/tsdbFS.h b/src/tsdb/inc/tsdbFS.h index 281ee7dc682858a1f84e3affc3fb6457b090dc78..9b7ad04b7cc06fb6e8b2a6b8ba866df6103ce44f 100644 --- a/src/tsdb/inc/tsdbFS.h +++ b/src/tsdb/inc/tsdbFS.h @@ -25,7 +25,7 @@ extern "C" { // ================== CURRENT file header info typedef struct { uint32_t version; // Current file version - uint32_t len; + uint32_t len; // Encode content length (including checksum) } SFSHeader; // ================== TSDB File System Meta @@ -38,6 +38,7 @@ typedef struct { // ================== typedef struct { STsdbFSMeta meta; // FS meta + SMFile* pmf; // meta file pointer SMFile mf; // meta file SArray* df; // data file array } SFSStatus; @@ -45,12 +46,10 @@ typedef struct { typedef struct { pthread_rwlock_t lock; - SFSStatus* cstatus; // current stage - SHashObj* metaCache; // meta - + SFSStatus* cstatus; // current status + SHashObj* metaCache; // meta cache bool intxn; - SFSStatus* nstatus; - SList* metaDelta; + SFSStatus* nstatus; // new status } STsdbFS; #define FS_CURRENT_STATUS(pfs) ((pfs)->cstatus) @@ -58,12 +57,17 @@ typedef struct { #define FS_IN_TXN(pfs) (pfs)->intxn typedef struct { + int direction; uint64_t version; // current FS version - int index; - int fid; + STsdbFS* pfs; + int index; // used to position next fset when version the same + int fid; // used to seek when version is changed SDFileSet* pSet; } SFSIter; +#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC +#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC + #if 0 int tsdbOpenFS(STsdbRepo* pRepo); void tsdbCloseFS(STsdbRepo* pRepo); diff --git a/src/tsdb/inc/tsdbFile.h b/src/tsdb/inc/tsdbFile.h index 7e62f6acef7afc1cfc3408609c1a118c4c4f7803..182fc9d443da8dacfe30d1094ed2ab867b073713 100644 --- a/src/tsdb/inc/tsdbFile.h +++ b/src/tsdb/inc/tsdbFile.h @@ -23,12 +23,14 @@ extern "C" { #define TSDB_FILE_HEAD_SIZE 512 #define TSDB_FILE_DELIMITER 0xF00AFA0F #define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF +#define TSDB_IVLD_FID INT_MIN #define TSDB_FILE_INFO(tf) (&((tf)->info)) #define TSDB_FILE_F(tf) (&((tf)->f)) #define TSDB_FILE_FD(tf) ((tf)->fd) #define TSDB_FILE_FULL_NAME(tf) TFILE_NAME(TSDB_FILE_F(tf)) #define TSDB_FILE_OPENED(tf) (TSDB_FILE_FD(tf) >= 0) +#define TSDB_FILE_CLOSED(tf) (!TSDB_FILE_OPENED(tf)) #define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_FD(f) = -1) #define TSDB_FILE_LEVEL(tf) TFILE_LEVEL(TSDB_FILE_F(tf)) #define TSDB_FILE_ID(tf) TFILE_ID(TSDB_FILE_F(tf)) @@ -61,6 +63,7 @@ void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, int ver); void tsdbInitMFileEx(SMFile* pMFile, SMFile* pOMFile); int tsdbEncodeSMFile(void** buf, SMFile* pMFile); void* tsdbDecodeSMFile(void* buf, SMFile* pMFile); +int tsdbApplyMFileChange(const SMFile* from, const SMFile* to); static FORCE_INLINE int tsdbOpenMFile(SMFile* pMFile, int flags) { ASSERT(!TSDB_FILE_OPENED(pMFile)); @@ -288,6 +291,7 @@ void tsdbInitDFileSet(SDFileSet* pSet, SDiskID did, int vid, int fid, int ver); void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet); int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet); void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet); +int tsdbApplyDFileSetChange(const SDFileSet* from, const SDFileSet* to); static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 5dd3dc70bcc567e420568b3b21f3e3fdb48ae185..7901c4fb8e2067fe16b03ef77f59f38397326107 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -14,7 +14,6 @@ */ #include "tsdbint.h" -#define TSDB_IVLD_FID INT_MIN #define TSDB_MAX_SUBBLOCKS 8 #define TSDB_KEY_FID(key, days, precision) ((key) / tsMsPerDay[(precision)] / (days)) diff --git a/src/tsdb/src/tsdbFS.c b/src/tsdb/src/tsdbFS.c index 7bdbe35c38517075eba2009f9837002e94cdd7b9..1e748c0286b2093a61cc2fdc504c3027a491e25b 100644 --- a/src/tsdb/src/tsdbFS.c +++ b/src/tsdb/src/tsdbFS.c @@ -15,6 +15,8 @@ #include "tsdbint.h" +#define TSDB_FS_CURRENT_FNAME "current" +#define TSDB_FS_TEMP_FNAME "current.t" #define TSDB_MAX_FSETS(keep, days) ((keep) / (days) + 3) // ================== CURRENT file header info @@ -27,9 +29,9 @@ static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) { return tlen; } -static void *tsdbEncodeFSHeader(void *buf, SFSHeader *pHeader) { - buf = taosEncodeFixedU32(buf, &(pHeader->version)); - buf = taosEncodeFixedU32(buf, &(pHeader->len)); +static void *tsdbDecodeFSHeader(void *buf, SFSHeader *pHeader) { + buf = taosDecodeFixedU32(buf, &(pHeader->version)); + buf = taosDecodeFixedU32(buf, &(pHeader->len)); return buf; } @@ -76,8 +78,6 @@ static int tsdbDecodeDFileSetArray(void *buf, SArray *pArray) { buf = taosDecodeFixedU64(buf, &nset); for (size_t i = 0; i < nset; i++) { - SDFileSet *pSet = taosArrayGet(pArray, i); - buf = tsdbDecodeDFileSet(buf, &dset); taosArrayPush(pArray, (void *)(&dset)); } @@ -85,19 +85,22 @@ static int tsdbDecodeDFileSetArray(void *buf, SArray *pArray) { } static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) { + ASSERT(pStatus->pmf); + int tlen = 0; - tlen += tsdbEncodeFSMeta(buf, &(pStatus->meta)); - tlen += tsdbEncodeSMFile(buf, &(pStatus->mf)); + tlen += tsdbEncodeSMFile(buf, &(pStatus->pmf)); tlen += tsdbEncodeDFileSetArray(buf, pStatus->df); return tlen; } static void *tsdbDecodeFSStatus(void *buf, SFSStatus *pStatus) { - buf = taosDecodeFixedU32(buf, pStatus->fsVer); - buf = tsdbDecodeFSMeta(buf, &(pStatus->meta)); - buf = tsdbDecodeSMFile(buf, &(pStatus->mf)); + tsdbResetFSStatus(pStatus); + + pStatus->pmf = &(pStatus->mf); + + buf = tsdbDecodeSMFile(buf, pStatus->pmf); buf = tsdbDecodeDFileSetArray(buf, pStatus->df); return buf; @@ -111,7 +114,7 @@ static SFSStatus *tsdbNewFSStatus(int maxFSet) { } pStatus->df = taosArrayInit(maxFSet, sizeof(SDFileSet)); - if (pStatus->df) { + if (pStatus->df == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; free(pStatus); return NULL; @@ -134,62 +137,79 @@ static void tsdbResetFSStatus(SFSStatus *pStatus) { return; } + pStatus->pmf = NULL; taosArrayClear(pStatus->df); } +static void tsdbSetStatusMFile(SFSStatus *pStatus, const SMFile *pMFile) { + ASSERT(pStatus->pmf == NULL && TSDB_FILE_CLOSED(pMFile)); + + pStatus->pmf = &(pStatus->mf); + *(pStatus->pmf) = *pMFile; +} + +static int tsdbAddDFileSetToStatus(SFSStatus *pStatus, const SDFileSet *pSet) { + ASSERT(TSDB_FILE_CLOSED(&(pSet->files[0]))); + ASSERT(TSDB_FILE_CLOSED(&(pSet->files[1]))); + ASSERT(TSDB_FILE_CLOSED(&(pSet->files[2]))); + + if (taosArrayPush(pStatus->df, (void *)pStatus) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + return 0; +} + // ================== STsdbFS -STsdbFS *tsdbNewFS(int maxFSet) { - STsdbFS *pFs = (STsdbFS *)calloc(1, sizeof(*pFs)); - if (pFs == NULL) { +STsdbFS *tsdbNewFS(int keep, int days) { + int maxFSet = TSDB_MAX_FSETS(keep, days); + STsdbFS *pfs; + + pfs = (STsdbFS *)calloc(1, sizeof(*pfs)); + if (pfs == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return NULL; } - int code = pthread_rwlock_init(&(pFs->lock), NULL); + int code = pthread_rwlock_init(&(pfs->lock), NULL); if (code) { terrno = TAOS_SYSTEM_ERROR(code); - free(pFs); + free(pfs); return NULL; } - pFs->cstatus = tsdbNewFSStatus(maxFSet); - if (pFs->cstatus == NULL) { - tsdbFreeFS(pFs); + pfs->cstatus = tsdbNewFSStatus(maxFSet); + if (pfs->cstatus == NULL) { + tsdbFreeFS(pfs); return NULL; } - pFs->metaCache = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); - if (pFs->metaCache == NULL) { + pfs->metaCache = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + if (pfs->metaCache == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbFreeFS(pFs); + tsdbFreeFS(pfs); return NULL; } - pFs->nstatus = tsdbNewFSStatus(maxFSet); - if (pFs->nstatus == NULL) { - tsdbFreeFS(pFs); + pfs->nstatus = tsdbNewFSStatus(maxFSet); + if (pfs->nstatus == NULL) { + tsdbFreeFS(pfs); return NULL; } - pFs->metaDelta = tdListNew(sizeof(SKVRecord)); - if (pFs->metaDelta == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbFreeFS(pFs); - return NULL; - } - - return NULL; + return pfs; } -void *tsdbFreeFS(STsdbFS *pFs) { - if (pFs) { - pFs->metaDelta = tdListFree(pFs->metaDelta); - pFs->nstatus = tsdbFreeFSStatus(pFs->nstatus); - taosHashCleanup(pFs->metaCache); - pFs->metaCache = NULL; - pFs->cstatus = tsdbFreeFSStatus(pFs->cstatus); - pthread_rwlock_destroy(&(pFs->lock)); +void *tsdbFreeFS(STsdbFS *pfs) { + if (pfs) { + pfs->nstatus = tsdbFreeFSStatus(pfs->nstatus); + taosHashCleanup(pfs->metaCache); + pfs->metaCache = NULL; + pfs->cstatus = tsdbFreeFSStatus(pfs->cstatus); + pthread_rwlock_destroy(&(pfs->lock)); } + return NULL; } @@ -203,428 +223,271 @@ void tsdbCloseFS(STsdbFS *pFs) { // TODO } -int tsdbStartTxn(STsdbFS *pFs) { - tsdbResetFSStatus(pFs->nstatus); - tdListEmpty(pFs->metaDelta); - return 0; -} +// Start a new transaction to modify the file system +int tsdbStartTxn(STsdbFS *pfs) { + ASSERT(pfs->intxn == false); -int tsdbEndTxn(STsdbFS *pFs, bool hasError) { - SFSStatus *pTStatus; + pfs->intxn = true; + tsdbResetFSStatus(pfs->nstatus); - if (hasError) { - // TODO - } else { - // TODO 1. Create and open a new file current.t + return 0; +} - // TODO 2. write new status to new file and fysnc and close +int tsdbEndTxn(STsdbFS *pfs) { + ASSERT(FS_IN_TXN(pfs)); + SFSStatus *pStatus; - // TODO 3. rename current.t to current + // Write current file system snapshot + if (tsdbUpdateFS(pfs) < 0) { + tsdbEndTxnWithError(pfs); + return -1; + } - // TODO 4. apply change to file - tsdbWLockFS(pFs); - pTStatus = pFs->cstatus; - pFs->cstatus = pFs->nstatus; - pFs->nstatus = pTStatus; - tsdbUnLockFS(pFs); + // Make new + tsdbWLockFS(pfs); + pStatus = pfs->cstatus; + pfs->cstatus = pfs->nstatus; + pfs->nstatus = pStatus; + tsdbUnLockFS(pfs); - // TODO 5: apply meta change to cache - } + // Apply actual change to each file and SDFileSet + tsdbApplyFSChangeOnDisk(pfs); + pfs->intxn = false; return 0; } -// ================== SFSIter -void tsdbFSIterInit(STsdbFS *pFs, SFSIter *pIter) { +int tsdbEndTxnWithError(STsdbFS *pfs) { // TODO + pfs->intxn = false; + return 0; } -SDFileSet *tsdbFSIterNext(STsdbFS *pFs) { - // TODO - return NULL; -} +void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile) { tsdbSetStatusMFile(pfs->nstatus, pMFile); } -#if 0 -int tsdbOpenFS(STsdbRepo *pRepo) { - ASSERT(REPO_FS == NULL); +int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFileSetToStatus(pfs->nstatus, pSet); } - STsdbCfg *pCfg = TSDB_CFG(pRepo); +static int tsdbUpdateFS(STsdbFS *pfs) { + ASSERT(FS_IN_TXN(pfs)); + SFSHeader fsheader; + void * pBuf = NULL; + void * ptr; + char hbuf[TSDB_FILE_HEAD_SIZE] = "\0"; - // Create fs object - REPO_FS(pRepo) = tsdbNewFS(pCfg->keep, pCfg->daysPerFile); - if (REPO_FS(pRepo) == NULL) { - tsdbError("vgId:%d failed to open TSDB FS since %s", REPO_ID(pRepo), tstrerror(terrno)); + int fd = open(TSDB_FS_TEMP_FNAME, O_WRONLY | O_CREAT | O_TRUNC, 0755); + if (fd < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - // Load TSDB file system from disk - if (tsdbOpenFSImpl(pRepo) < 0) { - tsdbError("vgId:%d failed to open TSDB FS since %s", REPO_ID(pRepo), tstrerror(terrno)); - tsdbCloseFS(pRepo); - return -1; + fsheader.version = TSDB_FS_VERSION; + if (pfs->nstatus->pmf == NULL) { + ASSERT(taosArrayGetSize(pfs->nstatus->df) == 0); + fsheader.len = 0; + } else { + fsheader.len = tsdbEncodeFSHeader(NULL, pfs->nstatus) + sizeof(TSCKSUM); } - return 0; -} + // Encode header part and write + ptr = hbuf; + tsdbEncodeFSHeader(&ptr, &fsheader); + tsdbEncodeFSMeta(&ptr, &(pfs->nstatus->meta)); -void tsdbCloseFS(STsdbRepo *pRepo) { - REPO_FS(pRepo) = tsdbFreeFS(REPO_FS(pRepo)); - return 0; -} - -// Start a new FS transaction -int tsdbFSNewTxn(STsdbRepo *pRepo) { - STsdbFS *pFs = REPO_FS(pRepo); + taosCalcChecksumAppend(0, (uint8_t *)hbuf, TSDB_FILE_HEAD_SIZE); - if (tsdbCopySnapshot(pFs->curr, pFs->new) < 0) { + if (taosWrite(fd, hbuf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { + terrno = TAOS_SYSTEM_ERROR(errno); + close(fd); + remove(TSDB_FS_TEMP_FNAME); return -1; } - pFs->new->version++; - - return 0; -} - -// End an existing FS transaction -int tsdbFSEndTxn(STsdbRepo *pRepo, bool hasError) { - STsdbFS *pFs = REPO_FS(pRepo); - - if (hasError) { // roll back files - - } else { // apply file change - if (tsdbSaveFSSnapshot(-1, pFs->new) < 0) { - // TODO + // Encode file status and write to file + if (fsheader.len > 0) { + if (tsdbMakeRoom(&(pBuf), fsheader.len) < 0) { + close(fd); + remove(TSDB_FS_TEMP_FNAME); + return -1; } - // rename(); - - // apply all file changes - - } - - return 0; -} + ptr = pBuf; + tsdbEncodeFSStatus(&ptr, pfs->nstatus); + taosCalcChecksumAppend(0, (uint8_t *)pBuf, fsheader.len) -int tsdbUpdateMFile(STsdbRepo *pRepo, SMFile *pMFile) { - STsdbFS *pFs = REPO_FS(pRepo); - pFs->new->mf = *pMFile; - return 0; -} - -int tsdbUpdateDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) { - SFSStatus *pSnapshot = REPO_FS(pRepo)->new; - SDFileSet * pOldSet; - - pOldSet = tsdbSearchDFileSet(pSnapshot, pSet->id, TD_GE); - if (pOldSet == NULL) { - if (taosArrayPush(pSnapshot->df, pSet) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + if (taosWrite(fd, pBuf, fsheader.len) < fsheader.len) { + terrno = TAOS_SYSTEM_ERROR(errno); + close(fd); + remove(TSDB_FS_TEMP_FNAME); + taosTZfree(pBuf); return -1; } - } else { - int index = TARRAY_ELEM_IDX(pSnapshot->df, pOldSet); - - if (pOldSet->id == pSet->id) { - taosArraySet(pSnapshot->df, index, pSet); - } else if (pOldSet->id > pSet->id) { - if (taosArrayInsert(pSnapshot->df, index, pSet) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - } else { - ASSERT(0); - } } - return 0; -} - -void tsdbRemoveExpiredDFileSet(STsdbRepo *pRepo, int mfid) { - SFSStatus *pSnapshot = REPO_FS(pRepo)->new; - while (taosArrayGetSize(pSnapshot->df) > 0) { - SDFileSet *pSet = (SDFileSet *)taosArrayGet(pSnapshot->df, 0); - if (pSet->id < mfid) { - taosArrayRemove(pSnapshot->df, 0); - } + // fsync, close and rename + if (fsync(fd) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + close(fd); + remove(TSDB_FS_TEMP_FNAME); + taosTZfree(pBuf); + return -1; } -} - -SDFileSet tsdbMoveDFileSet(SDFileSet *pOldSet, int to) { - // TODO -} + (void)close(fd); + (void)rename(TSDB_FS_TEMP_FNAME, TSDB_FS_CURRENT_FNAME); + taosTZfree(pBuf); -int tsdbInitFSIter(STsdbRepo *pRepo, SFSIter *pIter) { - // TODO return 0; } -SDFileSet *tsdbFSIterNext(SFSIter *pIter) { - // TODO - return NULL; -} +static void tsdbApplyFSChangeOnDisk(SFSStatus *pFrom, SFSStatus *pTo) { + int ifrom = 0; + int ito = 0; + size_t sizeFrom, sizeTo; + SDFileSet *pSetFrom; + SDFileSet *pSetTo; -static int tsdbSaveFSSnapshot(int fd, SFSStatus *pSnapshot) { - // TODO - return 0; -} + sizeFrom = taosArrayGetSize(pFrom->df); + sizeTo = taosArrayGetSize(pTo->df); -static int tsdbLoadFSSnapshot(SFSStatus *pSnapshot) { - // TODO - return 0; -} + // Apply meta file change + tsdbApplyMFileChange(pFrom->pmf, pTo->pmf); -static int tsdbOpenFSImpl(STsdbRepo *pRepo) { - char manifest[TSDB_FILENAME_LEN] = "\0"; - - // TODO: use API here - sprintf(manifest, "%s/manifest", pRepo->rootDir); - - if (access(manifest, F_OK) == 0) { - // manifest file exist, just load - // TODO + // Apply SDFileSet change + if (ifrom >= sizeFrom) { + pSetFrom = NULL; } else { - // manifest file not exists, scan all the files and construct - // TODO + pSetFrom = taosArrayGet(pFrom->df, ifrom); } - return 0; -} - + if (ito >= sizeTo) { + pSetTo = NULL; + } else { + pSetTo = taosArrayGet(pTo->df, ito); + } -static int tsdbEncodeFSMeta(void **buf, STsdbFSMeta *pMeta) { - int tlen = 0; + while (true) { + if ((pSetTo == NULL) && (pSetFrom == NULL)) break; - tlen += taosEncodeVariantI64(buf, pMeta->fsversion); - tlen += taosEncodeVariantI64(buf, pMeta->version); - tlen += taosEncodeVariantI64(buf, pMeta->totalPoints); - tlen += taosEncodeVariantI64(buf, pMeta->totalStorage); + if (pSetTo == NULL || (pSetFrom && pSetFrom->fid < pSetTo->fid)) { + tsdbApplyDFileSetChange(pSetFrom, NULL); - return tlen; -} + ifrom++; + if (ifrom >= sizeFrom) { + pSetFrom = NULL; + } else { + pSetFrom = taosArrayGet(pFrom->df, ifrom); + } + } else if (pSetFrom == NULL || pSetFrom->fid > pSetTo->fid) { + // Do nothing + if (pSetFrom) { + ito++; + if (ito >= sizeTo) { + pSetTo = NULL; + } else { + pSetTo = taosArrayGet(pTo->df, ito); + } + } + } else { + tsdbApplyDFileSetChange(pSetFrom, pSetTo); -static void *tsdbDecodeFSMeta(void *buf, STsdbFSMeta *pMeta) { - buf = taosDecodeVariantI64(buf, &(pMeta->fsversion)); - buf = taosDecodeVariantI64(buf, &(pMeta->version)); - buf = taosDecodeVariantI64(buf, &(pMeta->totalPoints)); - buf = taosDecodeVariantI64(buf, &(pMeta->totalStorage)); + ifrom++; + if (ifrom >= sizeFrom) { + pSetFrom = NULL; + } else { + pSetFrom = taosArrayGet(pFrom->df, ifrom); + } - return buf; + ito++; + if (ito >= sizeTo) { + pSetTo = NULL; + } else { + pSetTo = taosArrayGet(pTo->df, ito); + } + } + } } -static int tsdbEncodeFSSnapshot(void **buf, SFSStatus *pSnapshot) { - int tlen = 0; - int64_t size = 0; - - // Encode meta file - tlen += tsdbEncodeMFile(buf, &(pSnapshot->mf)); - - // Encode data files - size = taosArrayGetSize(pSnapshot->df); - tlen += taosEncodeVariantI64(buf, size); - for (size_t index = 0; index < size; index++) { - SDFile *pFile = taosArrayGet(pSnapshot->df, index); - - tlen += tsdbEncodeDFInfo(buf, &pFile); - } - +// ================== SFSIter +// ASSUMPTIONS: the FS Should be read locked when calling these functions +void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction) { + pIter->pfs = pfs; + pIter->direction = direction; - return tlen; -} + size_t size = taosArrayGetSize(pfs->cstatus->df); -static void *tsdbDecodeFSSnapshot(void *buf, SFSStatus *pSnapshot) { - int64_t size = 0; - SDFile df; + pIter->version = pfs->cstatus->meta.version; - // Decode meta file - buf = tsdbDecodeMFile(buf, &(pSnapshot->mf)); + if (size == 0) { + pIter->index = -1; + pIter->fid = TSDB_IVLD_FID; + } else { + if (direction == TSDB_FS_ITER_FORWARD) { + pIter->index = 0; + } else { + pIter->index = size - 1; + } - // Decode data files - buf = taosDecodeVariantI64(buf, &size); - for (size_t index = 0; index < size; index++) { - buf = tsdbDecodeDFInfo(buf, &df); - taosArrayPush(pSnapshot->df, (void *)(&df)); + pIter->fid = ((SDFileSet *)taosArrayGet(pfs->cstatus->df, pIter->index))->fid; } - - return buf; } -static SFSStatus *tsdbNewSnapshot(int32_t nfiles) { - SFSStatus *pSnapshot; - - pSnapshot = (SFSStatus *)calloc(1, sizeof(pSnapshot)); - if (pSnapshot == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return NULL; - } +void tsdbFSIterSeek(SFSIter *pIter, int fid) { + STsdbFS *pfs = pIter->pfs; + size_t size = taosArrayGetSize(pfs->cstatus->df); - pSnapshot->df = taosArrayInit(nfiles, sizeof(SDFileSet)); - if (pSnapshot->df == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - free(pSnapshot); - return NULL; + int flags; + if (pIter->direction == TSDB_FS_ITER_FORWARD) { + flags = TD_GE; + } else { + flags = TD_LE; } - return pSnapshot; -} - -static SFSStatus *tsdbFreeSnapshot(SFSStatus *pSnapshot) { - if (pSnapshot) { - taosArrayDestroy(pSnapshot->df); - free(pSnapshot); + void *ptr = taosbsearch(&fid, pfs->cstatus->df->pData, size, sizeof(SDFileSet), , flags); + if (ptr == NULL) { + pIter->index = -1; + pIter->fid = TSDB_IVLD_FID; + } else { + pIter->index = TARRAY_ELEM_IDX(pfs->cstatus->df, ptr); + pIter->fid = ((SDFileSet *)ptr)->fid; } - - return NULL; } -static STsdbFS *tsdbNewFS(int32_t keep, int32_t days) { - STsdbFS *pFs; - int code; - int32_t nfiles; - - pFs = (STsdbFS *)calloc(1, sizeof(*pFs)); - if (pFs == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return NULL; - } - - code = pthread_rwlock_init(&(pFs->lock)); - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(code); - free(pFs); - return NULL; - } +SDFileSet *tsdbFSIterNext(SFSIter *pIter) { + STsdbFS * pfs = pIter->pfs; + SDFileSet *pSet; - nfiles = TSDB_MAX_DFILES(keep, days); - if (((pFs->curr = tsdbNewSnapshot(nfiles)) == NULL) || ((pFs->new = tsdbNewSnapshot(nfiles)) == NULL)) { - tsdbFreeFS(pFs); + if (pIter->index < 0) { + ASSERT(pIter->fid == TSDB_IVLD_FID); return NULL; } - return pFs; -} + ASSERT(pIter->fid != TSDB_IVLD_FID); -static STsdbFS *tsdbFreeFS(STsdbFS *pFs) { - if (pFs) { - pFs->new = tsdbFreeSnapshot(pFs->new); - pFs->curr = tsdbFreeSnapshot(pFs->curr); - pthread_rwlock_destroy(&(pFs->lock)); - free(pFs); + if (pIter->version != pfs->cstatus->meta.version) { + tsdbFSIterSeek(pIter, pIter->fid); } - return NULL; -} - -static int tsdbCopySnapshot(SFSStatus *src, SFSStatus *dst) { - dst->meta = src->meta; - dst->mf = src->meta; - taosArrayCopy(dst->df, src->df); - return 0; -} - -static int tsdbCompFSetId(const void *key1, const void *key2) { - int id = *(int *)key1; - SDFileSet *pSet = (SDFileSet *)key2; - - if (id < pSet->id) { - return -1; - } else if (id == pSet->id) { - return 0; - } else { - return 1; + if (pIter->index < 0) { + return NULL; } -} - -static SDFileSet *tsdbSearchDFileSet(SFSStatus *pSnapshot, int fid, int flags) { - void *ptr = taosArraySearch(pSnapshot->df, (void *)(&fid), tsdbCompFSetId, flags); - return (ptr == NULL) ? NULL : ((SDFileSet *)ptr); -} -static int tsdbMakeFSChange(STsdbRepo *pRepo) { - tsdbMakeFSMFileChange(pRepo); - tsdbMakeFSDFileChange(pRepo); - return 0; -} + pSet = (SDFileSet *)taosArrayGet(pfs->cstatus->df, pIter->index); + ASSERT(pSet->fid == pIter->fid); -static int tsdbMakeFSMFileChange(STsdbRepo *pRepo) { - STsdbFS *pFs = REPO_FS(pRepo); - SMFile * pDstMFile = &(pFs->curr->mf); - SMFile * pSrcMFile = &(pFs->new->mf); - - if (tfsIsSameFile(&(pDstMFile->f), &(pSrcMFile->f))) { // the same file - if (pDstMFile->info != pSrcMFile->info) { - if (pDstMFile->info.size > pDstMFile->info.size) { - // Commit succeed, do nothing - } else if (pDstMFile->info.size < pDstMFile->info.size) { - // Commit failed, back - // TODO - } else { - ASSERT(0); - } + if (pIter->direction == TSDB_FS_ITER_FORWARD) { + pIter->index++; + if (pIter->index >= taosArrayGetSize(pfs->cstatus->df)) { + pIter->index = -1; } } else { - tfsremove(&(pSrcMFile->f)); + pIter->index--; } - return 0; -} - -static int tsdbMakeFSDFileChange(STsdbRepo *pRepo) { - STsdbFS * pFs = REPO_FS(pRepo); - int cidx = 0; - int nidx = 0; - SDFileSet *pCSet = NULL; - SDFileSet *pNSet = NULL; - - if (cidx < taosArrayGetSize(pFs->curr->df)) { - pCSet = taosArrayGet(pFs->curr->df, cidx); - } else { - pCSet = NULL; - } - - if (nidx < taosArrayGetSize(pFs->new->df)) { - pNSet = taosArrayGet(pFs->new->df, nidx); + if (pIter->index > 0) { + pIter->fid = ((SDFileSet *)taosArrayGet(pfs->cstatus->df, pIter->index))->fid; } else { - pNSet = NULL; - } - - while (true) { - if (pCSet == NULL && pNSet == NULL) break; - - if (pCSet == NULL || (pNSet != NULL && pCSet->id > pNSet->id)) { - tsdbRemoveDFileSet(pNSet); - - nidx++; - if (nidx < taosArrayGetSize(pFs->new->df)) { - pNSet = taosArrayGet(pFs->new->df, nidx); - } else { - pNSet = NULL; - } - } else if (pNSet == NULL || (pCSet != NULL && pCSet->id < pNSet->id)) { - cidx++; - if (cidx < taosArrayGetSize(pFs->curr->df)) { - pCSet = taosArrayGet(pFs->curr->df, cidx); - } else { - pCSet = NULL; - } - } else { - // TODO: apply dfileset change - nidx++; - if (nidx < taosArrayGetSize(pFs->new->df)) { - pNSet = taosArrayGet(pFs->new->df, nidx); - } else { - pNSet = NULL; - } - - cidx++; - if (cidx < taosArrayGetSize(pFs->curr->df)) { - pCSet = taosArrayGet(pFs->curr->df, cidx); - } else { - pCSet = NULL; - } - } + pIter->fid = TSDB_IVLD_FID; } - return 0; -} -#endif \ No newline at end of file + return pSet; +} \ No newline at end of file diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 799eeb437b6e3b390852309e7baad6bc1a567082..53b36448c444126813a7ea4a858c93828a9547ac 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -58,6 +58,48 @@ void *tsdbDecodeSMFile(void *buf, SMFile *pMFile) { return buf; } +int tsdbApplyMFileChange(SMFile *from, SMFile *to) { + ASSERT(from != NULL || to != NULL); + + if (from != NULL) { + if (to == NULL) { + tsdbRemoveMFile(from); + } else { + if (tfsIsSameFile(TSDB_FILE_F(from), TSDB_FILE_F(to))) { + if (from->info.size > to->info.size) { + tsdbRollbackMFile(to); + } + } else { + tsdbRemoveMFile(from); + } + } + } + + return 0; +} + +static int tsdbRollBackMFile(const SMFile *pMFile) { + SMFile mf = *pMFile; + + if (tsdbOpenMFile(&mf, O_WRONLY) < 0) { + return -1; + } + + if (taosFtruncate(TSDB_FILE_FD(&mf), pMFile->info.size) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + tsdbCloseMFile(&mf); + return -1; + } + + if (tsdbUpdateMFileHeader(&mf) < 0) { + tsdbCloseMFile(&mf); + return -1; + } + + tsdbCloseMFile(&mf); + return 0; +} + int tsdbCreateMFile(SMFile *pMFile) { ASSERT(pMFile->info.size == 0 && pMFile->info.magic == TSDB_FILE_INIT_MAGIC); @@ -220,6 +262,48 @@ static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo) { return buf; } +static int tsdbApplyDFileChange(SDFile *from, SDFile *to) { + ASSERT(from != NULL || to != NULL); + + if (from != NULL) { + if (to == NULL) { + tsdbRemoveDFile(from); + } else { + if (tfsIsSameFile(TSDB_FILE_F(from), TSDB_FILE_F(to))) { + if (from->info.size > to->info.size) { + tsdbRollbackDFile(to); + } + } else { + tsdbRemoveDFile(from); + } + } + } + + return 0; +} + +static int tsdbRollBackDFile(const SDFile *pDFile) { + SDFile df = *pDFile; + + if (tsdbOpenDFile(&df, O_WRONLY) < 0) { + return -1; + } + + if (taosFtruncate(TSDB_FILE_FD(&df), pDFile->info.size) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + tsdbCloseDFile(&df); + return -1; + } + + if (tsdbUpdateDFileHeader(&df) < 0) { + tsdbCloseDFile(&df); + return -1; + } + + tsdbCloseDFile(&df); + return 0; +} + // ============== Operations on SDFileSet void tsdbInitDFileSet(SDFileSet *pSet, SDiskID did, int vid, int fid, uint32_t ver) { pSet->fid = fid; @@ -254,6 +338,16 @@ void *tsdbDecodeDFileSet(void *buf, SDFileSet *pSet) { return buf; } +int tsdbApplyDFileSetChange(const SDFileSet *from, const SDFileSet *to) { + for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + if (tsdbApplyDFileChange(TSDB_DFILE_IN_SET(from, ftype), TSDB_DFILE_IN_SET(to, ftype)) < 0) { + return -1; + } + } + + return 0; +} + int tsdbCreateDFileSet(SDFileSet *pSet) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { if (tsdbCreateDFile(TSDB_DFILE_IN_SET(pSet, ftype)) < 0) {