未验证 提交 64200763 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #1473 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
......@@ -101,8 +101,6 @@ SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
struct STsdbRepo;
// SSubmitMsg Iterator
typedef struct {
int32_t totalLen;
......@@ -328,7 +326,7 @@ SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle);
* @param pTagCond. tag query condition
*
*/
SArray *tsdbQueryTableList(struct STsdbRepo* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len);
SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if !defined(_TD_TSDBCACHE_H_)
#define _TD_TSDBCACHE_H_
#include <stdint.h>
#include "taosdef.h"
#include "tlist.h"
#include "tsdb.h"
#ifdef __cplusplus
extern "C" {
#endif
#define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16 * 1024 * 1024 /* 16M */
typedef struct {
int blockId;
int offset;
int remain;
int padding;
char data[];
} STsdbCacheBlock;
typedef struct {
int64_t index;
SList * memPool;
} STsdbCachePool;
typedef struct {
TSKEY keyFirst;
TSKEY keyLast;
int64_t numOfPoints;
SList * list;
} SCacheMem;
typedef struct {
int maxBytes;
int cacheBlockSize;
int totalCacheBlocks;
STsdbCachePool pool;
STsdbCacheBlock *curBlock;
SCacheMem * mem;
SCacheMem * imem;
tsdb_repo_t * pRepo;
} STsdbCache;
STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize, tsdb_repo_t *pRepo);
void tsdbFreeCache(STsdbCache *pCache);
void * tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key);
#ifdef __cplusplus
}
#endif
#endif // _TD_TSDBCACHE_H_
......@@ -12,20 +12,160 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if !defined(_TD_TSDB_FILE_H_)
#define _TD_TSDB_FILE_H_
#ifndef _TD_TSDB_MAIN_H_
#define _TD_TSDB_MAIN_H_
#include <stdint.h>
#include "dataformat.h"
#include "taosdef.h"
#include "tglobalcfg.h"
#include "tsdb.h"
#include "tlist.h"
#include "tglobalcfg.h"
#include "tskiplist.h"
#include "tutil.h"
#ifdef __cplusplus
extern "C" {
#endif
// ------------------------------ TSDB META FILE INTERFACES ------------------------------
#define TSDB_META_FILE_NAME "META"
#define TSDB_META_HASH_FRACTION 1.1
typedef int (*iterFunc)(void *, void *cont, int contLen);
typedef void (*afterFunc)(void *);
typedef struct {
int fd; // File descriptor
int nDel; // number of deletions
int tombSize; // deleted size
int64_t size; // Total file size
void * map; // Map from uid ==> position
iterFunc iFunc;
afterFunc aFunc;
void * appH;
} SMetaFile;
SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables, iterFunc iFunc, afterFunc aFunc, void *appH);
int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen);
int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, int64_t uid);
int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen);
void tsdbCloseMetaFile(SMetaFile *mfh);
// ------------------------------ TSDB META INTERFACES ------------------------------
#define IS_CREATE_STABLE(pCfg) ((pCfg)->tagValues != NULL)
typedef struct {
TSKEY keyFirst;
TSKEY keyLast;
int32_t numOfPoints;
void * pData;
} SMemTable;
// ---------- TSDB TABLE DEFINITION
typedef struct STable {
int8_t type;
STableId tableId;
int64_t superUid; // Super table UID
int32_t sversion;
STSchema * schema;
STSchema * tagSchema;
SDataRow tagVal;
SMemTable * mem;
SMemTable * imem;
void * pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
void * eventHandler; // TODO
void * streamHandler; // TODO
struct STable *next; // TODO: remove the next
} STable;
void * tsdbEncodeTable(STable *pTable, int *contLen);
STable *tsdbDecodeTable(void *cont, int contLen);
void * tsdbFreeEncode(void *cont);
// ---------- TSDB META HANDLE DEFINITION
typedef struct {
int32_t maxTables; // Max number of tables
int32_t nTables; // Tables created
STable **tables; // table array
STable *superList; // super table list TODO: change it to list container
void *map; // table map of (uid ===> table)
SMetaFile *mfh; // meta file handle
int maxRowBytes;
int maxCols;
} STsdbMeta;
STsdbMeta *tsdbInitMeta(const char *rootDir, int32_t maxTables);
int32_t tsdbFreeMeta(STsdbMeta *pMeta);
STSchema * tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable);
// ---- Operation on STable
#define TSDB_TABLE_ID(pTable) ((pTable)->tableId)
#define TSDB_TABLE_UID(pTable) ((pTable)->uid)
#define TSDB_TABLE_NAME(pTable) ((pTable)->tableName)
#define TSDB_TABLE_TYPE(pTable) ((pTable)->type)
#define TSDB_TABLE_SUPER_TABLE_UID(pTable) ((pTable)->stableUid)
#define TSDB_TABLE_IS_SUPER_TABLE(pTable) (TSDB_TABLE_TYPE(pTable) == TSDB_SUPER_TABLE)
#define TSDB_TABLE_TAG_VALUE(pTable) ((pTable)->pTagVal)
#define TSDB_TABLE_CACHE_DATA(pTable) ((pTable)->content.pData)
#define TSDB_SUPER_TABLE_INDEX(pTable) ((pTable)->content.pIndex)
// ---- Operation on SMetaHandle
#define TSDB_NUM_OF_TABLES(pHandle) ((pHandle)->numOfTables)
#define TSDB_NUM_OF_SUPER_TABLES(pHandle) ((pHandle)->numOfSuperTables)
#define TSDB_TABLE_OF_ID(pHandle, id) ((pHandle)->pTables)[id]
#define TSDB_GET_TABLE_OF_NAME(pHandle, name) /* TODO */
STsdbMeta* tsdbGetMeta(tsdb_repo_t* pRepo);
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg);
int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId);
STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId);
// int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable);
STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid);
char *getTupleKey(const void * data);
// ------------------------------ TSDB CACHE INTERFACES ------------------------------
#define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16 * 1024 * 1024 /* 16M */
typedef struct {
int blockId;
int offset;
int remain;
int padding;
char data[];
} STsdbCacheBlock;
typedef struct {
int64_t index;
SList * memPool;
} STsdbCachePool;
typedef struct {
TSKEY keyFirst;
TSKEY keyLast;
int64_t numOfPoints;
SList * list;
} SCacheMem;
typedef struct {
int maxBytes;
int cacheBlockSize;
int totalCacheBlocks;
STsdbCachePool pool;
STsdbCacheBlock *curBlock;
SCacheMem * mem;
SCacheMem * imem;
tsdb_repo_t * pRepo;
} STsdbCache;
STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize, tsdb_repo_t *pRepo);
void tsdbFreeCache(STsdbCache *pCache);
void * tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key);
// ------------------------------ TSDB FILE INTERFACES ------------------------------
#define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F
......@@ -174,11 +314,41 @@ int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SD
SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid);
// TODO: need an API to merge all sub-block data into one
void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey);
// TSDB repository definition
typedef struct _tsdb_repo {
char *rootDir;
// TSDB configuration
STsdbCfg config;
// The meter meta handle of this TSDB repository
STsdbMeta *tsdbMeta;
// The cache Handle
STsdbCache *tsdbCache;
// The TSDB file handle
STsdbFileH *tsdbFileH;
// Disk tier handle for multi-tier storage
void *diskTier;
pthread_mutex_t mutex;
int commit;
pthread_t commitThread;
// A limiter to monitor the resources used by tsdb
void *limiter;
int8_t state;
} STsdbRepo;
#ifdef __cplusplus
}
#endif
#endif // _TD_TSDB_FILE_H_
#endif
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if !defined(_TSDB_META_H_)
#define _TSDB_META_H_
#include <pthread.h>
#include "tsdb.h"
#include "dataformat.h"
#include "tskiplist.h"
#include "tsdbMetaFile.h"
#ifdef __cplusplus
extern "C" {
#endif
// #include "taosdef.h"
// Initially, there are 4 tables
#define TSDB_INIT_NUMBER_OF_SUPER_TABLE 4
#define IS_CREATE_STABLE(pCfg) ((pCfg)->tagValues != NULL)
typedef struct {
TSKEY keyFirst;
TSKEY keyLast;
int32_t numOfPoints;
void * pData;
} SMemTable;
// ---------- TSDB TABLE DEFINITION
typedef struct STable {
int8_t type;
STableId tableId;
int64_t superUid; // Super table UID
int32_t sversion;
STSchema * schema;
STSchema * tagSchema;
SDataRow tagVal;
SMemTable * mem;
SMemTable * imem;
void * pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
void * eventHandler; // TODO
void * streamHandler; // TODO
struct STable *next; // TODO: remove the next
} STable;
void * tsdbEncodeTable(STable *pTable, int *contLen);
STable *tsdbDecodeTable(void *cont, int contLen);
void * tsdbFreeEncode(void *cont);
// ---------- TSDB META HANDLE DEFINITION
typedef struct {
int32_t maxTables; // Max number of tables
int32_t nTables; // Tables created
STable **tables; // table array
STable *superList; // super table list TODO: change it to list container
void *map; // table map of (uid ===> table)
SMetaFile *mfh; // meta file handle
int maxRowBytes;
int maxCols;
} STsdbMeta;
STsdbMeta *tsdbInitMeta(const char *rootDir, int32_t maxTables);
int32_t tsdbFreeMeta(STsdbMeta *pMeta);
// ---- Operation on STable
#define TSDB_TABLE_ID(pTable) ((pTable)->tableId)
#define TSDB_TABLE_UID(pTable) ((pTable)->uid)
#define TSDB_TABLE_NAME(pTable) ((pTable)->tableName)
#define TSDB_TABLE_TYPE(pTable) ((pTable)->type)
#define TSDB_TABLE_SUPER_TABLE_UID(pTable) ((pTable)->stableUid)
#define TSDB_TABLE_IS_SUPER_TABLE(pTable) (TSDB_TABLE_TYPE(pTable) == TSDB_SUPER_TABLE)
#define TSDB_TABLE_TAG_VALUE(pTable) ((pTable)->pTagVal)
#define TSDB_TABLE_CACHE_DATA(pTable) ((pTable)->content.pData)
#define TSDB_SUPER_TABLE_INDEX(pTable) ((pTable)->content.pIndex)
// ---- Operation on SMetaHandle
#define TSDB_NUM_OF_TABLES(pHandle) ((pHandle)->numOfTables)
#define TSDB_NUM_OF_SUPER_TABLES(pHandle) ((pHandle)->numOfSuperTables)
#define TSDB_TABLE_OF_ID(pHandle, id) ((pHandle)->pTables)[id]
#define TSDB_GET_TABLE_OF_NAME(pHandle, name) /* TODO */
STsdbMeta* tsdbGetMeta(tsdb_repo_t* pRepo);
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg);
int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId);
STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId);
// int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable);
STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid);
char *getTupleKey(const void * data);
#ifdef __cplusplus
}
#endif
#endif // _TSDB_META_H_
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TSDB_META_FILE_
#define _TSDB_META_FILE_
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
#define TSDB_META_FILE_NAME "META"
#define TSDB_META_HASH_FRACTION 1.1
typedef int (*iterFunc)(void *, void *cont, int contLen);
typedef void (*afterFunc)(void *);
typedef struct {
int fd; // File descriptor
int nDel; // number of deletions
int tombSize; // deleted size
int64_t size; // Total file size
void * map; // Map from uid ==> position
iterFunc iFunc;
afterFunc aFunc;
void * appH;
} SMetaFile;
SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables, iterFunc iFunc, afterFunc aFunc, void *appH);
int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen);
int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, int64_t uid);
int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen);
void tsdbCloseMetaFile(SMetaFile *mfh);
#ifdef __cplusplus
}
#endif
#endif // _TSDB_META_FILE_
\ No newline at end of file
......@@ -15,7 +15,7 @@
#include <stdlib.h>
#include "tsdb.h"
#include "tsdbCache.h"
#include "tsdbMain.h"
static int tsdbAllocBlockFromPool(STsdbCache *pCache);
static void tsdbFreeBlockList(SList *list);
......
......@@ -23,7 +23,7 @@
#include <unistd.h>
#include "tutil.h"
#include "tsdbFile.h"
#include "tsdbMain.h"
const char *tsdbFileSuffix[] = {
".head", // TSDB_FILE_TYPE_HEAD
......
......@@ -14,11 +14,7 @@
// #include "taosdef.h"
// #include "disk.h"
#include "tsdb.h"
#include "tsdbCache.h"
#include "tsdbFile.h"
#include "tsdbMeta.h"
#include "tutil.h"
#include "tskiplist.h"
#include "tsdbMain.h"
#define TSDB_DEFAULT_PRECISION TSDB_PRECISION_MILLI // default precision
#define IS_VALID_PRECISION(precision) (((precision) >= TSDB_PRECISION_MILLI) && ((precision) <= TSDB_PRECISION_NANO))
......@@ -50,35 +46,6 @@
enum { TSDB_REPO_STATE_ACTIVE, TSDB_REPO_STATE_CLOSED, TSDB_REPO_STATE_CONFIGURING };
typedef struct _tsdb_repo {
char *rootDir;
// TSDB configuration
STsdbCfg config;
// The meter meta handle of this TSDB repository
STsdbMeta *tsdbMeta;
// The cache Handle
STsdbCache *tsdbCache;
// The TSDB file handle
STsdbFileH *tsdbFileH;
// Disk tier handle for multi-tier storage
void *diskTier;
pthread_mutex_t mutex;
int commit;
pthread_t commitThread;
// A limiter to monitor the resources used by tsdb
void *limiter;
int8_t state;
} STsdbRepo;
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo);
static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo);
......@@ -988,7 +955,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */
}
tdInitDataCols(pCols, pTable->schema);
tdInitDataCols(pCols, tsdbGetTableSchema(pMeta, pTable));
SCompBlock *pTBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks);
int nBlocks = 0;
......@@ -1040,7 +1007,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
}
}
tdInitDataCols(pCols, pTable->schema);
tdInitDataCols(pCols, tsdbGetTableSchema(pMeta, pTable));
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;
while (1) {
......
......@@ -4,9 +4,8 @@
#include "tskiplist.h"
#include "tsdb.h"
#include "taosdef.h"
#include "tsdbMeta.h"
#include "hash.h"
#include "tsdbCache.h"
#include "tsdbMain.h"
#define TSDB_SUPER_TABLE_SL_LEVEL 5 // TODO: may change here
#define TSDB_META_FILE_NAME "META"
......@@ -185,6 +184,18 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta) {
return 0;
}
STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable) {
if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE) {
return pTable->schema;
} else if (pTable->type == TSDB_CHILD_TABLE) {
STable *pSuper = tsdbGetTableByUid(pMeta, pTable->superUid);
if (pSuper == NULL) return NULL;
return pSuper->schema;
} else {
return NULL;
}
}
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
if (tsdbCheckTableCfg(pCfg) < 0) return -1;
......
......@@ -16,7 +16,7 @@
#include "taosdef.h"
#include "hash.h"
#include "tsdbMetaFile.h"
#include "tsdbMain.h"
#define TSDB_META_FILE_VERSION_MAJOR 1
#define TSDB_META_FILE_VERSION_MINOR 0
......
......@@ -21,8 +21,7 @@
#include "../../../query/inc/qast.h"
#include "../../../query/inc/tsqlfunction.h"
#include "tsdb.h"
#include "tsdbFile.h"
#include "tsdbMeta.h"
#include "tsdbMain.h"
#define EXTRA_BYTES 2
#define PRIMARY_TSCOL_REQUIRED(c) (((SColumnInfoEx *)taosArrayGet(c, 0))->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)
......@@ -112,7 +111,7 @@ enum {
};
typedef struct STsdbQueryHandle {
struct STsdbRepo* pTsdb;
STsdbRepo* pTsdb;
int8_t model; // access model, single table model or multi-table model
SQueryFilePos cur; // current position
SQueryFilePos start; // the start position, used for secondary/third iteration
......@@ -809,7 +808,7 @@ tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stable
SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle) {}
static SArray* createTableIdArrayList(struct STsdbRepo* tsdb, int64_t uid) {
static SArray* createTableIdArrayList(STsdbRepo* tsdb, int64_t uid) {
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
assert(pTable != NULL); //assert pTable is a super table
......@@ -1118,7 +1117,8 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, const char* pCond
return TSDB_CODE_SUCCESS;
}
SArray *tsdbQueryTableList(struct STsdbRepo* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len) {
// SArray *tsdbQueryTableList(struct STsdbRepo* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len) {
SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len) {
// no condition, all tables created according to the stable will involved in querying
if (pTagCond == NULL || wcslen(pTagCond) == 0) {
return createTableIdArrayList(tsdb, uid);
......
......@@ -4,8 +4,7 @@
#include "tsdb.h"
#include "dataformat.h"
#include "tsdbFile.h"
#include "tsdbMeta.h"
#include "tsdbMain.h"
double getCurTime() {
struct timeval tv;
......@@ -80,7 +79,7 @@ TEST(TsdbTest, createRepo) {
// // 3. Loop to write some simple data
int nRows = 10000000;
int rowsPerSubmit = 10;
int rowsPerSubmit = 100;
int64_t start_time = 1584081000000;
SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * rowsPerSubmit);
......@@ -96,7 +95,7 @@ TEST(TsdbTest, createRepo) {
pBlock->len = 0;
for (int i = 0; i < rowsPerSubmit; i++) {
// start_time += 1000;
start_time -= 1000;
start_time += 1000;
SDataRow row = (SDataRow)(pBlock->data + pBlock->len);
tdInitDataRow(row, schema);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册