提交 f9e699be 编写于 作者: H Hongze Cheng

more TDB

上级 9dea8624
......@@ -73,12 +73,12 @@ int tdbEnvClose(TENV *pEnv) {
return 0;
}
int tdbBegin(TENV *pEnv) {
int tdbBegin(TENV *pEnv, TXN *pTxn) {
SPager *pPager;
int ret;
for (pPager = pEnv->pgrList; pPager; pPager = pPager->pNext) {
ret = tdbPagerBegin(pPager);
ret = tdbPagerBegin(pPager, pTxn);
if (ret < 0) {
ASSERT(0);
return -1;
......@@ -88,12 +88,12 @@ int tdbBegin(TENV *pEnv) {
return 0;
}
int tdbCommit(TENV *pEnv) {
int tdbCommit(TENV *pEnv, TXN *pTxn) {
SPager *pPager;
int ret;
for (pPager = pEnv->pgrList; pPager; pPager = pPager->pNext) {
ret = tdbPagerCommit(pPager);
ret = tdbPagerCommit(pPager, pTxn);
if (ret < 0) {
ASSERT(0);
return -1;
......@@ -103,7 +103,7 @@ int tdbCommit(TENV *pEnv) {
return 0;
}
int tdbRollback(TENV *pEnv) {
int tdbRollback(TENV *pEnv, TXN *pTxn) {
ASSERT(0);
return 0;
}
......
......@@ -36,7 +36,7 @@ struct SPCache {
#define PAGE_IS_PINNED(pPage) ((pPage)->pLruNext == NULL)
static int tdbPCacheOpenImpl(SPCache *pCache);
static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid);
static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn);
static void tdbPCachePinPage(SPCache *pCache, SPage *pPage);
static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage);
static void tdbPCacheAddPageToHash(SPCache *pCache, SPage *pPage);
......@@ -78,12 +78,12 @@ int tdbPCacheClose(SPCache *pCache) {
return 0;
}
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid) {
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) {
SPage *pPage;
tdbPCacheLock(pCache);
pPage = tdbPCacheFetchImpl(pCache, pPgid);
pPage = tdbPCacheFetchImpl(pCache, pPgid, pTxn);
if (pPage) {
TDB_REF_PAGE(pPage);
}
......@@ -106,7 +106,8 @@ void tdbPCacheRelease(SPCache *pCache, SPage *pPage) {
int tdbPCacheGetPageSize(SPCache *pCache) { return pCache->pageSize; }
static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid) {
static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) {
int ret;
SPage *pPage;
// 1. Search the hash table
......@@ -117,9 +118,11 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid) {
}
if (pPage) {
// TODO: the page need to be copied and
// replaced the page in hash table
tdbPCachePinPage(pCache, pPage);
return pPage;
}
return pPage;
// 2. Try to allocate a new page from the free list
if (pCache->pFree) {
......@@ -136,7 +139,20 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid) {
tdbPCachePinPage(pCache, pPage);
}
// 4. Try a stress allocation (TODO)
// 4. Try a create new page
if (pTxn && pTxn->xMalloc) {
ret = tdbPageCreate(pCache->pageSize, &pPage, pTxn->xMalloc, pTxn->xArg);
if (ret < 0) {
// TODO
ASSERT(0);
return NULL;
}
// init the page fields
pPage->isAnchor = 0;
pPage->isLocal = 0;
TDB_INIT_PAGE_REF(pPage);
}
// 5. Page here are just created from a free list
// or by recycling or allocated streesly,
......@@ -145,6 +161,8 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid) {
memcpy(&(pPage->pgid), pPgid, sizeof(*pPgid));
pPage->pLruNext = NULL;
pPage->pPager = NULL;
// TODO: allocated page may not add to hash
tdbPCacheAddPageToHash(pCache, pPage);
}
......@@ -171,17 +189,21 @@ static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage) {
nRef = TDB_GET_PAGE_REF(pPage);
ASSERT(nRef >= 0);
if (nRef == 0) {
// Add the page to LRU list
ASSERT(pPage->pLruNext == NULL);
pPage->pLruPrev = &(pCache->lru);
pPage->pLruNext = pCache->lru.pLruNext;
pCache->lru.pLruNext->pLruPrev = pPage;
pCache->lru.pLruNext = pPage;
if (1) {
// Add the page to LRU list
ASSERT(pPage->pLruNext == NULL);
pPage->pLruPrev = &(pCache->lru);
pPage->pLruNext = pCache->lru.pLruNext;
pCache->lru.pLruNext->pLruPrev = pPage;
pCache->lru.pLruNext = pPage;
pCache->nRecyclable++;
} else {
// TODO: may need to free the page
}
}
pCache->nRecyclable++;
tdbPCacheUnlock(pCache);
}
......@@ -229,13 +251,14 @@ static int tdbPCacheOpenImpl(SPCache *pCache) {
// pPage->pgid = 0;
pPage->isAnchor = 0;
pPage->isLocalPage = 1;
pPage->isLocal = 1;
TDB_INIT_PAGE_REF(pPage);
pPage->pHashNext = NULL;
pPage->pLruNext = NULL;
pPage->pLruPrev = NULL;
pPage->pDirtyNext = NULL;
// add page to free list
pPage->pFreeNext = pCache->pFree;
pCache->pFree = pPage;
pCache->nFree++;
......
......@@ -157,7 +157,7 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) {
return 0;
}
int tdbPagerBegin(SPager *pPager) {
int tdbPagerBegin(SPager *pPager, TXN *pTxn) {
if (pPager->inTran) {
return 0;
}
......@@ -175,7 +175,7 @@ int tdbPagerBegin(SPager *pPager) {
return 0;
}
int tdbPagerCommit(SPager *pPager) {
int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
SPage *pPage;
int ret;
......@@ -227,7 +227,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage
// Fetch a page container from the page cache
memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
pgid.pgno = pgno;
pPage = tdbPCacheFetch(pPager->pCache, &pgid);
pPage = tdbPCacheFetch(pPager->pCache, &pgid, NULL);
if (pPage == NULL) {
return -1;
}
......@@ -263,7 +263,7 @@ int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage
// Fetch a page container from the page cache
memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
pgid.pgno = *ppgno;
pPage = tdbPCacheFetch(pPager->pCache, &pgid);
pPage = tdbPCacheFetch(pPager->pCache, &pgid, NULL);
if (pPage == NULL) {
return -1;
}
......
......@@ -33,9 +33,9 @@ typedef struct STEnv {
int tdbEnvOpen(const char *rootDir, int pageSize, int cacheSize, TENV **ppEnv);
int tdbEnvClose(TENV *pEnv);
int tdbBegin(TENV *pEnv);
int tdbCommit(TENV *pEnv);
int tdbRollback(TENV *pEnv);
int tdbBegin(TENV *pEnv, TXN *pTxn);
int tdbCommit(TENV *pEnv, TXN *pTxn);
int tdbRollback(TENV *pEnv, TXN *pTxn);
void tdbEnvAddPager(TENV *pEnv, SPager *pPager);
void tdbEnvRemovePager(TENV *pEnv, SPager *pPager);
......
......@@ -111,6 +111,13 @@ typedef struct SPager SPager;
typedef struct SPCache SPCache;
typedef struct SPage SPage;
typedef struct STxn {
u64 txnId;
void *(*xMalloc)(void *, size_t);
void (*xFree)(void *, void *);
void *xArg;
} TXN;
#include "tdbOs.h"
#include "tdbUtil.h"
......
......@@ -22,7 +22,7 @@ extern "C" {
#define TDB_PCACHE_PAGE \
u8 isAnchor; \
u8 isLocalPage; \
u8 isLocal; \
u8 isDirty; \
i32 nRef; \
SPage *pCacheNext; \
......@@ -48,7 +48,7 @@ extern "C" {
int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache);
int tdbPCacheClose(SPCache *pCache);
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid);
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn);
void tdbPCacheRelease(SPCache *pCache, SPage *pPage);
int tdbPCacheGetPageSize(SPCache *pCache);
......
......@@ -40,8 +40,8 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager);
int tdbPagerClose(SPager *pPager);
int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate);
int tdbPagerWrite(SPager *pPager, SPage *pPage);
int tdbPagerBegin(SPager *pPager);
int tdbPagerCommit(SPager *pPager);
int tdbPagerBegin(SPager *pPager, TXN *pTxn);
int tdbPagerCommit(SPager *pPager, TXN *pTxn);
int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg);
int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg);
void tdbPagerReturnPage(SPager *pPager, SPage *pPage);
......
......@@ -20,15 +20,6 @@
extern "C" {
#endif
typedef struct STxn TXN;
struct STxn {
u64 txnId;
void *(*xMalloc)(void *, int);
void (*xFree)(void *, void *);
void *xArg;
};
#ifdef __cplusplus
}
#endif
......
......@@ -135,7 +135,7 @@ TEST(tdb_test, simple_test) {
{ // Insert some data
for (int i = 1; i <= nData;) {
tdbBegin(pEnv);
tdbBegin(pEnv, NULL);
for (int k = 0; k < 2000; k++) {
sprintf(key, "key%d", i);
......@@ -145,12 +145,10 @@ TEST(tdb_test, simple_test) {
i++;
}
tdbCommit(pEnv);
tdbCommit(pEnv, NULL);
}
}
tdbCommit(pEnv);
{ // Query the data
void *pVal = NULL;
int vLen;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册