未验证 提交 a579331c 编写于 作者: M Minglei Jin 提交者: GitHub

Merge pull request #15002 from taosdata/fix/tdb-multidb

fix/tdb: first round multi-db implementation
...@@ -67,7 +67,8 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN * ...@@ -67,7 +67,8 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *
static int tdbBtcMoveDownward(SBTC *pBtc); static int tdbBtcMoveDownward(SBTC *pBtc);
static int tdbBtcMoveUpward(SBTC *pBtc); static int tdbBtcMoveUpward(SBTC *pBtc);
int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, tdb_cmpr_fn_t kcmpr, SBTree **ppBt) { int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPgno pgno, tdb_cmpr_fn_t kcmpr,
SBTree **ppBt) {
SBTree *pBt; SBTree *pBt;
int ret; int ret;
...@@ -99,13 +100,54 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, tdb_cmpr_fn_t kcmpr, SB ...@@ -99,13 +100,54 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, tdb_cmpr_fn_t kcmpr, SB
// pBt->minLeaf // pBt->minLeaf
pBt->minLeaf = pBt->minLocal; pBt->minLeaf = pBt->minLocal;
// if pgno == 0 fetch new btree root leaf page
if (pgno == 0) {
// fetch page & insert into main db
// allocate a new child page
SPage *pPage;
TXN txn;
tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0);
pPager->inTran = 1;
SBtreeInitPageArg zArg;
zArg.flags = 0x1 | 0x2; // root leaf node;
zArg.pBt = pBt;
ret = tdbPagerFetchPage(pPager, &pgno, &pPage, tdbBtreeInitPage, &zArg, &txn);
if (ret < 0) {
return -1;
}
// TODO: Need to zero the page
ret = tdbPagerWrite(pPager, pPage);
if (ret < 0) {
return -1;
}
if (strcmp(TDB_MAINDB_NAME, tbname)) {
ret = tdbTbInsert(pPager->pEnv->pMainDb, tbname, strlen(tbname) + 1, &pgno, sizeof(SPgno), &txn);
if (ret < 0) {
return -1;
}
}
// tdbUnrefPage(pPage);
tdbPCacheRelease(pPager->pCache, pPage, &txn);
tdbCommit(pPager->pEnv, &txn);
tdbTxnClose(&txn);
}
ASSERT(pgno != 0);
pBt->root = pgno;
/*
// TODO: pBt->root // TODO: pBt->root
ret = tdbBtreeOpenImpl(pBt); ret = tdbBtreeOpenImpl(pBt);
if (ret < 0) { if (ret < 0) {
tdbOsFree(pBt); tdbOsFree(pBt);
return -1; return -1;
} }
*/
*ppBt = pBt; *ppBt = pBt;
return 0; return 0;
} }
...@@ -338,7 +380,6 @@ static int tdbBtreeOpenImpl(SBTree *pBt) { ...@@ -338,7 +380,6 @@ static int tdbBtreeOpenImpl(SBTree *pBt) {
ASSERT(pgno != 0); ASSERT(pgno != 0);
pBt->root = pgno; pBt->root = pgno;
return 0; return 0;
} }
......
...@@ -64,6 +64,14 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb) { ...@@ -64,6 +64,14 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb) {
mkdir(dbname, 0755); mkdir(dbname, 0755);
#ifdef USE_MAINDB
// open main db
ret = tdbTbOpen(TDB_MAINDB_NAME, -1, sizeof(SPgno), NULL, pDb, &pDb->pMainDb);
if (ret < 0) {
return -1;
}
#endif
*ppDb = pDb; *ppDb = pDb;
return 0; return 0;
} }
...@@ -72,6 +80,10 @@ int tdbClose(TDB *pDb) { ...@@ -72,6 +80,10 @@ int tdbClose(TDB *pDb) {
SPager *pPager; SPager *pPager;
if (pDb) { if (pDb) {
#ifdef USE_MAINDB
if (pDb->pMainDb) tdbTbClose(pDb->pMainDb);
#endif
for (pPager = pDb->pgrList; pPager; pPager = pDb->pgrList) { for (pPager = pDb->pgrList; pPager; pPager = pDb->pgrList) {
pDb->pgrList = pPager->pNext; pDb->pgrList = pPager->pNext;
tdbPagerClose(pPager); tdbPagerClose(pPager);
...@@ -179,4 +191,4 @@ void tdbEnvRemovePager(TDB *pDb, SPager *pPager) { ...@@ -179,4 +191,4 @@ void tdbEnvRemovePager(TDB *pDb, SPager *pPager) {
if (pDb->nPgrHash > 8 && pDb->nPager < pDb->nPgrHash / 2) { if (pDb->nPgrHash > 8 && pDb->nPager < pDb->nPgrHash / 2) {
// TODO // TODO
} }
} }
\ No newline at end of file
...@@ -174,6 +174,13 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) { ...@@ -174,6 +174,13 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) {
for (ppPage = &pPager->pDirty; (*ppPage) && TDB_PAGE_PGNO(*ppPage) < TDB_PAGE_PGNO(pPage); for (ppPage = &pPager->pDirty; (*ppPage) && TDB_PAGE_PGNO(*ppPage) < TDB_PAGE_PGNO(pPage);
ppPage = &((*ppPage)->pDirtyNext)) { ppPage = &((*ppPage)->pDirtyNext)) {
} }
if (*ppPage && TDB_PAGE_PGNO(*ppPage) == TDB_PAGE_PGNO(pPage)) {
tdbUnrefPage(pPage);
return 0;
}
ASSERT(*ppPage == NULL || TDB_PAGE_PGNO(*ppPage) > TDB_PAGE_PGNO(pPage)); ASSERT(*ppPage == NULL || TDB_PAGE_PGNO(*ppPage) > TDB_PAGE_PGNO(pPage));
pPage->pDirtyNext = *ppPage; pPage->pDirtyNext = *ppPage;
*ppPage = pPage; *ppPage = pPage;
...@@ -467,7 +474,7 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) { ...@@ -467,7 +474,7 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
} }
TXN txn; TXN txn;
tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0); tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
SBtreeInitPageArg iArg; SBtreeInitPageArg iArg;
iArg.pBt = pBt; iArg.pBt = pBt;
iArg.flags = 0; iArg.flags = 0;
...@@ -487,6 +494,7 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) { ...@@ -487,6 +494,7 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
return -1; return -1;
} }
/*
ret = tdbPagerFetchPage(pPager, &pgno, &pPage, tdbBtreeInitPage, &iArg, &txn); ret = tdbPagerFetchPage(pPager, &pgno, &pPage, tdbBtreeInitPage, &iArg, &txn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
...@@ -499,6 +507,18 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) { ...@@ -499,6 +507,18 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
} }
tdbPCacheRelease(pPager->pCache, pPage, &txn); tdbPCacheRelease(pPager->pCache, pPage, &txn);
*/
i64 offset = pPager->pageSize * (pgno - 1);
if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
ASSERT(0);
return -1;
}
ret = tdbOsWrite(pPager->fd, pageBuf, pPager->pageSize);
if (ret < 0) {
ASSERT(0);
return -1;
}
} }
tdbOsFSync(pPager->fd); tdbOsFSync(pPager->fd);
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#include "tdbInt.h" #include "tdbInt.h"
struct STTB { struct STTB {
TDB * pEnv; TDB *pEnv;
SBTree *pBt; SBTree *pBt;
}; };
...@@ -25,12 +25,16 @@ struct STBC { ...@@ -25,12 +25,16 @@ struct STBC {
}; };
int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb) { int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb) {
TTB * pTb; TTB *pTb;
SPager *pPager; SPager *pPager;
int ret; int ret;
char fFullName[TDB_FILENAME_LEN]; char fFullName[TDB_FILENAME_LEN];
SPage * pPage; SPage *pPage;
SPgno pgno; SPgno pgno;
void *pKey = NULL;
int nKey = 0;
void *pData = NULL;
int nData = 0;
*ppTb = NULL; *ppTb = NULL;
...@@ -42,6 +46,48 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF ...@@ -42,6 +46,48 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF
// pTb->pEnv // pTb->pEnv
pTb->pEnv = pEnv; pTb->pEnv = pEnv;
#ifdef USE_MAINDB
snprintf(fFullName, TDB_FILENAME_LEN, "%s/%s", pEnv->dbName, TDB_MAINDB_NAME);
if (strcmp(TDB_MAINDB_NAME, tbname)) {
pPager = tdbEnvGetPager(pEnv, fFullName);
if (!pPager) {
return -1;
}
ret = tdbTbGet(pPager->pEnv->pMainDb, tbname, strlen(tbname) + 1, &pData, &nData);
if (ret < 0) {
// new pgno & insert into main db
pgno = 0;
} else {
pgno = *(SPgno *)pData;
tdbFree(pKey);
tdbFree(pData);
}
} else {
pPager = tdbEnvGetPager(pEnv, fFullName);
if (pPager == NULL) {
ret = tdbPagerOpen(pEnv->pCache, fFullName, &pPager);
if (ret < 0) {
return -1;
}
tdbEnvAddPager(pEnv, pPager);
pPager->pEnv = pEnv;
}
if (pPager->dbOrigSize > 0) {
pgno = 1;
} else {
pgno = 0;
}
}
#else
pPager = tdbEnvGetPager(pEnv, tbname); pPager = tdbEnvGetPager(pEnv, tbname);
if (pPager == NULL) { if (pPager == NULL) {
snprintf(fFullName, TDB_FILENAME_LEN, "%s/%s", pEnv->dbName, tbname); snprintf(fFullName, TDB_FILENAME_LEN, "%s/%s", pEnv->dbName, tbname);
...@@ -53,10 +99,12 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF ...@@ -53,10 +99,12 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF
tdbEnvAddPager(pEnv, pPager); tdbEnvAddPager(pEnv, pPager);
} }
#endif
ASSERT(pPager != NULL); ASSERT(pPager != NULL);
// pTb->pBt // pTb->pBt
ret = tdbBtreeOpen(keyLen, valLen, pPager, keyCmprFn, &(pTb->pBt)); ret = tdbBtreeOpen(keyLen, valLen, pPager, tbname, pgno, keyCmprFn, &(pTb->pBt));
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
......
...@@ -150,7 +150,8 @@ struct SBTC { ...@@ -150,7 +150,8 @@ struct SBTC {
}; };
// SBTree // SBTree
int tdbBtreeOpen(int keyLen, int valLen, SPager *pFile, tdb_cmpr_fn_t kcmpr, SBTree **ppBt); int tdbBtreeOpen(int keyLen, int valLen, SPager *pFile, char const *tbname, SPgno pgno, tdb_cmpr_fn_t kcmpr,
SBTree **ppBt);
int tdbBtreeClose(SBTree *pBt); int tdbBtreeClose(SBTree *pBt);
int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn); int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn);
int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn); int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn);
...@@ -356,6 +357,12 @@ static inline SCell *tdbPageGetCell(SPage *pPage, int idx) { ...@@ -356,6 +357,12 @@ static inline SCell *tdbPageGetCell(SPage *pPage, int idx) {
return pCell; return pCell;
} }
#define USE_MAINDB
#ifdef USE_MAINDB
#define TDB_MAINDB_NAME "main.tdb"
#endif
struct STDB { struct STDB {
char *dbName; char *dbName;
char *jnName; char *jnName;
...@@ -365,6 +372,9 @@ struct STDB { ...@@ -365,6 +372,9 @@ struct STDB {
int nPager; int nPager;
int nPgrHash; int nPgrHash;
SPager **pgrHash; SPager **pgrHash;
#ifdef USE_MAINDB
TTB *pMainDb;
#endif
}; };
struct SPager { struct SPager {
...@@ -381,6 +391,9 @@ struct SPager { ...@@ -381,6 +391,9 @@ struct SPager {
u8 inTran; u8 inTran;
SPager *pNext; // used by TDB SPager *pNext; // used by TDB
SPager *pHashNext; // used by TDB SPager *pHashNext; // used by TDB
#ifdef USE_MAINDB
TDB *pEnv;
#endif
}; };
#ifdef __cplusplus #ifdef __cplusplus
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册