提交 6b47fd89 编写于 作者: H Hongze Cheng

TDB with txn

上级 f9e699be
...@@ -127,7 +127,7 @@ int tdbBtreeClose(SBTree *pBt) { ...@@ -127,7 +127,7 @@ int tdbBtreeClose(SBTree *pBt) {
return 0; return 0;
} }
int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, int vLen) { int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn) {
SBTC btc; SBTC btc;
SCell *pCell; SCell *pCell;
void *pBuf; void *pBuf;
...@@ -137,7 +137,7 @@ int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, in ...@@ -137,7 +137,7 @@ int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, in
int idx; int idx;
int c; int c;
tdbBtcOpen(&btc, pBt); tdbBtcOpen(&btc, pBt, pTxn);
// move to the position to insert // move to the position to insert
ret = tdbBtcMoveTo(&btc, pKey, kLen, &c); ret = tdbBtcMoveTo(&btc, pKey, kLen, &c);
...@@ -225,7 +225,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL ...@@ -225,7 +225,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
void *pTVal = NULL; void *pTVal = NULL;
SCellDecoder cd; SCellDecoder cd;
tdbBtcOpen(&btc, pBt); tdbBtcOpen(&btc, pBt, NULL);
ret = tdbBtcMoveTo(&btc, pKey, kLen, &cret); ret = tdbBtcMoveTo(&btc, pKey, kLen, &cret);
if (ret < 0) { if (ret < 0) {
...@@ -307,13 +307,13 @@ static int tdbBtreeOpenImpl(SBTree *pBt) { ...@@ -307,13 +307,13 @@ static int tdbBtreeOpenImpl(SBTree *pBt) {
// Try to create a new database // Try to create a new database
SBtreeInitPageArg zArg = {.flags = TDB_BTREE_ROOT | TDB_BTREE_LEAF, .pBt = pBt}; SBtreeInitPageArg zArg = {.flags = TDB_BTREE_ROOT | TDB_BTREE_LEAF, .pBt = pBt};
ret = tdbPagerNewPage(pBt->pPager, &pgno, &pPage, tdbBtreeZeroPage, &zArg); ret = tdbPagerNewPage(pBt->pPager, &pgno, &pPage, tdbBtreeZeroPage, &zArg, NULL);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
// TODO: here still has problem // TODO: here still has problem
tdbPagerReturnPage(pBt->pPager, pPage); tdbPagerReturnPage(pBt->pPager, pPage, NULL);
ASSERT(pgno != 0); ASSERT(pgno != 0);
pBt->root = pgno; pBt->root = pgno;
...@@ -385,7 +385,7 @@ static int tdbBtreeZeroPage(SPage *pPage, void *arg) { ...@@ -385,7 +385,7 @@ static int tdbBtreeZeroPage(SPage *pPage, void *arg) {
} }
// TDB_BTREE_BALANCE ===================== // TDB_BTREE_BALANCE =====================
static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) { static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild, TXN *pTxn) {
SPager *pPager; SPager *pPager;
SPage *pChild; SPage *pChild;
SPgno pgnoChild; SPgno pgnoChild;
...@@ -402,7 +402,7 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) { ...@@ -402,7 +402,7 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) {
// Allocate a new child page // Allocate a new child page
zArg.flags = TDB_FLAG_REMOVE(flags, TDB_BTREE_ROOT); zArg.flags = TDB_FLAG_REMOVE(flags, TDB_BTREE_ROOT);
zArg.pBt = pBt; zArg.pBt = pBt;
ret = tdbPagerNewPage(pPager, &pgnoChild, &pChild, tdbBtreeZeroPage, &zArg); ret = tdbPagerNewPage(pPager, &pgnoChild, &pChild, tdbBtreeZeroPage, &zArg, pTxn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
...@@ -436,7 +436,7 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) { ...@@ -436,7 +436,7 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) {
return 0; return 0;
} }
static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTxn) {
int ret; int ret;
int nOlds; int nOlds;
...@@ -477,7 +477,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { ...@@ -477,7 +477,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
pgno = *(SPgno *)pCell; pgno = *(SPgno *)pCell;
} }
ret = tdbPagerFetchPage(pBt->pPager, pgno, pOlds + i, tdbBtreeInitPage, pBt); ret = tdbPagerFetchPage(pBt->pPager, pgno, pOlds + i, tdbBtreeInitPage, pBt, pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
...@@ -640,7 +640,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { ...@@ -640,7 +640,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
} else { } else {
iarg.pBt = pBt; iarg.pBt = pBt;
iarg.flags = flags; iarg.flags = flags;
ret = tdbPagerNewPage(pBt->pPager, &pgno, pNews + iNew, tdbBtreeZeroPage, &iarg); ret = tdbPagerNewPage(pBt->pPager, &pgno, pNews + iNew, tdbBtreeZeroPage, &iarg, pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
} }
...@@ -767,9 +767,9 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { ...@@ -767,9 +767,9 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
// TODO: here is not corrent for drop case // TODO: here is not corrent for drop case
for (int i = 0; i < nNews; i++) { for (int i = 0; i < nNews; i++) {
if (i < nOlds) { if (i < nOlds) {
tdbPagerReturnPage(pBt->pPager, pOlds[i]); tdbPagerReturnPage(pBt->pPager, pOlds[i], pTxn);
} else { } else {
tdbPagerReturnPage(pBt->pPager, pNews[i]); tdbPagerReturnPage(pBt->pPager, pNews[i], pTxn);
} }
} }
...@@ -805,7 +805,7 @@ static int tdbBtreeBalance(SBTC *pBtc) { ...@@ -805,7 +805,7 @@ static int tdbBtreeBalance(SBTC *pBtc) {
// ignore the case of empty // ignore the case of empty
if (pPage->nOverflow == 0) break; if (pPage->nOverflow == 0) break;
ret = tdbBtreeBalanceDeeper(pBtc->pBt, pPage, &(pBtc->pgStack[1])); ret = tdbBtreeBalanceDeeper(pBtc->pBt, pPage, &(pBtc->pgStack[1]), pBtc->pTxn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
...@@ -819,12 +819,12 @@ static int tdbBtreeBalance(SBTC *pBtc) { ...@@ -819,12 +819,12 @@ static int tdbBtreeBalance(SBTC *pBtc) {
// Generalized balance step // Generalized balance step
pParent = pBtc->pgStack[iPage - 1]; pParent = pBtc->pgStack[iPage - 1];
ret = tdbBtreeBalanceNonRoot(pBtc->pBt, pParent, pBtc->idxStack[pBtc->iPage - 1]); ret = tdbBtreeBalanceNonRoot(pBtc->pBt, pParent, pBtc->idxStack[pBtc->iPage - 1], pBtc->pTxn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage); tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage, pBtc->pTxn);
pBtc->iPage--; pBtc->iPage--;
pBtc->pPage = pBtc->pgStack[pBtc->iPage]; pBtc->pPage = pBtc->pgStack[pBtc->iPage];
...@@ -1024,11 +1024,12 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell) { ...@@ -1024,11 +1024,12 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell) {
// TDB_BTREE_CELL // TDB_BTREE_CELL
// TDB_BTREE_CURSOR ===================== // TDB_BTREE_CURSOR =====================
int tdbBtcOpen(SBTC *pBtc, SBTree *pBt) { int tdbBtcOpen(SBTC *pBtc, SBTree *pBt, TXN *pTxn) {
pBtc->pBt = pBt; pBtc->pBt = pBt;
pBtc->iPage = -1; pBtc->iPage = -1;
pBtc->pPage = NULL; pBtc->pPage = NULL;
pBtc->idx = -1; pBtc->idx = -1;
pBtc->pTxn = pTxn;
return 0; return 0;
} }
...@@ -1045,7 +1046,7 @@ int tdbBtcMoveToFirst(SBTC *pBtc) { ...@@ -1045,7 +1046,7 @@ int tdbBtcMoveToFirst(SBTC *pBtc) {
if (pBtc->iPage < 0) { if (pBtc->iPage < 0) {
// move a clean cursor // move a clean cursor
ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt); ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt, pBtc->pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
...@@ -1110,7 +1111,7 @@ int tdbBtcMoveToLast(SBTC *pBtc) { ...@@ -1110,7 +1111,7 @@ int tdbBtcMoveToLast(SBTC *pBtc) {
if (pBtc->iPage < 0) { if (pBtc->iPage < 0) {
// move a clean cursor // move a clean cursor
ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt); ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt, pBtc->pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
...@@ -1284,7 +1285,7 @@ static int tdbBtcMoveDownward(SBTC *pBtc) { ...@@ -1284,7 +1285,7 @@ static int tdbBtcMoveDownward(SBTC *pBtc) {
pBtc->pPage = NULL; pBtc->pPage = NULL;
pBtc->idx = -1; pBtc->idx = -1;
ret = tdbPagerFetchPage(pBtc->pBt->pPager, pgno, &pBtc->pPage, tdbBtreeInitPage, pBtc->pBt); ret = tdbPagerFetchPage(pBtc->pBt->pPager, pgno, &pBtc->pPage, tdbBtreeInitPage, pBtc->pBt, pBtc->pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
...@@ -1296,7 +1297,7 @@ static int tdbBtcMoveDownward(SBTC *pBtc) { ...@@ -1296,7 +1297,7 @@ static int tdbBtcMoveDownward(SBTC *pBtc) {
static int tdbBtcMoveUpward(SBTC *pBtc) { static int tdbBtcMoveUpward(SBTC *pBtc) {
if (pBtc->iPage == 0) return -1; if (pBtc->iPage == 0) return -1;
tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage); tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage, pBtc->pTxn);
pBtc->iPage--; pBtc->iPage--;
pBtc->pPage = pBtc->pgStack[pBtc->iPage]; pBtc->pPage = pBtc->pgStack[pBtc->iPage];
...@@ -1319,7 +1320,7 @@ static int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) { ...@@ -1319,7 +1320,7 @@ static int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
if (pBtc->iPage < 0) { if (pBtc->iPage < 0) {
// move from a clear cursor // move from a clear cursor
ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt); ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt, pBtc->pTxn);
if (ret < 0) { if (ret < 0) {
// TODO // TODO
ASSERT(0); ASSERT(0);
...@@ -1456,7 +1457,7 @@ int tdbBtcClose(SBTC *pBtc) { ...@@ -1456,7 +1457,7 @@ int tdbBtcClose(SBTC *pBtc) {
for (;;) { for (;;) {
ASSERT(pBtc->pPage); ASSERT(pBtc->pPage);
tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage); tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage, pBtc->pTxn);
pBtc->iPage--; pBtc->iPage--;
if (pBtc->iPage < 0) break; if (pBtc->iPage < 0) break;
......
...@@ -75,8 +75,8 @@ int tdbDbDrop(TDB *pDb) { ...@@ -75,8 +75,8 @@ int tdbDbDrop(TDB *pDb) {
return 0; return 0;
} }
int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen) { int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) {
return tdbBtreeInsert(pDb->pBt, pKey, keyLen, pVal, valLen); return tdbBtreeInsert(pDb->pBt, pKey, keyLen, pVal, valLen, pTxn);
} }
int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen) { int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen) {
...@@ -97,7 +97,7 @@ int tdbDbcOpen(TDB *pDb, TDBC **ppDbc) { ...@@ -97,7 +97,7 @@ int tdbDbcOpen(TDB *pDb, TDBC **ppDbc) {
return -1; return -1;
} }
tdbBtcOpen(&pDbc->btc, pDb->pBt); tdbBtcOpen(&pDbc->btc, pDb->pBt, NULL);
// TODO: move to first now, we can move to any key-value // TODO: move to first now, we can move to any key-value
// and in any direction, design new APIs. // and in any direction, design new APIs.
......
...@@ -93,14 +93,33 @@ SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) { ...@@ -93,14 +93,33 @@ SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) {
return pPage; return pPage;
} }
void tdbPCacheRelease(SPCache *pCache, SPage *pPage) { void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) {
i32 nRef; i32 nRef;
nRef = TDB_UNREF_PAGE(pPage); nRef = TDB_UNREF_PAGE(pPage);
ASSERT(nRef >= 0); ASSERT(nRef >= 0);
if (nRef == 0) { if (nRef == 0) {
tdbPCacheUnpinPage(pCache, pPage); tdbPCacheLock(pCache);
// test the nRef again to make sure
// it is safe th handle the page
nRef = TDB_GET_PAGE_REF(pPage);
if (nRef == 0) {
if (pPage->isLocal) {
tdbPCacheUnpinPage(pCache, pPage);
} else {
// remove from hash
tdbPCacheRemovePageFromHash(pCache, pPage);
// free the page
if (pTxn && pTxn->xFree) {
tdbPageDestroy(pPage, pTxn->xFree, pTxn->xArg);
}
}
}
tdbPCacheUnlock(pCache);
} }
} }
...@@ -140,7 +159,7 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) ...@@ -140,7 +159,7 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
} }
// 4. Try a create new page // 4. Try a create new page
if (pTxn && pTxn->xMalloc) { if (!pPage && pTxn && pTxn->xMalloc) {
ret = tdbPageCreate(pCache->pageSize, &pPage, pTxn->xMalloc, pTxn->xArg); ret = tdbPageCreate(pCache->pageSize, &pPage, pTxn->xMalloc, pTxn->xArg);
if (ret < 0) { if (ret < 0) {
// TODO // TODO
...@@ -182,29 +201,17 @@ static void tdbPCachePinPage(SPCache *pCache, SPage *pPage) { ...@@ -182,29 +201,17 @@ static void tdbPCachePinPage(SPCache *pCache, SPage *pPage) {
static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage) { static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage) {
i32 nRef; i32 nRef;
tdbPCacheLock(pCache);
ASSERT(!pPage->isDirty); ASSERT(!pPage->isDirty);
ASSERT(TDB_GET_PAGE_REF(pPage) == 0);
nRef = TDB_GET_PAGE_REF(pPage); ASSERT(pPage->pLruNext == NULL);
ASSERT(nRef >= 0);
if (nRef == 0) {
if (1) {
// Add the page to LRU list
ASSERT(pPage->pLruNext == NULL);
pPage->pLruPrev = &(pCache->lru);
pPage->pLruNext = pCache->lru.pLruNext;
pCache->lru.pLruNext->pLruPrev = pPage;
pCache->lru.pLruNext = pPage;
pCache->nRecyclable++;
} else {
// TODO: may need to free the page
}
}
tdbPCacheUnlock(pCache); pPage->pLruPrev = &(pCache->lru);
pPage->pLruNext = pCache->lru.pLruNext;
pCache->lru.pLruNext->pLruPrev = pPage;
pCache->lru.pLruNext = pPage;
pCache->nRecyclable++;
} }
static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) { static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) {
......
...@@ -27,7 +27,6 @@ TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct") ...@@ -27,7 +27,6 @@ TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct")
#define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL) #define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL)
static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *), void *arg, u8 loadPage); static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *), void *arg, u8 loadPage);
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage); static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage);
static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage); static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage);
...@@ -204,7 +203,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { ...@@ -204,7 +203,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
pPage->isDirty = 0; pPage->isDirty = 0;
tdbPCacheRelease(pPager->pCache, pPage); tdbPCacheRelease(pPager->pCache, pPage, pTxn);
} }
// sync the db file // sync the db file
...@@ -219,7 +218,8 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { ...@@ -219,7 +218,8 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
return 0; return 0;
} }
int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg) { int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg,
TXN *pTxn) {
SPage *pPage; SPage *pPage;
SPgid pgid; SPgid pgid;
int ret; int ret;
...@@ -227,7 +227,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage ...@@ -227,7 +227,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage
// Fetch a page container from the page cache // Fetch a page container from the page cache
memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN); memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
pgid.pgno = pgno; pgid.pgno = pgno;
pPage = tdbPCacheFetch(pPager->pCache, &pgid, NULL); pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn);
if (pPage == NULL) { if (pPage == NULL) {
return -1; return -1;
} }
...@@ -247,7 +247,8 @@ int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage ...@@ -247,7 +247,8 @@ int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage
return 0; return 0;
} }
int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg) { int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg,
TXN *pTxn) {
int ret; int ret;
SPage *pPage; SPage *pPage;
SPgid pgid; SPgid pgid;
...@@ -255,6 +256,7 @@ int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage ...@@ -255,6 +256,7 @@ int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage
// Allocate a page number // Allocate a page number
ret = tdbPagerAllocPage(pPager, ppgno); ret = tdbPagerAllocPage(pPager, ppgno);
if (ret < 0) { if (ret < 0) {
ASSERT(0);
return -1; return -1;
} }
...@@ -263,8 +265,9 @@ int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage ...@@ -263,8 +265,9 @@ int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage
// Fetch a page container from the page cache // Fetch a page container from the page cache
memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN); memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
pgid.pgno = *ppgno; pgid.pgno = *ppgno;
pPage = tdbPCacheFetch(pPager->pCache, &pgid, NULL); pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn);
if (pPage == NULL) { if (pPage == NULL) {
ASSERT(0);
return -1; return -1;
} }
...@@ -273,6 +276,7 @@ int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage ...@@ -273,6 +276,7 @@ int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage
// Initialize the page if need // Initialize the page if need
ret = tdbPagerInitPage(pPager, pPage, initPage, arg, 0); ret = tdbPagerInitPage(pPager, pPage, initPage, arg, 0);
if (ret < 0) { if (ret < 0) {
ASSERT(0);
return -1; return -1;
} }
...@@ -283,7 +287,7 @@ int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage ...@@ -283,7 +287,7 @@ int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage
return 0; return 0;
} }
void tdbPagerReturnPage(SPager *pPager, SPage *pPage) { tdbPCacheRelease(pPager->pCache, pPage); } void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn) { tdbPCacheRelease(pPager->pCache, pPage, pTxn); }
static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno) { static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno) {
// TODO: Allocate a page from the free list // TODO: Allocate a page from the free list
...@@ -295,7 +299,7 @@ static int tdbPagerAllocNewPage(SPager *pPager, SPgno *ppgno) { ...@@ -295,7 +299,7 @@ static int tdbPagerAllocNewPage(SPager *pPager, SPgno *ppgno) {
return 0; return 0;
} }
static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno) { int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno) {
int ret; int ret;
*ppgno = 0; *ppgno = 0;
......
...@@ -15,29 +15,17 @@ ...@@ -15,29 +15,17 @@
#include "tdbInt.h" #include "tdbInt.h"
// int tdbTxnBegin(TENV *pEnv) { int tdbTxnOpen(TXN *pTxn, int64_t txnid, void *(*xMalloc)(void *, size_t), void (*xFree)(void *, void *), void *xArg,
// // TODO int flags) {
// return 0; // not support read-committed version at the moment
// } ASSERT(flags == 0 || flags == TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
// int tdbTxnCommit(TENV *pEnv) { pTxn->flags = flags;
// SPager *pPager = NULL; pTxn->txnId = txnid;
// int ret; pTxn->xMalloc = xMalloc;
pTxn->xFree = xFree;
pTxn->xArg = xArg;
return 0;
}
// for (;;) { int tdbTxnClose(TXN *pTxn) { return 0; }
// break; \ No newline at end of file
// ret = tdbPagerCommit(pPager);
// if (ret < 0) {
// ASSERT(0);
// return -1;
// }
// }
// // TODO
// return 0;
// }
// int tdbTxnRollback(TENV *pEnv) {
// // TODO
// return 0;
// }
\ No newline at end of file
...@@ -35,17 +35,18 @@ struct SBTC { ...@@ -35,17 +35,18 @@ struct SBTC {
int idx; int idx;
int idxStack[BTREE_MAX_DEPTH + 1]; int idxStack[BTREE_MAX_DEPTH + 1];
SPage *pgStack[BTREE_MAX_DEPTH + 1]; SPage *pgStack[BTREE_MAX_DEPTH + 1];
TXN *pTxn;
}; };
// SBTree // SBTree
int tdbBtreeOpen(int keyLen, int valLen, SPager *pFile, FKeyComparator kcmpr, SBTree **ppBt); int tdbBtreeOpen(int keyLen, int valLen, SPager *pFile, FKeyComparator kcmpr, SBTree **ppBt);
int tdbBtreeClose(SBTree *pBt); int tdbBtreeClose(SBTree *pBt);
int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, int vLen); int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn);
int tdbBtreeGet(SBTree *pBt, const void *pKey, int kLen, void **ppVal, int *vLen); int tdbBtreeGet(SBTree *pBt, const void *pKey, int kLen, void **ppVal, int *vLen);
int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen); int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen);
// SBTC // SBTC
int tdbBtcOpen(SBTC *pCur, SBTree *pBt); int tdbBtcOpen(SBTC *pBtc, SBTree *pBt, TXN *pTxn);
int tdbBtcMoveToFirst(SBTC *pBtc); int tdbBtcMoveToFirst(SBTC *pBtc);
int tdbBtcMoveToLast(SBTC *pBtc); int tdbBtcMoveToLast(SBTC *pBtc);
int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen); int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen);
......
...@@ -27,7 +27,7 @@ typedef struct STDBC TDBC; ...@@ -27,7 +27,7 @@ typedef struct STDBC TDBC;
int tdbDbOpen(const char *fname, int keyLen, int valLen, FKeyComparator keyCmprFn, TENV *pEnv, TDB **ppDb); int tdbDbOpen(const char *fname, int keyLen, int valLen, FKeyComparator keyCmprFn, TENV *pEnv, TDB **ppDb);
int tdbDbClose(TDB *pDb); int tdbDbClose(TDB *pDb);
int tdbDbDrop(TDB *pDb); int tdbDbDrop(TDB *pDb);
int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen); int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn);
int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen); int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen);
int tdbDbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen); int tdbDbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen);
......
...@@ -111,13 +111,21 @@ typedef struct SPager SPager; ...@@ -111,13 +111,21 @@ typedef struct SPager SPager;
typedef struct SPCache SPCache; typedef struct SPCache SPCache;
typedef struct SPage SPage; typedef struct SPage SPage;
// transaction
#define TDB_TXN_WRITE 0x1
#define TDB_TXN_READ_UNCOMMITTED 0x2
typedef struct STxn { typedef struct STxn {
u64 txnId; int flags;
i64 txnId;
void *(*xMalloc)(void *, size_t); void *(*xMalloc)(void *, size_t);
void (*xFree)(void *, void *); void (*xFree)(void *, void *);
void *xArg; void *xArg;
} TXN; } TXN;
#define TDB_TXN_IS_WRITE(PTXN) ((PTXN)->flags & TDB_TXN_WRITE)
#define TDB_TXN_IS_READ(PTXN) (!TDB_TXN_IS_WRITE(PTXN))
#define TDB_TXN_IS_READ_UNCOMMITTED(PTXN) ((PTXN)->flags & TDB_TXN_READ_UNCOMMITTED)
#include "tdbOs.h" #include "tdbOs.h"
#include "tdbUtil.h" #include "tdbUtil.h"
......
...@@ -49,7 +49,7 @@ extern "C" { ...@@ -49,7 +49,7 @@ extern "C" {
int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache); int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache);
int tdbPCacheClose(SPCache *pCache); int tdbPCacheClose(SPCache *pCache);
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn); SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn);
void tdbPCacheRelease(SPCache *pCache, SPage *pPage); void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn);
int tdbPCacheGetPageSize(SPCache *pCache); int tdbPCacheGetPageSize(SPCache *pCache);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -42,9 +42,12 @@ int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate); ...@@ -42,9 +42,12 @@ int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate);
int tdbPagerWrite(SPager *pPager, SPage *pPage); int tdbPagerWrite(SPager *pPager, SPage *pPage);
int tdbPagerBegin(SPager *pPager, TXN *pTxn); int tdbPagerBegin(SPager *pPager, TXN *pTxn);
int tdbPagerCommit(SPager *pPager, TXN *pTxn); int tdbPagerCommit(SPager *pPager, TXN *pTxn);
int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg); int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg,
int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg); TXN *pTxn);
void tdbPagerReturnPage(SPager *pPager, SPage *pPage); int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg,
TXN *pTxn);
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn);
int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -20,6 +20,10 @@ ...@@ -20,6 +20,10 @@
extern "C" { extern "C" {
#endif #endif
int tdbTxnOpen(TXN *pTxn, int64_t txnid, void *(*xMalloc)(void *, size_t), void (*xFree)(void *, void *), void *xArg,
int flags);
int tdbTxnClose(TXN *pTxn);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -19,7 +19,7 @@ static SPoolMem *openPool() { ...@@ -19,7 +19,7 @@ static SPoolMem *openPool() {
return pPool; return pPool;
} }
static void closePool(SPoolMem *pPool) { static void clearPool(SPoolMem *pPool) {
SPoolMem *pMem; SPoolMem *pMem;
do { do {
...@@ -35,13 +35,14 @@ static void closePool(SPoolMem *pPool) { ...@@ -35,13 +35,14 @@ static void closePool(SPoolMem *pPool) {
} while (1); } while (1);
assert(pPool->size == 0); assert(pPool->size == 0);
}
static void closePool(SPoolMem *pPool) {
clearPool(pPool);
tdbOsFree(pPool); tdbOsFree(pPool);
} }
#define clearPool closePool static void *poolMalloc(void *arg, size_t size) {
static void *poolMalloc(void *arg, int size) {
void *ptr = NULL; void *ptr = NULL;
SPoolMem *pPool = (SPoolMem *)arg; SPoolMem *pPool = (SPoolMem *)arg;
SPoolMem *pMem; SPoolMem *pMem;
...@@ -118,7 +119,8 @@ TEST(tdb_test, simple_test) { ...@@ -118,7 +119,8 @@ TEST(tdb_test, simple_test) {
TENV *pEnv; TENV *pEnv;
TDB *pDb; TDB *pDb;
FKeyComparator compFunc; FKeyComparator compFunc;
int nData = 50000000; int nData = 10000000;
TXN txn;
// Open Env // Open Env
ret = tdbEnvOpen("tdb", 4096, 64, &pEnv); ret = tdbEnvOpen("tdb", 4096, 64, &pEnv);
...@@ -130,25 +132,44 @@ TEST(tdb_test, simple_test) { ...@@ -130,25 +132,44 @@ TEST(tdb_test, simple_test) {
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
{ {
char key[64]; char key[64];
char val[64]; char val[64];
int64_t poolLimit = 4096; // 1M pool limit
{ // Insert some data int64_t txnid = 0;
for (int i = 1; i <= nData;) { SPoolMem *pPool;
tdbBegin(pEnv, NULL);
// open the pool
for (int k = 0; k < 2000; k++) { pPool = openPool();
sprintf(key, "key%d", i);
sprintf(val, "value%d", i); // start a transaction
ret = tdbDbInsert(pDb, key, strlen(key), val, strlen(val)); txnid++;
GTEST_ASSERT_EQ(ret, 0); tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
i++; tdbBegin(pEnv, &txn);
}
for (int iData = 1; iData <= nData; iData++) {
tdbCommit(pEnv, NULL); sprintf(key, "key%d", iData);
sprintf(val, "value%d", iData);
ret = tdbDbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
GTEST_ASSERT_EQ(ret, 0);
// if pool is full, commit the transaction and start a new one
if (pPool->size >= poolLimit) {
// commit current transaction
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
// start a new transaction
clearPool(pPool);
txnid++;
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
} }
} }
// commit the transaction
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
{ // Query the data { // Query the data
void *pVal = NULL; void *pVal = NULL;
int vLen; int vLen;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册