提交 c448c156 编写于 作者: H Hongze Cheng

more work

上级 9048c77e
......@@ -17,8 +17,14 @@
#include "metaDef.h"
#include "tcoding.h"
#include "thash.h"
typedef struct {
tb_uid_t uid;
int32_t sver;
} SSchemaKey;
struct SMetaDB {
// DB
DB * pStbDB;
......@@ -39,9 +45,18 @@ static SMetaDB *metaNewDB();
static void metaFreeDB(SMetaDB *pDB);
static int metaCreateDBEnv(SMetaDB *pDB, const char *path);
static void metaDestroyDBEnv(SMetaDB *pDB);
static int metaEncodeSchemaKey(void **buf, SSchemaKey *pSchemaKey);
static void * metaDecodeSchemaKey(void *buf, SSchemaKey *pSchemaKey);
static int metaNameIdxCb(DB *sdbp, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static int metaUidIdxCb(DB *sdbp, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static void metaPutSchema(SMeta *pMeta, tb_uid_t uid, STSchema *pSchema);
static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg);
static void * metaDecodeTbInfo(void *buf, STbCfg *pTbCfg);
static int metaSaveTbInfo(DB *pDB, tb_uid_t uid, STbCfg *pTbCfg);
#define META_OPEN_DB(pDB, pEnv, fName) \
do { \
int ret; \
ret = db_create(&((pDB)), (pEnv), 0); \
if (ret != 0) { \
P_ERROR("Failed to create META DB", ret); \
......@@ -59,6 +74,15 @@ static void metaDestroyDBEnv(SMetaDB *pDB);
#define META_CLOSE_DB(pDB)
#define META_ASSOCIATE_IDX(pDB, pNameIdx, cbf) \
do { \
int ret = (pDB)->associate((pDB), NULL, (pNameIdx), (cbf), 0); \
if (ret != 0) { \
P_ERROR("Failed to associate META DB", ret); \
metaCloseDB(pMeta); \
} \
} while (0)
int metaOpenDB(SMeta *pMeta) {
int ret;
SMetaDB *pDB;
......@@ -89,6 +113,17 @@ int metaOpenDB(SMeta *pMeta) {
META_OPEN_DB(pDB->pUidIdx, pDB->pEvn, "index.db");
// Associate name index
META_ASSOCIATE_IDX(pDB->pStbDB, pDB->pNameIdx, metaNameIdxCb);
META_ASSOCIATE_IDX(pDB->pStbDB, pDB->pUidIdx, metaUidIdxCb);
META_ASSOCIATE_IDX(pDB->pNtbDB, pDB->pNameIdx, metaNameIdxCb);
META_ASSOCIATE_IDX(pDB->pNtbDB, pDB->pUidIdx, metaUidIdxCb);
for (;;) {
// Loop to associate each super table db
// TODO: Loop to open index DB for each super table
// and create the association between main DB and index
......@@ -106,7 +141,56 @@ void metaCloseDB(SMeta *pMeta) {
int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
char buf[512];
void * pBuf;
DBT key = {0};
DBT value = {0};
SSchemaKey schemaKey;
tb_uid_t uid;
if (pTbCfg->type == META_SUPER_TABLE) {
// Handle SUPER table
uid = pTbCfg->stbCfg.suid;
// Same table info
metaSaveTbInfo(pMeta->pDB->pStbDB, uid, pTbCfg);
// save schema
metaPutSchema(pMeta, uid, pTbCfg->stbCfg.pSchema);
// Create a super table DB and corresponding index DB
DB *pStbDB;
DB *pStbIdxDB;
META_OPEN_DB(pStbDB, pMeta->pDB->pEvn, "meta.db");
META_OPEN_DB(pStbIdxDB, pMeta->pDB->pEvn, "index.db");
} else if (pTbCfg->type == META_CHILD_TABLE) {
// Handle CHILD table
uid = metaGenerateUid(pMeta);
DB *pCTbDB = taosHashGet(pMeta->pDB->pCtbMap, &(pTbCfg->ctbCfg.suid), sizeof(pTbCfg->ctbCfg.suid));
if (pCTbDB == NULL) {
metaSaveTbInfo(pCTbDB, uid, pTbCfg);
} else if (pTbCfg->type == META_NORMAL_TABLE) {
// Handle NORMAL table
uid = metaGenerateUid(pMeta);
metaSaveTbInfo(pMeta->pDB->pNtbDB, uid, pTbCfg);
metaPutSchema(pMeta, uid, pTbCfg->stbCfg.pSchema);
} else {
return 0;
......@@ -175,4 +259,79 @@ static void metaDestroyDBEnv(SMetaDB *pDB) {
if (pDB->pEvn) {
pDB->pEvn->close(pDB->pEvn, 0);
static int metaEncodeSchemaKey(void **buf, SSchemaKey *pSchemaKey) {
int tsize = 0;
tsize += taosEncodeFixedU64(buf, pSchemaKey->uid);
tsize += taosEncodeFixedI32(buf, pSchemaKey->sver);
return tsize;
static void *metaDecodeSchemaKey(void *buf, SSchemaKey *pSchemaKey) {
buf = taosDecodeFixedU64(buf, &(pSchemaKey->uid));
buf = taosDecodeFixedI32(buf, &(pSchemaKey->sver));
return buf;
static int metaNameIdxCb(DB *sdbp, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
return 0;
static int metaUidIdxCb(DB *sdbp, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
return 0;
static void metaPutSchema(SMeta *pMeta, tb_uid_t uid, STSchema *pSchema) {
SSchemaKey skey;
char buf[256];
void * pBuf = buf;
DBT key = {0};
DBT value = {0};
skey.uid = uid;
skey.sver = schemaVersion(pSchema);
key.data = &skey;
key.size = sizeof(skey);
tdEncodeSchema(&pBuf, pSchema);
value.data = buf;
value.size = POINTER_DISTANCE(pBuf, buf);
pMeta->pDB->pSchemaDB->put(pMeta->pDB->pSchemaDB, NULL, &key, &value, 0);
static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) {
return 0;
static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) {
return NULL;
static int metaSaveTbInfo(DB *pDB, tb_uid_t uid, STbCfg *pTbCfg) {
DBT key = {0};
DBT value = {0};
char buf[512];
void *pBuf = buf;
key.data = &uid;
key.size = sizeof(uid);
metaEncodeTbInfo(&pBuf, pTbCfg);
value.data = buf;
value.size = POINTER_DISTANCE(pBuf, buf);
pDB->put(pDB, NULL, &key, &value, 0);
return 0;
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册