提交 13b7ba3e 编写于 作者: H Hongze Cheng

refact TDB

上级 7a5985ae
......@@ -145,7 +145,7 @@ int tdbBtCursorInsert(SBtCursor *pCur, const void *pKey, int kLen, const void *p
}
if (pCur->idx == -1) {
ASSERT(TDB_PAGE_NCELLS(pCur->pPage) == 0);
ASSERT(TDB_PAGE_TOTAL_CELLS(pCur->pPage) == 0);
idx = 0;
} else {
if (cret > 0) {
......@@ -218,7 +218,7 @@ static int tdbBtCursorMoveTo(SBtCursor *pCur, const void *pKey, int kLen, int *p
pCur->iPage = 0;
if (TDB_PAGE_NCELLS(pCur->pPage) == 0) {
if (TDB_PAGE_TOTAL_CELLS(pCur->pPage) == 0) {
// Current page is empty
ASSERT(TDB_FLAG_IS(TDB_PAGE_FLAGS(pCur->pPage), TDB_BTREE_ROOT | TDB_BTREE_LEAF));
return 0;
......@@ -231,7 +231,7 @@ static int tdbBtCursorMoveTo(SBtCursor *pCur, const void *pKey, int kLen, int *p
SCellDecoder cd = {0};
pPage = pCur->pPage;
nCells = TDB_PAGE_NCELLS(pPage);
nCells = TDB_PAGE_TOTAL_CELLS(pPage);
lidx = 0;
ridx = nCells - 1;
......@@ -242,7 +242,7 @@ static int tdbBtCursorMoveTo(SBtCursor *pCur, const void *pKey, int kLen, int *p
midx = (lidx + ridx) >> 1;
pCell = TDB_PAGE_CELL_AT(pPage, midx);
pCell = tdbPageGetCell(pPage, midx);
ret = tdbBtreeDecodeCell(pPage, pCell, &cd);
if (ret < 0) {
// TODO: handle error
......@@ -283,7 +283,7 @@ static int tdbBtCursorMoveTo(SBtCursor *pCur, const void *pKey, int kLen, int *p
} else {
// TODO: reset cd as uninitialized
pCur->idx = midx + 1;
pCell = TDB_PAGE_CELL_AT(pPage, midx + 1);
pCell = tdbPageGetCell(pPage, midx + 1);
tdbBtreeDecodeCell(pPage, pCell, &cd);
tdbBtCursorMoveToChild(pCur, cd.pgno);
}
......@@ -391,16 +391,12 @@ static int tdbBtreeInitPage(SPage *pPage, void *arg) {
} else {
pPage->szAmHdr = sizeof(SBtPageHdr);
}
pPage->pPageHdr = pPage->pData;
pPage->pAmHdr = pPage->pPageHdr + pPage->pPageMethods->szPageHdr;
pPage->pCellIdx = pPage->pAmHdr + pPage->szAmHdr;
pPage->pFreeStart = pPage->pCellIdx + pPage->pPageMethods->szOffset * TDB_PAGE_NCELLS(pPage);
pPage->pFreeEnd = pPage->pData + TDB_PAGE_CCELLS(pPage);
pPage->pPageFtr = (SPageFtr *)(pPage->pData + pPage->pageSize - sizeof(SPageFtr));
pPage->xCellSize = NULL; // TODO
tdbPageInit(pPage);
TDB_BTREE_ASSERT_FLAG(flags);
// Init other fields
if (isLeaf) {
pPage->kLen = pBt->keyLen;
pPage->vLen = pBt->valLen;
......@@ -413,30 +409,41 @@ static int tdbBtreeInitPage(SPage *pPage, void *arg) {
pPage->minLocal = pBt->minLocal;
}
// TODO: need to update the SPage.nFree
// pPage->nFree = pPage->pFreeEnd - pPage->pFreeStart;
pPage->nOverflow = 0;
return 0;
}
static int tdbBtreeZeroPage(SPage *pPage, void *arg) {
u16 flags;
SBTree *pBt;
u8 isLeaf;
flags = ((SBtreeZeroPageArg *)arg)->flags;
pBt = ((SBtreeZeroPageArg *)arg)->pBt;
pPage->pPageHdr = pPage->pData;
isLeaf = TDB_BTREE_PAGE_IS_LEAF(flags);
// Init the page header
TDB_PAGE_FLAGS_SET(pPage, flags);
TDB_PAGE_NCELLS_SET(pPage, 0);
TDB_PAGE_CCELLS_SET(pPage, pBt->pageSize - sizeof(SPageFtr));
TDB_PAGE_FCELL_SET(pPage, 0);
TDB_PAGE_NFREE_SET(pPage, 0);
// Set szAmHdr
if (isLeaf) {
pPage->szAmHdr = 0;
} else {
pPage->szAmHdr = sizeof(SBtPageHdr);
}
pPage->xCellSize = NULL; // TODO
tdbPageZero(pPage);
tdbBtreeInitPage(pPage, (void *)pBt);
if (isLeaf) {
pPage->kLen = pBt->keyLen;
pPage->vLen = pBt->valLen;
pPage->maxLocal = pBt->maxLeaf;
pPage->minLocal = pBt->minLeaf;
} else {
pPage->kLen = pBt->keyLen;
pPage->vLen = sizeof(SPgno);
pPage->maxLocal = pBt->maxLocal;
pPage->minLocal = pBt->minLocal;
}
return 0;
}
......@@ -453,7 +460,8 @@ typedef struct {
} SBtreeBalanceHelper;
static int tdbBtreeCopyPageContent(SPage *pFrom, SPage *pTo) {
int nCells = TDB_PAGE_NCELLS(pFrom);
#if 0
int nCells = TDB_PAGE_TOTAL_CELLS(pFrom);
int cCells = TDB_PAGE_CCELLS(pFrom);
int fCell = TDB_PAGE_FCELL(pFrom);
int nFree = TDB_PAGE_NFREE(pFrom);
......@@ -469,6 +477,7 @@ static int tdbBtreeCopyPageContent(SPage *pFrom, SPage *pTo) {
TDB_PAGE_NFREE_SET(pTo, nFree);
// TODO: update other fields
#endif
return 0;
}
......@@ -529,7 +538,7 @@ static int tdbBtreeBalanceStep1(SBtreeBalanceHelper *pBlh) {
SBTree *pBt;
pParent = pBlh->pParent;
nCells = TDB_PAGE_NCELLS(pParent);
nCells = TDB_PAGE_TOTAL_CELLS(pParent);
nChild = nCells + 1;
pBt = pBlh->pBt;
......@@ -556,7 +565,7 @@ static int tdbBtreeBalanceStep1(SBtreeBalanceHelper *pBlh) {
if (idxStart + i == nCells) {
pgno = ((SBtPageHdr *)(pParent->pAmHdr))[0].rChild;
} else {
pCell = TDB_PAGE_CELL_AT(pParent, idxStart + i);
pCell = tdbPageGetCell(pParent, idxStart + i);
// TODO: no need to decode the payload part, and even the kLen, vLen part
// we only need the pgno part
ret = tdbBtreeDecodeCell(pParent, pCell, &cd);
......@@ -610,7 +619,7 @@ static int tdbBtreeBalanceStep2(SBtreeBalanceHelper *pBlh) {
if (cidx < limit) {
// Get local cells
pCell = TDB_PAGE_CELL_AT(pPage, cidx);
pCell = tdbPageGetCell(pPage, cidx);
} else if (cidx == limit) {
// Get overflow cells
pCell = pPage->apOvfl[oidx++];
......
......@@ -57,8 +57,8 @@ typedef struct __attribute__((__packed__)) {
struct SPage {
pthread_spinlock_t lock;
u8 *pData;
int pageSize;
u8 *pData;
SPageMethods *pPageMethods;
// Fields below used by pager and am
u8 szAmHdr;
......@@ -81,21 +81,8 @@ struct SPage {
};
/* For page */
#define TDB_PAGE_FLAGS(pPage) (*(pPage)->pPageMethods->getFlags)(pPage)
#define TDB_PAGE_NCELLS(pPage) (*(pPage)->pPageMethods->getCellNum)(pPage)
#define TDB_PAGE_CCELLS(pPage) (*(pPage)->pPageMethods->getCellBody)(pPage)
#define TDB_PAGE_FCELL(pPage) (*(pPage)->pPageMethods->getCellFree)(pPage)
#define TDB_PAGE_NFREE(pPage) (*(pPage)->pPageMethods->getFreeBytes)(pPage)
#define TDB_PAGE_CELL_OFFSET_AT(pPage, idx) (*(pPage)->pPageMethods->getCellOffset)(pPage, idx)
#define TDB_PAGE_FLAGS_SET(pPage, FLAGS) (*(pPage)->pPageMethods->setFlags)(pPage, FLAGS)
#define TDB_PAGE_NCELLS_SET(pPage, NCELLS) (*(pPage)->pPageMethods->setCellNum)(pPage, NCELLS)
#define TDB_PAGE_CCELLS_SET(pPage, CCELLS) (*(pPage)->pPageMethods->setCellBody)(pPage, CCELLS)
#define TDB_PAGE_FCELL_SET(pPage, FCELL) (*(pPage)->pPageMethods->setCellFree)(pPage, FCELL)
#define TDB_PAGE_NFREE_SET(pPage, NFREE) (*(pPage)->pPageMethods->setFreeBytes)(pPage, NFREE)
#define TDB_PAGE_CELL_OFFSET_AT_SET(pPage, idx, OFFSET) (*(pPage)->pPageMethods->setCellOffset)(pPage, idx, OFFSET)
#define TDB_PAGE_CELL_AT(pPage, idx) ((pPage)->pData + TDB_PAGE_CELL_OFFSET_AT(pPage, idx))
#define TDB_PAGE_FLAGS(pPage) (*(pPage)->pPageMethods->getFlags)(pPage)
#define TDB_PAGE_FLAGS_SET(pPage, FLAGS) (*(pPage)->pPageMethods->setFlags)(pPage, FLAGS)
// For page lock
#define P_LOCK_SUCC 0
......@@ -120,6 +107,8 @@ struct SPage {
})
// APIs
#define TDB_PAGE_TOTAL_CELLS(pPage) ((pPage)->nOverflow + (pPage)->pPageMethods->getCellNum(pPage))
int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t), void *arg);
int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg);
void tdbPageZero(SPage *pPage);
......@@ -127,6 +116,29 @@ void tdbPageInit(SPage *pPage);
int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell);
int tdbPageDropCell(SPage *pPage, int idx);
static inline SCell *tdbPageGetCell(SPage *pPage, int idx) {
SCell *pCell;
int iOvfl;
int lidx;
ASSERT(idx >= 0 && idx < pPage->nOverflow + pPage->pPageMethods->getCellNum(pPage));
iOvfl = 0;
for (; iOvfl < pPage->nOverflow; iOvfl++) {
if (pPage->aiOvfl[iOvfl] == idx) {
pCell = pPage->apOvfl[iOvfl];
return pCell;
} else if (pPage->aiOvfl[iOvfl] > idx) {
break;
}
}
lidx = idx - iOvfl;
pCell = pPage->pData + pPage->pPageMethods->getCellOffset(pPage, lidx);
return pCell;
}
#ifdef __cplusplus
}
#endif
......
......@@ -18,9 +18,20 @@
extern SPageMethods pageMethods;
extern SPageMethods pageLargeMethods;
#define TDB_PAGE_OFFSET_SIZE(pPage) ((pPage)->pPageMethods->szOffset)
#define TDB_PAGE_HDR_SIZE(pPage) ((pPage)->pPageMethods->szPageHdr)
#define TDB_PAGE_FREE_CELL_SIZE(pPage) ((pPage)->pPageMethods->szFreeCell)
#define TDB_PAGE_OFFSET_SIZE(pPage) ((pPage)->pPageMethods->szOffset)
#define TDB_PAGE_HDR_SIZE(pPage) ((pPage)->pPageMethods->szPageHdr)
#define TDB_PAGE_FREE_CELL_SIZE(pPage) ((pPage)->pPageMethods->szFreeCell)
#define TDB_PAGE_NCELLS(pPage) (*(pPage)->pPageMethods->getCellNum)(pPage)
#define TDB_PAGE_CCELLS(pPage) (*(pPage)->pPageMethods->getCellBody)(pPage)
#define TDB_PAGE_FCELL(pPage) (*(pPage)->pPageMethods->getCellFree)(pPage)
#define TDB_PAGE_NFREE(pPage) (*(pPage)->pPageMethods->getFreeBytes)(pPage)
#define TDB_PAGE_CELL_OFFSET_AT(pPage, idx) (*(pPage)->pPageMethods->getCellOffset)(pPage, idx)
#define TDB_PAGE_NCELLS_SET(pPage, NCELLS) (*(pPage)->pPageMethods->setCellNum)(pPage, NCELLS)
#define TDB_PAGE_CCELLS_SET(pPage, CCELLS) (*(pPage)->pPageMethods->setCellBody)(pPage, CCELLS)
#define TDB_PAGE_FCELL_SET(pPage, FCELL) (*(pPage)->pPageMethods->setCellFree)(pPage, FCELL)
#define TDB_PAGE_NFREE_SET(pPage, NFREE) (*(pPage)->pPageMethods->setFreeBytes)(pPage, NFREE)
#define TDB_PAGE_CELL_OFFSET_AT_SET(pPage, idx, OFFSET) (*(pPage)->pPageMethods->setCellOffset)(pPage, idx, OFFSET)
#define TDB_PAGE_CELL_AT(pPage, idx) ((pPage)->pData + TDB_PAGE_CELL_OFFSET_AT(pPage, idx))
#define TDB_PAGE_MAX_FREE_BLOCK(pPage) \
((pPage)->pageSize - (pPage)->szAmHdr - TDB_PAGE_HDR_SIZE(pPage) - sizeof(SPageFtr))
......@@ -47,8 +58,8 @@ int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t)
pPage = (SPage *)(ptr + pageSize);
TDB_INIT_PAGE_LOCK(pPage);
pPage->pData = ptr;
pPage->pageSize = pageSize;
pPage->pData = ptr;
if (pageSize < 65536) {
pPage->pPageMethods = &pageMethods;
} else {
......@@ -125,6 +136,7 @@ int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell) {
ASSERT(pPage->pFreeStart == pPage->pCellIdx + TDB_PAGE_OFFSET_SIZE(pPage) * (nCells + 1));
} else {
// TODO: make it extensible
// add the cell as an overflow cell
for (int i = pPage->nOverflow; i > iOvfl; i--) {
pPage->apOvfl[i] = pPage->apOvfl[i - 1];
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册