提交 1747e8ff 编写于 作者: H Hongze Cheng

refactor more code

上级 61cd5a29
......@@ -272,47 +272,47 @@ typedef struct {
SBlockCol cols[];
} SBlockData;
typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t;
typedef struct {
TSKEY minKey;
TSKEY maxKey;
SFileGroup fGroup;
SFile nHeadF;
SFile nLastF;
} SHelperFile;
typedef struct {
uint64_t uid;
int32_t tid;
} SHelperTable;
typedef struct {
SBlockIdx* pIdxArray;
int numOfIdx;
int curIdx;
} SIdxH;
typedef struct {
tsdb_rw_helper_t type;
STsdbRepo* pRepo;
int8_t state;
// For file set usage
SHelperFile files;
SIdxH idxH;
SBlockIdx curCompIdx;
void* pWIdx;
// For table set usage
SHelperTable tableInfo;
SBlockInfo* pCompInfo;
bool hasOldLastBlock;
// For block set usage
SBlockData* pCompData;
SDataCols* pDataCols[2];
void* pBuffer; // Buffer to hold the whole data block
void* compBuffer; // Buffer for temperary compress/decompress purpose
} SRWHelper;
// typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t;
// typedef struct {
// TSKEY minKey;
// TSKEY maxKey;
// SFileGroup fGroup;
// SFile nHeadF;
// SFile nLastF;
// } SHelperFile;
// typedef struct {
// uint64_t uid;
// int32_t tid;
// } SHelperTable;
// typedef struct {
// SBlockIdx* pIdxArray;
// int numOfIdx;
// int curIdx;
// } SIdxH;
// typedef struct {
// tsdb_rw_helper_t type;
// STsdbRepo* pRepo;
// int8_t state;
// // For file set usage
// SHelperFile files;
// SIdxH idxH;
// SBlockIdx curCompIdx;
// void* pWIdx;
// // For table set usage
// SHelperTable tableInfo;
// SBlockInfo* pCompInfo;
// bool hasOldLastBlock;
// // For block set usage
// SBlockData* pCompData;
// SDataCols* pDataCols[2];
// void* pBuffer; // Buffer to hold the whole data block
// void* compBuffer; // Buffer for temperary compress/decompress purpose
// } SRWHelper;
// ------------------ tsdbScan.c
typedef struct {
......@@ -324,6 +324,25 @@ typedef struct {
FILE* tLogStream;
} STsdbScanHandle;
// ------------------ tsdbReadUtil.c
typedef struct {
STsdbRepo* pRepo;
SFileGroup fGroup;
TSKEY minKey;
TSKEY maxKey;
SBlockIdx* pBlockIdx;
int nBlockIdx;
SBlockIdx* pCurBlockIdx;
STable* pTable;
SBlockInfo* pBlockInfo;
SDataCols* pDataCols[2];
void* pBuf;
void* pCBuf;
} SReadHandle;
#define TSDB_READ_FILE(pReadH, type) (&((pReadH)->fGroup.files[(type)]))
#define TSDB_BLOCK_DATA_LEN(nCols) (sizeof(SBlockData) + sizeof(SBlockCol) * (nCols) + sizeof(TSCKSUM))
// Operations
// ------------------ tsdbMeta.c
#define TSDB_INIT_NTABLES 1024
......@@ -486,60 +505,60 @@ void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size);
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
// ------------------ tsdbRWHelper.c
#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
#define TSDB_HELPER_FILE_SET_AND_OPEN 0x1 // File is set
#define TSDB_HELPER_IDX_LOAD 0x2 // SCompIdx part is loaded
#define TSDB_HELPER_TABLE_SET 0x4 // Table is set
#define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded
#define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded
#define helperSetState(h, s) (((h)->state) |= (s))
#define helperClearState(h, s) ((h)->state &= (~(s)))
#define helperHasState(h, s) ((((h)->state) & (s)) == (s))
#define blockAtIdx(h, idx) ((h)->pCompInfo->blocks + idx)
#define TSDB_MAX_SUBBLOCKS 8
#define IS_SUB_BLOCK(pBlock) ((pBlock)->numOfSubBlocks == 0)
#define helperType(h) (h)->type
#define helperRepo(h) (h)->pRepo
#define helperState(h) (h)->state
#define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0)
#define helperFileId(h) ((h)->files.fGroup.fileId)
#define helperHeadF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_HEAD]))
#define helperDataF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_DATA]))
#define helperLastF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_LAST]))
#define helperNewHeadF(h) (&((h)->files.nHeadF))
#define helperNewLastF(h) (&((h)->files.nLastF))
int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
void tsdbDestroyHelper(SRWHelper* pHelper);
void tsdbResetHelper(SRWHelper* pHelper);
int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup);
int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError, SFileGroup* pGroup);
int tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo);
int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols* pDataCols, TSKEY maxKey);
int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper);
int tsdbWriteCompInfo(SRWHelper* pHelper);
int tsdbWriteCompIdx(SRWHelper* pHelper);
int tsdbLoadCompIdxImpl(SFile* pFile, uint32_t offset, uint32_t len, void* buffer);
int tsdbDecodeSCompIdxImpl(void* buffer, uint32_t len, SBlockIdx** ppCompIdx, int* numOfIdx);
int tsdbLoadCompIdx(SRWHelper* pHelper, void* target);
int tsdbLoadCompInfoImpl(SFile* pFile, SBlockIdx* pIdx, SBlockInfo** ppCompInfo);
int tsdbLoadCompInfo(SRWHelper* pHelper, void* target);
int tsdbLoadCompData(SRWHelper* phelper, SBlock* pcompblock, void* target);
void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols);
int tsdbLoadBlockDataCols(SRWHelper* pHelper, SBlock* pCompBlock, SBlockInfo* pCompInfo, int16_t* colIds,
int numOfColIds);
int tsdbLoadBlockData(SRWHelper* pHelper, SBlock* pCompBlock, SBlockInfo* pCompInfo);
static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) {
if (*(TSKEY*)key1 > *(TSKEY*)key2) {
return 1;
} else if (*(TSKEY*)key1 == *(TSKEY*)key2) {
return 0;
} else {
return -1;
}
}
// #define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
// #define TSDB_HELPER_FILE_SET_AND_OPEN 0x1 // File is set
// #define TSDB_HELPER_IDX_LOAD 0x2 // SCompIdx part is loaded
// #define TSDB_HELPER_TABLE_SET 0x4 // Table is set
// #define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded
// #define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded
// #define helperSetState(h, s) (((h)->state) |= (s))
// #define helperClearState(h, s) ((h)->state &= (~(s)))
// #define helperHasState(h, s) ((((h)->state) & (s)) == (s))
// #define blockAtIdx(h, idx) ((h)->pCompInfo->blocks + idx)
// #define TSDB_MAX_SUBBLOCKS 8
// #define IS_SUB_BLOCK(pBlock) ((pBlock)->numOfSubBlocks == 0)
// #define helperType(h) (h)->type
// #define helperRepo(h) (h)->pRepo
// #define helperState(h) (h)->state
// #define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0)
// #define helperFileId(h) ((h)->files.fGroup.fileId)
// #define helperHeadF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_HEAD]))
// #define helperDataF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_DATA]))
// #define helperLastF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_LAST]))
// #define helperNewHeadF(h) (&((h)->files.nHeadF))
// #define helperNewLastF(h) (&((h)->files.nLastF))
// int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
// int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
// void tsdbDestroyHelper(SRWHelper* pHelper);
// void tsdbResetHelper(SRWHelper* pHelper);
// int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup);
// int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError, SFileGroup* pGroup);
// int tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo);
// int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols* pDataCols, TSKEY maxKey);
// int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper);
// int tsdbWriteCompInfo(SRWHelper* pHelper);
// int tsdbWriteCompIdx(SRWHelper* pHelper);
// int tsdbLoadCompIdxImpl(SFile* pFile, uint32_t offset, uint32_t len, void* buffer);
// int tsdbDecodeSCompIdxImpl(void* buffer, uint32_t len, SBlockIdx** ppCompIdx, int* numOfIdx);
// int tsdbLoadCompIdx(SRWHelper* pHelper, void* target);
// int tsdbLoadCompInfoImpl(SFile* pFile, SBlockIdx* pIdx, SBlockInfo** ppCompInfo);
// int tsdbLoadCompInfo(SRWHelper* pHelper, void* target);
// int tsdbLoadCompData(SRWHelper* phelper, SBlock* pcompblock, void* target);
// void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols);
// int tsdbLoadBlockDataCols(SRWHelper* pHelper, SBlock* pCompBlock, SBlockInfo* pCompInfo, int16_t* colIds,
// int numOfColIds);
// int tsdbLoadBlockData(SRWHelper* pHelper, SBlock* pCompBlock, SBlockInfo* pCompInfo);
// static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) {
// if (*(TSKEY*)key1 > *(TSKEY*)key2) {
// return 1;
// } else if (*(TSKEY*)key1 == *(TSKEY*)key2) {
// return 0;
// } else {
// return -1;
// }
// }
// ------------------ tsdbMain.c
#define REPO_ID(r) (r)->config.tsdbId
......
......@@ -26,7 +26,10 @@
typedef struct {
int maxIters;
SCommitIter *pIters;
SRWHelper whelper;
SFileGroup * pFGroup;
SReadHandle *pReadH;
SBlockIdx * pBlockIdx;
SBlockInfo * pBlockInfo;
SDataCols * pDataCols;
} STSCommitHandle;
......
......@@ -21,24 +21,6 @@
#include "tscompression.h"
#include "tsdbMain.h"
typedef struct {
STsdbRepo * pRepo;
SFileGroup fGroup;
TSKEY minKey;
TSKEY maxKey;
SBlockIdx * pBlockIdx;
int nBlockIdx;
SBlockIdx * pCurBlockIdx;
STable * pTable;
SBlockInfo *pBlockInfo;
SDataCols * pDataCols[2];
void * pBuf;
void * pCBuf;
} SReadHandle;
#define TSDB_READ_FILE(pReadH, type) (&((pReadH)->fGroup.files[(type)]))
#define TSDB_BLOCK_DATA_LEN(nCols) (sizeof(SBlockData) + sizeof(SBlockCol) * (nCols) + sizeof(TSCKSUM))
int tsdbInitReadHandle(SReadHandle *pReadH, STsdbRepo *pRepo) {
pReadH->pRepo = pRepo;
return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册