提交 209ebe59 编写于 作者: H Hongze Cheng

TD-353

上级 1a139a07
...@@ -200,18 +200,6 @@ typedef struct { ...@@ -200,18 +200,6 @@ typedef struct {
typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t; typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t;
typedef struct {
tsdb_rw_helper_t type; // helper type
int maxTables;
int maxRowSize;
int maxRows;
int maxCols;
int minRowsPerFileBlock;
int maxRowsPerFileBlock;
int8_t compress;
} SHelperCfg;
typedef struct { typedef struct {
int fid; int fid;
TSKEY minKey; TSKEY minKey;
...@@ -232,26 +220,22 @@ typedef struct { ...@@ -232,26 +220,22 @@ typedef struct {
} SHelperTable; } SHelperTable;
typedef struct { typedef struct {
// Global configuration tsdb_rw_helper_t type;
SHelperCfg config;
int8_t state;
STsdbRepo* pRepo;
int8_t state;
// For file set usage // For file set usage
SHelperFile files; SHelperFile files;
SCompIdx * pCompIdx; SCompIdx* pCompIdx;
// For table set usage // For table set usage
SHelperTable tableInfo; SHelperTable tableInfo;
SCompInfo * pCompInfo; SCompInfo* pCompInfo;
bool hasOldLastBlock; bool hasOldLastBlock;
// For block set usage // For block set usage
SCompData *pCompData; SCompData* pCompData[TSDB_MAX_SUBBLOCKS];
SDataCols *pDataCols[2]; SDataCols* pDataCols[2];
void* pBuffer; // Buffer to hold the whole data block
void *pBuffer; // Buffer to hold the whole data block void* compBuffer; // Buffer for temperary compress/decompress purpose
void *compBuffer; // Buffer for temperary compress/decompress purpose
} SRWHelper; } SRWHelper;
// ------------------ tsdbMain.c // ------------------ tsdbMain.c
...@@ -323,9 +307,19 @@ STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg); ...@@ -323,9 +307,19 @@ STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg);
void tsdbFreeFileH(STsdbFileH* pFileH); void tsdbFreeFileH(STsdbFileH* pFileH);
// ------------------ tsdbRWHelper.c // ------------------ tsdbRWHelper.c
#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
#define TSDB_HELPER_FILE_SET_AND_OPEN 0x1 // File is set
#define TSDB_HELPER_IDX_LOAD 0x2 // SCompIdx part is loaded
#define TSDB_HELPER_TABLE_SET 0x4 // Table is set
#define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded
#define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded
#define TSDB_MAX_SUBBLOCKS 8 #define TSDB_MAX_SUBBLOCKS 8
#define IS_SUB_BLOCK(pBlock) ((pBlock)->numOfSubBlocks == 0) #define IS_SUB_BLOCK(pBlock) ((pBlock)->numOfSubBlocks == 0)
#define helperType(h) (h)->type
#define helperRepo(h) (h)->pRepo
#define helperState(h) (h)->state
// ------------------ tsdbMain.c // ------------------ tsdbMain.c
#define REPO_ID(r) (r)->config.tsdbId #define REPO_ID(r) (r)->config.tsdbId
#define IS_REPO_LOCKED(r) (r)->repoLocked #define IS_REPO_LOCKED(r) (r)->repoLocked
...@@ -337,29 +331,7 @@ void* tsdbCommitData(void* arg); ...@@ -337,29 +331,7 @@ void* tsdbCommitData(void* arg);
#if 0 #if 0
// TSDB repository definition
typedef struct {
int32_t totalLen;
int32_t len;
SDataRow row;
} SSubmitBlkIter;
// SSubmitMsg Iterator
typedef struct {
int32_t totalLen;
int32_t len;
SSubmitBlk *pBlock;
} SSubmitMsgIter;
// --------- Helper state // --------- Helper state
#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
#define TSDB_HELPER_FILE_SET_AND_OPEN 0x1 // File is set
#define TSDB_HELPER_IDX_LOAD 0x2 // SCompIdx part is loaded
#define TSDB_HELPER_TABLE_SET 0x4 // Table is set
#define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded
#define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded
#define TSDB_HELPER_TYPE(h) ((h)->config.type) #define TSDB_HELPER_TYPE(h) ((h)->config.type)
...@@ -398,11 +370,6 @@ int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks); ...@@ -398,11 +370,6 @@ int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks);
void tsdbAdjustCacheBlocks(STsdbCache *pCache); void tsdbAdjustCacheBlocks(STsdbCache *pCache);
int32_t tsdbGetMetaFileName(char *rootDir, char *fname); int32_t tsdbGetMetaFileName(char *rootDir, char *fname);
int tsdbUpdateFileHeader(SFile *pFile, uint32_t version); int tsdbUpdateFileHeader(SFile *pFile, uint32_t version);
int tsdbUpdateTable(STsdbMeta *pMeta, STable *pTable, STableCfg *pCfg);
int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
STSchema *tsdbGetTableSchemaByVersion(STsdbMeta *pMeta, STable *pTable, int16_t version);
STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable);
int compFGroupKey(const void *key, const void *fgroup); int compFGroupKey(const void *key, const void *fgroup);
......
...@@ -30,6 +30,18 @@ ...@@ -30,6 +30,18 @@
#define TSDB_META_FILE_NAME "meta" #define TSDB_META_FILE_NAME "meta"
#define TSDB_META_FILE_INDEX 10000000 #define TSDB_META_FILE_INDEX 10000000
typedef struct {
int32_t totalLen;
int32_t len;
SDataRow row;
} SSubmitBlkIter;
typedef struct {
int32_t totalLen;
int32_t len;
SSubmitBlk *pBlock;
} SSubmitMsgIter;
// Function declaration // Function declaration
int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) { int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) {
if (mkdir(rootDir, 0755) < 0) { if (mkdir(rootDir, 0755) < 0) {
...@@ -50,9 +62,7 @@ int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) { ...@@ -50,9 +62,7 @@ int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) {
return 0; return 0;
} }
int32_t tsdbDropRepo(char *rootDir) { int32_t tsdbDropRepo(char *rootDir) { return tsdbUnsetRepoEnv(rootDir); }
return tsdbUnsetRepoEnv(rootDir);
}
TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) { TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) {
STsdbCfg config = {0}; STsdbCfg config = {0};
...@@ -211,7 +221,6 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ ...@@ -211,7 +221,6 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
return magic; return magic;
} }
void tsdbStartStream(TSDB_REPO_T *repo) { void tsdbStartStream(TSDB_REPO_T *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbMeta *pMeta = pRepo->tsdbMeta;
...@@ -653,8 +662,8 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY ...@@ -653,8 +662,8 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY
} }
// Check schema version // Check schema version
int32_t tversion = pBlock->sversion; int32_t tversion = pBlock->sversion;
STSchema * pSchema = tsdbGetTableSchema(pMeta, pTable); STSchema *pSchema = tsdbGetTableSchema(pMeta, pTable);
ASSERT(pSchema != NULL); ASSERT(pSchema != NULL);
int16_t nversion = schemaVersion(pSchema); int16_t nversion = schemaVersion(pSchema);
if (tversion > nversion) { if (tversion > nversion) {
...@@ -688,7 +697,7 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY ...@@ -688,7 +697,7 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return -1; return -1;
} }
} }
SSubmitBlkIter blkIter = {0}; SSubmitBlkIter blkIter = {0};
SDataRow row = NULL; SDataRow row = NULL;
...@@ -753,7 +762,6 @@ static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) { ...@@ -753,7 +762,6 @@ static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
return row; return row;
} }
static int tsdbRestoreInfo(STsdbRepo *pRepo) { static int tsdbRestoreInfo(STsdbRepo *pRepo) {
STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH; STsdbFileH *pFileH = pRepo->tsdbFileH;
......
...@@ -32,6 +32,16 @@ static void tsdbFreeMemTable(SMemTable *pMemTable); ...@@ -32,6 +32,16 @@ static void tsdbFreeMemTable(SMemTable *pMemTable);
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
static void tsdbFreeTableData(STableData *pTableData); static void tsdbFreeTableData(STableData *pTableData);
static char * tsdbGetTsTupleKey(const void *data); static char * tsdbGetTsTupleKey(const void *data);
static void * tsdbCommitData(void *arg);
static void tsdbEndCommit(STsdbRepo *pRepo);
static TSKEY tsdbNextIterKey(SCommitIter *pIter);
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo);
static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables);
static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey,
int maxRowsToRead, SDataCols *pCols);
// ---------------- INTERNAL FUNCTIONS ---------------- // ---------------- INTERNAL FUNCTIONS ----------------
int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
...@@ -425,8 +435,7 @@ static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSK ...@@ -425,8 +435,7 @@ static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSK
return 0; return 0;
} }
static void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) {
TSKEY *maxKey) {
*minKey = fileId * daysPerFile * tsMsPerDay[precision]; *minKey = fileId * daysPerFile * tsMsPerDay[precision];
*maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1; *maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
} }
...@@ -439,7 +448,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe ...@@ -439,7 +448,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
SFileGroup *pGroup = NULL; SFileGroup *pGroup = NULL;
TSKEY minKey = 0, maxKey = 0; TSKEY minKey = 0, maxKey = 0;
tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
// Check if there are data to commit to this file // Check if there are data to commit to this file
int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey); int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey);
......
...@@ -1082,21 +1082,11 @@ static void tsdbDestroyHelperBlock(SRWHelper *pHelper) { ...@@ -1082,21 +1082,11 @@ static void tsdbDestroyHelperBlock(SRWHelper *pHelper) {
} }
static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type) { static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type) {
if (pHelper == NULL || pRepo == NULL) return -1;
memset((void *)pHelper, 0, sizeof(*pHelper)); memset((void *)pHelper, 0, sizeof(*pHelper));
// Init global configuration helperType(pHelper) = type;
pHelper->config.type = type; helperRepo(pHelper) = pRepo;
pHelper->config.maxTables = pRepo->config.maxTables; helperState(pHelper) = TSDB_HELPER_CLEAR_STATE;
pHelper->config.maxRowSize = pRepo->tsdbMeta->maxRowBytes;
pHelper->config.maxRows = pRepo->config.maxRowsPerFileBlock;
pHelper->config.maxCols = pRepo->tsdbMeta->maxCols;
pHelper->config.minRowsPerFileBlock = pRepo->config.minRowsPerFileBlock;
pHelper->config.maxRowsPerFileBlock = pRepo->config.maxRowsPerFileBlock;
pHelper->config.compress = pRepo->config.compression;
pHelper->state = TSDB_HELPER_CLEAR_STATE;
// Init file part // Init file part
if (tsdbInitHelperFile(pHelper) < 0) goto _err; if (tsdbInitHelperFile(pHelper) < 0) goto _err;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册