提交 16f94e23 编写于 作者: S slguan

Merge branch '2.0' into refact/slguan

...@@ -107,9 +107,9 @@ IF (TD_LINUX_64) ...@@ -107,9 +107,9 @@ IF (TD_LINUX_64)
SET(RELEASE_FLAGS "-O0") SET(RELEASE_FLAGS "-O0")
IF (NOT TD_ARM) IF (NOT TD_ARM)
IF (${CMAKE_CXX_COMPILER_ID} MATCHES "Clang") IF (${CMAKE_CXX_COMPILER_ID} MATCHES "Clang")
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -malign-double -g -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -malign-double -g3 -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ELSE () ELSE ()
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -malign-double -g -malign-stringops -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -malign-double -g3 -gdwarf-2 -malign-stringops -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF () ENDIF ()
ELSE () ELSE ()
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -g -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -g -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
......
...@@ -597,11 +597,16 @@ bool tSkipListIterNext(SSkipListIterator *iter) { ...@@ -597,11 +597,16 @@ bool tSkipListIterNext(SSkipListIterator *iter) {
pthread_rwlock_unlock(pSkipList->lock); pthread_rwlock_unlock(pSkipList->lock);
} }
return iter->cur != NULL; return iter->cur != pSkipList->pTail;
} }
SSkipListNode *tSkipListIterGet(SSkipListIterator *iter) { return (iter == NULL)? NULL:iter->cur; } SSkipListNode *tSkipListIterGet(SSkipListIterator *iter) {
if (iter == NULL || iter->cur == iter->pSkipList->pTail) {
return NULL;
} else {
return iter->cur;
}
}
void* tSkipListDestroyIter(SSkipListIterator* iter) { void* tSkipListDestroyIter(SSkipListIterator* iter) {
if (iter == NULL) { if (iter == NULL) {
return NULL; return NULL;
......
...@@ -101,8 +101,6 @@ SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter); ...@@ -101,8 +101,6 @@ SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg) #define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
struct STsdbRepo;
// SSubmitMsg Iterator // SSubmitMsg Iterator
typedef struct { typedef struct {
int32_t totalLen; int32_t totalLen;
...@@ -328,7 +326,7 @@ SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle); ...@@ -328,7 +326,7 @@ SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle);
* @param pTagCond. tag query condition * @param pTagCond. tag query condition
* *
*/ */
SArray *tsdbQueryTableList(struct STsdbRepo* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len); SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if !defined(_TD_TSDBCACHE_H_)
#define _TD_TSDBCACHE_H_
#include <stdint.h>
#include "taosdef.h"
#include "tlist.h"
#include "tsdb.h"
#ifdef __cplusplus
extern "C" {
#endif
#define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16 * 1024 * 1024 /* 16M */
typedef struct {
int blockId;
int offset;
int remain;
int padding;
char data[];
} STsdbCacheBlock;
typedef struct {
int64_t index;
SList * memPool;
} STsdbCachePool;
typedef struct {
TSKEY keyFirst;
TSKEY keyLast;
int64_t numOfPoints;
SList * list;
} SCacheMem;
typedef struct {
int maxBytes;
int cacheBlockSize;
int totalCacheBlocks;
STsdbCachePool pool;
STsdbCacheBlock *curBlock;
SCacheMem * mem;
SCacheMem * imem;
tsdb_repo_t * pRepo;
} STsdbCache;
STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize, tsdb_repo_t *pRepo);
void tsdbFreeCache(STsdbCache *pCache);
void * tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key);
#ifdef __cplusplus
}
#endif
#endif // _TD_TSDBCACHE_H_
...@@ -12,20 +12,160 @@ ...@@ -12,20 +12,160 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#if !defined(_TD_TSDB_FILE_H_) #ifndef _TD_TSDB_MAIN_H_
#define _TD_TSDB_FILE_H_ #define _TD_TSDB_MAIN_H_
#include <stdint.h>
#include "dataformat.h"
#include "taosdef.h"
#include "tglobalcfg.h"
#include "tsdb.h" #include "tsdb.h"
#include "tlist.h"
#include "tglobalcfg.h"
#include "tskiplist.h"
#include "tutil.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
// ------------------------------ TSDB META FILE INTERFACES ------------------------------
#define TSDB_META_FILE_NAME "META"
#define TSDB_META_HASH_FRACTION 1.1
typedef int (*iterFunc)(void *, void *cont, int contLen);
typedef void (*afterFunc)(void *);
typedef struct {
int fd; // File descriptor
int nDel; // number of deletions
int tombSize; // deleted size
int64_t size; // Total file size
void * map; // Map from uid ==> position
iterFunc iFunc;
afterFunc aFunc;
void * appH;
} SMetaFile;
SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables, iterFunc iFunc, afterFunc aFunc, void *appH);
int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen);
int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, int64_t uid);
int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen);
void tsdbCloseMetaFile(SMetaFile *mfh);
// ------------------------------ TSDB META INTERFACES ------------------------------
#define IS_CREATE_STABLE(pCfg) ((pCfg)->tagValues != NULL)
typedef struct {
TSKEY keyFirst;
TSKEY keyLast;
int32_t numOfPoints;
void * pData;
} SMemTable;
// ---------- TSDB TABLE DEFINITION
typedef struct STable {
int8_t type;
STableId tableId;
int64_t superUid; // Super table UID
int32_t sversion;
STSchema * schema;
STSchema * tagSchema;
SDataRow tagVal;
SMemTable * mem;
SMemTable * imem;
void * pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
void * eventHandler; // TODO
void * streamHandler; // TODO
struct STable *next; // TODO: remove the next
} STable;
void * tsdbEncodeTable(STable *pTable, int *contLen);
STable *tsdbDecodeTable(void *cont, int contLen);
void * tsdbFreeEncode(void *cont);
// ---------- TSDB META HANDLE DEFINITION
typedef struct {
int32_t maxTables; // Max number of tables
int32_t nTables; // Tables created
STable **tables; // table array
STable *superList; // super table list TODO: change it to list container
void *map; // table map of (uid ===> table)
SMetaFile *mfh; // meta file handle
int maxRowBytes;
int maxCols;
} STsdbMeta;
STsdbMeta *tsdbInitMeta(const char *rootDir, int32_t maxTables);
int32_t tsdbFreeMeta(STsdbMeta *pMeta);
STSchema * tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable);
// ---- Operation on STable
#define TSDB_TABLE_ID(pTable) ((pTable)->tableId)
#define TSDB_TABLE_UID(pTable) ((pTable)->uid)
#define TSDB_TABLE_NAME(pTable) ((pTable)->tableName)
#define TSDB_TABLE_TYPE(pTable) ((pTable)->type)
#define TSDB_TABLE_SUPER_TABLE_UID(pTable) ((pTable)->stableUid)
#define TSDB_TABLE_IS_SUPER_TABLE(pTable) (TSDB_TABLE_TYPE(pTable) == TSDB_SUPER_TABLE)
#define TSDB_TABLE_TAG_VALUE(pTable) ((pTable)->pTagVal)
#define TSDB_TABLE_CACHE_DATA(pTable) ((pTable)->content.pData)
#define TSDB_SUPER_TABLE_INDEX(pTable) ((pTable)->content.pIndex)
// ---- Operation on SMetaHandle
#define TSDB_NUM_OF_TABLES(pHandle) ((pHandle)->numOfTables)
#define TSDB_NUM_OF_SUPER_TABLES(pHandle) ((pHandle)->numOfSuperTables)
#define TSDB_TABLE_OF_ID(pHandle, id) ((pHandle)->pTables)[id]
#define TSDB_GET_TABLE_OF_NAME(pHandle, name) /* TODO */
STsdbMeta* tsdbGetMeta(tsdb_repo_t* pRepo);
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg);
int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId);
STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId);
// int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable);
STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid);
char *getTupleKey(const void * data);
// ------------------------------ TSDB CACHE INTERFACES ------------------------------
#define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16 * 1024 * 1024 /* 16M */
typedef struct {
int blockId;
int offset;
int remain;
int padding;
char data[];
} STsdbCacheBlock;
typedef struct {
int64_t index;
SList * memPool;
} STsdbCachePool;
typedef struct {
TSKEY keyFirst;
TSKEY keyLast;
int64_t numOfPoints;
SList * list;
} SCacheMem;
typedef struct {
int maxBytes;
int cacheBlockSize;
int totalCacheBlocks;
STsdbCachePool pool;
STsdbCacheBlock *curBlock;
SCacheMem * mem;
SCacheMem * imem;
tsdb_repo_t * pRepo;
} STsdbCache;
STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize, tsdb_repo_t *pRepo);
void tsdbFreeCache(STsdbCache *pCache);
void * tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key);
// ------------------------------ TSDB FILE INTERFACES ------------------------------
#define TSDB_FILE_HEAD_SIZE 512 #define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F #define TSDB_FILE_DELIMITER 0xF00AFA0F
...@@ -174,11 +314,41 @@ int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SD ...@@ -174,11 +314,41 @@ int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SD
SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid); SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid);
// TODO: need an API to merge all sub-block data into one
void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey); void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey);
// TSDB repository definition
typedef struct _tsdb_repo {
char *rootDir;
// TSDB configuration
STsdbCfg config;
// The meter meta handle of this TSDB repository
STsdbMeta *tsdbMeta;
// The cache Handle
STsdbCache *tsdbCache;
// The TSDB file handle
STsdbFileH *tsdbFileH;
// Disk tier handle for multi-tier storage
void *diskTier;
pthread_mutex_t mutex;
int commit;
pthread_t commitThread;
// A limiter to monitor the resources used by tsdb
void *limiter;
int8_t state;
} STsdbRepo;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif // _TD_TSDB_FILE_H_ #endif
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if !defined(_TSDB_META_H_)
#define _TSDB_META_H_
#include <pthread.h>
#include "tsdb.h"
#include "dataformat.h"
#include "tskiplist.h"
#include "tsdbMetaFile.h"
#ifdef __cplusplus
extern "C" {
#endif
// #include "taosdef.h"
// Initially, there are 4 tables
#define TSDB_INIT_NUMBER_OF_SUPER_TABLE 4
#define IS_CREATE_STABLE(pCfg) ((pCfg)->tagValues != NULL)
typedef struct {
TSKEY keyFirst;
TSKEY keyLast;
int32_t numOfPoints;
void * pData;
} SMemTable;
// ---------- TSDB TABLE DEFINITION
typedef struct STable {
int8_t type;
STableId tableId;
int64_t superUid; // Super table UID
int32_t sversion;
STSchema * schema;
STSchema * tagSchema;
SDataRow tagVal;
SMemTable * mem;
SMemTable * imem;
void * pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
void * eventHandler; // TODO
void * streamHandler; // TODO
struct STable *next; // TODO: remove the next
} STable;
void * tsdbEncodeTable(STable *pTable, int *contLen);
STable *tsdbDecodeTable(void *cont, int contLen);
void * tsdbFreeEncode(void *cont);
// ---------- TSDB META HANDLE DEFINITION
typedef struct {
int32_t maxTables; // Max number of tables
int32_t nTables; // Tables created
STable **tables; // table array
STable *superList; // super table list TODO: change it to list container
void *map; // table map of (uid ===> table)
SMetaFile *mfh; // meta file handle
int maxRowBytes;
int maxCols;
} STsdbMeta;
STsdbMeta *tsdbInitMeta(const char *rootDir, int32_t maxTables);
int32_t tsdbFreeMeta(STsdbMeta *pMeta);
// ---- Operation on STable
#define TSDB_TABLE_ID(pTable) ((pTable)->tableId)
#define TSDB_TABLE_UID(pTable) ((pTable)->uid)
#define TSDB_TABLE_NAME(pTable) ((pTable)->tableName)
#define TSDB_TABLE_TYPE(pTable) ((pTable)->type)
#define TSDB_TABLE_SUPER_TABLE_UID(pTable) ((pTable)->stableUid)
#define TSDB_TABLE_IS_SUPER_TABLE(pTable) (TSDB_TABLE_TYPE(pTable) == TSDB_SUPER_TABLE)
#define TSDB_TABLE_TAG_VALUE(pTable) ((pTable)->pTagVal)
#define TSDB_TABLE_CACHE_DATA(pTable) ((pTable)->content.pData)
#define TSDB_SUPER_TABLE_INDEX(pTable) ((pTable)->content.pIndex)
// ---- Operation on SMetaHandle
#define TSDB_NUM_OF_TABLES(pHandle) ((pHandle)->numOfTables)
#define TSDB_NUM_OF_SUPER_TABLES(pHandle) ((pHandle)->numOfSuperTables)
#define TSDB_TABLE_OF_ID(pHandle, id) ((pHandle)->pTables)[id]
#define TSDB_GET_TABLE_OF_NAME(pHandle, name) /* TODO */
STsdbMeta* tsdbGetMeta(tsdb_repo_t* pRepo);
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg);
int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId);
STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId);
// int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable);
STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid);
char *getTupleKey(const void * data);
#ifdef __cplusplus
}
#endif
#endif // _TSDB_META_H_
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TSDB_META_FILE_
#define _TSDB_META_FILE_
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
#define TSDB_META_FILE_NAME "META"
#define TSDB_META_HASH_FRACTION 1.1
typedef int (*iterFunc)(void *, void *cont, int contLen);
typedef void (*afterFunc)(void *);
typedef struct {
int fd; // File descriptor
int nDel; // number of deletions
int tombSize; // deleted size
int64_t size; // Total file size
void * map; // Map from uid ==> position
iterFunc iFunc;
afterFunc aFunc;
void * appH;
} SMetaFile;
SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables, iterFunc iFunc, afterFunc aFunc, void *appH);
int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen);
int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, int64_t uid);
int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen);
void tsdbCloseMetaFile(SMetaFile *mfh);
#ifdef __cplusplus
}
#endif
#endif // _TSDB_META_FILE_
\ No newline at end of file
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include <stdlib.h> #include <stdlib.h>
#include "tsdb.h" #include "tsdb.h"
#include "tsdbCache.h" #include "tsdbMain.h"
static int tsdbAllocBlockFromPool(STsdbCache *pCache); static int tsdbAllocBlockFromPool(STsdbCache *pCache);
static void tsdbFreeBlockList(SList *list); static void tsdbFreeBlockList(SList *list);
......
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
#include <unistd.h> #include <unistd.h>
#include "tutil.h" #include "tutil.h"
#include "tsdbFile.h" #include "tsdbMain.h"
const char *tsdbFileSuffix[] = { const char *tsdbFileSuffix[] = {
".head", // TSDB_FILE_TYPE_HEAD ".head", // TSDB_FILE_TYPE_HEAD
......
...@@ -14,11 +14,7 @@ ...@@ -14,11 +14,7 @@
// #include "taosdef.h" // #include "taosdef.h"
// #include "disk.h" // #include "disk.h"
#include "tsdb.h" #include "tsdb.h"
#include "tsdbCache.h" #include "tsdbMain.h"
#include "tsdbFile.h"
#include "tsdbMeta.h"
#include "tutil.h"
#include "tskiplist.h"
#define TSDB_DEFAULT_PRECISION TSDB_PRECISION_MILLI // default precision #define TSDB_DEFAULT_PRECISION TSDB_PRECISION_MILLI // default precision
#define IS_VALID_PRECISION(precision) (((precision) >= TSDB_PRECISION_MILLI) && ((precision) <= TSDB_PRECISION_NANO)) #define IS_VALID_PRECISION(precision) (((precision) >= TSDB_PRECISION_MILLI) && ((precision) <= TSDB_PRECISION_NANO))
...@@ -50,35 +46,6 @@ ...@@ -50,35 +46,6 @@
enum { TSDB_REPO_STATE_ACTIVE, TSDB_REPO_STATE_CLOSED, TSDB_REPO_STATE_CONFIGURING }; enum { TSDB_REPO_STATE_ACTIVE, TSDB_REPO_STATE_CLOSED, TSDB_REPO_STATE_CONFIGURING };
typedef struct _tsdb_repo {
char *rootDir;
// TSDB configuration
STsdbCfg config;
// The meter meta handle of this TSDB repository
STsdbMeta *tsdbMeta;
// The cache Handle
STsdbCache *tsdbCache;
// The TSDB file handle
STsdbFileH *tsdbFileH;
// Disk tier handle for multi-tier storage
void *diskTier;
pthread_mutex_t mutex;
int commit;
pthread_t commitThread;
// A limiter to monitor the resources used by tsdb
void *limiter;
int8_t state;
} STsdbRepo;
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg); static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo); static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo);
static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo); static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo);
...@@ -988,7 +955,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -988,7 +955,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */ if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */
} }
tdInitDataCols(pCols, pTable->schema); tdInitDataCols(pCols, tsdbGetTableSchema(pMeta, pTable));
SCompBlock *pTBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks); SCompBlock *pTBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks);
int nBlocks = 0; int nBlocks = 0;
...@@ -1040,7 +1007,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -1040,7 +1007,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
} }
} }
tdInitDataCols(pCols, pTable->schema); tdInitDataCols(pCols, tsdbGetTableSchema(pMeta, pTable));
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;
while (1) { while (1) {
......
...@@ -4,9 +4,8 @@ ...@@ -4,9 +4,8 @@
#include "tskiplist.h" #include "tskiplist.h"
#include "tsdb.h" #include "tsdb.h"
#include "taosdef.h" #include "taosdef.h"
#include "tsdbMeta.h"
#include "hash.h" #include "hash.h"
#include "tsdbCache.h" #include "tsdbMain.h"
#define TSDB_SUPER_TABLE_SL_LEVEL 5 // TODO: may change here #define TSDB_SUPER_TABLE_SL_LEVEL 5 // TODO: may change here
#define TSDB_META_FILE_NAME "META" #define TSDB_META_FILE_NAME "META"
...@@ -185,6 +184,18 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta) { ...@@ -185,6 +184,18 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta) {
return 0; return 0;
} }
STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable) {
if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE) {
return pTable->schema;
} else if (pTable->type == TSDB_CHILD_TABLE) {
STable *pSuper = tsdbGetTableByUid(pMeta, pTable->superUid);
if (pSuper == NULL) return NULL;
return pSuper->schema;
} else {
return NULL;
}
}
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
if (tsdbCheckTableCfg(pCfg) < 0) return -1; if (tsdbCheckTableCfg(pCfg) < 0) return -1;
...@@ -236,10 +247,6 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { ...@@ -236,10 +247,6 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
table->type = TSDB_NORMAL_TABLE; table->type = TSDB_NORMAL_TABLE;
table->superUid = -1; table->superUid = -1;
table->schema = tdDupSchema(pCfg->schema); table->schema = tdDupSchema(pCfg->schema);
if (schemaNCols(table->schema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(table->schema);
tdUpdateSchema(table->schema);
int bytes = tdMaxRowBytesFromSchema(table->schema);
if (bytes > pMeta->maxRowBytes) pMeta->maxRowBytes = bytes;
} }
// Register to meta // Register to meta
...@@ -356,6 +363,14 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { ...@@ -356,6 +363,14 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
pMeta->nTables++; pMeta->nTables++;
} }
// Update the pMeta->maxCols and pMeta->maxRowBytes
if (pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_NORMAL_TABLE) {
if (schemaNCols(pTable->schema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pTable->schema);
int bytes = tdMaxRowBytesFromSchema(pTable->schema);
if (bytes > pMeta->maxRowBytes) pMeta->maxRowBytes = bytes;
tdUpdateSchema(pTable->schema);
}
return tsdbAddTableIntoMap(pMeta, pTable); return tsdbAddTableIntoMap(pMeta, pTable);
} }
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#include "taosdef.h" #include "taosdef.h"
#include "hash.h" #include "hash.h"
#include "tsdbMetaFile.h" #include "tsdbMain.h"
#define TSDB_META_FILE_VERSION_MAJOR 1 #define TSDB_META_FILE_VERSION_MAJOR 1
#define TSDB_META_FILE_VERSION_MINOR 0 #define TSDB_META_FILE_VERSION_MINOR 0
......
...@@ -21,8 +21,7 @@ ...@@ -21,8 +21,7 @@
#include "../../../query/inc/qast.h" #include "../../../query/inc/qast.h"
#include "../../../query/inc/tsqlfunction.h" #include "../../../query/inc/tsqlfunction.h"
#include "tsdb.h" #include "tsdb.h"
#include "tsdbFile.h" #include "tsdbMain.h"
#include "tsdbMeta.h"
#define EXTRA_BYTES 2 #define EXTRA_BYTES 2
#define PRIMARY_TSCOL_REQUIRED(c) (((SColumnInfoEx *)taosArrayGet(c, 0))->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) #define PRIMARY_TSCOL_REQUIRED(c) (((SColumnInfoEx *)taosArrayGet(c, 0))->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)
...@@ -112,7 +111,7 @@ enum { ...@@ -112,7 +111,7 @@ enum {
}; };
typedef struct STsdbQueryHandle { typedef struct STsdbQueryHandle {
struct STsdbRepo* pTsdb; STsdbRepo* pTsdb;
int8_t model; // access model, single table model or multi-table model int8_t model; // access model, single table model or multi-table model
SQueryFilePos cur; // current position SQueryFilePos cur; // current position
SQueryFilePos start; // the start position, used for secondary/third iteration SQueryFilePos start; // the start position, used for secondary/third iteration
...@@ -809,7 +808,7 @@ tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stable ...@@ -809,7 +808,7 @@ tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stable
SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle) {} SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle) {}
static SArray* createTableIdArrayList(struct STsdbRepo* tsdb, int64_t uid) { static SArray* createTableIdArrayList(STsdbRepo* tsdb, int64_t uid) {
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
assert(pTable != NULL); //assert pTable is a super table assert(pTable != NULL); //assert pTable is a super table
...@@ -1118,7 +1117,8 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, const char* pCond ...@@ -1118,7 +1117,8 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, const char* pCond
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SArray *tsdbQueryTableList(struct STsdbRepo* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len) { // SArray *tsdbQueryTableList(struct STsdbRepo* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len) {
SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len) {
// no condition, all tables created according to the stable will involved in querying // no condition, all tables created according to the stable will involved in querying
if (pTagCond == NULL || wcslen(pTagCond) == 0) { if (pTagCond == NULL || wcslen(pTagCond) == 0) {
return createTableIdArrayList(tsdb, uid); return createTableIdArrayList(tsdb, uid);
......
...@@ -2,10 +2,8 @@ ...@@ -2,10 +2,8 @@
#include <stdlib.h> #include <stdlib.h>
#include <sys/time.h> #include <sys/time.h>
#include "tsdb.h"
#include "dataformat.h" #include "dataformat.h"
#include "tsdbFile.h" #include "tsdbMain.h"
#include "tsdbMeta.h"
double getCurTime() { double getCurTime() {
struct timeval tv; struct timeval tv;
...@@ -49,8 +47,8 @@ TEST(TsdbTest, DISABLED_tableEncodeDecode) { ...@@ -49,8 +47,8 @@ TEST(TsdbTest, DISABLED_tableEncodeDecode) {
ASSERT_EQ(memcmp(pTable->schema, tTable->schema, sizeof(STSchema) + sizeof(STColumn) * nCols), 0); ASSERT_EQ(memcmp(pTable->schema, tTable->schema, sizeof(STSchema) + sizeof(STColumn) * nCols), 0);
} }
// TEST(TsdbTest, DISABLED_createRepo) { TEST(TsdbTest, DISABLED_createRepo) {
TEST(TsdbTest, createRepo) { // TEST(TsdbTest, createRepo) {
STsdbCfg config; STsdbCfg config;
// 1. Create a tsdb repository // 1. Create a tsdb repository
...@@ -79,8 +77,8 @@ TEST(TsdbTest, createRepo) { ...@@ -79,8 +77,8 @@ TEST(TsdbTest, createRepo) {
tsdbCreateTable(pRepo, &tCfg); tsdbCreateTable(pRepo, &tCfg);
// // 3. Loop to write some simple data // // 3. Loop to write some simple data
int nRows = 10000000; int nRows = 1;
int rowsPerSubmit = 10; int rowsPerSubmit = 1;
int64_t start_time = 1584081000000; int64_t start_time = 1584081000000;
SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * rowsPerSubmit); SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * rowsPerSubmit);
...@@ -96,7 +94,7 @@ TEST(TsdbTest, createRepo) { ...@@ -96,7 +94,7 @@ TEST(TsdbTest, createRepo) {
pBlock->len = 0; pBlock->len = 0;
for (int i = 0; i < rowsPerSubmit; i++) { for (int i = 0; i < rowsPerSubmit; i++) {
// start_time += 1000; // start_time += 1000;
start_time -= 1000; start_time += 1000;
SDataRow row = (SDataRow)(pBlock->data + pBlock->len); SDataRow row = (SDataRow)(pBlock->data + pBlock->len);
tdInitDataRow(row, schema); tdInitDataRow(row, schema);
...@@ -141,8 +139,26 @@ TEST(TsdbTest, createRepo) { ...@@ -141,8 +139,26 @@ TEST(TsdbTest, createRepo) {
// TEST(TsdbTest, DISABLED_openRepo) { // TEST(TsdbTest, DISABLED_openRepo) {
TEST(TsdbTest, openRepo) { TEST(TsdbTest, openRepo) {
tsdb_repo_t *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0"); tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/build/test/data/vnode/vnode1/tsdb");
ASSERT_NE(pRepo, nullptr); ASSERT_NE(repo, nullptr);
STsdbRepo *pRepo = (STsdbRepo *)repo;
SFileGroup *pGroup = tsdbSearchFGroup(pRepo->tsdbFileH, 1835);
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
tsdbOpenFile(&pGroup->files[type], O_RDONLY);
}
SCompIdx *pIdx = (SCompIdx *)calloc(pRepo->config.maxTables, sizeof(SCompIdx));
tsdbLoadCompIdx(pGroup, (void *)pIdx, pRepo->config.maxTables);
SCompInfo *pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + pIdx[1].len);
tsdbLoadCompBlocks(pGroup, &pIdx[1], (void *)pCompInfo);
int k = 0;
} }
TEST(TsdbTest, DISABLED_createFileGroup) { TEST(TsdbTest, DISABLED_createFileGroup) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册