提交 d16ee878 编写于 作者: H Hongze Cheng

finish fs

上级 607961b4
......@@ -25,7 +25,7 @@ extern "C" {
// ================== CURRENT file header info
typedef struct {
uint32_t version; // Current file version
uint32_t len;
uint32_t len; // Encode content length (including checksum)
} SFSHeader;
// ================== TSDB File System Meta
......@@ -38,6 +38,7 @@ typedef struct {
// ==================
typedef struct {
STsdbFSMeta meta; // FS meta
SMFile* pmf; // meta file pointer
SMFile mf; // meta file
SArray* df; // data file array
} SFSStatus;
......@@ -45,12 +46,10 @@ typedef struct {
typedef struct {
pthread_rwlock_t lock;
SFSStatus* cstatus; // current stage
SHashObj* metaCache; // meta
SFSStatus* cstatus; // current status
SHashObj* metaCache; // meta cache
bool intxn;
SFSStatus* nstatus;
SList* metaDelta;
SFSStatus* nstatus; // new status
} STsdbFS;
#define FS_CURRENT_STATUS(pfs) ((pfs)->cstatus)
......@@ -58,12 +57,17 @@ typedef struct {
#define FS_IN_TXN(pfs) (pfs)->intxn
typedef struct {
int direction;
uint64_t version; // current FS version
int index;
int fid;
STsdbFS* pfs;
int index; // used to position next fset when version the same
int fid; // used to seek when version is changed
SDFileSet* pSet;
} SFSIter;
#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC
#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC
#if 0
int tsdbOpenFS(STsdbRepo* pRepo);
void tsdbCloseFS(STsdbRepo* pRepo);
......
......@@ -23,12 +23,14 @@ extern "C" {
#define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
#define TSDB_IVLD_FID INT_MIN
#define TSDB_FILE_INFO(tf) (&((tf)->info))
#define TSDB_FILE_F(tf) (&((tf)->f))
#define TSDB_FILE_FD(tf) ((tf)->fd)
#define TSDB_FILE_FULL_NAME(tf) TFILE_NAME(TSDB_FILE_F(tf))
#define TSDB_FILE_OPENED(tf) (TSDB_FILE_FD(tf) >= 0)
#define TSDB_FILE_CLOSED(tf) (!TSDB_FILE_OPENED(tf))
#define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_FD(f) = -1)
#define TSDB_FILE_LEVEL(tf) TFILE_LEVEL(TSDB_FILE_F(tf))
#define TSDB_FILE_ID(tf) TFILE_ID(TSDB_FILE_F(tf))
......@@ -61,6 +63,7 @@ void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, int ver);
void tsdbInitMFileEx(SMFile* pMFile, SMFile* pOMFile);
int tsdbEncodeSMFile(void** buf, SMFile* pMFile);
void* tsdbDecodeSMFile(void* buf, SMFile* pMFile);
int tsdbApplyMFileChange(const SMFile* from, const SMFile* to);
static FORCE_INLINE int tsdbOpenMFile(SMFile* pMFile, int flags) {
ASSERT(!TSDB_FILE_OPENED(pMFile));
......@@ -288,6 +291,7 @@ void tsdbInitDFileSet(SDFileSet* pSet, SDiskID did, int vid, int fid, int ver);
void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet);
int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet);
void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet);
int tsdbApplyDFileSetChange(const SDFileSet* from, const SDFileSet* to);
static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
......
......@@ -14,7 +14,6 @@
*/
#include "tsdbint.h"
#define TSDB_IVLD_FID INT_MIN
#define TSDB_MAX_SUBBLOCKS 8
#define TSDB_KEY_FID(key, days, precision) ((key) / tsMsPerDay[(precision)] / (days))
......
......@@ -15,6 +15,8 @@
#include "tsdbint.h"
#define TSDB_FS_CURRENT_FNAME "current"
#define TSDB_FS_TEMP_FNAME "current.t"
#define TSDB_MAX_FSETS(keep, days) ((keep) / (days) + 3)
// ================== CURRENT file header info
......@@ -27,9 +29,9 @@ static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) {
return tlen;
}
static void *tsdbEncodeFSHeader(void *buf, SFSHeader *pHeader) {
buf = taosEncodeFixedU32(buf, &(pHeader->version));
buf = taosEncodeFixedU32(buf, &(pHeader->len));
static void *tsdbDecodeFSHeader(void *buf, SFSHeader *pHeader) {
buf = taosDecodeFixedU32(buf, &(pHeader->version));
buf = taosDecodeFixedU32(buf, &(pHeader->len));
return buf;
}
......@@ -76,8 +78,6 @@ static int tsdbDecodeDFileSetArray(void *buf, SArray *pArray) {
buf = taosDecodeFixedU64(buf, &nset);
for (size_t i = 0; i < nset; i++) {
SDFileSet *pSet = taosArrayGet(pArray, i);
buf = tsdbDecodeDFileSet(buf, &dset);
taosArrayPush(pArray, (void *)(&dset));
}
......@@ -85,19 +85,22 @@ static int tsdbDecodeDFileSetArray(void *buf, SArray *pArray) {
}
static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) {
ASSERT(pStatus->pmf);
int tlen = 0;
tlen += tsdbEncodeFSMeta(buf, &(pStatus->meta));
tlen += tsdbEncodeSMFile(buf, &(pStatus->mf));
tlen += tsdbEncodeSMFile(buf, &(pStatus->pmf));
tlen += tsdbEncodeDFileSetArray(buf, pStatus->df);
return tlen;
}
static void *tsdbDecodeFSStatus(void *buf, SFSStatus *pStatus) {
buf = taosDecodeFixedU32(buf, pStatus->fsVer);
buf = tsdbDecodeFSMeta(buf, &(pStatus->meta));
buf = tsdbDecodeSMFile(buf, &(pStatus->mf));
tsdbResetFSStatus(pStatus);
pStatus->pmf = &(pStatus->mf);
buf = tsdbDecodeSMFile(buf, pStatus->pmf);
buf = tsdbDecodeDFileSetArray(buf, pStatus->df);
return buf;
......@@ -111,7 +114,7 @@ static SFSStatus *tsdbNewFSStatus(int maxFSet) {
}
pStatus->df = taosArrayInit(maxFSet, sizeof(SDFileSet));
if (pStatus->df) {
if (pStatus->df == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
free(pStatus);
return NULL;
......@@ -134,62 +137,79 @@ static void tsdbResetFSStatus(SFSStatus *pStatus) {
return;
}
pStatus->pmf = NULL;
taosArrayClear(pStatus->df);
}
static void tsdbSetStatusMFile(SFSStatus *pStatus, const SMFile *pMFile) {
ASSERT(pStatus->pmf == NULL && TSDB_FILE_CLOSED(pMFile));
pStatus->pmf = &(pStatus->mf);
*(pStatus->pmf) = *pMFile;
}
static int tsdbAddDFileSetToStatus(SFSStatus *pStatus, const SDFileSet *pSet) {
ASSERT(TSDB_FILE_CLOSED(&(pSet->files[0])));
ASSERT(TSDB_FILE_CLOSED(&(pSet->files[1])));
ASSERT(TSDB_FILE_CLOSED(&(pSet->files[2])));
if (taosArrayPush(pStatus->df, (void *)pStatus) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
return 0;
}
// ================== STsdbFS
STsdbFS *tsdbNewFS(int maxFSet) {
STsdbFS *pFs = (STsdbFS *)calloc(1, sizeof(*pFs));
if (pFs == NULL) {
STsdbFS *tsdbNewFS(int keep, int days) {
int maxFSet = TSDB_MAX_FSETS(keep, days);
STsdbFS *pfs;
pfs = (STsdbFS *)calloc(1, sizeof(*pfs));
if (pfs == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
int code = pthread_rwlock_init(&(pFs->lock), NULL);
int code = pthread_rwlock_init(&(pfs->lock), NULL);
if (code) {
terrno = TAOS_SYSTEM_ERROR(code);
free(pFs);
free(pfs);
return NULL;
}
pFs->cstatus = tsdbNewFSStatus(maxFSet);
if (pFs->cstatus == NULL) {
tsdbFreeFS(pFs);
pfs->cstatus = tsdbNewFSStatus(maxFSet);
if (pfs->cstatus == NULL) {
tsdbFreeFS(pfs);
return NULL;
}
pFs->metaCache = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (pFs->metaCache == NULL) {
pfs->metaCache = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (pfs->metaCache == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbFreeFS(pFs);
tsdbFreeFS(pfs);
return NULL;
}
pFs->nstatus = tsdbNewFSStatus(maxFSet);
if (pFs->nstatus == NULL) {
tsdbFreeFS(pFs);
pfs->nstatus = tsdbNewFSStatus(maxFSet);
if (pfs->nstatus == NULL) {
tsdbFreeFS(pfs);
return NULL;
}
pFs->metaDelta = tdListNew(sizeof(SKVRecord));
if (pFs->metaDelta == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbFreeFS(pFs);
return NULL;
}
return NULL;
return pfs;
}
void *tsdbFreeFS(STsdbFS *pFs) {
if (pFs) {
pFs->metaDelta = tdListFree(pFs->metaDelta);
pFs->nstatus = tsdbFreeFSStatus(pFs->nstatus);
taosHashCleanup(pFs->metaCache);
pFs->metaCache = NULL;
pFs->cstatus = tsdbFreeFSStatus(pFs->cstatus);
pthread_rwlock_destroy(&(pFs->lock));
void *tsdbFreeFS(STsdbFS *pfs) {
if (pfs) {
pfs->nstatus = tsdbFreeFSStatus(pfs->nstatus);
taosHashCleanup(pfs->metaCache);
pfs->metaCache = NULL;
pfs->cstatus = tsdbFreeFSStatus(pfs->cstatus);
pthread_rwlock_destroy(&(pfs->lock));
}
return NULL;
}
......@@ -203,428 +223,271 @@ void tsdbCloseFS(STsdbFS *pFs) {
// TODO
}
int tsdbStartTxn(STsdbFS *pFs) {
tsdbResetFSStatus(pFs->nstatus);
tdListEmpty(pFs->metaDelta);
return 0;
}
// Start a new transaction to modify the file system
int tsdbStartTxn(STsdbFS *pfs) {
ASSERT(pfs->intxn == false);
int tsdbEndTxn(STsdbFS *pFs, bool hasError) {
SFSStatus *pTStatus;
pfs->intxn = true;
tsdbResetFSStatus(pfs->nstatus);
if (hasError) {
// TODO
} else {
// TODO 1. Create and open a new file current.t
return 0;
}
// TODO 2. write new status to new file and fysnc and close
int tsdbEndTxn(STsdbFS *pfs) {
ASSERT(FS_IN_TXN(pfs));
SFSStatus *pStatus;
// TODO 3. rename current.t to current
// Write current file system snapshot
if (tsdbUpdateFS(pfs) < 0) {
tsdbEndTxnWithError(pfs);
return -1;
}
// TODO 4. apply change to file
tsdbWLockFS(pFs);
pTStatus = pFs->cstatus;
pFs->cstatus = pFs->nstatus;
pFs->nstatus = pTStatus;
tsdbUnLockFS(pFs);
// Make new
tsdbWLockFS(pfs);
pStatus = pfs->cstatus;
pfs->cstatus = pfs->nstatus;
pfs->nstatus = pStatus;
tsdbUnLockFS(pfs);
// TODO 5: apply meta change to cache
}
// Apply actual change to each file and SDFileSet
tsdbApplyFSChangeOnDisk(pfs);
pfs->intxn = false;
return 0;
}
// ================== SFSIter
void tsdbFSIterInit(STsdbFS *pFs, SFSIter *pIter) {
int tsdbEndTxnWithError(STsdbFS *pfs) {
// TODO
pfs->intxn = false;
return 0;
}
SDFileSet *tsdbFSIterNext(STsdbFS *pFs) {
// TODO
return NULL;
}
void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile) { tsdbSetStatusMFile(pfs->nstatus, pMFile); }
#if 0
int tsdbOpenFS(STsdbRepo *pRepo) {
ASSERT(REPO_FS == NULL);
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFileSetToStatus(pfs->nstatus, pSet); }
STsdbCfg *pCfg = TSDB_CFG(pRepo);
static int tsdbUpdateFS(STsdbFS *pfs) {
ASSERT(FS_IN_TXN(pfs));
SFSHeader fsheader;
void * pBuf = NULL;
void * ptr;
char hbuf[TSDB_FILE_HEAD_SIZE] = "\0";
// Create fs object
REPO_FS(pRepo) = tsdbNewFS(pCfg->keep, pCfg->daysPerFile);
if (REPO_FS(pRepo) == NULL) {
tsdbError("vgId:%d failed to open TSDB FS since %s", REPO_ID(pRepo), tstrerror(terrno));
int fd = open(TSDB_FS_TEMP_FNAME, O_WRONLY | O_CREAT | O_TRUNC, 0755);
if (fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
// Load TSDB file system from disk
if (tsdbOpenFSImpl(pRepo) < 0) {
tsdbError("vgId:%d failed to open TSDB FS since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbCloseFS(pRepo);
return -1;
fsheader.version = TSDB_FS_VERSION;
if (pfs->nstatus->pmf == NULL) {
ASSERT(taosArrayGetSize(pfs->nstatus->df) == 0);
fsheader.len = 0;
} else {
fsheader.len = tsdbEncodeFSHeader(NULL, pfs->nstatus) + sizeof(TSCKSUM);
}
return 0;
}
// Encode header part and write
ptr = hbuf;
tsdbEncodeFSHeader(&ptr, &fsheader);
tsdbEncodeFSMeta(&ptr, &(pfs->nstatus->meta));
void tsdbCloseFS(STsdbRepo *pRepo) {
REPO_FS(pRepo) = tsdbFreeFS(REPO_FS(pRepo));
return 0;
}
// Start a new FS transaction
int tsdbFSNewTxn(STsdbRepo *pRepo) {
STsdbFS *pFs = REPO_FS(pRepo);
taosCalcChecksumAppend(0, (uint8_t *)hbuf, TSDB_FILE_HEAD_SIZE);
if (tsdbCopySnapshot(pFs->curr, pFs->new) < 0) {
if (taosWrite(fd, hbuf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
terrno = TAOS_SYSTEM_ERROR(errno);
close(fd);
remove(TSDB_FS_TEMP_FNAME);
return -1;
}
pFs->new->version++;
return 0;
}
// End an existing FS transaction
int tsdbFSEndTxn(STsdbRepo *pRepo, bool hasError) {
STsdbFS *pFs = REPO_FS(pRepo);
if (hasError) { // roll back files
} else { // apply file change
if (tsdbSaveFSSnapshot(-1, pFs->new) < 0) {
// TODO
// Encode file status and write to file
if (fsheader.len > 0) {
if (tsdbMakeRoom(&(pBuf), fsheader.len) < 0) {
close(fd);
remove(TSDB_FS_TEMP_FNAME);
return -1;
}
// rename();
// apply all file changes
}
return 0;
}
ptr = pBuf;
tsdbEncodeFSStatus(&ptr, pfs->nstatus);
taosCalcChecksumAppend(0, (uint8_t *)pBuf, fsheader.len)
int tsdbUpdateMFile(STsdbRepo *pRepo, SMFile *pMFile) {
STsdbFS *pFs = REPO_FS(pRepo);
pFs->new->mf = *pMFile;
return 0;
}
int tsdbUpdateDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) {
SFSStatus *pSnapshot = REPO_FS(pRepo)->new;
SDFileSet * pOldSet;
pOldSet = tsdbSearchDFileSet(pSnapshot, pSet->id, TD_GE);
if (pOldSet == NULL) {
if (taosArrayPush(pSnapshot->df, pSet) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
if (taosWrite(fd, pBuf, fsheader.len) < fsheader.len) {
terrno = TAOS_SYSTEM_ERROR(errno);
close(fd);
remove(TSDB_FS_TEMP_FNAME);
taosTZfree(pBuf);
return -1;
}
} else {
int index = TARRAY_ELEM_IDX(pSnapshot->df, pOldSet);
if (pOldSet->id == pSet->id) {
taosArraySet(pSnapshot->df, index, pSet);
} else if (pOldSet->id > pSet->id) {
if (taosArrayInsert(pSnapshot->df, index, pSet) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
} else {
ASSERT(0);
}
}
return 0;
}
void tsdbRemoveExpiredDFileSet(STsdbRepo *pRepo, int mfid) {
SFSStatus *pSnapshot = REPO_FS(pRepo)->new;
while (taosArrayGetSize(pSnapshot->df) > 0) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pSnapshot->df, 0);
if (pSet->id < mfid) {
taosArrayRemove(pSnapshot->df, 0);
}
// fsync, close and rename
if (fsync(fd) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
close(fd);
remove(TSDB_FS_TEMP_FNAME);
taosTZfree(pBuf);
return -1;
}
}
SDFileSet tsdbMoveDFileSet(SDFileSet *pOldSet, int to) {
// TODO
}
(void)close(fd);
(void)rename(TSDB_FS_TEMP_FNAME, TSDB_FS_CURRENT_FNAME);
taosTZfree(pBuf);
int tsdbInitFSIter(STsdbRepo *pRepo, SFSIter *pIter) {
// TODO
return 0;
}
SDFileSet *tsdbFSIterNext(SFSIter *pIter) {
// TODO
return NULL;
}
static void tsdbApplyFSChangeOnDisk(SFSStatus *pFrom, SFSStatus *pTo) {
int ifrom = 0;
int ito = 0;
size_t sizeFrom, sizeTo;
SDFileSet *pSetFrom;
SDFileSet *pSetTo;
static int tsdbSaveFSSnapshot(int fd, SFSStatus *pSnapshot) {
// TODO
return 0;
}
sizeFrom = taosArrayGetSize(pFrom->df);
sizeTo = taosArrayGetSize(pTo->df);
static int tsdbLoadFSSnapshot(SFSStatus *pSnapshot) {
// TODO
return 0;
}
// Apply meta file change
tsdbApplyMFileChange(pFrom->pmf, pTo->pmf);
static int tsdbOpenFSImpl(STsdbRepo *pRepo) {
char manifest[TSDB_FILENAME_LEN] = "\0";
// TODO: use API here
sprintf(manifest, "%s/manifest", pRepo->rootDir);
if (access(manifest, F_OK) == 0) {
// manifest file exist, just load
// TODO
// Apply SDFileSet change
if (ifrom >= sizeFrom) {
pSetFrom = NULL;
} else {
// manifest file not exists, scan all the files and construct
// TODO
pSetFrom = taosArrayGet(pFrom->df, ifrom);
}
return 0;
}
if (ito >= sizeTo) {
pSetTo = NULL;
} else {
pSetTo = taosArrayGet(pTo->df, ito);
}
static int tsdbEncodeFSMeta(void **buf, STsdbFSMeta *pMeta) {
int tlen = 0;
while (true) {
if ((pSetTo == NULL) && (pSetFrom == NULL)) break;
tlen += taosEncodeVariantI64(buf, pMeta->fsversion);
tlen += taosEncodeVariantI64(buf, pMeta->version);
tlen += taosEncodeVariantI64(buf, pMeta->totalPoints);
tlen += taosEncodeVariantI64(buf, pMeta->totalStorage);
if (pSetTo == NULL || (pSetFrom && pSetFrom->fid < pSetTo->fid)) {
tsdbApplyDFileSetChange(pSetFrom, NULL);
return tlen;
}
ifrom++;
if (ifrom >= sizeFrom) {
pSetFrom = NULL;
} else {
pSetFrom = taosArrayGet(pFrom->df, ifrom);
}
} else if (pSetFrom == NULL || pSetFrom->fid > pSetTo->fid) {
// Do nothing
if (pSetFrom) {
ito++;
if (ito >= sizeTo) {
pSetTo = NULL;
} else {
pSetTo = taosArrayGet(pTo->df, ito);
}
}
} else {
tsdbApplyDFileSetChange(pSetFrom, pSetTo);
static void *tsdbDecodeFSMeta(void *buf, STsdbFSMeta *pMeta) {
buf = taosDecodeVariantI64(buf, &(pMeta->fsversion));
buf = taosDecodeVariantI64(buf, &(pMeta->version));
buf = taosDecodeVariantI64(buf, &(pMeta->totalPoints));
buf = taosDecodeVariantI64(buf, &(pMeta->totalStorage));
ifrom++;
if (ifrom >= sizeFrom) {
pSetFrom = NULL;
} else {
pSetFrom = taosArrayGet(pFrom->df, ifrom);
}
return buf;
ito++;
if (ito >= sizeTo) {
pSetTo = NULL;
} else {
pSetTo = taosArrayGet(pTo->df, ito);
}
}
}
}
static int tsdbEncodeFSSnapshot(void **buf, SFSStatus *pSnapshot) {
int tlen = 0;
int64_t size = 0;
// Encode meta file
tlen += tsdbEncodeMFile(buf, &(pSnapshot->mf));
// Encode data files
size = taosArrayGetSize(pSnapshot->df);
tlen += taosEncodeVariantI64(buf, size);
for (size_t index = 0; index < size; index++) {
SDFile *pFile = taosArrayGet(pSnapshot->df, index);
tlen += tsdbEncodeDFInfo(buf, &pFile);
}
// ================== SFSIter
// ASSUMPTIONS: the FS Should be read locked when calling these functions
void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction) {
pIter->pfs = pfs;
pIter->direction = direction;
return tlen;
}
size_t size = taosArrayGetSize(pfs->cstatus->df);
static void *tsdbDecodeFSSnapshot(void *buf, SFSStatus *pSnapshot) {
int64_t size = 0;
SDFile df;
pIter->version = pfs->cstatus->meta.version;
// Decode meta file
buf = tsdbDecodeMFile(buf, &(pSnapshot->mf));
if (size == 0) {
pIter->index = -1;
pIter->fid = TSDB_IVLD_FID;
} else {
if (direction == TSDB_FS_ITER_FORWARD) {
pIter->index = 0;
} else {
pIter->index = size - 1;
}
// Decode data files
buf = taosDecodeVariantI64(buf, &size);
for (size_t index = 0; index < size; index++) {
buf = tsdbDecodeDFInfo(buf, &df);
taosArrayPush(pSnapshot->df, (void *)(&df));
pIter->fid = ((SDFileSet *)taosArrayGet(pfs->cstatus->df, pIter->index))->fid;
}
return buf;
}
static SFSStatus *tsdbNewSnapshot(int32_t nfiles) {
SFSStatus *pSnapshot;
pSnapshot = (SFSStatus *)calloc(1, sizeof(pSnapshot));
if (pSnapshot == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
void tsdbFSIterSeek(SFSIter *pIter, int fid) {
STsdbFS *pfs = pIter->pfs;
size_t size = taosArrayGetSize(pfs->cstatus->df);
pSnapshot->df = taosArrayInit(nfiles, sizeof(SDFileSet));
if (pSnapshot->df == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
free(pSnapshot);
return NULL;
int flags;
if (pIter->direction == TSDB_FS_ITER_FORWARD) {
flags = TD_GE;
} else {
flags = TD_LE;
}
return pSnapshot;
}
static SFSStatus *tsdbFreeSnapshot(SFSStatus *pSnapshot) {
if (pSnapshot) {
taosArrayDestroy(pSnapshot->df);
free(pSnapshot);
void *ptr = taosbsearch(&fid, pfs->cstatus->df->pData, size, sizeof(SDFileSet), , flags);
if (ptr == NULL) {
pIter->index = -1;
pIter->fid = TSDB_IVLD_FID;
} else {
pIter->index = TARRAY_ELEM_IDX(pfs->cstatus->df, ptr);
pIter->fid = ((SDFileSet *)ptr)->fid;
}
return NULL;
}
static STsdbFS *tsdbNewFS(int32_t keep, int32_t days) {
STsdbFS *pFs;
int code;
int32_t nfiles;
pFs = (STsdbFS *)calloc(1, sizeof(*pFs));
if (pFs == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
code = pthread_rwlock_init(&(pFs->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
free(pFs);
return NULL;
}
SDFileSet *tsdbFSIterNext(SFSIter *pIter) {
STsdbFS * pfs = pIter->pfs;
SDFileSet *pSet;
nfiles = TSDB_MAX_DFILES(keep, days);
if (((pFs->curr = tsdbNewSnapshot(nfiles)) == NULL) || ((pFs->new = tsdbNewSnapshot(nfiles)) == NULL)) {
tsdbFreeFS(pFs);
if (pIter->index < 0) {
ASSERT(pIter->fid == TSDB_IVLD_FID);
return NULL;
}
return pFs;
}
ASSERT(pIter->fid != TSDB_IVLD_FID);
static STsdbFS *tsdbFreeFS(STsdbFS *pFs) {
if (pFs) {
pFs->new = tsdbFreeSnapshot(pFs->new);
pFs->curr = tsdbFreeSnapshot(pFs->curr);
pthread_rwlock_destroy(&(pFs->lock));
free(pFs);
if (pIter->version != pfs->cstatus->meta.version) {
tsdbFSIterSeek(pIter, pIter->fid);
}
return NULL;
}
static int tsdbCopySnapshot(SFSStatus *src, SFSStatus *dst) {
dst->meta = src->meta;
dst->mf = src->meta;
taosArrayCopy(dst->df, src->df);
return 0;
}
static int tsdbCompFSetId(const void *key1, const void *key2) {
int id = *(int *)key1;
SDFileSet *pSet = (SDFileSet *)key2;
if (id < pSet->id) {
return -1;
} else if (id == pSet->id) {
return 0;
} else {
return 1;
if (pIter->index < 0) {
return NULL;
}
}
static SDFileSet *tsdbSearchDFileSet(SFSStatus *pSnapshot, int fid, int flags) {
void *ptr = taosArraySearch(pSnapshot->df, (void *)(&fid), tsdbCompFSetId, flags);
return (ptr == NULL) ? NULL : ((SDFileSet *)ptr);
}
static int tsdbMakeFSChange(STsdbRepo *pRepo) {
tsdbMakeFSMFileChange(pRepo);
tsdbMakeFSDFileChange(pRepo);
return 0;
}
pSet = (SDFileSet *)taosArrayGet(pfs->cstatus->df, pIter->index);
ASSERT(pSet->fid == pIter->fid);
static int tsdbMakeFSMFileChange(STsdbRepo *pRepo) {
STsdbFS *pFs = REPO_FS(pRepo);
SMFile * pDstMFile = &(pFs->curr->mf);
SMFile * pSrcMFile = &(pFs->new->mf);
if (tfsIsSameFile(&(pDstMFile->f), &(pSrcMFile->f))) { // the same file
if (pDstMFile->info != pSrcMFile->info) {
if (pDstMFile->info.size > pDstMFile->info.size) {
// Commit succeed, do nothing
} else if (pDstMFile->info.size < pDstMFile->info.size) {
// Commit failed, back
// TODO
} else {
ASSERT(0);
}
if (pIter->direction == TSDB_FS_ITER_FORWARD) {
pIter->index++;
if (pIter->index >= taosArrayGetSize(pfs->cstatus->df)) {
pIter->index = -1;
}
} else {
tfsremove(&(pSrcMFile->f));
pIter->index--;
}
return 0;
}
static int tsdbMakeFSDFileChange(STsdbRepo *pRepo) {
STsdbFS * pFs = REPO_FS(pRepo);
int cidx = 0;
int nidx = 0;
SDFileSet *pCSet = NULL;
SDFileSet *pNSet = NULL;
if (cidx < taosArrayGetSize(pFs->curr->df)) {
pCSet = taosArrayGet(pFs->curr->df, cidx);
} else {
pCSet = NULL;
}
if (nidx < taosArrayGetSize(pFs->new->df)) {
pNSet = taosArrayGet(pFs->new->df, nidx);
if (pIter->index > 0) {
pIter->fid = ((SDFileSet *)taosArrayGet(pfs->cstatus->df, pIter->index))->fid;
} else {
pNSet = NULL;
}
while (true) {
if (pCSet == NULL && pNSet == NULL) break;
if (pCSet == NULL || (pNSet != NULL && pCSet->id > pNSet->id)) {
tsdbRemoveDFileSet(pNSet);
nidx++;
if (nidx < taosArrayGetSize(pFs->new->df)) {
pNSet = taosArrayGet(pFs->new->df, nidx);
} else {
pNSet = NULL;
}
} else if (pNSet == NULL || (pCSet != NULL && pCSet->id < pNSet->id)) {
cidx++;
if (cidx < taosArrayGetSize(pFs->curr->df)) {
pCSet = taosArrayGet(pFs->curr->df, cidx);
} else {
pCSet = NULL;
}
} else {
// TODO: apply dfileset change
nidx++;
if (nidx < taosArrayGetSize(pFs->new->df)) {
pNSet = taosArrayGet(pFs->new->df, nidx);
} else {
pNSet = NULL;
}
cidx++;
if (cidx < taosArrayGetSize(pFs->curr->df)) {
pCSet = taosArrayGet(pFs->curr->df, cidx);
} else {
pCSet = NULL;
}
}
pIter->fid = TSDB_IVLD_FID;
}
return 0;
}
#endif
\ No newline at end of file
return pSet;
}
\ No newline at end of file
......@@ -58,6 +58,48 @@ void *tsdbDecodeSMFile(void *buf, SMFile *pMFile) {
return buf;
}
int tsdbApplyMFileChange(SMFile *from, SMFile *to) {
ASSERT(from != NULL || to != NULL);
if (from != NULL) {
if (to == NULL) {
tsdbRemoveMFile(from);
} else {
if (tfsIsSameFile(TSDB_FILE_F(from), TSDB_FILE_F(to))) {
if (from->info.size > to->info.size) {
tsdbRollbackMFile(to);
}
} else {
tsdbRemoveMFile(from);
}
}
}
return 0;
}
static int tsdbRollBackMFile(const SMFile *pMFile) {
SMFile mf = *pMFile;
if (tsdbOpenMFile(&mf, O_WRONLY) < 0) {
return -1;
}
if (taosFtruncate(TSDB_FILE_FD(&mf), pMFile->info.size) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbCloseMFile(&mf);
return -1;
}
if (tsdbUpdateMFileHeader(&mf) < 0) {
tsdbCloseMFile(&mf);
return -1;
}
tsdbCloseMFile(&mf);
return 0;
}
int tsdbCreateMFile(SMFile *pMFile) {
ASSERT(pMFile->info.size == 0 && pMFile->info.magic == TSDB_FILE_INIT_MAGIC);
......@@ -220,6 +262,48 @@ static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo) {
return buf;
}
static int tsdbApplyDFileChange(SDFile *from, SDFile *to) {
ASSERT(from != NULL || to != NULL);
if (from != NULL) {
if (to == NULL) {
tsdbRemoveDFile(from);
} else {
if (tfsIsSameFile(TSDB_FILE_F(from), TSDB_FILE_F(to))) {
if (from->info.size > to->info.size) {
tsdbRollbackDFile(to);
}
} else {
tsdbRemoveDFile(from);
}
}
}
return 0;
}
static int tsdbRollBackDFile(const SDFile *pDFile) {
SDFile df = *pDFile;
if (tsdbOpenDFile(&df, O_WRONLY) < 0) {
return -1;
}
if (taosFtruncate(TSDB_FILE_FD(&df), pDFile->info.size) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbCloseDFile(&df);
return -1;
}
if (tsdbUpdateDFileHeader(&df) < 0) {
tsdbCloseDFile(&df);
return -1;
}
tsdbCloseDFile(&df);
return 0;
}
// ============== Operations on SDFileSet
void tsdbInitDFileSet(SDFileSet *pSet, SDiskID did, int vid, int fid, uint32_t ver) {
pSet->fid = fid;
......@@ -254,6 +338,16 @@ void *tsdbDecodeDFileSet(void *buf, SDFileSet *pSet) {
return buf;
}
int tsdbApplyDFileSetChange(const SDFileSet *from, const SDFileSet *to) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (tsdbApplyDFileChange(TSDB_DFILE_IN_SET(from, ftype), TSDB_DFILE_IN_SET(to, ftype)) < 0) {
return -1;
}
}
return 0;
}
int tsdbCreateDFileSet(SDFileSet *pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (tsdbCreateDFile(TSDB_DFILE_IN_SET(pSet, ftype)) < 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册