提交 c66a3ba0 编写于 作者: H Hongze Cheng

use bdb to implement meta

上级 26e1504a
set(META_DB_IMPL_LIST "BDB" "SQLITE")
set(META_DB_IMPL "BDB" CACHE STRING "Use BDB as the default META implementation")
set_property(CACHE META_DB_IMPL PROPERTY STRINGS ${META_DB_IMPL_LIST})
if(META_DB_IMPL IN_LIST META_DB_IMPL_LIST)
message(STATUS "META DB Impl: ${META_DB_IMPL}==============")
else()
message(FATAL_ERROR "Invalid META DB IMPL: ${META_DB_IMPL}==============")
endif()
aux_source_directory(src META_SRC)
if(${META_DB_IMPL} STREQUAL "BDB")
list(REMOVE_ITEM META_SRC "src/metaSQLiteImpl.c")
elseif(${META_DB_IMPL} STREQUAL "SQLITE")
list(REMOVE_ITEM META_SRC "src/metaBDBImpl.c")
endif()
message("${META_SRC}")
add_library(meta STATIC ${META_SRC})
target_include_directories(
meta
......@@ -7,11 +25,22 @@ target_include_directories(
)
target_link_libraries(
meta
PUBLIC sqlite
PUBLIC common
PUBLIC tkv
)
if(${META_DB_IMPL} STREQUAL "BDB")
target_link_libraries(
meta
PUBLIC bdb
)
elseif(${META_DB_IMPL} STREQUAL "SQLITE")
target_link_libraries(
meta
PUBLIC sqlite
)
endif()
if(${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})
......@@ -16,15 +16,13 @@
#ifndef _TD_META_CACHE_H_
#define _TD_META_CACHE_H_
#include "rocksdb/c.h"
#include "meta.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef rocksdb_cache_t meta_cache_t;
typedef struct SMetaCache SMetaCache;
int metaOpenCache(SMeta *pMeta);
void metaCloseCache(SMeta *pMeta);
......
......@@ -16,28 +16,13 @@
#ifndef _TD_META_DB_H_
#define _TD_META_DB_H_
#define USE_SQLITE_IMPL 1
#include "rocksdb/c.h"
#include "sqlite3.h"
#include "meta.h"
#ifdef __cplusplus
extern "C" {
#endif
#if !USE_SQLITE_IMPL
typedef struct {
rocksdb_t *tbDb; // uid -> tb obj
rocksdb_t *nameDb; // name -> uid
rocksdb_t *tagDb; // uid -> tag
rocksdb_t *schemaDb; // uid+version -> schema
sqlite3 * mapDb; // suid -> uid_list
} meta_db_t;
#else
typedef sqlite3 meta_db_t;
#endif
typedef struct SMetaDB SMetaDB;
int metaOpenDB(SMeta *pMeta);
void metaCloseDB(SMeta *pMeta);
......
......@@ -34,9 +34,9 @@ extern "C" {
struct SMeta {
char* path;
SMetaCfg options;
meta_db_t* pDB;
meta_index_t* pIdx;
meta_cache_t* pCache;
SMetaDB* pDB;
SMetaIdx* pIdx;
SMetaCache* pCache;
STbUidGenerator uidGnrt;
SMemAllocatorFactory* pmaf;
};
......
......@@ -16,15 +16,13 @@
#ifndef _TD_META_IDX_H_
#define _TD_META_IDX_H_
#include "rocksdb/c.h"
#include "meta.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef rocksdb_t meta_index_t;
typedef struct SMetaIdx SMetaIdx;
int metaOpenIdx(SMeta *pMeta);
void metaCloseIdx(SMeta *pMeta);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "metaDef.h"
#include "db.h"
struct SMetaDB {
DB * pDB;
DB * pIdx;
DB_ENV *pEvn;
};
int metaOpenDB(SMeta *pMeta) {
int ret;
char dbname[128];
pMeta->pDB = (SMetaDB *)calloc(1, sizeof(SMetaDB));
if (pMeta->pDB == NULL) {
// TODO: handle error
return -1;
}
// TODO: create the env
ret = db_create(&(pMeta->pDB->pDB), pMeta->pDB->pEvn, 0);
if (ret != 0) {
// TODO: handle error
return -1;
}
ret = db_create(&(pMeta->pDB->pIdx), pMeta->pDB->pEvn, 0);
if (ret != 0) {
// TODO: handle error
return -1;
}
ret = pMeta->pDB->pDB->open(pMeta->pDB->pDB, /* DB structure pointer */
NULL, /* Transaction pointer */
"meta.db", /* On-disk file that holds the database */
NULL, /* Optional logical database name */
DB_BTREE, /* Database access method */
DB_CREATE, /* Open flags */
0); /* File mode */
if (ret != 0) {
// TODO: handle error
return -1;
}
ret = pMeta->pDB->pIdx->open(pMeta->pDB->pIdx, /* DB structure pointer */
NULL, /* Transaction pointer */
"index.db", /* On-disk file that holds the database */
NULL, /* Optional logical database name */
DB_BTREE, /* Database access method */
DB_CREATE, /* Open flags */
0); /* File mode */
if (ret != 0) {
// TODO: handle error
return -1;
}
// TODO
return 0;
}
void metaCloseDB(SMeta *pMeta) {
if (pMeta->pDB) {
/* TODO */
free(pMeta->pDB);
}
}
int metaSaveTableToDB(SMeta *pMeta, const STbCfg *pTbOptions) {
// TODO
return 0;
}
int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
// TODO
}
\ No newline at end of file
......@@ -16,22 +16,26 @@
#include "meta.h"
#include "metaDef.h"
struct SMetaCache {
// TODO
};
int metaOpenCache(SMeta *pMeta) {
// TODO
if (pMeta->options.lruSize) {
pMeta->pCache = rocksdb_cache_create_lru(pMeta->options.lruSize);
if (pMeta->pCache == NULL) {
// TODO: handle error
return -1;
}
}
// if (pMeta->options.lruSize) {
// pMeta->pCache = rocksdb_cache_create_lru(pMeta->options.lruSize);
// if (pMeta->pCache == NULL) {
// // TODO: handle error
// return -1;
// }
// }
return 0;
}
void metaCloseCache(SMeta *pMeta) {
if (pMeta->pCache) {
rocksdb_cache_destroy(pMeta->pCache);
pMeta->pCache = NULL;
}
// if (pMeta->pCache) {
// rocksdb_cache_destroy(pMeta->pCache);
// pMeta->pCache = NULL;
// }
}
\ No newline at end of file
......@@ -15,6 +15,10 @@
#include "metaDef.h"
struct SMetaIdx {
/* data */
};
int metaOpenIdx(SMeta *pMeta) {
#if 0
char idxDir[128]; // TODO
......
......@@ -14,79 +14,13 @@
*/
#include "metaDef.h"
#if !USE_SQLITE_IMPL
static void metaSaveSchemaDB(SMeta *pMeta, tb_uid_t uid, STSchema *pSchema);
static void metaGetSchemaDBKey(char key[], tb_uid_t uid, int sversion);
#define SCHEMA_KEY_LEN (sizeof(tb_uid_t) + sizeof(int))
#define META_OPEN_DB_IMPL(pDB, options, dir, err) \
do { \
pDB = rocksdb_open(options, dir, &err); \
if (pDB == NULL) { \
metaCloseDB(pMeta); \
rocksdb_options_destroy(options); \
return -1; \
} \
} while (0)
#define META_CLOSE_DB_IMPL(pDB) \
do { \
if (pDB) { \
rocksdb_close(pDB); \
pDB = NULL; \
} \
} while (0)
#endif
#include "sqlite3.h"
int metaOpenDB(SMeta *pMeta) {
char dir[128];
int rc;
char *err = NULL;
#if !USE_SQLITE_IMPL
rocksdb_options_t *options = rocksdb_options_create();
if (pMeta->pCache) {
rocksdb_options_set_row_cache(options, pMeta->pCache);
}
rocksdb_options_set_create_if_missing(options, 1);
pMeta->pDB = (meta_db_t *)calloc(1, sizeof(*(pMeta->pDB)));
if (pMeta->pDB == NULL) {
// TODO: handle error
rocksdb_options_destroy(options);
return -1;
}
// tbDb
sprintf(dir, "%s/tb_db", pMeta->path);
META_OPEN_DB_IMPL(pMeta->pDB->tbDb, options, dir, err);
// nameDb
sprintf(dir, "%s/name_db", pMeta->path);
META_OPEN_DB_IMPL(pMeta->pDB->nameDb, options, dir, err);
// tagDb
sprintf(dir, "%s/tag_db", pMeta->path);
META_OPEN_DB_IMPL(pMeta->pDB->tagDb, options, dir, err);
// schemaDb
sprintf(dir, "%s/schema_db", pMeta->path);
META_OPEN_DB_IMPL(pMeta->pDB->schemaDb, options, dir, err);
// mapDb
sprintf(dir, "%s/meta.db", pMeta->path);
if (sqlite3_open(dir, &(pMeta->pDB->mapDb)) != SQLITE_OK) {
// TODO
}
// // set read uncommitted
sqlite3_exec(pMeta->pDB->mapDb, "PRAGMA read_uncommitted=true;", 0, 0, 0);
sqlite3_exec(pMeta->pDB->mapDb, "BEGIN;", 0, 0, 0);
rocksdb_options_destroy(options);
#else
sprintf(dir, "%s/meta.db", pMeta->path);
rc = sqlite3_open(dir, &(pMeta->pDB));
if (rc != SQLITE_OK) {
......@@ -136,28 +70,11 @@ int metaOpenDB(SMeta *pMeta) {
sqlite3_exec(pMeta->pDB, "BEGIN;", NULL, NULL, &err);
tfree(err);
#endif
return 0;
}
void metaCloseDB(SMeta *pMeta) {
#if !USE_SQLITE_IMPL
if (pMeta->pDB) {
if (pMeta->pDB->mapDb) {
sqlite3_exec(pMeta->pDB->mapDb, "COMMIT;", 0, 0, 0);
sqlite3_close(pMeta->pDB->mapDb);
pMeta->pDB->mapDb = NULL;
}
META_CLOSE_DB_IMPL(pMeta->pDB->schemaDb);
META_CLOSE_DB_IMPL(pMeta->pDB->tagDb);
META_CLOSE_DB_IMPL(pMeta->pDB->nameDb);
META_CLOSE_DB_IMPL(pMeta->pDB->tbDb);
free(pMeta->pDB);
pMeta->pDB = NULL;
}
#else
if (pMeta->pDB) {
sqlite3_exec(pMeta->pDB, "BEGIN;", NULL, NULL, NULL);
sqlite3_close(pMeta->pDB);
......@@ -165,67 +82,9 @@ void metaCloseDB(SMeta *pMeta) {
}
// TODO
#endif
}
int metaSaveTableToDB(SMeta *pMeta, const STbCfg *pTbOptions) {
#if !USE_SQLITE_IMPL
tb_uid_t uid;
char * err = NULL;
size_t size;
char pBuf[1024]; // TODO
char sql[128];
rocksdb_writeoptions_t *wopt = rocksdb_writeoptions_create();
// Generate a uid for child and normal table
if (pTbOptions->type == META_SUPER_TABLE) {
uid = pTbOptions->stbCfg.suid;
} else {
uid = metaGenerateUid(pMeta);
}
// Save tbname -> uid to tbnameDB
rocksdb_put(pMeta->pDB->nameDb, wopt, pTbOptions->name, strlen(pTbOptions->name), (char *)(&uid), sizeof(uid), &err);
rocksdb_writeoptions_disable_WAL(wopt, 1);
// Save uid -> tb_obj to tbDB
size = metaEncodeTbObjFromTbOptions(pTbOptions, pBuf, 1024);
rocksdb_put(pMeta->pDB->tbDb, wopt, (char *)(&uid), sizeof(uid), pBuf, size, &err);
switch (pTbOptions->type) {
case META_NORMAL_TABLE:
// save schemaDB
metaSaveSchemaDB(pMeta, uid, pTbOptions->ntbCfg.pSchema);
break;
case META_SUPER_TABLE:
// save schemaDB
metaSaveSchemaDB(pMeta, uid, pTbOptions->stbCfg.pSchema);
// // save mapDB (really need?)
// rocksdb_put(pMeta->pDB->mapDb, wopt, (char *)(&uid), sizeof(uid), "", 0, &err);
sprintf(sql, "create table st_%" PRIu64 " (uid BIGINT);", uid);
if (sqlite3_exec(pMeta->pDB->mapDb, sql, NULL, NULL, &err) != SQLITE_OK) {
// fprintf(stderr,"Failed to create table, since %s\n", err);
}
break;
case META_CHILD_TABLE:
// save tagDB
rocksdb_put(pMeta->pDB->tagDb, wopt, (char *)(&uid), sizeof(uid), pTbOptions->ctbCfg.pTag,
kvRowLen(pTbOptions->ctbCfg.pTag), &err);
// save mapDB
sprintf(sql, "insert into st_%" PRIu64 " values (%" PRIu64 ");", pTbOptions->ctbCfg.suid, uid);
if (sqlite3_exec(pMeta->pDB->mapDb, sql, NULL, NULL, &err) != SQLITE_OK) {
fprintf(stderr, "failed to insert data, since %s\n", err);
}
break;
default:
ASSERT(0);
}
rocksdb_writeoptions_destroy(wopt);
#else
char sql[256];
char *err = NULL;
int rc;
......@@ -271,7 +130,6 @@ int metaSaveTableToDB(SMeta *pMeta, const STbCfg *pTbOptions) {
}
tfree(err);
#endif
return 0;
}
......@@ -280,28 +138,3 @@ int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
/* TODO */
return 0;
}
\ No newline at end of file
/* ------------------------ STATIC METHODS ------------------------ */
#if !USE_SQLITE_IMPL
static void metaSaveSchemaDB(SMeta *pMeta, tb_uid_t uid, STSchema *pSchema) {
char key[64];
char pBuf[1024];
char * ppBuf = pBuf;
size_t vsize;
char * err = NULL;
rocksdb_writeoptions_t *wopt = rocksdb_writeoptions_create();
rocksdb_writeoptions_disable_WAL(wopt, 1);
metaGetSchemaDBKey(key, uid, schemaVersion(pSchema));
vsize = tdEncodeSchema((void **)(&ppBuf), pSchema);
rocksdb_put(pMeta->pDB->schemaDb, wopt, key, SCHEMA_KEY_LEN, pBuf, vsize, &err);
rocksdb_writeoptions_destroy(wopt);
}
static void metaGetSchemaDBKey(char *key, tb_uid_t uid, int sversion) {
*(tb_uid_t *)key = uid;
*(int *)POINTER_SHIFT(key, sizeof(tb_uid_t)) = sversion;
}
#endif
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册