提交 59de14ea 编写于 作者: H Hongze Cheng

wrap tkv

上级 8287a34c
...@@ -42,7 +42,7 @@ if(${BUILD_WITH_LEVELDB}) ...@@ -42,7 +42,7 @@ if(${BUILD_WITH_LEVELDB})
endif(${BUILD_WITH_LEVELDB}) endif(${BUILD_WITH_LEVELDB})
## rocksdb ## rocksdb
option(BUILD_WITH_ROCKSDB "If build with rocksdb" ON) option(BUILD_WITH_ROCKSDB "If build with rocksdb" OFF)
if(${BUILD_WITH_ROCKSDB}) if(${BUILD_WITH_ROCKSDB})
cat("${CMAKE_SUPPORT_DIR}/rocksdb_CMakeLists.txt.in" ${DEPS_TMP_FILE}) cat("${CMAKE_SUPPORT_DIR}/rocksdb_CMakeLists.txt.in" ${DEPS_TMP_FILE})
endif(${BUILD_WITH_ROCKSDB}) endif(${BUILD_WITH_ROCKSDB})
......
...@@ -16,30 +16,43 @@ ...@@ -16,30 +16,43 @@
#ifndef _TD_TKV_H_ #ifndef _TD_TKV_H_
#define _TD_TKV_H_ #define _TD_TKV_H_
#include "os.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
// Types exported // Types exported
typedef struct STkvDb STkvDb; typedef struct STkvDb STkvDb;
typedef struct STkvOptions STkvOptions; typedef struct STkvOpts STkvOpts;
typedef struct STkvCache STkvCache; typedef struct STkvCache STkvCache;
typedef struct STkvReadOpts STkvReadOpts;
typedef struct STkvWriteOpts STkvWriteOpts;
// DB operations // DB operations
STkvDb *tkvOpen(const STkvOptions *options, const char *path); STkvDb *tkvOpen(const STkvOpts *options, const char *path);
void tkvClose(STkvDb *db); void tkvClose(STkvDb *db);
void tkvPut(STkvDb *db, void * /*TODO*/); void tkvPut(STkvDb *db, STkvWriteOpts *, char *key, size_t keylen, char *val, size_t vallen);
char * tkvGet(STkvDb *db, STkvReadOpts *, char *key, size_t keylen, size_t *vallen);
// DB options // DB options
STkvOptions *tkvOptionsCreate(); STkvOpts *tkvOptionsCreate();
void tkvOptionsDestroy(STkvOptions *); void tkvOptionsDestroy(STkvOpts *);
void tkvOptionsSetCache(STkvOptions *, STkvCache *); void tkvOptionsSetCache(STkvOpts *, STkvCache *);
// DB cache // DB cache
typedef enum { TKV_LRU_CACHE = 0, TKV_LFU_CACHE = 1 } ETkvCacheType; typedef enum { TKV_LRU_CACHE = 0, TKV_LFU_CACHE = 1 } ETkvCacheType;
STkvCache *tkvCacheCreate(size_t capacity, ETkvCacheType type); STkvCache *tkvCacheCreate(size_t capacity, ETkvCacheType type);
void tkvCacheDestroy(STkvCache *); void tkvCacheDestroy(STkvCache *);
// STkvReadOpts
STkvReadOpts *tkvReadOptsCreate();
void tkvReadOptsDestroy(STkvReadOpts *);
// STkvWriteOpts
STkvWriteOpts *tkvWriteOptsCreate();
void tkvWriteOptsDestroy(STkvWriteOpts *);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -4,4 +4,8 @@ target_include_directories( ...@@ -4,4 +4,8 @@ target_include_directories(
tkv tkv
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/tkv" PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/tkv"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
tkv
PUBLIC os
) )
\ No newline at end of file
...@@ -11,4 +11,71 @@ ...@@ -11,4 +11,71 @@
* *
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
\ No newline at end of file
#include "tkv.h"
struct STkvDb {
// TODO
};
struct STkvOpts {
// TODO
};
struct STkvCache {
// TODO
};
struct STkvReadOpts {
// TODO
};
struct STkvWriteOpts {
// TODO
};
STkvDb *tkvOpen(const STkvOpts *options, const char *path) {
// TODO
return NULL;
}
void tkvClose(STkvDb *db) {
// TODO
}
void tkvPut(STkvDb *db, STkvWriteOpts *pwopts, char *key, size_t keylen, char *val, size_t vallen) {
// TODO
}
char *tkvGet(STkvDb *db, STkvReadOpts *propts, char *key, size_t keylen, size_t *vallen) {
// TODO
return NULL;
}
STkvOpts *tkvOptionsCreate() {
// TODO
return NULL;
}
void tkvOptionsDestroy(STkvOpts *popts) {
// TODO
}
void tkvOptionsSetCache(STkvOpts *popts, STkvCache *pCache) {
// TODO
}
STkvReadOpts *tkvReadOptsCreate() {
// TODO
return NULL;
}
void tkvReadOptsDestroy(STkvReadOpts *propts) {
// TODO
}
STkvWriteOpts *tkvWriteOptsCreate() {
// TODO
return NULL;
}
void tkvWriteOptsDestroy(STkvWriteOpts *pwopts) {
// TODO
}
\ No newline at end of file
...@@ -8,8 +8,8 @@ target_include_directories( ...@@ -8,8 +8,8 @@ target_include_directories(
target_link_libraries( target_link_libraries(
meta meta
PUBLIC common PUBLIC common
PUBLIC tkv
) )
target_link_libraries(meta PUBLIC rocksdb)
if(${BUILD_TEST}) if(${BUILD_TEST})
add_subdirectory(test) add_subdirectory(test)
......
...@@ -13,8 +13,7 @@ ...@@ -13,8 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <rocksdb/c.h> #include "tkv.h"
#include "thash.h" #include "thash.h"
#include "tlist.h" #include "tlist.h"
#include "tlockfree.h" #include "tlockfree.h"
...@@ -44,13 +43,13 @@ typedef struct STableObj { ...@@ -44,13 +43,13 @@ typedef struct STableObj {
struct SMeta { struct SMeta {
pthread_rwlock_t rwLock; pthread_rwlock_t rwLock;
SHashObj * pTableObjHash; // uid --> STableObj SHashObj *pTableObjHash; // uid --> STableObj
SList * stbList; // super table list SList * stbList; // super table list
rocksdb_t *tbnameDb; // tbname --> uid STkvDb * tbnameDb; // tbname --> uid
rocksdb_t *tagDb; // uid --> tag STkvDb * tagDb; // uid --> tag
rocksdb_t *schemaDb; STkvDb * schemaDb;
rocksdb_t *tagIdx; STkvDb * tagIdx;
size_t totalUsed; size_t totalUsed;
}; };
static STable * metaTableNew(tb_uid_t uid, const char *name, int32_t sver); static STable * metaTableNew(tb_uid_t uid, const char *name, int32_t sver);
...@@ -74,34 +73,33 @@ SMeta *metaOpen(SMetaOpts *options) { ...@@ -74,34 +73,33 @@ SMeta *metaOpen(SMetaOpts *options) {
pMeta->stbList = tdListNew(sizeof(STableObj *)); pMeta->stbList = tdListNew(sizeof(STableObj *));
// Options // Options
rocksdb_options_t *dbOptions = rocksdb_options_create(); STkvOpts *dbOptions = tkvOptionsCreate();
rocksdb_options_set_create_if_missing(dbOptions, 1);
taosMkDir("meta"); taosMkDir("meta");
// Open tbname DB // Open tbname DB
pMeta->tbnameDb = rocksdb_open(dbOptions, "meta/tbname_uid_db", &err); pMeta->tbnameDb = tkvOpen(dbOptions, "meta/tbname_uid_db");
// Open tag DB // Open tag DB
pMeta->tagDb = rocksdb_open(dbOptions, "meta/uid_tag_db", &err); pMeta->tagDb = tkvOpen(dbOptions, "meta/uid_tag_db");
// Open schema DB // Open schema DB
pMeta->schemaDb = rocksdb_open(dbOptions, "meta/schema_db", &err); pMeta->schemaDb = tkvOpen(dbOptions, "meta/schema_db");
// Open tag index // Open tag index
pMeta->tagIdx = rocksdb_open(dbOptions, "meta/tag_idx_db", &err); pMeta->tagIdx = tkvOpen(dbOptions, "meta/tag_idx_db");
rocksdb_options_destroy(dbOptions); tkvOptionsDestroy(dbOptions);
return pMeta; return pMeta;
} }
void metaClose(SMeta *pMeta) { void metaClose(SMeta *pMeta) {
if (pMeta) { if (pMeta) {
rocksdb_close(pMeta->tagIdx); tkvClose(pMeta->tagIdx);
rocksdb_close(pMeta->schemaDb); tkvClose(pMeta->schemaDb);
rocksdb_close(pMeta->tagDb); tkvClose(pMeta->tagDb);
rocksdb_close(pMeta->tbnameDb); tkvClose(pMeta->tbnameDb);
tdListFree(pMeta->stbList); tdListFree(pMeta->stbList);
taosHashCleanup(pMeta->pTableObjHash); taosHashCleanup(pMeta->pTableObjHash);
...@@ -110,22 +108,21 @@ void metaClose(SMeta *pMeta) { ...@@ -110,22 +108,21 @@ void metaClose(SMeta *pMeta) {
} }
int metaCreateTable(SMeta *pMeta, STableOpts *pTableOpts) { int metaCreateTable(SMeta *pMeta, STableOpts *pTableOpts) {
size_t vallen; size_t vallen;
char * err = NULL; STkvReadOpts *ropt;
rocksdb_readoptions_t * ropt; STableObj * pTableObj = NULL;
STableObj * pTableObj = NULL; STkvWriteOpts *wopt;
rocksdb_writeoptions_t *wopt;
// Check if table already exists // Check if table already exists
ropt = rocksdb_readoptions_create(); ropt = tkvReadOptsCreate();
char *uidStr = rocksdb_get(pMeta->tbnameDb, ropt, pTableOpts->name, strlen(pTableOpts->name), &vallen, &err); char *uidStr = tkvGet(pMeta->tbnameDb, ropt, pTableOpts->name, strlen(pTableOpts->name), &vallen);
if (uidStr != NULL) { if (uidStr != NULL) {
// Has duplicate named table // Has duplicate named table
return -1; return -1;
} }
rocksdb_readoptions_destroy(ropt); tkvReadOptsDestroy(ropt);
// Create table obj // Create table obj
pTableObj = metaTableObjNew(); pTableObj = metaTableObjNew();
...@@ -144,12 +141,12 @@ int metaCreateTable(SMeta *pMeta, STableOpts *pTableOpts) { ...@@ -144,12 +141,12 @@ int metaCreateTable(SMeta *pMeta, STableOpts *pTableOpts) {
taosHashPut(pMeta->pTableObjHash, &(pTableObj->pTable->uid), sizeof(tb_uid_t), &pTableObj, sizeof(pTableObj)); taosHashPut(pMeta->pTableObjHash, &(pTableObj->pTable->uid), sizeof(tb_uid_t), &pTableObj, sizeof(pTableObj));
wopt = rocksdb_writeoptions_create(); wopt = tkvWriteOptsCreate();
rocksdb_writeoptions_disable_WAL(wopt, 1); // rocksdb_writeoptions_disable_WAL(wopt, 1);
// Add to tbname db // Add to tbname db
rocksdb_put(pMeta->tbnameDb, wopt, pTableOpts->name, strlen(pTableOpts->name), &pTableObj->pTable->uid, tkvPut(pMeta->tbnameDb, wopt, pTableOpts->name, strlen(pTableOpts->name), (char *)&pTableObj->pTable->uid,
sizeof(tb_uid_t), &err); sizeof(tb_uid_t));
// Add to schema db // Add to schema db
char id[12]; char id[12];
...@@ -159,9 +156,9 @@ int metaCreateTable(SMeta *pMeta, STableOpts *pTableOpts) { ...@@ -159,9 +156,9 @@ int metaCreateTable(SMeta *pMeta, STableOpts *pTableOpts) {
*(int32_t *)(id + sizeof(tb_uid_t)) = schemaVersion(pTableOpts->pSchema); *(int32_t *)(id + sizeof(tb_uid_t)) = schemaVersion(pTableOpts->pSchema);
int size = tdEncodeSchema(&pBuf, pTableOpts->pSchema); int size = tdEncodeSchema(&pBuf, pTableOpts->pSchema);
rocksdb_put(pMeta->schemaDb, wopt, id, 12, buf, size, &err); tkvPut(pMeta->schemaDb, wopt, id, 12, buf, size);
rocksdb_writeoptions_destroy(wopt); tkvWriteOptsDestroy(wopt);
pthread_rwlock_unlock(&pMeta->rwLock); pthread_rwlock_unlock(&pMeta->rwLock);
......
...@@ -14,8 +14,8 @@ target_link_libraries(metaTest ...@@ -14,8 +14,8 @@ target_link_libraries(metaTest
os os
util util
common common
rocksdb
gtest_main gtest_main
tkv
) )
enable_testing() enable_testing()
add_test( add_test(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册