提交 a5002a73 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into feature/dnode3

......@@ -16,13 +16,13 @@ option(
option(
BUILD_WITH_ROCKSDB
"If build with rocksdb"
ON
OFF
)
option(
BUILD_WITH_SQLITE
"If build with sqlite"
ON
OFF
)
option(
......
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "db.h"
// refer: https://docs.oracle.com/cd/E17076_05/html/gsg/C/BerkeleyDB-Core-C-GSG.pdf
// Access Methods:
// 1. BTree
// 2. Hash
// 3. Queue
// 4. Recno
// Use secondary database work as index.
// Any attemp to write to a secondary database results in a non-zero status return.
static int idx_callback(DB *dbp, const DBT *keyp, const DBT *valuep, DBT *resvp);
#define USE_ENV 0
#define DESCRIPTION_SIZE 128
float money = 122.45;
char *description = "Grocery bill.";
typedef struct {
int id;
char *family_name;
char *surname;
} SPersion;
static void put_value(DB *dbp) {
DBT key = {0};
DBT value = {0};
int ret;
key.data = &money;
key.size = sizeof(money);
value.data = description;
value.size = strlen(description) + 1;
ret = dbp->put(dbp, NULL, &key, &value, DB_NOOVERWRITE);
if (ret != 0) {
fprintf(stderr, "Failed to put DB record: %s\n", db_strerror(ret));
}
}
static void get_value(DB *dbp) {
char desp[DESCRIPTION_SIZE];
DBT key = {0};
DBT value = {0};
key.data = &money;
key.size = sizeof(money);
value.data = desp;
value.ulen = DESCRIPTION_SIZE;
value.flags = DB_DBT_USERMEM;
dbp->get(dbp, NULL, &key, &value, 0);
printf("The value is \"%s\"\n", desp);
}
int main(int argc, char const *argv[]) {
DB * db;
int ret;
uint32_t flags;
DB * dbp = NULL;
DB * sdbp = NULL;
u_int32_t db_flags;
DB_ENV * envp = NULL;
u_int32_t env_flags;
int ret;
DBT key = {0};
DBT value = {0};
ret = db_create(&db, NULL, 0);
#if USE_ENV
// Initialize an env object and open it for
ret = db_env_create(&envp, 0);
if (ret != 0) {
fprintf(stderr, "Error creating env handle: %s\n", db_strerror(ret));
return -1;
}
env_flags = DB_CREATE | DB_INIT_MPOOL;
ret = envp->open(envp, "./meta", env_flags, 0);
if (ret != 0) {
fprintf(stderr, "Error opening env handle: %s\n", db_strerror(ret));
return -1;
}
#endif
// Initialize a DB handle and open the DB
ret = db_create(&dbp, envp, 0);
if (ret != 0) {
exit(1);
}
flags = DB_CREATE;
ret = db_create(&sdbp, envp, 0);
if (ret != 0) {
exit(1);
}
ret = db->open(db, NULL, "test.db", NULL, DB_BTREE, flags, 0);
ret = sdbp->set_flags(sdbp, DB_DUPSORT);
if (ret != 0) {
exit(1);
}
db->close(db, 0);
db_flags = DB_CREATE | DB_TRUNCATE;
ret = dbp->open(dbp, /* DB structure pointer */
NULL, /* Transaction pointer */
"meta.db", /* On-disk file that holds the database */
NULL, /* Optional logical database name */
DB_BTREE, /* Database access method */
db_flags, /* Open flags */
0); /* File mode */
if (ret != 0) {
exit(1);
}
ret = sdbp->open(sdbp, /* DB structure pointer */
NULL, /* Transaction pointer */
"index.db", /* On-disk file that holds the database */
NULL, /* Optional logical database name */
DB_BTREE, /* Database access method */
db_flags, /* Open flags */
0); /* File mode */
if (ret != 0) {
exit(1);
}
// Associate the secondary database to the primary
dbp->associate(dbp, /* Primary database */
NULL, /* TXN id */
sdbp, /* Secondary database */
idx_callback, /* Callback used for key creation */
0); /* Flags */
{
// Insert a key-value record
put_value(dbp);
// Read the key-value record
get_value(dbp);
}
// Close the database
if (sdbp != NULL) {
sdbp->close(sdbp, 0);
}
if (dbp != NULL) {
dbp->close(dbp, 0);
}
if (envp != NULL) {
envp->close(envp, 0);
}
return 0;
}
static int idx_callback(DB * sdbp, /* secondary db handle */
const DBT *keyp, /* primary db record's key */
const DBT *valuep, /* primary db record's value */
DBT * skeyp /* secondary db record's key*/
) {
DBT *tmpdbt;
tmpdbt = (DBT *)calloc(2, sizeof(DBT));
{ // TODO
tmpdbt[0].data = NULL;
tmpdbt[0].size = 0;
tmpdbt[1].data = NULL;
tmpdbt[1].size = 0;
}
/**
* DB_DBT_MULTIPLE means DBT references an array
* DB_DBT_APPMALLOC means we dynamically allocated memory for the DBT's data field.
*/
skeyp->flags = DB_DBT_MULTIPLE | DB_DBT_APPMALLOC;
skeyp->size = 2;
skeyp->data = tmpdbt;
return 0;
}
\ No newline at end of file
......@@ -105,7 +105,7 @@ void metaOptionsClear(SMetaCfg *pOptions);
#define META_CLEAR_TB_CFG(pTbCfg)
int metaEncodeTbCfg(void **pBuf, STbCfg *pTbCfg);
void *metaDecodeTbCfg(void *pBuf, STbCfg **pTbCfg);
void *metaDecodeTbCfg(void *pBuf, STbCfg *pTbCfg);
#ifdef __cplusplus
}
......
......@@ -19,5 +19,5 @@ target_link_libraries(
# test
if(${BUILD_TEST})
# add_subdirectory(test)
add_subdirectory(test)
endif(${BUILD_TEST})
\ No newline at end of file
set(META_DB_IMPL_LIST "BDB" "SQLITE")
set(META_DB_IMPL "BDB" CACHE STRING "Use BDB as the default META implementation")
set_property(CACHE META_DB_IMPL PROPERTY STRINGS ${META_DB_IMPL_LIST})
if(META_DB_IMPL IN_LIST META_DB_IMPL_LIST)
message(STATUS "META DB Impl: ${META_DB_IMPL}==============")
else()
message(FATAL_ERROR "Invalid META DB IMPL: ${META_DB_IMPL}==============")
endif()
aux_source_directory(src META_SRC)
if(${META_DB_IMPL} STREQUAL "BDB")
list(REMOVE_ITEM META_SRC "src/metaSQLiteImpl.c")
elseif(${META_DB_IMPL} STREQUAL "SQLITE")
list(REMOVE_ITEM META_SRC "src/metaBDBImpl.c")
endif()
add_library(meta STATIC ${META_SRC})
target_include_directories(
meta
......@@ -7,11 +23,22 @@ target_include_directories(
)
target_link_libraries(
meta
PUBLIC sqlite
PUBLIC common
PUBLIC tkv
)
if(${META_DB_IMPL} STREQUAL "BDB")
target_link_libraries(
meta
PUBLIC bdb
)
elseif(${META_DB_IMPL} STREQUAL "SQLITE")
target_link_libraries(
meta
PUBLIC sqlite
)
endif()
if(${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})
......@@ -16,15 +16,13 @@
#ifndef _TD_META_CACHE_H_
#define _TD_META_CACHE_H_
#include "rocksdb/c.h"
#include "meta.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef rocksdb_cache_t meta_cache_t;
typedef struct SMetaCache SMetaCache;
int metaOpenCache(SMeta *pMeta);
void metaCloseCache(SMeta *pMeta);
......
......@@ -16,26 +16,17 @@
#ifndef _TD_META_DB_H_
#define _TD_META_DB_H_
#include "rocksdb/c.h"
#include "sqlite3.h"
#include "meta.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
rocksdb_t *tbDb; // uid -> tb obj
rocksdb_t *nameDb; // name -> uid
rocksdb_t *tagDb; // uid -> tag
rocksdb_t *schemaDb; // uid+version -> schema
sqlite3 * mapDb; // suid -> uid_list
} meta_db_t;
typedef struct SMetaDB SMetaDB;
int metaOpenDB(SMeta *pMeta);
void metaCloseDB(SMeta *pMeta);
int metaSaveTableToDB(SMeta *pMeta, const STbCfg *pTbOptions);
int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg);
int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid);
#ifdef __cplusplus
......
......@@ -34,9 +34,9 @@ extern "C" {
struct SMeta {
char* path;
SMetaCfg options;
meta_db_t* pDB;
meta_index_t* pIdx;
meta_cache_t* pCache;
SMetaDB* pDB;
SMetaIdx* pIdx;
SMetaCache* pCache;
STbUidGenerator uidGnrt;
SMemAllocatorFactory* pmaf;
};
......
......@@ -16,15 +16,13 @@
#ifndef _TD_META_IDX_H_
#define _TD_META_IDX_H_
#include "rocksdb/c.h"
#include "meta.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef rocksdb_t meta_index_t;
typedef struct SMetaIdx SMetaIdx;
int metaOpenIdx(SMeta *pMeta);
void metaCloseIdx(SMeta *pMeta);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "db.h"
#include "metaDef.h"
#include "tcoding.h"
#include "thash.h"
struct SMetaDB {
// DB
DB *pTbDB;
DB *pSchemaDB;
// IDX
DB *pNameIdx;
DB *pStbIdx;
DB *pNtbIdx;
DB *pCtbIdx;
// ENV
DB_ENV *pEvn;
};
typedef int (*bdbIdxCbPtr)(DB *, const DBT *, const DBT *, DBT *);
static SMetaDB *metaNewDB();
static void metaFreeDB(SMetaDB *pDB);
static int metaOpenBDBEnv(DB_ENV **ppEnv, const char *path);
static void metaCloseBDBEnv(DB_ENV *pEnv);
static int metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName);
static void metaCloseBDBDb(DB *pDB);
static int metaOpenBDBIdx(DB **ppIdx, DB_ENV *pEnv, const char *pFName, DB *pDB, bdbIdxCbPtr cbf);
static int metaNameIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
#define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code))
int metaOpenDB(SMeta *pMeta) {
SMetaDB *pDB;
// Create DB object
pDB = metaNewDB();
if (pDB == NULL) {
return -1;
}
pMeta->pDB = pDB;
// Open DB Env
if (metaOpenBDBEnv(&(pDB->pEvn), pMeta->path) < 0) {
metaCloseDB(pMeta);
return -1;
}
// Open DBs
if (metaOpenBDBDb(&(pDB->pTbDB), pDB->pEvn, "meta.db") < 0) {
metaCloseDB(pMeta);
return -1;
}
if (metaOpenBDBDb(&(pDB->pSchemaDB), pDB->pEvn, "meta.db") < 0) {
metaCloseDB(pMeta);
return -1;
}
// Open Indices
if (metaOpenBDBIdx(&(pDB->pNameIdx), pDB->pEvn, "index.db", pDB->pTbDB, &metaNameIdxCb) < 0) {
metaCloseDB(pMeta);
return -1;
}
if (metaOpenBDBIdx(&(pDB->pStbIdx), pDB->pEvn, "index.db", pDB->pTbDB, &metaStbIdxCb) < 0) {
metaCloseDB(pMeta);
return -1;
}
if (metaOpenBDBIdx(&(pDB->pNtbIdx), pDB->pEvn, "index.db", pDB->pTbDB, &metaNtbIdxCb) < 0) {
metaCloseDB(pMeta);
return -1;
}
if (metaOpenBDBIdx(&(pDB->pCtbIdx), pDB->pEvn, "index.db", pDB->pTbDB, &metaCtbIdxCb) < 0) {
metaCloseDB(pMeta);
return -1;
}
return 0;
}
void metaCloseDB(SMeta *pMeta) {
if (pMeta->pDB) {
metaCloseBDBEnv(pMeta->pDB->pEvn);
metaFreeDB(pMeta->pDB);
pMeta->pDB = NULL;
}
}
int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
// TODO
return 0;
}
int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
// TODO
return 0;
}
/* ------------------------ STATIC METHODS ------------------------ */
static SMetaDB *metaNewDB() {
SMetaDB *pDB = NULL;
pDB = (SMetaDB *)calloc(1, sizeof(*pDB));
if (pDB == NULL) {
return NULL;
}
return pDB;
}
static void metaFreeDB(SMetaDB *pDB) {
if (pDB) {
free(pDB);
}
}
static int metaOpenBDBEnv(DB_ENV **ppEnv, const char *path) {
int ret;
DB_ENV *pEnv;
if (path == NULL) return 0;
ret = db_env_create(&pEnv, 0);
if (ret != 0) {
BDB_PERR("Failed to create META env", ret);
return -1;
}
ret = pEnv->open(pEnv, path, DB_CREATE | DB_INIT_MPOOL, 0);
if (ret != 0) {
BDB_PERR("Failed to open META env", ret);
return -1;
}
*ppEnv = pEnv;
return 0;
}
static void metaCloseBDBEnv(DB_ENV *pEnv) {
if (pEnv) {
pEnv->close(pEnv, 0);
}
}
static int metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName) {
int ret;
DB *pDB;
ret = db_create(&((pDB)), (pEnv), 0);
if (ret != 0) {
BDB_PERR("Failed to create META DB", ret);
return -1;
}
ret = pDB->open(pDB, NULL, pFName, NULL, DB_BTREE, DB_CREATE, 0);
if (ret) {
BDB_PERR("Failed to open META DB", ret);
return -1;
}
*ppDB = pDB;
return 0;
}
static void metaCloseBDBDb(DB *pDB) {
if (pDB) {
pDB->close(pDB, 0);
}
}
static int metaOpenBDBIdx(DB **ppIdx, DB_ENV *pEnv, const char *pFName, DB *pDB, bdbIdxCbPtr cbf) {
DB *pIdx;
int ret;
if (metaOpenBDBDb(ppIdx, pEnv, pFName) < 0) {
return -1;
}
pIdx = *ppIdx;
ret = pDB->associate(pDB, NULL, pIdx, cbf, 0);
if (ret) {
BDB_PERR("Failed to associate META DB and Index", ret);
}
return 0;
}
static void metaCloseBDBIdx(DB *pIdx) {
if (pIdx) {
pIdx->close(pIdx, 0);
}
}
static int metaNameIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
// TODO
return 0;
}
static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
// TODO
return 0;
}
static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
// TODO
return 0;
}
static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
// TODO
return 0;
}
#if 0
typedef struct {
tb_uid_t uid;
int32_t sver;
} SSchemaKey;
static SMetaDB *metaNewDB();
static void metaFreeDB(SMetaDB *pDB);
static int metaCreateDBEnv(SMetaDB *pDB, const char *path);
static void metaDestroyDBEnv(SMetaDB *pDB);
static int metaEncodeSchemaKey(void **buf, SSchemaKey *pSchemaKey);
static void * metaDecodeSchemaKey(void *buf, SSchemaKey *pSchemaKey);
static int metaNameIdxCb(DB *sdbp, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static int metaUidIdxCb(DB *sdbp, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static void metaPutSchema(SMeta *pMeta, tb_uid_t uid, STSchema *pSchema);
static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg);
static void * metaDecodeTbInfo(void *buf, STbCfg *pTbCfg);
static int metaSaveTbInfo(DB *pDB, tb_uid_t uid, STbCfg *pTbCfg);
#define META_ASSOCIATE_IDX(pDB, pIdx, cbf) \
do { \
int ret = (pDB)->associate((pDB), NULL, (pIdx), (cbf), 0); \
if (ret != 0) { \
P_ERROR("Failed to associate META DB", ret); \
metaCloseDB(pMeta); \
} \
} while (0)
int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
char buf[512];
void * pBuf;
DBT key = {0};
DBT value = {0};
SSchemaKey schemaKey;
tb_uid_t uid;
if (pTbCfg->type == META_SUPER_TABLE) {
// Handle SUPER table
uid = pTbCfg->stbCfg.suid;
// Same table info
metaSaveTbInfo(pMeta->pDB->pStbDB, uid, pTbCfg);
// save schema
metaPutSchema(pMeta, uid, pTbCfg->stbCfg.pSchema);
{
// Create a super table DB and corresponding index DB
DB *pStbDB;
DB *pStbIdxDB;
META_OPEN_DB(pStbDB, pMeta->pDB->pEvn, "meta.db");
META_OPEN_DB(pStbIdxDB, pMeta->pDB->pEvn, "index.db");
// TODO META_ASSOCIATE_IDX();
}
} else if (pTbCfg->type == META_CHILD_TABLE) {
// Handle CHILD table
uid = metaGenerateUid(pMeta);
DB *pCTbDB = taosHashGet(pMeta->pDB->pCtbMap, &(pTbCfg->ctbCfg.suid), sizeof(pTbCfg->ctbCfg.suid));
if (pCTbDB == NULL) {
ASSERT(0);
}
metaSaveTbInfo(pCTbDB, uid, pTbCfg);
} else if (pTbCfg->type == META_NORMAL_TABLE) {
// Handle NORMAL table
uid = metaGenerateUid(pMeta);
metaSaveTbInfo(pMeta->pDB->pNtbDB, uid, pTbCfg);
metaPutSchema(pMeta, uid, pTbCfg->stbCfg.pSchema);
} else {
ASSERT(0);
}
return 0;
}
int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
// TODO
}
/* ------------------------ STATIC METHODS ------------------------ */
static int metaEncodeSchemaKey(void **buf, SSchemaKey *pSchemaKey) {
int tsize = 0;
tsize += taosEncodeFixedU64(buf, pSchemaKey->uid);
tsize += taosEncodeFixedI32(buf, pSchemaKey->sver);
return tsize;
}
static void *metaDecodeSchemaKey(void *buf, SSchemaKey *pSchemaKey) {
buf = taosDecodeFixedU64(buf, &(pSchemaKey->uid));
buf = taosDecodeFixedI32(buf, &(pSchemaKey->sver));
return buf;
}
static int metaNameIdxCb(DB *sdbp, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
// TODO
return 0;
}
static int metaUidIdxCb(DB *sdbp, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
// TODO
return 0;
}
static void metaPutSchema(SMeta *pMeta, tb_uid_t uid, STSchema *pSchema) {
SSchemaKey skey;
char buf[256];
void * pBuf = buf;
DBT key = {0};
DBT value = {0};
skey.uid = uid;
skey.sver = schemaVersion(pSchema);
key.data = &skey;
key.size = sizeof(skey);
tdEncodeSchema(&pBuf, pSchema);
value.data = buf;
value.size = POINTER_DISTANCE(pBuf, buf);
pMeta->pDB->pSchemaDB->put(pMeta->pDB->pSchemaDB, NULL, &key, &value, 0);
}
static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) {
int tsize = 0;
tsize += taosEncodeString(buf, pTbCfg->name);
tsize += taosEncodeFixedU32(buf, pTbCfg->ttl);
tsize += taosEncodeFixedU32(buf, pTbCfg->keep);
if (pTbCfg->type == META_SUPER_TABLE) {
tsize += tdEncodeSchema(buf, pTbCfg->stbCfg.pTagSchema);
} else if (pTbCfg->type == META_CHILD_TABLE) {
tsize += taosEncodeFixedU64(buf, pTbCfg->ctbCfg.suid);
tsize += tdEncodeKVRow(buf, pTbCfg->ctbCfg.pTag);
} else if (pTbCfg->type == META_NORMAL_TABLE) {
} else {
ASSERT(0);
}
return tsize;
}
static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) {
// TODO
buf = taosDecodeString(buf, &(pTbCfg->name));
buf = taosDecodeFixedU32(buf, &(pTbCfg->ttl));
buf = taosDecodeFixedU32(buf, &(pTbCfg->keep));
if (pTbCfg->type == META_SUPER_TABLE) {
buf = tdDecodeSchema(buf, &(pTbCfg->stbCfg.pTagSchema));
} else if (pTbCfg->type == META_CHILD_TABLE) {
buf = taosDecodeFixedU64(buf, &(pTbCfg->ctbCfg.suid));
buf = tdDecodeKVRow(buf, &(pTbCfg->ctbCfg.pTag));
} else if (pTbCfg->type == META_NORMAL_TABLE) {
} else {
ASSERT(0);
}
return buf;
}
static int metaSaveTbInfo(DB *pDB, tb_uid_t uid, STbCfg *pTbCfg) {
DBT key = {0};
DBT value = {0};
char buf[512];
void *pBuf = buf;
key.data = &uid;
key.size = sizeof(uid);
metaEncodeTbInfo(&pBuf, pTbCfg);
value.data = buf;
value.size = POINTER_DISTANCE(pBuf, buf);
pDB->put(pDB, NULL, &key, &value, 0);
return 0;
}
#endif
\ No newline at end of file
......@@ -16,22 +16,26 @@
#include "meta.h"
#include "metaDef.h"
struct SMetaCache {
// TODO
};
int metaOpenCache(SMeta *pMeta) {
// TODO
if (pMeta->options.lruSize) {
pMeta->pCache = rocksdb_cache_create_lru(pMeta->options.lruSize);
if (pMeta->pCache == NULL) {
// TODO: handle error
return -1;
}
}
// if (pMeta->options.lruSize) {
// pMeta->pCache = rocksdb_cache_create_lru(pMeta->options.lruSize);
// if (pMeta->pCache == NULL) {
// // TODO: handle error
// return -1;
// }
// }
return 0;
}
void metaCloseCache(SMeta *pMeta) {
if (pMeta->pCache) {
rocksdb_cache_destroy(pMeta->pCache);
pMeta->pCache = NULL;
}
// if (pMeta->pCache) {
// rocksdb_cache_destroy(pMeta->pCache);
// pMeta->pCache = NULL;
// }
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "metaDef.h"
static void metaSaveSchemaDB(SMeta *pMeta, tb_uid_t uid, STSchema *pSchema);
static void metaGetSchemaDBKey(char key[], tb_uid_t uid, int sversion);
// static int metaSaveMapDB(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid);
#define SCHEMA_KEY_LEN (sizeof(tb_uid_t) + sizeof(int))
#define META_OPEN_DB_IMPL(pDB, options, dir, err) \
do { \
pDB = rocksdb_open(options, dir, &err); \
if (pDB == NULL) { \
metaCloseDB(pMeta); \
rocksdb_options_destroy(options); \
return -1; \
} \
} while (0)
int metaOpenDB(SMeta *pMeta) {
char dir[128];
char * err = NULL;
rocksdb_options_t *options = rocksdb_options_create();
if (pMeta->pCache) {
rocksdb_options_set_row_cache(options, pMeta->pCache);
}
rocksdb_options_set_create_if_missing(options, 1);
pMeta->pDB = (meta_db_t *)calloc(1, sizeof(*(pMeta->pDB)));
if (pMeta->pDB == NULL) {
// TODO: handle error
rocksdb_options_destroy(options);
return -1;
}
// tbDb
sprintf(dir, "%s/tb_db", pMeta->path);
META_OPEN_DB_IMPL(pMeta->pDB->tbDb, options, dir, err);
// nameDb
sprintf(dir, "%s/name_db", pMeta->path);
META_OPEN_DB_IMPL(pMeta->pDB->nameDb, options, dir, err);
// tagDb
sprintf(dir, "%s/tag_db", pMeta->path);
META_OPEN_DB_IMPL(pMeta->pDB->tagDb, options, dir, err);
// schemaDb
sprintf(dir, "%s/schema_db", pMeta->path);
META_OPEN_DB_IMPL(pMeta->pDB->schemaDb, options, dir, err);
// mapDb
sprintf(dir, "%s/meta.db", pMeta->path);
if (sqlite3_open(dir, &(pMeta->pDB->mapDb)) != SQLITE_OK) {
// TODO
}
// // set read uncommitted
sqlite3_exec(pMeta->pDB->mapDb, "PRAGMA read_uncommitted=true;", 0, 0, 0);
sqlite3_exec(pMeta->pDB->mapDb, "BEGIN;", 0, 0, 0);
rocksdb_options_destroy(options);
return 0;
}
#define META_CLOSE_DB_IMPL(pDB) \
do { \
if (pDB) { \
rocksdb_close(pDB); \
pDB = NULL; \
} \
} while (0)
void metaCloseDB(SMeta *pMeta) {
if (pMeta->pDB) {
if (pMeta->pDB->mapDb) {
sqlite3_exec(pMeta->pDB->mapDb, "COMMIT;", 0, 0, 0);
sqlite3_close(pMeta->pDB->mapDb);
pMeta->pDB->mapDb = NULL;
}
META_CLOSE_DB_IMPL(pMeta->pDB->schemaDb);
META_CLOSE_DB_IMPL(pMeta->pDB->tagDb);
META_CLOSE_DB_IMPL(pMeta->pDB->nameDb);
META_CLOSE_DB_IMPL(pMeta->pDB->tbDb);
free(pMeta->pDB);
pMeta->pDB = NULL;
}
}
int metaSaveTableToDB(SMeta *pMeta, const STbCfg *pTbOptions) {
tb_uid_t uid;
char * err = NULL;
size_t size;
char pBuf[1024]; // TODO
char sql[128];
rocksdb_writeoptions_t *wopt = rocksdb_writeoptions_create();
// Generate a uid for child and normal table
if (pTbOptions->type == META_SUPER_TABLE) {
uid = pTbOptions->stbCfg.suid;
} else {
uid = metaGenerateUid(pMeta);
}
// Save tbname -> uid to tbnameDB
rocksdb_put(pMeta->pDB->nameDb, wopt, pTbOptions->name, strlen(pTbOptions->name), (char *)(&uid), sizeof(uid), &err);
rocksdb_writeoptions_disable_WAL(wopt, 1);
// Save uid -> tb_obj to tbDB
size = metaEncodeTbObjFromTbOptions(pTbOptions, pBuf, 1024);
rocksdb_put(pMeta->pDB->tbDb, wopt, (char *)(&uid), sizeof(uid), pBuf, size, &err);
switch (pTbOptions->type) {
case META_NORMAL_TABLE:
// save schemaDB
metaSaveSchemaDB(pMeta, uid, pTbOptions->ntbCfg.pSchema);
break;
case META_SUPER_TABLE:
// save schemaDB
metaSaveSchemaDB(pMeta, uid, pTbOptions->stbCfg.pSchema);
// // save mapDB (really need?)
// rocksdb_put(pMeta->pDB->mapDb, wopt, (char *)(&uid), sizeof(uid), "", 0, &err);
sprintf(sql, "create table st_%" PRIu64 " (uid BIGINT);", uid);
if (sqlite3_exec(pMeta->pDB->mapDb, sql, NULL, NULL, &err) != SQLITE_OK) {
// fprintf(stderr, "Failed to create table, since %s\n", err);
}
break;
case META_CHILD_TABLE:
// save tagDB
rocksdb_put(pMeta->pDB->tagDb, wopt, (char *)(&uid), sizeof(uid), pTbOptions->ctbCfg.pTag,
kvRowLen(pTbOptions->ctbCfg.pTag), &err);
// save mapDB
sprintf(sql, "insert into st_%" PRIu64 " values (%" PRIu64 ");", pTbOptions->ctbCfg.suid, uid);
if (sqlite3_exec(pMeta->pDB->mapDb, sql, NULL, NULL, &err) != SQLITE_OK) {
fprintf(stderr, "failed to insert data, since %s\n", err);
}
break;
default:
ASSERT(0);
}
rocksdb_writeoptions_destroy(wopt);
return 0;
}
int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
/* TODO */
return 0;
}
/* ------------------------ STATIC METHODS ------------------------ */
static void metaSaveSchemaDB(SMeta *pMeta, tb_uid_t uid, STSchema *pSchema) {
char key[64];
char pBuf[1024];
char * ppBuf = pBuf;
size_t vsize;
char * err = NULL;
rocksdb_writeoptions_t *wopt = rocksdb_writeoptions_create();
rocksdb_writeoptions_disable_WAL(wopt, 1);
metaGetSchemaDBKey(key, uid, schemaVersion(pSchema));
vsize = tdEncodeSchema((void **)(&ppBuf), pSchema);
rocksdb_put(pMeta->pDB->schemaDb, wopt, key, SCHEMA_KEY_LEN, pBuf, vsize, &err);
rocksdb_writeoptions_destroy(wopt);
}
static void metaGetSchemaDBKey(char *key, tb_uid_t uid, int sversion) {
*(tb_uid_t *)key = uid;
*(int *)POINTER_SHIFT(key, sizeof(tb_uid_t)) = sversion;
}
// static int metaSaveMapDB(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid) {
// size_t vlen;
// char * val;
// char * err = NULL;
// rocksdb_readoptions_t *ropt = rocksdb_readoptions_create();
// val = rocksdb_get(pMeta->pDB->mapDb, ropt, (char *)(&suid), sizeof(suid), &vlen, &err);
// rocksdb_readoptions_destroy(ropt);
// void *nval = malloc(vlen + sizeof(uid));
// if (nval == NULL) {
// return -1;
// }
// if (vlen) {
// memcpy(nval, val, vlen);
// }
// memcpy(POINTER_SHIFT(nval, vlen), (void *)(&uid), sizeof(uid));
// rocksdb_writeoptions_t *wopt = rocksdb_writeoptions_create();
// rocksdb_writeoptions_disable_WAL(wopt, 1);
// rocksdb_put(pMeta->pDB->mapDb, wopt, (char *)(&suid), sizeof(suid), nval, vlen + sizeof(uid), &err);
// rocksdb_writeoptions_destroy(wopt);
// free(nval);
// return 0;
// }
\ No newline at end of file
......@@ -15,7 +15,12 @@
#include "metaDef.h"
struct SMetaIdx {
/* data */
};
int metaOpenIdx(SMeta *pMeta) {
#if 0
char idxDir[128]; // TODO
char * err = NULL;
rocksdb_options_t *options = rocksdb_options_create();
......@@ -36,15 +41,18 @@ int metaOpenIdx(SMeta *pMeta) {
}
rocksdb_options_destroy(options);
#endif
return 0;
}
void metaCloseIdx(SMeta *pMeta) { /* TODO */
#if 0
if (pMeta->pIdx) {
rocksdb_close(pMeta->pIdx);
pMeta->pIdx = NULL;
}
#endif
}
int metaSaveTableToIdx(SMeta *pMeta, const STbCfg *pTbOptions) {
......
......@@ -129,189 +129,4 @@ static void metaCloseImpl(SMeta *pMeta) {
metaCloseIdx(pMeta);
metaCloseDB(pMeta);
metaCloseCache(pMeta);
}
// OLD -------------------------------------------------------------------
#if 0
static int metaCreateSuperTable(SMeta *pMeta, const char *tbname, const SSuperTableOpts *pSuperTableOpts);
static int metaCreateChildTable(SMeta *pMeta, const char *tbname, const SChildTableOpts *pChildTableOpts);
static int metaCreateNormalTable(SMeta *pMeta, const char *tbname, const SNormalTableOpts *pNormalTableOpts);
int metaCreateTable(SMeta *pMeta, const STableOptions *pTableOpts) {
size_t vallen;
char * pUid;
// Check if table already exists
pUid = tkvGet(pMeta->tbnameDb, NULL, pTableOpts->name, strlen(pTableOpts->name), &vallen);
if (pUid) {
free(pUid);
// Table already exists, return error code
return -1;
}
switch (pTableOpts->type) {
case META_SUPER_TABLE:
return metaCreateSuperTable(pMeta, pTableOpts->name, &(pTableOpts->superOpts));
case META_CHILD_TABLE:
return metaCreateChildTable(pMeta, pTableOpts->name, &(pTableOpts->childOpts));
case META_NORMAL_TABLE:
return metaCreateNormalTable(pMeta, pTableOpts->name, &(pTableOpts->normalOpts));
default:
ASSERT(0);
}
return 0;
}
static int metaCreateSuperTable(SMeta *pMeta, const char *tbname, const SSuperTableOpts *pSuperTableOpts) {
size_t vallen;
size_t keylen;
char * pVal = NULL;
char schemaKey[sizeof(tb_uid_t) * 2];
char buffer[1024]; /* TODO */
void * pBuf = NULL;
pVal = tkvGet(pMeta->tableDb, NULL, (char *)(&(pSuperTableOpts->uid)), sizeof(pSuperTableOpts->uid), &vallen);
if (pVal) {
free(pVal);
// TODO: table with same uid exists, just return error
return -1;
}
// Put uid -> tbObj
vallen = 0;
pBuf = (void *)buffer;
vallen += taosEncodeString(&pBuf, tbname); // ENCODE TABLE NAME
vallen += taosEncodeFixedI32(&pBuf, 1); // ENCODE SCHEMA, SCHEMA HERE IS AN ARRAY OF VERSIONS
vallen += taosEncodeFixedI32(&pBuf, schemaVersion(pSuperTableOpts->pSchema));
vallen += tdEncodeSchema(&pBuf, pSuperTableOpts->pTagSchema); // ENCODE TAG SCHEMA
tkvPut(pMeta->tableDb, NULL, (char *)(&(pSuperTableOpts->uid)), sizeof(pSuperTableOpts->uid), buffer, vallen);
// Put tbname -> uid
tkvPut(pMeta->tbnameDb, NULL, tbname, strlen(tbname), (char *)(&(pSuperTableOpts->uid)),
sizeof(pSuperTableOpts->uid));
// Put uid+sversion -> schema
*(tb_uid_t *)schemaKey = pSuperTableOpts->uid;
*(int32_t *)(POINTER_SHIFT(schemaKey, sizeof(tb_uid_t))) = schemaVersion(pSuperTableOpts->pSchema);
keylen = sizeof(tb_uid_t) + sizeof(int32_t);
pBuf = (void *)buffer;
vallen = tdEncodeSchema(&pBuf, pSuperTableOpts->pSchema);
tkvPut(pMeta->schemaDb, NULL, schemaKey, keylen, buffer, vallen);
return 0;
}
static int metaCreateChildTable(SMeta *pMeta, const char *tbname, const SChildTableOpts *pChildTableOpts) {
size_t vallen;
char buffer[1024]; /* TODO */
void * pBuf = NULL;
char * pTable;
tb_uid_t uid;
// Check if super table exists
pTable = tkvGet(pMeta->tableDb, NULL, (char *)(&(pChildTableOpts->suid)), sizeof(pChildTableOpts->suid), &vallen);
if (pTable == NULL) {
// Super table not exists, just return error
return -1;
}
// Generate a uid to the new table
uid = generateUid(&pMeta->uidGenerator);
// Put uid -> tbObj
vallen = 0;
pBuf = (void *)buffer;
vallen += taosEncodeString(&pBuf, tbname);
vallen += taosEncodeFixedU64(&pBuf, pChildTableOpts->suid);
tkvPut(pMeta->tableDb, NULL, (char *)(&uid), sizeof(uid), buffer, vallen);
// Put tbname -> uid
tkvPut(pMeta->tbnameDb, NULL, tbname, strlen(tbname), (char *)(&uid), sizeof(uid));
// Put uid-> tags
tkvPut(pMeta->tagDb, NULL, (char *)(&uid), sizeof(uid), (char *)pChildTableOpts->tags,
(size_t)kvRowLen(pChildTableOpts->tags));
// TODO: Put tagIdx
return 0;
}
static int metaCreateNormalTable(SMeta *pMeta, const char *tbname, const SNormalTableOpts *pNormalTableOpts) {
size_t vallen;
char keyBuf[sizeof(tb_uid_t) + sizeof(int32_t)];
char buffer[1024]; /* TODO */
void * pBuf = NULL;
tb_uid_t uid;
// Generate a uid to the new table
uid = generateUid(&pMeta->uidGenerator);
// Put uid -> tbObj
vallen = 0;
pBuf = (void *)buffer;
vallen += taosEncodeString(&pBuf, tbname);
vallen += taosEncodeFixedI32(&pBuf, 1);
vallen += taosEncodeFixedI32(&pBuf, schemaVersion(pNormalTableOpts->pSchema));
tkvPut(pMeta->tableDb, NULL, (char *)(&uid), sizeof(uid), buffer, vallen);
// Put tbname -> uid
tkvPut(pMeta->tbnameDb, NULL, tbname, strlen(tbname), (char *)(&(uid)), sizeof(uid));
// Put uid+sversion -> schema
vallen = 0;
pBuf = (void *)buffer;
vallen += tdEncodeSchema(&pBuf, pNormalTableOpts->pSchema);
tkvPut(pMeta->schemaDb, NULL, keyBuf, sizeof(tb_uid_t) + sizeof(int32_t), buffer, vallen);
return 0;
}
void metaNormalTableOptsInit(STableOptions *pTableOpts, const char *name, const STSchema *pSchema) {
pTableOpts->type = META_NORMAL_TABLE;
pTableOpts->name = strdup(name);
pTableOpts->normalOpts.pSchema = tdDupSchema(pSchema);
}
void metaSuperTableOptsInit(STableOptions *pTableOpts, const char *name, tb_uid_t uid, const STSchema *pSchema,
const STSchema *pTagSchema) {
pTableOpts->type = META_SUPER_TABLE;
pTableOpts->name = strdup(name);
pTableOpts->superOpts.uid = uid;
pTableOpts->superOpts.pSchema = tdDupSchema(pSchema);
pTableOpts->superOpts.pTagSchema = tdDupSchema(pTagSchema);
}
void metaChildTableOptsInit(STableOptions *pTableOpts, const char *name, tb_uid_t suid, const SKVRow tags) {
pTableOpts->type = META_CHILD_TABLE;
pTableOpts->name = strdup(name);
pTableOpts->childOpts.suid = suid;
pTableOpts->childOpts.tags = tdKVRowDup(tags);
}
void metaTableOptsClear(STableOptions *pTableOpts) {
switch (pTableOpts->type) {
case META_NORMAL_TABLE:
tfree(pTableOpts->name);
tdFreeSchema(pTableOpts->normalOpts.pSchema);
break;
case META_SUPER_TABLE:
tdFreeSchema(pTableOpts->superOpts.pTagSchema);
tdFreeSchema(pTableOpts->superOpts.pSchema);
tfree(pTableOpts->name);
break;
case META_CHILD_TABLE:
kvRowFree(pTableOpts->childOpts.tags);
tfree(pTableOpts->name);
break;
default:
break;
}
memset(pTableOpts, 0, sizeof(*pTableOpts));
}
void metaDestroy(const char *path) { taosRemoveDir(path); }
#endif
\ No newline at end of file
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "metaDef.h"
#include "sqlite3.h"
struct SMetaDB {
sqlite3 *pDB;
};
int metaOpenDB(SMeta *pMeta) {
char dir[128];
int rc;
char *err = NULL;
pMeta->pDB = (SMetaDB *)calloc(1, sizeof(SMetaDB));
if (pMeta->pDB == NULL) {
// TODO: handle error
return -1;
}
sprintf(dir, "%s/meta.db", pMeta->path);
rc = sqlite3_open(dir, &(pMeta->pDB->pDB));
if (rc != SQLITE_OK) {
// TODO: handle error
printf("failed to open meta.db\n");
}
// For all tables
rc = sqlite3_exec(pMeta->pDB->pDB,
"CREATE TABLE IF NOT EXISTS tb ("
" tbname VARCHAR(256) NOT NULL UNIQUE,"
" tb_uid INTEGER NOT NULL UNIQUE "
");",
NULL, NULL, &err);
if (rc != SQLITE_OK) {
// TODO: handle error
printf("failed to create meta table tb since %s\n", err);
}
// For super tables
rc = sqlite3_exec(pMeta->pDB->pDB,
"CREATE TABLE IF NOT EXISTS stb ("
" tb_uid INTEGER NOT NULL UNIQUE,"
" tbname VARCHAR(256) NOT NULL UNIQUE,"
" tb_schema BLOB NOT NULL,"
" tag_schema BLOB NOT NULL"
");",
NULL, NULL, &err);
if (rc != SQLITE_OK) {
// TODO: handle error
printf("failed to create meta table stb since %s\n", err);
}
// For normal tables
rc = sqlite3_exec(pMeta->pDB->pDB,
"CREATE TABLE IF NOT EXISTS ntb ("
" tb_uid INTEGER NOT NULL UNIQUE,"
" tbname VARCHAR(256) NOT NULL,"
" tb_schema BLOB NOT NULL"
");",
NULL, NULL, &err);
if (rc != SQLITE_OK) {
// TODO: handle error
printf("failed to create meta table ntb since %s\n", err);
}
sqlite3_exec(pMeta->pDB->pDB, "BEGIN;", NULL, NULL, &err);
tfree(err);
return 0;
}
void metaCloseDB(SMeta *pMeta) {
if (pMeta->pDB) {
sqlite3_exec(pMeta->pDB->pDB, "COMMIT;", NULL, NULL, NULL);
sqlite3_close(pMeta->pDB->pDB);
free(pMeta->pDB);
pMeta->pDB = NULL;
}
// TODO
}
int metaSaveTableToDB(SMeta *pMeta, const STbCfg *pTbCfg) {
char sql[256];
char * err = NULL;
int rc;
tb_uid_t uid;
sqlite3_stmt *stmt;
char buf[256];
void * pBuf;
switch (pTbCfg->type) {
case META_SUPER_TABLE:
uid = pTbCfg->stbCfg.suid;
sprintf(sql,
"INSERT INTO tb VALUES (\'%s\', %" PRIu64
");"
"CREATE TABLE IF NOT EXISTS stb_%" PRIu64
" ("
" tb_uid INTEGER NOT NULL UNIQUE,"
" tbname VARCHAR(256),"
" tag1 INTEGER);",
pTbCfg->name, uid, uid);
rc = sqlite3_exec(pMeta->pDB->pDB, sql, NULL, NULL, &err);
if (rc != SQLITE_OK) {
printf("failed to create normal table since %s\n", err);
}
sprintf(sql, "INSERT INTO stb VALUES (%" PRIu64 ", %s, ?, ?)", uid, pTbCfg->name);
sqlite3_prepare_v2(pMeta->pDB->pDB, sql, -1, &stmt, NULL);
pBuf = buf;
tdEncodeSchema(&pBuf, pTbCfg->stbCfg.pSchema);
sqlite3_bind_blob(stmt, 1, buf, POINTER_DISTANCE(pBuf, buf), NULL);
pBuf = buf;
tdEncodeSchema(&pBuf, pTbCfg->stbCfg.pTagSchema);
sqlite3_bind_blob(stmt, 2, buf, POINTER_DISTANCE(pBuf, buf), NULL);
sqlite3_step(stmt);
sqlite3_finalize(stmt);
#if 0
sprintf(sql,
"INSERT INTO tb VALUES (?, ?);"
// "INSERT INTO stb VALUES (?, ?, ?, ?);"
// "CREATE TABLE IF NOT EXISTS stb_%" PRIu64
// " ("
// " tb_uid INTEGER NOT NULL UNIQUE,"
// " tbname VARCHAR(256),"
// " tag1 INTEGER);"
,
uid);
rc = sqlite3_prepare_v2(pMeta->pDB->pDB, sql, -1, &stmt, NULL);
if (rc != SQLITE_OK) {
return -1;
}
sqlite3_bind_text(stmt, 1, pTbCfg->name, -1, SQLITE_TRANSIENT);
sqlite3_bind_int64(stmt, 2, uid);
sqlite3_step(stmt);
sqlite3_finalize(stmt);
// sqlite3_bind_int64(stmt, 3, uid);
// sqlite3_bind_text(stmt, 4, pTbCfg->name, -1, SQLITE_TRANSIENT);
// pBuf = buf;
// tdEncodeSchema(&pBuf, pTbCfg->stbCfg.pSchema);
// sqlite3_bind_blob(stmt, 5, buf, POINTER_DISTANCE(pBuf, buf), NULL);
// pBuf = buf;
// tdEncodeSchema(&pBuf, pTbCfg->stbCfg.pTagSchema);
// sqlite3_bind_blob(stmt, 6, buf, POINTER_DISTANCE(pBuf, buf), NULL);
rc = sqliteVjj3_step(stmt);
if (rc != SQLITE_OK) {
printf("failed to create normal table since %s\n", sqlite3_errmsg(pMeta->pDB->pDB));
}
sqlite3_finalize(stmt);
#endif
break;
case META_NORMAL_TABLE:
// uid = metaGenerateUid(pMeta);
// sprintf(sql,
// "INSERT INTO tb VALUES (\'%s\', %" PRIu64
// ");"
// "INSERT INTO ntb VALUES (%" PRIu64 ", \'%s\', );",
// pTbCfg->name, uid, uid, pTbCfg->name, );
// rc = sqlite3_exec(pMeta->pDB->pDB, sql, NULL, NULL, &err);
// if (rc != SQLITE_OK) {
// printf("failed to create normal table since %s\n", err);
// }
break;
case META_CHILD_TABLE:
#if 0
uid = metaGenerateUid(pMeta);
// sprintf(sql, "INSERT INTO tb VALUES (\'%s\', %" PRIu64
// ");"
// "INSERT INTO stb_%" PRIu64 " VALUES (%" PRIu64 ", \'%s\', );");
rc = sqlite3_exec(pMeta->pDB->pDB, sql, NULL, NULL, &err);
if (rc != SQLITE_OK) {
printf("failed to create child table since %s\n", err);
}
#endif
break;
default:
break;
}
tfree(err);
return 0;
}
int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
/* TODO */
return 0;
}
\ No newline at end of file
......@@ -49,10 +49,55 @@ size_t metaEncodeTbObjFromTbOptions(const STbCfg *pTbOptions, void *pBuf, size_t
}
int metaEncodeTbCfg(void **pBuf, STbCfg *pTbCfg) {
// TODO
return 0;
int tsize = 0;
tsize += taosEncodeString(pBuf, pTbCfg->name);
tsize += taosEncodeFixedU32(pBuf, pTbCfg->ttl);
tsize += taosEncodeFixedU32(pBuf, pTbCfg->keep);
tsize += taosEncodeFixedU8(pBuf, pTbCfg->type);
switch (pTbCfg->type) {
case META_SUPER_TABLE:
tsize += taosEncodeFixedU64(pBuf, pTbCfg->stbCfg.suid);
tsize += tdEncodeSchema(pBuf, pTbCfg->stbCfg.pSchema);
tsize += tdEncodeSchema(pBuf, pTbCfg->stbCfg.pTagSchema);
break;
case META_CHILD_TABLE:
tsize += taosEncodeFixedU64(pBuf, pTbCfg->ctbCfg.suid);
tsize += tdEncodeKVRow(pBuf, pTbCfg->ctbCfg.pTag);
break;
case META_NORMAL_TABLE:
tsize += tdEncodeSchema(pBuf, pTbCfg->ntbCfg.pSchema);
break;
default:
break;
}
return tsize;
}
void *metaDecodeTbCfg(void *pBuf, STbCfg **pTbCfg) {
// TODO
void *metaDecodeTbCfg(void *pBuf, STbCfg *pTbCfg) {
pBuf = taosDecodeString(pBuf, &(pTbCfg->name));
pBuf = taosDecodeFixedU32(pBuf, &(pTbCfg->ttl));
pBuf = taosDecodeFixedU32(pBuf, &(pTbCfg->keep));
pBuf = taosDecodeFixedU8(pBuf, &(pTbCfg->type));
switch (pTbCfg->type) {
case META_SUPER_TABLE:
pBuf = taosDecodeFixedU64(pBuf, &(pTbCfg->stbCfg.suid));
pBuf = tdDecodeSchema(pBuf, &(pTbCfg->stbCfg.pSchema));
pBuf = tdDecodeSchema(pBuf, &(pTbCfg->stbCfg.pTagSchema));
break;
case META_CHILD_TABLE:
pBuf = taosDecodeFixedU64(pBuf, &(pTbCfg->ctbCfg.suid));
pBuf = tdDecodeKVRow(pBuf, &(pTbCfg->ctbCfg.pTag));
break;
case META_NORMAL_TABLE:
pBuf = tdDecodeSchema(pBuf, &(pTbCfg->ntbCfg.pSchema));
break;
default:
break;
}
return pBuf;
}
\ No newline at end of file
......@@ -167,7 +167,10 @@ uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes *node,
break;
}
if (addPrefix != 0) {
fstBuilderNodeUnfinishedAddOutputPrefix(un, addPrefix);
if (i + 1 < ssz) {
FstBuilderNodeUnfinished *unf = taosArrayGet(node->stack, i + 1);
fstBuilderNodeUnfinishedAddOutputPrefix(unf, addPrefix);
}
}
}
return i;
......@@ -581,14 +584,13 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) {
- 1 // pack size
- node->nTrans;
uint64_t end = start + node->nTrans;
uint64_t len = end - start;
int32_t dlen = 0;
uint8_t *data = fstSliceData(slice, &dlen);
FstSlice t = fstSliceCopy(slice, start, end - 1);
int32_t len = 0;
uint8_t *data = fstSliceData(&t, &len);
for(int i = 0; i < len; i++) {
//uint8_t v = slice->data[slice->start + i];
////slice->data[slice->start + i];
uint8_t v = data[i];
if (v == b) {
return node->nTrans - i - 1; // bug
}
......@@ -1060,7 +1062,7 @@ bool fstGet(Fst *fst, FstSlice *b, Output *out) {
}
*out = tOut;
return false;
return true;
}
FstNode *fstGetRoot(Fst *fst) {
......
......@@ -83,7 +83,7 @@ int main(int argc, char** argv) {
std::string str("aaa");
str[2] = 'a' + i ;
FstSlice key = fstSliceCreate((uint8_t *)str.c_str(), str.size());
Output val = 2;
Output val = 0;
fstBuilderInsert(b, key, val);
}
......@@ -108,13 +108,14 @@ int main(int argc, char** argv) {
Fst *fst = fstCreate(&s);
{
std::string str("aaa");
std::string str("aax");
uint64_t out;
FstSlice key = fstSliceCreate((uint8_t *)str.c_str(), str.size());
bool ok = fstGet(fst, &key, &out);
if (ok == true) {
printf("val = %d\n", out);
//indexInfo("Get key-value success, %s, %d", str.c_str(), out);
} else {
//indexError("Get key-value failed, %s", str.c_str());
......
aux_source_directory(source TKV_SRC)
add_library(tkv ${TKV_SRC})
target_include_directories(
tkv
PUBLIC "${CMAKE_SOURCE_DIR}/include/tkv"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/include"
)
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册