diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 4188985d0f4175b28d98adf10bdb0b9b3d146874..4f96a08ad9ad42f8d781d340f0c8b9a3d170c175 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -129,7 +129,7 @@ int tdbBtCursorInsert(SBtCursor *pCur, const void *pKey, int kLen, const void *p } if (pCur->idx == -1) { - ASSERT(pCur->pPage->pPageHdr->nCells == 0); + ASSERT(TDB_PAGE_NCELLS(pCur->pPage) == 0); // TODO: insert the K-V pair to idx 0 } @@ -157,9 +157,9 @@ static int tdbBtCursorMoveTo(SBtCursor *pCur, const void *pKey, int kLen) { pCur->iPage = 0; - if (pCur->pPage->pPageHdr->nCells == 0) { + if (TDB_PAGE_NCELLS(pCur->pPage) == 0) { // Current page is empty - ASSERT(TDB_FLAG_IS(pCur->pPage->pPageHdr->flags, TDB_BTREE_ROOT | TDB_BTREE_LEAF)); + ASSERT(TDB_FLAG_IS(TDB_PAGE_FLAGS(pCur->pPage), TDB_BTREE_ROOT | TDB_BTREE_LEAF)); return 0; } @@ -351,19 +351,19 @@ static int tdbBtreeZeroPage(SPage *pPage, void *arg) { flags = ((SBtreeZeroPageArg *)arg)->flags; pBt = ((SBtreeZeroPageArg *)arg)->pBt; - pPage->pPageHdr = (SPageHdr *)pPage->pData; + pPage->pPageHdr = pPage->pData; pPage->aCellIdx = (u8 *)(&(pPage->pPageHdr[1])); // Init the page header - pPage->pPageHdr->flags = flags; - pPage->pPageHdr->nCells = 0; - pPage->pPageHdr->cCells = pBt->pageSize; - pPage->pPageHdr->fCell = 0; - pPage->pPageHdr->nFree = 0; + TDB_PAGE_FLAGS_SET(pPage, flags); + TDB_PAGE_NCELLS_SET(pPage, 0); + TDB_PAGE_CCELLS_SET(pPage, pBt->pageSize); + TDB_PAGE_FCELL_SET(pPage, 0); + TDB_PAGE_NFREE_SET(pPage, 0); TDB_BTREE_ASSERT_FLAG(flags); - if (TDB_BTREE_PAGE_IS_LEAF(pPage->pPageHdr->flags)) { + if (TDB_BTREE_PAGE_IS_LEAF(flags)) { pPage->kLen = pBt->keyLen; pPage->vLen = pBt->valLen; pPage->maxLocal = pBt->maxLeaf; @@ -380,16 +380,18 @@ static int tdbBtreeZeroPage(SPage *pPage, void *arg) { static int tdbBtreeInitPage(SPage *pPage, void *arg) { SBTree *pBt; + u16 flags; pBt = (SBTree *)arg; - pPage->pPageHdr = (SPageHdr *)pPage->pData; - pPage->aCellIdx = (u8 *)(&(pPage->pPageHdr[1])); + flags = TDB_PAGE_FLAGS(pPage); + pPage->pPageHdr = pPage->pData; + pPage->aCellIdx = pPage->pPageHdr + pPage->szPageHdr; - TDB_BTREE_ASSERT_FLAG(pPage->pPageHdr->flags); + TDB_BTREE_ASSERT_FLAG(flags); // Init other fields - if (TDB_BTREE_PAGE_IS_LEAF(pPage->pPageHdr->flags)) { + if (TDB_BTREE_PAGE_IS_LEAF(flags)) { pPage->kLen = pBt->keyLen; pPage->vLen = pBt->valLen; pPage->maxLocal = pBt->maxLeaf; @@ -461,6 +463,7 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) { } static int tdbBtreeBalanceStep1(SBtreeBalanceHelper *pBlh) { +#if 0 int i; SPage *pParent; int nDiv; @@ -521,10 +524,12 @@ static int tdbBtreeBalanceStep1(SBtreeBalanceHelper *pBlh) { } } +#endif return 0; } static int tdbBtreeBalanceStep2(SBtreeBalanceHelper *pBlh) { +#if 0 SPage *pPage; int oidx; int cidx; @@ -570,6 +575,7 @@ static int tdbBtreeBalanceStep2(SBtreeBalanceHelper *pBlh) { /* TODO */ +#endif return 0; } @@ -680,7 +686,7 @@ static int tdbBtreeBalance(SBtCursor *pCur) { if (iPage == 0) { // Balance the root page by copy the root page content to // a child page and set the root page as empty first - ASSERT(TDB_BTREE_PAGE_IS_ROOT(pCur->pPage->pPageHdr->flags)); + // ASSERT(TDB_BTREE_PAGE_IS_ROOT(pCur->pPage->pPageHdr->flags)); ret = tdbBtreeBalanceDeeper(pCur->pBt, pCur->pPage, &(pCur->pgStack[1])); if (ret < 0) { diff --git a/source/libs/tdb/src/inc/tdbInt.h b/source/libs/tdb/src/inc/tdbInt.h index c6899d009e662cfb795a7e4ffaa9f94a0f03269d..5902a6a71630402fbe681830e394ce95df304ab0 100644 --- a/source/libs/tdb/src/inc/tdbInt.h +++ b/source/libs/tdb/src/inc/tdbInt.h @@ -34,6 +34,16 @@ typedef uint16_t u16; typedef uint32_t u32; typedef uint64_t u64; +// p must be u8 * +#define TDB_GET_U24(p) ((p)[0] * 65536 + *(u16 *)((p) + 1)) +#define TDB_PUT_U24(p, v) \ + do { \ + int tv = (v); \ + (p)[2] = tv & 0xff; \ + (p)[1] = (tv >> 8) & 0xff; \ + (p)[0] = (tv >> 16) & 0xff; \ + } while (0) + // SPgno typedef u32 SPgno; #define TDB_IVLD_PGNO ((pgno_t)0) diff --git a/source/libs/tdb/src/inc/tdbPage.h b/source/libs/tdb/src/inc/tdbPage.h index b3beabbc06d6b90e82fd4f2c0b76eed207de783b..70d70dbff547661fe9288a8d7fd487245e5aa4a1 100644 --- a/source/libs/tdb/src/inc/tdbPage.h +++ b/source/libs/tdb/src/inc/tdbPage.h @@ -52,7 +52,7 @@ struct SPage { u8 szPageHdr; pthread_spinlock_t lock; // Fields below used by pager and am - SPageHdr *pPageHdr; + u8 *pPageHdr; SPageFtr *pPageFtr; u8 *aCellIdx; u8 *pFreeStart; @@ -80,42 +80,91 @@ struct SPage { #define TDB_SPAGE_NFREE(pPage) (((SPageHdr *)(pPage)->pPageHdr)->nFree) #define TDB_SPAGE_CELL_OFFSET_AT(pPage, idx) ((u16 *)((pPage)->aCellIdx))[idx] +#define TDB_SPAGE_FLAGS_SET(pPage, FLAGS) TDB_SPAGE_FLAGS(pPage) = (FLAGS) +#define TDB_SPAGE_NCELLS_SET(pPage, NCELLS) TDB_SPAGE_NCELLS(pPage) = (NCELLS) +#define TDB_SPAGE_CCELLS_SET(pPage, CCELLS) TDB_SPAGE_CCELLS(pPage) = (CCELLS) +#define TDB_SPAGE_FCELL_SET(pPage, FCELL) TDB_SPAGE_FCELL(pPage) = (FCELL) +#define TDB_SPAGE_NFREE_SET(pPage, NFREE) TDB_SPAGE_NFREE(pPage) = (NFREE) +#define TDB_SPAGE_CELL_OFFSET_AT_SET(pPage, idx, OFFSET) TDB_SPAGE_CELL_OFFSET_AT(pPage, idx) = (OFFSET) + /* For large page */ -#define TDB_LPAGE_FLAGS(pPage) (((SLPageHdr *)(pPage)->pPageHdr)->flags) -#define TDB_LPAGE_NCELLS(pPage) \ - ({ \ - u8 *ptr = ((SLPageHdr *)(pPage)->pPageHdr)->nCells; \ - ptr[0] * 65536 + *(u16 *)(&ptr[1]); \ - }) -#define TDB_LPAGE_CCELLS(pPage) \ - ({ \ - u8 *ptr = ((SLPageHdr *)(pPage)->pPageHdr)->cCells; \ - ptr[0] * 65536 + *(u16 *)(&ptr[1]); \ - }) -#define TDB_LPAGE_FCELL(pPage) \ - ({ \ - u8 *ptr = ((SLPageHdr *)(pPage)->pPageHdr)->fCell; \ - ptr[0] * 65536 + *(u16 *)(&ptr[1]); \ - }) -#define TDB_LPAGE_NFREE(pPage) \ - ({ \ - u8 *ptr = ((SLPageHdr *)(pPage)->pPageHdr)->nFree; \ - ptr[0] * 65536 + *(u16 *)(&ptr[1]); \ - }) -#define TDB_LPAGE_CELL_OFFSET_AT(pPage, idx) \ - ({ \ - u8 *ptr = (pPage)->aCellIdx + idx * 3; \ - ptr[0] * 65536 + *(u16 *)(&ptr[1]); \ - }) +#define TDB_LPAGE_FLAGS(pPage) (((SLPageHdr *)(pPage)->pPageHdr)->flags) +#define TDB_LPAGE_NCELLS(pPage) TDB_GET_U24(((SLPageHdr *)(pPage)->pPageHdr)->nCells) +#define TDB_LPAGE_CCELLS(pPage) TDB_GET_U24(((SLPageHdr *)(pPage)->pPageHdr)->cCells) +#define TDB_LPAGE_FCELL(pPage) TDB_GET_U24(((SLPageHdr *)(pPage)->pPageHdr)->fCell) +#define TDB_LPAGE_NFREE(pPage) TDB_GET_U24(((SLPageHdr *)(pPage)->pPageHdr)->nFree) +#define TDB_LPAGE_CELL_OFFSET_AT(pPage, idx) TDB_GET_U24((pPage)->aCellIdx + idx * 3) + +#define TDB_LPAGE_FLAGS_SET(pPage, FLAGS) TDB_LPAGE_FLAGS(pPage) = (flags) +#define TDB_LPAGE_NCELLS_SET(pPage, NCELLS) TDB_PUT_U24(((SLPageHdr *)(pPage)->pPageHdr)->nCells, NCELLS) +#define TDB_LPAGE_CCELLS_SET(pPage, CCELLS) TDB_PUT_U24(((SLPageHdr *)(pPage)->pPageHdr)->cCells, CCELLS) +#define TDB_LPAGE_FCELL_SET(pPage, FCELL) TDB_PUT_U24(((SLPageHdr *)(pPage)->pPageHdr)->fCell, FCELL) +#define TDB_LPAGE_NFREE_SET(pPage, NFREE) TDB_PUT_U24(((SLPageHdr *)(pPage)->pPageHdr)->nFree, NFREE) +#define TDB_LPAGE_CELL_OFFSET_AT_SET(pPage, idx, OFFSET) TDB_PUT_U24((pPage)->aCellIdx + idx * 3, OFFSET) /* For page */ -#define TDB_PAGE_FLAGS(pPage) (TDB_IS_LARGE_PAGE(pPage) ? TDB_LPAGE_FLAGS(pPage)) : TDB_SPAGE_FLAGS(pPage)) -#define TDB_PAGE_NCELLS(pPage) (TDB_IS_LARGE_PAGE(pPage) ? TDB_LPAGE_NCELLS(pPage)) : TDB_SPAGE_NCELLS(pPage)) -#define TDB_PAGE_CCELLS(pPage) (TDB_IS_LARGE_PAGE(pPage) ? TDB_LPAGE_CCELLS(pPage)) : TDB_SPAGE_CCELLS(pPage)) -#define TDB_PAGE_FCELL(pPage) (TDB_IS_LARGE_PAGE(pPage) ? TDB_LPAGE_FCELL(pPage)) : TDB_SPAGE_FCELL(pPage)) -#define TDB_PAGE_NFREE(pPage) (TDB_IS_LARGE_PAGE(pPage) ? TDB_LPAGE_NFREE(pPage)) : TDB_SPAGE_NFREE(pPage)) +#define TDB_PAGE_FLAGS(pPage) (TDB_IS_LARGE_PAGE(pPage) ? TDB_LPAGE_FLAGS(pPage) : TDB_SPAGE_FLAGS(pPage)) +#define TDB_PAGE_NCELLS(pPage) (TDB_IS_LARGE_PAGE(pPage) ? TDB_LPAGE_NCELLS(pPage) : TDB_SPAGE_NCELLS(pPage)) +#define TDB_PAGE_CCELLS(pPage) (TDB_IS_LARGE_PAGE(pPage) ? TDB_LPAGE_CCELLS(pPage) : TDB_SPAGE_CCELLS(pPage)) +#define TDB_PAGE_FCELL(pPage) (TDB_IS_LARGE_PAGE(pPage) ? TDB_LPAGE_FCELL(pPage) : TDB_SPAGE_FCELL(pPage)) +#define TDB_PAGE_NFREE(pPage) (TDB_IS_LARGE_PAGE(pPage) ? TDB_LPAGE_NFREE(pPage) : TDB_SPAGE_NFREE(pPage)) #define TDB_PAGE_CELL_OFFSET_AT(pPage, idx) \ (TDB_IS_LARGE_PAGE(pPage) ? TDB_LPAGE_CELL_OFFSET_AT(pPage, idx) : TDB_SPAGE_CELL_OFFSET_AT(pPage, idx)) + +#define TDB_PAGE_FLAGS_SET(pPage, FLAGS) \ + do { \ + if (TDB_IS_LARGE_PAGE(pPage)) { \ + TDB_LPAGE_FLAGS_SET(pPage, FLAGS); \ + } else { \ + TDB_SPAGE_FLAGS_SET(pPage, FLAGS); \ + } \ + } while (0) + +#define TDB_PAGE_NCELLS_SET(pPage, NCELLS) \ + do { \ + if (TDB_IS_LARGE_PAGE(pPage)) { \ + TDB_LPAGE_NCELLS_SET(pPage, NCELLS); \ + } else { \ + TDB_SPAGE_NCELLS_SET(pPage, NCELLS); \ + } \ + } while (0) + +#define TDB_PAGE_CCELLS_SET(pPage, CCELLS) \ + do { \ + if (TDB_IS_LARGE_PAGE(pPage)) { \ + TDB_LPAGE_CCELLS_SET(pPage, CCELLS); \ + } else { \ + TDB_SPAGE_CCELLS_SET(pPage, CCELLS); \ + } \ + } while (0) + +#define TDB_PAGE_FCELL_SET(pPage, FCELL) \ + do { \ + if (TDB_IS_LARGE_PAGE(pPage)) { \ + TDB_LPAGE_FCELL_SET(pPage, FCELL); \ + } else { \ + TDB_SPAGE_FCELL_SET(pPage, FCELL); \ + } \ + } while (0) + +#define TDB_PAGE_NFREE_SET(pPage, NFREE) \ + do { \ + if (TDB_IS_LARGE_PAGE(pPage)) { \ + TDB_LPAGE_NFREE_SET(pPage, NFREE); \ + } else { \ + TDB_SPAGE_NFREE_SET(pPage, NFREE); \ + } \ + } while (0) + +#define TDB_PAGE_CELL_OFFSET_AT_SET(pPage, idx, OFFSET) \ + do { \ + if (TDB_IS_LARGE_PAGE(pPage)) { \ + TDB_LPAGE_CELL_OFFSET_AT_SET(pPage, idx, OFFSET); \ + } else { \ + TDB_SPAGE_CELL_OFFSET_AT_SET(pPage, idx, OFFSET); \ + } \ + } while (0) + #define TDB_PAGE_CELL_AT(pPage, idx) ((pPage)->pData + TDB_PAGE_CELL_OFFSET_AT(pPage, idx)) // For page lock