提交 44daa77d 编写于 作者: H Hongze Cheng

more TDB

上级 3a173f46
......@@ -187,7 +187,7 @@ int tdbBtCursorInsert(SBtCursor *pCur, const void *pKey, int kLen, const void *p
}
// Insert the cell to the index
ret = tdbPageInsertCell(pCur->pPage, idx, pCell, szCell);
ret = tdbPageInsertCell(pCur->pPage, idx, pCell, szCell, 0);
if (ret < 0) {
return -1;
}
......@@ -490,7 +490,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
SCell *pDivCell[2] = {0};
int szDivCell[2];
int sIdx;
u8 childLeaf;
u8 childNotLeaf;
{ // Find 3 child pages at most to do balance
int nCells = TDB_PAGE_TOTAL_CELLS(pParent);
......@@ -529,14 +529,21 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
}
}
// copy the parent key out if child pages are not leaf page
childLeaf = TDB_BTREE_PAGE_IS_LEAF(TDB_BTREE_PAGE_GET_FLAGS(pOlds[0]));
if (!childLeaf) {
childNotLeaf = !TDB_BTREE_PAGE_IS_LEAF(TDB_BTREE_PAGE_GET_FLAGS(pOlds[0]));
if (childNotLeaf) {
for (int i = 0; i < nOlds - 1; i++) {
pCell = tdbPageGetCell(pParent, sIdx + i);
szDivCell[i] = tdbBtreeCellSize(pParent, pCell);
pDivCell[i] = malloc(szDivCell[i]);
memcpy(pDivCell, pCell, szDivCell[i]);
((SPgno *)pDivCell)[0] = ((SIntHdr *)pOlds[i]->pData)->pgno;
((SIntHdr *)pOlds[i]->pData)->pgno = 0;
// here we insert the cell as an overflow cell to avoid
// the slow defragment process
tdbPageInsertCell(pOlds[i], TDB_PAGE_TOTAL_CELLS(pOlds[i]), pDivCell[i], szDivCell[i], 1);
}
}
// drop the cells on parent page
......@@ -575,7 +582,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
// page is full, use a new page
nNews++;
// for an internal leaf case, this cell is used as the new divider cell to parent
if (!childLeaf) continue;
if (childNotLeaf) continue;
}
infoNews[nNews].cnt++;
infoNews[nNews].size += cellBytes;
......@@ -584,7 +591,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
}
// For internal pages
if (!childLeaf && oPage < nOlds - 1) {
if (childNotLeaf && oPage < nOlds - 1) {
if (infoNews[nNews].size + szDivCell[oPage] + TDB_PAGE_OFFSET_SIZE(pPage) > TDB_PAGE_USABLE_SIZE(pPage)) {
nNews++;
}
......@@ -607,7 +614,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
pPage = pOlds[infoNews[iNew - 1].oPage];
nCells = TDB_PAGE_TOTAL_CELLS(pPage);
if (childLeaf) { // child leaf
if (!childNotLeaf) { // child leaf
for (;;) {
pCell = tdbPageGetCell(pPage, infoNews[iNew - 1].oIdx);
cellBytes = TDB_BYTES_CELL_TAKEN(pPage, pCell);
......@@ -729,7 +736,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
nCells = TDB_PAGE_TOTAL_CELLS(pOlds[iOld]);
tdbBtreeZeroPage(pTPage[tPage], &iarg);
tdbPageCopy(pOlds[iOld], pTPage[tPage]);
if (!childLeaf) {
if (childNotLeaf) {
((SIntHdr *)pTPage[tPage]->pData)->pgno = ((SIntHdr *)pOlds[iOld]->pData)->pgno;
}
......@@ -746,7 +753,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
tIdx++;
break;
} else {
if (!childLeaf) {
if (childNotLeaf) {
if (iOld < nOlds - 1) {
pCell = pDivCell[iOld];
szCell = szDivCell[iOld];
......@@ -780,18 +787,18 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
}
}
tdbPageInsertCell(pNews[iNew], iCell, pCell, szCell);
tdbPageInsertCell(pNews[iNew], iCell, pCell, szCell, 0);
}
// fill right-most child pgno if internal page
if (!childLeaf) {
if (childNotLeaf) {
if (tIdx < nCells) {
pCell = tdbPageGetCell(pTPage[tPage], tIdx);
szCell = tdbBtreeCellSize(pTPage[tPage], pCell);
tIdx++;
break;
} else {
if (!childLeaf) {
if (!childNotLeaf) {
if (iOld < nOlds - 1) {
pCell = pDivCell[iOld];
szCell = szDivCell[iOld];
......@@ -836,7 +843,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
tdbBtreeEncodeCell(pParent, cd.pKey, cd.kLen, (void *)&TDB_PAGE_PGNO(pNews[iNew]), sizeof(SPgno), pCell,
&szCell);
// TODO: the cell here may be used by pParent as an overflow cell
tdbPageInsertCell(pParent, sIdx++, pCell, szCell);
tdbPageInsertCell(pParent, sIdx++, pCell, szCell, 0);
}
}
......
......@@ -108,7 +108,7 @@ int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t
int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg);
void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *));
void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *));
int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell);
int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl);
int tdbPageDropCell(SPage *pPage, int idx);
void tdbPageCopy(SPage *pFromPage, SPage *pToPage);
......
......@@ -114,7 +114,7 @@ void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell
ASSERT(pPage->pFreeEnd - pPage->pFreeStart <= TDB_PAGE_NFREE(pPage));
}
int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell) {
int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl) {
int nFree;
int nCells;
int iOvfl;
......@@ -135,7 +135,19 @@ int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell) {
lidx = idx - iOvfl;
if (nFree >= szCell + TDB_PAGE_OFFSET_SIZE(pPage)) {
if (asOvfl || nFree < szCell + TDB_PAGE_OFFSET_SIZE(pPage)) {
// TODO: make it extensible
// add the cell as an overflow cell
for (int i = pPage->nOverflow; i > iOvfl; i--) {
pPage->apOvfl[i] = pPage->apOvfl[i - 1];
pPage->aiOvfl[i] = pPage->aiOvfl[i - 1];
}
pPage->apOvfl[iOvfl] = pCell;
pPage->aiOvfl[iOvfl] = idx;
pPage->nOverflow++;
iOvfl++;
} else {
// page must has enough space to hold the cell locally
tdbPageAllocate(pPage, szCell, &pNewCell);
......@@ -149,18 +161,6 @@ int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell) {
TDB_PAGE_NCELLS_SET(pPage, nCells + 1);
ASSERT(pPage->pFreeStart == pPage->pCellIdx + TDB_PAGE_OFFSET_SIZE(pPage) * (nCells + 1));
} else {
// TODO: make it extensible
// add the cell as an overflow cell
for (int i = pPage->nOverflow; i > iOvfl; i--) {
pPage->apOvfl[i] = pPage->apOvfl[i - 1];
pPage->aiOvfl[i] = pPage->aiOvfl[i - 1];
}
pPage->apOvfl[iOvfl] = pCell;
pPage->aiOvfl[iOvfl] = idx;
pPage->nOverflow++;
iOvfl++;
}
for (; iOvfl < pPage->nOverflow; iOvfl++) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册