diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 70cd19d55ab3a11bddb45d638debe3329e3bd477..3ebd892fe79f7397e415627e08519b9429c015e8 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -200,18 +200,6 @@ typedef struct { typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t; -typedef struct { - tsdb_rw_helper_t type; // helper type - - int maxTables; - int maxRowSize; - int maxRows; - int maxCols; - int minRowsPerFileBlock; - int maxRowsPerFileBlock; - int8_t compress; -} SHelperCfg; - typedef struct { int fid; TSKEY minKey; @@ -232,26 +220,22 @@ typedef struct { } SHelperTable; typedef struct { - // Global configuration - SHelperCfg config; - - int8_t state; + tsdb_rw_helper_t type; + STsdbRepo* pRepo; + int8_t state; // For file set usage SHelperFile files; - SCompIdx * pCompIdx; - + SCompIdx* pCompIdx; // For table set usage SHelperTable tableInfo; - SCompInfo * pCompInfo; + SCompInfo* pCompInfo; bool hasOldLastBlock; - // For block set usage - SCompData *pCompData; - SDataCols *pDataCols[2]; - - void *pBuffer; // Buffer to hold the whole data block - void *compBuffer; // Buffer for temperary compress/decompress purpose + SCompData* pCompData[TSDB_MAX_SUBBLOCKS]; + SDataCols* pDataCols[2]; + void* pBuffer; // Buffer to hold the whole data block + void* compBuffer; // Buffer for temperary compress/decompress purpose } SRWHelper; // ------------------ tsdbMain.c @@ -323,9 +307,19 @@ STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg); void tsdbFreeFileH(STsdbFileH* pFileH); // ------------------ tsdbRWHelper.c +#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state +#define TSDB_HELPER_FILE_SET_AND_OPEN 0x1 // File is set +#define TSDB_HELPER_IDX_LOAD 0x2 // SCompIdx part is loaded +#define TSDB_HELPER_TABLE_SET 0x4 // Table is set +#define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded +#define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded #define TSDB_MAX_SUBBLOCKS 8 #define IS_SUB_BLOCK(pBlock) ((pBlock)->numOfSubBlocks == 0) +#define helperType(h) (h)->type +#define helperRepo(h) (h)->pRepo +#define helperState(h) (h)->state + // ------------------ tsdbMain.c #define REPO_ID(r) (r)->config.tsdbId #define IS_REPO_LOCKED(r) (r)->repoLocked @@ -337,29 +331,7 @@ void* tsdbCommitData(void* arg); #if 0 - -// TSDB repository definition - -typedef struct { - int32_t totalLen; - int32_t len; - SDataRow row; -} SSubmitBlkIter; - -// SSubmitMsg Iterator -typedef struct { - int32_t totalLen; - int32_t len; - SSubmitBlk *pBlock; -} SSubmitMsgIter; - // --------- Helper state -#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state -#define TSDB_HELPER_FILE_SET_AND_OPEN 0x1 // File is set -#define TSDB_HELPER_IDX_LOAD 0x2 // SCompIdx part is loaded -#define TSDB_HELPER_TABLE_SET 0x4 // Table is set -#define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded -#define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded #define TSDB_HELPER_TYPE(h) ((h)->config.type) @@ -398,11 +370,6 @@ int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks); void tsdbAdjustCacheBlocks(STsdbCache *pCache); int32_t tsdbGetMetaFileName(char *rootDir, char *fname); int tsdbUpdateFileHeader(SFile *pFile, uint32_t version); -int tsdbUpdateTable(STsdbMeta *pMeta, STable *pTable, STableCfg *pCfg); -int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable); -int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable); -STSchema *tsdbGetTableSchemaByVersion(STsdbMeta *pMeta, STable *pTable, int16_t version); -STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable); int compFGroupKey(const void *key, const void *fgroup); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 298caff0f6bd832ead2c8fd81511636802635f3a..e44f4838b0044d2cb727f7a2449c060371815e8e 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -30,6 +30,18 @@ #define TSDB_META_FILE_NAME "meta" #define TSDB_META_FILE_INDEX 10000000 +typedef struct { + int32_t totalLen; + int32_t len; + SDataRow row; +} SSubmitBlkIter; + +typedef struct { + int32_t totalLen; + int32_t len; + SSubmitBlk *pBlock; +} SSubmitMsgIter; + // Function declaration int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) { if (mkdir(rootDir, 0755) < 0) { @@ -50,9 +62,7 @@ int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) { return 0; } -int32_t tsdbDropRepo(char *rootDir) { - return tsdbUnsetRepoEnv(rootDir); -} +int32_t tsdbDropRepo(char *rootDir) { return tsdbUnsetRepoEnv(rootDir); } TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) { STsdbCfg config = {0}; @@ -211,7 +221,6 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ return magic; } - void tsdbStartStream(TSDB_REPO_T *repo) { STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbMeta *pMeta = pRepo->tsdbMeta; @@ -653,8 +662,8 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY } // Check schema version - int32_t tversion = pBlock->sversion; - STSchema * pSchema = tsdbGetTableSchema(pMeta, pTable); + int32_t tversion = pBlock->sversion; + STSchema *pSchema = tsdbGetTableSchema(pMeta, pTable); ASSERT(pSchema != NULL); int16_t nversion = schemaVersion(pSchema); if (tversion > nversion) { @@ -688,7 +697,7 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; return -1; } - } + } SSubmitBlkIter blkIter = {0}; SDataRow row = NULL; @@ -753,7 +762,6 @@ static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) { return row; } - static int tsdbRestoreInfo(STsdbRepo *pRepo) { STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbFileH *pFileH = pRepo->tsdbFileH; diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 78ce9be30aaaf0589bda431c067060d686ae705b..75bfe0d7ce7647fdecf2ab268e65cbf4fe905ed4 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -32,6 +32,16 @@ static void tsdbFreeMemTable(SMemTable *pMemTable); static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); static void tsdbFreeTableData(STableData *pTableData); static char * tsdbGetTsTupleKey(const void *data); +static void * tsdbCommitData(void *arg); +static void tsdbEndCommit(STsdbRepo *pRepo); +static TSKEY tsdbNextIterKey(SCommitIter *pIter); +static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey); +static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols); +static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); +static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo); +static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables); +static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, + int maxRowsToRead, SDataCols *pCols); // ---------------- INTERNAL FUNCTIONS ---------------- int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { @@ -425,8 +435,7 @@ static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSK return 0; } -static void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, - TSKEY *maxKey) { +static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) { *minKey = fileId * daysPerFile * tsMsPerDay[precision]; *maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1; } @@ -439,7 +448,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe SFileGroup *pGroup = NULL; TSKEY minKey = 0, maxKey = 0; - tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); + tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); // Check if there are data to commit to this file int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey); diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index b09154472262c7b3c2ea868350aedb48967c9f65..83d9b99f55ec843258df68e0a69ecc5ec1b19795 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -1082,21 +1082,11 @@ static void tsdbDestroyHelperBlock(SRWHelper *pHelper) { } static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type) { - if (pHelper == NULL || pRepo == NULL) return -1; - memset((void *)pHelper, 0, sizeof(*pHelper)); - // Init global configuration - pHelper->config.type = type; - pHelper->config.maxTables = pRepo->config.maxTables; - pHelper->config.maxRowSize = pRepo->tsdbMeta->maxRowBytes; - pHelper->config.maxRows = pRepo->config.maxRowsPerFileBlock; - pHelper->config.maxCols = pRepo->tsdbMeta->maxCols; - pHelper->config.minRowsPerFileBlock = pRepo->config.minRowsPerFileBlock; - pHelper->config.maxRowsPerFileBlock = pRepo->config.maxRowsPerFileBlock; - pHelper->config.compress = pRepo->config.compression; - - pHelper->state = TSDB_HELPER_CLEAR_STATE; + helperType(pHelper) = type; + helperRepo(pHelper) = pRepo; + helperState(pHelper) = TSDB_HELPER_CLEAR_STATE; // Init file part if (tsdbInitHelperFile(pHelper) < 0) goto _err;