提交 0fcbd2f5 编写于 作者: H Haojun Liao

Merge branch '3.0' of github.com:taosdata/tdengine into 3.0

...@@ -42,7 +42,7 @@ if(${BUILD_WITH_LEVELDB}) ...@@ -42,7 +42,7 @@ if(${BUILD_WITH_LEVELDB})
endif(${BUILD_WITH_LEVELDB}) endif(${BUILD_WITH_LEVELDB})
## rocksdb ## rocksdb
option(BUILD_WITH_ROCKSDB "If build with rocksdb" ON) option(BUILD_WITH_ROCKSDB "If build with rocksdb" OFF)
if(${BUILD_WITH_ROCKSDB}) if(${BUILD_WITH_ROCKSDB})
cat("${CMAKE_SUPPORT_DIR}/rocksdb_CMakeLists.txt.in" ${DEPS_TMP_FILE}) cat("${CMAKE_SUPPORT_DIR}/rocksdb_CMakeLists.txt.in" ${DEPS_TMP_FILE})
endif(${BUILD_WITH_ROCKSDB}) endif(${BUILD_WITH_ROCKSDB})
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_COMMON_SCHEMA_H_
#define _TD_COMMON_SCHEMA_H_
#include "os.h"
#ifdef __cplusplus
extern "C" {
#endif
// ----------------- TSDB COLUMN DEFINITION
typedef struct {
int8_t type; // Column type
int16_t colId; // column ID
int16_t bytes; // column bytes (restore to int16_t in case of misuse)
uint16_t offset; // point offset in SDataRow after the header part.
} STColumn;
#define colType(col) ((col)->type)
#define colColId(col) ((col)->colId)
#define colBytes(col) ((col)->bytes)
#define colOffset(col) ((col)->offset)
#define colSetType(col, t) (colType(col) = (t))
#define colSetColId(col, id) (colColId(col) = (id))
#define colSetBytes(col, b) (colBytes(col) = (b))
#define colSetOffset(col, o) (colOffset(col) = (o))
// ----------------- TSDB SCHEMA DEFINITION
typedef struct {
int version; // version
int numOfCols; // Number of columns appended
int tlen; // maximum length of a SDataRow without the header part (sizeof(VarDataOffsetT) + sizeof(VarDataLenT) + //
// (bytes))
uint16_t flen; // First part length in a SDataRow after the header part
uint16_t vlen; // pure value part length, excluded the overhead (bytes only)
STColumn columns[];
} STSchema;
#define schemaNCols(s) ((s)->numOfCols)
#define schemaVersion(s) ((s)->version)
#define schemaTLen(s) ((s)->tlen)
#define schemaFLen(s) ((s)->flen)
#define schemaVLen(s) ((s)->vlen)
#define schemaColAt(s, i) ((s)->columns + i)
#define tdFreeSchema(s) tfree((s))
STSchema *tdDupSchema(STSchema *pSchema);
int tdEncodeSchema(void **buf, STSchema *pSchema);
void * tdDecodeSchema(void *buf, STSchema **pRSchema);
static FORCE_INLINE int comparColId(const void *key1, const void *key2) {
if (*(int16_t *)key1 > ((STColumn *)key2)->colId) {
return 1;
} else if (*(int16_t *)key1 < ((STColumn *)key2)->colId) {
return -1;
} else {
return 0;
}
}
static FORCE_INLINE STColumn *tdGetColOfID(STSchema *pSchema, int16_t colId) {
void *ptr = bsearch(&colId, (void *)pSchema->columns, schemaNCols(pSchema), sizeof(STColumn), comparColId);
if (ptr == NULL) return NULL;
return (STColumn *)ptr;
}
// ----------------- SCHEMA BUILDER DEFINITION
typedef struct {
int tCols;
int nCols;
int tlen;
uint16_t flen;
uint16_t vlen;
int version;
STColumn *columns;
} STSchemaBuilder;
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version);
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder);
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version);
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int16_t bytes);
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
#ifdef __cplusplus
}
#endif
#endif /*_TD_COMMON_SCHEMA_H_*/
\ No newline at end of file
...@@ -87,7 +87,7 @@ typedef struct { ...@@ -87,7 +87,7 @@ typedef struct {
#define schemaColAt(s, i) ((s)->columns + i) #define schemaColAt(s, i) ((s)->columns + i)
#define tdFreeSchema(s) tfree((s)) #define tdFreeSchema(s) tfree((s))
STSchema *tdDupSchema(STSchema *pSchema); STSchema *tdDupSchema(const STSchema *pSchema);
int tdEncodeSchema(void **buf, STSchema *pSchema); int tdEncodeSchema(void **buf, STSchema *pSchema);
void * tdDecodeSchema(void *buf, STSchema **pRSchema); void * tdDecodeSchema(void *buf, STSchema **pRSchema);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_COMMON_SCHEMA_H_
#define _TD_COMMON_SCHEMA_H_
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif /*_TD_COMMON_SCHEMA_H_*/
\ No newline at end of file
...@@ -16,30 +16,43 @@ ...@@ -16,30 +16,43 @@
#ifndef _TD_TKV_H_ #ifndef _TD_TKV_H_
#define _TD_TKV_H_ #define _TD_TKV_H_
#include "os.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
// Types exported // Types exported
typedef struct STkvDb STkvDb; typedef struct STkvDb STkvDb;
typedef struct STkvOptions STkvOptions; typedef struct STkvOpts STkvOpts;
typedef struct STkvCache STkvCache; typedef struct STkvCache STkvCache;
typedef struct STkvReadOpts STkvReadOpts;
typedef struct STkvWriteOpts STkvWriteOpts;
// DB operations // DB operations
STkvDb *tkvOpen(const STkvOptions *options, const char *path); STkvDb *tkvOpen(const STkvOpts *options, const char *path);
void tkvClose(STkvDb *db); void tkvClose(STkvDb *db);
void tkvPut(STkvDb *db, void * /*TODO*/); void tkvPut(STkvDb *db, STkvWriteOpts *, char *key, size_t keylen, char *val, size_t vallen);
char * tkvGet(STkvDb *db, STkvReadOpts *, char *key, size_t keylen, size_t *vallen);
// DB options // DB options
STkvOptions *tkvOptionsCreate(); STkvOpts *tkvOptionsCreate();
void tkvOptionsDestroy(STkvOptions *); void tkvOptionsDestroy(STkvOpts *);
void tkvOptionsSetCache(STkvOptions *, STkvCache *); void tkvOptionsSetCache(STkvOpts *, STkvCache *);
// DB cache // DB cache
typedef enum { TKV_LRU_CACHE = 0, TKV_LFU_CACHE = 1 } ETkvCacheType; typedef enum { TKV_LRU_CACHE = 0, TKV_LFU_CACHE = 1 } ETkvCacheType;
STkvCache *tkvCacheCreate(size_t capacity, ETkvCacheType type); STkvCache *tkvCacheCreate(size_t capacity, ETkvCacheType type);
void tkvCacheDestroy(STkvCache *); void tkvCacheDestroy(STkvCache *);
// STkvReadOpts
STkvReadOpts *tkvReadOptsCreate();
void tkvReadOptsDestroy(STkvReadOpts *);
// STkvWriteOpts
STkvWriteOpts *tkvWriteOptsCreate();
void tkvWriteOptsDestroy(STkvWriteOpts *);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -58,18 +58,25 @@ void walStop(twalh); ...@@ -58,18 +58,25 @@ void walStop(twalh);
void walClose(twalh); void walClose(twalh);
//write //write
int64_t walWrite(twalh, int8_t msgType, void* body, uint32_t bodyLen); int64_t walWriteWithMsgType(twalh, int8_t msgType, void* body, int32_t bodyLen);
void walFsync(twalh, bool forceHint); int64_t walWrite(twalh, void* body, int32_t bodyLen);
//int32_t walCommit(twalh, int64_t ver); int64_t walWriteBatch(twalh, void* body, int32_t* bodyLen, int32_t batchSize);
//int32_t walRollback(twalh, int64_t ver);
//apis for lifecycle management
void walFsync(twalh, bool force);
int32_t walCommit(twalh, int64_t ver);
//truncate after
int32_t walRollback(twalh, int64_t ver);
//notify that previous log can be pruned safely
int32_t walPrune(twalh, int64_t ver);
//read //read
int32_t walRead(twalh, SWalHead **, int64_t ver); int32_t walRead(twalh, SWalHead **, int64_t ver);
int32_t walReadWithFp(twalh, FWalWrite writeFp, int64_t verStart, int readNum); int32_t walReadWithFp(twalh, FWalWrite writeFp, int64_t verStart, int readNum);
//life cycle //lifecycle check
int32_t walDataPersisted(twalh, int64_t ver);
int32_t walFirstVer(twalh); int32_t walFirstVer(twalh);
int32_t walPersistedVer(twalh);
int32_t walLastVer(twalh); int32_t walLastVer(twalh);
//int32_t walDataCorrupted(twalh); //int32_t walDataCorrupted(twalh);
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
extern "C" { extern "C" {
#endif #endif
void taosRemoveDir(char *dirname); void taosRemoveDir(const char *dirname);
bool taosDirExist(char *dirname); bool taosDirExist(char *dirname);
bool taosMkDir(char *dirname); bool taosMkDir(char *dirname);
void taosRemoveOldFiles(char *dirname, int32_t keepDays); void taosRemoveOldFiles(char *dirname, int32_t keepDays);
......
...@@ -27,33 +27,44 @@ extern "C" { ...@@ -27,33 +27,44 @@ extern "C" {
typedef uint64_t tuid_t; typedef uint64_t tuid_t;
// Types exported // Types exported
typedef struct SMeta SMeta; typedef struct SMeta SMeta;
typedef struct SMetaOptions SMetaOptions; typedef struct SMetaOpts SMetaOpts;
typedef struct SMetaQueryHandle SMetaQueryHandle; typedef struct SMetaQueryHandle SMetaQueryHandle;
typedef struct SMetaQueryOptions SMetaQueryOptions; typedef struct SMetaQueryOpts SMetaQueryOpts;
typedef struct STableOpts STableOpts;
// SMeta operations // SMeta operations
int metaCreate(const char *path); int metaCreate(const char *path);
int metaDestroy(const char *path); void metaDestroy(const char *path);
SMeta *metaOpen(SMetaOptions *); SMeta *metaOpen(SMetaOpts *);
void metaClose(SMeta *); void metaClose(SMeta *);
int metaCreateTable(SMeta *, void *); int metaCreateTable(SMeta *, STableOpts *);
int metaDropTable(SMeta *, uint64_t tuid_t); int metaDropTable(SMeta *, uint64_t tuid_t);
int metaAlterTable(SMeta *, void *); int metaAlterTable(SMeta *, void *);
int metaCommit(SMeta *); int metaCommit(SMeta *);
// Options // Options
SMetaOptions *metaOptionsCreate(); SMetaOpts *metaOptionsCreate();
void metaOptionsDestroy(SMetaOptions *); void metaOptionsDestroy(SMetaOpts *);
void metaOptionsSetCache(SMetaOptions *, size_t capacity); void metaOptionsSetCache(SMetaOpts *, size_t capacity);
// SMetaQueryHandle // SMetaQueryHandle
SMetaQueryHandle *metaQueryHandleCreate(SMetaQueryOptions *); SMetaQueryHandle *metaQueryHandleCreate(SMetaQueryOpts *);
void metaQueryHandleDestroy(SMetaQueryHandle *); void metaQueryHandleDestroy(SMetaQueryHandle *);
// SMetaQueryOptions // SMetaQueryOpts
SMetaQueryOptions *metaQueryOptionsCreate(); SMetaQueryOpts *metaQueryOptionsCreate();
void metaQueryOptionsDestroy(SMetaQueryOptions *); void metaQueryOptionsDestroy(SMetaQueryOpts *);
// STableOpts
void metaTableOptsInit(STableOpts *, int8_t type, const char *name, const STSchema *pSchema);
/* -------------------------------- Hided implementations -------------------------------- */
struct STableOpts {
int8_t type;
char * name;
STSchema *pSchema;
};
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -48,7 +48,7 @@ int tdAllocMemForCol(SDataCol *pCol, int maxPoints) { ...@@ -48,7 +48,7 @@ int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
/** /**
* Duplicate the schema and return a new object * Duplicate the schema and return a new object
*/ */
STSchema *tdDupSchema(STSchema *pSchema) { STSchema *tdDupSchema(const STSchema *pSchema) {
int tlen = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema); int tlen = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);
STSchema *tSchema = (STSchema *)malloc(tlen); STSchema *tSchema = (STSchema *)malloc(tlen);
......
...@@ -94,8 +94,6 @@ static int32_t (*parseLocaltimeFp[]) (char* timestr, int64_t* time, int32_t time ...@@ -94,8 +94,6 @@ static int32_t (*parseLocaltimeFp[]) (char* timestr, int64_t* time, int32_t time
parseLocaltimeWithDst parseLocaltimeWithDst
}; };
int32_t taosGetTimestampSec() { return (int32_t)time(NULL); }
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t day_light) { int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t day_light) {
/* parse datatime string in with tz */ /* parse datatime string in with tz */
if (strnchr(timestr, 'T', len, false) != NULL) { if (strnchr(timestr, 'T', len, false) != NULL) {
......
...@@ -4,4 +4,8 @@ target_include_directories( ...@@ -4,4 +4,8 @@ target_include_directories(
tkv tkv
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/tkv" PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/tkv"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
tkv
PUBLIC os
) )
\ No newline at end of file
...@@ -11,4 +11,71 @@ ...@@ -11,4 +11,71 @@
* *
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
\ No newline at end of file
#include "tkv.h"
struct STkvDb {
// TODO
};
struct STkvOpts {
// TODO
};
struct STkvCache {
// TODO
};
struct STkvReadOpts {
// TODO
};
struct STkvWriteOpts {
// TODO
};
STkvDb *tkvOpen(const STkvOpts *options, const char *path) {
// TODO
return NULL;
}
void tkvClose(STkvDb *db) {
// TODO
}
void tkvPut(STkvDb *db, STkvWriteOpts *pwopts, char *key, size_t keylen, char *val, size_t vallen) {
// TODO
}
char *tkvGet(STkvDb *db, STkvReadOpts *propts, char *key, size_t keylen, size_t *vallen) {
// TODO
return NULL;
}
STkvOpts *tkvOptionsCreate() {
// TODO
return NULL;
}
void tkvOptionsDestroy(STkvOpts *popts) {
// TODO
}
void tkvOptionsSetCache(STkvOpts *popts, STkvCache *pCache) {
// TODO
}
STkvReadOpts *tkvReadOptsCreate() {
// TODO
return NULL;
}
void tkvReadOptsDestroy(STkvReadOpts *propts) {
// TODO
}
STkvWriteOpts *tkvWriteOptsCreate() {
// TODO
return NULL;
}
void tkvWriteOptsDestroy(STkvWriteOpts *pwopts) {
// TODO
}
\ No newline at end of file
...@@ -34,7 +34,7 @@ ...@@ -34,7 +34,7 @@
#include <unistd.h> #include <unistd.h>
#include <wordexp.h> #include <wordexp.h>
void taosRemoveDir(char *dirname) { void taosRemoveDir(const char *dirname) {
DIR *dir = opendir(dirname); DIR *dir = opendir(dirname);
if (dir == NULL) return; if (dir == NULL) return;
...@@ -48,7 +48,7 @@ void taosRemoveDir(char *dirname) { ...@@ -48,7 +48,7 @@ void taosRemoveDir(char *dirname) {
taosRemoveDir(filename); taosRemoveDir(filename);
} else { } else {
(void)remove(filename); (void)remove(filename);
printf("file:%s is removed", filename); printf("file:%s is removed\n", filename);
} }
} }
......
...@@ -63,4 +63,6 @@ FORCE_INLINE int32_t taosGetTimeOfDay(struct timeval *tv) { ...@@ -63,4 +63,6 @@ FORCE_INLINE int32_t taosGetTimeOfDay(struct timeval *tv) {
return gettimeofday(tv, NULL); return gettimeofday(tv, NULL);
} }
int32_t taosGetTimestampSec() { return (int32_t)time(NULL); }
#endif #endif
...@@ -16,10 +16,11 @@ ...@@ -16,10 +16,11 @@
#ifndef _TD_VNODE_MAIN_H_ #ifndef _TD_VNODE_MAIN_H_
#define _TD_VNODE_MAIN_H_ #define _TD_VNODE_MAIN_H_
#include "vnodeInt.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "vnodeInt.h"
int32_t vnodeInitMain(); int32_t vnodeInitMain();
void vnodeCleanupMain(); void vnodeCleanupMain();
......
...@@ -37,4 +37,4 @@ void vnodeWaitWriteCompleted(SVnode *pVnode); ...@@ -37,4 +37,4 @@ void vnodeWaitWriteCompleted(SVnode *pVnode);
} }
#endif #endif
#endif /*_TD_VNODE_WRITE_H_*/ #endif /*_TD_VNODE_WRITE_H_*/
\ No newline at end of file
...@@ -8,8 +8,8 @@ target_include_directories( ...@@ -8,8 +8,8 @@ target_include_directories(
target_link_libraries( target_link_libraries(
meta meta
PUBLIC common PUBLIC common
PUBLIC tkv
) )
target_link_libraries(meta PUBLIC rocksdb)
if(${BUILD_TEST}) if(${BUILD_TEST})
add_subdirectory(test) add_subdirectory(test)
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_META_UID_H_
#define _TD_META_UID_H_
#include "os.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef uint64_t tb_uid_t;
tb_uid_t metaGenerateUid();
#define IVLD_TB_UID 0
#ifdef __cplusplus
}
#endif
#endif /*_TD_META_UID_H_*/
\ No newline at end of file
...@@ -13,19 +13,21 @@ ...@@ -13,19 +13,21 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <rocksdb/c.h> #include "tkv.h"
#include "thash.h" #include "thash.h"
#include "tlist.h" #include "tlist.h"
#include "tlockfree.h" #include "tlockfree.h"
#include "ttypes.h" #include "ttypes.h"
#include "meta.h" #include "meta.h"
#include "metaUid.h"
/* -------------------- Structures -------------------- */
typedef struct STable { typedef struct STable {
uint64_t uid; tb_uid_t uid;
tstr * name; char * name;
uint64_t suid; tb_uid_t suid;
SArray * schema; SArray * schema;
} STable; } STable;
...@@ -41,15 +43,21 @@ typedef struct STableObj { ...@@ -41,15 +43,21 @@ typedef struct STableObj {
struct SMeta { struct SMeta {
pthread_rwlock_t rwLock; pthread_rwlock_t rwLock;
SHashObj * pTableObjHash; // uid --> STableObj SHashObj *pTableObjHash; // uid --> STableObj
SList * stbList; // super table list SList * stbList; // super table list
rocksdb_t *tbnameDb; // tbname --> uid STkvDb * tbnameDb; // tbname --> uid
rocksdb_t *tagDb; // uid --> tag STkvDb * tagDb; // uid --> tag
rocksdb_t *schemaDb; STkvDb * schemaDb;
size_t totalUsed; STkvDb * tagIdx;
size_t totalUsed;
}; };
SMeta *metaOpen(SMetaOptions *options) { static STable * metaTableNew(tb_uid_t uid, const char *name, int32_t sver);
static STableObj *metaTableObjNew();
/* -------------------- Methods -------------------- */
SMeta *metaOpen(SMetaOpts *options) {
SMeta *pMeta = NULL; SMeta *pMeta = NULL;
char * err = NULL; char * err = NULL;
...@@ -64,21 +72,148 @@ SMeta *metaOpen(SMetaOptions *options) { ...@@ -64,21 +72,148 @@ SMeta *metaOpen(SMetaOptions *options) {
pMeta->stbList = tdListNew(sizeof(STableObj *)); pMeta->stbList = tdListNew(sizeof(STableObj *));
// Options
STkvOpts *dbOptions = tkvOptionsCreate();
taosMkDir("meta");
// Open tbname DB // Open tbname DB
rocksdb_options_t *tbnameDbOptions = rocksdb_options_create(); pMeta->tbnameDb = tkvOpen(dbOptions, "meta/tbname_uid_db");
pMeta->tbnameDb = rocksdb_open(tbnameDbOptions, "tbname_uid_db", &err);
// Open tag DB // Open tag DB
pMeta->tagDb = rocksdb_open(tbnameDbOptions, "uid_tag_db", &err); pMeta->tagDb = tkvOpen(dbOptions, "meta/uid_tag_db");
// Open schema DB // Open schema DB
pMeta->schemaDb = rocksdb_open(tbnameDbOptions, "schema_db", &err); pMeta->schemaDb = tkvOpen(dbOptions, "meta/schema_db");
// Open tag index
pMeta->tagIdx = tkvOpen(dbOptions, "meta/tag_idx_db");
tkvOptionsDestroy(dbOptions);
return pMeta; return pMeta;
} }
void metaClose(SMeta *pMeta) { void metaClose(SMeta *pMeta) {
// TODO if (pMeta) {
tkvClose(pMeta->tagIdx);
tkvClose(pMeta->schemaDb);
tkvClose(pMeta->tagDb);
tkvClose(pMeta->tbnameDb);
tdListFree(pMeta->stbList);
taosHashCleanup(pMeta->pTableObjHash);
pthread_rwlock_destroy(&(pMeta->rwLock));
}
}
int metaCreateTable(SMeta *pMeta, STableOpts *pTableOpts) {
size_t vallen;
STkvReadOpts *ropt;
STableObj * pTableObj = NULL;
STkvWriteOpts *wopt;
// Check if table already exists
ropt = tkvReadOptsCreate();
char *uidStr = tkvGet(pMeta->tbnameDb, ropt, pTableOpts->name, strlen(pTableOpts->name), &vallen);
if (uidStr != NULL) {
// Has duplicate named table
return -1;
}
tkvReadOptsDestroy(ropt);
// Create table obj
pTableObj = metaTableObjNew();
if (pTableObj == NULL) {
// TODO
return -1;
}
// Create table object
pTableObj->pTable = metaTableNew(metaGenerateUid(), pTableOpts->name, schemaVersion(pTableOpts->pSchema));
if (pTableObj->pTable == NULL) {
// TODO
}
pthread_rwlock_rdlock(&pMeta->rwLock);
taosHashPut(pMeta->pTableObjHash, &(pTableObj->pTable->uid), sizeof(tb_uid_t), &pTableObj, sizeof(pTableObj));
wopt = tkvWriteOptsCreate();
// rocksdb_writeoptions_disable_WAL(wopt, 1);
// Add to tbname db
tkvPut(pMeta->tbnameDb, wopt, pTableOpts->name, strlen(pTableOpts->name), (char *)&pTableObj->pTable->uid,
sizeof(tb_uid_t));
// Add to schema db
char id[12];
char buf[256];
void *pBuf = buf;
*(tb_uid_t *)id = pTableObj->pTable->uid;
*(int32_t *)(id + sizeof(tb_uid_t)) = schemaVersion(pTableOpts->pSchema);
int size = tdEncodeSchema(&pBuf, pTableOpts->pSchema);
tkvPut(pMeta->schemaDb, wopt, id, 12, buf, size);
tkvWriteOptsDestroy(wopt);
pthread_rwlock_unlock(&pMeta->rwLock);
return 0;
}
void metaDestroy(const char *path) { taosRemoveDir(path); }
int metaCommit(SMeta *meta) { return 0; }
void metaTableOptsInit(STableOpts *pTableOpts, int8_t type, const char *name, const STSchema *pSchema) {
pTableOpts->type = type;
pTableOpts->name = strdup(name);
pTableOpts->pSchema = tdDupSchema(pSchema);
}
/* -------------------- Static Methods -------------------- */
static STable *metaTableNew(tb_uid_t uid, const char *name, int32_t sver) {
STable *pTable = NULL;
pTable = (STable *)malloc(sizeof(*pTable));
if (pTable == NULL) {
// TODO
return NULL;
}
pTable->schema = taosArrayInit(0, sizeof(int32_t));
if (pTable->schema == NULL) {
// TODO
return NULL;
}
pTable->uid = uid;
pTable->name = strdup(name);
pTable->suid = IVLD_TB_UID;
taosArrayPush(pTable->schema, &sver);
return pTable;
} }
int metaCommit(SMeta *meta) { return 0; } static STableObj *metaTableObjNew() {
\ No newline at end of file STableObj *pTableObj = NULL;
pTableObj = (STableObj *)malloc(sizeof(*pTableObj));
if (pTableObj == NULL) {
return NULL;
}
pTableObj->pin = true;
pTableObj->ref = 1;
taosInitRWLatch(&(pTableObj->latch));
pTableObj->offset = UINT64_MAX;
pTableObj->ctbList = NULL;
pTableObj->pTable = NULL;
return pTableObj;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "metaUid.h"
static tb_uid_t nuid = IVLD_TB_UID;
tb_uid_t metaGenerateUid() {
// TODO: need a more complex UID generator
return ++nuid;
}
\ No newline at end of file
...@@ -2,6 +2,7 @@ add_executable(metaTest "") ...@@ -2,6 +2,7 @@ add_executable(metaTest "")
target_sources(metaTest target_sources(metaTest
PRIVATE PRIVATE
"../src/meta.c" "../src/meta.c"
"../src/metaUid.c"
"metaTests.cpp" "metaTests.cpp"
) )
target_include_directories(metaTest target_include_directories(metaTest
...@@ -13,6 +14,11 @@ target_link_libraries(metaTest ...@@ -13,6 +14,11 @@ target_link_libraries(metaTest
os os
util util
common common
rocksdb
gtest_main gtest_main
tkv
)
enable_testing()
add_test(
NAME meta_test
COMMAND metaTest
) )
\ No newline at end of file
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <string.h>
#include <iostream> #include <iostream>
#include "meta.h" #include "meta.h"
TEST(MetaTest, meta_open_test) { TEST(MetaTest, meta_open_test) {
metaOpen(NULL); // Open Meta
std::cout << "Hello META!" << std::endl; SMeta *meta = metaOpen(NULL);
std::cout << "Meta is opened!" << std::endl;
// Create tables
STableOpts tbOpts;
char tbname[128];
STSchema * pSchema;
STSchemaBuilder sb;
tdInitTSchemaBuilder(&sb, 0);
for (size_t i = 0; i < 10; i++) {
tdAddColToSchema(&sb, TSDB_DATA_TYPE_TIMESTAMP, i, 8);
}
pSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb);
for (size_t i = 0; i < 1000000; i++) {
sprintf(tbname, "tb%ld", i);
metaTableOptsInit(&tbOpts, 0, tbname, pSchema);
metaCreateTable(meta, &tbOpts);
}
// Close Meta
metaClose(meta);
std::cout << "Meta is closed!" << std::endl;
// Destroy Meta
metaDestroy("meta");
std::cout << "Meta is destroyed!" << std::endl;
} }
\ No newline at end of file
...@@ -3,10 +3,11 @@ add_library(tq ${TQ_SRC}) ...@@ -3,10 +3,11 @@ add_library(tq ${TQ_SRC})
target_include_directories( target_include_directories(
tq tq
PUBLIC "${CMAKE_SOURCE_DIR}/include/server/vnode/tq" PUBLIC "${CMAKE_SOURCE_DIR}/include/server/vnode/tq"
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/wal"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
PRIVATE "${CMAKE_SOURCE_DIR}/include/os" PRIVATE "${CMAKE_SOURCE_DIR}/include/os"
) )
target_link_libraries( target_link_libraries(
os wal
) )
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册