提交 278a88a2 编写于 作者: H Hongze Cheng

refact

上级 2764ea1f
......@@ -47,10 +47,6 @@ typedef struct {
SDFileSet* pSet;
} SFSIter;
#define TSDB_FILE_INFO(tf) (&((tf)->info))
#define TSDB_FILE_F(tf) (&((tf)->f)))
#define TSDB_FILE_FD(tf) ((tf)->fd)
int tsdbOpenFS(STsdbRepo* pRepo);
void tsdbCloseFS(STsdbRepo* pRepo);
int tsdbFSNewTxn(STsdbRepo* pRepo);
......
......@@ -24,6 +24,13 @@ extern "C" {
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
#define TSDB_FILE_INFO(tf) (&((tf)->info))
#define TSDB_FILE_F(tf) (&((tf)->f))
#define TSDB_FILE_FD(tf) ((tf)->fd)
#define TSDB_FILE_FULL_NAME(f) TFILE_NAME(TSDB_FILE_F(f))
#define TSDB_FILE_OPENED(f) (TSDB_FILE_FD(f) >= 0)
#define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_FD(f) = -1)
typedef enum {
TSDB_FILE_HEAD = 0,
TSDB_FILE_DATA,
......@@ -33,7 +40,16 @@ typedef enum {
TSDB_FILE_MANIFEST
} TSDB_FILE_T;
// For meta file
#define tsdbOpenFile(T, f, flags) tsdbOpen##T(f, flags)
#define tsdbCloseFile(T, f) tsdbClose##T(f)
#define tsdbSeekFile(T, f, offset, whence) tsdbSeek##T(f, offset, whence)
#define tsdbWriteFile(T, f, buf, nbytes) tsdbWrite##T(f, buf, nbytes)
#define tsdbUpdateFileMagic(T, f, pCksum) tsdbUpdate##T##Magic(f, pCksum)
#define tsdbTellFile(T, f) tsdbTell##T(f)
#define tsdbEncodeFile(T, buf, f) tsdbEncode##T(buf, f)
#define tsdbDecodeFile(T, buf, f) tsdbDecode##T(buf, f)
// =============== SMFile
typedef struct {
int64_t size;
int64_t tombSize;
......@@ -48,16 +64,60 @@ typedef struct {
int fd;
} SMFile;
void tsdbInitMFile(SMFile* pMFile, int vid, int ver, SMFInfo* pInfo);
int tsdbOpenMFile(SMFile* pMFile, int flags);
void tsdbCloseMFile(SMFile* pMFile);
int64_t tsdbSeekMFile(SMFile* pMFile, int64_t offset, int whence);
int64_t tsdbWriteMFile(SMFile* pMFile, void* buf, int64_t nbyte);
int64_t tsdbTellMFile(SMFile *pMFile);
int tsdbEncodeMFile(void** buf, SMFile* pMFile);
void* tsdbDecodeMFile(void* buf, SMFile* pMFile);
void tsdbInitMFile(SMFile* pMFile, int vid, int ver, SMFInfo* pInfo);
int tsdbEncodeSMFile(void** buf, SMFile* pMFile);
void* tsdbDecodeSMFile(void* buf, SMFile* pMFile);
static FORCE_INLINE int tsdbOpenSMFile(SMFile* pMFile, int flags) {
ASSERT(!TSDB_FILE_OPENED(pMFile));
pMFile->fd = open(TSDB_FILE_FULL_NAME(pMFile), flags);
if (pMFile->fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
static FORCE_INLINE void tsdbCloseSMFile(SMFile* pMFile) {
if (TSDB_FILE_OPENED(pMFile)) {
close(pMFile->fd);
TSDB_FILE_SET_CLOSED(pMFile);
}
}
static FORCE_INLINE int64_t tsdbSeekSMFile(SMFile* pMFile, int64_t offset, int whence) {
ASSERT(TSDB_FILE_OPENED(pMFile));
int64_t loffset = taosLSeek(TSDB_FILE_FD(pMFile), offset, whence);
if (loffset < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return loffset;
}
// For .head/.data/.last file
static FORCE_INLINE int64_t tsdbWriteSMFile(SMFile* pMFile, void* buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pMFile));
int64_t nwrite = taosWrite(pMFile->fd, buf, nbyte);
if (nwrite < nbyte) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return nwrite;
}
static FORCE_INLINE void tsdbUpdateSMFileMagic(SMFile* pMFile, void* pCksum) {
pMFile->info.magic = taosCalcChecksum(pMFile->info.magic, (uint8_t*)(pCksum), sizeof(TSCKSUM));
}
static FORCE_INLINE int64_t tsdbTellSMFile(SMFile* pMFile) { return tsdbSeekSMFile(pMFile, 0, SEEK_CUR); }
// =============== SDFile
typedef struct {
uint32_t magic;
uint32_t len;
......@@ -74,36 +134,103 @@ typedef struct {
int fd;
} SDFile;
void tsdbInitDFile(SDFile* pDFile, int vid, int fid, int ver, int level, int id, const SDFInfo* pInfo,
TSDB_FILE_T ftype);
void tsdbInitDFileWithOld(SDFile* pDFile, SDFile* pOldDFile);
int tsdbOpenDFile(SDFile* pDFile, int flags);
void tsdbCloseDFile(SDFile* pDFile);
int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int whence);
int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nbyte);
int64_t tsdbAppendDFile(SDFile* pDFile, void* buf, int64_t nbyte, int64_t* offset);
int64_t tsdbTellDFile(SDFile* pDFile);
int tsdbEncodeDFile(void** buf, SDFile* pDFile);
void* tsdbDecodeDFile(void* buf, SDFile* pDFile);
void tsdbUpdateDFileMagic(SDFile* pDFile, void* pCksm);
void tsdbInitDFile(SDFile* pDFile, int vid, int fid, int ver, int level, int id, const SDFInfo* pInfo,
TSDB_FILE_T ftype);
void tsdbInitDFileWithOld(SDFile* pDFile, SDFile* pOldDFile);
int tsdbEncodeSDFile(void** buf, SDFile* pDFile);
void* tsdbDecodeSDFile(void* buf, SDFile* pDFile);
static FORCE_INLINE int tsdbOpenSDFile(SDFile *pDFile, int flags) {
ASSERT(!TSDB_FILE_OPENED(pDFile));
pDFile->fd = open(pDFile->f.aname, flags);
if (pDFile->fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
static FORCE_INLINE void tsdbCloseSDFile(SDFile* pDFile) {
if (TSDB_FILE_OPENED(pDFile)) {
close(pDFile->fd);
TSDB_FILE_SET_CLOSED(pDFile);
}
}
static FORCE_INLINE int64_t tsdbSeekSDFile(SDFile *pDFile, int64_t offset, int whence) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t loffset = taosLSeek(pDFile->fd, offset, whence);
if (loffset < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return loffset;
}
static FORCE_INLINE int64_t tsdbWriteSDFile(SDFile* pDFile, void* buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t nwrite = taosWrite(pDFile->fd, buf, nbyte);
if (nwrite < nbyte) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return nwrite;
}
static FORCE_INLINE int64_t tsdbAppendSDFile(SDFile* pDFile, void* buf, int64_t nbyte, int64_t* offset) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t nwrite;
*offset = tsdbSeekSDFile(pDFile, 0, SEEK_SET);
if (*offset < 0) return -1;
nwrite = tsdbWriteSDFile(pDFile, buf, nbyte);
if (nwrite < 0) return nwrite;
return nwrite;
}
static FORCE_INLINE int64_t tsdbReadSDFile(SDFile* pDFile, void* buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t nread = taosRead(pDFile->fd, buf, nbyte);
if (nread < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return nread;
}
static FORCE_INLINE int64_t tsdbTellSDFile(SDFile *pDFile) { return tsdbSeekSDFile(pDFile, 0, SEEK_CUR); }
static FORCE_INLINE void tsdbUpdateSDFileMagic(SDFile* pDFile, void* pCksm) {
pDFile->info.magic = taosCalcChecksum(pDFile->info.magic, (uint8_t*)(pCksm), sizeof(TSCKSUM));
}
// =============== SDFileSet
typedef struct {
int fid;
int state;
SDFile files[TSDB_FILE_MAX];
} SDFileSet;
#define TSDB_FILE_FULL_NAME(f) TFILE_NAME(&((f)->f))
#define TSDB_FSET_FID(s) ((s)->fid)
#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
void tsdbInitDFileSet(SDFileSet* pSet, int vid, int fid, int ver, int level, int id);
void tsdbInitDFileSetWithOld(SDFileSet *pSet, SDFileSet *pOldSet);
void tsdbInitDFileSetWithOld(SDFileSet* pSet, SDFileSet* pOldSet);
int tsdbOpenDFileSet(SDFileSet* pSet, int flags);
void tsdbCloseDFileSet(SDFileSet* pSet);
int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
int tsdbCopyDFileSet(SDFileSet* pFromSet, SDFileSet* pToSet);
#ifdef __cplusplus
}
#endif
......
......@@ -16,15 +16,22 @@
#ifndef _TD_TSDB_INT_H_
#define _TD_TSDB_INT_H_
// TODO: remove the include
#include <errno.h>
#include <inttypes.h>
#include "os.h"
#include "tlog.h"
#include "taosdef.h"
#include "taoserror.h"
#include "tchecksum.h"
#include "tskiplist.h"
#include "tdataformat.h"
#include "tlockfree.h"
#include "tlist.h"
#include "hash.h"
#include "tarray.h"
#include "tfs.h"
#include "tsdb.h"
......
......@@ -13,72 +13,37 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbMain.h"
#include "tsdbint.h"
#define TSDB_FILE_OPENED(f) ((f)->fd >= 0)
#define TSDB_FILE_SET_CLOSED(f) ((f)->fd = -1)
static const char *TSDB_FNAME_SUFFIX[] = {
".head", // TSDB_FILE_HEAD
".data", // TSDB_FILE_DATA
".last", // TSDB_FILE_LAST
"", // TSDB_FILE_MAX
"meta", // TSDB_FILE_META
"manifest" // TSDB_FILE_MANIFEST
};
// ============== Operations on SMFile
#define tsdbOpenFile(T, f) tsdbOpenT
// ============== SMFile
void tsdbInitMFile(SMFile *pMFile, int vid, int ver, SMFInfo *pInfo) {
char fname[TSDB_FILENAME_LEN];
TSDB_FILE_SET_CLOSED(pMFile);
if (pInfo == NULL) {
memset(&(pMFile->info), 0, sizeof(pMFile->info));
pMFile->info.magic = TSDB_FILE_INIT_MAGIC;
} else {
pMFile->info = *pInfo;
}
tfsInitFile(&(pMFile->f), TFS_PRIMARY_LEVEL, TFS_PRIMARY_ID, NULL /*TODO*/);
return pMFile;
}
int tsdbOpenMFile(SMFile *pMFile, int flags) {
ASSERT(!TSDB_FILE_OPENED(pMFile));
pMFile->fd = open(pMFile->f.aname, flags);
if (pMFile->fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
void tsdbCloseMFile(SMFile *pMFile) {
if (TSDB_FILE_OPENED(pMFile)) {
close(pMFile->fd);
TSDB_FILE_SET_CLOSED(pMFile);
}
}
int64_t tsdbSeekMFile(SMFile *pMFile, int64_t offset, int whence) {
ASSERT(TSDB_FILE_OPENED(pMFile));
int64_t loffset = taosLSeek(pMFile->fd, offset, whence);
if (loffset < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return loffset;
tsdbGetFilename(vid, 0, ver, TSDB_FILE_META, fname);
tfsInitFile(TSDB_FILE_F(pMFile), TFS_PRIMARY_LEVEL, TFS_PRIMARY_ID, fname);
}
int64_t tsdbWriteMFile(SMFile *pMFile, void *buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pMFile));
int64_t nwrite = taosWrite(pMFile->fd, buf, nbyte);
if (nwrite < nbyte) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pMFile->info.size += nbyte;
return nwrite;
}
int64_t tsdbTellMFile(SMFile *pMFile) { return tsdbSeekMFile(pMFile, 0, SEEK_CUR); }
int tsdbEncodeMFile(void **buf, SMFile *pMFile) {
int tsdbEncodeSMFile(void **buf, SMFile *pMFile) {
int tlen = 0;
tlen += tsdbEncodeMFInfo(buf, &(pMFile->info));
......@@ -87,7 +52,7 @@ int tsdbEncodeMFile(void **buf, SMFile *pMFile) {
return tlen;
}
void *tsdbDecodeMFile(void *buf, SMFile *pMFile) {
void *tsdbDecodeSMFile(void *buf, SMFile *pMFile) {
buf = tsdbDecodeMFInfo(buf, &(pMFile->info));
buf = tfsDecodeFile(buf, &(pMFile->f));
......@@ -118,13 +83,17 @@ static void *tsdbDecodeMFInfo(void *buf, SMFInfo *pInfo) {
// ============== Operations on SDFile
void tsdbInitDFile(SDFile *pDFile, int vid, int fid, int ver, int level, int id, const SDFInfo *pInfo, TSDB_FILE_T ftype) {
char fname[TSDB_FILENAME_LEN];
TSDB_FILE_SET_CLOSED(pDFile);
if (pInfo == NULL) {
memset(&(pDFile->info), 0, sizeof(pDFile->info));
pDFile->info.magic = TSDB_FILE_INIT_MAGIC;
} else {
pDFile->info = *pInfo;
}
tfsInitFile(&(pDFile->f), level, id, NULL /*TODO*/);
}
......@@ -133,78 +102,7 @@ void tsdbInitDFileWithOld(SDFile *pDFile, SDFile *pOldDFile) {
TSDB_FILE_SET_CLOSED(pDFile);
}
int tsdbOpenDFile(SDFile *pDFile, int flags) {
ASSERT(!TSDB_FILE_OPENED(pDFile));
pDFile->fd = open(pDFile->f.aname, flags);
if (pDFile->fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
void tsdbCloseDFile(SDFile *pDFile) {
if (TSDB_FILE_OPENED(pDFile)) {
close(pDFile->fd);
TSDB_FILE_SET_CLOSED(pDFile);
}
}
int64_t tsdbSeekDFile(SDFile *pDFile, int64_t offset, int whence) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t loffset = taosLSeek(pDFile->fd, offset, whence);
if (loffset < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return loffset;
}
int64_t tsdbWriteDFile(SDFile *pDFile, void *buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t nwrite = taosWrite(pDFile->fd, buf, nbyte);
if (nwrite < nbyte) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return nwrite;
}
int64_t tsdbAppendDFile(SDFile *pDFile, void *buf, int64_t nbyte, int64_t *offset) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t nwrite;
*offset = tsdbSeekDFile(pDFile, 0, SEEK_SET);
if (*offset < 0) return -1;
nwrite = tsdbWriteDFile(pDFile, buf, nbyte);
if (nwrite < 0) return nwrite;
pDFile->info.size += nbyte;
return nwrite;
}
int64_t tsdbReadDFile(SDFile *pDFile, void *buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t nread = taosRead(pDFile->fd, buf, nbyte);
if (nread < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return nread;
}
int64_t tsdbTellDFile(SDFile *pDFile) { return tsdbSeekDFile(pDFile, 0, SEEK_CUR); }
int tsdbEncodeDFile(void **buf, SDFile *pDFile) {
int tsdbEncodeSDFile(void **buf, SDFile *pDFile) {
int tlen = 0;
tlen += tsdbEncodeDFInfo(buf, &(pDFile->info));
......@@ -213,17 +111,13 @@ int tsdbEncodeDFile(void **buf, SDFile *pDFile) {
return tlen;
}
void *tsdbDecodeDFile(void *buf, SDFile *pDFile) {
void *tsdbDecodeSDFile(void *buf, SDFile *pDFile) {
buf = tsdbDecodeDFInfo(buf, &(pDFile->info));
buf = tfsDecodeFile(buf, &(pDFile->f));
return buf;
}
void tsdbUpdateDFileMagic(SDFile *pDFile, void *pCksm) {
pDFile->info.magic = taosCalcChecksum(pDFile->info.magic, (uint8_t *)(pCksm), sizeof(TSCKSUM));
}
static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
int tlen = 0;
......@@ -291,4 +185,22 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) {
int tsdbCopyDFileSet(SDFileSet *pFromSet, SDFileSet *pToSet) {
// return 0;
}
static void tsdbGetFilename(int vid, int fid, int64_t ver, TSDB_FILE_T ftype, char *fname) {
ASSERT(ftype != TSDB_FILE_MAX);
if (ftype < TSDB_FILE_MAX) {
if (ver == 0) {
snprintf(fname, "vnode/vnode%d/tsdb/data/v%df%d.%s", vid, vid, fid, TSDB_FNAME_SUFFIX[ftype]);
} else {
snprintf(fname, "vnode/vnode%d/tsdb/data/v%df%d.%s-%012" PRId64, vid, vid, fid, TSDB_FNAME_SUFFIX[ftype], ver);
}
} else {
if (ver == 0) {
snprintf(fname, "vnode/vnode%d/tsdb/%s", vid, TSDB_FNAME_SUFFIX[ftype]);
} else {
snprintf(fname, "vnode/vnode%d/tsdb/%s-%012" PRId64, vid, TSDB_FNAME_SUFFIX[ftype], ver);
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册