提交 b9c38d6e 编写于 作者: H Hongze Cheng

partial work

上级 62f08912
......@@ -57,6 +57,7 @@ typedef struct {
#define TFILE_NAME(pf) ((pf)->aname)
void tfsInitFile(TFILE *pf, int level, int id, const char *bname);
bool tfsIsSameFile(TFILE *pf1, TFILE *pf2);
void tfsSetLevel(TFILE *pf, int level);
void tfsSetID(TFILE *pf, int id);
int tfsopen(TFILE *pf, int flags);
......
......@@ -175,6 +175,13 @@ void tfsInitFile(TFILE *pf, int level, int id, const char *bname) {
tfsSetFileAname(pf);
}
bool tfsIsSameFile(TFILE *pf1, TFILE *pf2) {
if (pf1->level != pf2->level) return false;
if (pf1->id != pf2->id) return false;
if (strncmp(pf1->rname, pf2->rname, TSDB_FILENAME_LEN) != 0) return false;
return true;
}
void tfsSetLevel(TFILE *pf, int level) {
pf->level = level;
......
......@@ -3,6 +3,7 @@ PROJECT(TDengine)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC)
list(REMOVE_ITEM SRC "src/tsdbFS.c")
ADD_LIBRARY(tsdb ${SRC})
TARGET_LINK_LIBRARIES(tsdb tfs common tutil)
......
......@@ -45,10 +45,6 @@ extern int32_t tsdbDebugFlag;
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
// ================= OTHERS
#define TSDB_MAX_TABLE_SCHEMAS 16
#define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
#define TAOS_IN_RANGE(key, keyMin, keyLast) (((key) >= (keyMin)) && ((key) <= (keyMax)))
......@@ -58,6 +54,8 @@ extern int32_t tsdbDebugFlag;
// Definitions
// ================= tsdbMeta.c
#define TSDB_MAX_TABLE_SCHEMAS 16
typedef struct STable {
STableId tableId;
ETableType type;
......@@ -295,102 +293,211 @@ static FORCE_INLINE TKEY tsdbNextIterTKey(SSkipListIterator* pIter) {
return dataRowTKey(row);
}
// ================= tsdbFile.c
extern const char* tsdbFileSuffix[];
// ================= tsdbFS.c
#define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
// minFid <= midFid <= maxFid
enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX };
// For meta file
typedef struct {
int minFid; // >= minFid && < midFid, at level 2
int midFid; // >= midFid && < maxFid, at level 1
int maxFid; // >= maxFid, at level 0
} SFidGroup;
typedef enum {
TSDB_FILE_TYPE_HEAD = 0,
TSDB_FILE_TYPE_DATA,
TSDB_FILE_TYPE_LAST,
TSDB_FILE_TYPE_STAT,
TSDB_FILE_TYPE_NHEAD,
TSDB_FILE_TYPE_NDATA,
TSDB_FILE_TYPE_NLAST,
TSDB_FILE_TYPE_NSTAT
} TSDB_FILE_TYPE;
#ifndef TDINTERNAL
#define TSDB_FILE_TYPE_MAX (TSDB_FILE_TYPE_LAST+1)
#else
#define TSDB_FILE_TYPE_MAX (TSDB_FILE_TYPE_STAT+1)
#endif
int64_t size;
int64_t tombSize;
int64_t nRecords;
int64_t nDels;
uint32_t magic;
} SMFInfo;
typedef struct {
SMFInfo info;
TFILE f;
int fd;
} SMFile;
// For .head/.data/.last file
typedef struct {
uint32_t magic;
uint32_t len;
uint32_t totalBlocks;
uint32_t totalSubBlocks;
uint32_t offset;
uint64_t size; // total size of the file
uint64_t tombSize; // unused file size
} STsdbFileInfo;
uint64_t size;
uint64_t tombSize;
} SDFInfo;
typedef struct {
TFILE file;
STsdbFileInfo info;
int fd;
} SFile;
SDFInfo info;
TFILE f;
int fd;
} SDFile;
typedef struct {
int fileId;
int state; // 0 for health, 1 for problem
SFile files[TSDB_FILE_TYPE_MAX];
} SFileGroup;
int id;
int state;
SDFile files[TSDB_FILE_MAX];
} SDFileSet;
/* Statistic information of the TSDB file system.
*/
typedef struct {
pthread_rwlock_t fhlock;
int64_t fsversion; // file system version, related to program
int64_t version;
int64_t totalPoints;
int64_t totalStorage;
} STsdbFSMeta;
int maxFGroups;
int nFGroups;
SFileGroup* pFGroup;
} STsdbFileH;
typedef struct {
int64_t version;
STsdbFSMeta meta;
SMFile mf; // meta file
SArray * df; // data file array
} SFSSnapshot;
typedef struct {
int direction;
STsdbFileH* pFileH;
int fileId;
int index;
} SFileGroupIter;
#define TSDB_FILE_NAME(pFile) ((pFile)->file.aname)
#define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
#define TSDB_MAX_FILE(keep, daysPerFile) ((keep) / (daysPerFile) + 3)
#define TSDB_MIN_FILE_ID(fh) (fh)->pFGroup[0].fileId
#define TSDB_MAX_FILE_ID(fh) (fh)->pFGroup[(fh)->nFGroups - 1].fileId
#define TSDB_IS_FILE_OPENED(f) ((f)->fd > 0)
#define TSDB_FGROUP_ITER_FORWARD TSDB_ORDER_ASC
#define TSDB_FGROUP_ITER_BACKWARD TSDB_ORDER_DESC
STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg);
void tsdbFreeFileH(STsdbFileH* pFileH);
int tsdbOpenFileH(STsdbRepo* pRepo);
void tsdbCloseFileH(STsdbRepo* pRepo, bool isRestart);
SFileGroup *tsdbCreateFGroup(STsdbRepo *pRepo, int fid, int level);
void tsdbInitFileGroupIter(STsdbFileH* pFileH, SFileGroupIter* pIter, int direction);
void tsdbSeekFileGroupIter(SFileGroupIter* pIter, int fid);
SFileGroup* tsdbGetFileGroupNext(SFileGroupIter* pIter);
int tsdbOpenFile(SFile* pFile, int oflag);
void tsdbCloseFile(SFile* pFile);
int tsdbCreateFile(SFile* pFile, STsdbRepo* pRepo, int fid, int type);
SFileGroup* tsdbSearchFGroup(STsdbFileH* pFileH, int fid, int flags);
int tsdbGetFidLevel(int fid, SFidGroup fidg);
void tsdbRemoveFilesBeyondRetention(STsdbRepo* pRepo, SFidGroup* pFidGroup);
int tsdbUpdateFileHeader(SFile* pFile);
int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo);
void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo);
void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
int tsdbLoadFileHeader(SFile* pFile, uint32_t* version);
void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size);
void tsdbGetFidGroup(STsdbCfg* pCfg, SFidGroup* pFidGroup);
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
int tsdbApplyRetention(STsdbRepo* pRepo, SFidGroup *pFidGroup);
pthread_rwlock_t lock;
SFSSnapshot *curr;
SFSSnapshot *new;
} STsdbFS;
#define TSDB_FILE_INFO(tf) (&((tf)->info))
#define TSDB_FILE_F(tf) (&((tf)->f)))
#define TSDB_FILE_FD(tf) ((tf)->fd)
int tsdbOpenFS(STsdbRepo *pRepo);
void tsdbCloseFS(STsdbRepo *pRepo);
int tsdbFSNewTxn(STsdbRepo *pRepo);
int tsdbFSEndTxn(STsdbRepo *pRepo, bool hasError);
int tsdbUpdateMFile(STsdbRepo *pRepo, SMFile *pMFile);
int tsdbUpdateDFileSet(STsdbRepo *pRepo, SDFileSet *pSet);
void tsdbRemoveExpiredDFileSet(STsdbRepo *pRepo, int mfid);
int tsdbRemoveDFileSet(SDFileSet *pSet);
static FORCE_INLINE int tsdbRLockFS(STsdbFS *pFs) {
int code = pthread_rwlock_rdlock(&(pFs->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
static FORCE_INLINE int tsdbWLockFS(STsdbFS *pFs) {
int code = pthread_rwlock_wrlock(&(pFs->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
int code = pthread_rwlock_unlock(&(pFs->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
// ================= tsdbFile.c
// extern const char* tsdbFileSuffix[];
// minFid <= midFid <= maxFid
// typedef struct {
// int minFid; // >= minFid && < midFid, at level 2
// int midFid; // >= midFid && < maxFid, at level 1
// int maxFid; // >= maxFid, at level 0
// } SFidGroup;
// typedef enum {
// TSDB_FILE_TYPE_HEAD = 0,
// TSDB_FILE_TYPE_DATA,
// TSDB_FILE_TYPE_LAST,
// TSDB_FILE_TYPE_STAT,
// TSDB_FILE_TYPE_NHEAD,
// TSDB_FILE_TYPE_NDATA,
// TSDB_FILE_TYPE_NLAST,
// TSDB_FILE_TYPE_NSTAT
// } TSDB_FILE_TYPE;
// #ifndef TDINTERNAL
// #define TSDB_FILE_TYPE_MAX (TSDB_FILE_TYPE_LAST+1)
// #else
// #define TSDB_FILE_TYPE_MAX (TSDB_FILE_TYPE_STAT+1)
// #endif
// typedef struct {
// uint32_t magic;
// uint32_t len;
// uint32_t totalBlocks;
// uint32_t totalSubBlocks;
// uint32_t offset;
// uint64_t size; // total size of the file
// uint64_t tombSize; // unused file size
// } STsdbFileInfo;
// typedef struct {
// TFILE file;
// STsdbFileInfo info;
// int fd;
// } SFile;
// typedef struct {
// int fileId;
// int state; // 0 for health, 1 for problem
// SFile files[TSDB_FILE_TYPE_MAX];
// } SFileGroup;
// typedef struct {
// pthread_rwlock_t fhlock;
// int maxFGroups;
// int nFGroups;
// SFileGroup* pFGroup;
// } STsdbFileH;
// typedef struct {
// int direction;
// STsdbFileH* pFileH;
// int fileId;
// int index;
// } SFileGroupIter;
// #define TSDB_FILE_NAME(pFile) ((pFile)->file.aname)
// #define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
// #define TSDB_MAX_FILE(keep, daysPerFile) ((keep) / (daysPerFile) + 3)
// #define TSDB_MIN_FILE_ID(fh) (fh)->pFGroup[0].fileId
// #define TSDB_MAX_FILE_ID(fh) (fh)->pFGroup[(fh)->nFGroups - 1].fileId
// #define TSDB_IS_FILE_OPENED(f) ((f)->fd > 0)
// #define TSDB_FGROUP_ITER_FORWARD TSDB_ORDER_ASC
// #define TSDB_FGROUP_ITER_BACKWARD TSDB_ORDER_DESC
// STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg);
// void tsdbFreeFileH(STsdbFileH* pFileH);
// int tsdbOpenFileH(STsdbRepo* pRepo);
// void tsdbCloseFileH(STsdbRepo* pRepo, bool isRestart);
// SFileGroup *tsdbCreateFGroup(STsdbRepo *pRepo, int fid, int level);
// void tsdbInitFileGroupIter(STsdbFileH* pFileH, SFileGroupIter* pIter, int direction);
// void tsdbSeekFileGroupIter(SFileGroupIter* pIter, int fid);
// SFileGroup* tsdbGetFileGroupNext(SFileGroupIter* pIter);
// int tsdbOpenFile(SFile* pFile, int oflag);
// void tsdbCloseFile(SFile* pFile);
// int tsdbCreateFile(SFile* pFile, STsdbRepo* pRepo, int fid, int type);
// SFileGroup* tsdbSearchFGroup(STsdbFileH* pFileH, int fid, int flags);
// int tsdbGetFidLevel(int fid, SFidGroup fidg);
// void tsdbRemoveFilesBeyondRetention(STsdbRepo* pRepo, SFidGroup* pFidGroup);
// int tsdbUpdateFileHeader(SFile* pFile);
// int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo);
// void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo);
// void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
// int tsdbLoadFileHeader(SFile* pFile, uint32_t* version);
// void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size);
// void tsdbGetFidGroup(STsdbCfg* pCfg, SFidGroup* pFidGroup);
// void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
// int tsdbApplyRetention(STsdbRepo* pRepo, SFidGroup *pFidGroup);
// ================= tsdbMain.c
typedef struct {
......@@ -416,7 +523,7 @@ struct STsdbRepo {
STsdbBufPool* pPool;
SMemTable* mem;
SMemTable* imem;
STsdbFileH* tsdbFileH;
STsdbFS* fs;
sem_t readyToCommit;
pthread_mutex_t mutex;
bool repoLocked;
......
......@@ -147,8 +147,8 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) {
goto _err;
}
// TODO
// tsdbUpdateMFile(pRepo, NULL)
// TODO: update meta file
tsdbUpdateMFile(pRepo, NULL);
return 0;
......@@ -162,13 +162,17 @@ static int tsdbStartCommit(STsdbRepo *pRepo) {
tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64 " meta rows: %d",
REPO_ID(pRepo), pMem->keyFirst, pMem->keyLast, pMem->numOfRows, listNEles(pMem->actList));
// TODO
if (tsdbFSNewTxn(pRepo) < 0) return -1;
pRepo->code = TSDB_CODE_SUCCESS;
return 0;
}
static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
if (tsdbFSEndTxn(pRepo, eno != TSDB_CODE_SUCCESS) < 0) {
eno = terrno;
}
tsdbInfo("vgId:%d commit over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed");
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER, eno);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <unistd.h>
#include "tsdbMain.h"
#define REPO_FS(r) ((r)->fs)
#define TSDB_MAX_DFILES(keep, days) ((keep) / (days) + 3)
int tsdbOpenFS(STsdbRepo *pRepo) {
ASSERT(REPO_FS == NULL);
STsdbCfg *pCfg = TSDB_CFG(pRepo);
// Create fs object
REPO_FS(pRepo) = tsdbNewFS(pCfg->keep, pCfg->daysPerFile);
if (REPO_FS(pRepo) == NULL) {
tsdbError("vgId:%d failed to open TSDB FS since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
// Load TSDB file system from disk
if (tsdbOpenFSImpl(pRepo) < 0) {
tsdbError("vgId:%d failed to open TSDB FS since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbCloseFS(pRepo);
return -1;
}
return 0;
}
void tsdbCloseFS(STsdbRepo *pRepo) {
REPO_FS(pRepo) = tsdbFreeFS(REPO_FS(pRepo));
return 0;
}
// Start a new FS transaction
int tsdbFSNewTxn(STsdbRepo *pRepo) {
STsdbFS *pFs = REPO_FS(pRepo);
if (tsdbCopySnapshot(pFs->curr, pFs->new) < 0) {
return -1;
}
pFs->new->version++;
return 0;
}
// End an existing FS transaction
int tsdbFSEndTxn(STsdbRepo *pRepo, bool hasError) {
STsdbFS *pFs = REPO_FS(pRepo);
if (hasError) { // roll back files
} else { // apply file change
if (tsdbSaveFSSnapshot(-1, pFs->new) < 0) {
// TODO
}
// rename();
// apply all file changes
}
return 0;
}
int tsdbUpdateMFile(STsdbRepo *pRepo, SMFile *pMFile) {
STsdbFS *pFs = REPO_FS(pRepo);
pFs->new->mf = *pMFile;
return 0;
}
int tsdbUpdateDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) {
SFSSnapshot *pSnapshot = REPO_FS(pRepo)->new;
SDFileSet * pOldSet;
pOldSet = tsdbSearchDFileSet(pSnapshot, pSet->id, TD_GE);
if (pOldSet == NULL) {
if (taosArrayPush(pSnapshot->df, pSet) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
} else {
int index = TARRAY_ELEM_IDX(dfArray, ptr);
if (pOldSet->id == pSet->id) {
taosArraySet(pSnapshot->df, index, pSet);
} else if (pOldSet->id > pSet->id) {
if (taosArrayInsert(pSnapshot->df, index, pSet) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
} else {
ASSERT(0);
}
}
return 0;
}
void tsdbRemoveExpiredDFileSet(STsdbRepo *pRepo, int mfid) {
SFSSnapshot *pSnapshot = REPO_FS(pRepo)->new;
while (taosArrayGetSize(pSnapshot->df) > 0) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pSnapshot->df, 0);
if (pSet->id < mfid) {
taosArrayRemove(pSnapshot->df, 0);
}
}
}
static int tsdbSaveFSSnapshot(int fd, SFSSnapshot *pSnapshot) {
// TODO
return 0;
}
static int tsdbLoadFSSnapshot(SFSSnapshot *pSnapshot) {
// TODO
return 0;
}
static int tsdbOpenFSImpl(STsdbRepo *pRepo) {
char manifest[TSDB_FILENAME_LEN] = "\0";
// TODO: use API here
sprintf(manifest, "%s/manifest", pRepo->rootDir);
if (access(manifest, F_OK) == 0) {
// manifest file exist, just load
// TODO
} else {
// manifest file not exists, scan all the files and construct
// TODO
}
return 0;
}
static int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo) {
int tlen = 0;
tlen += taosEncodeVariantI64(buf, pInfo->size);
tlen += taosEncodeVariantI64(buf, pInfo->tombSize);
tlen += taosEncodeVariantI64(buf, pInfo->nRecords);
tlen += taosEncodeVariantI64(buf, pInfo->nDels);
tlen += taosEncodeFixedU32(buf, pInfo->magic);
return tlen;
}
static void *tsdbDecodeMFInfo(void *buf, SMFInfo *pInfo) {
buf = taosDecodeVariantI64(buf, &(pInfo->size));
buf = taosDecodeVariantI64(buf, &(pInfo->tombSize));
buf = taosDecodeVariantI64(buf, &(pInfo->nRecords));
buf = taosDecodeVariantI64(buf, &(pInfo->nDels));
buf = taosDecodeFixedU32(buf, &(pInfo->magic));
return buf;
}
static int tsdbEncodeMFile(void **buf, SMFile *pMFile) {
int tlen = 0;
tlen += tsdbEncodeMFInfo(buf, &(pMFile->info));
tlen += tfsEncodeFile(buf, &(pMFile->f));
return tlen;
}
static void *tsdbDecodeMFile(void *buf, SMFile *pMFile) {
buf = tsdbDecodeMFInfo(buf, &(pMFile->info));
buf = tfsDecodeFile(buf, &(pMFile->f));
return buf;
}
static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
int tlen = 0;
tlen += taosEncodeFixedU32(buf, pInfo->magic);
tlen += taosEncodeFixedU32(buf, pInfo->len);
tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks);
tlen += taosEncodeFixedU32(buf, pInfo->totalSubBlocks);
tlen += taosEncodeFixedU32(buf, pInfo->offset);
tlen += taosEncodeFixedU64(buf, pInfo->size);
tlen += taosEncodeFixedU64(buf, pInfo->tombSize);
return tlen;
}
static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo) {
buf = taosDecodeFixedU32(buf, &(pInfo->magic));
buf = taosDecodeFixedU32(buf, &(pInfo->len));
buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks));
buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks));
buf = taosDecodeFixedU32(buf, &(pInfo->offset));
buf = taosDecodeFixedU64(buf, &(pInfo->size));
buf = taosDecodeFixedU64(buf, &(pInfo->tombSize));
return buf;
}
static int tsdbEncodeDFile(void **buf, SDFile *pDFile) {
int tlen = 0;
tlen += tsdbEncodeDFInfo(buf, &(pDFile->info));
tlen += tfsEncodeFile(buf, &(pDFile->f));
return tlen;
}
static void *tsdbDecodeDFile(void *buf, SDFile *pDFile) {
buf = tsdbDecodeDFInfo(buf, &(pDFile->info));
buf = tfsDecodeFile(buf, &(pDFile->f));
return buf;
}
static int tsdbEncodeFSMeta(void **buf, STsdbFSMeta *pMeta) {
int tlen = 0;
tlen += taosEncodeVariantI64(buf, pMeta->fsversion);
tlen += taosEncodeVariantI64(buf, pMeta->version);
tlen += taosEncodeVariantI64(buf, pMeta->totalPoints);
tlen += taosEncodeVariantI64(buf, pMeta->totalStorage);
return tlen;
}
static void *tsdbDecodeFSMeta(void *buf, STsdbFSMeta *pMeta) {
buf = taosDecodeVariantI64(buf, &(pMeta->fsversion));
buf = taosDecodeVariantI64(buf, &(pMeta->version));
buf = taosDecodeVariantI64(buf, &(pMeta->totalPoints));
buf = taosDecodeVariantI64(buf, &(pMeta->totalStorage));
return buf;
}
static int tsdbEncodeFSSnapshot(void **buf, SFSSnapshot *pSnapshot) {
int tlen = 0;
int64_t size = 0;
// Encode meta file
tlen += tsdbEncodeMFile(buf, &(pSnapshot->mf));
// Encode data files
size = taosArrayGetSize(pSnapshot->df);
tlen += taosEncodeVariantI64(buf, size);
for (size_t index = 0; index < size; index++) {
SDFile *pFile = taosArrayGet(pSnapshot->df, index);
tlen += tsdbEncodeDFInfo(buf, &pFile);
}
return tlen;
}
static void *tsdbDecodeFSSnapshot(void *buf, SFSSnapshot *pSnapshot) {
int64_t size = 0;
SDFile df;
// Decode meta file
buf = tsdbDecodeMFile(buf, &(pSnapshot->mf));
// Decode data files
buf = taosDecodeVariantI64(buf, &size);
for (size_t index = 0; index < size; index++) {
buf = tsdbDecodeDFInfo(buf, &df);
taosArrayPush(pSnapshot->df, (void *)(&df));
}
return buf;
}
static SFSSnapshot *tsdbNewSnapshot(int32_t nfiles) {
SFSSnapshot *pSnapshot;
pSnapshot = (SFSSnapshot *)calloc(1, sizeof(pSnapshot));
if (pSnapshot == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
pSnapshot->df = taosArrayInit(nfiles, sizeof(SDFileSet));
if (pSnapshot->df == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
free(pSnapshot);
return NULL;
}
return pSnapshot;
}
static SFSSnapshot *tsdbFreeSnapshot(SFSSnapshot *pSnapshot) {
if (pSnapshot) {
taosArrayDestroy(pSnapshot->df);
free(pSnapshot);
}
return NULL;
}
static STsdbFS *tsdbNewFS(int32_t keep, int32_t days) {
STsdbFS *pFs;
int code;
int32_t nfiles;
pFs = (STsdbFS *)calloc(1, sizeof(*pFs));
if (pFs == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
code = pthread_rwlock_init(&(pFs->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
free(pFs);
return NULL;
}
nfiles = TSDB_MAX_DFILES(keep, days);
if (((pFs->curr = tsdbNewSnapshot(nfiles)) == NULL) || ((pFs->new = tsdbNewSnapshot(nfiles)) == NULL)) {
tsdbFreeFS(pFs);
return NULL;
}
return pFs;
}
static STsdbFS *tsdbFreeFS(STsdbFS *pFs) {
if (pFs) {
pFs->new = tsdbFreeSnapshot(pFs->new);
pFs->curr = tsdbFreeSnapshot(pFs->curr);
pthread_rwlock_destroy(&(pFs->lock));
free(pFs);
}
return NULL;
}
static int tsdbCopySnapshot(SFSSnapshot *src, SFSSnapshot *dst) {
dst->meta = src->meta;
dst->mf = src->meta;
taosArrayCopy(dst->df, src->df);
return 0;
}
static int tsdbCompFSetId(const void *key1, const void *key2) {
int id = *(int *)key1;
SDFileSet *pSet = (SDFileSet *)key2;
if (id < pSet->id) {
return -1;
} else if (id == pSet->id) {
return 0;
} else {
return 1;
}
}
static SDFileSet *tsdbSearchDFileSet(SFSSnapshot *pSnapshot, int fid, int flags) {
void *ptr = taosArraySearch(pSnapshot->df, (void *)(&fid), tsdbCompFSetId, flags);
return (ptr == NULL) ? NULL : ((SDFileSet *)ptr);
}
static int tsdbMakeFSChange(STsdbRepo *pRepo) {
tsdbMakeFSMFileChange(pRepo);
tsdbMakeFSDFileChange(pRepo);
return 0;
}
static int tsdbMakeFSMFileChange(STsdbRepo *pRepo) {
STsdbFS *pFs = REPO_FS(pRepo);
SMFile * pDstMFile = &(pFs->curr->mf);
SMFile * pSrcMFile = &(pFs->new->mf);
if (tfsIsSameFile(&(pDstMFile->f), &(pSrcMFile->f))) { // the same file
if (pDstMFile->info != pSrcMFile->info) {
if (pDstMFile->info.size > pDstMFile->info.size) {
// Commit succeed, do nothing
} else if (pDstMFile->info.size < pDstMFile->info.size) {
// Commit failed, back
// TODO
} else {
ASSERT(0);
}
}
} else {
tfsremove(&(pSrcMFile->f));
}
return 0;
}
static int tsdbMakeFSDFileChange(STsdbRepo *pRepo) {
STsdbFS * pFs = REPO_FS(pRepo);
int cidx = 0;
int nidx = 0;
SDFileSet *pCSet = NULL;
SDFileSet *pNSet = NULL;
if (cidx < taosArrayGetSize(pFs->curr->df)) {
pCSet = taosArrayGet(pFs->curr->df, cidx);
} else {
pCSet = NULL;
}
if (nidx < taosArrayGetSize(pFs->new->df)) {
pNSet = taosArrayGet(pFs->new->df, nidx);
} else {
pNSet = NULL;
}
while (true) {
if (pCSet == NULL && pNSet == NULL) break;
if (pCSet == NULL || (pNSet != NULL && pCSet->id > pNSet->id)) {
tsdbRemoveDFileSet(pNSet);
nidx++;
if (nidx < taosArrayGetSize(pFs->new->df)) {
pNSet = taosArrayGet(pFs->new->df, nidx);
} else {
pNSet = NULL;
}
} else if (pNSet == NULL || (pCSet != NULL && pCSet->id < pNSet->id)) {
cidx++;
if (cidx < taosArrayGetSize(pFs->curr->df)) {
pCSet = taosArrayGet(pFs->curr->df, cidx);
} else {
pCSet = NULL;
}
} else {
// TODO: apply dfileset change
nidx++;
if (nidx < taosArrayGetSize(pFs->new->df)) {
pNSet = taosArrayGet(pFs->new->df, nidx);
} else {
pNSet = NULL;
}
cidx++;
if (cidx < taosArrayGetSize(pFs->curr->df)) {
pCSet = taosArrayGet(pFs->curr->df, cidx);
} else {
pCSet = NULL;
}
}
}
return 0;
}
\ No newline at end of file
......@@ -109,7 +109,7 @@ TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) {
goto _err;
}
if (tsdbOpenFileH(pRepo) < 0) {
if (tsdbOpenFS(pRepo) < 0) {
tsdbError("vgId:%d failed to open file handle since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册