diff --git a/src/tsdb/inc/tsdbFS.h b/src/tsdb/inc/tsdbFS.h index 75567ef8c0d6da7bd062e9083db22a293ad2ef8e..281ee7dc682858a1f84e3affc3fb6457b090dc78 100644 --- a/src/tsdb/inc/tsdbFS.h +++ b/src/tsdb/inc/tsdbFS.h @@ -20,49 +20,62 @@ extern "C" { #endif +#define TSDB_FS_VERSION 0 + +// ================== CURRENT file header info +typedef struct { + uint32_t version; // Current file version + uint32_t len; +} SFSHeader; + +// ================== TSDB File System Meta typedef struct { - int64_t fsversion; // file system version, related to program - int64_t version; - int64_t totalPoints; - int64_t totalStorage; + uint64_t version; // Commit version from 0 to increase + int64_t totalPoints; // total points + int64_t totalStorage; // Uncompressed total storage } STsdbFSMeta; +// ================== typedef struct { - int64_t version; - STsdbFSMeta meta; - SMFile mf; // meta file - SArray* df; // data file array -} SFSVer; + STsdbFSMeta meta; // FS meta + SMFile mf; // meta file + SArray* df; // data file array +} SFSStatus; typedef struct { pthread_rwlock_t lock; - SFSVer fsv; + SFSStatus* cstatus; // current stage + SHashObj* metaCache; // meta + + bool intxn; + SFSStatus* nstatus; + SList* metaDelta; } STsdbFS; +#define FS_CURRENT_STATUS(pfs) ((pfs)->cstatus) +#define FS_NEW_STATUS(pfs) ((pfs)->nstatus) +#define FS_IN_TXN(pfs) (pfs)->intxn + typedef struct { - int version; // current FS version + uint64_t version; // current FS version int index; int fid; SDFileSet* pSet; } SFSIter; +#if 0 int tsdbOpenFS(STsdbRepo* pRepo); void tsdbCloseFS(STsdbRepo* pRepo); int tsdbFSNewTxn(STsdbRepo* pRepo); int tsdbFSEndTxn(STsdbRepo* pRepo, bool hasError); int tsdbUpdateMFile(STsdbRepo* pRepo, SMFile* pMFile); int tsdbUpdateDFileSet(STsdbRepo* pRepo, SDFileSet* pSet); -void tsdbRemoveExpiredDFileSet(STsdbRepo* pRepo, int mfid); -int tsdbRemoveDFileSet(SDFileSet* pSet); -int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo); -void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo); -SDFileSet tsdbMoveDFileSet(SDFileSet* pOldSet, int to); int tsdbInitFSIter(STsdbRepo* pRepo, SFSIter* pIter); SDFileSet* tsdbFSIterNext(SFSIter* pIter); -int tsdbCreateDFileSet(int fid, int level, SDFileSet* pSet); +#endif -static FORCE_INLINE int tsdbRLockFS(STsdbFS *pFs) { +static FORCE_INLINE int tsdbRLockFS(STsdbFS* pFs) { int code = pthread_rwlock_rdlock(&(pFs->lock)); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); @@ -71,7 +84,7 @@ static FORCE_INLINE int tsdbRLockFS(STsdbFS *pFs) { return 0; } -static FORCE_INLINE int tsdbWLockFS(STsdbFS *pFs) { +static FORCE_INLINE int tsdbWLockFS(STsdbFS* pFs) { int code = pthread_rwlock_wrlock(&(pFs->lock)); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); @@ -80,7 +93,7 @@ static FORCE_INLINE int tsdbWLockFS(STsdbFS *pFs) { return 0; } -static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) { +static FORCE_INLINE int tsdbUnLockFS(STsdbFS* pFs) { int code = pthread_rwlock_unlock(&(pFs->lock)); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 48cb09cf80892d3755451897e50b0279d38a17a1..80360e89e1bcc52c6d74e3326430e1918ff3eee5 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -31,6 +31,7 @@ #include "tchecksum.h" #include "tskiplist.h" #include "tdataformat.h" +#include "tcoding.h" #include "tscompression.h" #include "tlockfree.h" #include "tlist.h" @@ -85,6 +86,7 @@ struct STsdbRepo { #define REPO_ID(r) (r)->config.tsdbId #define REPO_CFG(r) (&((r)->config)) +#define REPO_FS(r) ((r)->fs) #define REPO_FS_VERSION(r) // TODO #define IS_REPO_LOCKED(r) (r)->repoLocked #define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg) diff --git a/src/tsdb/src/tsdbFS.c b/src/tsdb/src/tsdbFS.c index 27ae452e545c10e8f47d4e8b03251222b6576223..7bdbe35c38517075eba2009f9837002e94cdd7b9 100644 --- a/src/tsdb/src/tsdbFS.c +++ b/src/tsdb/src/tsdbFS.c @@ -15,9 +15,236 @@ #include "tsdbint.h" -#define REPO_FS(r) ((r)->fs) -#define TSDB_MAX_DFILES(keep, days) ((keep) / (days) + 3) +#define TSDB_MAX_FSETS(keep, days) ((keep) / (days) + 3) +// ================== CURRENT file header info +static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) { + int tlen = 0; + + tlen += taosEncodeFixedU32(buf, pHeader->version); + tlen += taosEncodeFixedU32(buf, pHeader->len); + + return tlen; +} + +static void *tsdbEncodeFSHeader(void *buf, SFSHeader *pHeader) { + buf = taosEncodeFixedU32(buf, &(pHeader->version)); + buf = taosEncodeFixedU32(buf, &(pHeader->len)); + + return buf; +} + +// ================== STsdbFSMeta +static int tsdbEncodeFSMeta(void **buf, STsdbFSMeta *pMeta) { + int tlen = 0; + + tlen += taosEncodeFixedU64(buf, pMeta->version); + tlen += taosEncodeFixedI64(buf, pMeta->totalPoints); + tlen += taosEncodeFixedI64(buf, pMeta->totalStorage); + + return tlen; +} + +static void *tsdbDecodeFSMeta(void *buf, STsdbFSMeta *pMeta) { + buf = taosDecodeFixedU64(buf, &(pMeta->version)); + buf = taosDecodeFixedI64(buf, &(pMeta->totalPoints)); + buf = taosDecodeFixedI64(buf, &(pMeta->totalStorage)); + + return buf; +} + +// ================== SFSStatus +static int tsdbEncodeDFileSetArray(void **buf, SArray *pArray) { + int tlen = 0; + uint64_t nset = taosArrayGetSize(pArray); + + tlen += taosEncodeFixedU64(buf, nset); + for (size_t i = 0; i < nset; i++) { + SDFileSet *pSet = taosArrayGet(pArray, i); + + tlen += tsdbEncodeDFileSet(buf, pSet); + } + + return tlen; +} + +static int tsdbDecodeDFileSetArray(void *buf, SArray *pArray) { + uint64_t nset; + SDFileSet dset; + + taosArrayClear(pArray); + + buf = taosDecodeFixedU64(buf, &nset); + for (size_t i = 0; i < nset; i++) { + SDFileSet *pSet = taosArrayGet(pArray, i); + + buf = tsdbDecodeDFileSet(buf, &dset); + taosArrayPush(pArray, (void *)(&dset)); + } + return buf; +} + +static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) { + int tlen = 0; + + tlen += tsdbEncodeFSMeta(buf, &(pStatus->meta)); + tlen += tsdbEncodeSMFile(buf, &(pStatus->mf)); + tlen += tsdbEncodeDFileSetArray(buf, pStatus->df); + + return tlen; +} + +static void *tsdbDecodeFSStatus(void *buf, SFSStatus *pStatus) { + buf = taosDecodeFixedU32(buf, pStatus->fsVer); + buf = tsdbDecodeFSMeta(buf, &(pStatus->meta)); + buf = tsdbDecodeSMFile(buf, &(pStatus->mf)); + buf = tsdbDecodeDFileSetArray(buf, pStatus->df); + + return buf; +} + +static SFSStatus *tsdbNewFSStatus(int maxFSet) { + SFSStatus *pStatus = (SFSStatus *)calloc(1, sizeof(*pStatus)); + if (pStatus == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return NULL; + } + + pStatus->df = taosArrayInit(maxFSet, sizeof(SDFileSet)); + if (pStatus->df) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + free(pStatus); + return NULL; + } + + return pStatus; +} + +static SFSStatus *tsdbFreeFSStatus(SFSStatus *pStatus) { + if (pStatus) { + pStatus->df = taosArrayDestroy(pStatus->df); + free(pStatus); + } + + return NULL; +} + +static void tsdbResetFSStatus(SFSStatus *pStatus) { + if (pStatus == NULL) { + return; + } + + taosArrayClear(pStatus->df); +} + +// ================== STsdbFS +STsdbFS *tsdbNewFS(int maxFSet) { + STsdbFS *pFs = (STsdbFS *)calloc(1, sizeof(*pFs)); + if (pFs == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return NULL; + } + + int code = pthread_rwlock_init(&(pFs->lock), NULL); + if (code) { + terrno = TAOS_SYSTEM_ERROR(code); + free(pFs); + return NULL; + } + + pFs->cstatus = tsdbNewFSStatus(maxFSet); + if (pFs->cstatus == NULL) { + tsdbFreeFS(pFs); + return NULL; + } + + pFs->metaCache = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + if (pFs->metaCache == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbFreeFS(pFs); + return NULL; + } + + pFs->nstatus = tsdbNewFSStatus(maxFSet); + if (pFs->nstatus == NULL) { + tsdbFreeFS(pFs); + return NULL; + } + + pFs->metaDelta = tdListNew(sizeof(SKVRecord)); + if (pFs->metaDelta == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbFreeFS(pFs); + return NULL; + } + + return NULL; +} + +void *tsdbFreeFS(STsdbFS *pFs) { + if (pFs) { + pFs->metaDelta = tdListFree(pFs->metaDelta); + pFs->nstatus = tsdbFreeFSStatus(pFs->nstatus); + taosHashCleanup(pFs->metaCache); + pFs->metaCache = NULL; + pFs->cstatus = tsdbFreeFSStatus(pFs->cstatus); + pthread_rwlock_destroy(&(pFs->lock)); + } + return NULL; +} + +int tsdbOpenFS(STsdbFS *pFs, int keep, int days) { + // TODO + + return 0; +} + +void tsdbCloseFS(STsdbFS *pFs) { + // TODO +} + +int tsdbStartTxn(STsdbFS *pFs) { + tsdbResetFSStatus(pFs->nstatus); + tdListEmpty(pFs->metaDelta); + return 0; +} + +int tsdbEndTxn(STsdbFS *pFs, bool hasError) { + SFSStatus *pTStatus; + + if (hasError) { + // TODO + } else { + // TODO 1. Create and open a new file current.t + + // TODO 2. write new status to new file and fysnc and close + + // TODO 3. rename current.t to current + + // TODO 4. apply change to file + tsdbWLockFS(pFs); + pTStatus = pFs->cstatus; + pFs->cstatus = pFs->nstatus; + pFs->nstatus = pTStatus; + tsdbUnLockFS(pFs); + + // TODO 5: apply meta change to cache + } + + return 0; +} + +// ================== SFSIter +void tsdbFSIterInit(STsdbFS *pFs, SFSIter *pIter) { + // TODO +} + +SDFileSet *tsdbFSIterNext(STsdbFS *pFs) { + // TODO + return NULL; +} + +#if 0 int tsdbOpenFS(STsdbRepo *pRepo) { ASSERT(REPO_FS == NULL); @@ -85,7 +312,7 @@ int tsdbUpdateMFile(STsdbRepo *pRepo, SMFile *pMFile) { } int tsdbUpdateDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) { - SFSVer *pSnapshot = REPO_FS(pRepo)->new; + SFSStatus *pSnapshot = REPO_FS(pRepo)->new; SDFileSet * pOldSet; pOldSet = tsdbSearchDFileSet(pSnapshot, pSet->id, TD_GE); @@ -113,7 +340,7 @@ int tsdbUpdateDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) { } void tsdbRemoveExpiredDFileSet(STsdbRepo *pRepo, int mfid) { - SFSVer *pSnapshot = REPO_FS(pRepo)->new; + SFSStatus *pSnapshot = REPO_FS(pRepo)->new; while (taosArrayGetSize(pSnapshot->df) > 0) { SDFileSet *pSet = (SDFileSet *)taosArrayGet(pSnapshot->df, 0); if (pSet->id < mfid) { @@ -137,17 +364,12 @@ SDFileSet *tsdbFSIterNext(SFSIter *pIter) { return NULL; } -int tsdbCreateDFileSet(int fid, int level, SDFileSet *pSet) { - // TODO - return 0; -} - -static int tsdbSaveFSSnapshot(int fd, SFSVer *pSnapshot) { +static int tsdbSaveFSSnapshot(int fd, SFSStatus *pSnapshot) { // TODO return 0; } -static int tsdbLoadFSSnapshot(SFSVer *pSnapshot) { +static int tsdbLoadFSSnapshot(SFSStatus *pSnapshot) { // TODO return 0; } @@ -190,7 +412,7 @@ static void *tsdbDecodeFSMeta(void *buf, STsdbFSMeta *pMeta) { return buf; } -static int tsdbEncodeFSSnapshot(void **buf, SFSVer *pSnapshot) { +static int tsdbEncodeFSSnapshot(void **buf, SFSStatus *pSnapshot) { int tlen = 0; int64_t size = 0; @@ -210,7 +432,7 @@ static int tsdbEncodeFSSnapshot(void **buf, SFSVer *pSnapshot) { return tlen; } -static void *tsdbDecodeFSSnapshot(void *buf, SFSVer *pSnapshot) { +static void *tsdbDecodeFSSnapshot(void *buf, SFSStatus *pSnapshot) { int64_t size = 0; SDFile df; @@ -227,10 +449,10 @@ static void *tsdbDecodeFSSnapshot(void *buf, SFSVer *pSnapshot) { return buf; } -static SFSVer *tsdbNewSnapshot(int32_t nfiles) { - SFSVer *pSnapshot; +static SFSStatus *tsdbNewSnapshot(int32_t nfiles) { + SFSStatus *pSnapshot; - pSnapshot = (SFSVer *)calloc(1, sizeof(pSnapshot)); + pSnapshot = (SFSStatus *)calloc(1, sizeof(pSnapshot)); if (pSnapshot == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return NULL; @@ -246,7 +468,7 @@ static SFSVer *tsdbNewSnapshot(int32_t nfiles) { return pSnapshot; } -static SFSVer *tsdbFreeSnapshot(SFSVer *pSnapshot) { +static SFSStatus *tsdbFreeSnapshot(SFSStatus *pSnapshot) { if (pSnapshot) { taosArrayDestroy(pSnapshot->df); free(pSnapshot); @@ -293,7 +515,7 @@ static STsdbFS *tsdbFreeFS(STsdbFS *pFs) { return NULL; } -static int tsdbCopySnapshot(SFSVer *src, SFSVer *dst) { +static int tsdbCopySnapshot(SFSStatus *src, SFSStatus *dst) { dst->meta = src->meta; dst->mf = src->meta; taosArrayCopy(dst->df, src->df); @@ -313,7 +535,7 @@ static int tsdbCompFSetId(const void *key1, const void *key2) { } } -static SDFileSet *tsdbSearchDFileSet(SFSVer *pSnapshot, int fid, int flags) { +static SDFileSet *tsdbSearchDFileSet(SFSStatus *pSnapshot, int fid, int flags) { void *ptr = taosArraySearch(pSnapshot->df, (void *)(&fid), tsdbCompFSetId, flags); return (ptr == NULL) ? NULL : ((SDFileSet *)ptr); } @@ -404,4 +626,5 @@ static int tsdbMakeFSDFileChange(STsdbRepo *pRepo) { } return 0; -} \ No newline at end of file +} +#endif \ No newline at end of file diff --git a/src/util/inc/tcoding.h b/src/util/inc/tcoding.h index ff34c1560703a820b296911065c42e224fefcc03..636f9c5e50de40e3b311833515c5628de742c6a2 100644 --- a/src/util/inc/tcoding.h +++ b/src/util/inc/tcoding.h @@ -364,6 +364,15 @@ static FORCE_INLINE void *taosDecodeString(void *buf, char **value) { return POINTER_SHIFT(buf, size); } +// ------ blank +static FORCE_INLINE int taosEncodeBlank(void **buf, int nblank) { + // TODO +} + +static FORCE_INLINE void *taosDecodeBlank(void *buf) { + // TODO +} + #ifdef __cplusplus } #endif diff --git a/src/util/inc/tlist.h b/src/util/inc/tlist.h index e8380294da47de1c9eb65059263ac820f3d816be..6c96ec0b138cfc229ab4a8bde3f3b374bce49dfd 100644 --- a/src/util/inc/tlist.h +++ b/src/util/inc/tlist.h @@ -47,7 +47,7 @@ typedef struct { #define listNodeFree(n) free(n); SList * tdListNew(int eleSize); -void tdListFree(SList *list); +void * tdListFree(SList *list); void tdListEmpty(SList *list); void tdListPrependNode(SList *list, SListNode *node); void tdListAppendNode(SList *list, SListNode *node); diff --git a/src/util/src/tlist.c b/src/util/src/tlist.c index 8c2ad83de117aaf528565a711a2aa3732984e0a9..2f52551e2ac7c79ef54cd4546be7bf844ff04980 100644 --- a/src/util/src/tlist.c +++ b/src/util/src/tlist.c @@ -38,11 +38,13 @@ void tdListEmpty(SList *list) { list->numOfEles = 0; } -void tdListFree(SList *list) { +void *tdListFree(SList *list) { if (list) { tdListEmpty(list); free(list); } + + return NULL; } void tdListPrependNode(SList *list, SListNode *node) {