提交 b6d00ea3 编写于 作者: H hzcheng

add more code

上级 83671062
......@@ -47,7 +47,7 @@ typedef struct {
typedef struct {
STableId tableId;
int32_t sversion; // data schema version
int32_t numOfRows; // number of rows data
int32_t len; // message length
char data[];
} SSubmitBlock;
......@@ -198,13 +198,11 @@ STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tid);
/**
* Insert data to a table in a repository
* @param pRepo the TSDB repository handle
* @param tid the table ID to insert to
* @param pData the data to insert (will give a more specific description)
* @param error the error number to set when failure occurs
*
* @return the number of points inserted, -1 for failure and the error number is set
*/
int32_t tsdbInsertData(tsdb_repo_t *pRepo, STableId tid, char *pData);
int32_t tsdbInsertData(tsdb_repo_t *pRepo, SSubmitMsg *pMsg);
// -- FOR QUERY TIME SERIES DATA
......
......@@ -44,7 +44,7 @@ typedef struct STSDBCache {
int32_t numOfBlocks;
STSDBCacheBlock *cacheList;
void * current;
} SCacheHandle;
} STsdbCache;
// ---- Operation on STSDBCacheBlock
#define TSDB_CACHE_BLOCK_DATA(pBlock) ((pBlock)->pData)
......@@ -53,8 +53,9 @@ typedef struct STSDBCache {
#define TSDB_NEXT_CACHE_BLOCK(pBlock) ((pBlock)->next)
#define TSDB_PREV_CACHE_BLOCK(pBlock) ((pBlock)->prev)
SCacheHandle *tsdbCreateCache(int32_t numOfBlocks);
int32_t tsdbFreeCache(SCacheHandle *pHandle);
STsdbCache *tsdbCreateCache(int32_t numOfBlocks);
int32_t tsdbFreeCache(STsdbCache *pCache);
void *tsdbAllocFromCache(STsdbCache *pCache, int64_t bytes);
#ifdef __cplusplus
}
......
......@@ -113,8 +113,7 @@ STsdbMeta *tsdbOpenMeta(char *tsdbDir);
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg);
int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId);
int32_t tsdbInsertDataImpl(STsdbMeta *pMeta, STableId tableId, SDataRows rows);
STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId);
#ifdef __cplusplus
}
......
......@@ -3,8 +3,8 @@
#include "tsdbCache.h"
SCacheHandle *tsdbCreateCache(int32_t numOfBlocks) {
SCacheHandle *pCacheHandle = (SCacheHandle *)malloc(sizeof(SCacheHandle));
STsdbCache *tsdbCreateCache(int32_t numOfBlocks) {
STsdbCache *pCacheHandle = (STsdbCache *)malloc(sizeof(STsdbCache));
if (pCacheHandle == NULL) {
// TODO : deal with the error
return NULL;
......@@ -14,4 +14,9 @@ SCacheHandle *tsdbCreateCache(int32_t numOfBlocks) {
}
int32_t tsdbFreeCache(SCacheHandle *pHandle) { return 0; }
\ No newline at end of file
int32_t tsdbFreeCache(STsdbCache *pHandle) { return 0; }
void *tsdbAllocFromCache(STsdbCache *pCache, int64_t bytes) {
// TODO: implement here
return NULL;
}
\ No newline at end of file
......@@ -16,6 +16,7 @@
#include "tsdbCache.h"
#include "tsdbFile.h"
#include "tsdbMeta.h"
#include "tutil.h"
#define TSDB_DEFAULT_PRECISION TSDB_PRECISION_MILLI // default precision
#define IS_VALID_PRECISION(precision) (((precision) >= TSDB_PRECISION_MILLI) && ((precision) <= TSDB_PRECISION_NANO))
......@@ -51,7 +52,7 @@ typedef struct _tsdb_repo {
STsdbMeta *tsdbMeta;
// The cache Handle
SCacheHandle *tsdbCache;
STsdbCache *tsdbCache;
// Disk tier handle for multi-tier storage
void *diskTier;
......@@ -73,6 +74,7 @@ static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo);
static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo);
static int tsdbOpenMetaFile(char *tsdbDir);
static int tsdbRecoverRepo(int fd, STsdbCfg *pCfg);
static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlock *pBlock);
#define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
#define TSDB_GET_TABLE_BY_NAME(pRepo, name)
......@@ -99,7 +101,6 @@ void tsdbFreeCfg(STsdbCfg *pCfg) {
if (pCfg != NULL) free(pCfg);
}
tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter) {
if (rootDir == NULL) return NULL;
......@@ -257,10 +258,17 @@ STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tableId) {
return NULL;
}
int32_t tsdbInsertData(tsdb_repo_t *repo, STableId tableId, char *pData) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
// TODO: need to return the number of data inserted
int32_t tsdbInsertData(tsdb_repo_t *repo, SSubmitMsg *pMsg) {
STsdbRepo * pRepo = (STsdbRepo *)repo;
SSubmitBlock *pBlock = pMsg->data;
tsdbInsertDataImpl(pRepo->tsdbMeta, tableId, pData);
for (int i = 0; i < pMsg->numOfTables; i++) { // Loop to deal with the submit message
if (tsdbInsertDataToTable(repo, pBlock) < 0) {
return -1;
}
pBlock = ((char *)pBlock) + sizeof(SSubmitBlock) + pBlock->len;
}
return 0;
}
......@@ -392,5 +400,30 @@ static int tsdbOpenMetaFile(char *tsdbDir) {
static int tsdbRecoverRepo(int fd, STsdbCfg *pCfg) {
// TODO: read tsdb configuration from file
// recover tsdb meta
return 0;
}
static FORCE_INLINE int32_t tdInsertRowToTable(SDataRow row, STable *pTable) { return 0; }
static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlock *pBlock) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, pBlock->tableId);
if (pTable == NULL) {
return -1;
}
SDataRows rows = pBlock->data;
SDataRowsIter rDataIter, *pIter;
tdInitSDataRowsIter(rows, pIter);
while (!tdRdataIterEnd(pIter)) {
if (tdInsertRowToTable(pIter->row, pTable) < 0) {
// TODO: deal with the error here
}
tdRdataIterNext(pIter);
}
return 0;
}
\ No newline at end of file
......@@ -6,6 +6,7 @@
#include "taosdef.h"
#include "tsdbMeta.h"
#include "hash.h"
#include "tsdbCache.h"
#define TSDB_SUPER_TABLE_SL_LEVEL 5 // TODO: may change here
......@@ -137,26 +138,20 @@ STsdbMeta *tsdbOpenMeta(char *tsdbDir) {
return pMeta;
}
int32_t tsdbInsertDataImpl(STsdbMeta *pMeta, STableId tableId, SDataRows rows) {
/**
* Check if a table is valid to insert.
* @return NULL for invalid and the pointer to the table if valid
*/
STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId) {
STable *pTable = tsdbGetTableByUid(pMeta, tableId.uid);
if (pTable == NULL) {
return -1;
return NULL;
}
if (TSDB_TABLE_IS_SUPER_TABLE(pTable)) return -1;
if (pTable->tableId.tid != tableId.tid) return -1;
// Loop to write each row
SDataRowsIter sdataIter;
tdInitSDataRowsIter(rows, &sdataIter);
while (!tdRdataIterEnd(&sdataIter)) {
// Insert the row to it
tsdbInsertRowToTable(pTable, sdataIter.row);
tdRdataIterNext(&sdataIter);
}
if (TSDB_TABLE_IS_SUPER_TABLE(pTable)) return NULL;
if (pTable->tableId.tid != tableId.tid) return NULL;
return 0;
return pTable;
}
int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) {
......@@ -258,6 +253,14 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
}
static int tsdbInsertRowToTable(STable *pTable, SDataRow row) {
// TODO
int32_t headSize;
int32_t level;
tSkipListRandNodeInfo(pTable->content.pIndex, &level, &headSize);
// SSkipListNode *pNode = tsdbAllocFromCache(p);
// if (pNode == NULL) {
// return -1;
// }
return 0;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册