diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 590b56e0f6337ba4388471a0b0c8f5b1059a5eee..cb3f7eb22ce29810f2c3440ee0d13245e2593280 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -272,47 +272,47 @@ typedef struct { SBlockCol cols[]; } SBlockData; -typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t; - -typedef struct { - TSKEY minKey; - TSKEY maxKey; - SFileGroup fGroup; - SFile nHeadF; - SFile nLastF; -} SHelperFile; - -typedef struct { - uint64_t uid; - int32_t tid; -} SHelperTable; - -typedef struct { - SBlockIdx* pIdxArray; - int numOfIdx; - int curIdx; -} SIdxH; - -typedef struct { - tsdb_rw_helper_t type; - - STsdbRepo* pRepo; - int8_t state; - // For file set usage - SHelperFile files; - SIdxH idxH; - SBlockIdx curCompIdx; - void* pWIdx; - // For table set usage - SHelperTable tableInfo; - SBlockInfo* pCompInfo; - bool hasOldLastBlock; - // For block set usage - SBlockData* pCompData; - SDataCols* pDataCols[2]; - void* pBuffer; // Buffer to hold the whole data block - void* compBuffer; // Buffer for temperary compress/decompress purpose -} SRWHelper; +// typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t; + +// typedef struct { +// TSKEY minKey; +// TSKEY maxKey; +// SFileGroup fGroup; +// SFile nHeadF; +// SFile nLastF; +// } SHelperFile; + +// typedef struct { +// uint64_t uid; +// int32_t tid; +// } SHelperTable; + +// typedef struct { +// SBlockIdx* pIdxArray; +// int numOfIdx; +// int curIdx; +// } SIdxH; + +// typedef struct { +// tsdb_rw_helper_t type; + +// STsdbRepo* pRepo; +// int8_t state; +// // For file set usage +// SHelperFile files; +// SIdxH idxH; +// SBlockIdx curCompIdx; +// void* pWIdx; +// // For table set usage +// SHelperTable tableInfo; +// SBlockInfo* pCompInfo; +// bool hasOldLastBlock; +// // For block set usage +// SBlockData* pCompData; +// SDataCols* pDataCols[2]; +// void* pBuffer; // Buffer to hold the whole data block +// void* compBuffer; // Buffer for temperary compress/decompress purpose +// } SRWHelper; // ------------------ tsdbScan.c typedef struct { @@ -324,6 +324,25 @@ typedef struct { FILE* tLogStream; } STsdbScanHandle; +// ------------------ tsdbReadUtil.c +typedef struct { + STsdbRepo* pRepo; + SFileGroup fGroup; + TSKEY minKey; + TSKEY maxKey; + SBlockIdx* pBlockIdx; + int nBlockIdx; + SBlockIdx* pCurBlockIdx; + STable* pTable; + SBlockInfo* pBlockInfo; + SDataCols* pDataCols[2]; + void* pBuf; + void* pCBuf; +} SReadHandle; + +#define TSDB_READ_FILE(pReadH, type) (&((pReadH)->fGroup.files[(type)])) +#define TSDB_BLOCK_DATA_LEN(nCols) (sizeof(SBlockData) + sizeof(SBlockCol) * (nCols) + sizeof(TSCKSUM)) + // Operations // ------------------ tsdbMeta.c #define TSDB_INIT_NTABLES 1024 @@ -486,60 +505,60 @@ void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size); void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); // ------------------ tsdbRWHelper.c -#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state -#define TSDB_HELPER_FILE_SET_AND_OPEN 0x1 // File is set -#define TSDB_HELPER_IDX_LOAD 0x2 // SCompIdx part is loaded -#define TSDB_HELPER_TABLE_SET 0x4 // Table is set -#define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded -#define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded -#define helperSetState(h, s) (((h)->state) |= (s)) -#define helperClearState(h, s) ((h)->state &= (~(s))) -#define helperHasState(h, s) ((((h)->state) & (s)) == (s)) -#define blockAtIdx(h, idx) ((h)->pCompInfo->blocks + idx) -#define TSDB_MAX_SUBBLOCKS 8 -#define IS_SUB_BLOCK(pBlock) ((pBlock)->numOfSubBlocks == 0) -#define helperType(h) (h)->type -#define helperRepo(h) (h)->pRepo -#define helperState(h) (h)->state -#define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0) -#define helperFileId(h) ((h)->files.fGroup.fileId) -#define helperHeadF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_HEAD])) -#define helperDataF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_DATA])) -#define helperLastF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_LAST])) -#define helperNewHeadF(h) (&((h)->files.nHeadF)) -#define helperNewLastF(h) (&((h)->files.nLastF)) - -int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo); -int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo); -void tsdbDestroyHelper(SRWHelper* pHelper); -void tsdbResetHelper(SRWHelper* pHelper); -int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup); -int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError, SFileGroup* pGroup); -int tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo); -int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols* pDataCols, TSKEY maxKey); -int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper); -int tsdbWriteCompInfo(SRWHelper* pHelper); -int tsdbWriteCompIdx(SRWHelper* pHelper); -int tsdbLoadCompIdxImpl(SFile* pFile, uint32_t offset, uint32_t len, void* buffer); -int tsdbDecodeSCompIdxImpl(void* buffer, uint32_t len, SBlockIdx** ppCompIdx, int* numOfIdx); -int tsdbLoadCompIdx(SRWHelper* pHelper, void* target); -int tsdbLoadCompInfoImpl(SFile* pFile, SBlockIdx* pIdx, SBlockInfo** ppCompInfo); -int tsdbLoadCompInfo(SRWHelper* pHelper, void* target); -int tsdbLoadCompData(SRWHelper* phelper, SBlock* pcompblock, void* target); -void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols); -int tsdbLoadBlockDataCols(SRWHelper* pHelper, SBlock* pCompBlock, SBlockInfo* pCompInfo, int16_t* colIds, - int numOfColIds); -int tsdbLoadBlockData(SRWHelper* pHelper, SBlock* pCompBlock, SBlockInfo* pCompInfo); - -static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) { - if (*(TSKEY*)key1 > *(TSKEY*)key2) { - return 1; - } else if (*(TSKEY*)key1 == *(TSKEY*)key2) { - return 0; - } else { - return -1; - } -} +// #define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state +// #define TSDB_HELPER_FILE_SET_AND_OPEN 0x1 // File is set +// #define TSDB_HELPER_IDX_LOAD 0x2 // SCompIdx part is loaded +// #define TSDB_HELPER_TABLE_SET 0x4 // Table is set +// #define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded +// #define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded +// #define helperSetState(h, s) (((h)->state) |= (s)) +// #define helperClearState(h, s) ((h)->state &= (~(s))) +// #define helperHasState(h, s) ((((h)->state) & (s)) == (s)) +// #define blockAtIdx(h, idx) ((h)->pCompInfo->blocks + idx) +// #define TSDB_MAX_SUBBLOCKS 8 +// #define IS_SUB_BLOCK(pBlock) ((pBlock)->numOfSubBlocks == 0) +// #define helperType(h) (h)->type +// #define helperRepo(h) (h)->pRepo +// #define helperState(h) (h)->state +// #define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0) +// #define helperFileId(h) ((h)->files.fGroup.fileId) +// #define helperHeadF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_HEAD])) +// #define helperDataF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_DATA])) +// #define helperLastF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_LAST])) +// #define helperNewHeadF(h) (&((h)->files.nHeadF)) +// #define helperNewLastF(h) (&((h)->files.nLastF)) + +// int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo); +// int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo); +// void tsdbDestroyHelper(SRWHelper* pHelper); +// void tsdbResetHelper(SRWHelper* pHelper); +// int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup); +// int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError, SFileGroup* pGroup); +// int tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo); +// int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols* pDataCols, TSKEY maxKey); +// int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper); +// int tsdbWriteCompInfo(SRWHelper* pHelper); +// int tsdbWriteCompIdx(SRWHelper* pHelper); +// int tsdbLoadCompIdxImpl(SFile* pFile, uint32_t offset, uint32_t len, void* buffer); +// int tsdbDecodeSCompIdxImpl(void* buffer, uint32_t len, SBlockIdx** ppCompIdx, int* numOfIdx); +// int tsdbLoadCompIdx(SRWHelper* pHelper, void* target); +// int tsdbLoadCompInfoImpl(SFile* pFile, SBlockIdx* pIdx, SBlockInfo** ppCompInfo); +// int tsdbLoadCompInfo(SRWHelper* pHelper, void* target); +// int tsdbLoadCompData(SRWHelper* phelper, SBlock* pcompblock, void* target); +// void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols); +// int tsdbLoadBlockDataCols(SRWHelper* pHelper, SBlock* pCompBlock, SBlockInfo* pCompInfo, int16_t* colIds, +// int numOfColIds); +// int tsdbLoadBlockData(SRWHelper* pHelper, SBlock* pCompBlock, SBlockInfo* pCompInfo); + +// static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) { +// if (*(TSKEY*)key1 > *(TSKEY*)key2) { +// return 1; +// } else if (*(TSKEY*)key1 == *(TSKEY*)key2) { +// return 0; +// } else { +// return -1; +// } +// } // ------------------ tsdbMain.c #define REPO_ID(r) (r)->config.tsdbId diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index aaac9247a08b6158d81c1fe3abd9fc499dd18ea0..4e32d50096849a5de04c12a78141fb7cb2f8f5c6 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -26,7 +26,10 @@ typedef struct { int maxIters; SCommitIter *pIters; - SRWHelper whelper; + SFileGroup * pFGroup; + SReadHandle *pReadH; + SBlockIdx * pBlockIdx; + SBlockInfo * pBlockInfo; SDataCols * pDataCols; } STSCommitHandle; diff --git a/src/tsdb/src/tsdbReadUtil.c b/src/tsdb/src/tsdbReadUtil.c index acf25d1fc2c9e2653f55ec0996ae666318ec8465..2c6529ab4c7b4d10ac434625a4e9e9d16a45f024 100644 --- a/src/tsdb/src/tsdbReadUtil.c +++ b/src/tsdb/src/tsdbReadUtil.c @@ -21,24 +21,6 @@ #include "tscompression.h" #include "tsdbMain.h" -typedef struct { - STsdbRepo * pRepo; - SFileGroup fGroup; - TSKEY minKey; - TSKEY maxKey; - SBlockIdx * pBlockIdx; - int nBlockIdx; - SBlockIdx * pCurBlockIdx; - STable * pTable; - SBlockInfo *pBlockInfo; - SDataCols * pDataCols[2]; - void * pBuf; - void * pCBuf; -} SReadHandle; - -#define TSDB_READ_FILE(pReadH, type) (&((pReadH)->fGroup.files[(type)])) -#define TSDB_BLOCK_DATA_LEN(nCols) (sizeof(SBlockData) + sizeof(SBlockCol) * (nCols) + sizeof(TSCKSUM)) - int tsdbInitReadHandle(SReadHandle *pReadH, STsdbRepo *pRepo) { pReadH->pRepo = pRepo; return 0;