From b6d00ea311b37047c344859ca9e8f8d38ee18484 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Thu, 5 Mar 2020 03:14:13 +0000 Subject: [PATCH] add more code --- src/vnode/tsdb/inc/tsdb.h | 6 ++--- src/vnode/tsdb/inc/tsdbCache.h | 7 +++--- src/vnode/tsdb/inc/tsdbMeta.h | 3 +-- src/vnode/tsdb/src/tsdbCache.c | 11 ++++++--- src/vnode/tsdb/src/tsdbMain.c | 43 ++++++++++++++++++++++++++++++---- src/vnode/tsdb/src/tsdbMeta.c | 35 ++++++++++++++------------- 6 files changed, 72 insertions(+), 33 deletions(-) diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index 53644e4b43..0a9f4561ac 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -47,7 +47,7 @@ typedef struct { typedef struct { STableId tableId; int32_t sversion; // data schema version - int32_t numOfRows; // number of rows data + int32_t len; // message length char data[]; } SSubmitBlock; @@ -198,13 +198,11 @@ STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tid); /** * Insert data to a table in a repository * @param pRepo the TSDB repository handle - * @param tid the table ID to insert to * @param pData the data to insert (will give a more specific description) - * @param error the error number to set when failure occurs * * @return the number of points inserted, -1 for failure and the error number is set */ -int32_t tsdbInsertData(tsdb_repo_t *pRepo, STableId tid, char *pData); +int32_t tsdbInsertData(tsdb_repo_t *pRepo, SSubmitMsg *pMsg); // -- FOR QUERY TIME SERIES DATA diff --git a/src/vnode/tsdb/inc/tsdbCache.h b/src/vnode/tsdb/inc/tsdbCache.h index 31d8221723..7f018d2b0b 100644 --- a/src/vnode/tsdb/inc/tsdbCache.h +++ b/src/vnode/tsdb/inc/tsdbCache.h @@ -44,7 +44,7 @@ typedef struct STSDBCache { int32_t numOfBlocks; STSDBCacheBlock *cacheList; void * current; -} SCacheHandle; +} STsdbCache; // ---- Operation on STSDBCacheBlock #define TSDB_CACHE_BLOCK_DATA(pBlock) ((pBlock)->pData) @@ -53,8 +53,9 @@ typedef struct STSDBCache { #define TSDB_NEXT_CACHE_BLOCK(pBlock) ((pBlock)->next) #define TSDB_PREV_CACHE_BLOCK(pBlock) ((pBlock)->prev) -SCacheHandle *tsdbCreateCache(int32_t numOfBlocks); -int32_t tsdbFreeCache(SCacheHandle *pHandle); +STsdbCache *tsdbCreateCache(int32_t numOfBlocks); +int32_t tsdbFreeCache(STsdbCache *pCache); +void *tsdbAllocFromCache(STsdbCache *pCache, int64_t bytes); #ifdef __cplusplus } diff --git a/src/vnode/tsdb/inc/tsdbMeta.h b/src/vnode/tsdb/inc/tsdbMeta.h index 36c2dbd6f5..bedcf17d7d 100644 --- a/src/vnode/tsdb/inc/tsdbMeta.h +++ b/src/vnode/tsdb/inc/tsdbMeta.h @@ -113,8 +113,7 @@ STsdbMeta *tsdbOpenMeta(char *tsdbDir); int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg); int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId); - -int32_t tsdbInsertDataImpl(STsdbMeta *pMeta, STableId tableId, SDataRows rows); +STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId); #ifdef __cplusplus } diff --git a/src/vnode/tsdb/src/tsdbCache.c b/src/vnode/tsdb/src/tsdbCache.c index 25b649b73d..ef6af018c8 100644 --- a/src/vnode/tsdb/src/tsdbCache.c +++ b/src/vnode/tsdb/src/tsdbCache.c @@ -3,8 +3,8 @@ #include "tsdbCache.h" -SCacheHandle *tsdbCreateCache(int32_t numOfBlocks) { - SCacheHandle *pCacheHandle = (SCacheHandle *)malloc(sizeof(SCacheHandle)); +STsdbCache *tsdbCreateCache(int32_t numOfBlocks) { + STsdbCache *pCacheHandle = (STsdbCache *)malloc(sizeof(STsdbCache)); if (pCacheHandle == NULL) { // TODO : deal with the error return NULL; @@ -14,4 +14,9 @@ SCacheHandle *tsdbCreateCache(int32_t numOfBlocks) { } -int32_t tsdbFreeCache(SCacheHandle *pHandle) { return 0; } \ No newline at end of file +int32_t tsdbFreeCache(STsdbCache *pHandle) { return 0; } + +void *tsdbAllocFromCache(STsdbCache *pCache, int64_t bytes) { + // TODO: implement here + return NULL; +} \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 254bd4b016..aabae2b14d 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -16,6 +16,7 @@ #include "tsdbCache.h" #include "tsdbFile.h" #include "tsdbMeta.h" +#include "tutil.h" #define TSDB_DEFAULT_PRECISION TSDB_PRECISION_MILLI // default precision #define IS_VALID_PRECISION(precision) (((precision) >= TSDB_PRECISION_MILLI) && ((precision) <= TSDB_PRECISION_NANO)) @@ -51,7 +52,7 @@ typedef struct _tsdb_repo { STsdbMeta *tsdbMeta; // The cache Handle - SCacheHandle *tsdbCache; + STsdbCache *tsdbCache; // Disk tier handle for multi-tier storage void *diskTier; @@ -73,6 +74,7 @@ static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo); static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo); static int tsdbOpenMetaFile(char *tsdbDir); static int tsdbRecoverRepo(int fd, STsdbCfg *pCfg); +static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlock *pBlock); #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_NAME(pRepo, name) @@ -99,7 +101,6 @@ void tsdbFreeCfg(STsdbCfg *pCfg) { if (pCfg != NULL) free(pCfg); } - tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter) { if (rootDir == NULL) return NULL; @@ -257,10 +258,17 @@ STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tableId) { return NULL; } -int32_t tsdbInsertData(tsdb_repo_t *repo, STableId tableId, char *pData) { - STsdbRepo *pRepo = (STsdbRepo *)repo; +// TODO: need to return the number of data inserted +int32_t tsdbInsertData(tsdb_repo_t *repo, SSubmitMsg *pMsg) { + STsdbRepo * pRepo = (STsdbRepo *)repo; + SSubmitBlock *pBlock = pMsg->data; - tsdbInsertDataImpl(pRepo->tsdbMeta, tableId, pData); + for (int i = 0; i < pMsg->numOfTables; i++) { // Loop to deal with the submit message + if (tsdbInsertDataToTable(repo, pBlock) < 0) { + return -1; + } + pBlock = ((char *)pBlock) + sizeof(SSubmitBlock) + pBlock->len; + } return 0; } @@ -392,5 +400,30 @@ static int tsdbOpenMetaFile(char *tsdbDir) { static int tsdbRecoverRepo(int fd, STsdbCfg *pCfg) { // TODO: read tsdb configuration from file // recover tsdb meta + return 0; +} + +static FORCE_INLINE int32_t tdInsertRowToTable(SDataRow row, STable *pTable) { return 0; } + +static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlock *pBlock) { + STsdbRepo *pRepo = (STsdbRepo *)repo; + + STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, pBlock->tableId); + if (pTable == NULL) { + return -1; + } + + SDataRows rows = pBlock->data; + SDataRowsIter rDataIter, *pIter; + + tdInitSDataRowsIter(rows, pIter); + while (!tdRdataIterEnd(pIter)) { + if (tdInsertRowToTable(pIter->row, pTable) < 0) { + // TODO: deal with the error here + } + + tdRdataIterNext(pIter); + } + return 0; } \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbMeta.c b/src/vnode/tsdb/src/tsdbMeta.c index 46bea01fbe..7a5d7b9f62 100644 --- a/src/vnode/tsdb/src/tsdbMeta.c +++ b/src/vnode/tsdb/src/tsdbMeta.c @@ -6,6 +6,7 @@ #include "taosdef.h" #include "tsdbMeta.h" #include "hash.h" +#include "tsdbCache.h" #define TSDB_SUPER_TABLE_SL_LEVEL 5 // TODO: may change here @@ -137,26 +138,20 @@ STsdbMeta *tsdbOpenMeta(char *tsdbDir) { return pMeta; } -int32_t tsdbInsertDataImpl(STsdbMeta *pMeta, STableId tableId, SDataRows rows) { +/** + * Check if a table is valid to insert. + * @return NULL for invalid and the pointer to the table if valid + */ +STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId) { STable *pTable = tsdbGetTableByUid(pMeta, tableId.uid); if (pTable == NULL) { - return -1; + return NULL; } - if (TSDB_TABLE_IS_SUPER_TABLE(pTable)) return -1; - if (pTable->tableId.tid != tableId.tid) return -1; - - // Loop to write each row - SDataRowsIter sdataIter; - tdInitSDataRowsIter(rows, &sdataIter); - while (!tdRdataIterEnd(&sdataIter)) { - // Insert the row to it - tsdbInsertRowToTable(pTable, sdataIter.row); - - tdRdataIterNext(&sdataIter); - } + if (TSDB_TABLE_IS_SUPER_TABLE(pTable)) return NULL; + if (pTable->tableId.tid != tableId.tid) return NULL; - return 0; + return pTable; } int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) { @@ -258,6 +253,14 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) { } static int tsdbInsertRowToTable(STable *pTable, SDataRow row) { - // TODO + int32_t headSize; + int32_t level; + tSkipListRandNodeInfo(pTable->content.pIndex, &level, &headSize); + + // SSkipListNode *pNode = tsdbAllocFromCache(p); + // if (pNode == NULL) { + // return -1; + // } + return 0; } \ No newline at end of file -- GitLab