diff --git a/source/dnode/vnode/meta/src/metaBDBImpl.c b/source/dnode/vnode/meta/src/metaBDBImpl.c index 76c779fe2cdcace6b0fc49c5b1b143a51fd042bf..0565859688a0ed21f6041772913ec079e2a4d2b8 100644 --- a/source/dnode/vnode/meta/src/metaBDBImpl.c +++ b/source/dnode/vnode/meta/src/metaBDBImpl.c @@ -18,7 +18,9 @@ #include "db.h" struct SMetaDB { - DB * pDB; + DB * pStbDB; + DB * pCtbDB; + DB * pNtbDB; DB * pIdx; DB_ENV *pEvn; }; @@ -32,22 +34,32 @@ int metaOpenDB(SMeta *pMeta) { return -1; } - // TODO: create the env + // create the env ret = db_env_create(&(pMeta->pDB->pEvn), 0); if (ret != 0) { // TODO: handle error return -1; } - // pMeta->pDB->pEvn->set_cachesize(pMeta->pDB->pEvn, ) - ret = pMeta->pDB->pEvn->open(pMeta->pDB->pEvn, pMeta->path, DB_CREATE | DB_INIT_MPOOL, 0); if (ret != 0) { // TODO: handle error return -1; } - ret = db_create(&(pMeta->pDB->pDB), pMeta->pDB->pEvn, 0); + ret = db_create(&(pMeta->pDB->pStbDB), pMeta->pDB->pEvn, 0); + if (ret != 0) { + // TODO: handle error + return -1; + } + + ret = db_create(&(pMeta->pDB->pCtbDB), pMeta->pDB->pEvn, 0); + if (ret != 0) { + // TODO: handle error + return -1; + } + + ret = db_create(&(pMeta->pDB->pNtbDB), pMeta->pDB->pEvn, 0); if (ret != 0) { // TODO: handle error return -1; @@ -59,13 +71,37 @@ int metaOpenDB(SMeta *pMeta) { return -1; } - ret = pMeta->pDB->pDB->open(pMeta->pDB->pDB, /* DB structure pointer */ - NULL, /* Transaction pointer */ - "meta.db", /* On-disk file that holds the database */ - NULL, /* Optional logical database name */ - DB_BTREE, /* Database access method */ - DB_CREATE, /* Open flags */ - 0); /* File mode */ + ret = pMeta->pDB->pStbDB->open(pMeta->pDB->pStbDB, /* DB structure pointer */ + NULL, /* Transaction pointer */ + "meta.db", /* On-disk file that holds the database */ + NULL, /* Optional logical database name */ + DB_BTREE, /* Database access method */ + DB_CREATE, /* Open flags */ + 0); /* File mode */ + if (ret != 0) { + // TODO: handle error + return -1; + } + + ret = pMeta->pDB->pCtbDB->open(pMeta->pDB->pCtbDB, /* DB structure pointer */ + NULL, /* Transaction pointer */ + "meta.db", /* On-disk file that holds the database */ + NULL, /* Optional logical database name */ + DB_BTREE, /* Database access method */ + DB_CREATE, /* Open flags */ + 0); /* File mode */ + if (ret != 0) { + // TODO: handle error + return -1; + } + + ret = pMeta->pDB->pNtbDB->open(pMeta->pDB->pNtbDB, /* DB structure pointer */ + NULL, /* Transaction pointer */ + "meta.db", /* On-disk file that holds the database */ + NULL, /* Optional logical database name */ + DB_BTREE, /* Database access method */ + DB_CREATE, /* Open flags */ + 0); /* File mode */ if (ret != 0) { // TODO: handle error return -1; @@ -94,9 +130,19 @@ void metaCloseDB(SMeta *pMeta) { pMeta->pDB->pIdx = NULL; } - if (pMeta->pDB->pDB) { - pMeta->pDB->pDB->close(pMeta->pDB->pDB, 0); - pMeta->pDB->pDB = NULL; + if (pMeta->pDB->pNtbDB) { + pMeta->pDB->pNtbDB->close(pMeta->pDB->pNtbDB, 0); + pMeta->pDB->pNtbDB = NULL; + } + + if (pMeta->pDB->pCtbDB) { + pMeta->pDB->pCtbDB->close(pMeta->pDB->pCtbDB, 0); + pMeta->pDB->pCtbDB = NULL; + } + + if (pMeta->pDB->pStbDB) { + pMeta->pDB->pStbDB->close(pMeta->pDB->pStbDB, 0); + pMeta->pDB->pStbDB = NULL; } if (pMeta->pDB->pEvn) { @@ -109,7 +155,28 @@ void metaCloseDB(SMeta *pMeta) { } int metaSaveTableToDB(SMeta *pMeta, const STbCfg *pTbCfg) { - // TODO + tb_uid_t uid; + DBT key = {0}; + DBT value = {0}; + char buf[256]; + void * pBuf; + int bsize; + + if (pTbCfg->type == META_SUPER_TABLE) { + uid = pTbCfg->stbCfg.suid; + } else { + uid = metaGenerateUid(pMeta); + } + + key.size = sizeof(uid); + key.data = &uid; + + pBuf = buf; + value.size = metaEncodeTbCfg(&pBuf, pTbCfg); + value.data = buf; + + pMeta->pDB->pStbDB->put(pMeta->pDB->pStbDB, NULL, &key, &value, 0); + return 0; }