提交 ef14a164 编写于 作者: H Hongze Cheng

more progress

上级 e7cf98a2
......@@ -20,49 +20,62 @@
extern "C" {
#endif
#define TSDB_FS_VERSION 0
// ================== CURRENT file header info
typedef struct {
uint32_t version; // Current file version
uint32_t len;
} SFSHeader;
// ================== TSDB File System Meta
typedef struct {
int64_t fsversion; // file system version, related to program
int64_t version;
int64_t totalPoints;
int64_t totalStorage;
uint64_t version; // Commit version from 0 to increase
int64_t totalPoints; // total points
int64_t totalStorage; // Uncompressed total storage
} STsdbFSMeta;
// ==================
typedef struct {
int64_t version;
STsdbFSMeta meta;
SMFile mf; // meta file
SArray* df; // data file array
} SFSVer;
STsdbFSMeta meta; // FS meta
SMFile mf; // meta file
SArray* df; // data file array
} SFSStatus;
typedef struct {
pthread_rwlock_t lock;
SFSVer fsv;
SFSStatus* cstatus; // current stage
SHashObj* metaCache; // meta
bool intxn;
SFSStatus* nstatus;
SList* metaDelta;
} STsdbFS;
#define FS_CURRENT_STATUS(pfs) ((pfs)->cstatus)
#define FS_NEW_STATUS(pfs) ((pfs)->nstatus)
#define FS_IN_TXN(pfs) (pfs)->intxn
typedef struct {
int version; // current FS version
uint64_t version; // current FS version
int index;
int fid;
SDFileSet* pSet;
} SFSIter;
#if 0
int tsdbOpenFS(STsdbRepo* pRepo);
void tsdbCloseFS(STsdbRepo* pRepo);
int tsdbFSNewTxn(STsdbRepo* pRepo);
int tsdbFSEndTxn(STsdbRepo* pRepo, bool hasError);
int tsdbUpdateMFile(STsdbRepo* pRepo, SMFile* pMFile);
int tsdbUpdateDFileSet(STsdbRepo* pRepo, SDFileSet* pSet);
void tsdbRemoveExpiredDFileSet(STsdbRepo* pRepo, int mfid);
int tsdbRemoveDFileSet(SDFileSet* pSet);
int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo);
void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo);
SDFileSet tsdbMoveDFileSet(SDFileSet* pOldSet, int to);
int tsdbInitFSIter(STsdbRepo* pRepo, SFSIter* pIter);
SDFileSet* tsdbFSIterNext(SFSIter* pIter);
int tsdbCreateDFileSet(int fid, int level, SDFileSet* pSet);
#endif
static FORCE_INLINE int tsdbRLockFS(STsdbFS *pFs) {
static FORCE_INLINE int tsdbRLockFS(STsdbFS* pFs) {
int code = pthread_rwlock_rdlock(&(pFs->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
......@@ -71,7 +84,7 @@ static FORCE_INLINE int tsdbRLockFS(STsdbFS *pFs) {
return 0;
}
static FORCE_INLINE int tsdbWLockFS(STsdbFS *pFs) {
static FORCE_INLINE int tsdbWLockFS(STsdbFS* pFs) {
int code = pthread_rwlock_wrlock(&(pFs->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
......@@ -80,7 +93,7 @@ static FORCE_INLINE int tsdbWLockFS(STsdbFS *pFs) {
return 0;
}
static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
static FORCE_INLINE int tsdbUnLockFS(STsdbFS* pFs) {
int code = pthread_rwlock_unlock(&(pFs->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
......
......@@ -31,6 +31,7 @@
#include "tchecksum.h"
#include "tskiplist.h"
#include "tdataformat.h"
#include "tcoding.h"
#include "tscompression.h"
#include "tlockfree.h"
#include "tlist.h"
......@@ -85,6 +86,7 @@ struct STsdbRepo {
#define REPO_ID(r) (r)->config.tsdbId
#define REPO_CFG(r) (&((r)->config))
#define REPO_FS(r) ((r)->fs)
#define REPO_FS_VERSION(r) // TODO
#define IS_REPO_LOCKED(r) (r)->repoLocked
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
......
......@@ -15,9 +15,236 @@
#include "tsdbint.h"
#define REPO_FS(r) ((r)->fs)
#define TSDB_MAX_DFILES(keep, days) ((keep) / (days) + 3)
#define TSDB_MAX_FSETS(keep, days) ((keep) / (days) + 3)
// ================== CURRENT file header info
static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) {
int tlen = 0;
tlen += taosEncodeFixedU32(buf, pHeader->version);
tlen += taosEncodeFixedU32(buf, pHeader->len);
return tlen;
}
static void *tsdbEncodeFSHeader(void *buf, SFSHeader *pHeader) {
buf = taosEncodeFixedU32(buf, &(pHeader->version));
buf = taosEncodeFixedU32(buf, &(pHeader->len));
return buf;
}
// ================== STsdbFSMeta
static int tsdbEncodeFSMeta(void **buf, STsdbFSMeta *pMeta) {
int tlen = 0;
tlen += taosEncodeFixedU64(buf, pMeta->version);
tlen += taosEncodeFixedI64(buf, pMeta->totalPoints);
tlen += taosEncodeFixedI64(buf, pMeta->totalStorage);
return tlen;
}
static void *tsdbDecodeFSMeta(void *buf, STsdbFSMeta *pMeta) {
buf = taosDecodeFixedU64(buf, &(pMeta->version));
buf = taosDecodeFixedI64(buf, &(pMeta->totalPoints));
buf = taosDecodeFixedI64(buf, &(pMeta->totalStorage));
return buf;
}
// ================== SFSStatus
static int tsdbEncodeDFileSetArray(void **buf, SArray *pArray) {
int tlen = 0;
uint64_t nset = taosArrayGetSize(pArray);
tlen += taosEncodeFixedU64(buf, nset);
for (size_t i = 0; i < nset; i++) {
SDFileSet *pSet = taosArrayGet(pArray, i);
tlen += tsdbEncodeDFileSet(buf, pSet);
}
return tlen;
}
static int tsdbDecodeDFileSetArray(void *buf, SArray *pArray) {
uint64_t nset;
SDFileSet dset;
taosArrayClear(pArray);
buf = taosDecodeFixedU64(buf, &nset);
for (size_t i = 0; i < nset; i++) {
SDFileSet *pSet = taosArrayGet(pArray, i);
buf = tsdbDecodeDFileSet(buf, &dset);
taosArrayPush(pArray, (void *)(&dset));
}
return buf;
}
static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) {
int tlen = 0;
tlen += tsdbEncodeFSMeta(buf, &(pStatus->meta));
tlen += tsdbEncodeSMFile(buf, &(pStatus->mf));
tlen += tsdbEncodeDFileSetArray(buf, pStatus->df);
return tlen;
}
static void *tsdbDecodeFSStatus(void *buf, SFSStatus *pStatus) {
buf = taosDecodeFixedU32(buf, pStatus->fsVer);
buf = tsdbDecodeFSMeta(buf, &(pStatus->meta));
buf = tsdbDecodeSMFile(buf, &(pStatus->mf));
buf = tsdbDecodeDFileSetArray(buf, pStatus->df);
return buf;
}
static SFSStatus *tsdbNewFSStatus(int maxFSet) {
SFSStatus *pStatus = (SFSStatus *)calloc(1, sizeof(*pStatus));
if (pStatus == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
pStatus->df = taosArrayInit(maxFSet, sizeof(SDFileSet));
if (pStatus->df) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
free(pStatus);
return NULL;
}
return pStatus;
}
static SFSStatus *tsdbFreeFSStatus(SFSStatus *pStatus) {
if (pStatus) {
pStatus->df = taosArrayDestroy(pStatus->df);
free(pStatus);
}
return NULL;
}
static void tsdbResetFSStatus(SFSStatus *pStatus) {
if (pStatus == NULL) {
return;
}
taosArrayClear(pStatus->df);
}
// ================== STsdbFS
STsdbFS *tsdbNewFS(int maxFSet) {
STsdbFS *pFs = (STsdbFS *)calloc(1, sizeof(*pFs));
if (pFs == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
int code = pthread_rwlock_init(&(pFs->lock), NULL);
if (code) {
terrno = TAOS_SYSTEM_ERROR(code);
free(pFs);
return NULL;
}
pFs->cstatus = tsdbNewFSStatus(maxFSet);
if (pFs->cstatus == NULL) {
tsdbFreeFS(pFs);
return NULL;
}
pFs->metaCache = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (pFs->metaCache == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbFreeFS(pFs);
return NULL;
}
pFs->nstatus = tsdbNewFSStatus(maxFSet);
if (pFs->nstatus == NULL) {
tsdbFreeFS(pFs);
return NULL;
}
pFs->metaDelta = tdListNew(sizeof(SKVRecord));
if (pFs->metaDelta == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbFreeFS(pFs);
return NULL;
}
return NULL;
}
void *tsdbFreeFS(STsdbFS *pFs) {
if (pFs) {
pFs->metaDelta = tdListFree(pFs->metaDelta);
pFs->nstatus = tsdbFreeFSStatus(pFs->nstatus);
taosHashCleanup(pFs->metaCache);
pFs->metaCache = NULL;
pFs->cstatus = tsdbFreeFSStatus(pFs->cstatus);
pthread_rwlock_destroy(&(pFs->lock));
}
return NULL;
}
int tsdbOpenFS(STsdbFS *pFs, int keep, int days) {
// TODO
return 0;
}
void tsdbCloseFS(STsdbFS *pFs) {
// TODO
}
int tsdbStartTxn(STsdbFS *pFs) {
tsdbResetFSStatus(pFs->nstatus);
tdListEmpty(pFs->metaDelta);
return 0;
}
int tsdbEndTxn(STsdbFS *pFs, bool hasError) {
SFSStatus *pTStatus;
if (hasError) {
// TODO
} else {
// TODO 1. Create and open a new file current.t
// TODO 2. write new status to new file and fysnc and close
// TODO 3. rename current.t to current
// TODO 4. apply change to file
tsdbWLockFS(pFs);
pTStatus = pFs->cstatus;
pFs->cstatus = pFs->nstatus;
pFs->nstatus = pTStatus;
tsdbUnLockFS(pFs);
// TODO 5: apply meta change to cache
}
return 0;
}
// ================== SFSIter
void tsdbFSIterInit(STsdbFS *pFs, SFSIter *pIter) {
// TODO
}
SDFileSet *tsdbFSIterNext(STsdbFS *pFs) {
// TODO
return NULL;
}
#if 0
int tsdbOpenFS(STsdbRepo *pRepo) {
ASSERT(REPO_FS == NULL);
......@@ -85,7 +312,7 @@ int tsdbUpdateMFile(STsdbRepo *pRepo, SMFile *pMFile) {
}
int tsdbUpdateDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) {
SFSVer *pSnapshot = REPO_FS(pRepo)->new;
SFSStatus *pSnapshot = REPO_FS(pRepo)->new;
SDFileSet * pOldSet;
pOldSet = tsdbSearchDFileSet(pSnapshot, pSet->id, TD_GE);
......@@ -113,7 +340,7 @@ int tsdbUpdateDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) {
}
void tsdbRemoveExpiredDFileSet(STsdbRepo *pRepo, int mfid) {
SFSVer *pSnapshot = REPO_FS(pRepo)->new;
SFSStatus *pSnapshot = REPO_FS(pRepo)->new;
while (taosArrayGetSize(pSnapshot->df) > 0) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pSnapshot->df, 0);
if (pSet->id < mfid) {
......@@ -137,17 +364,12 @@ SDFileSet *tsdbFSIterNext(SFSIter *pIter) {
return NULL;
}
int tsdbCreateDFileSet(int fid, int level, SDFileSet *pSet) {
// TODO
return 0;
}
static int tsdbSaveFSSnapshot(int fd, SFSVer *pSnapshot) {
static int tsdbSaveFSSnapshot(int fd, SFSStatus *pSnapshot) {
// TODO
return 0;
}
static int tsdbLoadFSSnapshot(SFSVer *pSnapshot) {
static int tsdbLoadFSSnapshot(SFSStatus *pSnapshot) {
// TODO
return 0;
}
......@@ -190,7 +412,7 @@ static void *tsdbDecodeFSMeta(void *buf, STsdbFSMeta *pMeta) {
return buf;
}
static int tsdbEncodeFSSnapshot(void **buf, SFSVer *pSnapshot) {
static int tsdbEncodeFSSnapshot(void **buf, SFSStatus *pSnapshot) {
int tlen = 0;
int64_t size = 0;
......@@ -210,7 +432,7 @@ static int tsdbEncodeFSSnapshot(void **buf, SFSVer *pSnapshot) {
return tlen;
}
static void *tsdbDecodeFSSnapshot(void *buf, SFSVer *pSnapshot) {
static void *tsdbDecodeFSSnapshot(void *buf, SFSStatus *pSnapshot) {
int64_t size = 0;
SDFile df;
......@@ -227,10 +449,10 @@ static void *tsdbDecodeFSSnapshot(void *buf, SFSVer *pSnapshot) {
return buf;
}
static SFSVer *tsdbNewSnapshot(int32_t nfiles) {
SFSVer *pSnapshot;
static SFSStatus *tsdbNewSnapshot(int32_t nfiles) {
SFSStatus *pSnapshot;
pSnapshot = (SFSVer *)calloc(1, sizeof(pSnapshot));
pSnapshot = (SFSStatus *)calloc(1, sizeof(pSnapshot));
if (pSnapshot == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
......@@ -246,7 +468,7 @@ static SFSVer *tsdbNewSnapshot(int32_t nfiles) {
return pSnapshot;
}
static SFSVer *tsdbFreeSnapshot(SFSVer *pSnapshot) {
static SFSStatus *tsdbFreeSnapshot(SFSStatus *pSnapshot) {
if (pSnapshot) {
taosArrayDestroy(pSnapshot->df);
free(pSnapshot);
......@@ -293,7 +515,7 @@ static STsdbFS *tsdbFreeFS(STsdbFS *pFs) {
return NULL;
}
static int tsdbCopySnapshot(SFSVer *src, SFSVer *dst) {
static int tsdbCopySnapshot(SFSStatus *src, SFSStatus *dst) {
dst->meta = src->meta;
dst->mf = src->meta;
taosArrayCopy(dst->df, src->df);
......@@ -313,7 +535,7 @@ static int tsdbCompFSetId(const void *key1, const void *key2) {
}
}
static SDFileSet *tsdbSearchDFileSet(SFSVer *pSnapshot, int fid, int flags) {
static SDFileSet *tsdbSearchDFileSet(SFSStatus *pSnapshot, int fid, int flags) {
void *ptr = taosArraySearch(pSnapshot->df, (void *)(&fid), tsdbCompFSetId, flags);
return (ptr == NULL) ? NULL : ((SDFileSet *)ptr);
}
......@@ -404,4 +626,5 @@ static int tsdbMakeFSDFileChange(STsdbRepo *pRepo) {
}
return 0;
}
\ No newline at end of file
}
#endif
\ No newline at end of file
......@@ -364,6 +364,15 @@ static FORCE_INLINE void *taosDecodeString(void *buf, char **value) {
return POINTER_SHIFT(buf, size);
}
// ------ blank
static FORCE_INLINE int taosEncodeBlank(void **buf, int nblank) {
// TODO
}
static FORCE_INLINE void *taosDecodeBlank(void *buf) {
// TODO
}
#ifdef __cplusplus
}
#endif
......
......@@ -47,7 +47,7 @@ typedef struct {
#define listNodeFree(n) free(n);
SList * tdListNew(int eleSize);
void tdListFree(SList *list);
void * tdListFree(SList *list);
void tdListEmpty(SList *list);
void tdListPrependNode(SList *list, SListNode *node);
void tdListAppendNode(SList *list, SListNode *node);
......
......@@ -38,11 +38,13 @@ void tdListEmpty(SList *list) {
list->numOfEles = 0;
}
void tdListFree(SList *list) {
void *tdListFree(SList *list) {
if (list) {
tdListEmpty(list);
free(list);
}
return NULL;
}
void tdListPrependNode(SList *list, SListNode *node) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册