提交 3b49a863 编写于 作者: H Hongze Cheng

more TDB

上级 d7b54201
......@@ -564,7 +564,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
if (infoNews[nNews].size + cellBytes > TDB_PAGE_USABLE_SIZE(pPage)) {
// page is full, use a new page
nNews++;
// for a internal leaf case, this cell is used as the new divider cell to parent
// for an internal leaf case, this cell is used as the new divider cell to parent
if (!childLeaf) continue;
}
infoNews[nNews].cnt++;
......@@ -594,10 +594,10 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
int nCells;
int cellBytes;
if (childLeaf) { // child leaf
pPage = pOlds[infoNews[iNew - 1].oPage];
nCells = TDB_PAGE_TOTAL_CELLS(pPage);
pPage = pOlds[infoNews[iNew - 1].oPage];
nCells = TDB_PAGE_TOTAL_CELLS(pPage);
if (childLeaf) { // child leaf
for (;;) {
pCell = tdbPageGetCell(pPage, infoNews[iNew - 1].oIdx);
cellBytes = TDB_BYTES_CELL_TAKEN(pPage, pCell);
......@@ -618,7 +618,55 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
}
}
} else { // internal leaf
// TODO
SCell *pCellRight;
SCell *pCellLeft;
int szCellRight, szCellLeft;
// get pCellLeft, szCellLeft
if (infoNews[iNew - 1].oIdx == nCells) {
ASSERT(infoNews[iNew - 1].oPage < nOlds - 1);
pCellLeft = pDivCell[infoNews[iNew - 1].oPage];
szCellLeft = szDivCell[infoNews[iNew - 1].oPage];
} else {
pCellLeft = tdbPageGetCell(pPage, infoNews[iNew - 1].oIdx);
szCellLeft = tdbBtreeCellSize(pPage, pCellLeft);
}
// get pCellRight, szCellRight
if (infoNews[iNew - 1].oIdx + 1 < nCells) {
pCellRight = tdbPageGetCell(pPage, infoNews[iNew - 1].oIdx + 1);
szCellRight = tdbBtreeCellSize(pPage, pCellRight);
} else {
if (infoNews[iNew - 1].oPage < nOlds - 1) {
pCellRight = pDivCell[infoNews[iNew - 1].oPage];
szCellRight = szDivCell[infoNews[iNew - 1].oPage];
} else {
// TODO: what if the next old page is empty?
pCellRight = tdbPageGetCell(pOlds[infoNews[iNew - 1].oPage + 1], 0);
szCellRight = tdbBtreeCellSize(pPage, pCellRight);
}
}
if (infoNews[iNew - 1].size - szCellLeft - TDB_PAGE_OFFSET_SIZE(pPage) <=
infoNews[iNew].size + szCellRight + TDB_PAGE_OFFSET_SIZE(pPage)) {
break;
}
// TODO: ASSERT(infoNews[iNew].size < PAGE_MAX_CAPACITY);
infoNews[iNew].cnt++;
infoNews[iNew].size += szCellRight;
infoNews[iNew - 1].cnt--;
infoNews[iNew - 1].size -= szCellLeft;
if ((infoNews[iNew - 1].oIdx--) == 0) {
infoNews[iNew - 1].oPage--;
pPage = pOlds[infoNews[iNew - 1].oPage];
nCells = TDB_PAGE_TOTAL_CELLS(pPage);
if (infoNews[iNew - 1].oPage < nOlds - 1) {
infoNews[iNew - 1].oIdx = nCells;
} else {
infoNews[iNew - 1].oIdx = nCells - 1;
}
}
}
}
}
......@@ -654,6 +702,8 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
int tPage, tIdx, iOld;
SCell *pCell;
int szCell;
int nCells;
SCellDecoder cd;
SBtreeInitPageArg iarg = {.flags = TDB_BTREE_PAGE_GET_FLAGS(pOlds[0]), .pBt = pBt};
for (int i = 0; i < 2; i++) {
......@@ -666,58 +716,122 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
tPage = 0;
tIdx = 0;
iOld = 0;
nCells = TDB_PAGE_TOTAL_CELLS(pOlds[iOld]);
tdbBtreeZeroPage(pTPage[tPage], &iarg);
tdbPageCopy(pOlds[iOld++], pTPage[tPage]);
tdbPageCopy(pOlds[iOld], pTPage[tPage]);
if (!childLeaf) {
((SIntHdr *)pTPage[tPage]->pData)->pgno = ((SIntHdr *)pOlds[iOld]->pData)->pgno;
}
for (int iNew = 0; iNew < nNews; iNew++) {
// fill the iNew page
// TODO: copy current page to tmp space
tdbBtreeZeroPage(pNews[iNew], &iarg);
for (int iCell = 0; iCell < infoNews[iNew].cnt; iCell++) {
while (tIdx >= TDB_PAGE_TOTAL_CELLS(pTPage[tPage])) {
tPage = (tPage + 1) % 2;
tIdx = 0;
tdbBtreeZeroPage(pTPage[tPage], &iarg);
tdbPageCopy(pOlds[iOld++], pTPage[tPage]);
for (;;) { // loop to find the next available cell
if (tIdx < nCells) {
pCell = tdbPageGetCell(pTPage[tPage], tIdx);
szCell = tdbBtreeCellSize(pTPage[tPage], pCell);
tIdx++;
break;
} else {
if (!childLeaf) {
if (iOld < nOlds - 1) {
pCell = pDivCell[iOld];
szCell = szDivCell[iOld];
((SPgno *)pCell)[0] = ((SIntHdr *)pOlds[iOld]->pData)->pgno;
iOld++;
tPage = (tPage + 1) % 2;
tIdx = 0;
nCells = TDB_PAGE_TOTAL_CELLS(pOlds[iOld]);
tdbBtreeZeroPage(pTPage[tPage], &iarg);
tdbPageCopy(pOlds[iOld], pTPage[tPage]);
((SIntHdr *)pTPage[tPage]->pData)->pgno = ((SIntHdr *)pOlds[iOld]->pData)->pgno;
break;
} else {
iOld++;
tPage = (tPage + 1) % 2;
tIdx = 0;
nCells = TDB_PAGE_TOTAL_CELLS(pOlds[iOld]);
tdbBtreeZeroPage(pTPage[tPage], &iarg);
tdbPageCopy(pOlds[iOld], pTPage[tPage]);
((SIntHdr *)pTPage[tPage]->pData)->pgno = ((SIntHdr *)pOlds[iOld]->pData)->pgno;
}
} else {
iOld++;
tPage = (tPage + 1) % 2;
tIdx = 0;
nCells = TDB_PAGE_TOTAL_CELLS(pOlds[iOld]);
tdbBtreeZeroPage(pTPage[tPage], &iarg);
tdbPageCopy(pOlds[iOld], pTPage[tPage]);
}
}
}
pCell = tdbPageGetCell(pTPage[tPage], tIdx);
szCell = tdbBtreeCellSize(pTPage[tPage], pCell);
tdbPageInsertCell(pNews[iNew], iCell, pCell, szCell);
tIdx++;
}
}
for (int i = 0; i < 2; i++) {
tdbPageDestroy(pTPage[i], NULL, NULL);
}
}
{ // Insert records in parent page
int cIdx = sIdx;
int szCell;
SCell pCell[128]; // TODO
SCellDecoder cd;
for (int iNew = 0; iNew < nNews; iNew++) {
if (iNew == nNews - 1) {
// The last new page
SIntHdr *pIntHdr = (SIntHdr *)pParent->pData;
if (pIntHdr->pgno == 0) {
pIntHdr->pgno = TDB_PAGE_PGNO(pNews[iNew]);
// fill right-most child pgno if internal page
if (!childLeaf) {
if (tIdx < nCells) {
pCell = tdbPageGetCell(pTPage[tPage], tIdx);
szCell = tdbBtreeCellSize(pTPage[tPage], pCell);
tIdx++;
break;
} else {
if (!childLeaf) {
if (iOld < nOlds - 1) {
pCell = pDivCell[iOld];
szCell = szDivCell[iOld];
((SPgno *)pCell)[0] = ((SIntHdr *)pOlds[iOld]->pData)->pgno;
iOld++;
tPage = (tPage + 1) % 2;
tIdx = 0;
nCells = TDB_PAGE_TOTAL_CELLS(pOlds[iOld]);
tdbBtreeZeroPage(pTPage[tPage], &iarg);
tdbPageCopy(pOlds[iOld], pTPage[tPage]);
((SIntHdr *)pTPage[tPage]->pData)->pgno = ((SIntHdr *)pOlds[iOld]->pData)->pgno;
break;
} else {
iOld++;
tPage = (tPage + 1) % 2;
tIdx = 0;
nCells = TDB_PAGE_TOTAL_CELLS(pOlds[iOld]);
tdbBtreeZeroPage(pTPage[tPage], &iarg);
tdbPageCopy(pOlds[iOld], pTPage[tPage]);
((SIntHdr *)pTPage[tPage]->pData)->pgno = ((SIntHdr *)pOlds[iOld]->pData)->pgno;
}
} else {
iOld++;
tPage = (tPage + 1) % 2;
tIdx = 0;
nCells = TDB_PAGE_TOTAL_CELLS(pOlds[iOld]);
tdbBtreeZeroPage(pTPage[tPage], &iarg);
tdbPageCopy(pOlds[iOld], pTPage[tPage]);
}
}
((SIntHdr *)pNews[iNew]->pData)->pgno = ((SPgno *)pCell)[0];
}
tdbBtreeDecodeCell(pNews[iNew], tdbPageGetCell(pNews[iNew], TDB_PAGE_TOTAL_CELLS(pNews[iNew]) - 1), &cd);
// insert divider cell into the parent page
SIntHdr *pIntHdr = (SIntHdr *)pParent->pData;
if (iNew == nNews - 1 && pIntHdr->pgno == 0) {
pIntHdr->pgno = TDB_PAGE_PGNO(pNews[iNew]);
} else {
tdbBtreeDecodeCell(pNews[iNew], tdbPageGetCell(pNews[iNew], TDB_PAGE_TOTAL_CELLS(pNews[iNew]) - 1), &cd);
tdbBtreeEncodeCell(pParent, cd.pKey, cd.kLen, (void *)&TDB_PAGE_PGNO(pNews[iNew]), sizeof(SPgno), pCell,
&szCell);
// TODO: the cell here may be used by pParent as an overflow cell
tdbPageInsertCell(pParent, sIdx++, pCell, szCell);
}
}
tdbBtreeEncodeCell(pParent, cd.pKey, cd.kLen, (void *)&TDB_PAGE_PGNO(pNews[iNew]), sizeof(SPgno), pCell, &szCell);
// TODO: the cell here may be used by pParent as an overflow cell
tdbPageInsertCell(pParent, cIdx, pCell, szCell);
for (int i = 0; i < 2; i++) {
tdbPageDestroy(pTPage[i], NULL, NULL);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册